#!/usr/bin/env python3
#
# Efficiently displays the last n lines of a file/pipe.
#
# Example:
# ./scripts/tailpipe.py trace -n5
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#
  11. import os
  12. import sys
  13. import threading as th
  14. import time
  15. def openio(path, mode='r'):
  16. if path == '-':
  17. if mode == 'r':
  18. return os.fdopen(os.dup(sys.stdin.fileno()), 'r')
  19. else:
  20. return os.fdopen(os.dup(sys.stdout.fileno()), 'w')
  21. else:
  22. return open(path, mode)
  23. def main(path='-', *, lines=1, sleep=0.01, keep_open=False):
  24. ring = [None] * lines
  25. i = 0
  26. count = 0
  27. lock = th.Lock()
  28. event = th.Event()
  29. done = False
  30. # do the actual reading in a background thread
  31. def read():
  32. nonlocal i
  33. nonlocal count
  34. nonlocal done
  35. while True:
  36. with openio(path) as f:
  37. for line in f:
  38. with lock:
  39. ring[i] = line
  40. i = (i + 1) % lines
  41. count = min(lines, count + 1)
  42. event.set()
  43. if not keep_open:
  44. break
  45. # don't just flood open calls
  46. time.sleep(sleep)
  47. done = True
  48. th.Thread(target=read, daemon=True).start()
  49. try:
  50. last_count = 1
  51. while not done:
  52. time.sleep(sleep)
  53. event.wait()
  54. event.clear()
  55. # create a copy to avoid corrupt output
  56. with lock:
  57. ring_ = ring.copy()
  58. i_ = i
  59. count_ = count
  60. # first thing first, give ourself a canvas
  61. while last_count < count_:
  62. sys.stdout.write('\n')
  63. last_count += 1
  64. for j in range(count_):
  65. # move cursor, clear line, disable/reenable line wrapping
  66. sys.stdout.write('\r')
  67. if count_-1-j > 0:
  68. sys.stdout.write('\x1b[%dA' % (count_-1-j))
  69. sys.stdout.write('\x1b[K')
  70. sys.stdout.write('\x1b[?7l')
  71. sys.stdout.write(ring_[(i_-count_+j) % lines][:-1])
  72. sys.stdout.write('\x1b[?7h')
  73. if count_-1-j > 0:
  74. sys.stdout.write('\x1b[%dB' % (count_-1-j))
  75. sys.stdout.flush()
  76. except KeyboardInterrupt:
  77. pass
  78. sys.stdout.write('\n')
  79. if __name__ == "__main__":
  80. import sys
  81. import argparse
  82. parser = argparse.ArgumentParser(
  83. description="Efficiently displays the last n lines of a file/pipe.")
  84. parser.add_argument(
  85. 'path',
  86. nargs='?',
  87. help="Path to read from.")
  88. parser.add_argument(
  89. '-n',
  90. '--lines',
  91. type=lambda x: int(x, 0),
  92. help="Number of lines to show, defaults to 1.")
  93. parser.add_argument(
  94. '-s',
  95. '--sleep',
  96. type=float,
  97. help="Seconds to sleep between reads, defaults to 0.01.")
  98. parser.add_argument(
  99. '-k',
  100. '--keep-open',
  101. action='store_true',
  102. help="Reopen the pipe on EOF, useful when multiple "
  103. "processes are writing.")
  104. sys.exit(main(**{k: v
  105. for k, v in vars(parser.parse_intermixed_args()).items()
  106. if v is not None}))