tailpipe.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #!/usr/bin/env python3
  2. #
  3. # Efficiently displays the last n lines of a file/pipe.
  4. #
  5. # Example:
  6. # ./scripts/tailpipe.py trace -n5
  7. #
  8. # Copyright (c) 2022, The littlefs authors.
  9. # SPDX-License-Identifier: BSD-3-Clause
  10. #
  11. import collections as co
  12. import io
  13. import os
  14. import shutil
  15. import sys
  16. import time
  17. def openio(path, mode='r'):
  18. if path == '-':
  19. if mode == 'r':
  20. return os.fdopen(os.dup(sys.stdin.fileno()), 'r')
  21. else:
  22. return os.fdopen(os.dup(sys.stdout.fileno()), 'w')
  23. else:
  24. return open(path, mode)
  25. class LinesIO:
  26. def __init__(self, maxlen=None):
  27. self.maxlen = maxlen
  28. self.lines = co.deque(maxlen=maxlen)
  29. self.tail = io.StringIO()
  30. # trigger automatic sizing
  31. if maxlen == 0:
  32. self.resize(0)
  33. def write(self, s):
  34. # note using split here ensures the trailing string has no newline
  35. lines = s.split('\n')
  36. if len(lines) > 1 and self.tail.getvalue():
  37. self.tail.write(lines[0])
  38. lines[0] = self.tail.getvalue()
  39. self.tail = io.StringIO()
  40. self.lines.extend(lines[:-1])
  41. if lines[-1]:
  42. self.tail.write(lines[-1])
  43. def resize(self, maxlen):
  44. self.maxlen = maxlen
  45. if maxlen == 0:
  46. maxlen = shutil.get_terminal_size((80, 5))[1]
  47. if maxlen != self.lines.maxlen:
  48. self.lines = co.deque(self.lines, maxlen=maxlen)
  49. last_lines = 1
  50. def draw(self):
  51. # did terminal size change?
  52. if self.maxlen == 0:
  53. self.resize(0)
  54. # first thing first, give ourself a canvas
  55. while LinesIO.last_lines < len(self.lines):
  56. sys.stdout.write('\n')
  57. LinesIO.last_lines += 1
  58. for j, line in enumerate(self.lines):
  59. # move cursor, clear line, disable/reenable line wrapping
  60. sys.stdout.write('\r')
  61. if len(self.lines)-1-j > 0:
  62. sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-j))
  63. sys.stdout.write('\x1b[K')
  64. sys.stdout.write('\x1b[?7l')
  65. sys.stdout.write(line)
  66. sys.stdout.write('\x1b[?7h')
  67. if len(self.lines)-1-j > 0:
  68. sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-j))
  69. sys.stdout.flush()
  70. def main(path='-', *, lines=5, cat=False, sleep=0.01, keep_open=False):
  71. if cat:
  72. ring = sys.stdout
  73. else:
  74. ring = LinesIO(lines)
  75. ptime = time.time()
  76. try:
  77. while True:
  78. with openio(path) as f:
  79. for line in f:
  80. ring.write(line)
  81. # need to redraw?
  82. if not cat and time.time()-ptime >= sleep:
  83. ring.draw()
  84. ptime = time.time()
  85. if not keep_open:
  86. break
  87. # don't just flood open calls
  88. time.sleep(sleep or 0.1)
  89. except KeyboardInterrupt:
  90. pass
  91. if not cat:
  92. sys.stdout.write('\n')
  93. if __name__ == "__main__":
  94. import sys
  95. import argparse
  96. parser = argparse.ArgumentParser(
  97. description="Efficiently displays the last n lines of a file/pipe.")
  98. parser.add_argument(
  99. 'path',
  100. nargs='?',
  101. help="Path to read from.")
  102. parser.add_argument(
  103. '-n', '--lines',
  104. nargs='?',
  105. type=lambda x: int(x, 0),
  106. const=0,
  107. help="Show this many lines of history. 0 uses the terminal height. "
  108. "Defaults to 5.")
  109. parser.add_argument(
  110. '-z', '--cat',
  111. action='store_true',
  112. help="Pipe directly to stdout.")
  113. parser.add_argument(
  114. '-s', '--sleep',
  115. type=float,
  116. help="Seconds to sleep between reads. Defaults to 0.01.")
  117. parser.add_argument(
  118. '-k', '--keep-open',
  119. action='store_true',
  120. help="Reopen the pipe on EOF, useful when multiple "
  121. "processes are writing.")
  122. sys.exit(main(**{k: v
  123. for k, v in vars(parser.parse_intermixed_args()).items()
  124. if v is not None}))