coverage.py 16 KB


  1. #!/usr/bin/env python3
  2. #
  3. # Script to find test coverage. Basically just a big wrapper around gcov with
  4. # some extra conveniences for comparing builds. Heavily inspired by Linux's
  5. # Bloat-O-Meter.
  6. #
  7. import collections as co
  8. import csv
  9. import glob
  10. import itertools as it
  11. import json
  12. import os
  13. import re
  14. import shlex
  15. import subprocess as sp
  16. # TODO use explode_asserts to avoid counting assert branches?
  17. # TODO use dwarf=info to find functions for inline functions?
  18. GCDA_PATHS = ['*.gcda']
  19. class CoverageResult(co.namedtuple('CoverageResult',
  20. 'coverage_line_hits,coverage_line_count,'
  21. 'coverage_branch_hits,coverage_branch_count')):
  22. __slots__ = ()
  23. def __new__(cls,
  24. coverage_line_hits=0, coverage_line_count=0,
  25. coverage_branch_hits=0, coverage_branch_count=0):
  26. return super().__new__(cls,
  27. int(coverage_line_hits),
  28. int(coverage_line_count),
  29. int(coverage_branch_hits),
  30. int(coverage_branch_count))
  31. def __add__(self, other):
  32. return self.__class__(
  33. self.coverage_line_hits + other.coverage_line_hits,
  34. self.coverage_line_count + other.coverage_line_count,
  35. self.coverage_branch_hits + other.coverage_branch_hits,
  36. self.coverage_branch_count + other.coverage_branch_count)
  37. def __sub__(self, other):
  38. return CoverageDiff(other, self)
  39. def __rsub__(self, other):
  40. return self.__class__.__sub__(other, self)
  41. def key(self, **args):
  42. ratio_line = (self.coverage_line_hits/self.coverage_line_count
  43. if self.coverage_line_count else -1)
  44. ratio_branch = (self.coverage_branch_hits/self.coverage_branch_count
  45. if self.coverage_branch_count else -1)
  46. if args.get('line_sort'):
  47. return (-ratio_line, -ratio_branch)
  48. elif args.get('reverse_line_sort'):
  49. return (+ratio_line, +ratio_branch)
  50. elif args.get('branch_sort'):
  51. return (-ratio_branch, -ratio_line)
  52. elif args.get('reverse_branch_sort'):
  53. return (+ratio_branch, +ratio_line)
  54. else:
  55. return None
  56. _header = '%19s %19s' % ('hits/line', 'hits/branch')
  57. def __str__(self):
  58. line_hits = self.coverage_line_hits
  59. line_count = self.coverage_line_count
  60. branch_hits = self.coverage_branch_hits
  61. branch_count = self.coverage_branch_count
  62. return '%11s %7s %11s %7s' % (
  63. '%d/%d' % (line_hits, line_count)
  64. if line_count else '-',
  65. '%.1f%%' % (100*line_hits/line_count)
  66. if line_count else '-',
  67. '%d/%d' % (branch_hits, branch_count)
  68. if branch_count else '-',
  69. '%.1f%%' % (100*branch_hits/branch_count)
  70. if branch_count else '-')
  71. class CoverageDiff(co.namedtuple('CoverageDiff', 'old,new')):
  72. __slots__ = ()
  73. def ratio_line(self):
  74. old_line_hits = (self.old.coverage_line_hits
  75. if self.old is not None else 0)
  76. old_line_count = (self.old.coverage_line_count
  77. if self.old is not None else 0)
  78. new_line_hits = (self.new.coverage_line_hits
  79. if self.new is not None else 0)
  80. new_line_count = (self.new.coverage_line_count
  81. if self.new is not None else 0)
  82. return ((new_line_hits/new_line_count if new_line_count else 1.0)
  83. - (old_line_hits/old_line_count if old_line_count else 1.0))
  84. def ratio_branch(self):
  85. old_branch_hits = (self.old.coverage_branch_hits
  86. if self.old is not None else 0)
  87. old_branch_count = (self.old.coverage_branch_count
  88. if self.old is not None else 0)
  89. new_branch_hits = (self.new.coverage_branch_hits
  90. if self.new is not None else 0)
  91. new_branch_count = (self.new.coverage_branch_count
  92. if self.new is not None else 0)
  93. return ((new_branch_hits/new_branch_count if new_branch_count else 1.0)
  94. - (old_branch_hits/old_branch_count if old_branch_count else 1.0))
  95. def key(self, **args):
  96. return (
  97. self.new.key(**args) if self.new is not None else 0,
  98. -self.ratio_line(),
  99. -self.ratio_branch())
  100. def __bool__(self):
  101. return bool(self.ratio_line() or self.ratio_branch())
  102. _header = '%23s %23s %23s' % ('old', 'new', 'diff')
  103. def __str__(self):
  104. old_line_hits = (self.old.coverage_line_hits
  105. if self.old is not None else 0)
  106. old_line_count = (self.old.coverage_line_count
  107. if self.old is not None else 0)
  108. old_branch_hits = (self.old.coverage_branch_hits
  109. if self.old is not None else 0)
  110. old_branch_count = (self.old.coverage_branch_count
  111. if self.old is not None else 0)
  112. new_line_hits = (self.new.coverage_line_hits
  113. if self.new is not None else 0)
  114. new_line_count = (self.new.coverage_line_count
  115. if self.new is not None else 0)
  116. new_branch_hits = (self.new.coverage_branch_hits
  117. if self.new is not None else 0)
  118. new_branch_count = (self.new.coverage_branch_count
  119. if self.new is not None else 0)
  120. diff_line_hits = new_line_hits - old_line_hits
  121. diff_line_count = new_line_count - old_line_count
  122. diff_branch_hits = new_branch_hits - old_branch_hits
  123. diff_branch_count = new_branch_count - old_branch_count
  124. ratio_line = self.ratio_line()
  125. ratio_branch = self.ratio_branch()
  126. return '%11s %11s %11s %11s %11s %11s%-10s%s' % (
  127. '%d/%d' % (old_line_hits, old_line_count)
  128. if old_line_count else '-',
  129. '%d/%d' % (old_branch_hits, old_branch_count)
  130. if old_branch_count else '-',
  131. '%d/%d' % (new_line_hits, new_line_count)
  132. if new_line_count else '-',
  133. '%d/%d' % (new_branch_hits, new_branch_count)
  134. if new_branch_count else '-',
  135. '%+d/%+d' % (diff_line_hits, diff_line_count),
  136. '%+d/%+d' % (diff_branch_hits, diff_branch_count),
  137. ' (%+.1f%%)' % (100*ratio_line) if ratio_line else '',
  138. ' (%+.1f%%)' % (100*ratio_branch) if ratio_branch else '')
  139. def openio(path, mode='r'):
  140. if path == '-':
  141. if 'r' in mode:
  142. return os.fdopen(os.dup(sys.stdin.fileno()), 'r')
  143. else:
  144. return os.fdopen(os.dup(sys.stdout.fileno()), 'w')
  145. else:
  146. return open(path, mode)
  147. def collect(paths, **args):
  148. results = {}
  149. for path in paths:
  150. # map to source file
  151. src_path = re.sub('\.t\.a\.gcda$', '.c', path)
  152. # TODO test this
  153. if args.get('build_dir'):
  154. src_path = re.sub('%s/*' % re.escape(args['build_dir']), '',
  155. src_path)
  156. # get coverage info through gcov's json output
  157. # note, gcov-tool may contain extra args
  158. cmd = args['gcov_tool'] + ['-b', '-t', '--json-format', path]
  159. if args.get('verbose'):
  160. print(' '.join(shlex.quote(c) for c in cmd))
  161. proc = sp.Popen(cmd,
  162. stdout=sp.PIPE,
  163. stderr=sp.PIPE if not args.get('verbose') else None,
  164. universal_newlines=True,
  165. errors='replace')
  166. data = json.load(proc.stdout)
  167. proc.wait()
  168. if proc.returncode != 0:
  169. if not args.get('verbose'):
  170. for line in proc.stderr:
  171. sys.stdout.write(line)
  172. sys.exit(-1)
  173. # collect line/branch coverage
  174. for file in data['files']:
  175. if file['file'] != src_path:
  176. continue
  177. for line in file['lines']:
  178. func = line.get('function_name', '(inlined)')
  179. # discard internal function (this includes injected test cases)
  180. if not args.get('everything'):
  181. if func.startswith('__'):
  182. continue
  183. results[(src_path, func, line['line_number'])] = (
  184. line['count'],
  185. CoverageResult(
  186. coverage_line_hits=1 if line['count'] > 0 else 0,
  187. coverage_line_count=1,
  188. coverage_branch_hits=sum(
  189. 1 if branch['count'] > 0 else 0
  190. for branch in line['branches']),
  191. coverage_branch_count=len(line['branches'])))
  192. # merge into functions, since this is what other scripts use
  193. func_results = co.defaultdict(lambda: CoverageResult())
  194. for (file, func, _), (_, result) in results.items():
  195. func_results[(file, func)] += result
  196. return func_results, results
  197. def main(**args):
  198. # find sizes
  199. if not args.get('use', None):
  200. # find .gcda files
  201. paths = []
  202. for path in args['gcda_paths']:
  203. if os.path.isdir(path):
  204. path = path + '/*.gcda'
  205. for path in glob.glob(path):
  206. paths.append(path)
  207. if not paths:
  208. print('no .gcda files found in %r?' % args['gcda_paths'])
  209. sys.exit(-1)
  210. results, line_results = collect(paths, **args)
  211. else:
  212. with openio(args['use']) as f:
  213. r = csv.DictReader(f)
  214. results = {
  215. (result['file'], result['name']): CoverageResult(
  216. *(result[f] for f in CoverageResult._fields))
  217. for result in r
  218. if all(result.get(f) not in {None, ''}
  219. for f in CoverageResult._fields)}
  220. # find previous results?
  221. if args.get('diff'):
  222. try:
  223. with openio(args['diff']) as f:
  224. r = csv.DictReader(f)
  225. prev_results = {
  226. (result['file'], result['name']): CoverageResult(
  227. *(result[f] for f in CoverageResult._fields))
  228. for result in r
  229. if all(result.get(f) not in {None, ''}
  230. for f in CoverageResult._fields)}
  231. except FileNotFoundError:
  232. prev_results = []
  233. # write results to CSV
  234. if args.get('output'):
  235. merged_results = co.defaultdict(lambda: {})
  236. other_fields = []
  237. # merge?
  238. if args.get('merge'):
  239. try:
  240. with openio(args['merge']) as f:
  241. r = csv.DictReader(f)
  242. for result in r:
  243. file = result.pop('file', '')
  244. func = result.pop('name', '')
  245. for f in CoverageResult._fields:
  246. result.pop(f, None)
  247. merged_results[(file, func)] = result
  248. other_fields = result.keys()
  249. except FileNotFoundError:
  250. pass
  251. for (file, func), result in results.items():
  252. merged_results[(file, func)] |= result._asdict()
  253. with openio(args['output'], 'w') as f:
  254. w = csv.DictWriter(f, ['file', 'name',
  255. *other_fields, *CoverageResult._fields])
  256. w.writeheader()
  257. for (file, func), result in sorted(merged_results.items()):
  258. w.writerow({'file': file, 'name': func, **result})
  259. # print results
  260. def print_header(by):
  261. if by == 'total':
  262. entry = lambda k: 'TOTAL'
  263. elif by == 'file':
  264. entry = lambda k: k[0]
  265. else:
  266. entry = lambda k: k[1]
  267. if not args.get('diff'):
  268. print('%-36s %s' % (by, CoverageResult._header))
  269. else:
  270. old = {entry(k) for k in results.keys()}
  271. new = {entry(k) for k in prev_results.keys()}
  272. print('%-36s %s' % (
  273. '%s (%d added, %d removed)' % (by,
  274. sum(1 for k in new if k not in old),
  275. sum(1 for k in old if k not in new))
  276. if by else '',
  277. CoverageDiff._header))
  278. def print_entries(by):
  279. if by == 'total':
  280. entry = lambda k: 'TOTAL'
  281. elif by == 'file':
  282. entry = lambda k: k[0]
  283. else:
  284. entry = lambda k: k[1]
  285. entries = co.defaultdict(lambda: CoverageResult())
  286. for k, result in results.items():
  287. entries[entry(k)] += result
  288. if not args.get('diff'):
  289. for name, result in sorted(entries.items(),
  290. key=lambda p: (p[1].key(**args), p)):
  291. print('%-36s %s' % (name, result))
  292. else:
  293. prev_entries = co.defaultdict(lambda: CoverageResult())
  294. for k, result in prev_results.items():
  295. prev_entries[entry(k)] += result
  296. diff_entries = {name: entries.get(name) - prev_entries.get(name)
  297. for name in (entries.keys() | prev_entries.keys())}
  298. for name, diff in sorted(diff_entries.items(),
  299. key=lambda p: (p[1].key(**args), p)):
  300. if diff or args.get('all'):
  301. print('%-36s %s' % (name, diff))
  302. if args.get('quiet'):
  303. pass
  304. elif args.get('summary'):
  305. print_header('')
  306. print_entries('total')
  307. elif args.get('files'):
  308. print_header('file')
  309. print_entries('file')
  310. print_entries('total')
  311. else:
  312. print_header('function')
  313. print_entries('function')
  314. print_entries('total')
  315. # catch lack of coverage
  316. if args.get('error_on_lines') and any(
  317. r.coverage_line_hits < r.coverage_line_count
  318. for r in results.values()):
  319. sys.exit(2)
  320. elif args.get('error_on_branches') and any(
  321. r.coverage_branch_hits < r.coverage_branch_count
  322. for r in results.values()):
  323. sys.exit(3)
  324. if __name__ == "__main__":
  325. import argparse
  326. import sys
  327. parser = argparse.ArgumentParser(
  328. description="Find coverage info after running tests.")
  329. parser.add_argument('gcda_paths', nargs='*', default=GCDA_PATHS,
  330. help="Description of where to find *.gcda files. May be a directory \
  331. or a list of paths. Defaults to %r." % GCDA_PATHS)
  332. parser.add_argument('-v', '--verbose', action='store_true',
  333. help="Output commands that run behind the scenes.")
  334. parser.add_argument('-q', '--quiet', action='store_true',
  335. help="Don't show anything, useful with -o.")
  336. parser.add_argument('-o', '--output',
  337. help="Specify CSV file to store results.")
  338. parser.add_argument('-u', '--use',
  339. help="Don't compile and find code sizes, instead use this CSV file.")
  340. parser.add_argument('-d', '--diff',
  341. help="Specify CSV file to diff code size against.")
  342. parser.add_argument('-m', '--merge',
  343. help="Merge with an existing CSV file when writing to output.")
  344. parser.add_argument('-a', '--all', action='store_true',
  345. help="Show all functions, not just the ones that changed.")
  346. parser.add_argument('-A', '--everything', action='store_true',
  347. help="Include builtin and libc specific symbols.")
  348. parser.add_argument('-s', '--line-sort', action='store_true',
  349. help="Sort by line coverage.")
  350. parser.add_argument('-S', '--reverse-line-sort', action='store_true',
  351. help="Sort by line coverage, but backwards.")
  352. parser.add_argument('--branch-sort', action='store_true',
  353. help="Sort by branch coverage.")
  354. parser.add_argument('--reverse-branch-sort', action='store_true',
  355. help="Sort by branch coverage, but backwards.")
  356. parser.add_argument('-F', '--files', action='store_true',
  357. help="Show file-level coverage.")
  358. parser.add_argument('-Y', '--summary', action='store_true',
  359. help="Only show the total coverage.")
  360. parser.add_argument('-e', '--error-on-lines', action='store_true',
  361. help="Error if any lines are not covered.")
  362. parser.add_argument('-E', '--error-on-branches', action='store_true',
  363. help="Error if any branches are not covered.")
  364. parser.add_argument('--gcov-tool', default=['gcov'],
  365. type=lambda x: x.split(),
  366. help="Path to the gcov tool to use.")
  367. parser.add_argument('--build-dir',
  368. help="Specify the relative build directory. Used to map object files \
  369. to the correct source files.")
  370. sys.exit(main(**{k: v
  371. for k, v in vars(parser.parse_args()).items()
  372. if v is not None}))