tailpipe.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #!/usr/bin/env python3
  2. #
  3. # Efficiently displays the last n lines of a file/pipe.
  4. #
  5. # Example:
  6. # ./scripts/tailpipe.py trace -n5
  7. #
  8. # Copyright (c) 2022, The littlefs authors.
  9. # SPDX-License-Identifier: BSD-3-Clause
  10. #
  11. import collections as co
  12. import io
  13. import os
  14. import select
  15. import shutil
  16. import sys
  17. import threading as th
  18. import time
  19. def openio(path, mode='r', buffering=-1):
  20. # allow '-' for stdin/stdout
  21. if path == '-':
  22. if mode == 'r':
  23. return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
  24. else:
  25. return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
  26. else:
  27. return open(path, mode, buffering)
  28. class LinesIO:
  29. def __init__(self, maxlen=None):
  30. self.maxlen = maxlen
  31. self.lines = co.deque(maxlen=maxlen)
  32. self.tail = io.StringIO()
  33. # trigger automatic sizing
  34. if maxlen == 0:
  35. self.resize(0)
  36. def write(self, s):
  37. # note using split here ensures the trailing string has no newline
  38. lines = s.split('\n')
  39. if len(lines) > 1 and self.tail.getvalue():
  40. self.tail.write(lines[0])
  41. lines[0] = self.tail.getvalue()
  42. self.tail = io.StringIO()
  43. self.lines.extend(lines[:-1])
  44. if lines[-1]:
  45. self.tail.write(lines[-1])
  46. def resize(self, maxlen):
  47. self.maxlen = maxlen
  48. if maxlen == 0:
  49. maxlen = shutil.get_terminal_size((80, 5))[1]
  50. if maxlen != self.lines.maxlen:
  51. self.lines = co.deque(self.lines, maxlen=maxlen)
  52. canvas_lines = 1
  53. def draw(self):
  54. # did terminal size change?
  55. if self.maxlen == 0:
  56. self.resize(0)
  57. # first thing first, give ourself a canvas
  58. while LinesIO.canvas_lines < len(self.lines):
  59. sys.stdout.write('\n')
  60. LinesIO.canvas_lines += 1
  61. # clear the bottom of the canvas if we shrink
  62. shrink = LinesIO.canvas_lines - len(self.lines)
  63. if shrink > 0:
  64. for i in range(shrink):
  65. sys.stdout.write('\r')
  66. if shrink-1-i > 0:
  67. sys.stdout.write('\x1b[%dA' % (shrink-1-i))
  68. sys.stdout.write('\x1b[K')
  69. if shrink-1-i > 0:
  70. sys.stdout.write('\x1b[%dB' % (shrink-1-i))
  71. sys.stdout.write('\x1b[%dA' % shrink)
  72. LinesIO.canvas_lines = len(self.lines)
  73. for i, line in enumerate(self.lines):
  74. # move cursor, clear line, disable/reenable line wrapping
  75. sys.stdout.write('\r')
  76. if len(self.lines)-1-i > 0:
  77. sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-i))
  78. sys.stdout.write('\x1b[K')
  79. sys.stdout.write('\x1b[?7l')
  80. sys.stdout.write(line)
  81. sys.stdout.write('\x1b[?7h')
  82. if len(self.lines)-1-i > 0:
  83. sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-i))
  84. sys.stdout.flush()
  85. def main(path='-', *, lines=5, cat=False, sleep=None, keep_open=False):
  86. if cat:
  87. ring = sys.stdout
  88. else:
  89. ring = LinesIO(lines)
  90. # if sleep print in background thread to avoid getting stuck in a read call
  91. event = th.Event()
  92. lock = th.Lock()
  93. if not cat:
  94. done = False
  95. def background():
  96. while not done:
  97. event.wait()
  98. event.clear()
  99. with lock:
  100. ring.draw()
  101. time.sleep(sleep or 0.01)
  102. th.Thread(target=background, daemon=True).start()
  103. try:
  104. while True:
  105. with openio(path) as f:
  106. for line in f:
  107. with lock:
  108. ring.write(line)
  109. event.set()
  110. if not keep_open:
  111. break
  112. # don't just flood open calls
  113. time.sleep(sleep or 0.1)
  114. except FileNotFoundError as e:
  115. print("error: file not found %r" % path)
  116. sys.exit(-1)
  117. except KeyboardInterrupt:
  118. pass
  119. if not cat:
  120. done = True
  121. lock.acquire() # avoids https://bugs.python.org/issue42717
  122. sys.stdout.write('\n')
  123. if __name__ == "__main__":
  124. import sys
  125. import argparse
  126. parser = argparse.ArgumentParser(
  127. description="Efficiently displays the last n lines of a file/pipe.",
  128. allow_abbrev=False)
  129. parser.add_argument(
  130. 'path',
  131. nargs='?',
  132. help="Path to read from.")
  133. parser.add_argument(
  134. '-n', '--lines',
  135. nargs='?',
  136. type=lambda x: int(x, 0),
  137. const=0,
  138. help="Show this many lines of history. 0 uses the terminal height. "
  139. "Defaults to 5.")
  140. parser.add_argument(
  141. '-z', '--cat',
  142. action='store_true',
  143. help="Pipe directly to stdout.")
  144. parser.add_argument(
  145. '-s', '--sleep',
  146. type=float,
  147. help="Seconds to sleep between reads. Defaults to 0.01.")
  148. parser.add_argument(
  149. '-k', '--keep-open',
  150. action='store_true',
  151. help="Reopen the pipe on EOF, useful when multiple "
  152. "processes are writing.")
  153. sys.exit(main(**{k: v
  154. for k, v in vars(parser.parse_intermixed_args()).items()
  155. if v is not None}))