| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716 |
- #!/usr/bin/env python3
- # This script manages littlefs tests, which are configured with
- # .toml files stored in the tests directory.
- #
- # TODO
- # - nargs > 1?
- # x show perm config on failure
- # x filtering
- # n show perm config on verbose?
- # - better lineno tracking for cases?
- # n non-int perms?
- # - different path format?
- # - suite.prologue, suite.epilogue
- # - in
- # x change BLOCK_CYCLES to -1 by default
- # x change persist behaviour
- # x config chaining correct
- import toml
- import glob
- import re
- import os
- import io
- import itertools as it
- import collections.abc as abc
- import subprocess as sp
- import base64
- import sys
- import copy
- import shlex
# Directory that main() prefixes onto a test path that is neither an existing
# directory nor an existing file; also the default for the `testpath` argument.
- TESTDIR = 'tests_'
# Makefile fragment written at the top of every suite's generated .mk file by
# TestSuite.build(), after each run of four spaces is replaced with a tab.
# NOTE(review): the recipe lines below show no leading indentation in this
# copy of the file; make recipes need the tab that build() derives from a
# 4-space indent -- confirm against the upstream source.
- RULES = """
- define FLATTEN
- %$(subst /,.,$(target:.c=.tc)): $(target)
- cat <(echo '#line 1 "$$<"') $$< > $$@
- endef
- $(foreach target,$(SRC),$(eval $(FLATTEN)))
- -include tests_/*.d
- .SECONDARY:
- %.c: %.tc
- ./scripts/explode_asserts.py $< -o $@
- %.test: override CFLAGS += -fdiagnostics-color=always
- %.test: override CFLAGS += -ggdb
- %.test: %.test.o $(foreach f,$(subst /,.,$(SRC:.c=.o)),%.$f)
- $(CC) $(CFLAGS) $^ $(LFLAGS) -o $@
- """
# C preamble written first into every generated test source by
# TestSuite.build(). LFS_DISK is assigned from argv[3] by the generated main().
- GLOBALS = """
- //////////////// AUTOGENERATED TEST ////////////////
- #include "lfs.h"
- #include "filebd/lfs_filebd.h"
- #include "rambd/lfs_rambd.h"
- #include <stdio.h>
- const char *LFS_DISK = NULL;
- """
# Lowest-priority parameter definitions: TestSuite.permute() merges these in
# last, so -D overrides, suite defines and case defines all take precedence.
# String values are passed through eval() there; ones that fail to evaluate
# (e.g. references to other macros) are kept as literal text.
- DEFINES = {
- "LFS_READ_SIZE": 16,
- "LFS_PROG_SIZE": "LFS_READ_SIZE",
- "LFS_BLOCK_SIZE": 512,
- "LFS_BLOCK_COUNT": 1024,
- "LFS_BLOCK_CYCLES": -1,
- "LFS_CACHE_SIZE": "(64 % LFS_PROG_SIZE == 0 ? 64 : LFS_PROG_SIZE)",
- "LFS_LOOKAHEAD_SIZE": 16,
- "LFS_ERASE_VALUE": 0xff,
- }
# C code that TestCase.build() emits at the start of every generated
# test_case function, before the case's own code.
- PROLOGUE = """
- // prologue
- __attribute__((unused)) lfs_t lfs;
- __attribute__((unused)) lfs_filebd_t filebd;
- __attribute__((unused)) lfs_rambd_t rambd;
- __attribute__((unused)) lfs_file_t file;
- __attribute__((unused)) lfs_dir_t dir;
- __attribute__((unused)) struct lfs_info info;
- __attribute__((unused)) char path[1024];
- __attribute__((unused)) uint8_t buffer[1024];
- __attribute__((unused)) lfs_size_t size;
- __attribute__((unused)) int err;
- 
- __attribute__((unused)) const struct lfs_config cfg = {
- .context = LFS_DISK ? (void*)&filebd : (void*)&rambd,
- .read = LFS_DISK ? &lfs_filebd_read : &lfs_rambd_read,
- .prog = LFS_DISK ? &lfs_filebd_prog : &lfs_rambd_prog,
- .erase = LFS_DISK ? &lfs_filebd_erase : &lfs_rambd_erase,
- .sync = LFS_DISK ? &lfs_filebd_sync : &lfs_rambd_sync,
- .read_size = LFS_READ_SIZE,
- .prog_size = LFS_PROG_SIZE,
- .block_size = LFS_BLOCK_SIZE,
- .block_count = LFS_BLOCK_COUNT,
- .block_cycles = LFS_BLOCK_CYCLES,
- .cache_size = LFS_CACHE_SIZE,
- .lookahead_size = LFS_LOOKAHEAD_SIZE,
- };
- __attribute__((unused)) const struct lfs_filebd_config filecfg = {
- .erase_value = LFS_ERASE_VALUE,
- };
- __attribute__((unused)) const struct lfs_rambd_config ramcfg = {
- .erase_value = LFS_ERASE_VALUE,
- };
- if (LFS_DISK) {
- lfs_filebd_createcfg(&cfg, LFS_DISK, &filecfg);
- } else {
- lfs_rambd_createcfg(&cfg, &ramcfg);
- }
- """
# C code that TestCase.build() emits after the case's own code, at the end of
# every generated test_case function.
- EPILOGUE = """
- // epilogue
- if (LFS_DISK) {
- lfs_filebd_destroy(&cfg);
- } else {
- lfs_rambd_destroy(&cfg);
- }
- """
# ANSI-colored markers written to stdout per permutation by TestSuite.test().
# PASS doubles as the return value of TestCase.test() and as the value stored
# in perm.result for a passing permutation.
- PASS = '\033[32m✓\033[0m'
- FAIL = '\033[31m✗\033[0m'
class TestFailure(Exception):
    """Raised when a test permutation exits with a non-zero status.

    Attributes:
        case: the permutation object that failed.
        returncode: exit status of the test process, or None.
        stdout: list of captured output lines, or None.
        assert_: dict describing an intercepted assert (keys 'path', 'line',
            'lineno', 'message'), or None if no assert was seen.
        cycleno: cycle count at which a reentrant run failed. Defaults to
            None so it can always be read; reentrant runners overwrite it.
    """

    def __init__(self, case, returncode=None, stdout=None, assert_=None):
        # populate Exception.args so the exception reprs/pickles sensibly
        super().__init__(case, returncode)
        self.case = case
        self.returncode = returncode
        self.stdout = stdout
        self.assert_ = assert_
        self.cycleno = None

    def __str__(self):
        # formatted lazily so building the exception never stringifies `case`
        return '%s failed with %s' % (self.case, self.returncode)
class TestCase:
    """One test case from a suite's .toml file, or one permutation of it.

    A "base" case is created by the suite from the case's config table.
    permute() produces shallow copies ("permutations") that carry a concrete
    set of defines plus `case` (the base case) and `permno` attributes.
    """

    # optional ANSI color sequences / spaces that compilers and the test
    # binaries may interleave with diagnostics
    _ANSI = r'(?:\x1b\[[\d;]*.| )*'
    # matches "<path>:<line>:[<col>:] assert: <message>"
    _ASSERT_RE = re.compile(
        r'^{0}([^:]+):(\d+):(?:\d+:)?{0}assert:{0}(.*)$'.format(_ANSI))

    def __init__(self, config, suite=None, caseno=None, lineno=None, **_):
        """Initialize from a case config table.

        Args:
            config: mapping with a required 'code' entry and optional
                'define', 'if' and 'leaky' entries.
            suite: owning suite object (provides name, path, defines).
            caseno: index of this case within the suite.
            lineno: line number used for the generated #line directive.
        """
        self.suite = suite
        self.caseno = caseno
        self.lineno = lineno

        self.code = config['code']
        self.defines = config.get('define', {})
        self.if_ = config.get('if', None)
        self.leaky = config.get('leaky', False)

    def __str__(self):
        """Return 'suite[case]' or 'suite[case,perm] (K=V, ...)'.

        For permutations, only defines that are not shared by every
        permutation of the base case are listed.
        """
        if not hasattr(self, 'permno'):
            return '%s[%d]' % (self.suite.name, self.caseno)

        shared = self.case.defines
        unique = ['%s=%s' % (k, v)
            for k, v in self.defines.items() if k not in shared]
        if unique:
            return '%s[%d,%d] (%s)' % (
                self.suite.name, self.caseno, self.permno, ', '.join(unique))
        return '%s[%d,%d]' % (self.suite.name, self.caseno, self.permno)

    def permute(self, defines, permno=None, **_):
        """Return a shallow copy of this case bound to `defines`/`permno`."""
        ncase = copy.copy(self)
        ncase.case = self
        ncase.perms = [ncase]
        ncase.permno = permno
        ncase.defines = defines
        return ncase

    def build(self, f, **_):
        """Write this case's C function `test_case<N>` to the stream `f`.

        Defines that vary between permutations become intmax_t parameters;
        defines shared by all permutations of this case (but not by the whole
        suite) become #define/#undef pairs around the body.
        """
        perms = getattr(self, 'perms', None)
        perm_defines = perms[0].defines if perms else {}

        # signature: one parameter per define that is not constant here
        f.write('void test_case%d(' % self.caseno)
        f.write(','.join(
            '\n' + 8*' ' + '__attribute__((unused)) intmax_t %s' % k
            for k in sorted(perm_defines)
            if k not in self.defines))
        f.write(') {\n')

        # suite-wide defines are passed through CFLAGS instead
        local = [(k, v) for k, v in sorted(self.defines.items())
            if k not in self.suite.defines]
        for k, v in local:
            f.write(4*' '+'#define %s %s\n' % (k, v))

        f.write(PROLOGUE)
        f.write('\n')
        f.write(4*' '+'// test case %d\n' % self.caseno)
        # point compiler diagnostics back at the .toml source
        f.write(4*' '+'#line %d "%s"\n' % (self.lineno, self.suite.path))

        # test case goes here
        f.write(self.code)

        f.write(EPILOGUE)
        f.write('\n')

        for k, _v in local:
            f.write(4*' '+'#undef %s\n' % k)

        f.write('}\n')

    def shouldtest(self, **args):
        """Return True if this case/permutation should be run.

        The optional 'if' expression is evaluated with this object's defines
        as local names. The result is coerced to bool because callers sum
        these values to count selected permutations.
        """
        if self.if_ is None:
            return True
        # NOTE: `if` comes straight from the suite's .toml file and eval()
        # executes arbitrary Python -- only run test suites you trust.
        return bool(eval(self.if_, None, self.defines.copy()))

    def test(self, exec=None, persist=False, gdb=False, failure=None, **args):
        """Run this permutation's test binary.

        Args:
            exec: list of command words to prefix the test command with.
            persist: falsy to keep the disk in RAM; otherwise a disk file
                '<suite path>.disk' is passed to the test. Unless the value
                is 'noerase', an existing disk file is removed first.
            gdb: gdb mode ('init', 'start' or 'assert'); only acted on when
                `failure` is also given.
            failure: a previous TestFailure; with `gdb` set the test is
                re-run under gdb and the process exits with gdb's status.
            **args: 'verbose' echoes the command and the test's output.

        Returns:
            PASS when the test exits with status 0.

        Raises:
            TestFailure: when the test exits with a non-zero status.
        """
        verbose = args.get('verbose', False)

        # build command
        cmd = list(exec or []) + [
            './%s.test' % self.suite.path,
            repr(self.caseno), repr(self.permno)]

        # persist disk or keep in RAM for speed?
        if persist:
            disk = self.suite.path + '.disk'
            if persist != 'noerase':
                try:
                    os.remove(disk)
                except FileNotFoundError:
                    pass
            cmd.append(disk)

        # failed? drop into debugger?
        if gdb and failure:
            ncmd = ['gdb']
            if gdb == 'assert':
                ncmd.extend(['-ex', 'r'])
                if failure.assert_:
                    # step out of the assert handler to the failing line
                    ncmd.extend(['-ex', 'up'])
            elif gdb == 'start':
                ncmd.extend([
                    '-ex', 'b %s:%d' % (self.suite.path, self.lineno),
                    '-ex', 'r'])
            ncmd.extend(['--args'] + cmd)

            if verbose:
                print(' '.join(shlex.quote(c) for c in ncmd))
            sys.exit(sp.call(ncmd))

        # run test case!
        stdout = []
        assert_ = None
        if verbose:
            print(' '.join(shlex.quote(c) for c in cmd))
        # the context manager closes the pipe and reaps the child
        with sp.Popen(cmd,
                universal_newlines=True,
                bufsize=1,
                stdout=sp.PIPE,
                stderr=sp.STDOUT) as proc:
            for line in iter(proc.stdout.readline, ''):
                stdout.append(line)
                if verbose:
                    sys.stdout.write(line)
                # intercept asserts, only the first one is recorded
                if assert_ is not None:
                    continue
                m = self._ASSERT_RE.match(line)
                if not m:
                    continue
                try:
                    lineno = int(m.group(2))
                    with open(m.group(1)) as src:
                        srcline = next(
                            it.islice(src, lineno-1, None)).strip('\n')
                except (OSError, StopIteration, ValueError):
                    # best effort: source missing, too short or undecodable
                    continue
                assert_ = {
                    'path': m.group(1),
                    'line': srcline,
                    'lineno': lineno,
                    'message': m.group(3)}

        # did we pass?
        if proc.returncode != 0:
            raise TestFailure(self, proc.returncode, stdout, assert_)
        return PASS
class ValgrindTestCase(TestCase):
    """Test case flavour that runs the test binary under valgrind.

    Cases whose config marks them `leaky` are excluded by shouldtest().
    """

    def __init__(self, config, **args):
        # remember the leaky flag, then let the base class do the rest
        self.leaky = config.get('leaky', False)
        super().__init__(config, **args)

    def shouldtest(self, **args):
        """Skip leaky cases; otherwise defer to the base-class filter."""
        if self.leaky:
            return False
        return super().shouldtest(**args)

    def test(self, exec=[], **args):
        """Run the test with a valgrind invocation appended to `exec`."""
        valgrind = [
            'valgrind',
            '--leak-check=full',
            '--error-exitcode=4',
            '-q']
        return super().test(exec=exec + valgrind, **args)
class ReentrantTestCase(TestCase):
    """Test case flavour that re-runs a test, killing it after ever more
    prog/erase calls, while keeping the disk file between runs.

    Only cases whose config sets `reentrant` are selected by shouldtest().
    """

    def __init__(self, config, **args):
        self.reentrant = config.get('reentrant', False)
        super().__init__(config, **args)

    def shouldtest(self, **args):
        """Select the case only if it is reentrant and passes the base
        filter."""
        if not self.reentrant:
            return self.reentrant
        return super().shouldtest(**args)

    def test(self, exec=[], persist=False, gdb=False, failure=None, **args):
        """Run the test under gdb, stopping one breakpoint hit later on each
        attempt, until the program finishes or genuinely fails.

        A TestFailure that escapes carries the attempt number in `cycleno`.
        """
        # clear disk first?
        if persist != 'noerase':
            try:
                os.remove(self.suite.path + '.disk')
            except FileNotFoundError:
                pass

        # gdb exits with the program's signal or exit code, or with 33 when
        # the program was still running at the point gdb quit
        quit_cmd = ('q '
            '!$_isvoid($_exitsignal) ? $_exitsignal : '
            '!$_isvoid($_exitcode) ? $_exitcode : '
            '33')

        cycles = 0
        while True:
            cycles += 1

            # exact cycle we should drop into debugger?
            if gdb and failure and failure.cycleno == cycles:
                return super().test(exec=exec, persist=True,
                    gdb=gdb, failure=failure, **args)

            # break on every prog/erase and continue past `cycles` of them
            # before quitting
            nexec = exec + [
                'gdb', '-batch-silent',
                '-ex', 'handle all nostop',
                '-ex', 'b lfs_filebd_prog',
                '-ex', 'b lfs_filebd_erase',
                '-ex', 'r',
                ] + ['-ex', 'c'] * cycles + [
                '-ex', quit_cmd,
                '--args']
            try:
                return super().test(exec=nexec, persist='noerase', **args)
            except TestFailure as nfailure:
                if nfailure.returncode != 33:
                    # a real failure: record where it happened and propagate
                    nfailure.cycleno = cycles
                    raise
                # 33 means "not finished yet" -- go around again
class TestSuite:
    """A set of test cases loaded from one .toml file.

    Attributes:
        name: file name of the suite without the '.toml' suffix.
        path: path of the .toml file as given.
        TestCase: class used to instantiate cases.
        defines: suite-level defines; after permute(), the defines that have
            the same value in every permutation of the suite.
        cases: base test cases, in file order.
        perms: all permutations of all cases (empty until permute() runs).
    """

    def __init__(self, path, TestCase=TestCase, **args):
        """Load the suite at `path`; extra `args` are passed to each case."""
        name = os.path.basename(path)
        if name.endswith('.toml'):
            name = name[:-len('.toml')]
        self.name = name
        self.path = path
        self.TestCase = TestCase
        self.perms = []

        with open(path) as f:
            # load tests
            config = toml.load(f)

            # find line numbers: `i` is 0-based and the code body starts on
            # the line after the opening triple quote, hence +2
            f.seek(0)
            linenos = [
                i + 2 for i, line in enumerate(f)
                if re.match(r'^\s*code\s*=\s*(\'\'\'|""")', line)]

        # grab global config
        self.defines = config.get('define', {})

        # create initial test cases
        self.cases = []
        for i, (case, lineno) in enumerate(
                zip(config.get('case', []), linenos)):
            # give our case's config a copy of our "global" config
            for k, v in config.items():
                if k not in case:
                    case[k] = v

            # initialize test case
            self.cases.append(self.TestCase(case,
                suite=self, caseno=i, lineno=lineno, **args))

    def __str__(self):
        return self.name

    def __lt__(self, other):
        # suites sort by name so runs are reproducible
        return self.name < other.name

    def permute(self, defines=None, **args):
        """Expand every case into its permutations.

        Defines are merged with priority `defines` (command line), suite
        defines, case defines, then DEFINES. Each source may be a dict or a
        list of dicts. String values are eval()'d where possible, and any
        non-string iterable value is expanded into one permutation per
        element.

        Side effects: sets `perms` and `defines` on every case and on the
        suite; the `defines` are reduced to the entries shared by all of the
        respective permutations.

        Returns:
            The list of all permutations in the suite.
        """
        if defines is None:
            defines = {}

        for case in self.cases:
            # lets find all parameterized definitions, in one of [args.D,
            # suite.defines, case.defines, DEFINES]. Note that each of these
            # can be either a dict of defines, or a list of dicts, expressing
            # an initial set of permutations.
            pending = [{}]
            for inits in [defines, self.defines, case.defines, DEFINES]:
                if not isinstance(inits, list):
                    inits = [inits]

                npending = []
                for init, pinit in it.product(inits, pending):
                    ninit = pinit.copy()
                    for k, v in init.items():
                        if k in ninit:
                            # an earlier, higher-priority source wins
                            continue
                        # NOTE: values come from .toml files / -D and are
                        # eval()'d as Python; only use trusted inputs.
                        try:
                            ninit[k] = eval(v)
                        except Exception:
                            # not a Python expression (or not a string):
                            # keep the value verbatim
                            ninit[k] = v
                    npending.append(ninit)

                pending = npending

            # expand permutations depth-first; the stack is reversed so
            # permutations come out in definition order
            pending = list(reversed(pending))
            expanded = []
            while pending:
                perm = pending.pop()
                for k, v in sorted(perm.items()):
                    if not isinstance(v, str) and isinstance(v, abc.Iterable):
                        # list() so sets/generators can be reversed too
                        for nv in reversed(list(v)):
                            nperm = perm.copy()
                            nperm[k] = nv
                            pending.append(nperm)
                        break
                else:
                    # nothing left to expand
                    expanded.append(perm)

            # generate permutations
            case.perms = [
                case.permute(perm, permno=i, **args)
                for i, perm in enumerate(expanded)]

            # also track non-unique defines
            case.defines = {}
            if case.perms:
                for k, v in case.perms[0].defines.items():
                    if all(perm.defines[k] == v for perm in case.perms):
                        case.defines[k] = v

        # track all perms and non-unique defines
        self.perms = [perm for case in self.cases for perm in case.perms]

        self.defines = {}
        if self.perms:
            for k, v in self.perms[0].defines.items():
                if all(perm.defines.get(k, None) == v
                        for perm in self.perms):
                    self.defines[k] = v

        return self.perms

    def build(self, **args):
        """Generate the suite's C source and makefile fragment.

        Writes '<path>.mk' containing RULES, CFLAGS overrides for suite-wide
        defines, and a rule that recreates '<path>.test.tc' from the
        base64-encoded generated source.

        Returns:
            (makefile path, test target path)
        """
        # build test.c
        f = io.StringIO()
        f.write(GLOBALS)

        for case in self.cases:
            f.write('\n')
            case.build(f, **args)

        f.write('\n')
        f.write('int main(int argc, char **argv) {\n')
        f.write(4*' '+'int case_ = (argc >= 2) ? atoi(argv[1]) : 0;\n')
        f.write(4*' '+'int perm = (argc >= 3) ? atoi(argv[2]) : 0;\n')
        f.write(4*' '+'LFS_DISK = (argc >= 4) ? argv[3] : NULL;\n')
        for perm in self.perms:
            f.write(4*' '+'if (argc < 3 || '
                '(case_ == %d && perm == %d)) { ' % (
                    perm.caseno, perm.permno))
            # pass only the defines that vary between this case's perms
            f.write('test_case%d(' % perm.caseno)
            f.write(', '.join(
                str(v) for k, v in sorted(perm.defines.items())
                if k not in perm.case.defines))
            f.write('); }\n')
        f.write('}\n')

        # add test-related rules
        rules = RULES.replace(4*' ', '\t')

        with open(self.path + '.mk', 'w') as mk:
            mk.write(rules)
            mk.write('\n')

            # add truly global defines globally
            for k, v in sorted(self.defines.items()):
                mk.write('%s: override CFLAGS += -D%s=%r\n' % (
                    self.path+'.test', k, v))

            # write test.c in base64 so make can decide when to rebuild
            mk.write('%s: %s\n' % (self.path+'.test.tc', self.path))
            mk.write('\t@base64 -d <<< ')
            mk.write(base64.b64encode(
                f.getvalue().encode('utf8')).decode('utf8'))
            mk.write(' > $@\n')

        self.makefile = self.path + '.mk'
        self.target = self.path + '.test'
        return self.makefile, self.target

    def test(self, caseno=None, permno=None, **args):
        """Run the selected permutations, storing each outcome in
        `perm.result` (PASS or the TestFailure).

        Args:
            caseno: if not None, only run permutations of this case.
            permno: if not None, only run permutations with this number.
            **args: passed to each permutation's shouldtest()/test().
                Progress marks are printed only when 'verbose' is present
                and false; with 'keep_going' false the first failure is
                re-raised.

        Raises:
            TestFailure: on the first failure unless 'keep_going' is set.
        """
        quiet = not args.get('verbose', True)

        # run test suite!
        if quiet:
            sys.stdout.write(self.name + ' ')
            sys.stdout.flush()
        for perm in self.perms:
            if caseno is not None and perm.caseno != caseno:
                continue
            if permno is not None and perm.permno != permno:
                continue
            if not perm.shouldtest(**args):
                continue

            try:
                perm.test(**args)
            except TestFailure as failure:
                perm.result = failure
                if quiet:
                    sys.stdout.write(FAIL)
                    sys.stdout.flush()
                if not args.get('keep_going', False):
                    if quiet:
                        sys.stdout.write('\n')
                    raise
            else:
                perm.result = PASS
                if quiet:
                    sys.stdout.write(PASS)
                    sys.stdout.flush()

        if quiet:
            sys.stdout.write('\n')
def main(**args):
    """Build and run the selected test suites, then print a report.

    Args (all passed by keyword):
        testpath: directory, .toml file or suite name, optionally followed by
            '[case]' or '[case,perm]' to select a single case/permutation.
        D: list of 'NAME=VALUE' define overrides.
        verbose, keep_going, persist, gdb, valgrind, reentrant, exec:
            forwarded to the suites and cases.

    Returns:
        0 if no executed permutation failed, 1 otherwise.

    Exits the process with -3 if the build fails, and with 0 after a gdb
    session requested via `gdb`.
    """
    verbose = args.get('verbose', False)
    testpath = args['testpath']

    # optional brackets for specific test
    selector = re.search(r'\[(\d+)(?:,(\d+))?\]$', testpath)
    caseno = permno = None
    if selector:
        caseno = int(selector.group(1))
        if selector.group(2) is not None:
            permno = int(selector.group(2))
        testpath = testpath[:selector.start()]

    # figure out the suite's toml file
    if os.path.isdir(testpath):
        testpath = testpath + '/test_*.toml'
    elif os.path.isfile(testpath):
        pass
    elif testpath.endswith('.toml'):
        testpath = TESTDIR + '/' + testpath
    else:
        testpath = TESTDIR + '/' + testpath + '.toml'

    # find tests
    if args.get('valgrind', False):
        case_class = ValgrindTestCase
    elif args.get('reentrant', False):
        case_class = ReentrantTestCase
    else:
        case_class = TestCase
    # sort for reproducibility
    suites = sorted(
        TestSuite(path, TestCase=case_class, **args)
        for path in glob.glob(testpath))

    # generate permutations; split on the first '=' only so that values may
    # themselves contain '='
    defines = {}
    for define in args.get('D', []):
        k, _, v = define.partition('=')
        defines[k] = v

    for suite in suites:
        suite.permute(defines, **args)

    # build tests in parallel
    print('====== building ======')
    makefiles = []
    targets = []
    for suite in suites:
        makefile, target = suite.build(**args)
        makefiles.append(makefile)
        targets.append(target)

    cmd = ['make', '-f', 'Makefile']
    for makefile in makefiles:
        cmd.extend(['-f', makefile])
    cmd.extend(targets)

    # matches "<path>:<line>:[<col>:] warning: <message>", tolerating ANSI
    # color sequences and spaces around the fields
    ansi = r'(?:\x1b\[[\d;]*.| )*'
    warning_re = re.compile(
        r'^{0}([^:]+):(\d+):(?:\d+:)?{0}warning:{0}(.*)$'.format(ansi))

    stdout = []
    if verbose:
        print(' '.join(shlex.quote(c) for c in cmd))
    # the context manager closes the pipe and reaps the child
    with sp.Popen(cmd,
            universal_newlines=True,
            bufsize=1,
            stdout=sp.PIPE,
            stderr=sp.STDOUT) as proc:
        for line in iter(proc.stdout.readline, ''):
            stdout.append(line)
            if verbose:
                # the raw output already contains the warning
                sys.stdout.write(line)
                continue
            # intercept warnings
            m = warning_re.match(line)
            if not m:
                continue
            try:
                lineno = int(m.group(2))
                with open(m.group(1)) as src:
                    srcline = next(
                        it.islice(src, lineno-1, None)).strip('\n')
            except (OSError, StopIteration, ValueError):
                # best effort: source missing, too short or undecodable
                continue
            sys.stdout.write(
                "\033[01m{path}:{lineno}:\033[01;35mwarning:\033[m "
                "{message}\n{line}\n\n".format(
                    path=m.group(1), line=srcline, lineno=lineno,
                    message=m.group(3)))

    if proc.returncode != 0:
        if not verbose:
            for line in stdout:
                sys.stdout.write(line)
        sys.exit(-3)

    total = sum(len(suite.perms) for suite in suites)
    print('built %d test suites, %d test cases, %d permutations' % (
        len(suites),
        sum(len(suite.cases) for suite in suites),
        total))

    # bool() so a truthy non-bool filter result counts exactly once
    filtered = sum(
        bool(perm.shouldtest(**args))
        for suite in suites for perm in suite.perms)
    if filtered != total:
        print('filtered down to %d permutations' % filtered)

    print('====== testing ======')
    try:
        for suite in suites:
            suite.test(caseno, permno, **args)
    except TestFailure:
        # already recorded on the permutation; reported below
        pass

    print('====== results ======')
    passed = 0
    failed = 0
    for suite in suites:
        for perm in suite.perms:
            if not hasattr(perm, 'result'):
                # filtered out or never reached
                continue

            if perm.result == PASS:
                passed += 1
                continue

            failed += 1
            result = perm.result
            sys.stdout.write(
                "\033[01m{path}:{lineno}:\033[01;31mfailure:\033[m "
                "{perm} failed with {returncode}\n".format(
                    perm=perm, path=perm.suite.path, lineno=perm.lineno-2,
                    returncode=result.returncode or 0))
            # print the captured output exactly once; when an assert was
            # intercepted the last line is dropped because the assert is
            # re-printed in richer form below
            lines = result.stdout or []
            if result.assert_:
                lines = lines[:-1]
            for line in lines:
                sys.stdout.write(line)
            if result.assert_:
                sys.stdout.write(
                    "\033[01m{path}:{lineno}:\033[01;31massert:\033[m "
                    "{message}\n{line}\n".format(
                        **result.assert_))
            sys.stdout.write('\n')

    if args.get('gdb', False):
        # the last recorded failure is the one debugged
        failure = None
        for suite in suites:
            for perm in suite.perms:
                if getattr(perm, 'result', PASS) != PASS:
                    failure = perm.result
        if failure is not None:
            print('======= gdb ======')
            # drop into gdb
            failure.case.test(failure=failure, **args)
            sys.exit(0)

    print('tests passed: %d' % passed)
    print('tests failed: %d' % failed)
    return 1 if failed else 0
# Command-line entry point: parse the options and hand them to main() as
# keyword arguments (argparse destinations match main()'s expected keys).
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description="Run parameterized tests in various configurations.")
    parser.add_argument('testpath', nargs='?', default=TESTDIR,
        help="Description of test(s) to run. By default, this is all tests \
            found in the \"{0}\" directory. Here, you can specify a different \
            directory of tests, a specific file, a suite by name, and even a \
            specific test case by adding brackets. For example \
            \"test_dirs[0]\" or \"{0}/test_dirs.toml[0]\".".format(TESTDIR))
    parser.add_argument('-D', action='append', default=[],
        help="Overriding parameter definitions.")
    parser.add_argument('-v', '--verbose', action='store_true',
        help="Output everything that is happening.")
    parser.add_argument('-k', '--keep-going', action='store_true',
        help="Run all tests instead of stopping on first error. Useful for CI.")
    parser.add_argument('-p', '--persist', choices=['erase', 'noerase'],
        nargs='?', const='erase',
        help="Store disk image in a file.")
    parser.add_argument('-g', '--gdb', choices=['init', 'start', 'assert'],
        nargs='?', const='assert',
        help="Drop into gdb on test failure.")
    parser.add_argument('--valgrind', action='store_true',
        help="Run non-leaky tests under valgrind to check for memory leaks.")
    parser.add_argument('--reentrant', action='store_true',
        help="Run reentrant tests with simulated power-loss.")
    # shlex.split honours quoting and never yields empty words, unlike a
    # plain split on single spaces
    parser.add_argument('-e', '--exec', default=[], type=shlex.split,
        help="Run tests with another executable prefixed on the command line.")
    # sys.exit(None) exits 0, so this is safe whatever main() returns
    sys.exit(main(**vars(parser.parse_args())))
|