import gc import io import os import sys import signal import weakref import unittest @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " "if threads have been used") class TestBreak(unittest.TestCase): int_handler = None def setUp(self): self._default_handler = signal.getsignal(signal.SIGINT) if self.int_handler is not None: signal.signal(signal.SIGINT, self.int_handler) def tearDown(self): signal.signal(signal.SIGINT, self._default_handler) unittest.signals._results = weakref.WeakKeyDictionary() unittest.signals._interrupt_handler = None def testInstallHandler(self): default_handler = signal.getsignal(signal.SIGINT) unittest.installHandler() self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) try: pid = os.getpid() os.kill(pid, signal.SIGINT) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(unittest.signals._interrupt_handler.called) def testRegisterResult(self): result = unittest.TestResult() unittest.registerResult(result) for ref in unittest.signals._results: if ref is result: break elif ref is not result: self.fail("odd object in result set") else: self.fail("result not found") def testInterruptCaught(self): default_handler = signal.getsignal(signal.SIGINT) result = unittest.TestResult() unittest.installHandler() unittest.registerResult(result) self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) try: test(result) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(result.breakCaught) def testSecondInterrupt(self): # Can't use skipIf decorator because the signal handler may have # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") result = unittest.TestResult() unittest.installHandler() unittest.registerResult(result) def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) os.kill(pid, signal.SIGINT) self.fail("Second KeyboardInterrupt not raised") try: test(result) except KeyboardInterrupt: pass else: self.fail("Second KeyboardInterrupt not raised") self.assertTrue(result.breakCaught) def testTwoResults(self): unittest.installHandler() result = unittest.TestResult() unittest.registerResult(result) new_handler = signal.getsignal(signal.SIGINT) result2 = unittest.TestResult() unittest.registerResult(result2) self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) result3 = unittest.TestResult() def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) try: test(result) except KeyboardInterrupt: self.fail("KeyboardInterrupt not handled") self.assertTrue(result.shouldStop) self.assertTrue(result2.shouldStop) self.assertFalse(result3.shouldStop) def testHandlerReplacedButCalled(self): # Can't use skipIf decorator because the signal handler may have # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") # If our handler has been replaced (is no longer installed) but is # called by the *new* handler, then it isn't safe to delay the # SIGINT and we should immediately delegate to the default handler unittest.installHandler() handler = signal.getsignal(signal.SIGINT) def new_handler(frame, signum): handler(frame, signum) signal.signal(signal.SIGINT, new_handler) try: pid = os.getpid() os.kill(pid, signal.SIGINT) except KeyboardInterrupt: pass else: self.fail("replaced but delegated handler doesn't raise interrupt") def testRunner(self): # Creating a TextTestRunner with the appropriate argument should # register the TextTestResult it creates runner = unittest.TextTestRunner(stream=io.StringIO()) result = runner.run(unittest.TestSuite()) self.assertIn(result, unittest.signals._results) def testWeakReferences(self): # Calling registerResult on a result should not keep it alive result = unittest.TestResult() unittest.registerResult(result) ref = weakref.ref(result) del result # For non-reference counting implementations gc.collect();gc.collect() self.assertIsNone(ref()) def testRemoveResult(self): result = unittest.TestResult() unittest.registerResult(result) unittest.installHandler() self.assertTrue(unittest.removeResult(result)) # Should this raise an error instead? self.assertFalse(unittest.removeResult(unittest.TestResult())) try: pid = os.getpid() os.kill(pid, signal.SIGINT) except KeyboardInterrupt: pass self.assertFalse(result.shouldStop) def testMainInstallsHandler(self): failfast = object() test = object() verbosity = object() result = object() default_handler = signal.getsignal(signal.SIGINT) class FakeRunner(object): initArgs = [] runArgs = [] def __init__(self, *args, **kwargs): self.initArgs.append((args, kwargs)) def run(self, test): self.runArgs.append(test) return result class Program(unittest.TestProgram): def __init__(self, catchbreak): self.exit = False self.verbosity = verbosity self.failfast = failfast self.catchbreak = catchbreak self.tb_locals = False self.testRunner = FakeRunner self.test = test self.result = None p = Program(False) p.runTests() self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 'verbosity': verbosity, 'failfast': failfast, 'tb_locals': False, 'warnings': None})]) self.assertEqual(FakeRunner.runArgs, [test]) self.assertEqual(p.result, result) self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) FakeRunner.initArgs = [] FakeRunner.runArgs = [] p = Program(True) p.runTests() self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 'verbosity': verbosity, 'failfast': failfast, 'tb_locals': False, 'warnings': None})]) self.assertEqual(FakeRunner.runArgs, [test]) self.assertEqual(p.result, result) self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) def testRemoveHandler(self): default_handler = signal.getsignal(signal.SIGINT) unittest.installHandler() unittest.removeHandler() self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) # check that calling removeHandler multiple times has no ill-effect unittest.removeHandler() self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) def testRemoveHandlerAsDecorator(self): default_handler = signal.getsignal(signal.SIGINT) unittest.installHandler() @unittest.removeHandler def test(): self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) test() self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " "if threads have been used") class TestBreakDefaultIntHandler(TestBreak): int_handler = signal.default_int_handler @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " "if threads have been used") class TestBreakSignalIgnored(TestBreak): int_handler = signal.SIG_IGN @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " "if threads have been used") class TestBreakSignalDefault(TestBreak): int_handler = signal.SIG_DFL if __name__ == "__main__": unittest.main()