tailpipe.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. #!/usr/bin/env python3
  2. #
  3. # Efficiently displays the last n lines of a file/pipe.
  4. #
  5. import os
  6. import sys
  7. import threading as th
  8. import time
  9. def openio(path, mode='r'):
  10. if path == '-':
  11. if 'r' in mode:
  12. return os.fdopen(os.dup(sys.stdin.fileno()), 'r')
  13. else:
  14. return os.fdopen(os.dup(sys.stdout.fileno()), 'w')
  15. else:
  16. return open(path, mode)
  17. def main(path='-', *, lines=1, sleep=0.01, keep_open=False):
  18. ring = [None] * lines
  19. i = 0
  20. count = 0
  21. lock = th.Lock()
  22. event = th.Event()
  23. done = False
  24. # do the actual reading in a background thread
  25. def read():
  26. nonlocal i
  27. nonlocal count
  28. nonlocal done
  29. while True:
  30. with openio(path) as f:
  31. for line in f:
  32. with lock:
  33. ring[i] = line
  34. i = (i + 1) % lines
  35. count = min(lines, count + 1)
  36. event.set()
  37. if not keep_open:
  38. break
  39. done = True
  40. th.Thread(target=read, daemon=True).start()
  41. try:
  42. last_count = 1
  43. while not done:
  44. time.sleep(sleep)
  45. event.wait()
  46. event.clear()
  47. # create a copy to avoid corrupt output
  48. with lock:
  49. ring_ = ring.copy()
  50. i_ = i
  51. count_ = count
  52. # first thing first, give ourself a canvas
  53. while last_count < count_:
  54. sys.stdout.write('\n')
  55. last_count += 1
  56. for j in range(count_):
  57. # move cursor, clear line, disable/reenable line wrapping
  58. sys.stdout.write('\r')
  59. if count_-1-j > 0:
  60. sys.stdout.write('\x1b[%dA' % (count_-1-j))
  61. sys.stdout.write('\x1b[K')
  62. sys.stdout.write('\x1b[?7l')
  63. sys.stdout.write(ring_[(i_-count_+j) % lines][:-1])
  64. sys.stdout.write('\x1b[?7h')
  65. if count_-1-j > 0:
  66. sys.stdout.write('\x1b[%dB' % (count_-1-j))
  67. sys.stdout.flush()
  68. except KeyboardInterrupt:
  69. pass
  70. sys.stdout.write('\n')
  71. if __name__ == "__main__":
  72. import sys
  73. import argparse
  74. parser = argparse.ArgumentParser(
  75. description="Efficiently displays the last n lines of a file/pipe.")
  76. parser.add_argument(
  77. 'path',
  78. nargs='?',
  79. help="Path to read from.")
  80. parser.add_argument(
  81. '-n',
  82. '--lines',
  83. type=lambda x: int(x, 0),
  84. help="Number of lines to show, defaults to 1.")
  85. parser.add_argument(
  86. '-s',
  87. '--sleep',
  88. type=float,
  89. help="Seconds to sleep between reads, defaults to 0.01.")
  90. parser.add_argument(
  91. '-k',
  92. '--keep-open',
  93. action='store_true',
  94. help="Reopen the pipe on EOF, useful when multiple "
  95. "processes are writing.")
  96. sys.exit(main(**{k: v
  97. for k, v in vars(parser.parse_args()).items()
  98. if v is not None}))