Open Media Library Platform

This commit is contained in:
j 2013-10-11 19:28:32 +02:00
commit 411ad5b16f
5849 changed files with 1778641 additions and 0 deletions

View file

@ -0,0 +1,3 @@
"""
Unit tests for L{twisted.python}.
"""

View file

@ -0,0 +1,28 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
A module that is deprecated, used by L{twisted.python.test.test_deprecate} for
testing purposes.
"""
from __future__ import division, absolute_import
from twisted.python.versions import Version
from twisted.python.deprecate import deprecatedModuleAttribute
# Known module-level attributes.
DEPRECATED_ATTRIBUTE = 42
ANOTHER_ATTRIBUTE = 'hello'
version = Version('Twisted', 8, 0, 0)
message = 'Oh noes!'
deprecatedModuleAttribute(
version,
message,
__name__,
'DEPRECATED_ATTRIBUTE')

View file

@ -0,0 +1,59 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Facilities for helping test code which interacts with Python's module system
to load code.
"""
from __future__ import division, absolute_import
import sys
from twisted.python.filepath import FilePath
class TwistedModulesMixin:
"""
A mixin for C{twisted.trial.unittest.SynchronousTestCase} providing useful
methods for manipulating Python's module system.
"""
def replaceSysPath(self, sysPath):
"""
Replace sys.path, for the duration of the test, with the given value.
"""
originalSysPath = sys.path[:]
def cleanUpSysPath():
sys.path[:] = originalSysPath
self.addCleanup(cleanUpSysPath)
sys.path[:] = sysPath
def replaceSysModules(self, sysModules):
"""
Replace sys.modules, for the duration of the test, with the given value.
"""
originalSysModules = sys.modules.copy()
def cleanUpSysModules():
sys.modules.clear()
sys.modules.update(originalSysModules)
self.addCleanup(cleanUpSysModules)
sys.modules.clear()
sys.modules.update(sysModules)
def pathEntryWithOnePackage(self, pkgname=b"test_package"):
"""
Generate a L{FilePath} with one package, named C{pkgname}, on it, and
return the L{FilePath} of the path entry.
"""
# Remove utf-8 encode and bytes for path segments when Filepath
# supports Unicode paths on Python 3 (#2366, #4736, #5203).
entry = FilePath(self.mktemp().encode("utf-8"))
pkg = entry.child(b"test_package")
pkg.makedirs()
pkg.child(b"__init__.py").setContent(b"")
return entry

View file

@ -0,0 +1,40 @@
#!/usr/bin/python
# -*- test-case-name: twisted.python.test.test_sendmsg -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import sys, os
from struct import unpack
# This makes me sad. Why aren't things nice?
sys.path.insert(0, __file__.rsplit('/', 4)[0])
from twisted.python.sendmsg import recv1msg
def recvfd(socketfd):
"""
Receive a file descriptor from a L{send1msg} message on the given C{AF_UNIX}
socket.
@param socketfd: An C{AF_UNIX} socket, attached to another process waiting
to send sockets via the ancillary data mechanism in L{send1msg}.
@param fd: C{int}
@return: a 2-tuple of (new file descriptor, description).
@rtype: 2-tuple of (C{int}, C{str})
"""
data, flags, ancillary = recv1msg(socketfd)
[(cmsg_level, cmsg_type, packedFD)] = ancillary
# cmsg_level and cmsg_type really need to be SOL_SOCKET / SCM_RIGHTS, but
# since those are the *only* standard values, there's not much point in
# checking.
[unpackedFD] = unpack("i", packedFD)
return (unpackedFD, data)
if __name__ == '__main__':
fd, description = recvfd(int(sys.argv[1]))
os.write(fd, "Test fixture data: %s.\n" % (description,))
os.close(fd)

View file

@ -0,0 +1,839 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test cases for Twisted component architecture.
"""
from __future__ import division, absolute_import
from zope.interface import Interface, implementer, Attribute
from zope.interface.adapter import AdapterRegistry
from twisted.python.compat import comparable, cmp
from twisted.trial import unittest
from twisted.python import components
from twisted.python.components import _addHook, _removeHook, proxyForInterface
class Compo(components.Componentized):
num = 0
def inc(self):
self.num = self.num + 1
return self.num
class IAdept(Interface):
def adaptorFunc():
raise NotImplementedError()
class IElapsed(Interface):
def elapsedFunc():
"""
1!
"""
@implementer(IAdept)
class Adept(components.Adapter):
def __init__(self, orig):
self.original = orig
self.num = 0
def adaptorFunc(self):
self.num = self.num + 1
return self.num, self.original.inc()
@implementer(IElapsed)
class Elapsed(components.Adapter):
def elapsedFunc(self):
return 1
class AComp(components.Componentized):
pass
class BComp(AComp):
pass
class CComp(BComp):
pass
class ITest(Interface):
pass
class ITest2(Interface):
pass
class ITest3(Interface):
pass
class ITest4(Interface):
pass
@implementer(ITest, ITest3, ITest4)
class Test(components.Adapter):
def __init__(self, orig):
pass
@implementer(ITest2)
class Test2:
temporaryAdapter = 1
def __init__(self, orig):
pass
class RegistryUsingMixin(object):
"""
Mixin for test cases which modify the global registry somehow.
"""
def setUp(self):
"""
Configure L{twisted.python.components.registerAdapter} to mutate an
alternate registry to improve test isolation.
"""
# Create a brand new, empty registry and put it onto the components
# module where registerAdapter will use it. Also ensure that it goes
# away at the end of the test.
scratchRegistry = AdapterRegistry()
self.patch(components, 'globalRegistry', scratchRegistry)
# Hook the new registry up to the adapter lookup system and ensure that
# association is also discarded after the test.
hook = _addHook(scratchRegistry)
self.addCleanup(_removeHook, hook)
class ComponentizedTestCase(unittest.SynchronousTestCase, RegistryUsingMixin):
"""
Simple test case for caching in Componentized.
"""
def setUp(self):
RegistryUsingMixin.setUp(self)
components.registerAdapter(Test, AComp, ITest)
components.registerAdapter(Test, AComp, ITest3)
components.registerAdapter(Test2, AComp, ITest2)
def testComponentized(self):
components.registerAdapter(Adept, Compo, IAdept)
components.registerAdapter(Elapsed, Compo, IElapsed)
c = Compo()
assert c.getComponent(IAdept).adaptorFunc() == (1, 1)
assert c.getComponent(IAdept).adaptorFunc() == (2, 2)
assert IElapsed(IAdept(c)).elapsedFunc() == 1
def testInheritanceAdaptation(self):
c = CComp()
co1 = c.getComponent(ITest)
co2 = c.getComponent(ITest)
co3 = c.getComponent(ITest2)
co4 = c.getComponent(ITest2)
assert co1 is co2
assert co3 is not co4
c.removeComponent(co1)
co5 = c.getComponent(ITest)
co6 = c.getComponent(ITest)
assert co5 is co6
assert co1 is not co5
def testMultiAdapter(self):
c = CComp()
co1 = c.getComponent(ITest)
co2 = c.getComponent(ITest2)
co3 = c.getComponent(ITest3)
co4 = c.getComponent(ITest4)
self.assertIdentical(None, co4)
self.assertIdentical(co1, co3)
def test_getComponentDefaults(self):
"""
Test that a default value specified to Componentized.getComponent if
there is no component for the requested interface.
"""
componentized = components.Componentized()
default = object()
self.assertIdentical(
componentized.getComponent(ITest, default),
default)
self.assertIdentical(
componentized.getComponent(ITest, default=default),
default)
self.assertIdentical(
componentized.getComponent(ITest),
None)
def test_setAdapter(self):
"""
C{Componentized.setAdapter} sets a component for an interface by
wrapping the instance with the given adapter class.
"""
componentized = components.Componentized()
componentized.setAdapter(IAdept, Adept)
component = componentized.getComponent(IAdept)
self.assertEqual(component.original, componentized)
self.assertIsInstance(component, Adept)
def test_addAdapter(self):
"""
C{Componentized.setAdapter} adapts the instance by wrapping it with
given adapter class, then stores it using C{addComponent}.
"""
componentized = components.Componentized()
componentized.addAdapter(Adept, ignoreClass=True)
component = componentized.getComponent(IAdept)
self.assertEqual(component.original, componentized)
self.assertIsInstance(component, Adept)
def test_setComponent(self):
"""
C{Componentized.setComponent} stores the given component using the
given interface as the key.
"""
componentized = components.Componentized()
obj = object()
componentized.setComponent(ITest, obj)
self.assertIdentical(componentized.getComponent(ITest), obj)
def test_unsetComponent(self):
"""
C{Componentized.setComponent} removes the cached component for the
given interface.
"""
componentized = components.Componentized()
obj = object()
componentized.setComponent(ITest, obj)
componentized.unsetComponent(ITest)
self.assertIdentical(componentized.getComponent(ITest), None)
def test_reprableComponentized(self):
"""
C{ReprableComponentized} has a C{__repr__} that lists its cache.
"""
rc = components.ReprableComponentized()
rc.setComponent(ITest, "hello")
result = repr(rc)
self.assertIn("ITest", result)
self.assertIn("hello", result)
class AdapterTestCase(unittest.SynchronousTestCase):
"""Test adapters."""
def testAdapterGetComponent(self):
o = object()
a = Adept(o)
self.assertRaises(components.CannotAdapt, ITest, a)
self.assertEqual(ITest(a, None), None)
class IMeta(Interface):
pass
@implementer(IMeta)
class MetaAdder(components.Adapter):
def add(self, num):
return self.original.num + num
@implementer(IMeta)
class BackwardsAdder(components.Adapter):
def add(self, num):
return self.original.num - num
class MetaNumber:
def __init__(self, num):
self.num = num
class FakeAdder:
def add(self, num):
return num + 5
class FakeNumber:
num = 3
class ComponentNumber(components.Componentized):
def __init__(self):
self.num = 0
components.Componentized.__init__(self)
implementer(IMeta)
class ComponentMeta(components.Adapter):
def __init__(self, original):
components.Adapter.__init__(self, original)
self.num = self.original.num
class ComponentAdder(ComponentMeta):
def add(self, num):
self.num += num
return self.num
class ComponentDoubler(ComponentMeta):
def add(self, num):
self.num += (num * 2)
return self.original.num
class IAttrX(Interface):
def x():
pass
class IAttrXX(Interface):
def xx():
pass
@implementer(IAttrX)
class Xcellent:
def x(self):
return 'x!'
@comparable
class DoubleXAdapter:
num = 42
def __init__(self, original):
self.original = original
def xx(self):
return (self.original.x(), self.original.x())
def __cmp__(self, other):
return cmp(self.num, other.num)
class TestMetaInterface(RegistryUsingMixin, unittest.SynchronousTestCase):
def testBasic(self):
components.registerAdapter(MetaAdder, MetaNumber, IMeta)
n = MetaNumber(1)
self.assertEqual(IMeta(n).add(1), 2)
def testComponentizedInteraction(self):
components.registerAdapter(ComponentAdder, ComponentNumber, IMeta)
c = ComponentNumber()
IMeta(c).add(1)
IMeta(c).add(1)
self.assertEqual(IMeta(c).add(1), 3)
def testAdapterWithCmp(self):
# Make sure that a __cmp__ on an adapter doesn't break anything
components.registerAdapter(DoubleXAdapter, IAttrX, IAttrXX)
xx = IAttrXX(Xcellent())
self.assertEqual(('x!', 'x!'), xx.xx())
class RegistrationTestCase(RegistryUsingMixin, unittest.SynchronousTestCase):
"""
Tests for adapter registration.
"""
def _registerAdapterForClassOrInterface(self, original):
"""
Register an adapter with L{components.registerAdapter} for the given
class or interface and verify that the adapter can be looked up with
L{components.getAdapterFactory}.
"""
adapter = lambda o: None
components.registerAdapter(adapter, original, ITest)
self.assertIdentical(
components.getAdapterFactory(original, ITest, None),
adapter)
def test_registerAdapterForClass(self):
"""
Test that an adapter from a class can be registered and then looked
up.
"""
class TheOriginal(object):
pass
return self._registerAdapterForClassOrInterface(TheOriginal)
def test_registerAdapterForInterface(self):
"""
Test that an adapter from an interface can be registered and then
looked up.
"""
return self._registerAdapterForClassOrInterface(ITest2)
def _duplicateAdapterForClassOrInterface(self, original):
"""
Verify that L{components.registerAdapter} raises L{ValueError} if the
from-type/interface and to-interface pair is not unique.
"""
firstAdapter = lambda o: False
secondAdapter = lambda o: True
components.registerAdapter(firstAdapter, original, ITest)
self.assertRaises(
ValueError,
components.registerAdapter,
secondAdapter, original, ITest)
# Make sure that the original adapter is still around as well
self.assertIdentical(
components.getAdapterFactory(original, ITest, None),
firstAdapter)
def test_duplicateAdapterForClass(self):
"""
Test that attempting to register a second adapter from a class
raises the appropriate exception.
"""
class TheOriginal(object):
pass
return self._duplicateAdapterForClassOrInterface(TheOriginal)
def test_duplicateAdapterForInterface(self):
"""
Test that attempting to register a second adapter from an interface
raises the appropriate exception.
"""
return self._duplicateAdapterForClassOrInterface(ITest2)
def _duplicateAdapterForClassOrInterfaceAllowed(self, original):
"""
Verify that when C{components.ALLOW_DUPLICATES} is set to C{True}, new
adapter registrations for a particular from-type/interface and
to-interface pair replace older registrations.
"""
firstAdapter = lambda o: False
secondAdapter = lambda o: True
class TheInterface(Interface):
pass
components.registerAdapter(firstAdapter, original, TheInterface)
components.ALLOW_DUPLICATES = True
try:
components.registerAdapter(secondAdapter, original, TheInterface)
self.assertIdentical(
components.getAdapterFactory(original, TheInterface, None),
secondAdapter)
finally:
components.ALLOW_DUPLICATES = False
# It should be rejected again at this point
self.assertRaises(
ValueError,
components.registerAdapter,
firstAdapter, original, TheInterface)
self.assertIdentical(
components.getAdapterFactory(original, TheInterface, None),
secondAdapter)
def test_duplicateAdapterForClassAllowed(self):
"""
Test that when L{components.ALLOW_DUPLICATES} is set to a true
value, duplicate registrations from classes are allowed to override
the original registration.
"""
class TheOriginal(object):
pass
return self._duplicateAdapterForClassOrInterfaceAllowed(TheOriginal)
def test_duplicateAdapterForInterfaceAllowed(self):
"""
Test that when L{components.ALLOW_DUPLICATES} is set to a true
value, duplicate registrations from interfaces are allowed to
override the original registration.
"""
class TheOriginal(Interface):
pass
return self._duplicateAdapterForClassOrInterfaceAllowed(TheOriginal)
def _multipleInterfacesForClassOrInterface(self, original):
"""
Verify that an adapter can be registered for multiple to-interfaces at a
time.
"""
adapter = lambda o: None
components.registerAdapter(adapter, original, ITest, ITest2)
self.assertIdentical(
components.getAdapterFactory(original, ITest, None), adapter)
self.assertIdentical(
components.getAdapterFactory(original, ITest2, None), adapter)
def test_multipleInterfacesForClass(self):
"""
Test the registration of an adapter from a class to several
interfaces at once.
"""
class TheOriginal(object):
pass
return self._multipleInterfacesForClassOrInterface(TheOriginal)
def test_multipleInterfacesForInterface(self):
"""
Test the registration of an adapter from an interface to several
interfaces at once.
"""
return self._multipleInterfacesForClassOrInterface(ITest3)
def _subclassAdapterRegistrationForClassOrInterface(self, original):
"""
Verify that a new adapter can be registered for a particular
to-interface from a subclass of a type or interface which already has an
adapter registered to that interface and that the subclass adapter takes
precedence over the base class adapter.
"""
firstAdapter = lambda o: True
secondAdapter = lambda o: False
class TheSubclass(original):
pass
components.registerAdapter(firstAdapter, original, ITest)
components.registerAdapter(secondAdapter, TheSubclass, ITest)
self.assertIdentical(
components.getAdapterFactory(original, ITest, None),
firstAdapter)
self.assertIdentical(
components.getAdapterFactory(TheSubclass, ITest, None),
secondAdapter)
def test_subclassAdapterRegistrationForClass(self):
"""
Test that an adapter to a particular interface can be registered
from both a class and its subclass.
"""
class TheOriginal(object):
pass
return self._subclassAdapterRegistrationForClassOrInterface(TheOriginal)
def test_subclassAdapterRegistrationForInterface(self):
"""
Test that an adapter to a particular interface can be registered
from both an interface and its subclass.
"""
return self._subclassAdapterRegistrationForClassOrInterface(ITest2)
class IProxiedInterface(Interface):
"""
An interface class for use by L{proxyForInterface}.
"""
ifaceAttribute = Attribute("""
An example declared attribute, which should be proxied.""")
def yay(*a, **kw):
"""
A sample method which should be proxied.
"""
class IProxiedSubInterface(IProxiedInterface):
"""
An interface that derives from another for use with L{proxyForInterface}.
"""
def boo(self):
"""
A different sample method which should be proxied.
"""
@implementer(IProxiedInterface)
class Yayable(object):
"""
A provider of L{IProxiedInterface} which increments a counter for
every call to C{yay}.
@ivar yays: The number of times C{yay} has been called.
"""
def __init__(self):
self.yays = 0
self.yayArgs = []
def yay(self, *a, **kw):
"""
Increment C{self.yays}.
"""
self.yays += 1
self.yayArgs.append((a, kw))
return self.yays
@implementer(IProxiedSubInterface)
class Booable(object):
"""
An implementation of IProxiedSubInterface
"""
yayed = False
booed = False
def yay(self):
"""
Mark the fact that 'yay' has been called.
"""
self.yayed = True
def boo(self):
"""
Mark the fact that 'boo' has been called.1
"""
self.booed = True
class IMultipleMethods(Interface):
"""
An interface with multiple methods.
"""
def methodOne():
"""
The first method. Should return 1.
"""
def methodTwo():
"""
The second method. Should return 2.
"""
class MultipleMethodImplementor(object):
"""
A precise implementation of L{IMultipleMethods}.
"""
def methodOne(self):
"""
@return: 1
"""
return 1
def methodTwo(self):
"""
@return: 2
"""
return 2
class ProxyForInterfaceTests(unittest.SynchronousTestCase):
"""
Tests for L{proxyForInterface}.
"""
def test_original(self):
"""
Proxy objects should have an C{original} attribute which refers to the
original object passed to the constructor.
"""
original = object()
proxy = proxyForInterface(IProxiedInterface)(original)
self.assertIdentical(proxy.original, original)
def test_proxyMethod(self):
"""
The class created from L{proxyForInterface} passes methods on an
interface to the object which is passed to its constructor.
"""
klass = proxyForInterface(IProxiedInterface)
yayable = Yayable()
proxy = klass(yayable)
proxy.yay()
self.assertEqual(proxy.yay(), 2)
self.assertEqual(yayable.yays, 2)
def test_proxyAttribute(self):
"""
Proxy objects should proxy declared attributes, but not other
attributes.
"""
yayable = Yayable()
yayable.ifaceAttribute = object()
proxy = proxyForInterface(IProxiedInterface)(yayable)
self.assertIdentical(proxy.ifaceAttribute, yayable.ifaceAttribute)
self.assertRaises(AttributeError, lambda: proxy.yays)
def test_proxySetAttribute(self):
"""
The attributes that proxy objects proxy should be assignable and affect
the original object.
"""
yayable = Yayable()
proxy = proxyForInterface(IProxiedInterface)(yayable)
thingy = object()
proxy.ifaceAttribute = thingy
self.assertIdentical(yayable.ifaceAttribute, thingy)
def test_proxyDeleteAttribute(self):
"""
The attributes that proxy objects proxy should be deletable and affect
the original object.
"""
yayable = Yayable()
yayable.ifaceAttribute = None
proxy = proxyForInterface(IProxiedInterface)(yayable)
del proxy.ifaceAttribute
self.assertFalse(hasattr(yayable, 'ifaceAttribute'))
def test_multipleMethods(self):
"""
[Regression test] The proxy should send its method calls to the correct
method, not the incorrect one.
"""
multi = MultipleMethodImplementor()
proxy = proxyForInterface(IMultipleMethods)(multi)
self.assertEqual(proxy.methodOne(), 1)
self.assertEqual(proxy.methodTwo(), 2)
def test_subclassing(self):
"""
It is possible to subclass the result of L{proxyForInterface}.
"""
class SpecializedProxy(proxyForInterface(IProxiedInterface)):
"""
A specialized proxy which can decrement the number of yays.
"""
def boo(self):
"""
Decrement the number of yays.
"""
self.original.yays -= 1
yayable = Yayable()
special = SpecializedProxy(yayable)
self.assertEqual(yayable.yays, 0)
special.boo()
self.assertEqual(yayable.yays, -1)
def test_proxyName(self):
"""
The name of a proxy class indicates which interface it proxies.
"""
proxy = proxyForInterface(IProxiedInterface)
self.assertEqual(
proxy.__name__,
"(Proxy for "
"twisted.python.test.test_components.IProxiedInterface)")
def test_implements(self):
"""
The resulting proxy implements the interface that it proxies.
"""
proxy = proxyForInterface(IProxiedInterface)
self.assertTrue(IProxiedInterface.implementedBy(proxy))
def test_proxyDescriptorGet(self):
"""
_ProxyDescriptor's __get__ method should return the appropriate
attribute of its argument's 'original' attribute if it is invoked with
an object. If it is invoked with None, it should return a false
class-method emulator instead.
For some reason, Python's documentation recommends to define
descriptors' __get__ methods with the 'type' parameter as optional,
despite the fact that Python itself never actually calls the descriptor
that way. This is probably do to support 'foo.__get__(bar)' as an
idiom. Let's make sure that the behavior is correct. Since we don't
actually use the 'type' argument at all, this test calls it the
idiomatic way to ensure that signature works; test_proxyInheritance
verifies the how-Python-actually-calls-it signature.
"""
class Sample:
called = False
def hello(self):
self.called = True
fakeProxy = Sample()
testObject = Sample()
fakeProxy.original = testObject
pd = components._ProxyDescriptor("hello", "original")
self.assertEqual(pd.__get__(fakeProxy), testObject.hello)
fakeClassMethod = pd.__get__(None)
fakeClassMethod(fakeProxy)
self.failUnless(testObject.called)
def test_proxyInheritance(self):
"""
Subclasses of the class returned from L{proxyForInterface} should be
able to upcall methods by reference to their superclass, as any normal
Python class can.
"""
class YayableWrapper(proxyForInterface(IProxiedInterface)):
"""
This class does not override any functionality.
"""
class EnhancedWrapper(YayableWrapper):
"""
This class overrides the 'yay' method.
"""
wrappedYays = 1
def yay(self, *a, **k):
self.wrappedYays += 1
return YayableWrapper.yay(self, *a, **k) + 7
yayable = Yayable()
wrapper = EnhancedWrapper(yayable)
self.assertEqual(wrapper.yay(3, 4, x=5, y=6), 8)
self.assertEqual(yayable.yayArgs,
[((3, 4), dict(x=5, y=6))])
def test_interfaceInheritance(self):
"""
Proxies of subinterfaces generated with proxyForInterface should allow
access to attributes of both the child and the base interfaces.
"""
proxyClass = proxyForInterface(IProxiedSubInterface)
booable = Booable()
proxy = proxyClass(booable)
proxy.yay()
proxy.boo()
self.failUnless(booable.yayed)
self.failUnless(booable.booed)
def test_attributeCustomization(self):
"""
The original attribute name can be customized via the
C{originalAttribute} argument of L{proxyForInterface}: the attribute
should change, but the methods of the original object should still be
callable, and the attributes still accessible.
"""
yayable = Yayable()
yayable.ifaceAttribute = object()
proxy = proxyForInterface(
IProxiedInterface, originalAttribute='foo')(yayable)
self.assertIdentical(proxy.foo, yayable)
# Check the behavior
self.assertEqual(proxy.yay(), 1)
self.assertIdentical(proxy.ifaceAttribute, yayable.ifaceAttribute)
thingy = object()
proxy.ifaceAttribute = thingy
self.assertIdentical(yayable.ifaceAttribute, thingy)
del proxy.ifaceAttribute
self.assertFalse(hasattr(yayable, 'ifaceAttribute'))

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,917 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for Twisted's deprecation framework, L{twisted.python.deprecate}.
"""
from __future__ import division, absolute_import
import sys, types, warnings, inspect
from os.path import normcase
from warnings import simplefilter, catch_warnings
try:
from importlib import invalidate_caches
except ImportError:
invalidate_caches = None
from twisted.python import deprecate
from twisted.python.deprecate import _getDeprecationWarningString
from twisted.python.deprecate import DEPRECATION_WARNING_FORMAT
from twisted.python.deprecate import (
getDeprecationWarningString,
deprecated, _appendToDocstring, _getDeprecationDocstring,
_fullyQualifiedName as fullyQualifiedName,
_passed, _mutuallyExclusiveArguments
)
from twisted.python.versions import Version
from twisted.python.filepath import FilePath
from twisted.python.test import deprecatedattributes
from twisted.python.test.modules_helpers import TwistedModulesMixin
from twisted.trial.unittest import SynchronousTestCase
# Note that various tests in this module require manual encoding of paths to
# utf-8. This can be fixed once FilePath supports Unicode; see #2366, #4736,
# #5203.
class _MockDeprecatedAttribute(object):
"""
Mock of L{twisted.python.deprecate._DeprecatedAttribute}.
@ivar value: The value of the attribute.
"""
def __init__(self, value):
self.value = value
def get(self):
"""
Get a known value.
"""
return self.value
class ModuleProxyTests(SynchronousTestCase):
"""
Tests for L{twisted.python.deprecate._ModuleProxy}, which proxies
access to module-level attributes, intercepting access to deprecated
attributes and passing through access to normal attributes.
"""
def _makeProxy(self, **attrs):
"""
Create a temporary module proxy object.
@param **kw: Attributes to initialise on the temporary module object
@rtype: L{twistd.python.deprecate._ModuleProxy}
"""
mod = types.ModuleType('foo')
for key, value in attrs.items():
setattr(mod, key, value)
return deprecate._ModuleProxy(mod)
def test_getattrPassthrough(self):
"""
Getting a normal attribute on a L{twisted.python.deprecate._ModuleProxy}
retrieves the underlying attribute's value, and raises C{AttributeError}
if a non-existant attribute is accessed.
"""
proxy = self._makeProxy(SOME_ATTRIBUTE='hello')
self.assertIdentical(proxy.SOME_ATTRIBUTE, 'hello')
self.assertRaises(AttributeError, getattr, proxy, 'DOES_NOT_EXIST')
def test_getattrIntercept(self):
"""
Getting an attribute marked as being deprecated on
L{twisted.python.deprecate._ModuleProxy} results in calling the
deprecated wrapper's C{get} method.
"""
proxy = self._makeProxy()
_deprecatedAttributes = object.__getattribute__(
proxy, '_deprecatedAttributes')
_deprecatedAttributes['foo'] = _MockDeprecatedAttribute(42)
self.assertEqual(proxy.foo, 42)
def test_privateAttributes(self):
"""
Private attributes of L{twisted.python.deprecate._ModuleProxy} are
inaccessible when regular attribute access is used.
"""
proxy = self._makeProxy()
self.assertRaises(AttributeError, getattr, proxy, '_module')
self.assertRaises(
AttributeError, getattr, proxy, '_deprecatedAttributes')
def test_setattr(self):
"""
Setting attributes on L{twisted.python.deprecate._ModuleProxy} proxies
them through to the wrapped module.
"""
proxy = self._makeProxy()
proxy._module = 1
self.assertNotEquals(object.__getattribute__(proxy, '_module'), 1)
self.assertEqual(proxy._module, 1)
def test_repr(self):
"""
L{twisted.python.deprecated._ModuleProxy.__repr__} produces a string
containing the proxy type and a representation of the wrapped module
object.
"""
proxy = self._makeProxy()
realModule = object.__getattribute__(proxy, '_module')
self.assertEqual(
repr(proxy), '<%s module=%r>' % (type(proxy).__name__, realModule))
class DeprecatedAttributeTests(SynchronousTestCase):
"""
Tests for L{twisted.python.deprecate._DeprecatedAttribute} and
L{twisted.python.deprecate.deprecatedModuleAttribute}, which issue
warnings for deprecated module-level attributes.
"""
def setUp(self):
self.version = deprecatedattributes.version
self.message = deprecatedattributes.message
self._testModuleName = __name__ + '.foo'
def _getWarningString(self, attr):
"""
Create the warning string used by deprecated attributes.
"""
return _getDeprecationWarningString(
deprecatedattributes.__name__ + '.' + attr,
deprecatedattributes.version,
DEPRECATION_WARNING_FORMAT + ': ' + deprecatedattributes.message)
def test_deprecatedAttributeHelper(self):
"""
L{twisted.python.deprecate._DeprecatedAttribute} correctly sets its
__name__ to match that of the deprecated attribute and emits a warning
when the original attribute value is accessed.
"""
name = 'ANOTHER_DEPRECATED_ATTRIBUTE'
setattr(deprecatedattributes, name, 42)
attr = deprecate._DeprecatedAttribute(
deprecatedattributes, name, self.version, self.message)
self.assertEqual(attr.__name__, name)
# Since we're accessing the value getter directly, as opposed to via
# the module proxy, we need to match the warning's stack level.
def addStackLevel():
attr.get()
# Access the deprecated attribute.
addStackLevel()
warningsShown = self.flushWarnings([
self.test_deprecatedAttributeHelper])
self.assertIdentical(warningsShown[0]['category'], DeprecationWarning)
self.assertEqual(
warningsShown[0]['message'],
self._getWarningString(name))
self.assertEqual(len(warningsShown), 1)
def test_deprecatedAttribute(self):
"""
L{twisted.python.deprecate.deprecatedModuleAttribute} wraps a
module-level attribute in an object that emits a deprecation warning
when it is accessed the first time only, while leaving other unrelated
attributes alone.
"""
# Accessing non-deprecated attributes does not issue a warning.
deprecatedattributes.ANOTHER_ATTRIBUTE
warningsShown = self.flushWarnings([self.test_deprecatedAttribute])
self.assertEqual(len(warningsShown), 0)
name = 'DEPRECATED_ATTRIBUTE'
# Access the deprecated attribute. This uses getattr to avoid repeating
# the attribute name.
getattr(deprecatedattributes, name)
warningsShown = self.flushWarnings([self.test_deprecatedAttribute])
self.assertEqual(len(warningsShown), 1)
self.assertIdentical(warningsShown[0]['category'], DeprecationWarning)
self.assertEqual(
warningsShown[0]['message'],
self._getWarningString(name))
def test_wrappedModule(self):
"""
Deprecating an attribute in a module replaces and wraps that module
instance, in C{sys.modules}, with a
L{twisted.python.deprecate._ModuleProxy} instance but only if it hasn't
already been wrapped.
"""
sys.modules[self._testModuleName] = mod = types.ModuleType('foo')
self.addCleanup(sys.modules.pop, self._testModuleName)
setattr(mod, 'first', 1)
setattr(mod, 'second', 2)
deprecate.deprecatedModuleAttribute(
Version('Twisted', 8, 0, 0),
'message',
self._testModuleName,
'first')
proxy = sys.modules[self._testModuleName]
self.assertNotEqual(proxy, mod)
deprecate.deprecatedModuleAttribute(
Version('Twisted', 8, 0, 0),
'message',
self._testModuleName,
'second')
self.assertIdentical(proxy, sys.modules[self._testModuleName])
class ImportedModuleAttributeTests(TwistedModulesMixin, SynchronousTestCase):
"""
Tests for L{deprecatedModuleAttribute} which involve loading a module via
'import'.
"""
_packageInit = """\
from twisted.python.deprecate import deprecatedModuleAttribute
from twisted.python.versions import Version
deprecatedModuleAttribute(
Version('Package', 1, 2, 3), 'message', __name__, 'module')
"""
def pathEntryTree(self, tree):
"""
Create some files in a hierarchy, based on a dictionary describing those
files. The resulting hierarchy will be placed onto sys.path for the
duration of the test.
@param tree: A dictionary representing a directory structure. Keys are
strings, representing filenames, dictionary values represent
directories, string values represent file contents.
@return: another dictionary similar to the input, with file content
strings replaced with L{FilePath} objects pointing at where those
contents are now stored.
"""
def makeSomeFiles(pathobj, dirdict):
pathdict = {}
for (key, value) in dirdict.items():
child = pathobj.child(key)
if isinstance(value, bytes):
pathdict[key] = child
child.setContent(value)
elif isinstance(value, dict):
child.createDirectory()
pathdict[key] = makeSomeFiles(child, value)
else:
raise ValueError("only strings and dicts allowed as values")
return pathdict
base = FilePath(self.mktemp().encode("utf-8"))
base.makedirs()
result = makeSomeFiles(base, tree)
# On Python 3, sys.path cannot include byte paths:
self.replaceSysPath([base.path.decode("utf-8")] + sys.path)
self.replaceSysModules(sys.modules.copy())
return result
def simpleModuleEntry(self):
"""
Add a sample module and package to the path, returning a L{FilePath}
pointing at the module which will be loadable as C{package.module}.
"""
paths = self.pathEntryTree(
{b"package": {b"__init__.py": self._packageInit.encode("utf-8"),
b"module.py": b""}})
return paths[b'package'][b'module.py']
def checkOneWarning(self, modulePath):
"""
Verification logic for L{test_deprecatedModule}.
"""
from package import module
self.assertEqual(FilePath(module.__file__.encode("utf-8")),
modulePath)
emitted = self.flushWarnings([self.checkOneWarning])
self.assertEqual(len(emitted), 1)
self.assertEqual(emitted[0]['message'],
'package.module was deprecated in Package 1.2.3: '
'message')
self.assertEqual(emitted[0]['category'], DeprecationWarning)
def test_deprecatedModule(self):
"""
If L{deprecatedModuleAttribute} is used to deprecate a module attribute
of a package, only one deprecation warning is emitted when the
deprecated module is imported.
"""
self.checkOneWarning(self.simpleModuleEntry())
def test_deprecatedModuleMultipleTimes(self):
"""
If L{deprecatedModuleAttribute} is used to deprecate a module attribute
of a package, only one deprecation warning is emitted when the
deprecated module is subsequently imported.
"""
mp = self.simpleModuleEntry()
# The first time, the code needs to be loaded.
self.checkOneWarning(mp)
# The second time, things are slightly different; the object's already
# in the namespace.
self.checkOneWarning(mp)
# The third and fourth times, things things should all be exactly the
# same, but this is a sanity check to make sure the implementation isn't
# special casing the second time. Also, putting these cases into a loop
# means that the stack will be identical, to make sure that the
# implementation doesn't rely too much on stack-crawling.
for x in range(2):
self.checkOneWarning(mp)
class WarnAboutFunctionTests(SynchronousTestCase):
"""
Tests for L{twisted.python.deprecate.warnAboutFunction} which allows the
callers of a function to issue a C{DeprecationWarning} about that function.
"""
def setUp(self):
"""
Create a file that will have known line numbers when emitting warnings.
"""
self.package = FilePath(self.mktemp().encode("utf-8")
).child(b'twisted_private_helper')
self.package.makedirs()
self.package.child(b'__init__.py').setContent(b'')
self.package.child(b'module.py').setContent(b'''
"A module string"
from twisted.python import deprecate
def testFunction():
"A doc string"
a = 1 + 2
return a
def callTestFunction():
b = testFunction()
if b == 3:
deprecate.warnAboutFunction(testFunction, "A Warning String")
''')
# Python 3 doesn't accept bytes in sys.path:
packagePath = self.package.parent().path.decode("utf-8")
sys.path.insert(0, packagePath)
self.addCleanup(sys.path.remove, packagePath)
modules = sys.modules.copy()
self.addCleanup(
lambda: (sys.modules.clear(), sys.modules.update(modules)))
def test_warning(self):
"""
L{deprecate.warnAboutFunction} emits a warning the file and line number
of which point to the beginning of the implementation of the function
passed to it.
"""
def aFunc():
pass
deprecate.warnAboutFunction(aFunc, 'A Warning Message')
warningsShown = self.flushWarnings()
filename = __file__
if filename.lower().endswith('.pyc'):
filename = filename[:-1]
self.assertSamePath(
FilePath(warningsShown[0]["filename"]), FilePath(filename))
self.assertEqual(warningsShown[0]["message"], "A Warning Message")
def test_warningLineNumber(self):
"""
L{deprecate.warnAboutFunction} emits a C{DeprecationWarning} with the
number of a line within the implementation of the function passed to it.
"""
from twisted_private_helper import module
module.callTestFunction()
warningsShown = self.flushWarnings()
self.assertSamePath(
FilePath(warningsShown[0]["filename"].encode("utf-8")),
self.package.sibling(b'twisted_private_helper').child(b'module.py'))
# Line number 9 is the last line in the testFunction in the helper
# module.
self.assertEqual(warningsShown[0]["lineno"], 9)
self.assertEqual(warningsShown[0]["message"], "A Warning String")
self.assertEqual(len(warningsShown), 1)
def assertSamePath(self, first, second):
"""
Assert that the two paths are the same, considering case normalization
appropriate for the current platform.
@type first: L{FilePath}
@type second: L{FilePath}
@raise C{self.failureType}: If the paths are not the same.
"""
self.assertTrue(
normcase(first.path) == normcase(second.path),
"%r != %r" % (first, second))
def test_renamedFile(self):
"""
Even if the implementation of a deprecated function is moved around on
the filesystem, the line number in the warning emitted by
L{deprecate.warnAboutFunction} points to a line in the implementation of
the deprecated function.
"""
from twisted_private_helper import module
# Clean up the state resulting from that import; we're not going to use
# this module, so it should go away.
del sys.modules['twisted_private_helper']
del sys.modules[module.__name__]
# Rename the source directory
self.package.moveTo(self.package.sibling(b'twisted_renamed_helper'))
# Make sure importlib notices we've changed importable packages:
if invalidate_caches:
invalidate_caches()
# Import the newly renamed version
from twisted_renamed_helper import module
self.addCleanup(sys.modules.pop, 'twisted_renamed_helper')
self.addCleanup(sys.modules.pop, module.__name__)
module.callTestFunction()
warningsShown = self.flushWarnings()
warnedPath = FilePath(warningsShown[0]["filename"].encode("utf-8"))
expectedPath = self.package.sibling(
b'twisted_renamed_helper').child(b'module.py')
self.assertSamePath(warnedPath, expectedPath)
self.assertEqual(warningsShown[0]["lineno"], 9)
self.assertEqual(warningsShown[0]["message"], "A Warning String")
self.assertEqual(len(warningsShown), 1)
def test_filteredWarning(self):
"""
L{deprecate.warnAboutFunction} emits a warning that will be filtered if
L{warnings.filterwarning} is called with the module name of the
deprecated function.
"""
# Clean up anything *else* that might spuriously filter out the warning,
# such as the "always" simplefilter set up by unittest._collectWarnings.
# We'll also rely on trial to restore the original filters afterwards.
del warnings.filters[:]
warnings.filterwarnings(
action="ignore", module="twisted_private_helper")
from twisted_private_helper import module
module.callTestFunction()
warningsShown = self.flushWarnings()
self.assertEqual(len(warningsShown), 0)
def test_filteredOnceWarning(self):
"""
L{deprecate.warnAboutFunction} emits a warning that will be filtered
once if L{warnings.filterwarning} is called with the module name of the
deprecated function and an action of once.
"""
# Clean up anything *else* that might spuriously filter out the warning,
# such as the "always" simplefilter set up by unittest._collectWarnings.
# We'll also rely on trial to restore the original filters afterwards.
del warnings.filters[:]
warnings.filterwarnings(
action="module", module="twisted_private_helper")
from twisted_private_helper import module
module.callTestFunction()
module.callTestFunction()
warningsShown = self.flushWarnings()
self.assertEqual(len(warningsShown), 1)
message = warningsShown[0]['message']
category = warningsShown[0]['category']
filename = warningsShown[0]['filename']
lineno = warningsShown[0]['lineno']
msg = warnings.formatwarning(message, category, filename, lineno)
self.assertTrue(
msg.endswith("module.py:9: DeprecationWarning: A Warning String\n"
" return a\n"),
"Unexpected warning string: %r" % (msg,))
def dummyCallable():
"""
Do nothing.
This is used to test the deprecation decorators.
"""
def dummyReplacementMethod():
"""
Do nothing.
This is used to test the replacement parameter to L{deprecated}.
"""
class TestDeprecationWarnings(SynchronousTestCase):
def test_getDeprecationWarningString(self):
"""
L{getDeprecationWarningString} returns a string that tells us that a
callable was deprecated at a certain released version of Twisted.
"""
version = Version('Twisted', 8, 0, 0)
self.assertEqual(
getDeprecationWarningString(self.test_getDeprecationWarningString,
version),
"%s.TestDeprecationWarnings.test_getDeprecationWarningString "
"was deprecated in Twisted 8.0.0" % (__name__,))
def test_getDeprecationWarningStringWithFormat(self):
"""
L{getDeprecationWarningString} returns a string that tells us that a
callable was deprecated at a certain released version of Twisted, with
a message containing additional information about the deprecation.
"""
version = Version('Twisted', 8, 0, 0)
format = DEPRECATION_WARNING_FORMAT + ': This is a message'
self.assertEqual(
getDeprecationWarningString(self.test_getDeprecationWarningString,
version, format),
'%s.TestDeprecationWarnings.test_getDeprecationWarningString was '
'deprecated in Twisted 8.0.0: This is a message' % (__name__,))
def test_deprecateEmitsWarning(self):
"""
Decorating a callable with L{deprecated} emits a warning.
"""
version = Version('Twisted', 8, 0, 0)
dummy = deprecated(version)(dummyCallable)
def addStackLevel():
dummy()
with catch_warnings(record=True) as caught:
simplefilter("always")
addStackLevel()
self.assertEqual(caught[0].category, DeprecationWarning)
self.assertEqual(str(caught[0].message), getDeprecationWarningString(dummyCallable, version))
# rstrip in case .pyc/.pyo
self.assertEqual(caught[0].filename.rstrip('co'), __file__.rstrip('co'))
def test_deprecatedPreservesName(self):
"""
The decorated function has the same name as the original.
"""
version = Version('Twisted', 8, 0, 0)
dummy = deprecated(version)(dummyCallable)
self.assertEqual(dummyCallable.__name__, dummy.__name__)
self.assertEqual(fullyQualifiedName(dummyCallable),
fullyQualifiedName(dummy))
def test_getDeprecationDocstring(self):
"""
L{_getDeprecationDocstring} returns a note about the deprecation to go
into a docstring.
"""
version = Version('Twisted', 8, 0, 0)
self.assertEqual(
"Deprecated in Twisted 8.0.0.",
_getDeprecationDocstring(version, ''))
def test_deprecatedUpdatesDocstring(self):
"""
The docstring of the deprecated function is appended with information
about the deprecation.
"""
def localDummyCallable():
"""
Do nothing.
This is used to test the deprecation decorators.
"""
version = Version('Twisted', 8, 0, 0)
dummy = deprecated(version)(localDummyCallable)
_appendToDocstring(
localDummyCallable,
_getDeprecationDocstring(version, ''))
self.assertEqual(localDummyCallable.__doc__, dummy.__doc__)
def test_versionMetadata(self):
"""
Deprecating a function adds version information to the decorated
version of that function.
"""
version = Version('Twisted', 8, 0, 0)
dummy = deprecated(version)(dummyCallable)
self.assertEqual(version, dummy.deprecatedVersion)
def test_getDeprecationWarningStringReplacement(self):
"""
L{getDeprecationWarningString} takes an additional replacement parameter
that can be used to add information to the deprecation. If the
replacement parameter is a string, it will be interpolated directly into
the result.
"""
version = Version('Twisted', 8, 0, 0)
warningString = getDeprecationWarningString(
self.test_getDeprecationWarningString, version,
replacement="something.foobar")
self.assertEqual(
warningString,
"%s was deprecated in Twisted 8.0.0; please use something.foobar "
"instead" % (
fullyQualifiedName(self.test_getDeprecationWarningString),))
def test_getDeprecationWarningStringReplacementWithCallable(self):
"""
L{getDeprecationWarningString} takes an additional replacement parameter
that can be used to add information to the deprecation. If the
replacement parameter is a callable, its fully qualified name will be
interpolated into the result.
"""
version = Version('Twisted', 8, 0, 0)
warningString = getDeprecationWarningString(
self.test_getDeprecationWarningString, version,
replacement=dummyReplacementMethod)
self.assertEqual(
warningString,
"%s was deprecated in Twisted 8.0.0; please use "
"%s.dummyReplacementMethod instead" % (
fullyQualifiedName(self.test_getDeprecationWarningString),
__name__))
def test_deprecatedReplacement(self):
"""
L{deprecated} takes an additional replacement parameter that can be used
to indicate the new, non-deprecated method developers should use. If
the replacement parameter is a string, it will be interpolated directly
into the warning message.
"""
version = Version('Twisted', 8, 0, 0)
dummy = deprecated(version, "something.foobar")(dummyCallable)
self.assertEqual(dummy.__doc__,
"\n"
" Do nothing.\n\n"
" This is used to test the deprecation decorators.\n\n"
" Deprecated in Twisted 8.0.0; please use "
"something.foobar"
" instead.\n"
" ")
def test_deprecatedReplacementWithCallable(self):
"""
L{deprecated} takes an additional replacement parameter that can be used
to indicate the new, non-deprecated method developers should use. If
the replacement parameter is a callable, its fully qualified name will
be interpolated into the warning message.
"""
version = Version('Twisted', 8, 0, 0)
decorator = deprecated(version, replacement=dummyReplacementMethod)
dummy = decorator(dummyCallable)
self.assertEqual(dummy.__doc__,
"\n"
" Do nothing.\n\n"
" This is used to test the deprecation decorators.\n\n"
" Deprecated in Twisted 8.0.0; please use "
"%s.dummyReplacementMethod instead.\n"
" " % (__name__,))
class TestAppendToDocstring(SynchronousTestCase):
"""
Test the _appendToDocstring function.
_appendToDocstring is used to add text to a docstring.
"""
def test_appendToEmptyDocstring(self):
"""
Appending to an empty docstring simply replaces the docstring.
"""
def noDocstring():
pass
_appendToDocstring(noDocstring, "Appended text.")
self.assertEqual("Appended text.", noDocstring.__doc__)
def test_appendToSingleLineDocstring(self):
"""
Appending to a single line docstring places the message on a new line,
with a blank line separating it from the rest of the docstring.
The docstring ends with a newline, conforming to Twisted and PEP 8
standards. Unfortunately, the indentation is incorrect, since the
existing docstring doesn't have enough info to help us indent
properly.
"""
def singleLineDocstring():
"""This doesn't comply with standards, but is here for a test."""
_appendToDocstring(singleLineDocstring, "Appended text.")
self.assertEqual(
["This doesn't comply with standards, but is here for a test.",
"",
"Appended text."],
singleLineDocstring.__doc__.splitlines())
self.assertTrue(singleLineDocstring.__doc__.endswith('\n'))
def test_appendToMultilineDocstring(self):
"""
Appending to a multi-line docstring places the messade on a new line,
with a blank line separating it from the rest of the docstring.
Because we have multiple lines, we have enough information to do
indentation.
"""
def multiLineDocstring():
"""
This is a multi-line docstring.
"""
def expectedDocstring():
"""
This is a multi-line docstring.
Appended text.
"""
_appendToDocstring(multiLineDocstring, "Appended text.")
self.assertEqual(
expectedDocstring.__doc__, multiLineDocstring.__doc__)
class MutualArgumentExclusionTests(SynchronousTestCase):
"""
Tests for L{mutuallyExclusiveArguments}.
"""
def checkPassed(self, func, *args, **kw):
"""
Test an invocation of L{passed} with the given function, arguments, and
keyword arguments.
@param func: A function whose argspec to pass to L{_passed}.
@type func: A callable.
@param args: The arguments which could be passed to L{func}.
@param kw: The keyword arguments which could be passed to L{func}.
@return: L{_passed}'s return value
@rtype: L{dict}
"""
return _passed(inspect.getargspec(func), args, kw)
def test_passed_simplePositional(self):
"""
L{passed} identifies the arguments passed by a simple
positional test.
"""
def func(a, b):
pass
self.assertEqual(self.checkPassed(func, 1, 2), dict(a=1, b=2))
def test_passed_tooManyArgs(self):
"""
L{passed} raises a L{TypeError} if too many arguments are
passed.
"""
def func(a, b):
pass
self.assertRaises(TypeError, self.checkPassed, func, 1, 2, 3)
def test_passed_doublePassKeyword(self):
"""
L{passed} raises a L{TypeError} if a argument is passed both
positionally and by keyword.
"""
def func(a):
pass
self.assertRaises(TypeError, self.checkPassed, func, 1, a=2)
def test_passed_unspecifiedKeyword(self):
"""
L{passed} raises a L{TypeError} if a keyword argument not
present in the function's declaration is passed.
"""
def func(a):
pass
self.assertRaises(TypeError, self.checkPassed, func, 1, z=2)
def test_passed_star(self):
"""
L{passed} places additional positional arguments into a tuple
under the name of the star argument.
"""
def func(a, *b):
pass
self.assertEqual(self.checkPassed(func, 1, 2, 3),
dict(a=1, b=(2, 3)))
def test_passed_starStar(self):
"""
Additional keyword arguments are passed as a dict to the star star
keyword argument.
"""
def func(a, **b):
pass
self.assertEqual(self.checkPassed(func, 1, x=2, y=3, z=4),
dict(a=1, b=dict(x=2, y=3, z=4)))
def test_passed_noDefaultValues(self):
"""
The results of L{passed} only include arguments explicitly
passed, not default values.
"""
def func(a, b, c=1, d=2, e=3):
pass
self.assertEqual(self.checkPassed(func, 1, 2, e=7),
dict(a=1, b=2, e=7))
def test_mutualExclusionPrimeDirective(self):
"""
L{mutuallyExclusiveArguments} does not interfere in its
decoratee's operation, either its receipt of arguments or its return
value.
"""
@_mutuallyExclusiveArguments([('a', 'b')])
def func(x, y, a=3, b=4):
return x + y + a + b
self.assertEqual(func(1, 2), 10)
self.assertEqual(func(1, 2, 7), 14)
self.assertEqual(func(1, 2, b=7), 13)
def test_mutualExclusionExcludesByKeyword(self):
"""
L{mutuallyExclusiveArguments} raises a L{TypeError}n if its
decoratee is passed a pair of mutually exclusive arguments.
"""
@_mutuallyExclusiveArguments([['a', 'b']])
def func(a=3, b=4):
return a + b
self.assertRaises(TypeError, func, a=3, b=4)

View file

@ -0,0 +1,454 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for parts of our release automation system.
"""
import os
import sys
from distutils.core import Distribution
from twisted.trial.unittest import TestCase
from twisted.python import dist
from twisted.python.dist import (get_setup_args, ConditionalExtension,
build_scripts_twisted)
from twisted.python.filepath import FilePath
class SetupTest(TestCase):
"""
Tests for L{get_setup_args}.
"""
def test_conditionalExtensions(self):
"""
Passing C{conditionalExtensions} as a list of L{ConditionalExtension}
objects to get_setup_args inserts a custom build_ext into the result
which knows how to check whether they should be built.
"""
good_ext = ConditionalExtension("whatever", ["whatever.c"],
condition=lambda b: True)
bad_ext = ConditionalExtension("whatever", ["whatever.c"],
condition=lambda b: False)
args = get_setup_args(conditionalExtensions=[good_ext, bad_ext])
# ext_modules should be set even though it's not used. See comment
# in get_setup_args
self.assertEqual(args["ext_modules"], [good_ext, bad_ext])
cmdclass = args["cmdclass"]
build_ext = cmdclass["build_ext"]
builder = build_ext(Distribution())
builder.prepare_extensions()
self.assertEqual(builder.extensions, [good_ext])
def test_win32Definition(self):
"""
When building on Windows NT, the WIN32 macro will be defined as 1.
"""
ext = ConditionalExtension("whatever", ["whatever.c"],
define_macros=[("whatever", 2)])
args = get_setup_args(conditionalExtensions=[ext])
builder = args["cmdclass"]["build_ext"](Distribution())
self.patch(os, "name", "nt")
builder.prepare_extensions()
self.assertEqual(ext.define_macros, [("whatever", 2), ("WIN32", 1)])
class GetExtensionsTest(TestCase):
"""
Tests for L{dist.getExtensions}.
"""
setupTemplate = (
"from twisted.python.dist import ConditionalExtension\n"
"extensions = [\n"
" ConditionalExtension(\n"
" '%s', ['twisted/some/thing.c'],\n"
" condition=lambda builder: True)\n"
" ]\n")
def setUp(self):
self.basedir = FilePath(self.mktemp()).child("twisted")
self.basedir.makedirs()
self.addCleanup(os.chdir, os.getcwd())
os.chdir(self.basedir.parent().path)
def writeSetup(self, name, *path):
"""
Write out a C{setup.py} file to a location determined by
L{self.basedir} and L{path}. L{self.setupTemplate} is used to
generate its contents.
"""
outdir = self.basedir.descendant(path)
outdir.makedirs()
setup = outdir.child("setup.py")
setup.setContent(self.setupTemplate % (name,))
def writeEmptySetup(self, *path):
"""
Write out an empty C{setup.py} file to a location determined by
L{self.basedir} and L{path}.
"""
outdir = self.basedir.descendant(path)
outdir.makedirs()
outdir.child("setup.py").setContent("")
def assertExtensions(self, expected):
"""
Assert that the given names match the (sorted) names of discovered
extensions.
"""
extensions = dist.getExtensions()
names = [extension.name for extension in extensions]
self.assertEqual(sorted(names), expected)
def test_getExtensions(self):
"""
Files named I{setup.py} in I{twisted/topfiles} and I{twisted/*/topfiles}
are executed with L{execfile} in order to discover the extensions they
declare.
"""
self.writeSetup("twisted.transmutate", "topfiles")
self.writeSetup("twisted.tele.port", "tele", "topfiles")
self.assertExtensions(["twisted.tele.port", "twisted.transmutate"])
def test_getExtensionsTooDeep(self):
"""
Files named I{setup.py} in I{topfiles} directories are not considered if
they are too deep in the directory hierarchy.
"""
self.writeSetup("twisted.trans.mog.rify", "trans", "mog", "topfiles")
self.assertExtensions([])
def test_getExtensionsNotTopfiles(self):
"""
The folder in which I{setup.py} is discovered must be called I{topfiles}
otherwise it is ignored.
"""
self.writeSetup("twisted.metamorphosis", "notfiles")
self.assertExtensions([])
def test_getExtensionsNotSupportedOnJava(self):
"""
Extensions are not supported on Java-based platforms.
"""
self.addCleanup(setattr, sys, "platform", sys.platform)
sys.platform = "java"
self.writeSetup("twisted.sorcery", "topfiles")
self.assertExtensions([])
def test_getExtensionsExtensionsLocalIsOptional(self):
"""
It is acceptable for extensions to not define the C{extensions} local
variable.
"""
self.writeEmptySetup("twisted.necromancy", "topfiles")
self.assertExtensions([])
class GetVersionTest(TestCase):
"""
Tests for L{dist.getVersion}.
"""
def setUp(self):
self.dirname = self.mktemp()
os.mkdir(self.dirname)
def test_getVersionCore(self):
"""
Test that getting the version of core reads from the
[base]/_version.py file.
"""
f = open(os.path.join(self.dirname, "_version.py"), "w")
f.write("""
from twisted.python import versions
version = versions.Version("twisted", 0, 1, 2)
""")
f.close()
self.assertEqual(dist.getVersion("core", base=self.dirname), "0.1.2")
def test_getVersionOther(self):
"""
Test that getting the version of a non-core project reads from
the [base]/[projname]/_version.py file.
"""
os.mkdir(os.path.join(self.dirname, "blat"))
f = open(os.path.join(self.dirname, "blat", "_version.py"), "w")
f.write("""
from twisted.python import versions
version = versions.Version("twisted.blat", 9, 8, 10)
""")
f.close()
self.assertEqual(dist.getVersion("blat", base=self.dirname), "9.8.10")
class GetScriptsTest(TestCase):
"""
Tests for L{dist.getScripts} which returns the scripts which should be
included in the distribution of a project.
"""
def test_scriptsInSVN(self):
"""
getScripts should return the scripts associated with a project
in the context of Twisted SVN.
"""
basedir = self.mktemp()
os.mkdir(basedir)
os.mkdir(os.path.join(basedir, 'bin'))
os.mkdir(os.path.join(basedir, 'bin', 'proj'))
f = open(os.path.join(basedir, 'bin', 'proj', 'exy'), 'w')
f.write('yay')
f.close()
scripts = dist.getScripts('proj', basedir=basedir)
self.assertEqual(len(scripts), 1)
self.assertEqual(os.path.basename(scripts[0]), 'exy')
def test_excludedPreamble(self):
"""
L{dist.getScripts} includes neither C{"_preamble.py"} nor
C{"_preamble.pyc"}.
"""
basedir = FilePath(self.mktemp())
bin = basedir.child('bin')
bin.makedirs()
bin.child('_preamble.py').setContent('some preamble code\n')
bin.child('_preamble.pyc').setContent('some preamble byte code\n')
bin.child('program').setContent('good program code\n')
scripts = dist.getScripts("", basedir=basedir.path)
self.assertEqual(scripts, [bin.child('program').path])
def test_scriptsInRelease(self):
"""
getScripts should return the scripts associated with a project
in the context of a released subproject tarball.
"""
basedir = self.mktemp()
os.mkdir(basedir)
os.mkdir(os.path.join(basedir, 'bin'))
f = open(os.path.join(basedir, 'bin', 'exy'), 'w')
f.write('yay')
f.close()
scripts = dist.getScripts('proj', basedir=basedir)
self.assertEqual(len(scripts), 1)
self.assertEqual(os.path.basename(scripts[0]), 'exy')
def test_noScriptsInSVN(self):
"""
When calling getScripts for a project which doesn't actually
have any scripts, in the context of an SVN checkout, an
empty list should be returned.
"""
basedir = self.mktemp()
os.mkdir(basedir)
os.mkdir(os.path.join(basedir, 'bin'))
os.mkdir(os.path.join(basedir, 'bin', 'otherproj'))
scripts = dist.getScripts('noscripts', basedir=basedir)
self.assertEqual(scripts, [])
def test_getScriptsTopLevel(self):
"""
Passing the empty string to getScripts returns scripts that are (only)
in the top level bin directory.
"""
basedir = FilePath(self.mktemp())
basedir.createDirectory()
bindir = basedir.child("bin")
bindir.createDirectory()
included = bindir.child("included")
included.setContent("yay included")
subdir = bindir.child("subdir")
subdir.createDirectory()
subdir.child("not-included").setContent("not included")
scripts = dist.getScripts("", basedir=basedir.path)
self.assertEqual(scripts, [included.path])
def test_noScriptsInSubproject(self):
"""
When calling getScripts for a project which doesn't actually
have any scripts in the context of that project's individual
project structure, an empty list should be returned.
"""
basedir = self.mktemp()
os.mkdir(basedir)
scripts = dist.getScripts('noscripts', basedir=basedir)
self.assertEqual(scripts, [])
class DummyCommand:
"""
A fake Command.
"""
def __init__(self, **kwargs):
for kw, val in kwargs.items():
setattr(self, kw, val)
def ensure_finalized(self):
pass
class BuildScriptsTest(TestCase):
"""
Tests for L{dist.build_scripts_twisted}.
"""
def setUp(self):
self.source = FilePath(self.mktemp())
self.target = FilePath(self.mktemp())
self.source.makedirs()
self.addCleanup(os.chdir, os.getcwd())
os.chdir(self.source.path)
def buildScripts(self):
"""
Write 3 types of scripts and run the L{build_scripts_twisted}
command.
"""
self.writeScript(self.source, "script1",
("#! /usr/bin/env python2.7\n"
"# bogus script w/ Python sh-bang\n"
"pass\n"))
self.writeScript(self.source, "script2.py",
("#!/usr/bin/python\n"
"# bogus script w/ Python sh-bang\n"
"pass\n"))
self.writeScript(self.source, "shell.sh",
("#!/bin/sh\n"
"# bogus shell script w/ sh-bang\n"
"exit 0\n"))
expected = ['script1', 'script2.py', 'shell.sh']
cmd = self.getBuildScriptsCmd(self.target,
[self.source.child(fn).path
for fn in expected])
cmd.finalize_options()
cmd.run()
return self.target.listdir()
def getBuildScriptsCmd(self, target, scripts):
"""
Create a distutils L{Distribution} with a L{DummyCommand} and wrap it
in L{build_scripts_twisted}.
@type target: L{FilePath}
"""
dist = Distribution()
dist.scripts = scripts
dist.command_obj["build"] = DummyCommand(
build_scripts = target.path,
force = 1,
executable = sys.executable
)
return build_scripts_twisted(dist)
def writeScript(self, dir, name, text):
"""
Write the script to disk.
"""
with open(dir.child(name).path, "w") as f:
f.write(text)
def test_notWindows(self):
"""
L{build_scripts_twisted} does not rename scripts on non-Windows
platforms.
"""
self.patch(os, "name", "twisted")
built = self.buildScripts()
for name in ['script1', 'script2.py', 'shell.sh']:
self.assertTrue(name in built)
def test_windows(self):
"""
L{build_scripts_twisted} renames scripts so they end with '.py' on
the Windows platform.
"""
self.patch(os, "name", "nt")
built = self.buildScripts()
for name in ['script1.py', 'script2.py', 'shell.sh.py']:
self.assertTrue(name in built)
class FakeModule(object):
"""
A fake module, suitable for dependency injection in testing.
"""
def __init__(self, attrs):
"""
Initializes a fake module.
@param attrs: The attrs that will be accessible on the module.
@type attrs: C{dict} of C{str} (Python names) to objects
"""
self._attrs = attrs
def __getattr__(self, name):
"""
Gets an attribute of this fake module from its attrs.
@raise AttributeError: When the requested attribute is missing.
"""
try:
return self._attrs[name]
except KeyError:
raise AttributeError()
fakeCPythonPlatform = FakeModule({"python_implementation": lambda: "CPython"})
fakeOtherPlatform = FakeModule({"python_implementation": lambda: "lvhpy"})
class WithPlatformTests(TestCase):
"""
Tests for L{_checkCPython} when used with a (fake) C{platform} module.
"""
def test_cpython(self):
"""
L{_checkCPython} returns C{True} when C{platform.python_implementation}
says we're running on CPython.
"""
self.assertTrue(dist._checkCPython(platform=fakeCPythonPlatform))
def test_other(self):
"""
L{_checkCPython} returns C{False} when C{platform.python_implementation}
says we're not running on CPython.
"""
self.assertFalse(dist._checkCPython(platform=fakeOtherPlatform))

View file

@ -0,0 +1,42 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.dist3}.
"""
from __future__ import division
import os
from twisted.trial.unittest import TestCase
from twisted.python.dist3 import modulesToInstall
class ModulesToInstallTests(TestCase):
"""
Tests for L{modulesToInstall}.
"""
def test_sanityCheck(self):
"""
L{modulesToInstall} includes some obvious module names.
"""
self.assertIn("twisted.internet.reactor", modulesToInstall)
self.assertIn("twisted.python.test.test_dist3", modulesToInstall)
def test_exist(self):
"""
All modules listed in L{modulesToInstall} exist.
"""
import twisted
root = os.path.dirname(os.path.dirname(twisted.__file__))
for module in modulesToInstall:
segments = module.split(".")
segments[-1] += ".py"
path = os.path.join(root, *segments)
alternateSegments = module.split(".") + ["__init__.py"]
packagePath = os.path.join(root, *alternateSegments)
self.assertTrue(os.path.exists(path) or
os.path.exists(packagePath),
"Module {0} does not exist".format(module))

View file

@ -0,0 +1,414 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.fakepwd}.
"""
try:
import pwd
except ImportError:
pwd = None
try:
import spwd
except ImportError:
spwd = None
import os
from operator import getitem
from twisted.trial.unittest import TestCase
from twisted.python.fakepwd import UserDatabase, ShadowDatabase
SYSTEM_UID_MAX = 999
def findInvalidUID():
"""
By convention, UIDs less than 1000 are reserved for the system. A system
which allocated every single one of those UIDs would likely have practical
problems with allocating new ones, so let's assume that we'll be able to
find one. (If we don't, this will wrap around to negative values and
I{eventually} find something.)
@return: a user ID which does not exist on the local system. Or, on
systems without a L{pwd} module, return C{SYSTEM_UID_MAX}.
"""
guess = SYSTEM_UID_MAX
if pwd is not None:
while True:
try:
pwd.getpwuid(guess)
except KeyError:
break
else:
guess -= 1
return guess
INVALID_UID = findInvalidUID()
class UserDatabaseTestsMixin:
"""
L{UserDatabaseTestsMixin} defines tests which apply to any user database
implementation. Subclasses should mix it in, implement C{setUp} to create
C{self.database} bound to a user database instance, and implement
C{getExistingUserInfo} to return information about a user (such information
should be unique per test method).
"""
def test_getpwuid(self):
"""
I{getpwuid} accepts a uid and returns the user record associated with
it.
"""
for i in range(2):
# Get some user which exists in the database.
username, password, uid, gid, gecos, dir, shell = self.getExistingUserInfo()
# Now try to look it up and make sure the result is correct.
entry = self.database.getpwuid(uid)
self.assertEqual(entry.pw_name, username)
self.assertEqual(entry.pw_passwd, password)
self.assertEqual(entry.pw_uid, uid)
self.assertEqual(entry.pw_gid, gid)
self.assertEqual(entry.pw_gecos, gecos)
self.assertEqual(entry.pw_dir, dir)
self.assertEqual(entry.pw_shell, shell)
def test_noSuchUID(self):
"""
I{getpwuid} raises L{KeyError} when passed a uid which does not exist
in the user database.
"""
self.assertRaises(KeyError, self.database.getpwuid, INVALID_UID)
def test_getpwnam(self):
"""
I{getpwnam} accepts a username and returns the user record associated
with it.
"""
for i in range(2):
# Get some user which exists in the database.
username, password, uid, gid, gecos, dir, shell = self.getExistingUserInfo()
# Now try to look it up and make sure the result is correct.
entry = self.database.getpwnam(username)
self.assertEqual(entry.pw_name, username)
self.assertEqual(entry.pw_passwd, password)
self.assertEqual(entry.pw_uid, uid)
self.assertEqual(entry.pw_gid, gid)
self.assertEqual(entry.pw_gecos, gecos)
self.assertEqual(entry.pw_dir, dir)
self.assertEqual(entry.pw_shell, shell)
def test_noSuchName(self):
"""
I{getpwnam} raises L{KeyError} when passed a username which does not
exist in the user database.
"""
self.assertRaises(
KeyError, self.database.getpwnam,
'no' 'such' 'user' 'exists' 'the' 'name' 'is' 'too' 'long' 'and' 'has'
'\1' 'in' 'it' 'too')
def test_recordLength(self):
"""
The user record returned by I{getpwuid}, I{getpwnam}, and I{getpwall}
has a length.
"""
db = self.database
username, password, uid, gid, gecos, dir, shell = self.getExistingUserInfo()
for entry in [db.getpwuid(uid), db.getpwnam(username), db.getpwall()[0]]:
self.assertIsInstance(len(entry), int)
self.assertEqual(len(entry), 7)
def test_recordIndexable(self):
"""
The user record returned by I{getpwuid}, I{getpwnam}, and I{getpwall}
is indexable, with successive indexes starting from 0 corresponding to
the values of the C{pw_name}, C{pw_passwd}, C{pw_uid}, C{pw_gid},
C{pw_gecos}, C{pw_dir}, and C{pw_shell} attributes, respectively.
"""
db = self.database
username, password, uid, gid, gecos, dir, shell = self.getExistingUserInfo()
for entry in [db.getpwuid(uid), db.getpwnam(username), db.getpwall()[0]]:
self.assertEqual(entry[0], username)
self.assertEqual(entry[1], password)
self.assertEqual(entry[2], uid)
self.assertEqual(entry[3], gid)
self.assertEqual(entry[4], gecos)
self.assertEqual(entry[5], dir)
self.assertEqual(entry[6], shell)
self.assertEqual(len(entry), len(list(entry)))
self.assertRaises(IndexError, getitem, entry, 7)
class UserDatabaseTests(TestCase, UserDatabaseTestsMixin):
"""
Tests for L{UserDatabase}.
"""
def setUp(self):
"""
Create a L{UserDatabase} with no user data in it.
"""
self.database = UserDatabase()
self._counter = SYSTEM_UID_MAX + 1
def getExistingUserInfo(self):
"""
Add a new user to C{self.database} and return its information.
"""
self._counter += 1
suffix = '_' + str(self._counter)
username = 'username' + suffix
password = 'password' + suffix
uid = self._counter
gid = self._counter + 1000
gecos = 'gecos' + suffix
dir = 'dir' + suffix
shell = 'shell' + suffix
self.database.addUser(username, password, uid, gid, gecos, dir, shell)
return (username, password, uid, gid, gecos, dir, shell)
def test_addUser(self):
"""
L{UserDatabase.addUser} accepts seven arguments, one for each field of
a L{pwd.struct_passwd}, and makes the new record available via
L{UserDatabase.getpwuid}, L{UserDatabase.getpwnam}, and
L{UserDatabase.getpwall}.
"""
username = 'alice'
password = 'secr3t'
uid = 123
gid = 456
gecos = 'Alice,,,'
home = '/users/alice'
shell = '/usr/bin/foosh'
db = self.database
db.addUser(username, password, uid, gid, gecos, home, shell)
for [entry] in [[db.getpwuid(uid)], [db.getpwnam(username)],
db.getpwall()]:
self.assertEqual(entry.pw_name, username)
self.assertEqual(entry.pw_passwd, password)
self.assertEqual(entry.pw_uid, uid)
self.assertEqual(entry.pw_gid, gid)
self.assertEqual(entry.pw_gecos, gecos)
self.assertEqual(entry.pw_dir, home)
self.assertEqual(entry.pw_shell, shell)
class PwdModuleTests(TestCase, UserDatabaseTestsMixin):
"""
L{PwdModuleTests} runs the tests defined by L{UserDatabaseTestsMixin}
against the built-in C{pwd} module. This serves to verify that
L{UserDatabase} is really a fake of that API.
"""
if pwd is None:
skip = "Cannot verify UserDatabase against pwd without pwd"
else:
database = pwd
def setUp(self):
self._users = iter(self.database.getpwall())
self._uids = set()
def getExistingUserInfo(self):
"""
Read and return the next record from C{self._users}, filtering out
any records with previously seen uid values (as these cannot be
found with C{getpwuid} and only cause trouble).
"""
while True:
entry = next(self._users)
uid = entry.pw_uid
if uid not in self._uids:
self._uids.add(uid)
return entry
class ShadowDatabaseTestsMixin:
"""
L{ShadowDatabaseTestsMixin} defines tests which apply to any shadow user
database implementation. Subclasses should mix it in, implement C{setUp} to
create C{self.database} bound to a shadow user database instance, and
implement C{getExistingUserInfo} to return information about a user (such
information should be unique per test method).
"""
def test_getspnam(self):
"""
L{getspnam} accepts a username and returns the user record associated
with it.
"""
for i in range(2):
# Get some user which exists in the database.
(username, password, lastChange, min, max, warn, inact, expire,
flag) = self.getExistingUserInfo()
entry = self.database.getspnam(username)
self.assertEqual(entry.sp_nam, username)
self.assertEqual(entry.sp_pwd, password)
self.assertEqual(entry.sp_lstchg, lastChange)
self.assertEqual(entry.sp_min, min)
self.assertEqual(entry.sp_max, max)
self.assertEqual(entry.sp_warn, warn)
self.assertEqual(entry.sp_inact, inact)
self.assertEqual(entry.sp_expire, expire)
self.assertEqual(entry.sp_flag, flag)
def test_noSuchName(self):
"""
I{getspnam} raises L{KeyError} when passed a username which does not
exist in the user database.
"""
self.assertRaises(KeyError, self.database.getspnam, "alice")
def test_recordLength(self):
"""
The shadow user record returned by I{getspnam} and I{getspall} has a
length.
"""
db = self.database
username = self.getExistingUserInfo()[0]
for entry in [db.getspnam(username), db.getspall()[0]]:
self.assertIsInstance(len(entry), int)
self.assertEqual(len(entry), 9)
def test_recordIndexable(self):
"""
The shadow user record returned by I{getpwnam} and I{getspall} is
indexable, with successive indexes starting from 0 corresponding to the
values of the C{sp_nam}, C{sp_pwd}, C{sp_lstchg}, C{sp_min}, C{sp_max},
C{sp_warn}, C{sp_inact}, C{sp_expire}, and C{sp_flag} attributes,
respectively.
"""
db = self.database
(username, password, lastChange, min, max, warn, inact, expire,
flag) = self.getExistingUserInfo()
for entry in [db.getspnam(username), db.getspall()[0]]:
self.assertEqual(entry[0], username)
self.assertEqual(entry[1], password)
self.assertEqual(entry[2], lastChange)
self.assertEqual(entry[3], min)
self.assertEqual(entry[4], max)
self.assertEqual(entry[5], warn)
self.assertEqual(entry[6], inact)
self.assertEqual(entry[7], expire)
self.assertEqual(entry[8], flag)
self.assertEqual(len(entry), len(list(entry)))
self.assertRaises(IndexError, getitem, entry, 9)
class ShadowDatabaseTests(TestCase, ShadowDatabaseTestsMixin):
"""
Tests for L{ShadowDatabase}.
"""
def setUp(self):
"""
Create a L{ShadowDatabase} with no user data in it.
"""
self.database = ShadowDatabase()
self._counter = 0
def getExistingUserInfo(self):
"""
Add a new user to C{self.database} and return its information.
"""
self._counter += 1
suffix = '_' + str(self._counter)
username = 'username' + suffix
password = 'password' + suffix
lastChange = self._counter + 1
min = self._counter + 2
max = self._counter + 3
warn = self._counter + 4
inact = self._counter + 5
expire = self._counter + 6
flag = self._counter + 7
self.database.addUser(username, password, lastChange, min, max, warn,
inact, expire, flag)
return (username, password, lastChange, min, max, warn, inact,
expire, flag)
def test_addUser(self):
"""
L{UserDatabase.addUser} accepts seven arguments, one for each field of
a L{pwd.struct_passwd}, and makes the new record available via
L{UserDatabase.getpwuid}, L{UserDatabase.getpwnam}, and
L{UserDatabase.getpwall}.
"""
username = 'alice'
password = 'secr3t'
lastChange = 17
min = 42
max = 105
warn = 12
inact = 3
expire = 400
flag = 3
db = self.database
db.addUser(username, password, lastChange, min, max, warn, inact,
expire, flag)
for [entry] in [[db.getspnam(username)], db.getspall()]:
self.assertEqual(entry.sp_nam, username)
self.assertEqual(entry.sp_pwd, password)
self.assertEqual(entry.sp_lstchg, lastChange)
self.assertEqual(entry.sp_min, min)
self.assertEqual(entry.sp_max, max)
self.assertEqual(entry.sp_warn, warn)
self.assertEqual(entry.sp_inact, inact)
self.assertEqual(entry.sp_expire, expire)
self.assertEqual(entry.sp_flag, flag)
class SPwdModuleTests(TestCase, ShadowDatabaseTestsMixin):
"""
L{SPwdModuleTests} runs the tests defined by L{ShadowDatabaseTestsMixin}
against the built-in C{spwd} module. This serves to verify that
L{ShadowDatabase} is really a fake of that API.
"""
if spwd is None:
skip = "Cannot verify ShadowDatabase against spwd without spwd"
elif os.getuid() != 0:
skip = "Cannot access shadow user database except as root"
else:
database = spwd
def setUp(self):
self._users = iter(self.database.getspall())
def getExistingUserInfo(self):
"""
Read and return the next record from C{self._users}.
"""
return next(self._users)

View file

@ -0,0 +1,112 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.hashlib}
"""
from twisted.trial.unittest import TestCase
from twisted.trial import util
class HashObjectTests(TestCase):
"""
Tests for the hash object APIs presented by L{hashlib}, C{md5} and C{sha1}.
"""
def test_deprecation(self):
"""
Ensure the deprecation of L{twisted.python.hashlib} is working.
"""
from twisted.python import hashlib
warnings = self.flushWarnings(
offendingFunctions=[self.test_deprecation])
self.assertIdentical(warnings[0]['category'], DeprecationWarning)
self.assertEqual(len(warnings), 1)
self.assertEqual(warnings[0]['message'],
"twisted.python.hashlib was deprecated in "
"Twisted 13.1.0: Please use hashlib from stdlib.")
def test_md5(self):
"""
L{hashlib.md5} returns an object which can be used to compute an MD5
hash as defined by U{RFC 1321<http://www.ietf.org/rfc/rfc1321.txt>}.
"""
from twisted.python.hashlib import md5
# Test the result using values from section A.5 of the RFC.
self.assertEqual(
md5().hexdigest(), "d41d8cd98f00b204e9800998ecf8427e")
self.assertEqual(
md5("a").hexdigest(), "0cc175b9c0f1b6a831c399e269772661")
self.assertEqual(
md5("abc").hexdigest(), "900150983cd24fb0d6963f7d28e17f72")
self.assertEqual(
md5("message digest").hexdigest(),
"f96b697d7cb7938d525a2f31aaf161d0")
self.assertEqual(
md5("abcdefghijklmnopqrstuvwxyz").hexdigest(),
"c3fcd3d76192e4007dfb496cca67e13b")
self.assertEqual(
md5("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
"0123456789").hexdigest(),
"d174ab98d277d9f5a5611c2c9f419d9f")
self.assertEqual(
md5("1234567890123456789012345678901234567890123456789012345678901"
"2345678901234567890").hexdigest(),
"57edf4a22be3c955ac49da2e2107b67a")
# It should have digest and update methods, too.
self.assertEqual(
md5().digest().encode('hex'),
"d41d8cd98f00b204e9800998ecf8427e")
hash = md5()
hash.update("a")
self.assertEqual(
hash.digest().encode('hex'),
"0cc175b9c0f1b6a831c399e269772661")
# Instances of it should have a digest_size attribute
self.assertEqual(md5().digest_size, 16)
test_md5.suppress = [util.suppress(message="twisted.python.hashlib"
"was deprecated in Twisted 13.1.0: Please use hashlib from stdlib.")]
def test_sha1(self):
"""
L{hashlib.sha1} returns an object which can be used to compute a SHA1
hash as defined by U{RFC 3174<http://tools.ietf.org/rfc/rfc3174.txt>}.
"""
from twisted.python.hashlib import sha1
def format(s):
return ''.join(s.split()).lower()
# Test the result using values from section 7.3 of the RFC.
self.assertEqual(
sha1("abc").hexdigest(),
format(
"A9 99 3E 36 47 06 81 6A BA 3E 25 71 78 50 C2 6C 9C D0 D8 9D"))
self.assertEqual(
sha1("abcdbcdecdefdefgefghfghighijhi"
"jkijkljklmklmnlmnomnopnopq").hexdigest(),
format(
"84 98 3E 44 1C 3B D2 6E BA AE 4A A1 F9 51 29 E5 E5 46 70 F1"))
# It should have digest and update methods, too.
self.assertEqual(
sha1("abc").digest().encode('hex'),
format(
"A9 99 3E 36 47 06 81 6A BA 3E 25 71 78 50 C2 6C 9C D0 D8 9D"))
hash = sha1()
hash.update("abc")
self.assertEqual(
hash.digest().encode('hex'),
format(
"A9 99 3E 36 47 06 81 6A BA 3E 25 71 78 50 C2 6C 9C D0 D8 9D"))
# Instances of it should have a digest_size attribute.
self.assertEqual(
sha1().digest_size, 20)
test_sha1.suppress = [util.suppress(message="twisted.python.hashlib"
"was deprecated in Twisted 13.1.0: Please use hashlib from stdlib.")]

View file

@ -0,0 +1,41 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.htmlizer}.
"""
from StringIO import StringIO
from twisted.trial.unittest import TestCase
from twisted.python.htmlizer import filter
class FilterTests(TestCase):
"""
Tests for L{twisted.python.htmlizer.filter}.
"""
def test_empty(self):
"""
If passed an empty input file, L{filter} writes a I{pre} tag containing
only an end marker to the output file.
"""
input = StringIO("")
output = StringIO()
filter(input, output)
self.assertEqual(output.getvalue(), '<pre><span class="py-src-endmarker"></span></pre>\n')
def test_variable(self):
"""
If passed an input file containing a variable access, L{filter} writes
a I{pre} tag containing a I{py-src-variable} span containing the
variable.
"""
input = StringIO("foo\n")
output = StringIO()
filter(input, output)
self.assertEqual(
output.getvalue(),
'<pre><span class="py-src-variable">foo</span><span class="py-src-newline">\n'
'</span><span class="py-src-endmarker"></span></pre>\n')

View file

@ -0,0 +1,120 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python._inotify}.
"""
from twisted.trial.unittest import TestCase
from twisted.python.runtime import platform
if platform.supportsINotify():
from ctypes import c_int, c_uint32, c_char_p
from twisted.python import _inotify
from twisted.python._inotify import (
INotifyError, initializeModule, init, add)
else:
_inotify = None
class INotifyTests(TestCase):
"""
Tests for L{twisted.python._inotify}.
"""
if _inotify is None:
skip = "This platform doesn't support INotify."
def test_missingInit(self):
"""
If the I{libc} object passed to L{initializeModule} has no
C{inotify_init} attribute, L{ImportError} is raised.
"""
class libc:
def inotify_add_watch(self):
pass
def inotify_rm_watch(self):
pass
self.assertRaises(ImportError, initializeModule, libc())
def test_missingAdd(self):
"""
If the I{libc} object passed to L{initializeModule} has no
C{inotify_add_watch} attribute, L{ImportError} is raised.
"""
class libc:
def inotify_init(self):
pass
def inotify_rm_watch(self):
pass
self.assertRaises(ImportError, initializeModule, libc())
def test_missingRemove(self):
"""
If the I{libc} object passed to L{initializeModule} has no
C{inotify_rm_watch} attribute, L{ImportError} is raised.
"""
class libc:
def inotify_init(self):
pass
def inotify_add_watch(self):
pass
self.assertRaises(ImportError, initializeModule, libc())
def test_setTypes(self):
"""
If the I{libc} object passed to L{initializeModule} has all of the
necessary attributes, it sets the C{argtypes} and C{restype} attributes
of the three ctypes methods used from libc.
"""
class libc:
def inotify_init(self):
pass
inotify_init = staticmethod(inotify_init)
def inotify_rm_watch(self):
pass
inotify_rm_watch = staticmethod(inotify_rm_watch)
def inotify_add_watch(self):
pass
inotify_add_watch = staticmethod(inotify_add_watch)
c = libc()
initializeModule(c)
self.assertEqual(c.inotify_init.argtypes, [])
self.assertEqual(c.inotify_init.restype, c_int)
self.assertEqual(c.inotify_rm_watch.argtypes, [c_int, c_int])
self.assertEqual(c.inotify_rm_watch.restype, c_int)
self.assertEqual(
c.inotify_add_watch.argtypes, [c_int, c_char_p, c_uint32])
self.assertEqual(c.inotify_add_watch.restype, c_int)
def test_failedInit(self):
"""
If C{inotify_init} returns a negative number, L{init} raises
L{INotifyError}.
"""
class libc:
def inotify_init(self):
return -1
self.patch(_inotify, 'libc', libc())
self.assertRaises(INotifyError, init)
def test_failedAddWatch(self):
"""
If C{inotify_add_watch} returns a negative number, L{add}
raises L{INotifyError}.
"""
class libc:
def inotify_add_watch(self, fd, path, mask):
return -1
self.patch(_inotify, 'libc', libc())
self.assertRaises(INotifyError, add, 3, '/foo', 0)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,165 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.runtime}.
"""
from __future__ import division, absolute_import
import sys
from twisted.trial.util import suppress as SUPRESS
from twisted.trial.unittest import SynchronousTestCase
from twisted.python.runtime import Platform, shortPythonVersion
class PythonVersionTests(SynchronousTestCase):
"""
Tests the shortPythonVersion method.
"""
def test_shortPythonVersion(self):
"""
Verify if the Python version is returned correctly.
"""
ver = shortPythonVersion().split('.')
for i in range(3):
self.assertEqual(int(ver[i]), sys.version_info[i])
class PlatformTests(SynchronousTestCase):
"""
Tests for the default L{Platform} initializer.
"""
isWinNTDeprecationMessage = ('twisted.python.runtime.Platform.isWinNT was '
'deprecated in Twisted 13.0. Use Platform.isWindows instead.')
def test_isKnown(self):
"""
L{Platform.isKnown} returns a boolean indicating whether this is one of
the L{runtime.knownPlatforms}.
"""
platform = Platform()
self.assertTrue(platform.isKnown())
def test_isVistaConsistency(self):
"""
Verify consistency of L{Platform.isVista}: it can only be C{True} if
L{Platform.isWinNT} and L{Platform.isWindows} are C{True}.
"""
platform = Platform()
if platform.isVista():
self.assertTrue(platform.isWinNT())
self.assertTrue(platform.isWindows())
self.assertFalse(platform.isMacOSX())
def test_isMacOSXConsistency(self):
"""
L{Platform.isMacOSX} can only return C{True} if L{Platform.getType}
returns C{'posix'}.
"""
platform = Platform()
if platform.isMacOSX():
self.assertEqual(platform.getType(), 'posix')
def test_isLinuxConsistency(self):
"""
L{Platform.isLinux} can only return C{True} if L{Platform.getType}
returns C{'posix'} and L{sys.platform} starts with C{"linux"}.
"""
platform = Platform()
if platform.isLinux():
self.assertTrue(sys.platform.startswith("linux"))
def test_isWinNT(self):
"""
L{Platform.isWinNT} can return only C{False} or C{True} and can not
return C{True} if L{Platform.getType} is not C{"win32"}.
"""
platform = Platform()
isWinNT = platform.isWinNT()
self.assertIn(isWinNT, (False, True))
if platform.getType() != "win32":
self.assertEqual(isWinNT, False)
test_isWinNT.suppress = [SUPRESS(category=DeprecationWarning,
message=isWinNTDeprecationMessage)]
def test_isWinNTDeprecated(self):
"""
L{Platform.isWinNT} is deprecated in favor of L{platform.isWindows}.
"""
platform = Platform()
result = platform.isWinNT()
warnings = self.flushWarnings([self.test_isWinNTDeprecated])
self.assertEqual(len(warnings), 1)
self.assertEqual(
warnings[0]['message'], self.isWinNTDeprecationMessage)
def test_supportsThreads(self):
"""
L{Platform.supportsThreads} returns C{True} if threads can be created in
this runtime, C{False} otherwise.
"""
# It's difficult to test both cases of this without faking the threading
# module. Perhaps an adequate test is to just test the behavior with
# the current runtime, whatever that happens to be.
try:
import threading
except ImportError:
self.assertFalse(Platform().supportsThreads())
else:
self.assertTrue(Platform().supportsThreads())
class ForeignPlatformTests(SynchronousTestCase):
"""
Tests for L{Platform} based overridden initializer values.
"""
def test_getType(self):
"""
If an operating system name is supplied to L{Platform}'s initializer,
L{Platform.getType} returns the platform type which corresponds to that
name.
"""
self.assertEqual(Platform('nt').getType(), 'win32')
self.assertEqual(Platform('ce').getType(), 'win32')
self.assertEqual(Platform('posix').getType(), 'posix')
self.assertEqual(Platform('java').getType(), 'java')
def test_isMacOSX(self):
"""
If a system platform name is supplied to L{Platform}'s initializer, it
is used to determine the result of L{Platform.isMacOSX}, which returns
C{True} for C{"darwin"}, C{False} otherwise.
"""
self.assertTrue(Platform(None, 'darwin').isMacOSX())
self.assertFalse(Platform(None, 'linux2').isMacOSX())
self.assertFalse(Platform(None, 'win32').isMacOSX())
def test_isLinux(self):
"""
If a system platform name is supplied to L{Platform}'s initializer, it
is used to determine the result of L{Platform.isLinux}, which returns
C{True} for values beginning with C{"linux"}, C{False} otherwise.
"""
self.assertFalse(Platform(None, 'darwin').isLinux())
self.assertTrue(Platform(None, 'linux').isLinux())
self.assertTrue(Platform(None, 'linux2').isLinux())
self.assertTrue(Platform(None, 'linux3').isLinux())
self.assertFalse(Platform(None, 'win32').isLinux())

View file

@ -0,0 +1,543 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.sendmsg}.
"""
import sys
import errno
from socket import SOL_SOCKET, AF_INET, AF_INET6, socket, error
try:
from socket import AF_UNIX, socketpair
except ImportError:
nonUNIXSkip = "Platform does not support AF_UNIX sockets"
else:
nonUNIXSkip = None
from struct import pack
from os import devnull, pipe, read, close, environ
from twisted.internet.defer import Deferred
from twisted.internet.error import ProcessDone
from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.python.filepath import FilePath
from twisted.python.runtime import platform
from twisted.internet.protocol import ProcessProtocol
if platform.isLinux():
from socket import MSG_DONTWAIT
dontWaitSkip = None
else:
# It would be nice to be able to test flags on more platforms, but finding a
# flag that works *at all* is somewhat challenging.
dontWaitSkip = "MSG_DONTWAIT is only known to work as intended on Linux"
try:
from twisted.python.sendmsg import SCM_RIGHTS, send1msg, recv1msg, getsockfam
except ImportError:
importSkip = "Cannot import twisted.python.sendmsg"
else:
importSkip = None
class ExitedWithStderr(Exception):
"""
A process exited with some stderr.
"""
def __str__(self):
"""
Dump the errors in a pretty way in the event of a subprocess traceback.
"""
return '\n'.join([''] + list(self.args))
class StartStopProcessProtocol(ProcessProtocol):
"""
An L{IProcessProtocol} with a Deferred for events where the subprocess
starts and stops.
@ivar started: A L{Deferred} which fires with this protocol's
L{IProcessTransport} provider when it is connected to one.
@ivar stopped: A L{Deferred} which fires with the process output or a
failure if the process produces output on standard error.
@ivar output: A C{str} used to accumulate standard output.
@ivar errors: A C{str} used to accumulate standard error.
"""
def __init__(self):
self.started = Deferred()
self.stopped = Deferred()
self.output = ''
self.errors = ''
def connectionMade(self):
self.started.callback(self.transport)
def outReceived(self, data):
self.output += data
def errReceived(self, data):
self.errors += data
def processEnded(self, reason):
if reason.check(ProcessDone):
self.stopped.callback(self.output)
else:
self.stopped.errback(ExitedWithStderr(
self.errors, self.output))
class BadList(list):
"""
A list which cannot be iterated sometimes.
This is a C{list} subclass to get past the type check in L{send1msg}, not as
an example of how real programs might want to interact with L{send1msg} (or
anything else). A custom C{list} subclass makes it easier to trigger
certain error cases in the implementation.
@ivar iterate: A flag which indicates whether an instance of L{BadList} will
allow iteration over itself or not. If C{False}, an attempt to iterate
over the instance will raise an exception.
"""
iterate = True
def __iter__(self):
"""
Allow normal list iteration, or raise an exception.
If C{self.iterate} is C{True}, it will be flipped to C{False} and then
normal iteration will proceed. If C{self.iterate} is C{False},
L{RuntimeError} is raised instead.
"""
if self.iterate:
self.iterate = False
return super(BadList, self).__iter__()
raise RuntimeError("Something bad happened")
class WorseList(list):
"""
A list which at first gives the appearance of being iterable, but then
raises an exception.
See L{BadList} for a warning about not writing code like this.
"""
def __iter__(self):
"""
Return an iterator which will raise an exception as soon as C{next} is
called on it.
"""
class BadIterator(object):
def next(self):
raise RuntimeError("This is a really bad case.")
return BadIterator()
class SendmsgTestCase(TestCase):
"""
Tests for sendmsg extension module and associated file-descriptor sending
functionality.
"""
if nonUNIXSkip is not None:
skip = nonUNIXSkip
elif importSkip is not None:
skip = importSkip
def setUp(self):
"""
Create a pair of UNIX sockets.
"""
self.input, self.output = socketpair(AF_UNIX)
def tearDown(self):
"""
Close the sockets opened by setUp.
"""
self.input.close()
self.output.close()
def test_sendmsgBadArguments(self):
"""
The argument types accepted by L{send1msg} are:
1. C{int}
2. read-only character buffer
3. C{int}
4. sequence
The 3rd and 4th arguments are optional. If fewer than two arguments or
more than four arguments are passed, or if any of the arguments passed
are not compatible with these types, L{TypeError} is raised.
"""
# Exercise the wrong number of arguments cases
self.assertRaises(TypeError, send1msg)
self.assertRaises(TypeError, send1msg, 1)
self.assertRaises(TypeError, send1msg, 1, "hello world", 2, [], object())
# Exercise the wrong type of arguments cases
self.assertRaises(TypeError, send1msg, object(), "hello world", 2, [])
self.assertRaises(TypeError, send1msg, 1, object(), 2, [])
self.assertRaises(TypeError, send1msg, 1, "hello world", object(), [])
self.assertRaises(TypeError, send1msg, 1, "hello world", 2, object())
def test_badAncillaryIter(self):
"""
If iteration over the ancillary data list fails (at the point of the
C{__iter__} call), the exception with which it fails is propagated to
the caller of L{send1msg}.
"""
badList = BadList()
badList.append((1, 2, "hello world"))
badList.iterate = False
self.assertRaises(RuntimeError, send1msg, 1, "hello world", 2, badList)
# Hit the second iteration
badList.iterate = True
self.assertRaises(RuntimeError, send1msg, 1, "hello world", 2, badList)
def test_badAncillaryNext(self):
"""
If iteration over the ancillary data list fails (at the point of a
C{next} call), the exception with which it fails is propagated to the
caller of L{send1msg}.
"""
worseList = WorseList()
self.assertRaises(RuntimeError, send1msg, 1, "hello world", 2, worseList)
def test_sendmsgBadAncillaryItem(self):
"""
The ancillary data list contains three-tuples with element types of:
1. C{int}
2. C{int}
3. read-only character buffer
If a tuple in the ancillary data list does not elements of these types,
L{TypeError} is raised.
"""
# Exercise the wrong number of arguments cases
self.assertRaises(TypeError, send1msg, 1, "hello world", 2, [()])
self.assertRaises(TypeError, send1msg, 1, "hello world", 2, [(1,)])
self.assertRaises(TypeError, send1msg, 1, "hello world", 2, [(1, 2)])
self.assertRaises(
TypeError,
send1msg, 1, "hello world", 2, [(1, 2, "goodbye", object())])
# Exercise the wrong type of arguments cases
exc = self.assertRaises(
TypeError, send1msg, 1, "hello world", 2, [object()])
self.assertEqual(
"send1msg argument 3 expected list of tuple, "
"got list containing object",
str(exc))
self.assertRaises(
TypeError,
send1msg, 1, "hello world", 2, [(object(), 1, "goodbye")])
self.assertRaises(
TypeError,
send1msg, 1, "hello world", 2, [(1, object(), "goodbye")])
self.assertRaises(
TypeError,
send1msg, 1, "hello world", 2, [(1, 1, object())])
def test_syscallError(self):
"""
If the underlying C{sendmsg} call fails, L{send1msg} raises
L{socket.error} with its errno set to the underlying errno value.
"""
probe = file(devnull)
fd = probe.fileno()
probe.close()
exc = self.assertRaises(error, send1msg, fd, "hello, world")
self.assertEqual(exc.args[0], errno.EBADF)
def test_syscallErrorWithControlMessage(self):
"""
The behavior when the underlying C{sendmsg} call fails is the same
whether L{send1msg} is passed ancillary data or not.
"""
probe = file(devnull)
fd = probe.fileno()
probe.close()
exc = self.assertRaises(
error, send1msg, fd, "hello, world", 0, [(0, 0, "0123")])
self.assertEqual(exc.args[0], errno.EBADF)
def test_roundtrip(self):
"""
L{recv1msg} will retrieve a message sent via L{send1msg}.
"""
message = "hello, world!"
self.assertEqual(
len(message),
send1msg(self.input.fileno(), message, 0))
result = recv1msg(fd=self.output.fileno())
self.assertEqual(result, (message, 0, []))
def test_shortsend(self):
"""
L{send1msg} returns the number of bytes which it was able to send.
"""
message = "x" * 1024 * 1024
self.input.setblocking(False)
sent = send1msg(self.input.fileno(), message)
# Sanity check - make sure we did fill the send buffer and then some
self.assertTrue(sent < len(message))
received = recv1msg(self.output.fileno(), 0, len(message))
self.assertEqual(len(received[0]), sent)
def test_roundtripEmptyAncillary(self):
"""
L{send1msg} treats an empty ancillary data list the same way it treats
receiving no argument for the ancillary parameter at all.
"""
send1msg(self.input.fileno(), "hello, world!", 0, [])
result = recv1msg(fd=self.output.fileno())
self.assertEqual(result, ("hello, world!", 0, []))
def test_flags(self):
"""
The C{flags} argument to L{send1msg} is passed on to the underlying
C{sendmsg} call, to affect it in whatever way is defined by those flags.
"""
# Just exercise one flag with simple, well-known behavior. MSG_DONTWAIT
# makes the send a non-blocking call, even if the socket is in blocking
# mode. See also test_flags in RecvmsgTestCase
for i in range(1024):
try:
send1msg(self.input.fileno(), "x" * 1024, MSG_DONTWAIT)
except error, e:
self.assertEqual(e.args[0], errno.EAGAIN)
break
else:
self.fail(
"Failed to fill up the send buffer, "
"or maybe send1msg blocked for a while")
if dontWaitSkip is not None:
test_flags.skip = dontWaitSkip
def test_wrongTypeAncillary(self):
"""
L{send1msg} will show a helpful exception message when given the wrong
type of object for the 'ancillary' argument.
"""
error = self.assertRaises(TypeError,
send1msg, self.input.fileno(),
"hello, world!", 0, 4321)
self.assertEqual(str(error),
"send1msg argument 3 expected list, got int")
def spawn(self, script):
"""
Start a script that is a peer of this test as a subprocess.
@param script: the module name of the script in this directory (no
package prefix, no '.py')
@type script: C{str}
@rtype: L{StartStopProcessProtocol}
"""
sspp = StartStopProcessProtocol()
reactor.spawnProcess(
sspp, sys.executable, [
sys.executable,
FilePath(__file__).sibling(script + ".py").path,
str(self.output.fileno()),
],
environ,
childFDs={0: "w", 1: "r", 2: "r",
self.output.fileno(): self.output.fileno()}
)
return sspp
@inlineCallbacks
def test_sendSubProcessFD(self):
"""
Calling L{sendsmsg} with SOL_SOCKET, SCM_RIGHTS, and a platform-endian
packed file descriptor number should send that file descriptor to a
different process, where it can be retrieved by using L{recv1msg}.
"""
sspp = self.spawn("pullpipe")
yield sspp.started
pipeOut, pipeIn = pipe()
self.addCleanup(close, pipeOut)
send1msg(
self.input.fileno(), "blonk", 0,
[(SOL_SOCKET, SCM_RIGHTS, pack("i", pipeIn))])
close(pipeIn)
yield sspp.stopped
self.assertEqual(read(pipeOut, 1024), "Test fixture data: blonk.\n")
# Make sure that the pipe is actually closed now.
self.assertEqual(read(pipeOut, 1024), "")
class RecvmsgTestCase(TestCase):
"""
Tests for L{recv1msg} (primarily error handling cases).
"""
if importSkip is not None:
skip = importSkip
def test_badArguments(self):
"""
The argument types accepted by L{recv1msg} are:
1. C{int}
2. C{int}
3. C{int}
4. C{int}
The 2nd, 3rd, and 4th arguments are optional. If fewer than one
argument or more than four arguments are passed, or if any of the
arguments passed are not compatible with these types, L{TypeError} is
raised.
"""
# Exercise the wrong number of arguments cases
self.assertRaises(TypeError, recv1msg)
self.assertRaises(TypeError, recv1msg, 1, 2, 3, 4, object())
# Exercise the wrong type of arguments cases
self.assertRaises(TypeError, recv1msg, object(), 2, 3, 4)
self.assertRaises(TypeError, recv1msg, 1, object(), 3, 4)
self.assertRaises(TypeError, recv1msg, 1, 2, object(), 4)
self.assertRaises(TypeError, recv1msg, 1, 2, 3, object())
def test_cmsgSpaceOverflow(self):
"""
L{recv1msg} raises L{OverflowError} if passed a value for the
C{cmsg_size} argument which exceeds C{SOCKLEN_MAX}.
"""
self.assertRaises(OverflowError, recv1msg, 0, 0, 0, 0x7FFFFFFF)
def test_syscallError(self):
"""
If the underlying C{recvmsg} call fails, L{recv1msg} raises
L{socket.error} with its errno set to the underlying errno value.
"""
probe = file(devnull)
fd = probe.fileno()
probe.close()
exc = self.assertRaises(error, recv1msg, fd)
self.assertEqual(exc.args[0], errno.EBADF)
def test_flags(self):
"""
The C{flags} argument to L{recv1msg} is passed on to the underlying
C{recvmsg} call, to affect it in whatever way is defined by those flags.
"""
# See test_flags in SendmsgTestCase
reader, writer = socketpair(AF_UNIX)
exc = self.assertRaises(
error, recv1msg, reader.fileno(), MSG_DONTWAIT)
self.assertEqual(exc.args[0], errno.EAGAIN)
if dontWaitSkip is not None:
test_flags.skip = dontWaitSkip
class GetSocketFamilyTests(TestCase):
"""
Tests for L{getsockfam}, a helper which reveals the address family of an
arbitrary socket.
"""
if importSkip is not None:
skip = importSkip
def _socket(self, addressFamily):
"""
Create a new socket using the given address family and return that
socket's file descriptor. The socket will automatically be closed when
the test is torn down.
"""
s = socket(addressFamily)
self.addCleanup(s.close)
return s.fileno()
def test_badArguments(self):
"""
L{getsockfam} accepts a single C{int} argument. If it is called in some
other way, L{TypeError} is raised.
"""
self.assertRaises(TypeError, getsockfam)
self.assertRaises(TypeError, getsockfam, 1, 2)
self.assertRaises(TypeError, getsockfam, object())
def test_syscallError(self):
"""
If the underlying C{getsockname} call fails, L{getsockfam} raises
L{socket.error} with its errno set to the underlying errno value.
"""
probe = file(devnull)
fd = probe.fileno()
probe.close()
exc = self.assertRaises(error, getsockfam, fd)
self.assertEqual(errno.EBADF, exc.args[0])
def test_inet(self):
"""
When passed the file descriptor of a socket created with the C{AF_INET}
address family, L{getsockfam} returns C{AF_INET}.
"""
self.assertEqual(AF_INET, getsockfam(self._socket(AF_INET)))
def test_inet6(self):
"""
When passed the file descriptor of a socket created with the C{AF_INET6}
address family, L{getsockfam} returns C{AF_INET6}.
"""
self.assertEqual(AF_INET6, getsockfam(self._socket(AF_INET6)))
def test_unix(self):
"""
When passed the file descriptor of a socket created with the C{AF_UNIX}
address family, L{getsockfam} returns C{AF_UNIX}.
"""
self.assertEqual(AF_UNIX, getsockfam(self._socket(AF_UNIX)))
if nonUNIXSkip is not None:
test_unix.skip = nonUNIXSkip

View file

@ -0,0 +1,623 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test cases for twisted.python._shellcomp
"""
import sys
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python import _shellcomp, usage, reflect
from twisted.python.usage import Completions, Completer, CompleteFiles
from twisted.python.usage import CompleteList
class ZshScriptTestMeta(type):
"""
Metaclass of ZshScriptTestMixin.
"""
def __new__(cls, name, bases, attrs):
def makeTest(cmdName, optionsFQPN):
def runTest(self):
return test_genZshFunction(self, cmdName, optionsFQPN)
return runTest
# add test_ methods to the class for each script
# we are testing.
if 'generateFor' in attrs:
for cmdName, optionsFQPN in attrs['generateFor']:
test = makeTest(cmdName, optionsFQPN)
attrs['test_genZshFunction_' + cmdName] = test
return type.__new__(cls, name, bases, attrs)
class ZshScriptTestMixin(object):
"""
Integration test helper to show that C{usage.Options} classes can have zsh
completion functions generated for them without raising errors.
In your subclasses set a class variable like so:
# | cmd name | Fully Qualified Python Name of Options class |
#
generateFor = [('conch', 'twisted.conch.scripts.conch.ClientOptions'),
('twistd', 'twisted.scripts.twistd.ServerOptions'),
]
Each package that contains Twisted scripts should contain one TestCase
subclass which also inherits from this mixin, and contains a C{generateFor}
list appropriate for the scripts in that package.
"""
__metaclass__ = ZshScriptTestMeta
def test_genZshFunction(self, cmdName, optionsFQPN):
"""
Generate completion functions for given twisted command - no errors
should be raised
@type cmdName: C{str}
@param cmdName: The name of the command-line utility e.g. 'twistd'
@type optionsFQPN: C{str}
@param optionsFQPN: The Fully Qualified Python Name of the C{Options}
class to be tested.
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
# some scripts won't import or instantiate because of missing
# dependencies (PyCrypto, etc) so we have to skip them.
try:
o = reflect.namedAny(optionsFQPN)()
except Exception, e:
raise unittest.SkipTest("Couldn't import or instantiate "
"Options class: %s" % (e,))
try:
o.parseOptions(["", "--_shell-completion", "zsh:2"])
except ImportError, e:
# this can happen for commands which don't have all
# the necessary dependencies installed. skip test.
# skip
raise unittest.SkipTest("ImportError calling parseOptions(): %s", (e,))
except SystemExit:
pass # expected
else:
self.fail('SystemExit not raised')
outputFile.seek(0)
# test that we got some output
self.assertEqual(1, len(outputFile.read(1)))
outputFile.seek(0)
outputFile.truncate()
# now, if it has sub commands, we have to test those too
if hasattr(o, 'subCommands'):
for (cmd, short, parser, doc) in o.subCommands:
try:
o.parseOptions([cmd, "", "--_shell-completion",
"zsh:3"])
except ImportError, e:
# this can happen for commands which don't have all
# the necessary dependencies installed. skip test.
raise unittest.SkipTest("ImportError calling parseOptions() "
"on subcommand: %s", (e,))
except SystemExit:
pass # expected
else:
self.fail('SystemExit not raised')
outputFile.seek(0)
# test that we got some output
self.assertEqual(1, len(outputFile.read(1)))
outputFile.seek(0)
outputFile.truncate()
# flushed because we don't want DeprecationWarnings to be printed when
# running these test cases.
self.flushWarnings()
class ZshTestCase(unittest.TestCase):
"""
Tests for zsh completion code
"""
def test_accumulateMetadata(self):
"""
Are `compData' attributes you can place on Options classes
picked up correctly?
"""
opts = FighterAceExtendedOptions()
ag = _shellcomp.ZshArgumentsGenerator(opts, 'ace', 'dummy_value')
descriptions = FighterAceOptions.compData.descriptions.copy()
descriptions.update(FighterAceExtendedOptions.compData.descriptions)
self.assertEqual(ag.descriptions, descriptions)
self.assertEqual(ag.multiUse,
set(FighterAceOptions.compData.multiUse))
self.assertEqual(ag.mutuallyExclusive,
FighterAceOptions.compData.mutuallyExclusive)
optActions = FighterAceOptions.compData.optActions.copy()
optActions.update(FighterAceExtendedOptions.compData.optActions)
self.assertEqual(ag.optActions, optActions)
self.assertEqual(ag.extraActions,
FighterAceOptions.compData.extraActions)
def test_mutuallyExclusiveCornerCase(self):
"""
Exercise a corner-case of ZshArgumentsGenerator.makeExcludesDict()
where the long option name already exists in the `excludes` dict being
built.
"""
class OddFighterAceOptions(FighterAceExtendedOptions):
# since "fokker", etc, are already defined as mutually-
# exclusive on the super-class, defining them again here forces
# the corner-case to be exercised.
optFlags = [['anatra', None,
'Select the Anatra DS as your dogfighter aircraft']]
compData = Completions(
mutuallyExclusive=[['anatra', 'fokker', 'albatros',
'spad', 'bristol']])
opts = OddFighterAceOptions()
ag = _shellcomp.ZshArgumentsGenerator(opts, 'ace', 'dummy_value')
expected = {
'albatros': set(['anatra', 'b', 'bristol', 'f',
'fokker', 's', 'spad']),
'anatra': set(['a', 'albatros', 'b', 'bristol',
'f', 'fokker', 's', 'spad']),
'bristol': set(['a', 'albatros', 'anatra', 'f',
'fokker', 's', 'spad']),
'fokker': set(['a', 'albatros', 'anatra', 'b',
'bristol', 's', 'spad']),
'spad': set(['a', 'albatros', 'anatra', 'b',
'bristol', 'f', 'fokker'])}
self.assertEqual(ag.excludes, expected)
def test_accumulateAdditionalOptions(self):
"""
We pick up options that are only defined by having an
appropriately named method on your Options class,
e.g. def opt_foo(self, foo)
"""
opts = FighterAceExtendedOptions()
ag = _shellcomp.ZshArgumentsGenerator(opts, 'ace', 'dummy_value')
self.assertIn('nocrash', ag.flagNameToDefinition)
self.assertIn('nocrash', ag.allOptionsNameToDefinition)
self.assertIn('difficulty', ag.paramNameToDefinition)
self.assertIn('difficulty', ag.allOptionsNameToDefinition)
def test_verifyZshNames(self):
"""
Using a parameter/flag name that doesn't exist
will raise an error
"""
class TmpOptions(FighterAceExtendedOptions):
# Note typo of detail
compData = Completions(optActions={'detaill' : None})
self.assertRaises(ValueError, _shellcomp.ZshArgumentsGenerator,
TmpOptions(), 'ace', 'dummy_value')
class TmpOptions2(FighterAceExtendedOptions):
# Note that 'foo' and 'bar' are not real option
# names defined in this class
compData = Completions(
mutuallyExclusive=[("foo", "bar")])
self.assertRaises(ValueError, _shellcomp.ZshArgumentsGenerator,
TmpOptions2(), 'ace', 'dummy_value')
def test_zshCode(self):
"""
Generate a completion function, and test the textual output
against a known correct output
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
self.patch(sys, 'argv', ["silly", "", "--_shell-completion", "zsh:2"])
opts = SimpleProgOptions()
self.assertRaises(SystemExit, opts.parseOptions)
self.assertEqual(testOutput1, outputFile.getvalue())
def test_zshCodeWithSubs(self):
"""
Generate a completion function with subcommands,
and test the textual output against a known correct output
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
self.patch(sys, 'argv', ["silly2", "", "--_shell-completion", "zsh:2"])
opts = SimpleProgWithSubcommands()
self.assertRaises(SystemExit, opts.parseOptions)
self.assertEqual(testOutput2, outputFile.getvalue())
def test_incompleteCommandLine(self):
"""
Completion still happens even if a command-line is given
that would normally throw UsageError.
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
opts = FighterAceOptions()
self.assertRaises(SystemExit, opts.parseOptions,
["--fokker", "server", "--unknown-option",
"--unknown-option2",
"--_shell-completion", "zsh:5"])
outputFile.seek(0)
# test that we got some output
self.assertEqual(1, len(outputFile.read(1)))
def test_incompleteCommandLine_case2(self):
"""
Completion still happens even if a command-line is given
that would normally throw UsageError.
The existance of --unknown-option prior to the subcommand
will break subcommand detection... but we complete anyway
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
opts = FighterAceOptions()
self.assertRaises(SystemExit, opts.parseOptions,
["--fokker", "--unknown-option", "server",
"--list-server", "--_shell-completion", "zsh:5"])
outputFile.seek(0)
# test that we got some output
self.assertEqual(1, len(outputFile.read(1)))
outputFile.seek(0)
outputFile.truncate()
def test_incompleteCommandLine_case3(self):
"""
Completion still happens even if a command-line is given
that would normally throw UsageError.
Break subcommand detection in a different way by providing
an invalid subcommand name.
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
opts = FighterAceOptions()
self.assertRaises(SystemExit, opts.parseOptions,
["--fokker", "unknown-subcommand",
"--list-server", "--_shell-completion", "zsh:4"])
outputFile.seek(0)
# test that we got some output
self.assertEqual(1, len(outputFile.read(1)))
def test_skipSubcommandList(self):
"""
Ensure the optimization which skips building the subcommand list
under certain conditions isn't broken.
"""
outputFile = StringIO()
self.patch(usage.Options, '_shellCompFile', outputFile)
opts = FighterAceOptions()
self.assertRaises(SystemExit, opts.parseOptions,
["--alba", "--_shell-completion", "zsh:2"])
outputFile.seek(0)
# test that we got some output
self.assertEqual(1, len(outputFile.read(1)))
def test_poorlyDescribedOptMethod(self):
"""
Test corner case fetching an option description from a method docstring
"""
opts = FighterAceOptions()
argGen = _shellcomp.ZshArgumentsGenerator(opts, 'ace', None)
descr = argGen.getDescription('silly')
# docstring for opt_silly is useless so it should just use the
# option name as the description
self.assertEqual(descr, 'silly')
def test_brokenActions(self):
"""
A C{Completer} with repeat=True may only be used as the
last item in the extraActions list.
"""
class BrokenActions(usage.Options):
compData = usage.Completions(
extraActions=[usage.Completer(repeat=True),
usage.Completer()]
)
outputFile = StringIO()
opts = BrokenActions()
self.patch(opts, '_shellCompFile', outputFile)
self.assertRaises(ValueError, opts.parseOptions,
["", "--_shell-completion", "zsh:2"])
def test_optMethodsDontOverride(self):
"""
opt_* methods on Options classes should not override the
data provided in optFlags or optParameters.
"""
class Options(usage.Options):
optFlags = [['flag', 'f', 'A flag']]
optParameters = [['param', 'p', None, 'A param']]
def opt_flag(self):
""" junk description """
def opt_param(self, param):
""" junk description """
opts = Options()
argGen = _shellcomp.ZshArgumentsGenerator(opts, 'ace', None)
self.assertEqual(argGen.getDescription('flag'), 'A flag')
self.assertEqual(argGen.getDescription('param'), 'A param')
class EscapeTestCase(unittest.TestCase):
def test_escape(self):
"""
Verify _shellcomp.escape() function
"""
esc = _shellcomp.escape
test = "$"
self.assertEqual(esc(test), "'$'")
test = 'A--\'$"\\`--B'
self.assertEqual(esc(test), '"A--\'\\$\\"\\\\\\`--B"')
class CompleterNotImplementedTestCase(unittest.TestCase):
"""
Test that using an unknown shell constant with SubcommandAction
raises NotImplementedError
The other Completer() subclasses are tested in test_usage.py
"""
def test_unknownShell(self):
"""
Using an unknown shellType should raise NotImplementedError
"""
action = _shellcomp.SubcommandAction()
self.assertRaises(NotImplementedError, action._shellCode,
None, "bad_shell_type")
class FighterAceServerOptions(usage.Options):
"""
Options for FighterAce 'server' subcommand
"""
optFlags = [['list-server', None,
'List this server with the online FighterAce network']]
optParameters = [['packets-per-second', None,
'Number of update packets to send per second', '20']]
class FighterAceOptions(usage.Options):
"""
Command-line options for an imaginary `Fighter Ace` game
"""
optFlags = [['fokker', 'f',
'Select the Fokker Dr.I as your dogfighter aircraft'],
['albatros', 'a',
'Select the Albatros D-III as your dogfighter aircraft'],
['spad', 's',
'Select the SPAD S.VII as your dogfighter aircraft'],
['bristol', 'b',
'Select the Bristol Scout as your dogfighter aircraft'],
['physics', 'p',
'Enable secret Twisted physics engine'],
['jam', 'j',
'Enable a small chance that your machine guns will jam!'],
['verbose', 'v',
'Verbose logging (may be specified more than once)'],
]
optParameters = [['pilot-name', None, "What's your name, Ace?",
'Manfred von Richthofen'],
['detail', 'd',
'Select the level of rendering detail (1-5)', '3'],
]
subCommands = [['server', None, FighterAceServerOptions,
'Start FighterAce game-server.'],
]
compData = Completions(
descriptions={'physics' : 'Twisted-Physics',
'detail' : 'Rendering detail level'},
multiUse=['verbose'],
mutuallyExclusive=[['fokker', 'albatros', 'spad',
'bristol']],
optActions={'detail' : CompleteList(['1' '2' '3'
'4' '5'])},
extraActions=[CompleteFiles(descr='saved game file to load')]
)
def opt_silly(self):
# A silly option which nobody can explain
""" """
class FighterAceExtendedOptions(FighterAceOptions):
"""
Extend the options and zsh metadata provided by FighterAceOptions.
_shellcomp must accumulate options and metadata from all classes in the
hiearchy so this is important to test.
"""
optFlags = [['no-stalls', None,
'Turn off the ability to stall your aircraft']]
optParameters = [['reality-level', None,
'Select the level of physics reality (1-5)', '5']]
compData = Completions(
descriptions={'no-stalls' : 'Can\'t stall your plane'},
optActions={'reality-level' :
Completer(descr='Physics reality level')}
)
def opt_nocrash(self):
"""
Select that you can't crash your plane
"""
def opt_difficulty(self, difficulty):
"""
How tough are you? (1-10)
"""
def _accuracyAction():
# add tick marks just to exercise quoting
return CompleteList(['1', '2', '3'], descr='Accuracy\'`?')
class SimpleProgOptions(usage.Options):
"""
Command-line options for a `Silly` imaginary program
"""
optFlags = [['color', 'c', 'Turn on color output'],
['gray', 'g', 'Turn on gray-scale output'],
['verbose', 'v',
'Verbose logging (may be specified more than once)'],
]
optParameters = [['optimization', None, '5',
'Select the level of optimization (1-5)'],
['accuracy', 'a', '3',
'Select the level of accuracy (1-3)'],
]
compData = Completions(
descriptions={'color' : 'Color on',
'optimization' : 'Optimization level'},
multiUse=['verbose'],
mutuallyExclusive=[['color', 'gray']],
optActions={'optimization' : CompleteList(['1', '2', '3', '4', '5'],
descr='Optimization?'),
'accuracy' : _accuracyAction},
extraActions=[CompleteFiles(descr='output file')]
)
def opt_X(self):
"""
usage.Options does not recognize single-letter opt_ methods
"""
class SimpleProgSub1(usage.Options):
optFlags = [['sub-opt', 's', 'Sub Opt One']]
class SimpleProgSub2(usage.Options):
optFlags = [['sub-opt', 's', 'Sub Opt Two']]
class SimpleProgWithSubcommands(SimpleProgOptions):
optFlags = [['some-option'],
['other-option', 'o']]
optParameters = [['some-param'],
['other-param', 'p'],
['another-param', 'P', 'Yet Another Param']]
subCommands = [ ['sub1', None, SimpleProgSub1, 'Sub Command 1'],
['sub2', None, SimpleProgSub2, 'Sub Command 2']]
testOutput1 = """#compdef silly
_arguments -s -A "-*" \\
':output file (*):_files -g "*"' \\
"(--accuracy)-a[Select the level of accuracy (1-3)]:Accuracy'\`?:(1 2 3)" \\
"(-a)--accuracy=[Select the level of accuracy (1-3)]:Accuracy'\`?:(1 2 3)" \\
'(--color --gray -g)-c[Color on]' \\
'(--gray -c -g)--color[Color on]' \\
'(--color --gray -c)-g[Turn on gray-scale output]' \\
'(--color -c -g)--gray[Turn on gray-scale output]' \\
'--help[Display this help and exit.]' \\
'--optimization=[Optimization level]:Optimization?:(1 2 3 4 5)' \\
'*-v[Verbose logging (may be specified more than once)]' \\
'*--verbose[Verbose logging (may be specified more than once)]' \\
'--version[Display Twisted version and exit.]' \\
&& return 0
"""
# with sub-commands
testOutput2 = """#compdef silly2
_arguments -s -A "-*" \\
'*::subcmd:->subcmd' \\
':output file (*):_files -g "*"' \\
"(--accuracy)-a[Select the level of accuracy (1-3)]:Accuracy'\`?:(1 2 3)" \\
"(-a)--accuracy=[Select the level of accuracy (1-3)]:Accuracy'\`?:(1 2 3)" \\
'(--another-param)-P[another-param]:another-param:_files' \\
'(-P)--another-param=[another-param]:another-param:_files' \\
'(--color --gray -g)-c[Color on]' \\
'(--gray -c -g)--color[Color on]' \\
'(--color --gray -c)-g[Turn on gray-scale output]' \\
'(--color -c -g)--gray[Turn on gray-scale output]' \\
'--help[Display this help and exit.]' \\
'--optimization=[Optimization level]:Optimization?:(1 2 3 4 5)' \\
'(--other-option)-o[other-option]' \\
'(-o)--other-option[other-option]' \\
'(--other-param)-p[other-param]:other-param:_files' \\
'(-p)--other-param=[other-param]:other-param:_files' \\
'--some-option[some-option]' \\
'--some-param=[some-param]:some-param:_files' \\
'*-v[Verbose logging (may be specified more than once)]' \\
'*--verbose[Verbose logging (may be specified more than once)]' \\
'--version[Display Twisted version and exit.]' \\
&& return 0
local _zsh_subcmds_array
_zsh_subcmds_array=(
"sub1:Sub Command 1"
"sub2:Sub Command 2"
)
_describe "sub-command" _zsh_subcmds_array
"""

View file

@ -0,0 +1,151 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
try:
import syslog as stdsyslog
except ImportError:
stdsyslog = None
else:
from twisted.python import syslog
class SyslogObserverTests(TestCase):
"""
Tests for L{SyslogObserver} which sends Twisted log events to the syslog.
"""
events = None
if stdsyslog is None:
skip = "syslog is not supported on this platform"
def setUp(self):
self.patch(syslog.SyslogObserver, 'openlog', self.openlog)
self.patch(syslog.SyslogObserver, 'syslog', self.syslog)
self.observer = syslog.SyslogObserver('SyslogObserverTests')
def openlog(self, prefix, options, facility):
self.logOpened = (prefix, options, facility)
self.events = []
def syslog(self, options, message):
self.events.append((options, message))
def test_emitWithoutMessage(self):
"""
L{SyslogObserver.emit} ignores events with an empty value for the
C{'message'} key.
"""
self.observer.emit({'message': (), 'isError': False, 'system': '-'})
self.assertEqual(self.events, [])
def test_emitCustomPriority(self):
"""
L{SyslogObserver.emit} uses the value of the C{'syslogPriority'} as the
syslog priority, if that key is present in the event dictionary.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': False, 'system': '-',
'syslogPriority': stdsyslog.LOG_DEBUG})
self.assertEqual(
self.events,
[(stdsyslog.LOG_DEBUG, '[-] hello, world')])
def test_emitErrorPriority(self):
"""
L{SyslogObserver.emit} uses C{LOG_ALERT} if the event represents an
error.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': True, 'system': '-',
'failure': Failure(Exception("foo"))})
self.assertEqual(
self.events,
[(stdsyslog.LOG_ALERT, '[-] hello, world')])
def test_emitCustomPriorityOverridesError(self):
"""
L{SyslogObserver.emit} uses the value of the C{'syslogPriority'} key if
it is specified even if the event dictionary represents an error.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': True, 'system': '-',
'syslogPriority': stdsyslog.LOG_NOTICE,
'failure': Failure(Exception("bar"))})
self.assertEqual(
self.events,
[(stdsyslog.LOG_NOTICE, '[-] hello, world')])
def test_emitCustomFacility(self):
"""
L{SyslogObserver.emit} uses the value of the C{'syslogPriority'} as the
syslog priority, if that key is present in the event dictionary.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': False, 'system': '-',
'syslogFacility': stdsyslog.LOG_CRON})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO | stdsyslog.LOG_CRON, '[-] hello, world')])
def test_emitCustomSystem(self):
"""
L{SyslogObserver.emit} uses the value of the C{'system'} key to prefix
the logged message.
"""
self.observer.emit({'message': ('hello, world',), 'isError': False,
'system': 'nonDefaultSystem'})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO, "[nonDefaultSystem] hello, world")])
def test_emitMessage(self):
"""
L{SyslogObserver.emit} logs the value of the C{'message'} key of the
event dictionary it is passed to the syslog.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': False,
'system': '-'})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO, "[-] hello, world")])
def test_emitMultilineMessage(self):
"""
Each line of a multiline message is emitted separately to the syslog.
"""
self.observer.emit({
'message': ('hello,\nworld',), 'isError': False,
'system': '-'})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO, '[-] hello,'),
(stdsyslog.LOG_INFO, '[-] \tworld')])
def test_emitStripsTrailingEmptyLines(self):
"""
Trailing empty lines of a multiline message are omitted from the
messages sent to the syslog.
"""
self.observer.emit({
'message': ('hello,\nworld\n\n',), 'isError': False,
'system': '-'})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO, '[-] hello,'),
(stdsyslog.LOG_INFO, '[-] \tworld')])

View file

@ -0,0 +1,173 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.systemd}.
"""
import os
from twisted.trial.unittest import TestCase
from twisted.python.systemd import ListenFDs
class InheritedDescriptorsMixin(object):
"""
Mixin for a L{TestCase} subclass which defines test methods for some kind of
systemd sd-daemon class. In particular, it defines tests for a
C{inheritedDescriptors} method.
"""
def test_inheritedDescriptors(self):
"""
C{inheritedDescriptors} returns a list of integers giving the file
descriptors which were inherited from systemd.
"""
sddaemon = self.getDaemon(7, 3)
self.assertEqual([7, 8, 9], sddaemon.inheritedDescriptors())
def test_repeated(self):
"""
Any subsequent calls to C{inheritedDescriptors} return the same list.
"""
sddaemon = self.getDaemon(7, 3)
self.assertEqual(
sddaemon.inheritedDescriptors(),
sddaemon.inheritedDescriptors())
class MemoryOnlyMixin(object):
"""
Mixin for a L{TestCase} subclass which creates creating a fake, in-memory
implementation of C{inheritedDescriptors}. This provides verification that
the fake behaves in a compatible way with the real implementation.
"""
def getDaemon(self, start, count):
"""
Invent C{count} new I{file descriptors} (actually integers, attached to
no real file description), starting at C{start}. Construct and return a
new L{ListenFDs} which will claim those integers represent inherited
file descriptors.
"""
return ListenFDs(range(start, start + count))
class EnvironmentMixin(object):
"""
Mixin for a L{TestCase} subclass which creates a real implementation of
C{inheritedDescriptors} which is based on the environment variables set by
systemd. To facilitate testing, this mixin will also create a fake
environment dictionary and add keys to it to make it look as if some
descriptors have been inherited.
"""
def initializeEnvironment(self, count, pid):
"""
Create a copy of the process environment and add I{LISTEN_FDS} and
I{LISTEN_PID} (the environment variables set by systemd) to it.
"""
result = os.environ.copy()
result['LISTEN_FDS'] = str(count)
result['LISTEN_PID'] = str(pid)
return result
def getDaemon(self, start, count):
"""
Create a new L{ListenFDs} instance, initialized with a fake environment
dictionary which will be set up as systemd would have set it up if
C{count} descriptors were being inherited. The descriptors will also
start at C{start}.
"""
fakeEnvironment = self.initializeEnvironment(count, os.getpid())
return ListenFDs.fromEnvironment(environ=fakeEnvironment, start=start)
class MemoryOnlyTests(MemoryOnlyMixin, InheritedDescriptorsMixin, TestCase):
"""
Apply tests to L{ListenFDs}, explicitly constructed with some fake file
descriptors.
"""
class EnvironmentTests(EnvironmentMixin, InheritedDescriptorsMixin, TestCase):
"""
Apply tests to L{ListenFDs}, constructed based on an environment dictionary.
"""
def test_secondEnvironment(self):
"""
Only a single L{Environment} can extract inherited file descriptors.
"""
fakeEnvironment = self.initializeEnvironment(3, os.getpid())
first = ListenFDs.fromEnvironment(environ=fakeEnvironment)
second = ListenFDs.fromEnvironment(environ=fakeEnvironment)
self.assertEqual(range(3, 6), first.inheritedDescriptors())
self.assertEqual([], second.inheritedDescriptors())
def test_mismatchedPID(self):
"""
If the current process PID does not match the PID in the environment, no
inherited descriptors are reported.
"""
fakeEnvironment = self.initializeEnvironment(3, os.getpid() + 1)
sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
self.assertEqual([], sddaemon.inheritedDescriptors())
def test_missingPIDVariable(self):
"""
If the I{LISTEN_PID} environment variable is not present, no inherited
descriptors are reported.
"""
fakeEnvironment = self.initializeEnvironment(3, os.getpid())
del fakeEnvironment['LISTEN_PID']
sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
self.assertEqual([], sddaemon.inheritedDescriptors())
def test_nonIntegerPIDVariable(self):
"""
If the I{LISTEN_PID} environment variable is set to a string that cannot
be parsed as an integer, no inherited descriptors are reported.
"""
fakeEnvironment = self.initializeEnvironment(3, "hello, world")
sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
self.assertEqual([], sddaemon.inheritedDescriptors())
def test_missingFDSVariable(self):
"""
If the I{LISTEN_FDS} environment variable is not present, no inherited
descriptors are reported.
"""
fakeEnvironment = self.initializeEnvironment(3, os.getpid())
del fakeEnvironment['LISTEN_FDS']
sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
self.assertEqual([], sddaemon.inheritedDescriptors())
def test_nonIntegerFDSVariable(self):
"""
If the I{LISTEN_FDS} environment variable is set to a string that cannot
be parsed as an integer, no inherited descriptors are reported.
"""
fakeEnvironment = self.initializeEnvironment("hello, world", os.getpid())
sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
self.assertEqual([], sddaemon.inheritedDescriptors())
def test_defaultEnviron(self):
"""
If the process environment is not explicitly passed to
L{Environment.__init__}, the real process environment dictionary is
used.
"""
self.patch(os, 'environ', {
'LISTEN_PID': str(os.getpid()),
'LISTEN_FDS': '5'})
sddaemon = ListenFDs.fromEnvironment()
self.assertEqual(range(3, 3 + 5), sddaemon.inheritedDescriptors())

View file

@ -0,0 +1,27 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.textattributes}.
"""
from twisted.trial import unittest
from twisted.python._textattributes import DefaultFormattingState
class DefaultFormattingStateTests(unittest.TestCase):
"""
Tests for L{twisted.python._textattributes.DefaultFormattingState}.
"""
def test_equality(self):
"""
L{DefaultFormattingState}s are always equal to other
L{DefaultFormattingState}s.
"""
self.assertEqual(
DefaultFormattingState(),
DefaultFormattingState())
self.assertNotEquals(
DefaultFormattingState(),
'hello')

View file

@ -0,0 +1,44 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.urlpath}.
"""
from twisted.trial import unittest
from twisted.python import urlpath
class URLPathTestCase(unittest.TestCase):
def setUp(self):
self.path = urlpath.URLPath.fromString("http://example.com/foo/bar?yes=no&no=yes#footer")
def testStringConversion(self):
self.assertEqual(str(self.path), "http://example.com/foo/bar?yes=no&no=yes#footer")
def testChildString(self):
self.assertEqual(str(self.path.child('hello')), "http://example.com/foo/bar/hello")
self.assertEqual(str(self.path.child('hello').child('')), "http://example.com/foo/bar/hello/")
def testSiblingString(self):
self.assertEqual(str(self.path.sibling('baz')), 'http://example.com/foo/baz')
# The sibling of http://example.com/foo/bar/
# is http://example.comf/foo/bar/baz
# because really we are constructing a sibling of
# http://example.com/foo/bar/index.html
self.assertEqual(str(self.path.child('').sibling('baz')), 'http://example.com/foo/bar/baz')
def testParentString(self):
# parent should be equivalent to '..'
# 'foo' is the current directory, '/' is the parent directory
self.assertEqual(str(self.path.parent()), 'http://example.com/')
self.assertEqual(str(self.path.child('').parent()), 'http://example.com/foo/')
self.assertEqual(str(self.path.child('baz').parent()), 'http://example.com/foo/')
self.assertEqual(str(self.path.parent().parent().parent().parent().parent()), 'http://example.com/')
def testHereString(self):
# here should be equivalent to '.'
self.assertEqual(str(self.path.here()), 'http://example.com/foo/')
self.assertEqual(str(self.path.child('').here()), 'http://example.com/foo/bar/')

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,361 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.versions}.
"""
from __future__ import division, absolute_import
import sys
import operator
from io import BytesIO
from twisted.python.versions import getVersionString, IncomparableVersions
from twisted.python.versions import Version, _inf
from twisted.python.filepath import FilePath
from twisted.trial.unittest import SynchronousTestCase as TestCase
VERSION_4_ENTRIES = b"""\
<?xml version="1.0" encoding="utf-8"?>
<wc-entries
xmlns="svn:">
<entry
committed-rev="18210"
name=""
committed-date="2006-09-21T04:43:09.542953Z"
url="svn+ssh://svn.twistedmatrix.com/svn/Twisted/trunk/twisted"
last-author="exarkun"
kind="dir"
uuid="bbbe8e31-12d6-0310-92fd-ac37d47ddeeb"
repos="svn+ssh://svn.twistedmatrix.com/svn/Twisted"
revision="18211"/>
</wc-entries>
"""
VERSION_8_ENTRIES = b"""\
8
dir
22715
svn+ssh://svn.twistedmatrix.com/svn/Twisted/trunk
"""
VERSION_9_ENTRIES = b"""\
9
dir
22715
svn+ssh://svn.twistedmatrix.com/svn/Twisted/trunk
"""
VERSION_10_ENTRIES = b"""\
10
dir
22715
svn+ssh://svn.twistedmatrix.com/svn/Twisted/trunk
"""
class VersionsTest(TestCase):
def test_versionComparison(self):
"""
Versions can be compared for equality and order.
"""
va = Version("dummy", 1, 0, 0)
vb = Version("dummy", 0, 1, 0)
self.assertTrue(va > vb)
self.assertTrue(vb < va)
self.assertTrue(va >= vb)
self.assertTrue(vb <= va)
self.assertTrue(va != vb)
self.assertTrue(vb == Version("dummy", 0, 1, 0))
self.assertTrue(vb == vb)
def test_comparingPrereleasesWithReleases(self):
"""
Prereleases are always less than versions without prereleases.
"""
va = Version("whatever", 1, 0, 0, prerelease=1)
vb = Version("whatever", 1, 0, 0)
self.assertTrue(va < vb)
self.assertFalse(va > vb)
self.assertNotEquals(vb, va)
def test_comparingPrereleases(self):
"""
The value specified as the prerelease is used in version comparisons.
"""
va = Version("whatever", 1, 0, 0, prerelease=1)
vb = Version("whatever", 1, 0, 0, prerelease=2)
self.assertTrue(va < vb)
self.assertTrue(vb > va)
self.assertTrue(va <= vb)
self.assertTrue(vb >= va)
self.assertTrue(va != vb)
self.assertTrue(vb == Version("whatever", 1, 0, 0, prerelease=2))
self.assertTrue(va == va)
def test_infComparison(self):
"""
L{_inf} is equal to L{_inf}.
This is a regression test.
"""
self.assertEqual(_inf, _inf)
def test_disallowBuggyComparisons(self):
"""
The package names of the Version objects need to be the same,
"""
self.assertRaises(IncomparableVersions,
operator.eq,
Version("dummy", 1, 0, 0),
Version("dumym", 1, 0, 0))
def test_notImplementedComparisons(self):
"""
Comparing a L{Version} to some other object type results in
C{NotImplemented}.
"""
va = Version("dummy", 1, 0, 0)
vb = ("dummy", 1, 0, 0) # a tuple is not a Version object
self.assertEqual(va.__cmp__(vb), NotImplemented)
def test_repr(self):
"""
Calling C{repr} on a version returns a human-readable string
representation of the version.
"""
self.assertEqual(repr(Version("dummy", 1, 2, 3)),
"Version('dummy', 1, 2, 3)")
def test_reprWithPrerelease(self):
"""
Calling C{repr} on a version with a prerelease returns a human-readable
string representation of the version including the prerelease.
"""
self.assertEqual(repr(Version("dummy", 1, 2, 3, prerelease=4)),
"Version('dummy', 1, 2, 3, prerelease=4)")
def test_str(self):
"""
Calling C{str} on a version returns a human-readable string
representation of the version.
"""
self.assertEqual(str(Version("dummy", 1, 2, 3)),
"[dummy, version 1.2.3]")
def test_strWithPrerelease(self):
"""
Calling C{str} on a version with a prerelease includes the prerelease.
"""
self.assertEqual(str(Version("dummy", 1, 0, 0, prerelease=1)),
"[dummy, version 1.0.0pre1]")
def testShort(self):
self.assertEqual(Version('dummy', 1, 2, 3).short(), '1.2.3')
def test_goodSVNEntries_4(self):
"""
Version should be able to parse an SVN format 4 entries file.
"""
version = Version("dummy", 1, 0, 0)
self.assertEqual(
version._parseSVNEntries_4(BytesIO(VERSION_4_ENTRIES)), b'18211')
def test_goodSVNEntries_8(self):
"""
Version should be able to parse an SVN format 8 entries file.
"""
version = Version("dummy", 1, 0, 0)
self.assertEqual(
version._parseSVNEntries_8(BytesIO(VERSION_8_ENTRIES)), b'22715')
def test_goodSVNEntries_9(self):
"""
Version should be able to parse an SVN format 9 entries file.
"""
version = Version("dummy", 1, 0, 0)
self.assertEqual(
version._parseSVNEntries_9(BytesIO(VERSION_9_ENTRIES)), b'22715')
def test_goodSVNEntriesTenPlus(self):
"""
Version should be able to parse an SVN format 10 entries file.
"""
version = Version("dummy", 1, 0, 0)
self.assertEqual(
version._parseSVNEntriesTenPlus(BytesIO(VERSION_10_ENTRIES)), b'22715')
def test_getVersionString(self):
"""
L{getVersionString} returns a string with the package name and the
short version number.
"""
self.assertEqual(
'Twisted 8.0.0', getVersionString(Version('Twisted', 8, 0, 0)))
def test_getVersionStringWithPrerelease(self):
"""
L{getVersionString} includes the prerelease, if any.
"""
self.assertEqual(
getVersionString(Version("whatever", 8, 0, 0, prerelease=1)),
"whatever 8.0.0pre1")
def test_base(self):
"""
The L{base} method returns a very simple representation of the version.
"""
self.assertEqual(Version("foo", 1, 0, 0).base(), "1.0.0")
def test_baseWithPrerelease(self):
"""
The base version includes 'preX' for versions with prereleases.
"""
self.assertEqual(Version("foo", 1, 0, 0, prerelease=8).base(),
"1.0.0pre8")
class FormatDiscoveryTests(TestCase):
"""
Tests which discover the parsing method based on the imported module name.
"""
def mktemp(self):
return TestCase.mktemp(self).encode("utf-8")
def setUp(self):
"""
Create a temporary directory with a package structure in it.
"""
self.entry = FilePath(self.mktemp())
self.preTestModules = sys.modules.copy()
sys.path.append(self.entry.path.decode('utf-8'))
pkg = self.entry.child(b"twisted_python_versions_package")
pkg.makedirs()
pkg.child(b"__init__.py").setContent(
b"from twisted.python.versions import Version\n"
b"version = Version('twisted_python_versions_package', 1, 0, 0)\n")
self.svnEntries = pkg.child(b".svn")
self.svnEntries.makedirs()
def tearDown(self):
"""
Remove the imported modules and sys.path modifications.
"""
sys.modules.clear()
sys.modules.update(self.preTestModules)
sys.path.remove(self.entry.path.decode('utf-8'))
def checkSVNFormat(self, formatVersion, entriesText, expectedRevision):
"""
Check for the given revision being detected after setting the SVN
entries text and format version of the test directory structure.
"""
self.svnEntries.child(b"format").setContent(formatVersion + b"\n")
self.svnEntries.child(b"entries").setContent(entriesText)
self.assertEqual(self.getVersion()._getSVNVersion(), expectedRevision)
def getVersion(self):
"""
Import and retrieve the Version object from our dynamically created
package.
"""
import twisted_python_versions_package
return twisted_python_versions_package.version
def test_detectVersion4(self):
"""
Verify that version 4 format file will be properly detected and parsed.
"""
self.checkSVNFormat(b"4", VERSION_4_ENTRIES, b'18211')
def test_detectVersion8(self):
"""
Verify that version 8 format files will be properly detected and
parsed.
"""
self.checkSVNFormat(b"8", VERSION_8_ENTRIES, b'22715')
def test_detectVersion9(self):
"""
Verify that version 9 format files will be properly detected and
parsed.
"""
self.checkSVNFormat(b"9", VERSION_9_ENTRIES, b'22715')
def test_unparseableEntries(self):
"""
Verify that the result is C{b"Unknown"} for an apparently supported
version for which parsing of the entries file fails.
"""
self.checkSVNFormat(b"4", b"some unsupported stuff", b"Unknown")
def test_detectVersion10(self):
"""
Verify that version 10 format files will be properly detected and
parsed.
Differing from previous formats, the version 10 format lacks a
I{format} file and B{only} has the version information on the first
line of the I{entries} file.
"""
self.svnEntries.child(b"entries").setContent(VERSION_10_ENTRIES)
self.assertEqual(self.getVersion()._getSVNVersion(), b'22715')
def test_detectUnknownVersion(self):
"""
Verify that a new version of SVN will result in the revision 'Unknown'.
"""
self.checkSVNFormat(b"some-random-new-version", b"ooga booga!", b'Unknown')
def test_getVersionStringWithRevision(self):
"""
L{getVersionString} includes the discovered revision number.
"""
self.svnEntries.child(b"format").setContent(b"9\n")
self.svnEntries.child(b"entries").setContent(VERSION_10_ENTRIES)
version = getVersionString(self.getVersion())
self.assertEqual(
"twisted_python_versions_package 1.0.0+r22715",
version)
self.assertTrue(isinstance(version, type("")))

View file

@ -0,0 +1,70 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.win32}.
"""
from twisted.trial import unittest
from twisted.python.runtime import platform
from twisted.python import win32
class CommandLineQuotingTests(unittest.TestCase):
"""
Tests for L{cmdLineQuote}.
"""
def test_argWithoutSpaces(self):
"""
Calling C{cmdLineQuote} with an argument with no spaces should
return the argument unchanged.
"""
self.assertEqual(win32.cmdLineQuote('an_argument'), 'an_argument')
def test_argWithSpaces(self):
"""
Calling C{cmdLineQuote} with an argument containing spaces should
return the argument surrounded by quotes.
"""
self.assertEqual(win32.cmdLineQuote('An Argument'), '"An Argument"')
def test_emptyStringArg(self):
"""
Calling C{cmdLineQuote} with an empty string should return a
quoted empty string.
"""
self.assertEqual(win32.cmdLineQuote(''), '""')
class ProgramPathsTests(unittest.TestCase):
"""
Tests for L{getProgramsMenuPath} and L{getProgramFilesPath}.
"""
def test_getProgramsMenuPath(self):
"""
L{getProgramsMenuPath} guesses the programs menu path on non-win32
platforms. On non-win32 it will try to figure out the path by
examining the registry.
"""
if not platform.isWindows():
self.assertEqual(win32.getProgramsMenuPath(),
"C:\\Windows\\Start Menu\\Programs")
else:
self.assertIsInstance(win32.getProgramsMenuPath(), str)
def test_getProgramFilesPath(self):
"""
L{getProgramFilesPath} returns the "program files" path on win32.
"""
self.assertIsInstance(win32.getProgramFilesPath(), str)
if not platform.isWindows():
test_getProgramFilesPath.skip = (
"Cannot figure out the program files path on non-win32 platform")

View file

@ -0,0 +1,101 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test cases covering L{twisted.python.zippath}.
"""
import os, zipfile
from twisted.test.test_paths import AbstractFilePathTestCase
from twisted.python.zippath import ZipArchive
def zipit(dirname, zfname):
"""
Create a zipfile on zfname, containing the contents of dirname'
"""
zf = zipfile.ZipFile(zfname, "w")
for root, ignored, files, in os.walk(dirname):
for fname in files:
fspath = os.path.join(root, fname)
arcpath = os.path.join(root, fname)[len(dirname)+1:]
# print fspath, '=>', arcpath
zf.write(fspath, arcpath)
zf.close()
class ZipFilePathTestCase(AbstractFilePathTestCase):
"""
Test various L{ZipPath} path manipulations as well as reprs for L{ZipPath}
and L{ZipArchive}.
"""
def setUp(self):
AbstractFilePathTestCase.setUp(self)
zipit(self.cmn, self.cmn + '.zip')
self.path = ZipArchive(self.cmn + '.zip')
self.root = self.path
self.all = [x.replace(self.cmn, self.cmn + '.zip') for x in self.all]
def test_zipPathRepr(self):
"""
Make sure that invoking ZipPath's repr prints the correct class name
and an absolute path to the zip file.
"""
child = self.path.child("foo")
pathRepr = "ZipPath(%r)" % (
os.path.abspath(self.cmn + ".zip" + os.sep + 'foo'),)
# Check for an absolute path
self.assertEqual(repr(child), pathRepr)
# Create a path to the file rooted in the current working directory
relativeCommon = self.cmn.replace(os.getcwd() + os.sep, "", 1) + ".zip"
relpath = ZipArchive(relativeCommon)
child = relpath.child("foo")
# Check using a path without the cwd prepended
self.assertEqual(repr(child), pathRepr)
def test_zipPathReprParentDirSegment(self):
"""
The repr of a ZipPath with C{".."} in the internal part of its path
includes the C{".."} rather than applying the usual parent directory
meaning.
"""
child = self.path.child("foo").child("..").child("bar")
pathRepr = "ZipPath(%r)" % (
self.cmn + ".zip" + os.sep.join(["", "foo", "..", "bar"]))
self.assertEqual(repr(child), pathRepr)
def test_zipPathReprEscaping(self):
"""
Bytes in the ZipPath path which have special meaning in Python
string literals are escaped in the ZipPath repr.
"""
child = self.path.child("'")
path = self.cmn + ".zip" + os.sep.join(["", "'"])
pathRepr = "ZipPath('%s')" % (path.encode('string-escape'),)
self.assertEqual(repr(child), pathRepr)
def test_zipArchiveRepr(self):
"""
Make sure that invoking ZipArchive's repr prints the correct class
name and an absolute path to the zip file.
"""
pathRepr = 'ZipArchive(%r)' % (os.path.abspath(self.cmn + '.zip'),)
# Check for an absolute path
self.assertEqual(repr(self.path), pathRepr)
# Create a path to the file rooted in the current working directory
relativeCommon = self.cmn.replace(os.getcwd() + os.sep, "", 1) + ".zip"
relpath = ZipArchive(relativeCommon)
# Check using a path without the cwd prepended
self.assertEqual(repr(relpath), pathRepr)

View file

@ -0,0 +1,355 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.zipstream}
"""
import random
import zipfile
from hashlib import md5
from twisted.python import zipstream, filepath
from twisted.trial import unittest
class FileEntryMixin:
"""
File entry classes should behave as file-like objects
"""
def getFileEntry(self, contents):
"""
Return an appropriate zip file entry
"""
filename = self.mktemp()
z = zipfile.ZipFile(filename, 'w', self.compression)
z.writestr('content', contents)
z.close()
z = zipstream.ChunkingZipFile(filename, 'r')
return z.readfile('content')
def test_isatty(self):
"""
zip files should not be ttys, so isatty() should be false
"""
self.assertEqual(self.getFileEntry('').isatty(), False)
def test_closed(self):
"""
The C{closed} attribute should reflect whether C{close()} has been
called.
"""
fileEntry = self.getFileEntry('')
self.assertEqual(fileEntry.closed, False)
fileEntry.close()
self.assertEqual(fileEntry.closed, True)
def test_readline(self):
"""
C{readline()} should mirror L{file.readline} and return up to a single
deliminter.
"""
fileEntry = self.getFileEntry('hoho\nho')
self.assertEqual(fileEntry.readline(), 'hoho\n')
self.assertEqual(fileEntry.readline(), 'ho')
self.assertEqual(fileEntry.readline(), '')
def test_next(self):
"""
Zip file entries should implement the iterator protocol as files do.
"""
fileEntry = self.getFileEntry('ho\nhoho')
self.assertEqual(fileEntry.next(), 'ho\n')
self.assertEqual(fileEntry.next(), 'hoho')
self.assertRaises(StopIteration, fileEntry.next)
def test_readlines(self):
"""
C{readlines()} should return a list of all the lines.
"""
fileEntry = self.getFileEntry('ho\nho\nho')
self.assertEqual(fileEntry.readlines(), ['ho\n', 'ho\n', 'ho'])
def test_iteration(self):
"""
C{__iter__()} and C{xreadlines()} should return C{self}.
"""
fileEntry = self.getFileEntry('')
self.assertIdentical(iter(fileEntry), fileEntry)
self.assertIdentical(fileEntry.xreadlines(), fileEntry)
def test_readWhole(self):
"""
C{.read()} should read the entire file.
"""
contents = "Hello, world!"
entry = self.getFileEntry(contents)
self.assertEqual(entry.read(), contents)
def test_readPartial(self):
"""
C{.read(num)} should read num bytes from the file.
"""
contents = "0123456789"
entry = self.getFileEntry(contents)
one = entry.read(4)
two = entry.read(200)
self.assertEqual(one, "0123")
self.assertEqual(two, "456789")
def test_tell(self):
"""
C{.tell()} should return the number of bytes that have been read so
far.
"""
contents = "x" * 100
entry = self.getFileEntry(contents)
entry.read(2)
self.assertEqual(entry.tell(), 2)
entry.read(4)
self.assertEqual(entry.tell(), 6)
class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase):
"""
DeflatedZipFileEntry should be file-like
"""
compression = zipfile.ZIP_DEFLATED
class ZipFileEntryTest(FileEntryMixin, unittest.TestCase):
"""
ZipFileEntry should be file-like
"""
compression = zipfile.ZIP_STORED
class ZipstreamTest(unittest.TestCase):
"""
Tests for twisted.python.zipstream
"""
def setUp(self):
"""
Creates junk data that can be compressed and a test directory for any
files that will be created
"""
self.testdir = filepath.FilePath(self.mktemp())
self.testdir.makedirs()
self.unzipdir = self.testdir.child('unzipped')
self.unzipdir.makedirs()
def makeZipFile(self, contents, directory=''):
"""
Makes a zip file archive containing len(contents) files. Contents
should be a list of strings, each string being the content of one file.
"""
zpfilename = self.testdir.child('zipfile.zip').path
zpfile = zipfile.ZipFile(zpfilename, 'w')
for i, content in enumerate(contents):
filename = str(i)
if directory:
filename = directory + "/" + filename
zpfile.writestr(filename, content)
zpfile.close()
return zpfilename
def test_invalidMode(self):
"""
A ChunkingZipFile opened in write-mode should not allow .readfile(),
and raise a RuntimeError instead.
"""
czf = zipstream.ChunkingZipFile(self.mktemp(), "w")
self.assertRaises(RuntimeError, czf.readfile, "something")
def test_closedArchive(self):
"""
A closed ChunkingZipFile should raise a L{RuntimeError} when
.readfile() is invoked.
"""
czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r")
czf.close()
self.assertRaises(RuntimeError, czf.readfile, "something")
def test_invalidHeader(self):
"""
A zipfile entry with the wrong magic number should raise BadZipfile for
readfile(), but that should not affect other files in the archive.
"""
fn = self.makeZipFile(["test contents",
"more contents"])
zf = zipfile.ZipFile(fn, "r")
zeroOffset = zf.getinfo("0").header_offset
zf.close()
# Zero out just the one header.
scribble = file(fn, "r+b")
scribble.seek(zeroOffset, 0)
scribble.write(chr(0) * 4)
scribble.close()
czf = zipstream.ChunkingZipFile(fn)
self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
self.assertEqual(czf.readfile("1").read(), "more contents")
def test_filenameMismatch(self):
"""
A zipfile entry with a different filename than is found in the central
directory should raise BadZipfile.
"""
fn = self.makeZipFile(["test contents",
"more contents"])
zf = zipfile.ZipFile(fn, "r")
info = zf.getinfo("0")
info.filename = "not zero"
zf.close()
scribble = file(fn, "r+b")
scribble.seek(info.header_offset, 0)
scribble.write(info.FileHeader())
scribble.close()
czf = zipstream.ChunkingZipFile(fn)
self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
self.assertEqual(czf.readfile("1").read(), "more contents")
def test_unsupportedCompression(self):
"""
A zipfile which describes an unsupported compression mechanism should
raise BadZipfile.
"""
fn = self.mktemp()
zf = zipfile.ZipFile(fn, "w")
zi = zipfile.ZipInfo("0")
zf.writestr(zi, "some data")
# Mangle its compression type in the central directory; can't do this
# before the writestr call or zipfile will (correctly) tell us not to
# pass bad compression types :)
zi.compress_type = 1234
zf.close()
czf = zipstream.ChunkingZipFile(fn)
self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
def test_extraData(self):
"""
readfile() should skip over 'extra' data present in the zip metadata.
"""
fn = self.mktemp()
zf = zipfile.ZipFile(fn, 'w')
zi = zipfile.ZipInfo("0")
zi.extra = "hello, extra"
zf.writestr(zi, "the real data")
zf.close()
czf = zipstream.ChunkingZipFile(fn)
self.assertEqual(czf.readfile("0").read(), "the real data")
def test_unzipIterChunky(self):
"""
L{twisted.python.zipstream.unzipIterChunky} returns an iterator which
must be exhausted to completely unzip the input archive.
"""
numfiles = 10
contents = ['This is test file %d!' % i for i in range(numfiles)]
zpfilename = self.makeZipFile(contents)
list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
self.assertEqual(
set(self.unzipdir.listdir()),
set(map(str, range(numfiles))))
for child in self.unzipdir.children():
num = int(child.basename())
self.assertEqual(child.getContent(), contents[num])
def test_unzipIterChunkyDirectory(self):
"""
The path to which a file is extracted by L{zipstream.unzipIterChunky}
is determined by joining the C{directory} argument to C{unzip} with the
path within the archive of the file being extracted.
"""
numfiles = 10
contents = ['This is test file %d!' % i for i in range(numfiles)]
zpfilename = self.makeZipFile(contents, 'foo')
list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
self.assertEqual(
set(self.unzipdir.child('foo').listdir()),
set(map(str, range(numfiles))))
for child in self.unzipdir.child('foo').children():
num = int(child.basename())
self.assertEqual(child.getContent(), contents[num])
# XXX these tests are kind of gross and old, but I think unzipIterChunky is
# kind of a gross function anyway. We should really write an abstract
# copyTo/moveTo that operates on FilePath and make sure ZipPath can support
# it, then just deprecate / remove this stuff.
def _unzipIterChunkyTest(self, compression, chunksize, lower, upper):
"""
unzipIterChunky should unzip the given number of bytes per iteration.
"""
junk = ' '.join([str(random.random()) for n in xrange(1000)])
junkmd5 = md5(junk).hexdigest()
tempdir = filepath.FilePath(self.mktemp())
tempdir.makedirs()
zfpath = tempdir.child('bigfile.zip').path
self._makebigfile(zfpath, compression, junk)
uziter = zipstream.unzipIterChunky(zfpath, tempdir.path,
chunksize=chunksize)
r = uziter.next()
# test that the number of chunks is in the right ballpark;
# this could theoretically be any number but statistically it
# should always be in this range
approx = lower < r < upper
self.failUnless(approx)
for r in uziter:
pass
self.assertEqual(r, 0)
newmd5 = md5(
tempdir.child("zipstreamjunk").open().read()).hexdigest()
self.assertEqual(newmd5, junkmd5)
def test_unzipIterChunkyStored(self):
"""
unzipIterChunky should unzip the given number of bytes per iteration on
a stored archive.
"""
self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45)
def test_chunkyDeflated(self):
"""
unzipIterChunky should unzip the given number of bytes per iteration on
a deflated archive.
"""
self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27)
def _makebigfile(self, filename, compression, junk):
"""
Create a zip file with the given file name and compression scheme.
"""
zf = zipfile.ZipFile(filename, 'w', compression)
for i in range(10):
fn = 'zipstream%d' % i
zf.writestr(fn, "")
zf.writestr('zipstreamjunk', junk)
zf.close()