208 lines
7 KiB
Python
208 lines
7 KiB
Python
import os, sys, unittest, getopt, time
|
|
|
|
# Names of the resources that tests are allowed to use.  main() extends this
# list from the -u command line option; an entry of "*" enables everything.
use_resources = []
|
|
|
|
class ResourceDenied(Exception):
    """Signal that a test was skipped over a resource that is not allowed.

    requires() raises this when a test asks for a resource that has not
    been enabled.  The resource names themselves are defined by the test
    modules.
    """
|
|
|
|
def is_resource_enabled(resource):
    """Report whether *resource* may be used by the running tests.

    A caller whose module is __main__ is always answered with True.
    Otherwise the resource counts as enabled when it, or the wildcard
    "*", is listed in use_resources.  A resource that turns out not to be
    enabled is also recorded in _unavail.
    """
    # Look at the immediate caller's globals.  This has to stay inline:
    # moving it into a helper would change which frame f_back refers to.
    caller_globals = sys._getframe().f_back.f_globals
    if caller_globals.get("__name__") == "__main__":
        return True

    if use_resources is None:
        enabled = False
    else:
        enabled = resource in use_resources or "*" in use_resources

    if not enabled:
        _unavail[resource] = None
    return enabled
|
|
|
|
# Resources that were asked for but are not enabled.  Only the keys (resource
# names) carry information; every value stored here is None.
_unavail = {}
|
|
def requires(resource, msg=None):
    """Insist that *resource* is enabled, raising ResourceDenied otherwise.

    When the caller's module is __main__ the check is skipped and the
    function simply returns.  *msg*, when given, replaces the default
    text of the raised exception.
    """
    # A module run directly as a script is treated as if the resource
    # had been set.  Keep the frame lookup inline so f_back is the caller.
    caller_name = sys._getframe().f_back.f_globals.get("__name__")
    if caller_name == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
|
|
|
|
def find_package_modules(package, mask):
    """Yield the dotted names of modules in *package* whose file matches *mask*.

    *mask* is a case-sensitive fnmatch pattern such as "test_*.py".  If the
    package's loader exposes a ``_files`` mapping, its keys are matched
    against the package path joined with the mask; otherwise the first
    directory in ``package.__path__`` is listed and matched directly.
    """
    from fnmatch import fnmatchcase

    loader = package.__loader__
    if loader is None or not hasattr(loader, '_files'):
        # Ordinary package: scan its first directory on disk.
        directory = package.__path__[0]
        for entry in os.listdir(directory):
            if fnmatchcase(entry, mask):
                stem = os.path.splitext(entry)[0]
                yield "%s.%s" % (package.__name__, stem)
        return

    # Loader with a file table: match full paths relative to the archive.
    prefix = package.__name__.replace(".", os.path.sep)
    pattern = os.path.join(prefix, mask)
    for archived in loader._files.keys():
        if fnmatchcase(archived, pattern):
            without_ext = os.path.splitext(archived)[0]
            yield without_ext.replace(os.path.sep, ".")
|
|
|
|
def get_tests(package, mask, verbosity, exclude=()):
    """Collect the test case classes found in *package*.

    Modules are located with find_package_modules(package, mask).  A module
    is skipped when the last component of its name is in *exclude*, or when
    importing it raises ResourceDenied or unittest.SkipTest; with a
    verbosity above 1 each skip is reported on stderr.

    Returns a ``(skipped, tests)`` pair: the list of skipped module names
    and the list of TestCase subclasses gathered from the imported modules.
    """
    skipped_modules = []
    test_classes = []
    chatty = verbosity > 1
    plain_class_type = type(unittest.TestCase)

    for modname in find_package_modules(package, mask):
        short_name = modname.split(".")[-1]
        if short_name in exclude:
            skipped_modules.append(modname)
            if chatty:
                print("Skipped %s: excluded" % modname, file=sys.stderr)
            continue

        try:
            module = __import__(modname, globals(), locals(), ['*'])
        except (ResourceDenied, unittest.SkipTest) as detail:
            skipped_modules.append(modname)
            if chatty:
                print("Skipped %s: %s" % (modname, detail), file=sys.stderr)
        else:
            # Only public names are considered, and only classes whose type
            # is exactly the type of unittest.TestCase.
            public_objects = (getattr(module, attr)
                              for attr in dir(module)
                              if not attr.startswith("_"))
            test_classes.extend(
                obj for obj in public_objects
                if type(obj) is plain_class_type
                and issubclass(obj, unittest.TestCase))

    return skipped_modules, test_classes
|
|
|
|
def usage():
    """Print this module's docstring as the usage text and return 1."""
    help_text = __doc__
    print(help_text)
    return 1
|
|
|
|
def test_with_refcounts(runner, verbosity, testcase):
|
|
"""Run testcase several times, tracking reference counts."""
|
|
import gc
|
|
import ctypes
|
|
ptc = ctypes._pointer_type_cache.copy()
|
|
cfc = ctypes._c_functype_cache.copy()
|
|
wfc = ctypes._win_functype_cache.copy()
|
|
|
|
# when searching for refcount leaks, we have to manually reset any
|
|
# caches that ctypes has.
|
|
def cleanup():
|
|
ctypes._pointer_type_cache = ptc.copy()
|
|
ctypes._c_functype_cache = cfc.copy()
|
|
ctypes._win_functype_cache = wfc.copy()
|
|
gc.collect()
|
|
|
|
test = unittest.makeSuite(testcase)
|
|
for i in range(5):
|
|
rc = sys.gettotalrefcount()
|
|
runner.run(test)
|
|
cleanup()
|
|
COUNT = 5
|
|
refcounts = [None] * COUNT
|
|
for i in range(COUNT):
|
|
rc = sys.gettotalrefcount()
|
|
runner.run(test)
|
|
cleanup()
|
|
refcounts[i] = sys.gettotalrefcount() - rc
|
|
if filter(None, refcounts):
|
|
print("%s leaks:\n\t" % testcase, refcounts)
|
|
elif verbosity:
|
|
print("%s: ok." % testcase)
|
|
|
|
class TestRunner(unittest.TextTestRunner):
    """Text test runner whose summary can also report skipped modules."""

    def run(self, test, skipped):
        """Run the given test case or test suite and print a summary.

        Modelled on unittest.TextTestRunner.run, with one addition: when
        any resource has been recorded in _unavail, the summary line also
        gives the number of entries in *skipped* and the unavailable
        resources are listed.  Returns the test result object.
        """
        out = self.stream
        result = self._makeResult()

        started = time.time()
        test(result)
        elapsed = time.time() - started

        result.printErrors()
        out.writeln(result.separator2)

        ran = result.testsRun
        test_plural = "s" if ran != 1 else ""
        if _unavail:
            unavailable = sorted(_unavail.keys())
            skipped_count = len(skipped)
            module_plural = "s" if skipped_count != 1 else ""
            out.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
                        (ran, test_plural, elapsed,
                         skipped_count, module_plural))
            out.writeln("Unavailable resources: %s" % ", ".join(unavailable))
        else:
            out.writeln("Ran %d test%s in %.3fs" %
                        (ran, test_plural, elapsed))
        out.writeln()

        if result.wasSuccessful():
            out.writeln("OK")
            return result

        failure_count = len(result.failures)
        error_count = len(result.errors)
        out.write("FAILED (")
        if failure_count:
            out.write("failures=%d" % failure_count)
        if error_count:
            if failure_count:
                out.write(", ")
            out.write("errors=%d" % error_count)
        out.writeln(")")
        return result
|
|
|
|
|
|
def main(*packages):
    """Parse the command line and run the tests of every given package.

    Recognised options:
      -q        lower the verbosity by one
      -v        raise the verbosity by one
      -r        also hunt for refcount leaks (needs sys.gettotalrefcount)
      -u NAMES  comma separated resources to add to use_resources
      -x NAMES  comma separated module names to exclude

    The first positional argument, if present, replaces the default file
    mask "test_*.py".  Returns the value of usage() when the options cannot
    be parsed and -1 when -r is given without sys.gettotalrefcount;
    otherwise nothing is returned.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
    except getopt.error:
        return usage()

    verbosity = 1
    search_leaks = False
    exclude = []
    for flag, value in opts:
        if flag in ("-q", "-v"):
            verbosity += 1 if flag == "-v" else -1
        elif flag == "-r":
            if not hasattr(sys, "gettotalrefcount"):
                print("-r flag requires Python debug build", file=sys.stderr)
                return -1
            search_leaks = True
        elif flag == "-u":
            use_resources.extend(value.split(","))
        elif flag == "-x":
            exclude.extend(value.split(","))

    mask = args[0] if args else "test_*.py"

    for package in packages:
        run_tests(package, mask, verbosity, search_leaks, exclude)
|
|
|
|
|
|
def run_tests(package, mask, verbosity, search_leaks, exclude):
    """Run every test case found in *package* and report the outcome.

    package      -- package object whose test modules are searched
    mask         -- fnmatch pattern selecting the test module files
    verbosity    -- verbosity level handed to the runner and to get_tests()
    search_leaks -- when true, each test case is additionally run through
                    test_with_refcounts() after the normal run
    exclude      -- module names that get_tests() should skip

    Returns True when the normal run recorded at least one error.
    """
    skipped, testcases = get_tests(package, mask, verbosity, exclude)
    runner = TestRunner(verbosity=verbosity)

    # unittest.makeSuite() no longer exists in current Python versions; the
    # default loader builds the same per-class suite.
    load_suite = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite([load_suite(case) for case in testcases])
    result = runner.run(suite, skipped)

    if search_leaks:
        # hunt for refcount leaks
        leak_runner = BasicTestRunner()
        for case in testcases:
            test_with_refcounts(leak_runner, verbosity, case)

    # NOTE(review): only result.errors is consulted here, so a run with
    # failures but no errors returns False -- confirm this is intended.
    return bool(result.errors)
|
|
|
|
class BasicTestRunner:
|
|
def run(self, test):
|
|
result = unittest.TestResult()
|
|
test(result)
|
|
return result
|