update Linux_x86_64

This commit is contained in:
j 2015-11-04 12:58:41 +01:00
commit e7ebbedd38
336 changed files with 34353 additions and 21020 deletions

View file

@ -1,29 +1,39 @@
from sys import platform
from functools import wraps, partial
from itertools import count
from itertools import count, chain
from weakref import WeakValueDictionary
from errno import errorcode
from six import text_type as _text_type
from six import binary_type as _binary_type
from six import integer_types as integer_types
from six import int2byte, indexbytes
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
exception_from_error_queue as _exception_from_error_queue,
native as _native)
native as _native,
text_to_bytes_and_warn as _text_to_bytes_and_warn,
path_string as _path_string,
UNSPECIFIED as _UNSPECIFIED,
)
from OpenSSL.crypto import (
FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store)
_unspecified = object()
try:
_memoryview = memoryview
except NameError:
class _memoryview(object):
pass
try:
_buffer = buffer
except NameError:
class _buffer(object):
pass
OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER
SSLEAY_VERSION = _lib.SSLEAY_VERSION
SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS
@ -81,7 +91,10 @@ except AttributeError:
OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU
OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE
OP_NO_TICKET = _lib.SSL_OP_NO_TICKET
try:
OP_NO_TICKET = _lib.SSL_OP_NO_TICKET
except AttributeError:
pass
OP_ALL = _lib.SSL_OP_ALL
@ -121,7 +134,6 @@ SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT
SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START
SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE
class Error(Exception):
"""
An error occurred in an `OpenSSL.SSL` API.
@ -156,11 +168,42 @@ class SysCallError(Error):
pass
class _CallbackExceptionHelper(object):
"""
A base class for wrapper classes that allow for intelligent exception
handling in OpenSSL callbacks.
class _VerifyHelper(object):
def __init__(self, connection, callback):
:ivar list _problems: Any exceptions that occurred while executing in a
context where they could not be raised in the normal way. Typically
this is because OpenSSL has called into some Python code and requires a
return value. The exceptions are saved to be raised later when it is
possible to do so.
"""
def __init__(self):
self._problems = []
def raise_if_problem(self):
"""
Raise an exception from the OpenSSL error queue or that was previously
captured whe running a callback.
"""
if self._problems:
try:
_raise_current_error()
except Error:
pass
raise self._problems.pop(0)
class _VerifyHelper(_CallbackExceptionHelper):
"""
Wrap a callback such that it can be used as a certificate verification
callback.
"""
def __init__(self, callback):
_CallbackExceptionHelper.__init__(self)
@wraps(callback)
def wrapper(ok, store_ctx):
cert = X509.__new__(X509)
@ -168,6 +211,10 @@ class _VerifyHelper(object):
error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)
index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index)
connection = Connection._reverse_mapping[ssl]
try:
result = callback(connection, cert, error_number, error_depth, ok)
except Exception as e:
@ -184,14 +231,142 @@ class _VerifyHelper(object):
"int (*)(int, X509_STORE_CTX *)", wrapper)
def raise_if_problem(self):
if self._problems:
try:
_raise_current_error()
except Error:
pass
raise self._problems.pop(0)
class _NpnAdvertiseHelper(_CallbackExceptionHelper):
"""
Wrap a callback such that it can be used as an NPN advertisement callback.
"""
def __init__(self, callback):
_CallbackExceptionHelper.__init__(self)
@wraps(callback)
def wrapper(ssl, out, outlen, arg):
try:
conn = Connection._reverse_mapping[ssl]
protos = callback(conn)
# Join the protocols into a Python bytestring, length-prefixing
# each element.
protostr = b''.join(
chain.from_iterable((int2byte(len(p)), p) for p in protos)
)
# Save our callback arguments on the connection object. This is
# done to make sure that they don't get freed before OpenSSL
# uses them. Then, return them appropriately in the output
# parameters.
conn._npn_advertise_callback_args = [
_ffi.new("unsigned int *", len(protostr)),
_ffi.new("unsigned char[]", protostr),
]
outlen[0] = conn._npn_advertise_callback_args[0][0]
out[0] = conn._npn_advertise_callback_args[1]
return 0
except Exception as e:
self._problems.append(e)
return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
self.callback = _ffi.callback(
"int (*)(SSL *, const unsigned char **, unsigned int *, void *)",
wrapper
)
class _NpnSelectHelper(_CallbackExceptionHelper):
"""
Wrap a callback such that it can be used as an NPN selection callback.
"""
def __init__(self, callback):
_CallbackExceptionHelper.__init__(self)
@wraps(callback)
def wrapper(ssl, out, outlen, in_, inlen, arg):
try:
conn = Connection._reverse_mapping[ssl]
# The string passed to us is actually made up of multiple
# length-prefixed bytestrings. We need to split that into a
# list.
instr = _ffi.buffer(in_, inlen)[:]
protolist = []
while instr:
l = indexbytes(instr, 0)
proto = instr[1:l+1]
protolist.append(proto)
instr = instr[l+1:]
# Call the callback
outstr = callback(conn, protolist)
# Save our callback arguments on the connection object. This is
# done to make sure that they don't get freed before OpenSSL
# uses them. Then, return them appropriately in the output
# parameters.
conn._npn_select_callback_args = [
_ffi.new("unsigned char *", len(outstr)),
_ffi.new("unsigned char[]", outstr),
]
outlen[0] = conn._npn_select_callback_args[0][0]
out[0] = conn._npn_select_callback_args[1]
return 0
except Exception as e:
self._problems.append(e)
return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
self.callback = _ffi.callback(
"int (*)(SSL *, unsigned char **, unsigned char *, "
"const unsigned char *, unsigned int, void *)",
wrapper
)
class _ALPNSelectHelper(_CallbackExceptionHelper):
"""
Wrap a callback such that it can be used as an ALPN selection callback.
"""
def __init__(self, callback):
_CallbackExceptionHelper.__init__(self)
@wraps(callback)
def wrapper(ssl, out, outlen, in_, inlen, arg):
try:
conn = Connection._reverse_mapping[ssl]
# The string passed to us is made up of multiple
# length-prefixed bytestrings. We need to split that into a
# list.
instr = _ffi.buffer(in_, inlen)[:]
protolist = []
while instr:
encoded_len = indexbytes(instr, 0)
proto = instr[1:encoded_len + 1]
protolist.append(proto)
instr = instr[encoded_len + 1:]
# Call the callback
outstr = callback(conn, protolist)
if not isinstance(outstr, _binary_type):
raise TypeError("ALPN callback must return a bytestring.")
# Save our callback arguments on the connection object to make
# sure that they don't get freed before OpenSSL can use them.
# Then, return them in the appropriate output parameters.
conn._alpn_select_callback_args = [
_ffi.new("unsigned char *", len(outstr)),
_ffi.new("unsigned char[]", outstr),
]
outlen[0] = conn._alpn_select_callback_args[0][0]
out[0] = conn._alpn_select_callback_args[1]
return 0
except Exception as e:
self._problems.append(e)
return 2 # SSL_TLSEXT_ERR_ALERT_FATAL
self.callback = _ffi.callback(
"int (*)(SSL *, unsigned char **, unsigned char *, "
"const unsigned char *, unsigned int, void *)",
wrapper
)
def _asFileDescriptor(obj):
@ -223,6 +398,37 @@ def SSLeay_version(type):
return _ffi.string(_lib.SSLeay_version(type))
def _requires_npn(func):
"""
Wraps any function that requires NPN support in OpenSSL, ensuring that
NotImplementedError is raised if NPN is not present.
"""
@wraps(func)
def wrapper(*args, **kwargs):
if not _lib.Cryptography_HAS_NEXTPROTONEG:
raise NotImplementedError("NPN not available.")
return func(*args, **kwargs)
return wrapper
def _requires_alpn(func):
"""
Wraps any function that requires ALPN support in OpenSSL, ensuring that
NotImplementedError is raised if ALPN support is not present.
"""
@wraps(func)
def wrapper(*args, **kwargs):
if not _lib.Cryptography_HAS_ALPN:
raise NotImplementedError("ALPN not available.")
return func(*args, **kwargs)
return wrapper
class Session(object):
pass
@ -235,6 +441,7 @@ class Context(object):
new SSL connections.
"""
_methods = {
SSLv2_METHOD: "SSLv2_method",
SSLv3_METHOD: "SSLv3_method",
SSLv23_METHOD: "SSLv23_method",
TLSv1_METHOD: "TLSv1_method",
@ -280,6 +487,12 @@ class Context(object):
self._info_callback = None
self._tlsext_servername_callback = None
self._app_data = None
self._npn_advertise_helper = None
self._npn_advertise_callback = None
self._npn_select_helper = None
self._npn_select_callback = None
self._alpn_select_helper = None
self._alpn_select_callback = None
# SSL_CTX_set_app_data(self->ctx, self);
# SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |
@ -293,19 +506,22 @@ class Context(object):
Let SSL know where we can find trusted certificates for the certificate
chain
:param cafile: In which file we can find the certificates
:param cafile: In which file we can find the certificates (``bytes`` or
``unicode``).
:param capath: In which directory we can find the certificates
(``bytes`` or ``unicode``).
:return: None
"""
if cafile is None:
cafile = _ffi.NULL
elif not isinstance(cafile, bytes):
raise TypeError("cafile must be None or a byte string")
else:
cafile = _path_string(cafile)
if capath is None:
capath = _ffi.NULL
elif not isinstance(capath, bytes):
raise TypeError("capath must be None or a byte string")
else:
capath = _path_string(capath)
load_result = _lib.SSL_CTX_load_verify_locations(self._context, cafile, capath)
if not load_result:
@ -355,15 +571,12 @@ class Context(object):
"""
Load a certificate chain from a file
:param certfile: The name of the certificate chain file
:param certfile: The name of the certificate chain file (``bytes`` or
``unicode``).
:return: None
"""
if isinstance(certfile, _text_type):
# Perhaps sys.getfilesystemencoding() could be better?
certfile = certfile.encode("utf-8")
if not isinstance(certfile, bytes):
raise TypeError("certfile must be bytes or unicode")
certfile = _path_string(certfile)
result = _lib.SSL_CTX_use_certificate_chain_file(self._context, certfile)
if not result:
@ -374,15 +587,13 @@ class Context(object):
"""
Load a certificate from a file
:param certfile: The name of the certificate file
:param certfile: The name of the certificate file (``bytes`` or
``unicode``).
:param filetype: (optional) The encoding of the file, default is PEM
:return: None
"""
if isinstance(certfile, _text_type):
# Perhaps sys.getfilesystemencoding() could be better?
certfile = certfile.encode("utf-8")
if not isinstance(certfile, bytes):
raise TypeError("certfile must be bytes or unicode")
certfile = _path_string(certfile)
if not isinstance(filetype, integer_types):
raise TypeError("filetype must be an integer")
@ -432,22 +643,18 @@ class Context(object):
raise exception
def use_privatekey_file(self, keyfile, filetype=_unspecified):
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
"""
Load a private key from a file
:param keyfile: The name of the key file
:param keyfile: The name of the key file (``bytes`` or ``unicode``)
:param filetype: (optional) The encoding of the file, default is PEM
:return: None
"""
if isinstance(keyfile, _text_type):
# Perhaps sys.getfilesystemencoding() could be better?
keyfile = keyfile.encode("utf-8")
keyfile = _path_string(keyfile)
if not isinstance(keyfile, bytes):
raise TypeError("keyfile must be a byte string")
if filetype is _unspecified:
if filetype is _UNSPECIFIED:
filetype = FILETYPE_PEM
elif not isinstance(filetype, integer_types):
raise TypeError("filetype must be an integer")
@ -479,6 +686,9 @@ class Context(object):
:return: None (raises an exception if something's wrong)
"""
if not _lib.SSL_CTX_check_private_key(self._context):
_raise_current_error()
def load_client_ca(self, cafile):
"""
@ -538,7 +748,7 @@ class Context(object):
if not callable(callback):
raise TypeError("callback must be callable")
self._verify_helper = _VerifyHelper(self, callback)
self._verify_helper = _VerifyHelper(callback)
self._verify_callback = self._verify_helper.callback
_lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
@ -578,11 +788,12 @@ class Context(object):
"""
Load parameters for Ephemeral Diffie-Hellman
:param dhfile: The file to load EDH parameters from
:param dhfile: The file to load EDH parameters from (``bytes`` or
``unicode``).
:return: None
"""
if not isinstance(dhfile, bytes):
raise TypeError("dhfile must be a byte string")
dhfile = _path_string(dhfile)
bio = _lib.BIO_new_file(dhfile, b"r")
if bio == _ffi.NULL:
@ -594,6 +805,19 @@ class Context(object):
_lib.SSL_CTX_set_tmp_dh(self._context, dh)
def set_tmp_ecdh(self, curve):
"""
Select a curve to use for ECDHE key exchange.
:param curve: A curve object to use as returned by either
:py:meth:`OpenSSL.crypto.get_elliptic_curve` or
:py:meth:`OpenSSL.crypto.get_elliptic_curves`.
:return: None
"""
_lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
def set_cipher_list(self, cipher_list):
"""
Change the cipher list
@ -783,6 +1007,79 @@ class Context(object):
_lib.SSL_CTX_set_tlsext_servername_callback(
self._context, self._tlsext_servername_callback)
@_requires_npn
def set_npn_advertise_callback(self, callback):
"""
Specify a callback function that will be called when offering `Next
Protocol Negotiation
<https://technotes.googlecode.com/git/nextprotoneg.html>`_ as a server.
:param callback: The callback function. It will be invoked with one
argument, the Connection instance. It should return a list of
bytestrings representing the advertised protocols, like
``[b'http/1.1', b'spdy/2']``.
"""
self._npn_advertise_helper = _NpnAdvertiseHelper(callback)
self._npn_advertise_callback = self._npn_advertise_helper.callback
_lib.SSL_CTX_set_next_protos_advertised_cb(
self._context, self._npn_advertise_callback, _ffi.NULL)
@_requires_npn
def set_npn_select_callback(self, callback):
"""
Specify a callback function that will be called when a server offers
Next Protocol Negotiation options.
:param callback: The callback function. It will be invoked with two
arguments: the Connection, and a list of offered protocols as
bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return
one of those bytestrings, the chosen protocol.
"""
self._npn_select_helper = _NpnSelectHelper(callback)
self._npn_select_callback = self._npn_select_helper.callback
_lib.SSL_CTX_set_next_proto_select_cb(
self._context, self._npn_select_callback, _ffi.NULL)
@_requires_alpn
def set_alpn_protos(self, protos):
"""
Specify the clients ALPN protocol list.
These protocols are offered to the server during protocol negotiation.
:param protos: A list of the protocols to be offered to the server.
This list should be a Python list of bytestrings representing the
protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
"""
# Take the list of protocols and join them together, prefixing them
# with their lengths.
protostr = b''.join(
chain.from_iterable((int2byte(len(p)), p) for p in protos)
)
# Build a C string from the list. We don't need to save this off
# because OpenSSL immediately copies the data out.
input_str = _ffi.new("unsigned char[]", protostr)
input_str_len = _ffi.cast("unsigned", len(protostr))
_lib.SSL_CTX_set_alpn_protos(self._context, input_str, input_str_len)
@_requires_alpn
def set_alpn_select_callback(self, callback):
"""
Set the callback to handle ALPN protocol choice.
:param callback: The callback function. It will be invoked with two
arguments: the Connection, and a list of offered protocols as
bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It should return
one of those bytestrings, the chosen protocol.
"""
self._alpn_select_helper = _ALPNSelectHelper(callback)
self._alpn_select_callback = self._alpn_select_helper.callback
_lib.SSL_CTX_set_alpn_select_cb(
self._context, self._alpn_select_callback, _ffi.NULL)
ContextType = Context
@ -807,6 +1104,19 @@ class Connection(object):
self._ssl = _ffi.gc(ssl, _lib.SSL_free)
self._context = context
# References to strings used for Next Protocol Negotiation. OpenSSL's
# header files suggest that these might get copied at some point, but
# doesn't specify when, so we store them here to make sure they don't
# get freed before OpenSSL uses them.
self._npn_advertise_callback_args = None
self._npn_select_callback_args = None
# References to strings used for Application Layer Protocol
# Negotiation. These strings get copied at some point but it's well
# after the callback returns, so we have to hang them somewhere to
# avoid them getting freed.
self._alpn_select_callback_args = None
self._reverse_mapping[self._ssl] = self
if socket is None:
@ -841,6 +1151,12 @@ class Connection(object):
def _raise_ssl_error(self, ssl, result):
if self._context._verify_helper is not None:
self._context._verify_helper.raise_if_problem()
if self._context._npn_advertise_helper is not None:
self._context._npn_advertise_helper.raise_if_problem()
if self._context._npn_select_helper is not None:
self._context._npn_select_helper.raise_if_problem()
if self._context._alpn_select_helper is not None:
self._context._alpn_select_helper.raise_if_problem()
error = _lib.SSL_get_error(ssl, result)
if error == _lib.SSL_ERROR_WANT_READ:
@ -859,7 +1175,7 @@ class Connection(object):
errno = _ffi.getwinerror()[0]
else:
errno = _ffi.errno
raise SysCallError(errno, errorcode[errno])
raise SysCallError(errno, errorcode.get(errno))
else:
raise SysCallError(-1, "Unexpected EOF")
else:
@ -936,15 +1252,20 @@ class Connection(object):
WantWrite or WantX509Lookup exceptions on this, you have to call the
method again with the SAME buffer.
:param buf: The string to send
:param buf: The string, buffer or memoryview to send
:param flags: (optional) Included for compatibility with the socket
API, the value is ignored
:return: The number of bytes written
"""
# Backward compatibility
buf = _text_to_bytes_and_warn("buf", buf)
if isinstance(buf, _memoryview):
buf = buf.tobytes()
if isinstance(buf, _buffer):
buf = str(buf)
if not isinstance(buf, bytes):
raise TypeError("data must be a byte string")
raise TypeError("data must be a memoryview, buffer or byte string")
result = _lib.SSL_write(self._ssl, buf, len(buf))
self._raise_ssl_error(self._ssl, result)
@ -958,15 +1279,19 @@ class Connection(object):
all data is sent. If an error occurs, it's impossible to tell how much
data has been sent.
:param buf: The string to send
:param buf: The string, buffer or memoryview to send
:param flags: (optional) Included for compatibility with the socket
API, the value is ignored
:return: The number of bytes written
"""
buf = _text_to_bytes_and_warn("buf", buf)
if isinstance(buf, _memoryview):
buf = buf.tobytes()
if isinstance(buf, _buffer):
buf = str(buf)
if not isinstance(buf, bytes):
raise TypeError("buf must be a byte string")
raise TypeError("buf must be a memoryview, buffer or byte string")
left_to_send = len(buf)
total_sent = 0
@ -997,6 +1322,45 @@ class Connection(object):
read = recv
def recv_into(self, buffer, nbytes=None, flags=None):
"""
Receive data on the connection and store the data into a buffer rather
than creating a new string.
:param buffer: The buffer to copy into.
:param nbytes: (optional) The maximum number of bytes to read into the
buffer. If not present, defaults to the size of the buffer. If
larger than the size of the buffer, is reduced to the size of the
buffer.
:param flags: (optional) Included for compatibility with the socket
API, the value is ignored.
:return: The number of bytes read into the buffer.
"""
if nbytes is None:
nbytes = len(buffer)
else:
nbytes = min(nbytes, len(buffer))
# We need to create a temporary buffer. This is annoying, it would be
# better if we could pass memoryviews straight into the SSL_read call,
# but right now we can't. Revisit this if CFFI gets that ability.
buf = _ffi.new("char[]", nbytes)
result = _lib.SSL_read(self._ssl, buf, nbytes)
self._raise_ssl_error(self._ssl, result)
# This strange line is all to avoid a memory copy. The buffer protocol
# should allow us to assign a CFFI buffer to the LHS of this line, but
# on CPython 3.3+ that segfaults. As a workaround, we can temporarily
# wrap it in a memoryview, except on Python 2.6 which doesn't have a
# memoryview type.
try:
buffer[:result] = memoryview(_ffi.buffer(buf, result))
except NameError:
buffer[:result] = _ffi.buffer(buf, result)
return result
def _handle_bio_errors(self, bio, result):
if _lib.BIO_should_retry(bio):
if _lib.BIO_should_read(bio):
@ -1046,6 +1410,8 @@ class Connection(object):
:param buf: The string to put into the memory BIO.
:return: The number of bytes written
"""
buf = _text_to_bytes_and_warn("buf", buf)
if self._into_ssl is None:
raise TypeError("Connection sock was not None")
@ -1153,8 +1519,7 @@ class Connection(object):
"""
result = _lib.SSL_shutdown(self._ssl)
if result < 0:
# TODO: This is untested.
_raise_current_error()
self._raise_ssl_error(self._ssl, result)
elif result > 0:
return True
else:
@ -1210,7 +1575,7 @@ class Connection(object):
The makefile() method is not implemented, since there is no dup semantics
for SSL connections
:raise NotImplementedError
:raise: NotImplementedError
"""
raise NotImplementedError("Cannot make file object of OpenSSL.SSL.Connection")
@ -1416,6 +1781,166 @@ class Connection(object):
if not result:
_raise_current_error()
def _get_finished_message(self, function):
"""
Helper to implement :py:meth:`get_finished` and
:py:meth:`get_peer_finished`.
:param function: Either :py:data:`SSL_get_finished`: or
:py:data:`SSL_get_peer_finished`.
:return: :py:data:`None` if the desired message has not yet been
received, otherwise the contents of the message.
:rtype: :py:class:`bytes` or :py:class:`NoneType`
"""
# The OpenSSL documentation says nothing about what might happen if the
# count argument given is zero. Specifically, it doesn't say whether
# the output buffer may be NULL in that case or not. Inspection of the
# implementation reveals that it calls memcpy() unconditionally.
# Section 7.1.4, paragraph 1 of the C standard suggests that
# memcpy(NULL, source, 0) is not guaranteed to produce defined (let
# alone desirable) behavior (though it probably does on just about
# every implementation...)
#
# Allocate a tiny buffer to pass in (instead of just passing NULL as
# one might expect) for the initial call so as to be safe against this
# potentially undefined behavior.
empty = _ffi.new("char[]", 0)
size = function(self._ssl, empty, 0)
if size == 0:
# No Finished message so far.
return None
buf = _ffi.new("char[]", size)
function(self._ssl, buf, size)
return _ffi.buffer(buf, size)[:]
def get_finished(self):
"""
Obtain the latest `handshake finished` message sent to the peer.
:return: The contents of the message or :py:obj:`None` if the TLS
handshake has not yet completed.
:rtype: :py:class:`bytes` or :py:class:`NoneType`
"""
return self._get_finished_message(_lib.SSL_get_finished)
def get_peer_finished(self):
"""
Obtain the latest `handshake finished` message received from the peer.
:return: The contents of the message or :py:obj:`None` if the TLS
handshake has not yet completed.
:rtype: :py:class:`bytes` or :py:class:`NoneType`
"""
return self._get_finished_message(_lib.SSL_get_peer_finished)
def get_cipher_name(self):
"""
Obtain the name of the currently used cipher.
:returns: The name of the currently used cipher or :py:obj:`None`
if no connection has been established.
:rtype: :py:class:`unicode` or :py:class:`NoneType`
"""
cipher = _lib.SSL_get_current_cipher(self._ssl)
if cipher == _ffi.NULL:
return None
else:
name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher))
return name.decode("utf-8")
def get_cipher_bits(self):
"""
Obtain the number of secret bits of the currently used cipher.
:returns: The number of secret bits of the currently used cipher
or :py:obj:`None` if no connection has been established.
:rtype: :py:class:`int` or :py:class:`NoneType`
"""
cipher = _lib.SSL_get_current_cipher(self._ssl)
if cipher == _ffi.NULL:
return None
else:
return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL)
def get_cipher_version(self):
"""
Obtain the protocol version of the currently used cipher.
:returns: The protocol name of the currently used cipher
or :py:obj:`None` if no connection has been established.
:rtype: :py:class:`unicode` or :py:class:`NoneType`
"""
cipher = _lib.SSL_get_current_cipher(self._ssl)
if cipher == _ffi.NULL:
return None
else:
version =_ffi.string(_lib.SSL_CIPHER_get_version(cipher))
return version.decode("utf-8")
@_requires_npn
def get_next_proto_negotiated(self):
"""
Get the protocol that was negotiated by NPN.
"""
data = _ffi.new("unsigned char **")
data_len = _ffi.new("unsigned int *")
_lib.SSL_get0_next_proto_negotiated(self._ssl, data, data_len)
return _ffi.buffer(data[0], data_len[0])[:]
@_requires_alpn
def set_alpn_protos(self, protos):
"""
Specify the client's ALPN protocol list.
These protocols are offered to the server during protocol negotiation.
:param protos: A list of the protocols to be offered to the server.
This list should be a Python list of bytestrings representing the
protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
"""
# Take the list of protocols and join them together, prefixing them
# with their lengths.
protostr = b''.join(
chain.from_iterable((int2byte(len(p)), p) for p in protos)
)
# Build a C string from the list. We don't need to save this off
# because OpenSSL immediately copies the data out.
input_str = _ffi.new("unsigned char[]", protostr)
input_str_len = _ffi.cast("unsigned", len(protostr))
_lib.SSL_set_alpn_protos(self._ssl, input_str, input_str_len)
def get_alpn_proto_negotiated(self):
"""
Get the protocol that was negotiated by ALPN.
"""
if not _lib.Cryptography_HAS_ALPN:
raise NotImplementedError("ALPN not available")
data = _ffi.new("unsigned char **")
data_len = _ffi.new("unsigned int *")
_lib.SSL_get0_alpn_selected(self._ssl, data, data_len)
if not data_len:
return b''
return _ffi.buffer(data[0], data_len[0])[:]
ConnectionType = Connection
# This is similar to the initialization calls at the end of OpenSSL/crypto.py

View file

@ -1,3 +1,6 @@
from warnings import warn
import sys
from six import PY3, binary_type, text_type
from cryptography.hazmat.bindings.openssl.binding import Binding
@ -5,11 +8,34 @@ binding = Binding()
ffi = binding.ffi
lib = binding.lib
def exception_from_error_queue(exceptionType):
def text(charp):
return native(ffi.string(charp))
def text(charp):
"""
Get a native string type representing of the given CFFI ``char*`` object.
:param charp: A C-style string represented using CFFI.
:return: :class:`str`
"""
if not charp:
return ""
return native(ffi.string(charp))
def exception_from_error_queue(exception_type):
"""
Convert an OpenSSL library failure into a Python exception.
When a call to the native OpenSSL library fails, this is usually signalled
by the return value, and an error code is stored in an error queue
associated with the current thread. The err library provides functions to
obtain these error codes and textual error messages.
"""
errors = []
while True:
error = lib.ERR_get_error()
if error == 0:
@ -19,7 +45,7 @@ def exception_from_error_queue(exceptionType):
text(lib.ERR_func_error_string(error)),
text(lib.ERR_reason_error_string(error))))
raise exceptionType(errors)
raise exception_type(errors)
@ -45,9 +71,57 @@ def native(s):
def path_string(s):
"""
Convert a Python string to a :py:class:`bytes` string identifying the same
path and which can be passed into an OpenSSL API accepting a filename.
:param s: An instance of :py:class:`bytes` or :py:class:`unicode`.
:return: An instance of :py:class:`bytes`.
"""
if isinstance(s, binary_type):
return s
elif isinstance(s, text_type):
return s.encode(sys.getfilesystemencoding())
else:
raise TypeError("Path must be represented as bytes or unicode string")
if PY3:
def byte_string(s):
return s.encode("charmap")
else:
def byte_string(s):
return s
# A marker object to observe whether some optional arguments are passed any
# value or not.
UNSPECIFIED = object()
_TEXT_WARNING = (
text_type.__name__ + " for {0} is no longer accepted, use bytes"
)
def text_to_bytes_and_warn(label, obj):
"""
If ``obj`` is text, emit a warning that it should be bytes instead and try
to convert it to bytes automatically.
:param str label: The name of the parameter from which ``obj`` was taken
(so a developer can easily find the source of the problem and correct
it).
:return: If ``obj`` is the text string type, a ``bytes`` object giving the
UTF-8 encoding of that text is returned. Otherwise, ``obj`` itself is
returned.
"""
if isinstance(obj, text_type):
warn(
_TEXT_WARNING.format(label),
category=DeprecationWarning,
stacklevel=3
)
return obj.encode('utf-8')
return obj

View file

@ -2,17 +2,22 @@ from time import time
from base64 import b16encode
from functools import partial
from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
from warnings import warn as _warn
from six import (
integer_types as _integer_types,
text_type as _text_type)
text_type as _text_type,
PY3 as _PY3)
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
exception_from_error_queue as _exception_from_error_queue,
byte_string as _byte_string,
native as _native)
native as _native,
UNSPECIFIED as _UNSPECIFIED,
text_to_bytes_and_warn as _text_to_bytes_and_warn,
)
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
@ -24,6 +29,7 @@ TYPE_RSA = _lib.EVP_PKEY_RSA
TYPE_DSA = _lib.EVP_PKEY_DSA
class Error(Exception):
"""
An error occurred in an `OpenSSL.crypto` API.
@ -32,6 +38,8 @@ class Error(Exception):
_raise_current_error = partial(_exception_from_error_queue, Error)
def _untested_error(where):
"""
An OpenSSL API failed somehow. Additionally, the failure which was
@ -263,6 +271,156 @@ PKeyType = PKey
class _EllipticCurve(object):
"""
A representation of a supported elliptic curve.
@cvar _curves: :py:obj:`None` until an attempt is made to load the curves.
Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve`
instances each of which represents one curve supported by the system.
@type _curves: :py:type:`NoneType` or :py:type:`set`
"""
_curves = None
if _PY3:
# This only necessary on Python 3. Morever, it is broken on Python 2.
def __ne__(self, other):
"""
Implement cooperation with the right-hand side argument of ``!=``.
Python 3 seems to have dropped this cooperation in this very narrow
circumstance.
"""
if isinstance(other, _EllipticCurve):
return super(_EllipticCurve, self).__ne__(other)
return NotImplemented
@classmethod
def _load_elliptic_curves(cls, lib):
"""
Get the curves supported by OpenSSL.
:param lib: The OpenSSL library binding object.
:return: A :py:type:`set` of ``cls`` instances giving the names of the
elliptic curves the underlying library supports.
"""
if lib.Cryptography_HAS_EC:
num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0)
builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves)
# The return value on this call should be num_curves again. We could
# check it to make sure but if it *isn't* then.. what could we do?
# Abort the whole process, I suppose...? -exarkun
lib.EC_get_builtin_curves(builtin_curves, num_curves)
return set(
cls.from_nid(lib, c.nid)
for c in builtin_curves)
return set()
@classmethod
def _get_elliptic_curves(cls, lib):
"""
Get, cache, and return the curves supported by OpenSSL.
:param lib: The OpenSSL library binding object.
:return: A :py:type:`set` of ``cls`` instances giving the names of the
elliptic curves the underlying library supports.
"""
if cls._curves is None:
cls._curves = cls._load_elliptic_curves(lib)
return cls._curves
@classmethod
def from_nid(cls, lib, nid):
"""
Instantiate a new :py:class:`_EllipticCurve` associated with the given
OpenSSL NID.
:param lib: The OpenSSL library binding object.
:param nid: The OpenSSL NID the resulting curve object will represent.
This must be a curve NID (and not, for example, a hash NID) or
subsequent operations will fail in unpredictable ways.
:type nid: :py:class:`int`
:return: The curve object.
"""
return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii"))
def __init__(self, lib, nid, name):
"""
:param _lib: The :py:mod:`cryptography` binding instance used to
interface with OpenSSL.
:param _nid: The OpenSSL NID identifying the curve this object
represents.
:type _nid: :py:class:`int`
:param name: The OpenSSL short name identifying the curve this object
represents.
:type name: :py:class:`unicode`
"""
self._lib = lib
self._nid = nid
self.name = name
def __repr__(self):
return "<Curve %r>" % (self.name,)
def _to_EC_KEY(self):
"""
Create a new OpenSSL EC_KEY structure initialized to use this curve.
The structure is automatically garbage collected when the Python object
is garbage collected.
"""
key = self._lib.EC_KEY_new_by_curve_name(self._nid)
return _ffi.gc(key, _lib.EC_KEY_free)
def get_elliptic_curves():
"""
Return a set of objects representing the elliptic curves supported in the
OpenSSL build in use.
The curve objects have a :py:class:`unicode` ``name`` attribute by which
they identify themselves.
The curve objects are useful as values for the argument accepted by
:py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
used for ECDHE key exchange.
"""
return _EllipticCurve._get_elliptic_curves(_lib)
def get_elliptic_curve(name):
"""
Return a single curve object selected by name.
See :py:func:`get_elliptic_curves` for information about curve objects.
:param name: The OpenSSL short name identifying the curve object to
retrieve.
:type name: :py:class:`unicode`
If the named curve is not supported then :py:class:`ValueError` is raised.
"""
for curve in get_elliptic_curves():
if curve.name == name:
return curve
raise ValueError("unknown curve name", name)
class X509Name(object):
def __init__(self, name):
"""
@ -697,6 +855,21 @@ class X509Req(object):
_raise_current_error()
def get_extensions(self):
"""
Get extensions to the request.
:return: A :py:class:`list` of :py:class:`X509Extension` objects.
"""
exts = []
native_exts_obj = _lib.X509_REQ_get_extensions(self._req)
for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)):
ext = X509Extension.__new__(X509Extension)
ext._extension = _lib.sk_X509_EXTENSION_value(native_exts_obj, i)
exts.append(ext)
return exts
def sign(self, pkey, digest):
"""
Sign the certificate request using the supplied key and digest
@ -1190,6 +1363,125 @@ class X509Store(object):
X509StoreType = X509Store
class X509StoreContextError(Exception):
"""
An error occurred while verifying a certificate using
`OpenSSL.X509StoreContext.verify_certificate`.
:ivar certificate: The certificate which caused verificate failure.
:type cert: :class:`X509`
"""
def __init__(self, message, certificate):
super(X509StoreContextError, self).__init__(message)
self.certificate = certificate
class X509StoreContext(object):
"""
An X.509 store context.
An :py:class:`X509StoreContext` is used to define some of the criteria for
certificate verification. The information encapsulated in this object
includes, but is not limited to, a set of trusted certificates,
verification parameters, and revoked certificates.
Of these, only the set of trusted certificates is currently exposed.
:ivar _store_ctx: The underlying X509_STORE_CTX structure used by this
instance. It is dynamically allocated and automatically garbage
collected.
:ivar _store: See the ``store`` ``__init__`` parameter.
:ivar _cert: See the ``certificate`` ``__init__`` parameter.
"""
def __init__(self, store, certificate):
"""
:param X509Store store: The certificates which will be trusted for the
purposes of any verifications.
:param X509 certificate: The certificate to be verified.
"""
store_ctx = _lib.X509_STORE_CTX_new()
self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
self._store = store
self._cert = certificate
# Make the store context available for use after instantiating this
# class by initializing it now. Per testing, subsequent calls to
# :py:meth:`_init` have no adverse affect.
self._init()
def _init(self):
"""
Set up the store context for a subsequent verification operation.
"""
ret = _lib.X509_STORE_CTX_init(self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL)
if ret <= 0:
_raise_current_error()
def _cleanup(self):
"""
Internally cleans up the store context.
The store context can then be reused with a new call to
:py:meth:`_init`.
"""
_lib.X509_STORE_CTX_cleanup(self._store_ctx)
def _exception_from_context(self):
"""
Convert an OpenSSL native context error failure into a Python
exception.
When a call to native OpenSSL X509_verify_cert fails, additonal information
about the failure can be obtained from the store context.
"""
errors = [
_lib.X509_STORE_CTX_get_error(self._store_ctx),
_lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
_native(_ffi.string(_lib.X509_verify_cert_error_string(
_lib.X509_STORE_CTX_get_error(self._store_ctx)))),
]
# A context error should always be associated with a certificate, so we
# expect this call to never return :class:`None`.
_x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx)
_cert = _lib.X509_dup(_x509)
pycert = X509.__new__(X509)
pycert._x509 = _ffi.gc(_cert, _lib.X509_free)
return X509StoreContextError(errors, pycert)
def set_store(self, store):
"""
Set the context's trust store.
:param X509Store store: The certificates which will be trusted for the
purposes of any *future* verifications.
"""
self._store = store
def verify_certificate(self):
"""
Verify a certificate in a context.
:param store_ctx: The :py:class:`X509StoreContext` to verify.
:raises: Error
"""
# Always re-initialize the store context in case
# :py:meth:`verify_certificate` is called multiple times.
self._init()
ret = _lib.X509_verify_cert(self._store_ctx)
self._cleanup()
if ret <= 0:
raise self._exception_from_context()
def load_certificate(type, buffer):
"""
@ -1308,9 +1600,11 @@ def _X509_REVOKED_dup(original):
_raise_current_error()
if original.serialNumber != _ffi.NULL:
_lib.ASN1_INTEGER_free(copy.serialNumber)
copy.serialNumber = _lib.ASN1_INTEGER_dup(original.serialNumber)
if original.revocationDate != _ffi.NULL:
_lib.ASN1_TIME_free(copy.revocationDate)
copy.revocationDate = _lib.M_ASN1_TIME_dup(original.revocationDate)
if original.extensions != _ffi.NULL:
@ -1539,7 +1833,8 @@ class CRL(object):
_raise_current_error()
def export(self, cert, key, type=FILETYPE_PEM, days=100):
def export(self, cert, key, type=FILETYPE_PEM, days=100,
digest=_UNSPECIFIED):
"""
export a CRL as a string
@ -1549,12 +1844,15 @@ class CRL(object):
:param key: Used to sign CRL.
:type key: :class:`PKey`
:param type: The export format, either :py:data:`FILETYPE_PEM`, :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`.
:param type: The export format, either :py:data:`FILETYPE_PEM`,
:py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`.
:param days: The number of days until the next update of this CRL.
:type days: :py:data:`int`
:param int days: The number of days until the next update of this CRL.
:return: :py:data:`str`
:param bytes digest: The name of the message digest to use (eg
``b"sha1"``).
:return: :py:data:`bytes`
"""
if not isinstance(cert, X509):
raise TypeError("cert must be an X509 instance")
@ -1563,6 +1861,19 @@ class CRL(object):
if not isinstance(type, int):
raise TypeError("type must be an integer")
if digest is _UNSPECIFIED:
_warn(
"The default message digest (md5) is deprecated. "
"Pass the name of a message digest explicitly.",
category=DeprecationWarning,
stacklevel=2,
)
digest = b"md5"
digest_obj = _lib.EVP_get_digestbyname(digest)
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
bio = _lib.BIO_new(_lib.BIO_s_mem())
if bio == _ffi.NULL:
# TODO: This is untested.
@ -1582,7 +1893,7 @@ class CRL(object):
_lib.X509_CRL_set_issuer_name(self._crl, _lib.X509_get_subject_name(cert._x509))
sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, _lib.EVP_md5())
sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj)
if not sign_result:
_raise_current_error()
@ -1729,7 +2040,7 @@ class PKCS12(object):
def set_ca_certificates(self, cacerts):
"""
Replace or set the CA certificates withing the PKCS12 object.
Replace or set the CA certificates within the PKCS12 object.
:param cacerts: The new CA certificates.
:type cacerts: :py:data:`None` or an iterable of :py:class:`X509`
@ -1784,6 +2095,8 @@ class PKCS12(object):
:return: The string containing the PKCS12
"""
passphrase = _text_to_bytes_and_warn("passphrase", passphrase)
if self._cacerts is None:
cacerts = _ffi.NULL
else:
@ -2081,6 +2394,8 @@ def sign(pkey, data, digest):
:param digest: message digest to use
:return: signature
"""
data = _text_to_bytes_and_warn("data", data)
digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@ -2115,6 +2430,8 @@ def verify(cert, signature, data, digest):
:param digest: message digest to use
:return: None if the signature is correct, raise exception otherwise
"""
data = _text_to_bytes_and_warn("data", data)
digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@ -2136,7 +2453,6 @@ def verify(cert, signature, data, digest):
_raise_current_error()
def load_crl(type, buffer):
"""
Load a certificate revocation list from a buffer
@ -2183,7 +2499,7 @@ def load_pkcs7_data(type, buffer):
if type == FILETYPE_PEM:
pkcs7 = _lib.PEM_read_bio_PKCS7(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
elif type == FILETYPE_ASN1:
pass
pkcs7 = _lib.d2i_PKCS7_bio(bio, _ffi.NULL)
else:
# TODO: This is untested.
_raise_current_error()
@ -2198,7 +2514,7 @@ def load_pkcs7_data(type, buffer):
def load_pkcs12(buffer, passphrase):
def load_pkcs12(buffer, passphrase=None):
"""
Load a PKCS12 object from a buffer
@ -2206,11 +2522,20 @@ def load_pkcs12(buffer, passphrase):
:param passphrase: (Optional) The password to decrypt the PKCS12 lump
:returns: The PKCS12 object
"""
passphrase = _text_to_bytes_and_warn("passphrase", passphrase)
if isinstance(buffer, _text_type):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
# Use null passphrase if passphrase is None or empty string. With PKCS#12
# password based encryption no password and a zero length password are two
# different things, but OpenSSL implementation will try both to figure out
# which one works.
if not passphrase:
passphrase = _ffi.NULL
p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL)
if p12 == _ffi.NULL:
_raise_current_error()

View file

@ -11,7 +11,8 @@ from six import integer_types as _integer_types
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
exception_from_error_queue as _exception_from_error_queue)
exception_from_error_queue as _exception_from_error_queue,
path_string as _path_string)
class Error(Exception):
@ -131,13 +132,13 @@ def load_file(filename, maxbytes=_unspecified):
"""
Seed the PRNG with data from a file
:param filename: The file to read data from
:param maxbytes: (optional) The number of bytes to read, default is
to read the entire file
:param filename: The file to read data from (``bytes`` or ``unicode``).
:param maxbytes: (optional) The number of bytes to read, default is to read
the entire file
:return: The number of bytes read
"""
if not isinstance(filename, _builtin_bytes):
raise TypeError("filename must be a string")
filename = _path_string(filename)
if maxbytes is _unspecified:
maxbytes = -1
@ -152,12 +153,11 @@ def write_file(filename):
"""
Save PRNG state to a file
:param filename: The file to write data to
:param filename: The file to write data to (``bytes`` or ``unicode``).
:return: The number of bytes written
"""
if not isinstance(filename, _builtin_bytes):
raise TypeError("filename must be a string")
filename = _path_string(filename)
return _lib.RAND_write_file(filename)

View file

@ -6,16 +6,22 @@ Unit tests for :py:mod:`OpenSSL.crypto`.
"""
from unittest import main
from warnings import catch_warnings, simplefilter
import os, re
import base64
import os
import re
from subprocess import PIPE, Popen
from datetime import datetime, timedelta
from six import binary_type
from six import u, b, binary_type, PY3
from warnings import simplefilter
from warnings import catch_warnings
from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey, PKeyType
from OpenSSL.crypto import X509, X509Type, X509Name, X509NameType
from OpenSSL.crypto import X509Store, X509StoreType, X509Req, X509ReqType
from OpenSSL.crypto import X509Store, X509StoreType, X509StoreContext, X509StoreContextError
from OpenSSL.crypto import X509Req, X509ReqType
from OpenSSL.crypto import X509Extension, X509ExtensionType
from OpenSSL.crypto import load_certificate, load_privatekey
from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT
@ -25,9 +31,12 @@ from OpenSSL.crypto import PKCS7Type, load_pkcs7_data
from OpenSSL.crypto import PKCS12, PKCS12Type, load_pkcs12
from OpenSSL.crypto import CRL, Revoked, load_crl
from OpenSSL.crypto import NetscapeSPKI, NetscapeSPKIType
from OpenSSL.crypto import sign, verify
from OpenSSL.test.util import TestCase, b
from OpenSSL._util import native
from OpenSSL.crypto import (
sign, verify, get_elliptic_curve, get_elliptic_curves)
from OpenSSL.test.util import (
EqualityTestsMixin, TestCase, WARNING_TYPE_EXPECTED
)
from OpenSSL._util import native, lib
def normalize_certificate_pem(pem):
return dump_certificate(FILETYPE_PEM, load_certificate(FILETYPE_PEM, pem))
@ -80,6 +89,40 @@ cbvAhow217X9V0dVerEOKxnNYspXRrh36h7k4mQA+sDq
-----END RSA PRIVATE KEY-----
""")
intermediate_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIICVzCCAcCgAwIBAgIRAMPzhm6//0Y/g2pmnHR2C4cwDQYJKoZIhvcNAQENBQAw
WDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAklMMRAwDgYDVQQHEwdDaGljYWdvMRAw
DgYDVQQKEwdUZXN0aW5nMRgwFgYDVQQDEw9UZXN0aW5nIFJvb3QgQ0EwHhcNMTQw
ODI4MDIwNDA4WhcNMjQwODI1MDIwNDA4WjBmMRUwEwYDVQQDEwxpbnRlcm1lZGlh
dGUxDDAKBgNVBAoTA29yZzERMA8GA1UECxMIb3JnLXVuaXQxCzAJBgNVBAYTAlVT
MQswCQYDVQQIEwJDQTESMBAGA1UEBxMJU2FuIERpZWdvMIGfMA0GCSqGSIb3DQEB
AQUAA4GNADCBiQKBgQDYcEQw5lfbEQRjr5Yy4yxAHGV0b9Al+Lmu7wLHMkZ/ZMmK
FGIbljbviiD1Nz97Oh2cpB91YwOXOTN2vXHq26S+A5xe8z/QJbBsyghMur88CjdT
21H2qwMa+r5dCQwEhuGIiZ3KbzB/n4DTMYI5zy4IYPv0pjxShZn4aZTCCK2IUwID
AQABoxMwETAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBDQUAA4GBAPIWSkLX
QRMApOjjyC+tMxumT5e2pMqChHmxobQK4NMdrf2VCx+cRT6EmY8sK3/Xl/X8UBQ+
9n5zXb1ZwhW/sTWgUvmOceJ4/XVs9FkdWOOn1J0XBch9ZIiFe/s5ASIgG7fUdcUF
9mAWS6FK2ca3xIh5kIupCXOFa0dPvlw/YUFT
-----END CERTIFICATE-----
""")
intermediate_key_pem = b("""-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQDYcEQw5lfbEQRjr5Yy4yxAHGV0b9Al+Lmu7wLHMkZ/ZMmKFGIb
ljbviiD1Nz97Oh2cpB91YwOXOTN2vXHq26S+A5xe8z/QJbBsyghMur88CjdT21H2
qwMa+r5dCQwEhuGIiZ3KbzB/n4DTMYI5zy4IYPv0pjxShZn4aZTCCK2IUwIDAQAB
AoGAfSZVV80pSeOKHTYfbGdNY/jHdU9eFUa/33YWriXU+77EhpIItJjkRRgivIfo
rhFJpBSGmDLblaqepm8emsXMeH4+2QzOYIf0QGGP6E6scjTt1PLqdqKfVJ1a2REN
147cujNcmFJb/5VQHHMpaPTgttEjlzuww4+BCDPsVRABWrkCQQD3loH36nLoQTtf
+kQq0T6Bs9/UWkTAGo0ND81ALj0F8Ie1oeZg6RNT96RxZ3aVuFTESTv6/TbjWywO
wdzlmV1vAkEA38rTJ6PTwaJlw5OttdDzAXGPB9tDmzh9oSi7cHwQQXizYd8MBYx4
sjHUKD3dCQnb1dxJFhd3BT5HsnkRMbVZXQJAbXduH17ZTzcIOXc9jHDXYiFVZV5D
52vV0WCbLzVCZc3jMrtSUKa8lPN5EWrdU3UchWybyG0MR5mX8S5lrF4SoQJAIyUD
DBKaSqpqONCUUx1BTFS9FYrFjzbL4+c1qHCTTPTblt8kUCrDOZjBrKAqeiTmNSum
/qUot9YUBF8m6BuGsQJATHHmdFy/fG1VLkyBp49CAa8tN3Z5r/CgTznI4DfMTf4C
NbRHn2UmYlwQBa+L5lg9phewNe8aEwpPyPLoV85U8Q==
-----END RSA PRIVATE KEY-----
""")
server_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIICKDCCAZGgAwIBAgIJAJn/HpR21r/8MA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH
@ -113,6 +156,40 @@ r50+LF74iLXFwqysVCebPKMOpDWp/qQ1BbJQIPs7/A==
-----END RSA PRIVATE KEY-----
"""))
intermediate_server_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIICWDCCAcGgAwIBAgIRAPQFY9jfskSihdiNSNdt6GswDQYJKoZIhvcNAQENBQAw
ZjEVMBMGA1UEAxMMaW50ZXJtZWRpYXRlMQwwCgYDVQQKEwNvcmcxETAPBgNVBAsT
CG9yZy11bml0MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVNh
biBEaWVnbzAeFw0xNDA4MjgwMjEwNDhaFw0yNDA4MjUwMjEwNDhaMG4xHTAbBgNV
BAMTFGludGVybWVkaWF0ZS1zZXJ2aWNlMQwwCgYDVQQKEwNvcmcxETAPBgNVBAsT
CG9yZy11bml0MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVNh
biBEaWVnbzCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAqpJZygd+w1faLOr1
iOAmbBhx5SZWcTCZ/ZjHQTJM7GuPT624QkqsixFghRKdDROwpwnAP7gMRukLqiy4
+kRuGT5OfyGggL95i2xqA+zehjj08lSTlvGHpePJgCyTavIy5+Ljsj4DKnKyuhxm
biXTRrH83NDgixVkObTEmh/OVK0CAwEAATANBgkqhkiG9w0BAQ0FAAOBgQBa0Npw
UkzjaYEo1OUE1sTI6Mm4riTIHMak4/nswKh9hYup//WVOlr/RBSBtZ7Q/BwbjobN
3bfAtV7eSAqBsfxYXyof7G1ALANQERkq3+oyLP1iVt08W1WOUlIMPhdCF/QuCwy6
x9MJLhUCGLJPM+O2rAPWVD9wCmvq10ALsiH3yA==
-----END CERTIFICATE-----
""")
intermediate_server_key_pem = b("""-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCqklnKB37DV9os6vWI4CZsGHHlJlZxMJn9mMdBMkzsa49PrbhC
SqyLEWCFEp0NE7CnCcA/uAxG6QuqLLj6RG4ZPk5/IaCAv3mLbGoD7N6GOPTyVJOW
8Yel48mALJNq8jLn4uOyPgMqcrK6HGZuJdNGsfzc0OCLFWQ5tMSaH85UrQIDAQAB
AoGAIQ594j5zna3/9WaPsTgnmhlesVctt4AAx/n827DA4ayyuHFlXUuVhtoWR5Pk
5ezj9mtYW8DyeCegABnsu2vZni/CdvU6uiS1Hv6qM1GyYDm9KWgovIP9rQCDSGaz
d57IWVGxx7ODFkm3gN5nxnSBOFVHytuW1J7FBRnEsehRroECQQDXHFOv82JuXDcz
z3+4c74IEURdOHcbycxlppmK9kFqm5lsUdydnnGW+mvwDk0APOB7Wg7vyFyr393e
dpmBDCzNAkEAyv6tVbTKUYhSjW+QhabJo896/EqQEYUmtMXxk4cQnKeR/Ao84Rkf
EqD5IykMUfUI0jJU4DGX+gWZ10a7kNbHYQJAVFCuHNFxS4Cpwo0aqtnzKoZaHY/8
X9ABZfafSHCtw3Op92M+7ikkrOELXdS9KdKyyqbKJAKNEHF3LbOfB44WIQJAA2N4
9UNNVUsXRbElEnYUS529CdUczo4QdVgQjkvk5RiPAUwSdBd9Q0xYnFOlFwEmIowg
ipWJWe0aAlP18ZcEQQJBAL+5lekZ/GUdQoZ4HAsN5a9syrzavJ9VvU1KOOPorPZK
nMRZbbQgP+aSB7yl6K0gaLaZ8XaK0pjxNBh6ASqg9f4=
-----END RSA PRIVATE KEY-----
""")
client_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIICJjCCAY+gAwIBAgIJAKxpFI5lODkjMA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH
@ -247,6 +324,27 @@ Ho4EzbYCOaEAMQA=
-----END PKCS7-----
""")
pkcs7DataASN1 = base64.b64decode(b"""
MIIDNwYJKoZIhvcNAQcCoIIDKDCCAyQCAQExADALBgkqhkiG9w0BBwGgggMKMIID
BjCCAm+gAwIBAgIBATANBgkqhkiG9w0BAQQFADB7MQswCQYDVQQGEwJTRzERMA8G
A1UEChMITTJDcnlwdG8xFDASBgNVBAsTC00yQ3J5cHRvIENBMSQwIgYDVQQDExtN
MkNyeXB0byBDZXJ0aWZpY2F0ZSBNYXN0ZXIxHTAbBgkqhkiG9w0BCQEWDm5ncHNA
cG9zdDEuY29tMB4XDTAwMDkxMDA5NTEzMFoXDTAyMDkxMDA5NTEzMFowUzELMAkG
A1UEBhMCU0cxETAPBgNVBAoTCE0yQ3J5cHRvMRIwEAYDVQQDEwlsb2NhbGhvc3Qx
HTAbBgkqhkiG9w0BCQEWDm5ncHNAcG9zdDEuY29tMFwwDQYJKoZIhvcNAQEBBQAD
SwAwSAJBAKy+e3dulvXzV7zoTZWc5TzgApr8DmeQHTYC8ydfzH7EECe4R1Xh5kwI
zOuuFfn178FBiS84gngaNcrFi0Z5fAkCAwEAAaOCAQQwggEAMAkGA1UdEwQCMAAw
LAYJYIZIAYb4QgENBB8WHU9wZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRlMB0G
A1UdDgQWBBTPhIKSvnsmYsBVNWjj0m3M2z0qVTCBpQYDVR0jBIGdMIGagBT7hyNp
65w6kxXlxb8pUU/+7Sg4AaF/pH0wezELMAkGA1UEBhMCU0cxETAPBgNVBAoTCE0y
Q3J5cHRvMRQwEgYDVQQLEwtNMkNyeXB0byBDQTEkMCIGA1UEAxMbTTJDcnlwdG8g
Q2VydGlmaWNhdGUgTWFzdGVyMR0wGwYJKoZIhvcNAQkBFg5uZ3BzQHBvc3QxLmNv
bYIBADANBgkqhkiG9w0BAQQFAAOBgQA7/CqT6PoHycTdhEStWNZde7M/2Yc6BoJu
VwnW8YxGO8Sn6UJ4FeffZNcYZddSDKosw8LtPOeWoK3JINjAk5jiPQ2cww++7QGG
/g5NDjxFZNDJP1dGiLAxPW6JXwov4v0FmdzfLOZ01jDcgQQZqEpYlgpuI5JEWUQ9
Ho4EzbYCOaEAMQA=
""")
crlData = b("""\
-----BEGIN X509 CRL-----
MIIBWzCBxTANBgkqhkiG9w0BAQQFADBYMQswCQYDVQQGEwJVUzELMAkGA1UECBMC
@ -512,7 +610,7 @@ class X509ExtTests(TestCase):
def test_issuer(self):
"""
If an extension requires a issuer, the :py:data:`issuer` parameter to
If an extension requires an issuer, the :py:data:`issuer` parameter to
:py:class:`X509Extension` provides its value.
"""
ext2 = X509Extension(
@ -1108,7 +1206,32 @@ class X509ReqTests(TestCase, _PKeyInteractionTestsMixin):
request = X509Req()
request.add_extensions([
X509Extension(b('basicConstraints'), True, b('CA:false'))])
# XXX Add get_extensions so the rest of this unit test can be written.
exts = request.get_extensions()
self.assertEqual(len(exts), 1)
self.assertEqual(exts[0].get_short_name(), b('basicConstraints'))
self.assertEqual(exts[0].get_critical(), 1)
self.assertEqual(exts[0].get_data(), b('0\x00'))
def test_get_extensions(self):
"""
:py:obj:`X509Req.get_extensions` returns a :py:obj:`list` of
extensions added to this X509 request.
"""
request = X509Req()
exts = request.get_extensions()
self.assertEqual(exts, [])
request.add_extensions([
X509Extension(b('basicConstraints'), True, b('CA:true')),
X509Extension(b('keyUsage'), False, b('digitalSignature'))])
exts = request.get_extensions()
self.assertEqual(len(exts), 2)
self.assertEqual(exts[0].get_short_name(), b('basicConstraints'))
self.assertEqual(exts[0].get_critical(), 1)
self.assertEqual(exts[0].get_data(), b('0\x03\x01\x01\xff'))
self.assertEqual(exts[1].get_short_name(), b('keyUsage'))
self.assertEqual(exts[1].get_critical(), 0)
self.assertEqual(exts[1].get_data(), b('\x03\x02\x07\x80'))
def test_add_extensions_wrong_args(self):
@ -1163,7 +1286,7 @@ class X509ReqTests(TestCase, _PKeyInteractionTestsMixin):
def test_verify_success(self):
"""
:py:obj:`X509Req.verify` returns :py:obj:`True` if called with a
:py:obj:`OpenSSL.crypto.PKey` which represents the public part ofthe key
:py:obj:`OpenSSL.crypto.PKey` which represents the public part of the key
which signed the request.
"""
request = X509Req()
@ -1915,6 +2038,21 @@ class PKCS12Tests(TestCase):
self.assertEqual(recovered_cert[-len(ca):], ca)
def verify_pkcs12_container(self, p12):
"""
Verify that the PKCS#12 container contains the correct client
certificate and private key.
:param p12: The PKCS12 instance to verify.
:type p12: :py:class:`PKCS12`
"""
cert_pem = dump_certificate(FILETYPE_PEM, p12.get_certificate())
key_pem = dump_privatekey(FILETYPE_PEM, p12.get_privatekey())
self.assertEqual(
(client_cert_pem, client_key_pem, None),
(cert_pem, key_pem, p12.get_ca_certificates()))
def test_load_pkcs12(self):
"""
A PKCS12 string generated using the openssl command line can be loaded
@ -1924,14 +2062,95 @@ class PKCS12Tests(TestCase):
pem = client_key_pem + client_cert_pem
p12_str = _runopenssl(
pem, b"pkcs12", b"-export", b"-clcerts", b"-passout", b"pass:" + passwd)
p12 = load_pkcs12(p12_str, passwd)
# verify
self.assertTrue(isinstance(p12, PKCS12))
cert_pem = dump_certificate(FILETYPE_PEM, p12.get_certificate())
self.assertEqual(cert_pem, client_cert_pem)
key_pem = dump_privatekey(FILETYPE_PEM, p12.get_privatekey())
self.assertEqual(key_pem, client_key_pem)
self.assertEqual(None, p12.get_ca_certificates())
p12 = load_pkcs12(p12_str, passphrase=passwd)
self.verify_pkcs12_container(p12)
def test_load_pkcs12_text_passphrase(self):
"""
A PKCS12 string generated using the openssl command line can be loaded
with :py:obj:`load_pkcs12` and its components extracted and examined.
Using text as passphrase instead of bytes. DeprecationWarning expected.
"""
pem = client_key_pem + client_cert_pem
passwd = b"whatever"
p12_str = _runopenssl(pem, b"pkcs12", b"-export", b"-clcerts",
b"-passout", b"pass:" + passwd)
with catch_warnings(record=True) as w:
simplefilter("always")
p12 = load_pkcs12(p12_str, passphrase=b"whatever".decode("ascii"))
self.assertEqual(
"{0} for passphrase is no longer accepted, use bytes".format(
WARNING_TYPE_EXPECTED
),
str(w[-1].message)
)
self.assertIs(w[-1].category, DeprecationWarning)
self.verify_pkcs12_container(p12)
def test_load_pkcs12_no_passphrase(self):
"""
A PKCS12 string generated using openssl command line can be loaded with
:py:obj:`load_pkcs12` without a passphrase and its components extracted
and examined.
"""
pem = client_key_pem + client_cert_pem
p12_str = _runopenssl(
pem, b"pkcs12", b"-export", b"-clcerts", b"-passout", b"pass:")
p12 = load_pkcs12(p12_str)
self.verify_pkcs12_container(p12)
def _dump_and_load(self, dump_passphrase, load_passphrase):
"""
A helper method to dump and load a PKCS12 object.
"""
p12 = self.gen_pkcs12(client_cert_pem, client_key_pem)
dumped_p12 = p12.export(passphrase=dump_passphrase, iter=2, maciter=3)
return load_pkcs12(dumped_p12, passphrase=load_passphrase)
def test_load_pkcs12_null_passphrase_load_empty(self):
"""
A PKCS12 string can be dumped with a null passphrase, loaded with an
empty passphrase with :py:obj:`load_pkcs12`, and its components
extracted and examined.
"""
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=None, load_passphrase=b''))
def test_load_pkcs12_null_passphrase_load_null(self):
"""
A PKCS12 string can be dumped with a null passphrase, loaded with a
null passphrase with :py:obj:`load_pkcs12`, and its components
extracted and examined.
"""
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=None, load_passphrase=None))
def test_load_pkcs12_empty_passphrase_load_empty(self):
"""
A PKCS12 string can be dumped with an empty passphrase, loaded with an
empty passphrase with :py:obj:`load_pkcs12`, and its components
extracted and examined.
"""
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=b'', load_passphrase=b''))
def test_load_pkcs12_empty_passphrase_load_null(self):
"""
A PKCS12 string can be dumped with an empty passphrase, loaded with a
null passphrase with :py:obj:`load_pkcs12`, and its components
extracted and examined.
"""
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=b'', load_passphrase=None))
def test_load_pkcs12_garbage(self):
@ -2073,6 +2292,26 @@ class PKCS12Tests(TestCase):
dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"")
def test_export_without_bytes(self):
"""
Test :py:obj:`PKCS12.export` with text not bytes as passphrase
"""
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
with catch_warnings(record=True) as w:
simplefilter("always")
dumped_p12 = p12.export(passphrase=b"randomtext".decode("ascii"))
self.assertEqual(
"{0} for passphrase is no longer accepted, use bytes".format(
WARNING_TYPE_EXPECTED
),
str(w[-1].message)
)
self.assertIs(w[-1].category, DeprecationWarning)
self.check_recovery(
dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"randomtext")
def test_key_cert_mismatch(self):
"""
:py:obj:`PKCS12.export` raises an exception when a key and certificate
@ -2463,7 +2702,7 @@ class FunctionTests(TestCase):
dump_privatekey, FILETYPE_PEM, key, GOOD_CIPHER, cb)
def test_load_pkcs7_data(self):
def test_load_pkcs7_data_pem(self):
"""
:py:obj:`load_pkcs7_data` accepts a PKCS#7 string and returns an instance of
:py:obj:`PKCS7Type`.
@ -2472,6 +2711,15 @@ class FunctionTests(TestCase):
self.assertTrue(isinstance(pkcs7, PKCS7Type))
def test_load_pkcs7_data_asn1(self):
"""
:py:obj:`load_pkcs7_data` accepts a bytes containing ASN1 data
representing PKCS#7 and returns an instance of :py:obj`PKCS7Type`.
"""
pkcs7 = load_pkcs7_data(FILETYPE_ASN1, pkcs7DataASN1)
self.assertTrue(isinstance(pkcs7, PKCS7Type))
def test_load_pkcs7_data_invalid(self):
"""
If the data passed to :py:obj:`load_pkcs7_data` is invalid,
@ -2796,11 +3044,9 @@ class CRLTests(TestCase):
self.assertRaises(TypeError, CRL, None)
def test_export(self):
def _get_crl(self):
"""
Use python to create a simple CRL with a revocation, and export
the CRL in formats of PEM, DER and text. Those outputs are verified
with the openssl program.
Get a new ``CRL`` with a revocation.
"""
crl = CRL()
revoked = Revoked()
@ -2809,26 +3055,110 @@ class CRLTests(TestCase):
revoked.set_serial(b('3ab'))
revoked.set_reason(b('sUpErSeDEd'))
crl.add_revoked(revoked)
return crl
def test_export_pem(self):
"""
If not passed a format, ``CRL.export`` returns a "PEM" format string
representing a serial number, a revoked reason, and certificate issuer
information.
"""
crl = self._get_crl()
# PEM format
dumped_crl = crl.export(self.cert, self.pkey, days=20)
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
# These magic values are based on the way the CRL above was constructed
# and with what certificate it was exported.
text.index(b('Serial Number: 03AB'))
text.index(b('Superseded'))
text.index(b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA'))
text.index(
b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA')
)
def test_export_der(self):
"""
If passed ``FILETYPE_ASN1`` for the format, ``CRL.export`` returns a
"DER" format string representing a serial number, a revoked reason, and
certificate issuer information.
"""
crl = self._get_crl()
# DER format
dumped_crl = crl.export(self.cert, self.pkey, FILETYPE_ASN1)
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER")
text = _runopenssl(
dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER"
)
text.index(b('Serial Number: 03AB'))
text.index(b('Superseded'))
text.index(b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA'))
text.index(
b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA')
)
def test_export_text(self):
"""
If passed ``FILETYPE_TEXT`` for the format, ``CRL.export`` returns a
text format string like the one produced by the openssl command line
tool.
"""
crl = self._get_crl()
dumped_crl = crl.export(self.cert, self.pkey, FILETYPE_ASN1)
text = _runopenssl(
dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER"
)
# text format
dumped_text = crl.export(self.cert, self.pkey, type=FILETYPE_TEXT)
self.assertEqual(text, dumped_text)
def test_export_custom_digest(self):
"""
If passed the name of a digest function, ``CRL.export`` uses a
signature algorithm based on that digest function.
"""
crl = self._get_crl()
dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha1")
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Signature Algorithm: sha1'))
def test_export_md5_digest(self):
"""
If passed md5 as the digest function, ``CRL.export`` uses md5 and does
not emit a deprecation warning.
"""
crl = self._get_crl()
with catch_warnings(record=True) as catcher:
simplefilter("always")
self.assertEqual(0, len(catcher))
dumped_crl = crl.export(self.cert, self.pkey, digest=b"md5")
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Signature Algorithm: md5'))
def test_export_default_digest(self):
"""
If not passed the name of a digest function, ``CRL.export`` uses a
signature algorithm based on MD5 and emits a deprecation warning.
"""
crl = self._get_crl()
with catch_warnings(record=True) as catcher:
simplefilter("always")
dumped_crl = crl.export(self.cert, self.pkey)
self.assertEqual(
"The default message digest (md5) is deprecated. "
"Pass the name of a message digest explicitly.",
str(catcher[0].message),
)
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Signature Algorithm: md5'))
def test_export_invalid(self):
"""
If :py:obj:`CRL.export` is used with an uninitialized :py:obj:`X509`
@ -2859,7 +3189,7 @@ class CRLTests(TestCase):
crl = CRL()
self.assertRaises(TypeError, crl.export)
self.assertRaises(TypeError, crl.export, self.cert)
self.assertRaises(TypeError, crl.export, self.cert, self.pkey, FILETYPE_PEM, 10, "foo")
self.assertRaises(TypeError, crl.export, self.cert, self.pkey, FILETYPE_PEM, 10, "md5", "foo")
self.assertRaises(TypeError, crl.export, None, self.pkey, FILETYPE_PEM, 10)
self.assertRaises(TypeError, crl.export, self.cert, None, FILETYPE_PEM, 10)
@ -2877,6 +3207,19 @@ class CRLTests(TestCase):
self.assertRaises(ValueError, crl.export, self.cert, self.pkey, 100, 10)
def test_export_unknown_digest(self):
"""
Calling :py:obj:`OpenSSL.CRL.export` with a unsupported digest results
in a :py:obj:`ValueError` being raised.
"""
crl = CRL()
self.assertRaises(
ValueError,
crl.export,
self.cert, self.pkey, FILETYPE_PEM, 10, b"strange-digest"
)
def test_get_revoked(self):
"""
Use python to create a simple CRL with two revocations.
@ -2977,6 +3320,107 @@ class CRLTests(TestCase):
class X509StoreContextTests(TestCase):
"""
Tests for :py:obj:`OpenSSL.crypto.X509StoreContext`.
"""
root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
intermediate_cert = load_certificate(FILETYPE_PEM, intermediate_cert_pem)
intermediate_server_cert = load_certificate(FILETYPE_PEM, intermediate_server_cert_pem)
def test_valid(self):
"""
:py:obj:`verify_certificate` returns ``None`` when called with a certificate
and valid chain.
"""
store = X509Store()
store.add_cert(self.root_cert)
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
self.assertEqual(store_ctx.verify_certificate(), None)
def test_reuse(self):
"""
:py:obj:`verify_certificate` can be called multiple times with the same
``X509StoreContext`` instance to produce the same result.
"""
store = X509Store()
store.add_cert(self.root_cert)
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
self.assertEqual(store_ctx.verify_certificate(), None)
self.assertEqual(store_ctx.verify_certificate(), None)
def test_trusted_self_signed(self):
"""
:py:obj:`verify_certificate` returns ``None`` when called with a self-signed
certificate and itself in the chain.
"""
store = X509Store()
store.add_cert(self.root_cert)
store_ctx = X509StoreContext(store, self.root_cert)
self.assertEqual(store_ctx.verify_certificate(), None)
def test_untrusted_self_signed(self):
"""
:py:obj:`verify_certificate` raises error when a self-signed certificate is
verified without itself in the chain.
"""
store = X509Store()
store_ctx = X509StoreContext(store, self.root_cert)
e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
self.assertEqual(e.args[0][2], 'self signed certificate')
self.assertEqual(e.certificate.get_subject().CN, 'Testing Root CA')
def test_invalid_chain_no_root(self):
"""
:py:obj:`verify_certificate` raises error when a root certificate is missing
from the chain.
"""
store = X509Store()
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
self.assertEqual(e.args[0][2], 'unable to get issuer certificate')
self.assertEqual(e.certificate.get_subject().CN, 'intermediate')
def test_invalid_chain_no_intermediate(self):
"""
:py:obj:`verify_certificate` raises error when an intermediate certificate is
missing from the chain.
"""
store = X509Store()
store.add_cert(self.root_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
self.assertEqual(e.args[0][2], 'unable to get local issuer certificate')
self.assertEqual(e.certificate.get_subject().CN, 'intermediate-service')
def test_modification_pre_verify(self):
"""
:py:obj:`verify_certificate` can use a store context modified after
instantiation.
"""
store_bad = X509Store()
store_bad.add_cert(self.intermediate_cert)
store_good = X509Store()
store_good.add_cert(self.root_cert)
store_good.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store_bad, self.intermediate_server_cert)
e = self.assertRaises(X509StoreContextError, store_ctx.verify_certificate)
self.assertEqual(e.args[0][2], 'unable to get issuer certificate')
self.assertEqual(e.certificate.get_subject().CN, 'intermediate')
store_ctx.set_store(store_good)
self.assertEqual(store_ctx.verify_certificate(), None)
class SignVerifyTests(TestCase):
"""
Tests for :py:obj:`OpenSSL.crypto.sign` and :py:obj:`OpenSSL.crypto.verify`.
@ -3022,6 +3466,47 @@ class SignVerifyTests(TestCase):
ValueError, verify, good_cert, sig, content, "strange-digest")
def test_sign_verify_with_text(self):
"""
:py:obj:`sign` generates a cryptographic signature which :py:obj:`verify` can check.
Deprecation warnings raised because using text instead of bytes as content
"""
content = (
b"It was a bright cold day in April, and the clocks were striking "
b"thirteen. Winston Smith, his chin nuzzled into his breast in an "
b"effort to escape the vile wind, slipped quickly through the "
b"glass doors of Victory Mansions, though not quickly enough to "
b"prevent a swirl of gritty dust from entering along with him."
).decode("ascii")
priv_key = load_privatekey(FILETYPE_PEM, root_key_pem)
cert = load_certificate(FILETYPE_PEM, root_cert_pem)
for digest in ['md5', 'sha1']:
with catch_warnings(record=True) as w:
simplefilter("always")
sig = sign(priv_key, content, digest)
self.assertEqual(
"{0} for data is no longer accepted, use bytes".format(
WARNING_TYPE_EXPECTED
),
str(w[-1].message)
)
self.assertIs(w[-1].category, DeprecationWarning)
with catch_warnings(record=True) as w:
simplefilter("always")
verify(cert, sig, content, digest)
self.assertEqual(
"{0} for data is no longer accepted, use bytes".format(
WARNING_TYPE_EXPECTED
),
str(w[-1].message)
)
self.assertIs(w[-1].category, DeprecationWarning)
def test_sign_nulls(self):
"""
:py:obj:`sign` produces a signature for a string with embedded nulls.
@ -3033,5 +3518,154 @@ class SignVerifyTests(TestCase):
verify(good_cert, sig, content, "sha1")
class EllipticCurveTests(TestCase):
"""
Tests for :py:class:`_EllipticCurve`, :py:obj:`get_elliptic_curve`, and
:py:obj:`get_elliptic_curves`.
"""
def test_set(self):
"""
:py:obj:`get_elliptic_curves` returns a :py:obj:`set`.
"""
self.assertIsInstance(get_elliptic_curves(), set)
def test_some_curves(self):
"""
If :py:mod:`cryptography` has elliptic curve support then the set
returned by :py:obj:`get_elliptic_curves` has some elliptic curves in
it.
There could be an OpenSSL that violates this assumption. If so, this
test will fail and we'll find out.
"""
curves = get_elliptic_curves()
if lib.Cryptography_HAS_EC:
self.assertTrue(curves)
else:
self.assertFalse(curves)
def test_a_curve(self):
"""
:py:obj:`get_elliptic_curve` can be used to retrieve a particular
supported curve.
"""
curves = get_elliptic_curves()
if curves:
curve = next(iter(curves))
self.assertEqual(curve.name, get_elliptic_curve(curve.name).name)
else:
self.assertRaises(ValueError, get_elliptic_curve, u("prime256v1"))
def test_not_a_curve(self):
"""
:py:obj:`get_elliptic_curve` raises :py:class:`ValueError` if called
with a name which does not identify a supported curve.
"""
self.assertRaises(
ValueError, get_elliptic_curve, u("this curve was just invented"))
def test_repr(self):
"""
The string representation of a curve object includes simply states the
object is a curve and what its name is.
"""
curves = get_elliptic_curves()
if curves:
curve = next(iter(curves))
self.assertEqual("<Curve %r>" % (curve.name,), repr(curve))
def test_to_EC_KEY(self):
"""
The curve object can export a version of itself as an EC_KEY* via the
private :py:meth:`_EllipticCurve._to_EC_KEY`.
"""
curves = get_elliptic_curves()
if curves:
curve = next(iter(curves))
# It's not easy to assert anything about this object. However, see
# leakcheck/crypto.py for a test that demonstrates it at least does
# not leak memory.
curve._to_EC_KEY()
class EllipticCurveFactory(object):
"""
A helper to get the names of two curves.
"""
def __init__(self):
curves = iter(get_elliptic_curves())
try:
self.curve_name = next(curves).name
self.another_curve_name = next(curves).name
except StopIteration:
self.curve_name = self.another_curve_name = None
class EllipticCurveEqualityTests(TestCase, EqualityTestsMixin):
"""
Tests :py:type:`_EllipticCurve`\ 's implementation of ``==`` and ``!=``.
"""
curve_factory = EllipticCurveFactory()
if curve_factory.curve_name is None:
skip = "There are no curves available there can be no curve objects."
def anInstance(self):
"""
Get the curve object for an arbitrary curve supported by the system.
"""
return get_elliptic_curve(self.curve_factory.curve_name)
def anotherInstance(self):
"""
Get the curve object for an arbitrary curve supported by the system -
but not the one returned by C{anInstance}.
"""
return get_elliptic_curve(self.curve_factory.another_curve_name)
class EllipticCurveHashTests(TestCase):
"""
Tests for :py:type:`_EllipticCurve`\ 's implementation of hashing (thus use
as an item in a :py:type:`dict` or :py:type:`set`).
"""
curve_factory = EllipticCurveFactory()
if curve_factory.curve_name is None:
skip = "There are no curves available there can be no curve objects."
def test_contains(self):
"""
The ``in`` operator reports that a :py:type:`set` containing a curve
does contain that curve.
"""
curve = get_elliptic_curve(self.curve_factory.curve_name)
curves = set([curve])
self.assertIn(curve, curves)
def test_does_not_contain(self):
"""
The ``in`` operator reports that a :py:type:`set` not containing a
curve does not contain that curve.
"""
curve = get_elliptic_curve(self.curve_factory.curve_name)
curves = set([get_elliptic_curve(self.curve_factory.another_curve_name)])
self.assertNotIn(curve, curves)
if __name__ == '__main__':
main()

View file

@ -10,7 +10,7 @@ import os
import stat
import sys
from OpenSSL.test.util import TestCase, b
from OpenSSL.test.util import NON_ASCII, TestCase, b
from OpenSSL import rand
@ -176,27 +176,47 @@ class RandTests(TestCase):
self.assertRaises(TypeError, rand.write_file, None)
self.assertRaises(TypeError, rand.write_file, "foo", None)
def _read_write_test(self, path):
"""
Verify that ``rand.write_file`` and ``rand.load_file`` can be used.
"""
# Create the file so cleanup is more straightforward
with open(path, "w"):
pass
def test_files(self):
"""
Test reading and writing of files via rand functions.
"""
# Write random bytes to a file
tmpfile = self.mktemp()
# Make sure it exists (so cleanup definitely succeeds)
fObj = open(tmpfile, 'w')
fObj.close()
try:
rand.write_file(tmpfile)
# Write random bytes to a file
rand.write_file(path)
# Verify length of written file
size = os.stat(tmpfile)[stat.ST_SIZE]
size = os.stat(path)[stat.ST_SIZE]
self.assertEqual(1024, size)
# Read random bytes from file
rand.load_file(tmpfile)
rand.load_file(tmpfile, 4) # specify a length
rand.load_file(path)
rand.load_file(path, 4) # specify a length
finally:
# Cleanup
os.unlink(tmpfile)
os.unlink(path)
def test_bytes_paths(self):
"""
Random data can be saved and loaded to files with paths specified as
bytes.
"""
path = self.mktemp()
path += NON_ASCII.encode(sys.getfilesystemencoding())
self._read_write_test(path)
def test_unicode_paths(self):
"""
Random data can be saved and loaded to files with paths specified as
unicode.
"""
path = self.mktemp().decode('utf-8') + NON_ASCII
self._read_write_test(path)
if __name__ == '__main__':

View file

@ -0,0 +1,24 @@
# Copyright (C) Jean-Paul Calderone
# See LICENSE for details.
"""
Unit tests for :py:obj:`OpenSSL.tsafe`.
"""
from OpenSSL.SSL import TLSv1_METHOD, Context
from OpenSSL.tsafe import Connection
from OpenSSL.test.util import TestCase
class ConnectionTest(TestCase):
"""
Tests for :py:obj:`OpenSSL.tsafe.Connection`.
"""
def test_instantiation(self):
"""
:py:obj:`OpenSSL.tsafe.Connection` can be instantiated.
"""
# The following line should not throw an error. This isn't an ideal
# test. It would be great to refactor the other Connection tests so
# they could automatically be applied to this class too.
Connection(Context(TLSv1_METHOD), None)

View file

@ -0,0 +1,17 @@
from OpenSSL._util import exception_from_error_queue, lib
from OpenSSL.test.util import TestCase
class ErrorTests(TestCase):
"""
Tests for handling of certain OpenSSL error cases.
"""
def test_exception_from_error_queue_nonexistent_reason(self):
"""
:py:func:`exception_from_error_queue` raises ``ValueError`` when it
encounters an OpenSSL error code which does not have a reason string.
"""
lib.ERR_put_error(lib.ERR_LIB_EVP, 0, 1112, b"", 10)
exc = self.assertRaises(ValueError, exception_from_error_queue, ValueError)
self.assertEqual(exc.args[0][0][2], "")

View file

@ -14,6 +14,8 @@ from tempfile import mktemp
from unittest import TestCase
import sys
from six import PY3
from OpenSSL._util import exception_from_error_queue
from OpenSSL.crypto import Error
@ -25,6 +27,11 @@ except Exception:
from OpenSSL._util import ffi, lib, byte_string as b
# This is the UTF-8 encoding of the SNOWMAN unicode code point.
NON_ASCII = b("\xe2\x98\x83").decode("utf-8")
class TestCase(TestCase):
"""
:py:class:`TestCase` adds useful testing functionality beyond what is available
@ -210,7 +217,24 @@ class TestCase(TestCase):
return containee
assertIn = failUnlessIn
def failUnlessIdentical(self, first, second, msg=None):
def assertNotIn(self, containee, container, msg=None):
"""
Fail the test if C{containee} is found in C{container}.
@param containee: the value that should not be in C{container}
@param container: a sequence type, or in the case of a mapping type,
will follow semantics of 'if key in dict.keys()'
@param msg: if msg is None, then the failure message will be
'%r in %r' % (first, second)
"""
if containee in container:
raise self.failureException(msg or "%r in %r"
% (containee, container))
return containee
failIfIn = assertNotIn
def assertIs(self, first, second, msg=None):
"""
Fail the test if :py:data:`first` is not :py:data:`second`. This is an
obect-identity-equality test, not an object equality
@ -222,10 +246,10 @@ class TestCase(TestCase):
if first is not second:
raise self.failureException(msg or '%r is not %r' % (first, second))
return first
assertIdentical = failUnlessIdentical
assertIdentical = failUnlessIdentical = assertIs
def failIfIdentical(self, first, second, msg=None):
def assertIsNot(self, first, second, msg=None):
"""
Fail the test if :py:data:`first` is :py:data:`second`. This is an
obect-identity-equality test, not an object equality
@ -237,7 +261,7 @@ class TestCase(TestCase):
if first is second:
raise self.failureException(msg or '%r is %r' % (first, second))
return first
assertNotIdentical = failIfIdentical
assertNotIdentical = failIfIdentical = assertIsNot
def failUnlessRaises(self, exception, f, *args, **kwargs):
@ -300,3 +324,140 @@ class TestCase(TestCase):
self.assertTrue(isinstance(theType, type))
instance = theType(*constructionArgs)
self.assertIdentical(type(instance), theType)
class EqualityTestsMixin(object):
"""
A mixin defining tests for the standard implementation of C{==} and C{!=}.
"""
def anInstance(self):
"""
Return an instance of the class under test. Each call to this method
must return a different object. All objects returned must be equal to
each other.
"""
raise NotImplementedError()
def anotherInstance(self):
"""
Return an instance of the class under test. Each call to this method
must return a different object. The objects must not be equal to the
objects returned by C{anInstance}. They may or may not be equal to
each other (they will not be compared against each other).
"""
raise NotImplementedError()
def test_identicalEq(self):
"""
An object compares equal to itself using the C{==} operator.
"""
o = self.anInstance()
self.assertTrue(o == o)
def test_identicalNe(self):
"""
An object doesn't compare not equal to itself using the C{!=} operator.
"""
o = self.anInstance()
self.assertFalse(o != o)
def test_sameEq(self):
"""
Two objects that are equal to each other compare equal to each other
using the C{==} operator.
"""
a = self.anInstance()
b = self.anInstance()
self.assertTrue(a == b)
def test_sameNe(self):
"""
Two objects that are equal to each other do not compare not equal to
each other using the C{!=} operator.
"""
a = self.anInstance()
b = self.anInstance()
self.assertFalse(a != b)
def test_differentEq(self):
"""
Two objects that are not equal to each other do not compare equal to
each other using the C{==} operator.
"""
a = self.anInstance()
b = self.anotherInstance()
self.assertFalse(a == b)
def test_differentNe(self):
"""
Two objects that are not equal to each other compare not equal to each
other using the C{!=} operator.
"""
a = self.anInstance()
b = self.anotherInstance()
self.assertTrue(a != b)
def test_anotherTypeEq(self):
"""
The object does not compare equal to an object of an unrelated type
(which does not implement the comparison) using the C{==} operator.
"""
a = self.anInstance()
b = object()
self.assertFalse(a == b)
def test_anotherTypeNe(self):
"""
The object compares not equal to an object of an unrelated type (which
does not implement the comparison) using the C{!=} operator.
"""
a = self.anInstance()
b = object()
self.assertTrue(a != b)
def test_delegatedEq(self):
"""
The result of comparison using C{==} is delegated to the right-hand
operand if it is of an unrelated type.
"""
class Delegate(object):
def __eq__(self, other):
# Do something crazy and obvious.
return [self]
a = self.anInstance()
b = Delegate()
self.assertEqual(a == b, [b])
def test_delegateNe(self):
"""
The result of comparison using C{!=} is delegated to the right-hand
operand if it is of an unrelated type.
"""
class Delegate(object):
def __ne__(self, other):
# Do something crazy and obvious.
return [self]
a = self.anInstance()
b = Delegate()
self.assertEqual(a != b, [b])
# The type name expected in warnings about using the wrong string type.
if PY3:
WARNING_TYPE_EXPECTED = "str"
else:
WARNING_TYPE_EXPECTED = "unicode"

View file

@ -8,7 +8,7 @@ del threading
class Connection:
def __init__(self, *args):
self._ssl_conn = apply(_ssl.Connection, args)
self._ssl_conn = _ssl.Connection(*args)
self._lock = _RLock()
for f in ('get_context', 'pending', 'send', 'write', 'recv', 'read',

View file

@ -6,4 +6,4 @@
pyOpenSSL - A simple wrapper around the OpenSSL library
"""
__version__ = '0.14'
__version__ = '0.15.1'