add Linux_i686

This commit is contained in:
j 2014-05-17 18:11:40 +00:00 committed by Ubuntu
commit 95cd9b11f2
1644 changed files with 564260 additions and 0 deletions

View file

@ -0,0 +1,27 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Subpackage containing the modules that implement the command line tools.
Note that these are imported by top-level scripts which are intended to be
invoked directly from a shell.
"""
from twisted.python.versions import Version
from twisted.python.deprecate import deprecatedModuleAttribute
deprecatedModuleAttribute(
Version("Twisted", 11, 1, 0),
"Seek unzipping software outside of Twisted.",
__name__,
"tkunzip")
deprecatedModuleAttribute(
Version("Twisted", 12, 1, 0),
"tapconvert has been deprecated.",
__name__,
"tapconvert")
del Version, deprecatedModuleAttribute

View file

@ -0,0 +1,390 @@
# -*- test-case-name: twisted.test.test_twistd -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import os, errno, sys
from twisted.python import log, syslog, logfile, usage
from twisted.python.util import (
switchUID, uidFromString, gidFromString, untilConcludes)
from twisted.application import app, service
from twisted.internet.interfaces import IReactorDaemonize
from twisted import copyright
def _umask(value):
return int(value, 8)
class ServerOptions(app.ServerOptions):
synopsis = "Usage: twistd [options]"
optFlags = [['nodaemon', 'n', "don't daemonize, don't use default umask of 0077"],
['originalname', None, "Don't try to change the process name"],
['syslog', None, "Log to syslog, not to file"],
['euid', '',
"Set only effective user-id rather than real user-id. "
"(This option has no effect unless the server is running as "
"root, in which case it means not to shed all privileges "
"after binding ports, retaining the option to regain "
"privileges in cases such as spawning processes. "
"Use with caution.)"],
]
optParameters = [
['prefix', None,'twisted',
"use the given prefix when syslogging"],
['pidfile','','twistd.pid',
"Name of the pidfile"],
['chroot', None, None,
'Chroot to a supplied directory before running'],
['uid', 'u', None, "The uid to run as.", uidFromString],
['gid', 'g', None, "The gid to run as.", gidFromString],
['umask', None, None,
"The (octal) file creation mask to apply.", _umask],
]
compData = usage.Completions(
optActions={"pidfile": usage.CompleteFiles("*.pid"),
"chroot": usage.CompleteDirs(descr="chroot directory"),
"gid": usage.CompleteGroups(descr="gid to run as"),
"uid": usage.CompleteUsernames(descr="uid to run as"),
"prefix": usage.Completer(descr="syslog prefix"),
},
)
def opt_version(self):
"""Print version information and exit.
"""
print 'twistd (the Twisted daemon) %s' % copyright.version
print copyright.copyright
sys.exit()
def postOptions(self):
app.ServerOptions.postOptions(self)
if self['pidfile']:
self['pidfile'] = os.path.abspath(self['pidfile'])
def checkPID(pidfile):
if not pidfile:
return
if os.path.exists(pidfile):
try:
pid = int(open(pidfile).read())
except ValueError:
sys.exit('Pidfile %s contains non-numeric value' % pidfile)
try:
os.kill(pid, 0)
except OSError, why:
if why[0] == errno.ESRCH:
# The pid doesnt exists.
log.msg('Removing stale pidfile %s' % pidfile, isError=True)
os.remove(pidfile)
else:
sys.exit("Can't check status of PID %s from pidfile %s: %s" %
(pid, pidfile, why[1]))
else:
sys.exit("""\
Another twistd server is running, PID %s\n
This could either be a previously started instance of your application or a
different application entirely. To start a new one, either run it in some other
directory, or use the --pidfile and --logfile parameters to avoid clashes.
""" % pid)
class UnixAppLogger(app.AppLogger):
"""
A logger able to log to syslog, to files, and to stdout.
@ivar _syslog: A flag indicating whether to use syslog instead of file
logging.
@type _syslog: C{bool}
@ivar _syslogPrefix: If C{sysLog} is C{True}, the string prefix to use for
syslog messages.
@type _syslogPrefix: C{str}
@ivar _nodaemon: A flag indicating the process will not be daemonizing.
@type _nodaemon: C{bool}
"""
def __init__(self, options):
app.AppLogger.__init__(self, options)
self._syslog = options.get("syslog", False)
self._syslogPrefix = options.get("prefix", "")
self._nodaemon = options.get("nodaemon", False)
def _getLogObserver(self):
"""
Create and return a suitable log observer for the given configuration.
The observer will go to syslog using the prefix C{_syslogPrefix} if
C{_syslog} is true. Otherwise, it will go to the file named
C{_logfilename} or, if C{_nodaemon} is true and C{_logfilename} is
C{"-"}, to stdout.
@return: An object suitable to be passed to C{log.addObserver}.
"""
if self._syslog:
return syslog.SyslogObserver(self._syslogPrefix).emit
if self._logfilename == '-':
if not self._nodaemon:
sys.exit('Daemons cannot log to stdout, exiting!')
logFile = sys.stdout
elif self._nodaemon and not self._logfilename:
logFile = sys.stdout
else:
if not self._logfilename:
self._logfilename = 'twistd.log'
logFile = logfile.LogFile.fromFullPath(self._logfilename)
try:
import signal
except ImportError:
pass
else:
# Override if signal is set to None or SIG_DFL (0)
if not signal.getsignal(signal.SIGUSR1):
def rotateLog(signal, frame):
from twisted.internet import reactor
reactor.callFromThread(logFile.rotate)
signal.signal(signal.SIGUSR1, rotateLog)
return log.FileLogObserver(logFile).emit
def launchWithName(name):
if name and name != sys.argv[0]:
exe = os.path.realpath(sys.executable)
log.msg('Changing process name to ' + name)
os.execv(exe, [name, sys.argv[0], '--originalname'] + sys.argv[1:])
class UnixApplicationRunner(app.ApplicationRunner):
"""
An ApplicationRunner which does Unix-specific things, like fork,
shed privileges, and maintain a PID file.
"""
loggerFactory = UnixAppLogger
def preApplication(self):
"""
Do pre-application-creation setup.
"""
checkPID(self.config['pidfile'])
self.config['nodaemon'] = (self.config['nodaemon']
or self.config['debug'])
self.oldstdout = sys.stdout
self.oldstderr = sys.stderr
def postApplication(self):
"""
To be called after the application is created: start the application
and run the reactor. After the reactor stops, clean up PID files and
such.
"""
try:
self.startApplication(self.application)
except Exception as ex:
statusPipe = self.config.get("statusPipe", None)
if statusPipe is not None:
# Limit the total length to the passed string to 100
strippedError = str(ex)[:98]
untilConcludes(os.write, statusPipe, "1 %s" % (strippedError,))
untilConcludes(os.close, statusPipe)
self.removePID(self.config['pidfile'])
raise
else:
statusPipe = self.config.get("statusPipe", None)
if statusPipe is not None:
untilConcludes(os.write, statusPipe, "0")
untilConcludes(os.close, statusPipe)
self.startReactor(None, self.oldstdout, self.oldstderr)
self.removePID(self.config['pidfile'])
def removePID(self, pidfile):
"""
Remove the specified PID file, if possible. Errors are logged, not
raised.
@type pidfile: C{str}
@param pidfile: The path to the PID tracking file.
"""
if not pidfile:
return
try:
os.unlink(pidfile)
except OSError, e:
if e.errno == errno.EACCES or e.errno == errno.EPERM:
log.msg("Warning: No permission to delete pid file")
else:
log.err(e, "Failed to unlink PID file:")
except:
log.err(None, "Failed to unlink PID file:")
def setupEnvironment(self, chroot, rundir, nodaemon, umask, pidfile):
"""
Set the filesystem root, the working directory, and daemonize.
@type chroot: C{str} or L{NoneType}
@param chroot: If not None, a path to use as the filesystem root (using
L{os.chroot}).
@type rundir: C{str}
@param rundir: The path to set as the working directory.
@type nodaemon: C{bool}
@param nodaemon: A flag which, if set, indicates that daemonization
should not be done.
@type umask: C{int} or L{NoneType}
@param umask: The value to which to change the process umask.
@type pidfile: C{str} or L{NoneType}
@param pidfile: If not C{None}, the path to a file into which to put
the PID of this process.
"""
daemon = not nodaemon
if chroot is not None:
os.chroot(chroot)
if rundir == '.':
rundir = '/'
os.chdir(rundir)
if daemon and umask is None:
umask = 077
if umask is not None:
os.umask(umask)
if daemon:
from twisted.internet import reactor
self.config["statusPipe"] = self.daemonize(reactor)
if pidfile:
f = open(pidfile, 'wb')
f.write(str(os.getpid()))
f.close()
def daemonize(self, reactor):
"""
Daemonizes the application on Unix. This is done by the usual double
forking approach.
@see: U{http://code.activestate.com/recipes/278731/}
@see: W. Richard Stevens,
"Advanced Programming in the Unix Environment",
1992, Addison-Wesley, ISBN 0-201-56317-7
@param reactor: The reactor in use. If it provides
L{IReactorDaemonize}, its daemonization-related callbacks will be
invoked.
@return: A writable pipe to be used to report errors.
@rtype: C{int}
"""
# If the reactor requires hooks to be called for daemonization, call
# them. Currently the only reactor which provides/needs that is
# KQueueReactor.
if IReactorDaemonize.providedBy(reactor):
reactor.beforeDaemonize()
r, w = os.pipe()
if os.fork(): # launch child and...
code = self._waitForStart(r)
os.close(r)
os._exit(code) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
null = os.open('/dev/null', os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError as e:
if e.errno != errno.EBADF:
raise
os.close(null)
if IReactorDaemonize.providedBy(reactor):
reactor.afterDaemonize()
return w
def _waitForStart(self, readPipe):
"""
Wait for the daemonization success.
@param readPipe: file descriptor to read start information from.
@type readPipe: C{int}
@return: code to be passed to C{os._exit}: 0 for success, 1 for error.
@rtype: C{int}
"""
data = untilConcludes(os.read, readPipe, 100)
if data != "0":
msg = ("An error has occurred: '%s'\nPlease look at log "
"file for more information.\n" % (data[2:],))
untilConcludes(sys.__stderr__.write, msg)
return 1
return 0
def shedPrivileges(self, euid, uid, gid):
"""
Change the UID and GID or the EUID and EGID of this process.
@type euid: C{bool}
@param euid: A flag which, if set, indicates that only the I{effective}
UID and GID should be set.
@type uid: C{int} or C{NoneType}
@param uid: If not C{None}, the UID to which to switch.
@type gid: C{int} or C{NoneType}
@param gid: If not C{None}, the GID to which to switch.
"""
if uid is not None or gid is not None:
extra = euid and 'e' or ''
desc = '%suid/%sgid %s/%s' % (extra, extra, uid, gid)
try:
switchUID(uid, gid, euid)
except OSError:
log.msg('failed to set %s (are you root?) -- exiting.' % desc)
sys.exit(1)
else:
log.msg('set %s' % desc)
def startApplication(self, application):
"""
Configure global process state based on the given application and run
the application.
@param application: An object which can be adapted to
L{service.IProcess} and L{service.IService}.
"""
process = service.IProcess(application)
if not self.config['originalname']:
launchWithName(process.processName)
self.setupEnvironment(
self.config['chroot'], self.config['rundir'],
self.config['nodaemon'], self.config['umask'],
self.config['pidfile'])
service.IService(application).privilegedStartService()
uid, gid = self.config['uid'], self.config['gid']
if uid is None:
uid = process.uid
if gid is None:
gid = process.gid
self.shedPrivileges(self.config['euid'], uid, gid)
app.startApplication(application, not self.config['no_save'])

View file

@ -0,0 +1,50 @@
# -*- test-case-name: twisted.test.test_twistd -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
from twisted.python import log
from twisted.application import app, service, internet
from twisted import copyright
import sys, os
class ServerOptions(app.ServerOptions):
synopsis = "Usage: twistd [options]"
optFlags = [['nodaemon','n', "(for backwards compatability)."],
]
def opt_version(self):
"""Print version information and exit.
"""
print 'twistd (the Twisted Windows runner) %s' % copyright.version
print copyright.copyright
sys.exit()
class WindowsApplicationRunner(app.ApplicationRunner):
"""
An ApplicationRunner which avoids unix-specific things. No
forking, no PID files, no privileges.
"""
def preApplication(self):
"""
Do pre-application-creation setup.
"""
self.oldstdout = sys.stdout
self.oldstderr = sys.stderr
os.chdir(self.config['rundir'])
def postApplication(self):
"""
Start the application and run the reactor.
"""
service.IService(self.application).privilegedStartService()
app.startApplication(self.application, not self.config['no_save'])
app.startApplication(internet.TimerService(0.1, lambda:None), 0)
self.startReactor(None, self.oldstdout, self.oldstderr)
log.msg("Server Shut Down.")

View file

@ -0,0 +1,69 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
#
"""HTML pretty-printing for Python source code."""
__version__ = '$Revision: 1.8 $'[11:-2]
from twisted.python import htmlizer, usage
from twisted import copyright
import os, sys
header = '''<html><head>
<title>%(title)s</title>
<meta name=\"Generator\" content="%(generator)s" />
%(alternate)s
%(stylesheet)s
</head>
<body>
'''
footer = """</body>"""
styleLink = '<link rel="stylesheet" href="%s" type="text/css" />'
alternateLink = '<link rel="alternate" href="%(source)s" type="text/x-python" />'
class Options(usage.Options):
synopsis = """%s [options] source.py
""" % (
os.path.basename(sys.argv[0]),)
optParameters = [
('stylesheet', 's', None, "URL of stylesheet to link to."),
]
compData = usage.Completions(
extraActions=[usage.CompleteFiles('*.py', descr='source python file')]
)
def parseArgs(self, filename):
self['filename'] = filename
def run():
options = Options()
try:
options.parseOptions()
except usage.UsageError, e:
print str(e)
sys.exit(1)
filename = options['filename']
if options.get('stylesheet') is not None:
stylesheet = styleLink % (options['stylesheet'],)
else:
stylesheet = ''
output = open(filename + '.html', 'w')
try:
output.write(header % {
'title': filename,
'generator': 'htmlizer/%s' % (copyright.longversion,),
'alternate': alternateLink % {'source': filename},
'stylesheet': stylesheet
})
htmlizer.filter(open(filename), output,
htmlizer.SmallerHTMLWriter)
output.write(footer)
finally:
output.close()

View file

@ -0,0 +1,69 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Start a L{twisted.manhole} client.
"""
import sys
from twisted.python import usage
def run():
config = MyOptions()
try:
config.parseOptions()
except usage.UsageError, e:
print str(e)
print str(config)
sys.exit(1)
run_gtk2(config)
from twisted.internet import reactor
reactor.run()
def run_gtk2(config):
# Put these off until after we parse options, so we know what reactor
# to load.
from twisted.internet import gtk2reactor
gtk2reactor.install()
# Put this off until after we parse options, or else gnome eats them.
sys.argv[:] = ['manhole']
from twisted.manhole.ui import gtk2manhole
o = config.opts
defaults = {
'host': o['host'],
'port': o['port'],
'identityName': o['user'],
'password': o['password'],
'serviceName': o['service'],
'perspectiveName': o['perspective']
}
w = gtk2manhole.ManholeWindow()
w.setDefaults(defaults)
w.login()
pbportno = 8787
class MyOptions(usage.Options):
optParameters=[("user", "u", "guest", "username"),
("password", "w", "guest"),
("service", "s", "twisted.manhole", "PB Service"),
("host", "h", "localhost"),
("port", "p", str(pbportno)),
("perspective", "P", "",
"PB Perspective to ask for "
"(if different than username)")]
compData = usage.Completions(
optActions={"host": usage.CompleteHostnames(),
"user": usage.CompleteUsernames()}
)
if __name__ == '__main__':
run()

View file

@ -0,0 +1,301 @@
# -*- test-case-name: twisted.scripts.test.test_tap2deb -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
tap2deb creates Debian packages which wrap .tap files.
"""
import os
import sys
import shutil
import subprocess
from email.utils import formatdate as now
from twisted.python import usage
from twisted.python.filepath import FilePath
class MyOptions(usage.Options):
optFlags = [["unsigned", "u"]]
optParameters = [["tapfile", "t", "twistd.tap"],
["maintainer", "m", "",
"The maintainer's name and email in a specific format: "
"'John Doe <johndoe@example.com>'"],
["protocol", "p", ""],
["description", "e", ""],
["long_description", "l", ""],
["set-version", "V", "1.0"],
["debfile", "d", None],
["type", "y", "tap", "Type of configuration: 'tap', 'xml', "
"'source' or 'python' for .tac files"]]
compData = usage.Completions(
optActions={
"type": usage.CompleteList(["tap", "xml", "source", "python"]),
"debfile": usage.CompleteFiles("*.deb")}
)
def postOptions(self):
if not self["maintainer"]:
raise usage.UsageError("maintainer must be specified.")
type_dict = {
'tap': 'file',
'python': 'python',
'source': 'source',
'xml': 'xml',
}
def run(args=None):
"""
Parses the configuration options in C{args} and runs C{dpkg-buildpackage}
to create a .deb file.
@param args: List of strings representing the C{tap2deb} configuration
options.
@type args: L{list}
"""
try:
config = MyOptions()
config.parseOptions(args)
except usage.error as ue:
sys.exit("%s: %s" % (sys.argv[0], ue))
tapFile = config['tapfile']
baseTapFile = os.path.basename(config['tapfile'])
protocol = (config['protocol'] or os.path.splitext(baseTapFile)[0])
debFile = config['debfile'] or 'twisted-' + protocol
version = config['set-version']
maintainer = config['maintainer']
description = config['description'] or (
'A Twisted-based server for %(protocol)s' % vars())
longDescription = config['long_description'] or\
'Automatically created by tap2deb'
twistdOption = type_dict[config['type']]
date = now()
directory = debFile + '-' + version
pythonVersion = '%s.%s' % sys.version_info[:2]
buildDir = FilePath('.build').child(directory)
if buildDir.exists():
buildDir.remove()
debianDir = buildDir.child('debian')
debianDir.child('source').makedirs()
shutil.copy(tapFile, buildDir.path)
debianDir.child('README.Debian').setContent(
'''This package was auto-generated by tap2deb\n''')
debianDir.child('conffiles').setContent(
'''\
/etc/init.d/%(debFile)s
/etc/default/%(debFile)s
/etc/%(baseTapFile)s
''' % vars())
debianDir.child('default').setContent(
'''\
pidfile=/var/run/%(debFile)s.pid
rundir=/var/lib/%(debFile)s/
file=/etc/%(tapFile)s
logfile=/var/log/%(debFile)s.log
''' % vars())
debianDir.child('init.d').setContent(
'''\
#!/bin/sh
PATH=/sbin:/bin:/usr/sbin:/usr/bin
pidfile=/var/run/%(debFile)s.pid \
rundir=/var/lib/%(debFile)s/ \
file=/etc/%(tapFile)s \
logfile=/var/log/%(debFile)s.log
[ -r /etc/default/%(debFile)s ] && . /etc/default/%(debFile)s
test -x /usr/bin/twistd%(pythonVersion)s || exit 0
test -r $file || exit 0
test -r /usr/share/%(debFile)s/package-installed || exit 0
case "$1" in
start)
echo -n "Starting %(debFile)s: twistd"
start-stop-daemon --start --quiet --exec /usr/bin/twistd%(pythonVersion)s -- \
--pidfile=$pidfile \
--rundir=$rundir \
--%(twistdOption)s=$file \
--logfile=$logfile
echo "."
;;
stop)
echo -n "Stopping %(debFile)s: twistd"
start-stop-daemon --stop --quiet \
--pidfile $pidfile
echo "."
;;
restart)
$0 stop
$0 start
;;
force-reload)
$0 restart
;;
*)
echo "Usage: /etc/init.d/%(debFile)s {start|stop|restart|force-reload}" >&2
exit 1
;;
esac
exit 0
''' % vars())
debianDir.child('init.d').chmod(0755)
debianDir.child('postinst').setContent(
'''\
#!/bin/sh
update-rc.d %(debFile)s defaults >/dev/null
invoke-rc.d %(debFile)s start
''' % vars())
debianDir.child('prerm').setContent(
'''\
#!/bin/sh
invoke-rc.d %(debFile)s stop
''' % vars())
debianDir.child('postrm').setContent(
'''\
#!/bin/sh
if [ "$1" = purge ]; then
update-rc.d %(debFile)s remove >/dev/null
fi
''' % vars())
debianDir.child('changelog').setContent(
'''\
%(debFile)s (%(version)s) unstable; urgency=low
* Created by tap2deb
-- %(maintainer)s %(date)s
''' % vars())
debianDir.child('control').setContent(
'''\
Source: %(debFile)s
Section: net
Priority: extra
Maintainer: %(maintainer)s
Build-Depends-Indep: debhelper
Standards-Version: 3.5.6
Package: %(debFile)s
Architecture: all
Depends: python%(pythonVersion)s-twisted
Description: %(description)s
%(longDescription)s
''' % vars())
debianDir.child('copyright').setContent(
'''\
This package was auto-debianized by %(maintainer)s on
%(date)s
It was auto-generated by tap2deb
Upstream Author(s):
Moshe Zadka <moshez@twistedmatrix.com> -- tap2deb author
Copyright:
Insert copyright here.
''' % vars())
debianDir.child('dirs').setContent(
'''\
etc/init.d
etc/default
var/lib/%(debFile)s
usr/share/doc/%(debFile)s
usr/share/%(debFile)s
''' % vars())
debianDir.child('rules').setContent(
'''\
#!/usr/bin/make -f
export DH_COMPAT=1
build: build-stamp
build-stamp:
dh_testdir
touch build-stamp
clean:
dh_testdir
dh_testroot
rm -f build-stamp install-stamp
dh_clean
install: install-stamp
install-stamp: build-stamp
dh_testdir
dh_testroot
dh_clean -k
dh_installdirs
# Add here commands to install the package into debian/tmp.
cp %(baseTapFile)s debian/tmp/etc/
cp debian/init.d debian/tmp/etc/init.d/%(debFile)s
cp debian/default debian/tmp/etc/default/%(debFile)s
cp debian/copyright debian/tmp/usr/share/doc/%(debFile)s/
cp debian/README.Debian debian/tmp/usr/share/doc/%(debFile)s/
touch debian/tmp/usr/share/%(debFile)s/package-installed
touch install-stamp
binary-arch: build install
binary-indep: build install
dh_testdir
dh_testroot
dh_strip
dh_compress
dh_installchangelogs
dh_fixperms
dh_installdeb
dh_shlibdeps
dh_gencontrol
dh_md5sums
dh_builddeb
source diff:
@echo >&2 'source and diff are obsolete - use dpkg-source -b'; false
binary: binary-indep binary-arch
.PHONY: build clean binary-indep binary-arch binary install
''' % vars())
debianDir.child('rules').chmod(0755)
args = ["dpkg-buildpackage", "-rfakeroot"]
if config['unsigned']:
args = args + ['-uc', '-us']
# Build deb
job = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, cwd=buildDir.path)
stdout, _ = job.communicate()

View file

@ -0,0 +1,331 @@
# -*- test-case-name: twisted.scripts.test.test_tap2rpm -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import sys, os, shutil, time, glob
import subprocess
import tempfile
import tarfile
from StringIO import StringIO
import warnings
from twisted.python import usage, log, versions, deprecate
#################################
# data that goes in /etc/inittab
initFileData = '''\
#!/bin/sh
#
# Startup script for a Twisted service.
#
# chkconfig: - 85 15
# description: Start-up script for the Twisted service "%(tap_file)s".
PATH=/usr/bin:/bin:/usr/sbin:/sbin
pidfile=/var/run/%(rpm_file)s.pid
rundir=/var/lib/twisted-taps/%(rpm_file)s/
file=/etc/twisted-taps/%(tap_file)s
logfile=/var/log/%(rpm_file)s.log
# load init function library
. /etc/init.d/functions
[ -r /etc/default/%(rpm_file)s ] && . /etc/default/%(rpm_file)s
# check for required files
if [ ! -x /usr/bin/twistd ]
then
echo "$0: Aborting, no /usr/bin/twistd found"
exit 0
fi
if [ ! -r "$file" ]
then
echo "$0: Aborting, no file $file found."
exit 0
fi
# set up run directory if necessary
if [ ! -d "${rundir}" ]
then
mkdir -p "${rundir}"
fi
case "$1" in
start)
echo -n "Starting %(rpm_file)s: twistd"
daemon twistd \\
--pidfile=$pidfile \\
--rundir=$rundir \\
--%(twistd_option)s=$file \\
--logfile=$logfile
status %(rpm_file)s
;;
stop)
echo -n "Stopping %(rpm_file)s: twistd"
kill `cat "${pidfile}"`
status %(rpm_file)s
;;
restart)
"${0}" stop
"${0}" start
;;
*)
echo "Usage: ${0} {start|stop|restart|}" >&2
exit 1
;;
esac
exit 0
'''
#######################################
# the data for creating the spec file
specFileData = '''\
Summary: %(description)s
Name: %(rpm_file)s
Version: %(version)s
Release: 1
License: Unknown
Group: Networking/Daemons
Source: %(tarfile_basename)s
BuildRoot: %%{_tmppath}/%%{name}-%%{version}-root
Requires: /usr/bin/twistd
BuildArch: noarch
%%description
%(long_description)s
%%prep
%%setup
%%build
%%install
[ ! -z "$RPM_BUILD_ROOT" -a "$RPM_BUILD_ROOT" != '/' ] \
&& rm -rf "$RPM_BUILD_ROOT"
mkdir -p "$RPM_BUILD_ROOT"/etc/twisted-taps
mkdir -p "$RPM_BUILD_ROOT"/etc/init.d
mkdir -p "$RPM_BUILD_ROOT"/var/lib/twisted-taps
cp "%(tap_file)s" "$RPM_BUILD_ROOT"/etc/twisted-taps/
cp "%(rpm_file)s.init" "$RPM_BUILD_ROOT"/etc/init.d/"%(rpm_file)s"
%%clean
[ ! -z "$RPM_BUILD_ROOT" -a "$RPM_BUILD_ROOT" != '/' ] \
&& rm -rf "$RPM_BUILD_ROOT"
%%post
/sbin/chkconfig --add %(rpm_file)s
/sbin/chkconfig --level 35 %(rpm_file)s
/etc/init.d/%(rpm_file)s start
%%preun
/etc/init.d/%(rpm_file)s stop
/sbin/chkconfig --del %(rpm_file)s
%%files
%%defattr(-,root,root)
%%attr(0755,root,root) /etc/init.d/%(rpm_file)s
%%attr(0660,root,root) /etc/twisted-taps/%(tap_file)s
%%changelog
* %(date)s %(maintainer)s
- Created by tap2rpm: %(rpm_file)s (%(version)s)
'''
###############################
class MyOptions(usage.Options):
optFlags = [['quiet', 'q']]
optParameters = [
["tapfile", "t", "twistd.tap"],
["maintainer", "m", "tap2rpm"],
["protocol", "p", None],
["description", "e", None],
["long_description", "l",
"Automatically created by tap2rpm"],
["set-version", "V", "1.0"],
["rpmfile", "r", None],
["type", "y", "tap", "type of configuration: 'tap', 'xml, "
"'source' or 'python'"],
]
compData = usage.Completions(
optActions={"type": usage.CompleteList(["tap", "xml", "source",
"python"]),
"rpmfile": usage.CompleteFiles("*.rpm")}
)
def postOptions(self):
"""
Calculate the default values for certain command-line options.
"""
# Options whose defaults depend on other parameters.
if self['protocol'] is None:
base_tapfile = os.path.basename(self['tapfile'])
self['protocol'] = os.path.splitext(base_tapfile)[0]
if self['description'] is None:
self['description'] = "A TCP server for %s" % (self['protocol'],)
if self['rpmfile'] is None:
self['rpmfile'] = 'twisted-%s' % (self['protocol'],)
# Values that aren't options, but are calculated from options and are
# handy to have around.
self['twistd_option'] = type_dict[self['type']]
self['release-name'] = '%s-%s' % (self['rpmfile'], self['set-version'])
def opt_unsigned(self):
"""
Generate an unsigned rather than a signed RPM. (DEPRECATED; unsigned
is the default)
"""
msg = deprecate.getDeprecationWarningString(
self.opt_unsigned, versions.Version("Twisted", 12, 1, 0))
warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
# Maintain the -u short flag
opt_u = opt_unsigned
type_dict = {
'tap': 'file',
'python': 'python',
'source': 'source',
'xml': 'xml',
}
##########################
def makeBuildDir():
"""
Set up the temporary directory for building RPMs.
Returns: buildDir, a randomly-named subdirectory of baseDir.
"""
tmpDir = tempfile.mkdtemp()
# set up initial directory contents
os.makedirs(os.path.join(tmpDir, 'RPMS', 'noarch'))
os.makedirs(os.path.join(tmpDir, 'SPECS'))
os.makedirs(os.path.join(tmpDir, 'BUILD'))
os.makedirs(os.path.join(tmpDir, 'SOURCES'))
os.makedirs(os.path.join(tmpDir, 'SRPMS'))
log.msg(format="Created RPM build structure in %(path)r",
path=tmpDir)
return tmpDir
def setupBuildFiles(buildDir, config):
"""
Create files required to build an RPM in the build directory.
"""
# Create the source tarball in the SOURCES directory.
tarballName = "%s.tar" % (config['release-name'],)
tarballPath = os.path.join(buildDir, "SOURCES", tarballName)
tarballHandle = tarfile.open(tarballPath, "w")
sourceDirInfo = tarfile.TarInfo(config['release-name'])
sourceDirInfo.type = tarfile.DIRTYPE
sourceDirInfo.mode = 0755
tarballHandle.addfile(sourceDirInfo)
tapFileBase = os.path.basename(config['tapfile'])
initFileInfo = tarfile.TarInfo(
os.path.join(
config['release-name'],
'%s.init' % config['rpmfile'],
)
)
initFileInfo.type = tarfile.REGTYPE
initFileInfo.mode = 0755
initFileRealData = initFileData % {
'tap_file': tapFileBase,
'rpm_file': config['release-name'],
'twistd_option': config['twistd_option'],
}
initFileInfo.size = len(initFileRealData)
tarballHandle.addfile(initFileInfo, StringIO(initFileRealData))
tapFileHandle = open(config['tapfile'], 'rb')
tapFileInfo = tarballHandle.gettarinfo(
arcname=os.path.join(config['release-name'], tapFileBase),
fileobj=tapFileHandle,
)
tapFileInfo.mode = 0644
tarballHandle.addfile(tapFileInfo, tapFileHandle)
tarballHandle.close()
log.msg(format="Created dummy source tarball %(tarballPath)r",
tarballPath=tarballPath)
# Create the spec file in the SPECS directory.
specName = "%s.spec" % (config['release-name'],)
specPath = os.path.join(buildDir, "SPECS", specName)
specHandle = open(specPath, "w")
specFileRealData = specFileData % {
'description': config['description'],
'rpm_file': config['rpmfile'],
'version': config['set-version'],
'tarfile_basename': tarballName,
'tap_file': tapFileBase,
'date': time.strftime('%a %b %d %Y', time.localtime(time.time())),
'maintainer': config['maintainer'],
'long_description': config['long_description'],
}
specHandle.write(specFileRealData)
specHandle.close()
log.msg(format="Created RPM spec file %(specPath)r",
specPath=specPath)
return specPath
def run(options=None):
# parse options
try:
config = MyOptions()
config.parseOptions(options)
except usage.error, ue:
sys.exit("%s: %s" % (sys.argv[0], ue))
# create RPM build environment
tmpDir = makeBuildDir()
specPath = setupBuildFiles(tmpDir, config)
# build rpm
job = subprocess.Popen([
"rpmbuild",
"-vv",
"--define", "_topdir %s" % (tmpDir,),
"-ba", specPath,
], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _ = job.communicate()
# If there was a problem, show people what it was.
if job.returncode != 0:
print stdout
# copy the RPMs to the local directory
rpmPath = glob.glob(os.path.join(tmpDir, 'RPMS', 'noarch', '*'))[0]
srpmPath = glob.glob(os.path.join(tmpDir, 'SRPMS', '*'))[0]
if not config['quiet']:
print 'Writing "%s"...' % os.path.basename(rpmPath)
shutil.copy(rpmPath, '.')
if not config['quiet']:
print 'Writing "%s"...' % os.path.basename(srpmPath)
shutil.copy(srpmPath, '.')
# remove the build directory
shutil.rmtree(tmpDir)
return [os.path.basename(rpmPath), os.path.basename(srpmPath)]

View file

@ -0,0 +1,57 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import sys, getpass
from twisted.python import usage
from twisted.application import app
from twisted.persisted import sob
class ConvertOptions(usage.Options):
synopsis = "Usage: tapconvert [options]"
optParameters = [
['in', 'i', None, "The filename of the tap to read from"],
['out', 'o', None, "A filename to write the tap to"],
['typein', 'f', 'guess',
"The format to use; this can be 'guess', 'python', "
"'pickle', 'xml', or 'source'."],
['typeout', 't', 'source',
"The output format to use; this can be 'pickle', 'xml', or 'source'."],
]
optFlags = [
['decrypt', 'd', "The specified tap/aos/xml file is encrypted."],
['encrypt', 'e', "Encrypt file before writing"]
]
compData = usage.Completions(
optActions={"typein": usage.CompleteList(["guess", "python", "pickle",
"xml", "source"]),
"typeout": usage.CompleteList(["pickle", "xml", "source"]),
"in": usage.CompleteFiles(descr="tap file to read from"),
"out": usage.CompleteFiles(descr="tap file to write to"),
}
)
def postOptions(self):
if self['in'] is None:
raise usage.UsageError("%s\nYou must specify the input filename."
% self)
if self["typein"] == "guess":
try:
self["typein"] = sob.guessType(self["in"])
except KeyError:
raise usage.UsageError("Could not guess type for '%s'" %
self["typein"])
def run():
options = ConvertOptions()
try:
options.parseOptions(sys.argv[1:])
except usage.UsageError, e:
print e
else:
app.convertStyle(options["in"], options["typein"],
options.opts['decrypt'] or getpass.getpass('Passphrase: '),
options["out"], options['typeout'], options["encrypt"])

View file

@ -0,0 +1,6 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test package for L{twisted.scripts}.
"""

View file

@ -0,0 +1,201 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for the command-line scripts in the top-level I{bin/} directory.
Tests for actual functionality belong elsewhere, written in a way that doesn't
involve launching child processes.
"""
from os import devnull, getcwd, chdir
from sys import executable
from subprocess import PIPE, Popen
from twisted.trial.unittest import SkipTest, TestCase
from twisted.python.modules import getModule
from twisted.python.filepath import FilePath
from twisted.python.test.test_shellcomp import ZshScriptTestMixin
def outputFromPythonScript(script, *args):
"""
Synchronously run a Python script, with the same Python interpreter that
ran the process calling this function, using L{Popen}, using the given
command-line arguments, with standard input and standard error both
redirected to L{os.devnull}, and return its output as a string.
@param script: The path to the script.
@type script: L{FilePath}
@param args: The command-line arguments to follow the script in its
invocation (the desired C{sys.argv[1:]}).
@type args: L{tuple} of L{str}
@return: the output passed to the proces's C{stdout}, without any messages
from C{stderr}.
@rtype: L{bytes}
"""
nullInput = file(devnull, "rb")
nullError = file(devnull, "wb")
stdout = Popen([executable, script.path] + list(args),
stdout=PIPE, stderr=nullError, stdin=nullInput).stdout.read()
nullInput.close()
nullError.close()
return stdout
class ScriptTestsMixin:
"""
Mixin for L{TestCase} subclasses which defines a helper function for testing
a Twisted-using script.
"""
bin = getModule("twisted").pathEntry.filePath.child("bin")
def scriptTest(self, name):
"""
Verify that the given script runs and uses the version of Twisted
currently being tested.
This only works when running tests against a vcs checkout of Twisted,
since it relies on the scripts being in the place they are kept in
version control, and exercises their logic for finding the right version
of Twisted to use in that situation.
@param name: A path fragment, relative to the I{bin} directory of a
Twisted source checkout, identifying a script to test.
@type name: C{str}
@raise SkipTest: if the script is not where it is expected to be.
"""
script = self.bin.preauthChild(name)
if not script.exists():
raise SkipTest(
"Script tests do not apply to installed configuration.")
from twisted.copyright import version
scriptVersion = outputFromPythonScript(script, '--version')
self.assertIn(str(version), scriptVersion)
class ScriptTests(TestCase, ScriptTestsMixin):
"""
Tests for the core scripts.
"""
def test_twistd(self):
self.scriptTest("twistd")
def test_twistdPathInsert(self):
"""
The twistd script adds the current working directory to sys.path so
that it's able to import modules from it.
"""
script = self.bin.child("twistd")
if not script.exists():
raise SkipTest(
"Script tests do not apply to installed configuration.")
cwd = getcwd()
self.addCleanup(chdir, cwd)
testDir = FilePath(self.mktemp())
testDir.makedirs()
chdir(testDir.path)
testDir.child("bar.tac").setContent(
"import sys\n"
"print sys.path\n")
output = outputFromPythonScript(script, '-ny', 'bar.tac')
self.assertIn(repr(testDir.path), output)
def test_manhole(self):
self.scriptTest("manhole")
def test_trial(self):
self.scriptTest("trial")
def test_trialPathInsert(self):
"""
The trial script adds the current working directory to sys.path so that
it's able to import modules from it.
"""
script = self.bin.child("trial")
if not script.exists():
raise SkipTest(
"Script tests do not apply to installed configuration.")
cwd = getcwd()
self.addCleanup(chdir, cwd)
testDir = FilePath(self.mktemp())
testDir.makedirs()
chdir(testDir.path)
testDir.child("foo.py").setContent("")
output = outputFromPythonScript(script, 'foo')
self.assertIn("PASSED", output)
def test_pyhtmlizer(self):
self.scriptTest("pyhtmlizer")
def test_tap2rpm(self):
self.scriptTest("tap2rpm")
def test_tap2deb(self):
self.scriptTest("tap2deb")
def test_tapconvert(self):
self.scriptTest("tapconvert")
def test_deprecatedTkunzip(self):
"""
The entire L{twisted.scripts.tkunzip} module, part of the old Windows
installer tool chain, is deprecated.
"""
from twisted.scripts import tkunzip
warnings = self.flushWarnings(
offendingFunctions=[self.test_deprecatedTkunzip])
self.assertEqual(DeprecationWarning, warnings[0]['category'])
self.assertEqual(
"twisted.scripts.tkunzip was deprecated in Twisted 11.1.0: "
"Seek unzipping software outside of Twisted.",
warnings[0]['message'])
self.assertEqual(1, len(warnings))
def test_deprecatedTapconvert(self):
"""
The entire L{twisted.scripts.tapconvert} module is deprecated.
"""
from twisted.scripts import tapconvert
warnings = self.flushWarnings(
offendingFunctions=[self.test_deprecatedTapconvert])
self.assertEqual(DeprecationWarning, warnings[0]['category'])
self.assertEqual(
"twisted.scripts.tapconvert was deprecated in Twisted 12.1.0: "
"tapconvert has been deprecated.",
warnings[0]['message'])
self.assertEqual(1, len(warnings))
class ZshIntegrationTestCase(TestCase, ZshScriptTestMixin):
"""
Test that zsh completion functions are generated without error
"""
generateFor = [('twistd', 'twisted.scripts.twistd.ServerOptions'),
('trial', 'twisted.scripts.trial.Options'),
('pyhtmlizer', 'twisted.scripts.htmlizer.Options'),
('tap2rpm', 'twisted.scripts.tap2rpm.MyOptions'),
('tap2deb', 'twisted.scripts.tap2deb.MyOptions'),
('tapconvert', 'twisted.scripts.tapconvert.ConvertOptions'),
('manhole', 'twisted.scripts.manhole.MyOptions')
]

View file

@ -0,0 +1,100 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.scripts.tap2deb}.
"""
from twisted.scripts import tap2deb
from twisted.python import usage, procutils
from twisted.python.filepath import FilePath
from twisted.trial.unittest import TestCase, SkipTest
class TestTap2DEB(TestCase):
"""
Tests for the L{tap2deb} script.
"""
maintainer = "Jane Doe <janedoe@example.com>"
def test_maintainerOption(self):
"""
The C{--maintainer} option must be specified on the commandline or
passed to L{tap2deb.run}.
"""
config = tap2deb.MyOptions()
self.assertRaises(usage.UsageError, config.parseOptions, [])
self.assertRaises(SystemExit, tap2deb.run, [])
def test_optionDefaults(self):
"""
Commandline options default to sensible values.
"""
config = tap2deb.MyOptions()
config.parseOptions(['--maintainer', self.maintainer])
self.assertEqual(config['tapfile'], 'twistd.tap')
self.assertEqual(config['maintainer'], self.maintainer)
self.assertEqual(config['protocol'], '')
self.assertEqual(config['description'], '')
self.assertEqual(config['long_description'], '')
self.assertEqual(config['set-version'], '1.0')
self.assertEqual(config['debfile'], None)
self.assertEqual(config['type'], 'tap')
def test_missingMaintainer(self):
"""
Omitting the maintainer argument results in L{tap2deb.run} raising
C{SystemExit}.
"""
error = self.assertRaises(SystemExit, tap2deb.run,
["--tapfile", "foo"])
self.assertTrue(str(error).endswith('maintainer must be specified.'))
def test_basicOperation(self):
"""
Running the L{tap2deb} script produces a bunch of files using
C{dpkg-buildpackage}.
"""
# Skip tests if dpkg-buildpackage is not present
if not procutils.which("dpkg-buildpackage"):
raise SkipTest("dpkg-buildpackage must be present to test tap2deb")
baseDir = FilePath(self.mktemp())
baseDir.makedirs()
# Make a temporary .tap file
version = '1.0'
tapName = 'lemon'
tapFile = baseDir.child("%s.tap" % (tapName,))
tapFile.setContent("# Dummy .tap file")
buildDir = FilePath('.build')
outputDir = buildDir.child('twisted-%s-%s' % (tapName, version))
# Run
args = ["--tapfile", tapFile.path, "--maintainer", self.maintainer]
tap2deb.run(args)
# Verify input files were created
self.assertEqual(sorted(outputDir.listdir()),
['build-stamp', 'debian', 'install-stamp', 'lemon.tap'])
debianDir = outputDir.child('debian')
for name in ['README.Debian', 'conffiles', 'default', 'init.d',
'postinst', 'prerm', 'postrm', 'changelog', 'control',
'copyright', 'dirs', 'rules']:
self.assertTrue(debianDir.child(name).exists())
# Verify 4 output files were created
self.assertTrue(buildDir.child('twisted-lemon_1.0_all.deb').exists())
self.assertTrue(buildDir.child('twisted-lemon_1.0.tar.gz').exists())
self.assertTrue(buildDir.child('twisted-lemon_1.0.dsc').exists())
self.assertEqual(
len(buildDir.globChildren('twisted-lemon_1.0_*.changes')), 1)

View file

@ -0,0 +1,399 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.scripts.tap2rpm}.
"""
import os
from twisted.trial.unittest import TestCase, SkipTest
from twisted.python import procutils
from twisted.python import versions
from twisted.python import deprecate
from twisted.python.failure import Failure
from twisted.internet import utils
from twisted.scripts import tap2rpm
# When we query the RPM metadata, we get back a string we'll have to parse, so
# we'll use suitably rare delimiter characters to split on. Luckily, ASCII
# defines some for us!
RECORD_SEPARATOR = "\x1E"
UNIT_SEPARATOR = "\x1F"
def _makeRPMs(tapfile=None, maintainer=None, protocol=None, description=None,
longDescription=None, setVersion=None, rpmfile=None, type_=None):
"""
Helper function to invoke tap2rpm with the given parameters.
"""
args = []
if not tapfile:
tapfile = "dummy-tap-file"
handle = open(tapfile, "w")
handle.write("# Dummy TAP file\n")
handle.close()
args.extend(["--quiet", "--tapfile", tapfile])
if maintainer:
args.extend(["--maintainer", maintainer])
if protocol:
args.extend(["--protocol", protocol])
if description:
args.extend(["--description", description])
if longDescription:
args.extend(["--long_description", longDescription])
if setVersion:
args.extend(["--set-version", setVersion])
if rpmfile:
args.extend(["--rpmfile", rpmfile])
if type_:
args.extend(["--type", type_])
return tap2rpm.run(args)
def _queryRPMTags(rpmfile, taglist):
"""
Helper function to read the given header tags from the given RPM file.
Returns a Deferred that fires with dictionary mapping a tag name to a list
of the associated values in the RPM header. If a tag has only a single
value in the header (like NAME or VERSION), it will be returned as a 1-item
list.
Run "rpm --querytags" to see what tags can be queried.
"""
# Build a query format string that will return appropriately delimited
# results. Every field is treated as an array field, so single-value tags
# like VERSION will be returned as 1-item lists.
queryFormat = RECORD_SEPARATOR.join([
"[%%{%s}%s]" % (tag, UNIT_SEPARATOR) for tag in taglist
])
def parseTagValues(output):
res = {}
for tag, values in zip(taglist, output.split(RECORD_SEPARATOR)):
values = values.strip(UNIT_SEPARATOR).split(UNIT_SEPARATOR)
res[tag] = values
return res
def checkErrorResult(failure):
# The current rpm packages on Debian and Ubuntu don't properly set up
# the RPM database, which causes rpm to print a harmless warning to
# stderr. Unfortunately, .getProcessOutput() assumes all warnings are
# catastrophic and panics whenever it sees one.
#
# See also:
# http://twistedmatrix.com/trac/ticket/3292#comment:42
# http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=551669
# http://rpm.org/ticket/106
failure.trap(IOError)
# Depending on kernel scheduling, we might read the whole error
# message, or only the first few bytes.
if str(failure.value).startswith("got stderr: 'error: "):
newFailure = Failure(SkipTest("rpm is missing its package "
"database. Run 'sudo rpm -qa > /dev/null' to create one."))
else:
# Not the exception we were looking for; we should report the
# original failure.
newFailure = failure
# We don't want to raise the exception right away; we want to wait for
# the process to exit, otherwise we'll get extra useless errors
# reported.
d = failure.value.processEnded
d.addBoth(lambda _: newFailure)
return d
d = utils.getProcessOutput("rpm",
("-q", "--queryformat", queryFormat, "-p", rpmfile))
d.addCallbacks(parseTagValues, checkErrorResult)
return d
class TestTap2RPM(TestCase):
def setUp(self):
return self._checkForRpmbuild()
def _checkForRpmbuild(self):
"""
tap2rpm requires rpmbuild; skip tests if rpmbuild is not present.
"""
if not procutils.which("rpmbuild"):
raise SkipTest("rpmbuild must be present to test tap2rpm")
def _makeTapFile(self, basename="dummy"):
"""
Make a temporary .tap file and returns the absolute path.
"""
path = basename + ".tap"
handle = open(path, "w")
handle.write("# Dummy .tap file")
handle.close()
return path
def _verifyRPMTags(self, rpmfile, **tags):
"""
Check the given file has the given tags set to the given values.
"""
d = _queryRPMTags(rpmfile, tags.keys())
d.addCallback(self.assertEqual, tags)
return d
def test_optionDefaults(self):
"""
Commandline options should default to sensible values.
"sensible" here is defined as "the same values that previous versions
defaulted to".
"""
config = tap2rpm.MyOptions()
config.parseOptions([])
self.assertEqual(config['tapfile'], 'twistd.tap')
self.assertEqual(config['maintainer'], 'tap2rpm')
self.assertEqual(config['protocol'], 'twistd')
self.assertEqual(config['description'], 'A TCP server for twistd')
self.assertEqual(config['long_description'],
'Automatically created by tap2rpm')
self.assertEqual(config['set-version'], '1.0')
self.assertEqual(config['rpmfile'], 'twisted-twistd')
self.assertEqual(config['type'], 'tap')
self.assertEqual(config['quiet'], False)
self.assertEqual(config['twistd_option'], 'file')
self.assertEqual(config['release-name'], 'twisted-twistd-1.0')
def test_protocolCalculatedFromTapFile(self):
"""
The protocol name defaults to a value based on the tapfile value.
"""
config = tap2rpm.MyOptions()
config.parseOptions(['--tapfile', 'pancakes.tap'])
self.assertEqual(config['tapfile'], 'pancakes.tap')
self.assertEqual(config['protocol'], 'pancakes')
def test_optionsDefaultToProtocolValue(self):
"""
Many options default to a value calculated from the protocol name.
"""
config = tap2rpm.MyOptions()
config.parseOptions([
'--tapfile', 'sausages.tap',
'--protocol', 'eggs',
])
self.assertEqual(config['tapfile'], 'sausages.tap')
self.assertEqual(config['maintainer'], 'tap2rpm')
self.assertEqual(config['protocol'], 'eggs')
self.assertEqual(config['description'], 'A TCP server for eggs')
self.assertEqual(config['long_description'],
'Automatically created by tap2rpm')
self.assertEqual(config['set-version'], '1.0')
self.assertEqual(config['rpmfile'], 'twisted-eggs')
self.assertEqual(config['type'], 'tap')
self.assertEqual(config['quiet'], False)
self.assertEqual(config['twistd_option'], 'file')
self.assertEqual(config['release-name'], 'twisted-eggs-1.0')
def test_releaseNameDefaultsToRpmfileValue(self):
"""
The release-name option is calculated from rpmfile and set-version.
"""
config = tap2rpm.MyOptions()
config.parseOptions([
"--rpmfile", "beans",
"--set-version", "1.2.3",
])
self.assertEqual(config['release-name'], 'beans-1.2.3')
def test_basicOperation(self):
"""
Calling tap2rpm should produce an RPM and SRPM with default metadata.
"""
basename = "frenchtoast"
# Create RPMs based on a TAP file with this name.
rpm, srpm = _makeRPMs(tapfile=self._makeTapFile(basename))
# Verify the resulting RPMs have the correct tags.
d = self._verifyRPMTags(rpm,
NAME=["twisted-%s" % (basename,)],
VERSION=["1.0"],
RELEASE=["1"],
SUMMARY=["A TCP server for %s" % (basename,)],
DESCRIPTION=["Automatically created by tap2rpm"],
)
d.addCallback(lambda _: self._verifyRPMTags(srpm,
NAME=["twisted-%s" % (basename,)],
VERSION=["1.0"],
RELEASE=["1"],
SUMMARY=["A TCP server for %s" % (basename,)],
DESCRIPTION=["Automatically created by tap2rpm"],
))
return d
def test_protocolOverride(self):
"""
Setting 'protocol' should change the name of the resulting package.
"""
basename = "acorn"
protocol = "banana"
# Create RPMs based on a TAP file with this name.
rpm, srpm = _makeRPMs(tapfile=self._makeTapFile(basename),
protocol=protocol)
# Verify the resulting RPMs have the correct tags.
d = self._verifyRPMTags(rpm,
NAME=["twisted-%s" % (protocol,)],
SUMMARY=["A TCP server for %s" % (protocol,)],
)
d.addCallback(lambda _: self._verifyRPMTags(srpm,
NAME=["twisted-%s" % (protocol,)],
SUMMARY=["A TCP server for %s" % (protocol,)],
))
return d
def test_rpmfileOverride(self):
"""
Setting 'rpmfile' should change the name of the resulting package.
"""
basename = "cherry"
rpmfile = "donut"
# Create RPMs based on a TAP file with this name.
rpm, srpm = _makeRPMs(tapfile=self._makeTapFile(basename),
rpmfile=rpmfile)
# Verify the resulting RPMs have the correct tags.
d = self._verifyRPMTags(rpm,
NAME=[rpmfile],
SUMMARY=["A TCP server for %s" % (basename,)],
)
d.addCallback(lambda _: self._verifyRPMTags(srpm,
NAME=[rpmfile],
SUMMARY=["A TCP server for %s" % (basename,)],
))
return d
def test_descriptionOverride(self):
"""
Setting 'description' should change the SUMMARY tag.
"""
description = "eggplant"
# Create RPMs based on a TAP file with this name.
rpm, srpm = _makeRPMs(tapfile=self._makeTapFile(),
description=description)
# Verify the resulting RPMs have the correct tags.
d = self._verifyRPMTags(rpm,
SUMMARY=[description],
)
d.addCallback(lambda _: self._verifyRPMTags(srpm,
SUMMARY=[description],
))
return d
def test_longDescriptionOverride(self):
"""
Setting 'longDescription' should change the DESCRIPTION tag.
"""
longDescription = "fig"
# Create RPMs based on a TAP file with this name.
rpm, srpm = _makeRPMs(tapfile=self._makeTapFile(),
longDescription=longDescription)
# Verify the resulting RPMs have the correct tags.
d = self._verifyRPMTags(rpm,
DESCRIPTION=[longDescription],
)
d.addCallback(lambda _: self._verifyRPMTags(srpm,
DESCRIPTION=[longDescription],
))
return d
def test_setVersionOverride(self):
"""
Setting 'setVersion' should change the RPM's version info.
"""
version = "123.456"
# Create RPMs based on a TAP file with this name.
rpm, srpm = _makeRPMs(tapfile=self._makeTapFile(),
setVersion=version)
# Verify the resulting RPMs have the correct tags.
d = self._verifyRPMTags(rpm,
VERSION=["123.456"],
RELEASE=["1"],
)
d.addCallback(lambda _: self._verifyRPMTags(srpm,
VERSION=["123.456"],
RELEASE=["1"],
))
return d
def test_tapInOtherDirectory(self):
"""
tap2rpm handles tapfiles outside the current directory.
"""
# Make a tapfile outside the current directory.
tempdir = self.mktemp()
os.mkdir(tempdir)
tapfile = self._makeTapFile(os.path.join(tempdir, "bacon"))
# Try and make an RPM from that tapfile.
_makeRPMs(tapfile=tapfile)
def test_unsignedFlagDeprecationWarning(self):
"""
The 'unsigned' flag in tap2rpm should be deprecated, and its use
should raise a warning as such.
"""
config = tap2rpm.MyOptions()
config.parseOptions(['--unsigned'])
warnings = self.flushWarnings()
self.assertEqual(DeprecationWarning, warnings[0]['category'])
self.assertEqual(
deprecate.getDeprecationWarningString(
config.opt_unsigned, versions.Version("Twisted", 12, 1, 0)),
warnings[0]['message'])
self.assertEqual(1, len(warnings))

View file

@ -0,0 +1,290 @@
# -*- test-case-name: twisted.scripts.test.test_scripts -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Post-install GUI to compile to pyc and unpack twisted doco.
"""
import sys
import zipfile
import py_compile
# we're going to ignore failures to import tkinter and fall back
# to using the console if the required dll is not found
# Scary kludge to work around tk84.dll bug:
# https://sourceforge.net/tracker/index.php?func=detail&aid=814654&group_id=5470&atid=105470
# Without which(): you get a windows missing-dll popup message
from twisted.python.procutils import which
tkdll='tk84.dll'
if which(tkdll) or which('DLLs/%s' % tkdll):
try:
import Tkinter
from Tkinter import *
from twisted.internet import tksupport
except ImportError:
pass
# twisted
from twisted.internet import reactor, defer
from twisted.python import failure, log, zipstream, util, usage, log
# local
import os.path
class ProgressBar:
def __init__(self, master=None, orientation="horizontal",
min=0, max=100, width=100, height=18,
doLabel=1, appearance="sunken",
fillColor="blue", background="gray",
labelColor="yellow", labelFont="Arial",
labelText="", labelFormat="%d%%",
value=0, bd=2):
# preserve various values
self.master=master
self.orientation=orientation
self.min=min
self.max=max
self.width=width
self.height=height
self.doLabel=doLabel
self.fillColor=fillColor
self.labelFont= labelFont
self.labelColor=labelColor
self.background=background
self.labelText=labelText
self.labelFormat=labelFormat
self.value=value
self.frame=Frame(master, relief=appearance, bd=bd)
self.canvas=Canvas(self.frame, height=height, width=width, bd=0,
highlightthickness=0, background=background)
self.scale=self.canvas.create_rectangle(0, 0, width, height,
fill=fillColor)
self.label=self.canvas.create_text(self.canvas.winfo_reqwidth() / 2,
height / 2, text=labelText,
anchor="c", fill=labelColor,
font=self.labelFont)
self.update()
self.canvas.pack(side='top', fill='x', expand='no')
def pack(self, *args, **kwargs):
self.frame.pack(*args, **kwargs)
def updateProgress(self, newValue, newMax=None):
if newMax:
self.max = newMax
self.value = newValue
self.update()
def update(self):
# Trim the values to be between min and max
value=self.value
if value > self.max:
value = self.max
if value < self.min:
value = self.min
# Adjust the rectangle
if self.orientation == "horizontal":
self.canvas.coords(self.scale, 0, 0,
float(value) / self.max * self.width, self.height)
else:
self.canvas.coords(self.scale, 0,
self.height - (float(value) /
self.max*self.height),
self.width, self.height)
# Now update the colors
self.canvas.itemconfig(self.scale, fill=self.fillColor)
self.canvas.itemconfig(self.label, fill=self.labelColor)
# And update the label
if self.doLabel:
if value:
if value >= 0:
pvalue = int((float(value) / float(self.max)) *
100.0)
else:
pvalue = 0
self.canvas.itemconfig(self.label, text=self.labelFormat
% pvalue)
else:
self.canvas.itemconfig(self.label, text='')
else:
self.canvas.itemconfig(self.label, text=self.labelFormat %
self.labelText)
self.canvas.update_idletasks()
class Progressor:
"""A base class to make it simple to hook a progress bar up to a process.
"""
def __init__(self, title, *args, **kwargs):
self.title=title
self.stopping=0
self.bar=None
self.iterator=None
self.remaining=1000
def setBar(self, bar, max):
self.bar=bar
bar.updateProgress(0, max)
return self
def setIterator(self, iterator):
self.iterator=iterator
return self
def updateBar(self, deferred):
b=self.bar
try:
b.updateProgress(b.max - self.remaining)
except TclError:
self.stopping=1
except:
deferred.errback(failure.Failure())
def processAll(self, root):
assert self.bar and self.iterator, "must setBar and setIterator"
self.root=root
root.title(self.title)
d=defer.Deferred()
d.addErrback(log.err)
reactor.callLater(0.1, self.processOne, d)
return d
def processOne(self, deferred):
if self.stopping:
deferred.callback(self.root)
return
try:
self.remaining=self.iterator.next()
except StopIteration:
self.stopping=1
except:
deferred.errback(failure.Failure())
if self.remaining%10==0:
reactor.callLater(0, self.updateBar, deferred)
if self.remaining%100==0:
log.msg(self.remaining)
reactor.callLater(0, self.processOne, deferred)
def compiler(path):
"""A generator for compiling files to .pyc"""
def justlist(arg, directory, names):
pynames=[os.path.join(directory, n) for n in names
if n.endswith('.py')]
arg.extend(pynames)
all=[]
os.path.walk(path, justlist, all)
remaining=len(all)
i=zip(all, range(remaining-1, -1, -1))
for f, remaining in i:
py_compile.compile(f)
yield remaining
class TkunzipOptions(usage.Options):
optParameters=[["zipfile", "z", "", "a zipfile"],
["ziptargetdir", "t", ".", "where to extract zipfile"],
["compiledir", "c", "", "a directory to compile"],
]
optFlags=[["use-console", "C", "show in the console, not graphically"],
["shell-exec", "x", """\
spawn a new console to show output (implies -C)"""],
]
def countPys(countl, directory, names):
sofar=countl[0]
sofar=sofar+len([f for f in names if f.endswith('.py')])
countl[0]=sofar
return sofar
def countPysRecursive(path):
countl=[0]
os.path.walk(path, countPys, countl)
return countl[0]
def run(argv=sys.argv):
log.startLogging(file('tkunzip.log', 'w'))
opt=TkunzipOptions()
try:
opt.parseOptions(argv[1:])
except usage.UsageError, e:
print str(opt)
print str(e)
sys.exit(1)
if opt['use-console']:
# this should come before shell-exec to prevent infinite loop
return doItConsolicious(opt)
if opt['shell-exec'] or not 'Tkinter' in sys.modules:
from distutils import sysconfig
from twisted.scripts import tkunzip
myfile=tkunzip.__file__
exe=os.path.join(sysconfig.get_config_var('prefix'), 'python.exe')
return os.system('%s %s --use-console %s' % (exe, myfile,
' '.join(argv[1:])))
return doItTkinterly(opt)
def doItConsolicious(opt):
# reclaim stdout/stderr from log
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
if opt['zipfile']:
print 'Unpacking documentation...'
for n in zipstream.unzipIter(opt['zipfile'], opt['ziptargetdir']):
if n % 100 == 0:
print n,
if n % 1000 == 0:
print
print 'Done unpacking.'
if opt['compiledir']:
print 'Compiling to pyc...'
import compileall
compileall.compile_dir(opt["compiledir"])
print 'Done compiling.'
def doItTkinterly(opt):
root=Tkinter.Tk()
root.withdraw()
root.title('One Moment.')
root.protocol('WM_DELETE_WINDOW', reactor.stop)
tksupport.install(root)
prog=ProgressBar(root, value=0, labelColor="black", width=200)
prog.pack()
# callback immediately
d=defer.succeed(root).addErrback(log.err)
def deiconify(root):
root.deiconify()
return root
d.addCallback(deiconify)
if opt['zipfile']:
uz=Progressor('Unpacking documentation...')
max=zipstream.countZipFileChunks(opt['zipfile'], 4096)
uz.setBar(prog, max)
uz.setIterator(zipstream.unzipIterChunky(opt['zipfile'],
opt['ziptargetdir']))
d.addCallback(uz.processAll)
if opt['compiledir']:
comp=Progressor('Compiling to pyc...')
comp.setBar(prog, countPysRecursive(opt['compiledir']))
comp.setIterator(compiler(opt['compiledir']))
d.addCallback(comp.processAll)
def stop(ignore):
reactor.stop()
root.destroy()
d.addCallback(stop)
reactor.run()
if __name__=='__main__':
run()

View file

@ -0,0 +1,621 @@
# -*- test-case-name: twisted.trial.test.test_script -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
from __future__ import print_function
import gc
import inspect
import os
import pdb
import random
import sys
import time
import warnings
from twisted.internet import defer
from twisted.application import app
from twisted.python import usage, reflect, failure
from twisted.python.filepath import FilePath
from twisted import plugin
from twisted.python.util import spewer
from twisted.trial import runner, itrial, reporter
# Yea, this is stupid. Leave it for for command-line compatibility for a
# while, though.
TBFORMAT_MAP = {
'plain': 'default',
'default': 'default',
'emacs': 'brief',
'brief': 'brief',
'cgitb': 'verbose',
'verbose': 'verbose'
}
def _parseLocalVariables(line):
"""
Accepts a single line in Emacs local variable declaration format and
returns a dict of all the variables {name: value}.
Raises ValueError if 'line' is in the wrong format.
See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
"""
paren = '-*-'
start = line.find(paren) + len(paren)
end = line.rfind(paren)
if start == -1 or end == -1:
raise ValueError("%r not a valid local variable declaration" % (line,))
items = line[start:end].split(';')
localVars = {}
for item in items:
if len(item.strip()) == 0:
continue
split = item.split(':')
if len(split) != 2:
raise ValueError("%r contains invalid declaration %r"
% (line, item))
localVars[split[0].strip()] = split[1].strip()
return localVars
def loadLocalVariables(filename):
"""
Accepts a filename and attempts to load the Emacs variable declarations
from that file, simulating what Emacs does.
See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
"""
f = file(filename, "r")
lines = [f.readline(), f.readline()]
f.close()
for line in lines:
try:
return _parseLocalVariables(line)
except ValueError:
pass
return {}
def getTestModules(filename):
testCaseVar = loadLocalVariables(filename).get('test-case-name', None)
if testCaseVar is None:
return []
return testCaseVar.split(',')
def isTestFile(filename):
"""
Returns true if 'filename' looks like a file containing unit tests.
False otherwise. Doesn't care whether filename exists.
"""
basename = os.path.basename(filename)
return (basename.startswith('test_')
and os.path.splitext(basename)[1] == ('.py'))
def _reporterAction():
return usage.CompleteList([p.longOpt for p in
plugin.getPlugins(itrial.IReporter)])
def _maybeFindSourceLine(testThing):
"""
Try to find the source line of the given test thing.
@param testThing: the test item to attempt to inspect
@type testThing: an L{TestCase}, test method, or module, though only the
former two have a chance to succeed
@rtype: int
@return: the starting source line, or -1 if one couldn't be found
"""
# an instance of L{TestCase} -- locate the test it will run
method = getattr(testThing, "_testMethodName", None)
if method is not None:
testThing = getattr(testThing, method)
# If it's a function, we can get the line number even if the source file no
# longer exists
code = getattr(testThing, "__code__", None)
if code is not None:
return code.co_firstlineno
try:
return inspect.getsourcelines(testThing)[1]
except (IOError, TypeError):
# either testThing is a module, which raised a TypeError, or the file
# couldn't be read
return -1
# orders which can be passed to trial --order
_runOrders = {
"alphabetical" : (
"alphabetical order for test methods, arbitrary order for test cases",
runner.name),
"toptobottom" : (
"attempt to run test cases and methods in the order they were defined",
_maybeFindSourceLine),
}
def _checkKnownRunOrder(order):
"""
Check that the given order is a known test running order.
Does nothing else, since looking up the appropriate callable to sort the
tests should be done when it actually will be used, as the default argument
will not be coerced by this function.
@param order: one of the known orders in C{_runOrders}
@return: the order unmodified
"""
if order not in _runOrders:
raise usage.UsageError(
"--order must be one of: %s. See --help-orders for details" %
(", ".join(repr(order) for order in _runOrders),))
return order
class _BasicOptions(object):
"""
Basic options shared between trial and its local workers.
"""
synopsis = """%s [options] [[file|package|module|TestCase|testmethod]...]
""" % (os.path.basename(sys.argv[0]),)
longdesc = ("trial loads and executes a suite of unit tests, obtained "
"from modules, packages and files listed on the command line.")
optFlags = [["help", "h"],
["no-recurse", "N", "Don't recurse into packages"],
['help-orders', None, "Help on available test running orders"],
['help-reporters', None,
"Help on available output plugins (reporters)"],
["rterrors", "e", "realtime errors, print out tracebacks as "
"soon as they occur"],
["unclean-warnings", None,
"Turn dirty reactor errors into warnings"],
["force-gc", None, "Have Trial run gc.collect() before and "
"after each test case."],
["exitfirst", "x",
"Exit after the first non-successful result (cannot be "
"specified along with --jobs)."],
]
optParameters = [
["order", "o", None,
"Specify what order to run test cases and methods. "
"See --help-orders for more info.", _checkKnownRunOrder],
["random", "z", None,
"Run tests in random order using the specified seed"],
['temp-directory', None, '_trial_temp',
'Path to use as working directory for tests.'],
['reporter', None, 'verbose',
'The reporter to use for this test run. See --help-reporters for '
'more info.']]
compData = usage.Completions(
optActions={"order": usage.CompleteList(_runOrders),
"reporter": _reporterAction,
"logfile": usage.CompleteFiles(descr="log file name"),
"random": usage.Completer(descr="random seed")},
extraActions=[usage.CompleteFiles(
"*.py", descr="file | module | package | TestCase | testMethod",
repeat=True)],
)
fallbackReporter = reporter.TreeReporter
tracer = None
def __init__(self):
self['tests'] = []
usage.Options.__init__(self)
def coverdir(self):
"""
Return a L{FilePath} representing the directory into which coverage
results should be written.
"""
coverdir = 'coverage'
result = FilePath(self['temp-directory']).child(coverdir)
print("Setting coverage directory to %s." % (result.path,))
return result
# TODO: Some of the opt_* methods on this class have docstrings and some do
# not. This is mostly because usage.Options's currently will replace
# any intended output in optFlags and optParameters with the
# docstring. See #6427. When that is fixed, all methods should be
# given docstrings (and it should be verified that those with
# docstrings already have content suitable for printing as usage
# information).
def opt_coverage(self):
"""
Generate coverage information in the coverage file in the
directory specified by the temp-directory option.
"""
import trace
self.tracer = trace.Trace(count=1, trace=0)
sys.settrace(self.tracer.globaltrace)
self['coverage'] = True
def opt_testmodule(self, filename):
"""
Filename to grep for test cases (-*- test-case-name).
"""
# If the filename passed to this parameter looks like a test module
# we just add that to the test suite.
#
# If not, we inspect it for an Emacs buffer local variable called
# 'test-case-name'. If that variable is declared, we try to add its
# value to the test suite as a module.
#
# This parameter allows automated processes (like Buildbot) to pass
# a list of files to Trial with the general expectation of "these files,
# whatever they are, will get tested"
if not os.path.isfile(filename):
sys.stderr.write("File %r doesn't exist\n" % (filename,))
return
filename = os.path.abspath(filename)
if isTestFile(filename):
self['tests'].append(filename)
else:
self['tests'].extend(getTestModules(filename))
def opt_spew(self):
"""
Print an insanely verbose log of everything that happens. Useful
when debugging freezes or locks in complex code.
"""
sys.settrace(spewer)
def opt_help_orders(self):
synopsis = ("Trial can attempt to run test cases and their methods in "
"a few different orders. You can select any of the "
"following options using --order=<foo>.\n")
print(synopsis)
for name, (description, _) in sorted(_runOrders.items()):
print(' ', name, '\t', description)
sys.exit(0)
def opt_help_reporters(self):
synopsis = ("Trial's output can be customized using plugins called "
"Reporters. You can\nselect any of the following "
"reporters using --reporter=<foo>\n")
print(synopsis)
for p in plugin.getPlugins(itrial.IReporter):
print(' ', p.longOpt, '\t', p.description)
sys.exit(0)
def opt_disablegc(self):
"""
Disable the garbage collector
"""
self["disablegc"] = True
gc.disable()
def opt_tbformat(self, opt):
"""
Specify the format to display tracebacks with. Valid formats are
'plain', 'emacs', and 'cgitb' which uses the nicely verbose stdlib
cgitb.text function
"""
try:
self['tbformat'] = TBFORMAT_MAP[opt]
except KeyError:
raise usage.UsageError(
"tbformat must be 'plain', 'emacs', or 'cgitb'.")
def opt_recursionlimit(self, arg):
"""
see sys.setrecursionlimit()
"""
try:
sys.setrecursionlimit(int(arg))
except (TypeError, ValueError):
raise usage.UsageError(
"argument to recursionlimit must be an integer")
else:
self["recursionlimit"] = int(arg)
def opt_random(self, option):
try:
self['random'] = long(option)
except ValueError:
raise usage.UsageError(
"Argument to --random must be a positive integer")
else:
if self['random'] < 0:
raise usage.UsageError(
"Argument to --random must be a positive integer")
elif self['random'] == 0:
self['random'] = long(time.time() * 100)
def opt_without_module(self, option):
"""
Fake the lack of the specified modules, separated with commas.
"""
self["without-module"] = option
for module in option.split(","):
if module in sys.modules:
warnings.warn("Module '%s' already imported, "
"disabling anyway." % (module,),
category=RuntimeWarning)
sys.modules[module] = None
def parseArgs(self, *args):
self['tests'].extend(args)
def _loadReporterByName(self, name):
for p in plugin.getPlugins(itrial.IReporter):
qual = "%s.%s" % (p.module, p.klass)
if p.longOpt == name:
return reflect.namedAny(qual)
raise usage.UsageError("Only pass names of Reporter plugins to "
"--reporter. See --help-reporters for "
"more info.")
def postOptions(self):
# Only load reporters now, as opposed to any earlier, to avoid letting
# application-defined plugins muck up reactor selecting by importing
# t.i.reactor and causing the default to be installed.
self['reporter'] = self._loadReporterByName(self['reporter'])
if 'tbformat' not in self:
self['tbformat'] = 'default'
if self['order'] is not None and self['random'] is not None:
raise usage.UsageError(
"You can't specify --random when using --order")
class Options(_BasicOptions, usage.Options, app.ReactorSelectionMixin):
"""
Options to the trial command line tool.
@ivar _workerFlags: List of flags which are accepted by trial distributed
workers. This is used by C{_getWorkerArguments} to build the command
line arguments.
@type _workerFlags: C{list}
@ivar _workerParameters: List of parameter which are accepted by trial
distrubuted workers. This is used by C{_getWorkerArguments} to build
the command line arguments.
@type _workerParameters: C{list}
"""
optFlags = [
["debug", "b", "Run tests in a debugger. If that debugger is "
"pdb, will load '.pdbrc' from current directory if it exists."
],
["debug-stacktraces", "B", "Report Deferred creation and "
"callback stack traces"],
["nopm", None, "don't automatically jump into debugger for "
"postmorteming of exceptions"],
["dry-run", 'n', "do everything but run the tests"],
["profile", None, "Run tests under the Python profiler"],
["until-failure", "u", "Repeat test until it fails"],
]
optParameters = [
["debugger", None, "pdb", "the fully qualified name of a debugger to "
"use if --debug is passed"],
["logfile", "l", "test.log", "log file name"],
["jobs", "j", None, "Number of local workers to run"]
]
compData = usage.Completions(
optActions = {
"tbformat": usage.CompleteList(["plain", "emacs", "cgitb"]),
"reporter": _reporterAction,
},
)
_workerFlags = ["disablegc", "force-gc", "coverage"]
_workerParameters = ["recursionlimit", "reactor", "without-module"]
fallbackReporter = reporter.TreeReporter
extra = None
tracer = None
def opt_jobs(self, number):
"""
Number of local workers to run, a strictly positive integer.
"""
try:
number = int(number)
except ValueError:
raise usage.UsageError(
"Expecting integer argument to jobs, got '%s'" % number)
if number <= 0:
raise usage.UsageError(
"Argument to jobs must be a strictly positive integer")
self["jobs"] = number
def _getWorkerArguments(self):
"""
Return a list of options to pass to distributed workers.
"""
args = []
for option in self._workerFlags:
if self.get(option) is not None:
if self[option]:
args.append("--%s" % (option,))
for option in self._workerParameters:
if self.get(option) is not None:
args.extend(["--%s" % (option,), str(self[option])])
return args
def postOptions(self):
_BasicOptions.postOptions(self)
if self['jobs']:
conflicts = ['debug', 'profile', 'debug-stacktraces', 'exitfirst']
for option in conflicts:
if self[option]:
raise usage.UsageError(
"You can't specify --%s when using --jobs" % option)
if self['nopm']:
if not self['debug']:
raise usage.UsageError("You must specify --debug when using "
"--nopm ")
failure.DO_POST_MORTEM = False
def _initialDebugSetup(config):
# do this part of debug setup first for easy debugging of import failures
if config['debug']:
failure.startDebugMode()
if config['debug'] or config['debug-stacktraces']:
defer.setDebugging(True)
def _getSuite(config):
loader = _getLoader(config)
recurse = not config['no-recurse']
return loader.loadByNames(config['tests'], recurse)
def _getLoader(config):
loader = runner.TestLoader()
if config['random']:
randomer = random.Random()
randomer.seed(config['random'])
loader.sorter = lambda x : randomer.random()
print('Running tests shuffled with seed %d\n' % config['random'])
elif config['order']:
_, sorter = _runOrders[config['order']]
loader.sorter = sorter
if not config['until-failure']:
loader.suiteFactory = runner.DestructiveTestSuite
return loader
def _wrappedPdb():
"""
Wrap an instance of C{pdb.Pdb} with readline support and load any .rcs.
"""
dbg = pdb.Pdb()
try:
import readline
except ImportError:
print("readline module not available")
sys.exc_clear()
for path in ('.pdbrc', 'pdbrc'):
if os.path.exists(path):
try:
rcFile = file(path, 'r')
except IOError:
sys.exc_clear()
else:
dbg.rcLines.extend(rcFile.readlines())
return dbg
class _DebuggerNotFound(Exception):
"""
A debugger import failed.
Used to allow translating these errors into usage error messages.
"""
def _makeRunner(config):
"""
Return a trial runner class set up with the parameters extracted from
C{config}.
@return: A trial runner instance.
@rtype: L{runner.TrialRunner} or C{DistTrialRunner} depending on the
configuration.
"""
cls = runner.TrialRunner
args = {'reporterFactory': config['reporter'],
'tracebackFormat': config['tbformat'],
'realTimeErrors': config['rterrors'],
'uncleanWarnings': config['unclean-warnings'],
'logfile': config['logfile'],
'workingDirectory': config['temp-directory']}
if config['dry-run']:
args['mode'] = runner.TrialRunner.DRY_RUN
elif config['jobs']:
from twisted.trial._dist.disttrial import DistTrialRunner
cls = DistTrialRunner
args['workerNumber'] = config['jobs']
args['workerArguments'] = config._getWorkerArguments()
else:
if config['debug']:
args['mode'] = runner.TrialRunner.DEBUG
debugger = config['debugger']
if debugger != 'pdb':
try:
args['debugger'] = reflect.namedAny(debugger)
except reflect.ModuleNotFound:
raise _DebuggerNotFound(
'%r debugger could not be found.' % (debugger,))
else:
args['debugger'] = _wrappedPdb()
args['exitFirst'] = config['exitfirst']
args['profile'] = config['profile']
args['forceGarbageCollection'] = config['force-gc']
return cls(**args)
def run():
if len(sys.argv) == 1:
sys.argv.append("--help")
config = Options()
try:
config.parseOptions()
except usage.error, ue:
raise SystemExit, "%s: %s" % (sys.argv[0], ue)
_initialDebugSetup(config)
try:
trialRunner = _makeRunner(config)
except _DebuggerNotFound as e:
raise SystemExit('%s: %s' % (sys.argv[0], str(e)))
suite = _getSuite(config)
if config['until-failure']:
test_result = trialRunner.runUntilFailure(suite)
else:
test_result = trialRunner.run(suite)
if config.tracer:
sys.settrace(None)
results = config.tracer.results()
results.write_results(show_missing=1, summary=False,
coverdir=config.coverdir().path)
sys.exit(not test_result.wasSuccessful())

View file

@ -0,0 +1,30 @@
# -*- test-case-name: twisted.test.test_twistd -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
The Twisted Daemon: platform-independent interface.
@author: Christopher Armstrong
"""
from twisted.application import app
from twisted.python.runtime import platformType
if platformType == "win32":
from twisted.scripts._twistw import ServerOptions, \
WindowsApplicationRunner as _SomeApplicationRunner
else:
from twisted.scripts._twistd_unix import ServerOptions, \
UnixApplicationRunner as _SomeApplicationRunner
def runApp(config):
_SomeApplicationRunner(config).run()
def run():
app.run(runApp, ServerOptions)
__all__ = ['run', 'runApp']