update windows build to Python 3.7

This commit is contained in:
j 2019-01-20 16:05:31 +05:30
commit ddc59ab92d
5761 changed files with 750298 additions and 213405 deletions

View file

@ -0,0 +1,9 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
from .version import __version__, __version_info__
__all__ = [
'__version__',
'__version_info__',
]

View file

@ -0,0 +1,314 @@
# coding: utf-8
"""
Classes and objects to represent prime-field elliptic curves and points on them.
Exports the following items:
- PrimeCurve()
- PrimePoint()
- SECP192R1_CURVE
- SECP192R1_BASE_POINT
- SECP224R1_CURVE
- SECP224R1_BASE_POINT
- SECP256R1_CURVE
- SECP256R1_BASE_POINT
- SECP384R1_CURVE
- SECP384R1_BASE_POINT
- SECP521R1_CURVE
- SECP521R1_BASE_POINT
The curve constants are all PrimeCurve() objects and the base point constants
are all PrimePoint() objects.
Some of the following source code is derived from
http://webpages.charter.net/curryfans/peter/downloads.html, but has been heavily
modified to fit into this projects lint settings. The original project license
is listed below:
Copyright (c) 2014 Peter Pearson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from ._int import inverse_mod
class PrimeCurve():
"""
Elliptic curve over a prime field. Characteristic two field curves are not
supported.
"""
def __init__(self, p, a, b):
"""
The curve of points satisfying y^2 = x^3 + a*x + b (mod p)
:param p:
The prime number as an integer
:param a:
The component a as an integer
:param b:
The component b as an integer
"""
self.p = p
self.a = a
self.b = b
def contains(self, point):
"""
:param point:
A Point object
:return:
Boolean if the point is on this curve
"""
y2 = point.y * point.y
x3 = point.x * point.x * point.x
return (y2 - (x3 + self.a * point.x + self.b)) % self.p == 0
class PrimePoint():
"""
A point on a prime-field elliptic curve
"""
def __init__(self, curve, x, y, order=None):
"""
:param curve:
A PrimeCurve object
:param x:
The x coordinate of the point as an integer
:param y:
The y coordinate of the point as an integer
:param order:
The order of the point, as an integer - optional
"""
self.curve = curve
self.x = x
self.y = y
self.order = order
# self.curve is allowed to be None only for INFINITY:
if self.curve:
if not self.curve.contains(self):
raise ValueError('Invalid EC point')
if self.order:
if self * self.order != INFINITY:
raise ValueError('Invalid EC point')
def __cmp__(self, other):
"""
:param other:
A PrimePoint object
:return:
0 if identical, 1 otherwise
"""
if self.curve == other.curve and self.x == other.x and self.y == other.y:
return 0
else:
return 1
def __add__(self, other):
"""
:param other:
A PrimePoint object
:return:
A PrimePoint object
"""
# X9.62 B.3:
if other == INFINITY:
return self
if self == INFINITY:
return other
assert self.curve == other.curve
if self.x == other.x:
if (self.y + other.y) % self.curve.p == 0:
return INFINITY
else:
return self.double()
p = self.curve.p
l_ = ((other.y - self.y) * inverse_mod(other.x - self.x, p)) % p
x3 = (l_ * l_ - self.x - other.x) % p
y3 = (l_ * (self.x - x3) - self.y) % p
return PrimePoint(self.curve, x3, y3)
def __mul__(self, other):
"""
:param other:
An integer to multiple the Point by
:return:
A PrimePoint object
"""
def leftmost_bit(x):
assert x > 0
result = 1
while result <= x:
result = 2 * result
return result // 2
e = other
if self.order:
e = e % self.order
if e == 0:
return INFINITY
if self == INFINITY:
return INFINITY
assert e > 0
# From X9.62 D.3.2:
e3 = 3 * e
negative_self = PrimePoint(self.curve, self.x, -self.y, self.order)
i = leftmost_bit(e3) // 2
result = self
# print "Multiplying %s by %d (e3 = %d):" % ( self, other, e3 )
while i > 1:
result = result.double()
if (e3 & i) != 0 and (e & i) == 0:
result = result + self
if (e3 & i) == 0 and (e & i) != 0:
result = result + negative_self
# print ". . . i = %d, result = %s" % ( i, result )
i = i // 2
return result
def __rmul__(self, other):
"""
:param other:
An integer to multiple the Point by
:return:
A PrimePoint object
"""
return self * other
def double(self):
"""
:return:
A PrimePoint object that is twice this point
"""
# X9.62 B.3:
p = self.curve.p
a = self.curve.a
l_ = ((3 * self.x * self.x + a) * inverse_mod(2 * self.y, p)) % p
x3 = (l_ * l_ - 2 * self.x) % p
y3 = (l_ * (self.x - x3) - self.y) % p
return PrimePoint(self.curve, x3, y3)
# This one point is the Point At Infinity for all purposes:
INFINITY = PrimePoint(None, None, None)
# NIST Curve P-192:
SECP192R1_CURVE = PrimeCurve(
6277101735386680763835789423207666416083908700390324961279,
-3,
0x64210519e59c80e70fa7e9ab72243049feb8deecc146b9b1
)
SECP192R1_BASE_POINT = PrimePoint(
SECP192R1_CURVE,
0x188da80eb03090f67cbf20eb43a18800f4ff0afd82ff1012,
0x07192b95ffc8da78631011ed6b24cdd573f977a11e794811,
6277101735386680763835789423176059013767194773182842284081
)
# NIST Curve P-224:
SECP224R1_CURVE = PrimeCurve(
26959946667150639794667015087019630673557916260026308143510066298881,
-3,
0xb4050a850c04b3abf54132565044b0b7d7bfd8ba270b39432355ffb4
)
SECP224R1_BASE_POINT = PrimePoint(
SECP224R1_CURVE,
0xb70e0cbd6bb4bf7f321390b94a03c1d356c21122343280d6115c1d21,
0xbd376388b5f723fb4c22dfe6cd4375a05a07476444d5819985007e34,
26959946667150639794667015087019625940457807714424391721682722368061
)
# NIST Curve P-256:
SECP256R1_CURVE = PrimeCurve(
115792089210356248762697446949407573530086143415290314195533631308867097853951,
-3,
0x5ac635d8aa3a93e7b3ebbd55769886bc651d06b0cc53b0f63bce3c3e27d2604b
)
SECP256R1_BASE_POINT = PrimePoint(
SECP256R1_CURVE,
0x6b17d1f2e12c4247f8bce6e563a440f277037d812deb33a0f4a13945d898c296,
0x4fe342e2fe1a7f9b8ee7eb4a7c0f9e162bce33576b315ececbb6406837bf51f5,
115792089210356248762697446949407573529996955224135760342422259061068512044369
)
# NIST Curve P-384:
SECP384R1_CURVE = PrimeCurve(
39402006196394479212279040100143613805079739270465446667948293404245721771496870329047266088258938001861606973112319, # noqa
-3,
0xb3312fa7e23ee7e4988e056be3f82d19181d9c6efe8141120314088f5013875ac656398d8a2ed19d2a85c8edd3ec2aef
)
SECP384R1_BASE_POINT = PrimePoint(
SECP384R1_CURVE,
0xaa87ca22be8b05378eb1c71ef320ad746e1d3b628ba79b9859f741e082542a385502f25dbf55296c3a545e3872760ab7,
0x3617de4a96262c6f5d9e98bf9292dc29f8f41dbd289a147ce9da3113b5f0b8c00a60b1ce1d7e819d7a431d7c90ea0e5f,
39402006196394479212279040100143613805079739270465446667946905279627659399113263569398956308152294913554433653942643
)
# NIST Curve P-521:
SECP521R1_CURVE = PrimeCurve(
6864797660130609714981900799081393217269435300143305409394463459185543183397656052122559640661454554977296311391480858037121987999716643812574028291115057151, # noqa
-3,
0x051953eb9618e1c9a1f929a21a0b68540eea2da725b99b315f3b8b489918ef109e156193951ec7e937b1652c0bd3bb1bf073573df883d2c34f1ef451fd46b503f00 # noqa
)
SECP521R1_BASE_POINT = PrimePoint(
SECP521R1_CURVE,
0xc6858e06b70404e9cd9e3ecb662395b4429c648139053fb521f828af606b4d3dbaa14b5e77efe75928fe1dc127a2ffa8de3348b3c1856a429bf97e7e31c2e5bd66, # noqa
0x11839296a789a3bc0045c8a5fb42c7d1bd998f54449579b446817afbd17273e662c97ee72995ef42640c550b9013fad0761353c7086a272c24088be94769fd16650, # noqa
6864797660130609714981900799081393217269435300143305409394463459185543183397655394245057746333217197532963996371363321113864768612440380340372808892707005449 # noqa
)

View file

@ -0,0 +1,45 @@
# coding: utf-8
"""
Helper for formatting exception messages. Exports the following items:
- unwrap()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import re
import textwrap
def unwrap(string, *params):
"""
Takes a multi-line string and does the following:
- dedents
- converts newlines with text before and after into a single line
- strips leading and trailing whitespace
:param string:
The string to format
:param *params:
Params to interpolate into the string
:return:
The formatted string
"""
output = textwrap.dedent(string)
# Unwrap lines, taking into account bulleted lists, ordered lists and
# underlines consisting of = signs
if output.find('\n') != -1:
output = re.sub('(?<=\\S)\n(?=[^ \n\t\\d\\*\\-=])', ' ', output)
if params:
output = output % params
output = output.strip()
return output

View file

@ -0,0 +1,45 @@
# coding: utf-8
"""
FFI helper compatibility functions. Exports the following items:
- LibraryNotFoundError
- FFIEngineError
- bytes_from_buffer()
- buffer_from_bytes()
- null()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from ctypes import create_string_buffer
def buffer_from_bytes(initializer):
return create_string_buffer(initializer)
def bytes_from_buffer(buffer, maxlen=None):
return buffer.raw
def null():
return None
class LibraryNotFoundError(Exception):
"""
An exception when trying to find a shared library
"""
pass
class FFIEngineError(Exception):
"""
An exception when trying to instantiate ctypes or cffi
"""
pass

View file

@ -0,0 +1,170 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
import socket
import struct
from ._errors import unwrap
from ._types import byte_cls, bytes_to_list, str_cls, type_name
def inet_ntop(address_family, packed_ip):
"""
Windows compatibility shim for socket.inet_ntop().
:param address_family:
socket.AF_INET for IPv4 or socket.AF_INET6 for IPv6
:param packed_ip:
A byte string of the network form of an IP address
:return:
A unicode string of the IP address
"""
if address_family not in set([socket.AF_INET, socket.AF_INET6]):
raise ValueError(unwrap(
'''
address_family must be socket.AF_INET (%s) or socket.AF_INET6 (%s),
not %s
''',
repr(socket.AF_INET),
repr(socket.AF_INET6),
repr(address_family)
))
if not isinstance(packed_ip, byte_cls):
raise TypeError(unwrap(
'''
packed_ip must be a byte string, not %s
''',
type_name(packed_ip)
))
required_len = 4 if address_family == socket.AF_INET else 16
if len(packed_ip) != required_len:
raise ValueError(unwrap(
'''
packed_ip must be %d bytes long - is %d
''',
required_len,
len(packed_ip)
))
if address_family == socket.AF_INET:
return '%d.%d.%d.%d' % tuple(bytes_to_list(packed_ip))
octets = struct.unpack(b'!HHHHHHHH', packed_ip)
runs_of_zero = {}
longest_run = 0
zero_index = None
for i, octet in enumerate(octets + (-1,)):
if octet != 0:
if zero_index is not None:
length = i - zero_index
if length not in runs_of_zero:
runs_of_zero[length] = zero_index
longest_run = max(longest_run, length)
zero_index = None
elif zero_index is None:
zero_index = i
hexed = [hex(o)[2:] for o in octets]
if longest_run < 2:
return ':'.join(hexed)
zero_start = runs_of_zero[longest_run]
zero_end = zero_start + longest_run
return ':'.join(hexed[:zero_start]) + '::' + ':'.join(hexed[zero_end:])
def inet_pton(address_family, ip_string):
"""
Windows compatibility shim for socket.inet_ntop().
:param address_family:
socket.AF_INET for IPv4 or socket.AF_INET6 for IPv6
:param ip_string:
A unicode string of an IP address
:return:
A byte string of the network form of the IP address
"""
if address_family not in set([socket.AF_INET, socket.AF_INET6]):
raise ValueError(unwrap(
'''
address_family must be socket.AF_INET (%s) or socket.AF_INET6 (%s),
not %s
''',
repr(socket.AF_INET),
repr(socket.AF_INET6),
repr(address_family)
))
if not isinstance(ip_string, str_cls):
raise TypeError(unwrap(
'''
ip_string must be a unicode string, not %s
''',
type_name(ip_string)
))
if address_family == socket.AF_INET:
octets = ip_string.split('.')
error = len(octets) != 4
if not error:
ints = []
for o in octets:
o = int(o)
if o > 255 or o < 0:
error = True
break
ints.append(o)
if error:
raise ValueError(unwrap(
'''
ip_string must be a dotted string with four integers in the
range of 0 to 255, got %s
''',
repr(ip_string)
))
return struct.pack(b'!BBBB', *ints)
error = False
omitted = ip_string.count('::')
if omitted > 1:
error = True
elif omitted == 0:
octets = ip_string.split(':')
error = len(octets) != 8
else:
begin, end = ip_string.split('::')
begin_octets = begin.split(':')
end_octets = end.split(':')
missing = 8 - len(begin_octets) - len(end_octets)
octets = begin_octets + (['0'] * missing) + end_octets
if not error:
ints = []
for o in octets:
o = int(o, 16)
if o > 65535 or o < 0:
error = True
break
ints.append(o)
return struct.pack(b'!HHHHHHHH', *ints)
raise ValueError(unwrap(
'''
ip_string must be a valid ipv6 string, got %s
''',
repr(ip_string)
))

View file

@ -0,0 +1,159 @@
# coding: utf-8
"""
Function for calculating the modular inverse. Exports the following items:
- inverse_mod()
Source code is derived from
http://webpages.charter.net/curryfans/peter/downloads.html, but has been heavily
modified to fit into this projects lint settings. The original project license
is listed below:
Copyright (c) 2014 Peter Pearson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import math
import platform
from .util import int_to_bytes, int_from_bytes
# First try to use ctypes with OpenSSL for better performance
try:
from ._ffi import (
buffer_from_bytes,
bytes_from_buffer,
FFIEngineError,
LibraryNotFoundError,
null,
)
# Some versions of PyPy have segfault issues, so we just punt on PyPy
if platform.python_implementation() == 'PyPy':
raise EnvironmentError()
try:
from ._perf._big_num_ctypes import libcrypto
def inverse_mod(a, p):
"""
Compute the modular inverse of a (mod p)
:param a:
An integer
:param p:
An integer
:return:
An integer
"""
ctx = libcrypto.BN_CTX_new()
a_bytes = int_to_bytes(abs(a))
p_bytes = int_to_bytes(abs(p))
a_buf = buffer_from_bytes(a_bytes)
a_bn = libcrypto.BN_bin2bn(a_buf, len(a_bytes), null())
if a < 0:
libcrypto.BN_set_negative(a_bn, 1)
p_buf = buffer_from_bytes(p_bytes)
p_bn = libcrypto.BN_bin2bn(p_buf, len(p_bytes), null())
if p < 0:
libcrypto.BN_set_negative(p_bn, 1)
r_bn = libcrypto.BN_mod_inverse(null(), a_bn, p_bn, ctx)
r_len_bits = libcrypto.BN_num_bits(r_bn)
r_len = int(math.ceil(r_len_bits / 8))
r_buf = buffer_from_bytes(r_len)
libcrypto.BN_bn2bin(r_bn, r_buf)
r_bytes = bytes_from_buffer(r_buf, r_len)
result = int_from_bytes(r_bytes)
libcrypto.BN_free(a_bn)
libcrypto.BN_free(p_bn)
libcrypto.BN_free(r_bn)
libcrypto.BN_CTX_free(ctx)
return result
except (LibraryNotFoundError, FFIEngineError):
raise EnvironmentError()
# If there was an issue using ctypes or OpenSSL, we fall back to pure python
except (EnvironmentError, ImportError):
def inverse_mod(a, p):
"""
Compute the modular inverse of a (mod p)
:param a:
An integer
:param p:
An integer
:return:
An integer
"""
if a < 0 or p <= a:
a = a % p
# From Ferguson and Schneier, roughly:
c, d = a, p
uc, vc, ud, vd = 1, 0, 0, 1
while c != 0:
q, c, d = divmod(d, c) + (c,)
uc, vc, ud, vd = ud - q * uc, vd - q * vc, uc, vc
# At this point, d is the GCD, and ud*a+vd*p = d.
# If d == 1, this means that ud is a inverse.
assert d == 1
if ud > 0:
return ud
else:
return ud + p
def fill_width(bytes_, width):
"""
Ensure a byte string representing a positive integer is a specific width
(in bytes)
:param bytes_:
The integer byte string
:param width:
The desired width as an integer
:return:
A byte string of the width specified
"""
while len(bytes_) < width:
bytes_ = b'\x00' + bytes_
return bytes_

View file

@ -0,0 +1,288 @@
# coding: utf-8
"""
Functions to convert unicode IRIs into ASCII byte string URIs and back. Exports
the following items:
- iri_to_uri()
- uri_to_iri()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from encodings import idna # noqa
import codecs
import re
import sys
from ._errors import unwrap
from ._types import byte_cls, str_cls, type_name, bytes_to_list, int_types
if sys.version_info < (3,):
from urlparse import urlsplit, urlunsplit
from urllib import (
quote as urlquote,
unquote as unquote_to_bytes,
)
else:
from urllib.parse import (
quote as urlquote,
unquote_to_bytes,
urlsplit,
urlunsplit,
)
def iri_to_uri(value):
"""
Normalizes and encodes a unicode IRI into an ASCII byte string URI
:param value:
A unicode string of an IRI
:return:
A byte string of the ASCII-encoded URI
"""
if not isinstance(value, str_cls):
raise TypeError(unwrap(
'''
value must be a unicode string, not %s
''',
type_name(value)
))
scheme = None
# Python 2.6 doesn't split properly is the URL doesn't start with http:// or https://
if sys.version_info < (2, 7) and not value.startswith('http://') and not value.startswith('https://'):
real_prefix = None
prefix_match = re.match('^[^:]*://', value)
if prefix_match:
real_prefix = prefix_match.group(0)
value = 'http://' + value[len(real_prefix):]
parsed = urlsplit(value)
if real_prefix:
value = real_prefix + value[7:]
scheme = _urlquote(real_prefix[:-3])
else:
parsed = urlsplit(value)
if scheme is None:
scheme = _urlquote(parsed.scheme)
hostname = parsed.hostname
if hostname is not None:
hostname = hostname.encode('idna')
# RFC 3986 allows userinfo to contain sub-delims
username = _urlquote(parsed.username, safe='!$&\'()*+,;=')
password = _urlquote(parsed.password, safe='!$&\'()*+,;=')
port = parsed.port
if port is not None:
port = str_cls(port).encode('ascii')
netloc = b''
if username is not None:
netloc += username
if password:
netloc += b':' + password
netloc += b'@'
if hostname is not None:
netloc += hostname
if port is not None:
default_http = scheme == b'http' and port == b'80'
default_https = scheme == b'https' and port == b'443'
if not default_http and not default_https:
netloc += b':' + port
# RFC 3986 allows a path to contain sub-delims, plus "@" and ":"
path = _urlquote(parsed.path, safe='/!$&\'()*+,;=@:')
# RFC 3986 allows the query to contain sub-delims, plus "@", ":" , "/" and "?"
query = _urlquote(parsed.query, safe='/?!$&\'()*+,;=@:')
# RFC 3986 allows the fragment to contain sub-delims, plus "@", ":" , "/" and "?"
fragment = _urlquote(parsed.fragment, safe='/?!$&\'()*+,;=@:')
if query is None and fragment is None and path == b'/':
path = None
# Python 2.7 compat
if path is None:
path = ''
output = urlunsplit((scheme, netloc, path, query, fragment))
if isinstance(output, str_cls):
output = output.encode('latin1')
return output
def uri_to_iri(value):
"""
Converts an ASCII URI byte string into a unicode IRI
:param value:
An ASCII-encoded byte string of the URI
:return:
A unicode string of the IRI
"""
if not isinstance(value, byte_cls):
raise TypeError(unwrap(
'''
value must be a byte string, not %s
''',
type_name(value)
))
parsed = urlsplit(value)
scheme = parsed.scheme
if scheme is not None:
scheme = scheme.decode('ascii')
username = _urlunquote(parsed.username, remap=[':', '@'])
password = _urlunquote(parsed.password, remap=[':', '@'])
hostname = parsed.hostname
if hostname:
hostname = hostname.decode('idna')
port = parsed.port
if port and not isinstance(port, int_types):
port = port.decode('ascii')
netloc = ''
if username is not None:
netloc += username
if password:
netloc += ':' + password
netloc += '@'
if hostname is not None:
netloc += hostname
if port is not None:
netloc += ':' + str_cls(port)
path = _urlunquote(parsed.path, remap=['/'], preserve=True)
query = _urlunquote(parsed.query, remap=['&', '='], preserve=True)
fragment = _urlunquote(parsed.fragment)
return urlunsplit((scheme, netloc, path, query, fragment))
def _iri_utf8_errors_handler(exc):
"""
Error handler for decoding UTF-8 parts of a URI into an IRI. Leaves byte
sequences encoded in %XX format, but as part of a unicode string.
:param exc:
The UnicodeDecodeError exception
:return:
A 2-element tuple of (replacement unicode string, integer index to
resume at)
"""
bytes_as_ints = bytes_to_list(exc.object[exc.start:exc.end])
replacements = ['%%%02x' % num for num in bytes_as_ints]
return (''.join(replacements), exc.end)
codecs.register_error('iriutf8', _iri_utf8_errors_handler)
def _urlquote(string, safe=''):
"""
Quotes a unicode string for use in a URL
:param string:
A unicode string
:param safe:
A unicode string of character to not encode
:return:
None (if string is None) or an ASCII byte string of the quoted string
"""
if string is None or string == '':
return None
# Anything already hex quoted is pulled out of the URL and unquoted if
# possible
escapes = []
if re.search('%[0-9a-fA-F]{2}', string):
# Try to unquote any percent values, restoring them if they are not
# valid UTF-8. Also, requote any safe chars since encoded versions of
# those are functionally different than the unquoted ones.
def _try_unescape(match):
byte_string = unquote_to_bytes(match.group(0))
unicode_string = byte_string.decode('utf-8', 'iriutf8')
for safe_char in list(safe):
unicode_string = unicode_string.replace(safe_char, '%%%02x' % ord(safe_char))
return unicode_string
string = re.sub('(?:%[0-9a-fA-F]{2})+', _try_unescape, string)
# Once we have the minimal set of hex quoted values, removed them from
# the string so that they are not double quoted
def _extract_escape(match):
escapes.append(match.group(0).encode('ascii'))
return '\x00'
string = re.sub('%[0-9a-fA-F]{2}', _extract_escape, string)
output = urlquote(string.encode('utf-8'), safe=safe.encode('utf-8'))
if not isinstance(output, byte_cls):
output = output.encode('ascii')
# Restore the existing quoted values that we extracted
if len(escapes) > 0:
def _return_escape(_):
return escapes.pop(0)
output = re.sub(b'%00', _return_escape, output)
return output
def _urlunquote(byte_string, remap=None, preserve=None):
"""
Unquotes a URI portion from a byte string into unicode using UTF-8
:param byte_string:
A byte string of the data to unquote
:param remap:
A list of characters (as unicode) that should be re-mapped to a
%XX encoding. This is used when characters are not valid in part of a
URL.
:param preserve:
A bool - indicates that the chars to be remapped if they occur in
non-hex form, should be preserved. E.g. / for URL path.
:return:
A unicode string
"""
if byte_string is None:
return byte_string
if byte_string == b'':
return ''
if preserve:
replacements = ['\x1A', '\x1C', '\x1D', '\x1E', '\x1F']
preserve_unmap = {}
for char in remap:
replacement = replacements.pop(0)
preserve_unmap[replacement] = char
byte_string = byte_string.replace(char.encode('ascii'), replacement.encode('ascii'))
byte_string = unquote_to_bytes(byte_string)
if remap:
for char in remap:
byte_string = byte_string.replace(char.encode('ascii'), ('%%%02x' % ord(char)).encode('ascii'))
output = byte_string.decode('utf-8', 'iriutf8')
if preserve:
for replacement, original in preserve_unmap.items():
output = output.replace(replacement, original)
return output

View file

@ -0,0 +1,135 @@
# Copyright (c) 2009 Raymond Hettinger
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
import sys
if not sys.version_info < (2, 7):
from collections import OrderedDict
else:
from UserDict import DictMixin
class OrderedDict(dict, DictMixin):
def __init__(self, *args, **kwds):
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__end
except AttributeError:
self.clear()
self.update(*args, **kwds)
def clear(self):
self.__end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.__map = {} # key --> [key, prev, next]
dict.clear(self)
def __setitem__(self, key, value):
if key not in self:
end = self.__end
curr = end[1]
curr[2] = end[1] = self.__map[key] = [key, curr, end]
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
key, prev, next_ = self.__map.pop(key)
prev[2] = next_
next_[1] = prev
def __iter__(self):
end = self.__end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.__end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def popitem(self, last=True):
if not self:
raise KeyError('dictionary is empty')
if last:
key = reversed(self).next()
else:
key = iter(self).next()
value = self.pop(key)
return key, value
def __reduce__(self):
items = [[k, self[k]] for k in self]
tmp = self.__map, self.__end
del self.__map, self.__end
inst_dict = vars(self).copy()
self.__map, self.__end = tmp
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def keys(self):
return list(self)
setdefault = DictMixin.setdefault
update = DictMixin.update
pop = DictMixin.pop
values = DictMixin.values
items = DictMixin.items
iterkeys = DictMixin.iterkeys
itervalues = DictMixin.itervalues
iteritems = DictMixin.iteritems
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
def copy(self):
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
if isinstance(other, OrderedDict):
if len(self) != len(other):
return False
for p, q in zip(self.items(), other.items()):
if p != q:
return False
return True
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other

View file

@ -0,0 +1,69 @@
# coding: utf-8
"""
ctypes interface for BN_mod_inverse() function from OpenSSL. Exports the
following items:
- libcrypto
- BN_bn2bin()
- BN_CTX_free()
- BN_CTX_new()
- BN_free()
- BN_mod_inverse()
- BN_new()
- BN_num_bits()
- BN_set_negative()
Will raise asn1crypto._ffi.LibraryNotFoundError() if libcrypto can not be
found. Will raise asn1crypto._ffi.FFIEngineError() if there is an error
interfacing with libcrypto.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import sys
from ctypes import CDLL, c_int, c_char_p, c_void_p
from ctypes.util import find_library
from .._ffi import LibraryNotFoundError, FFIEngineError
try:
# On Python 2, the unicode string here may raise a UnicodeDecodeError as it
# tries to join a bytestring path to the unicode name "crypto"
libcrypto_path = find_library(b'crypto' if sys.version_info < (3,) else 'crypto')
if not libcrypto_path:
raise LibraryNotFoundError('The library libcrypto could not be found')
libcrypto = CDLL(libcrypto_path)
libcrypto.BN_new.argtypes = []
libcrypto.BN_new.restype = c_void_p
libcrypto.BN_bin2bn.argtypes = [c_char_p, c_int, c_void_p]
libcrypto.BN_bin2bn.restype = c_void_p
libcrypto.BN_bn2bin.argtypes = [c_void_p, c_char_p]
libcrypto.BN_bn2bin.restype = c_int
libcrypto.BN_set_negative.argtypes = [c_void_p, c_int]
libcrypto.BN_set_negative.restype = None
libcrypto.BN_num_bits.argtypes = [c_void_p]
libcrypto.BN_num_bits.restype = c_int
libcrypto.BN_free.argtypes = [c_void_p]
libcrypto.BN_free.restype = None
libcrypto.BN_CTX_new.argtypes = []
libcrypto.BN_CTX_new.restype = c_void_p
libcrypto.BN_CTX_free.argtypes = [c_void_p]
libcrypto.BN_CTX_free.restype = None
libcrypto.BN_mod_inverse.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]
libcrypto.BN_mod_inverse.restype = c_void_p
except (AttributeError):
raise FFIEngineError('Error initializing ctypes')

View file

@ -0,0 +1,331 @@
# coding: utf-8
"""
Implementation of the teletex T.61 codec. Exports the following items:
- register()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import codecs
class TeletexCodec(codecs.Codec):
def encode(self, input_, errors='strict'):
return codecs.charmap_encode(input_, errors, ENCODING_TABLE)
def decode(self, input_, errors='strict'):
return codecs.charmap_decode(input_, errors, DECODING_TABLE)
class TeletexIncrementalEncoder(codecs.IncrementalEncoder):
def encode(self, input_, final=False):
return codecs.charmap_encode(input_, self.errors, ENCODING_TABLE)[0]
class TeletexIncrementalDecoder(codecs.IncrementalDecoder):
def decode(self, input_, final=False):
return codecs.charmap_decode(input_, self.errors, DECODING_TABLE)[0]
class TeletexStreamWriter(TeletexCodec, codecs.StreamWriter):
pass
class TeletexStreamReader(TeletexCodec, codecs.StreamReader):
pass
def teletex_search_function(name):
"""
Search function for teletex codec that is passed to codecs.register()
"""
if name != 'teletex':
return None
return codecs.CodecInfo(
name='teletex',
encode=TeletexCodec().encode,
decode=TeletexCodec().decode,
incrementalencoder=TeletexIncrementalEncoder,
incrementaldecoder=TeletexIncrementalDecoder,
streamreader=TeletexStreamReader,
streamwriter=TeletexStreamWriter,
)
def register():
"""
Registers the teletex codec
"""
codecs.register(teletex_search_function)
# http://en.wikipedia.org/wiki/ITU_T.61
DECODING_TABLE = (
'\u0000'
'\u0001'
'\u0002'
'\u0003'
'\u0004'
'\u0005'
'\u0006'
'\u0007'
'\u0008'
'\u0009'
'\u000A'
'\u000B'
'\u000C'
'\u000D'
'\u000E'
'\u000F'
'\u0010'
'\u0011'
'\u0012'
'\u0013'
'\u0014'
'\u0015'
'\u0016'
'\u0017'
'\u0018'
'\u0019'
'\u001A'
'\u001B'
'\u001C'
'\u001D'
'\u001E'
'\u001F'
'\u0020'
'\u0021'
'\u0022'
'\ufffe'
'\ufffe'
'\u0025'
'\u0026'
'\u0027'
'\u0028'
'\u0029'
'\u002A'
'\u002B'
'\u002C'
'\u002D'
'\u002E'
'\u002F'
'\u0030'
'\u0031'
'\u0032'
'\u0033'
'\u0034'
'\u0035'
'\u0036'
'\u0037'
'\u0038'
'\u0039'
'\u003A'
'\u003B'
'\u003C'
'\u003D'
'\u003E'
'\u003F'
'\u0040'
'\u0041'
'\u0042'
'\u0043'
'\u0044'
'\u0045'
'\u0046'
'\u0047'
'\u0048'
'\u0049'
'\u004A'
'\u004B'
'\u004C'
'\u004D'
'\u004E'
'\u004F'
'\u0050'
'\u0051'
'\u0052'
'\u0053'
'\u0054'
'\u0055'
'\u0056'
'\u0057'
'\u0058'
'\u0059'
'\u005A'
'\u005B'
'\ufffe'
'\u005D'
'\ufffe'
'\u005F'
'\ufffe'
'\u0061'
'\u0062'
'\u0063'
'\u0064'
'\u0065'
'\u0066'
'\u0067'
'\u0068'
'\u0069'
'\u006A'
'\u006B'
'\u006C'
'\u006D'
'\u006E'
'\u006F'
'\u0070'
'\u0071'
'\u0072'
'\u0073'
'\u0074'
'\u0075'
'\u0076'
'\u0077'
'\u0078'
'\u0079'
'\u007A'
'\ufffe'
'\u007C'
'\ufffe'
'\ufffe'
'\u007F'
'\u0080'
'\u0081'
'\u0082'
'\u0083'
'\u0084'
'\u0085'
'\u0086'
'\u0087'
'\u0088'
'\u0089'
'\u008A'
'\u008B'
'\u008C'
'\u008D'
'\u008E'
'\u008F'
'\u0090'
'\u0091'
'\u0092'
'\u0093'
'\u0094'
'\u0095'
'\u0096'
'\u0097'
'\u0098'
'\u0099'
'\u009A'
'\u009B'
'\u009C'
'\u009D'
'\u009E'
'\u009F'
'\u00A0'
'\u00A1'
'\u00A2'
'\u00A3'
'\u0024'
'\u00A5'
'\u0023'
'\u00A7'
'\u00A4'
'\ufffe'
'\ufffe'
'\u00AB'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\u00B0'
'\u00B1'
'\u00B2'
'\u00B3'
'\u00D7'
'\u00B5'
'\u00B6'
'\u00B7'
'\u00F7'
'\ufffe'
'\ufffe'
'\u00BB'
'\u00BC'
'\u00BD'
'\u00BE'
'\u00BF'
'\ufffe'
'\u0300'
'\u0301'
'\u0302'
'\u0303'
'\u0304'
'\u0306'
'\u0307'
'\u0308'
'\ufffe'
'\u030A'
'\u0327'
'\u0332'
'\u030B'
'\u0328'
'\u030C'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\u2126'
'\u00C6'
'\u00D0'
'\u00AA'
'\u0126'
'\ufffe'
'\u0132'
'\u013F'
'\u0141'
'\u00D8'
'\u0152'
'\u00BA'
'\u00DE'
'\u0166'
'\u014A'
'\u0149'
'\u0138'
'\u00E6'
'\u0111'
'\u00F0'
'\u0127'
'\u0131'
'\u0133'
'\u0140'
'\u0142'
'\u00F8'
'\u0153'
'\u00DF'
'\u00FE'
'\u0167'
'\u014B'
'\ufffe'
)
ENCODING_TABLE = codecs.charmap_build(DECODING_TABLE)

View file

@ -0,0 +1,46 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
import inspect
import sys
if sys.version_info < (3,):
str_cls = unicode # noqa
byte_cls = str
int_types = (int, long) # noqa
def bytes_to_list(byte_string):
return [ord(b) for b in byte_string]
chr_cls = chr
else:
str_cls = str
byte_cls = bytes
int_types = int
bytes_to_list = list
def chr_cls(num):
return bytes([num])
def type_name(value):
"""
Returns a user-readable name for the type of an object
:param value:
A value to get the type name of
:return:
A unicode string of the object's type name
"""
if inspect.isclass(value):
cls = value
else:
cls = value.__class__
if cls.__module__ in set(['builtins', '__builtin__']):
return cls.__name__
return '%s.%s' % (cls.__module__, cls.__name__)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,932 @@
# coding: utf-8
"""
ASN.1 type classes for cryptographic message syntax (CMS). Structures are also
compatible with PKCS#7. Exports the following items:
- AuthenticatedData()
- AuthEnvelopedData()
- CompressedData()
- ContentInfo()
- DigestedData()
- EncryptedData()
- EnvelopedData()
- SignedAndEnvelopedData()
- SignedData()
Other type classes are defined that help compose the types listed above.
Most CMS structures in the wild are formatted as ContentInfo encapsulating one of the other types.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
try:
import zlib
except (ImportError):
zlib = None
from .algos import (
_ForceNullParameters,
DigestAlgorithm,
EncryptionAlgorithm,
HmacAlgorithm,
KdfAlgorithm,
SignedDigestAlgorithm,
)
from .core import (
Any,
BitString,
Choice,
Enumerated,
GeneralizedTime,
Integer,
ObjectIdentifier,
OctetBitString,
OctetString,
ParsableOctetString,
Sequence,
SequenceOf,
SetOf,
UTCTime,
UTF8String,
)
from .crl import CertificateList
from .keys import PublicKeyInfo
from .ocsp import OCSPResponse
from .x509 import Attributes, Certificate, Extensions, GeneralName, GeneralNames, Name
# These structures are taken from
# ftp://ftp.rsasecurity.com/pub/pkcs/ascii/pkcs-6.asc
class ExtendedCertificateInfo(Sequence):
_fields = [
('version', Integer),
('certificate', Certificate),
('attributes', Attributes),
]
class ExtendedCertificate(Sequence):
_fields = [
('extended_certificate_info', ExtendedCertificateInfo),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
]
# These structures are taken from https://tools.ietf.org/html/rfc5652,
# https://tools.ietf.org/html/rfc5083, http://tools.ietf.org/html/rfc2315,
# https://tools.ietf.org/html/rfc5940, https://tools.ietf.org/html/rfc3274,
# https://tools.ietf.org/html/rfc3281
class CMSVersion(Integer):
_map = {
0: 'v0',
1: 'v1',
2: 'v2',
3: 'v3',
4: 'v4',
5: 'v5',
}
class CMSAttributeType(ObjectIdentifier):
_map = {
'1.2.840.113549.1.9.3': 'content_type',
'1.2.840.113549.1.9.4': 'message_digest',
'1.2.840.113549.1.9.5': 'signing_time',
'1.2.840.113549.1.9.6': 'counter_signature',
# https://tools.ietf.org/html/rfc3161#page-20
'1.2.840.113549.1.9.16.2.14': 'signature_time_stamp_token',
# https://tools.ietf.org/html/rfc6211#page-5
'1.2.840.113549.1.9.52': 'cms_algorithm_protection',
}
class Time(Choice):
_alternatives = [
('utc_time', UTCTime),
('generalized_time', GeneralizedTime),
]
class ContentType(ObjectIdentifier):
_map = {
'1.2.840.113549.1.7.1': 'data',
'1.2.840.113549.1.7.2': 'signed_data',
'1.2.840.113549.1.7.3': 'enveloped_data',
'1.2.840.113549.1.7.4': 'signed_and_enveloped_data',
'1.2.840.113549.1.7.5': 'digested_data',
'1.2.840.113549.1.7.6': 'encrypted_data',
'1.2.840.113549.1.9.16.1.2': 'authenticated_data',
'1.2.840.113549.1.9.16.1.9': 'compressed_data',
'1.2.840.113549.1.9.16.1.23': 'authenticated_enveloped_data',
}
class CMSAlgorithmProtection(Sequence):
_fields = [
('digest_algorithm', DigestAlgorithm),
('signature_algorithm', SignedDigestAlgorithm, {'implicit': 1, 'optional': True}),
('mac_algorithm', HmacAlgorithm, {'implicit': 2, 'optional': True}),
]
class SetOfContentType(SetOf):
_child_spec = ContentType
class SetOfOctetString(SetOf):
_child_spec = OctetString
class SetOfTime(SetOf):
_child_spec = Time
class SetOfAny(SetOf):
_child_spec = Any
class SetOfCMSAlgorithmProtection(SetOf):
_child_spec = CMSAlgorithmProtection
class CMSAttribute(Sequence):
_fields = [
('type', CMSAttributeType),
('values', None),
]
_oid_specs = {}
def _values_spec(self):
return self._oid_specs.get(self['type'].native, SetOfAny)
_spec_callbacks = {
'values': _values_spec
}
class CMSAttributes(SetOf):
_child_spec = CMSAttribute
class IssuerSerial(Sequence):
_fields = [
('issuer', GeneralNames),
('serial', Integer),
('issuer_uid', OctetBitString, {'optional': True}),
]
class AttCertVersion(Integer):
_map = {
0: 'v1',
1: 'v2',
}
class AttCertSubject(Choice):
_alternatives = [
('base_certificate_id', IssuerSerial, {'explicit': 0}),
('subject_name', GeneralNames, {'explicit': 1}),
]
class AttCertValidityPeriod(Sequence):
_fields = [
('not_before_time', GeneralizedTime),
('not_after_time', GeneralizedTime),
]
class AttributeCertificateInfoV1(Sequence):
_fields = [
('version', AttCertVersion, {'default': 'v1'}),
('subject', AttCertSubject),
('issuer', GeneralNames),
('signature', SignedDigestAlgorithm),
('serial_number', Integer),
('att_cert_validity_period', AttCertValidityPeriod),
('attributes', Attributes),
('issuer_unique_id', OctetBitString, {'optional': True}),
('extensions', Extensions, {'optional': True}),
]
class AttributeCertificateV1(Sequence):
_fields = [
('ac_info', AttributeCertificateInfoV1),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
]
class DigestedObjectType(Enumerated):
_map = {
0: 'public_key',
1: 'public_key_cert',
2: 'other_objy_types',
}
class ObjectDigestInfo(Sequence):
_fields = [
('digested_object_type', DigestedObjectType),
('other_object_type_id', ObjectIdentifier, {'optional': True}),
('digest_algorithm', DigestAlgorithm),
('object_digest', OctetBitString),
]
class Holder(Sequence):
_fields = [
('base_certificate_id', IssuerSerial, {'implicit': 0, 'optional': True}),
('entity_name', GeneralNames, {'implicit': 1, 'optional': True}),
('object_digest_info', ObjectDigestInfo, {'implicit': 2, 'optional': True}),
]
class V2Form(Sequence):
_fields = [
('issuer_name', GeneralNames, {'optional': True}),
('base_certificate_id', IssuerSerial, {'explicit': 0, 'optional': True}),
('object_digest_info', ObjectDigestInfo, {'explicit': 1, 'optional': True}),
]
class AttCertIssuer(Choice):
_alternatives = [
('v1_form', GeneralNames),
('v2_form', V2Form, {'explicit': 0}),
]
class IetfAttrValue(Choice):
_alternatives = [
('octets', OctetString),
('oid', ObjectIdentifier),
('string', UTF8String),
]
class IetfAttrValues(SequenceOf):
_child_spec = IetfAttrValue
class IetfAttrSyntax(Sequence):
_fields = [
('policy_authority', GeneralNames, {'implicit': 0, 'optional': True}),
('values', IetfAttrValues),
]
class SetOfIetfAttrSyntax(SetOf):
_child_spec = IetfAttrSyntax
class SvceAuthInfo(Sequence):
_fields = [
('service', GeneralName),
('ident', GeneralName),
('auth_info', OctetString, {'optional': True}),
]
class SetOfSvceAuthInfo(SetOf):
_child_spec = SvceAuthInfo
class RoleSyntax(Sequence):
_fields = [
('role_authority', GeneralNames, {'implicit': 0, 'optional': True}),
('role_name', GeneralName, {'implicit': 1}),
]
class SetOfRoleSyntax(SetOf):
_child_spec = RoleSyntax
class ClassList(BitString):
_map = {
0: 'unmarked',
1: 'unclassified',
2: 'restricted',
3: 'confidential',
4: 'secret',
5: 'top_secret',
}
class SecurityCategory(Sequence):
_fields = [
('type', ObjectIdentifier, {'implicit': 0}),
('value', Any, {'implicit': 1}),
]
class SetOfSecurityCategory(SetOf):
_child_spec = SecurityCategory
class Clearance(Sequence):
_fields = [
('policy_id', ObjectIdentifier, {'implicit': 0}),
('class_list', ClassList, {'implicit': 1, 'default': 'unclassified'}),
('security_categories', SetOfSecurityCategory, {'implicit': 2, 'optional': True}),
]
class SetOfClearance(SetOf):
_child_spec = Clearance
class BigTime(Sequence):
_fields = [
('major', Integer),
('fractional_seconds', Integer),
('sign', Integer, {'optional': True}),
]
class LeapData(Sequence):
_fields = [
('leap_time', BigTime),
('action', Integer),
]
class SetOfLeapData(SetOf):
_child_spec = LeapData
class TimingMetrics(Sequence):
_fields = [
('ntp_time', BigTime),
('offset', BigTime),
('delay', BigTime),
('expiration', BigTime),
('leap_event', SetOfLeapData, {'optional': True}),
]
class SetOfTimingMetrics(SetOf):
_child_spec = TimingMetrics
class TimingPolicy(Sequence):
_fields = [
('policy_id', SequenceOf, {'spec': ObjectIdentifier}),
('max_offset', BigTime, {'explicit': 0, 'optional': True}),
('max_delay', BigTime, {'explicit': 1, 'optional': True}),
]
class SetOfTimingPolicy(SetOf):
_child_spec = TimingPolicy
class AttCertAttributeType(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.10.1': 'authentication_info',
'1.3.6.1.5.5.7.10.2': 'access_identity',
'1.3.6.1.5.5.7.10.3': 'charging_identity',
'1.3.6.1.5.5.7.10.4': 'group',
'2.5.4.72': 'role',
'2.5.4.55': 'clearance',
'1.3.6.1.4.1.601.10.4.1': 'timing_metrics',
'1.3.6.1.4.1.601.10.4.2': 'timing_policy',
}
class AttCertAttribute(Sequence):
_fields = [
('type', AttCertAttributeType),
('values', None),
]
_oid_specs = {
'authentication_info': SetOfSvceAuthInfo,
'access_identity': SetOfSvceAuthInfo,
'charging_identity': SetOfIetfAttrSyntax,
'group': SetOfIetfAttrSyntax,
'role': SetOfRoleSyntax,
'clearance': SetOfClearance,
'timing_metrics': SetOfTimingMetrics,
'timing_policy': SetOfTimingPolicy,
}
def _values_spec(self):
return self._oid_specs.get(self['type'].native, SetOfAny)
_spec_callbacks = {
'values': _values_spec
}
class AttCertAttributes(SequenceOf):
_child_spec = AttCertAttribute
class AttributeCertificateInfoV2(Sequence):
_fields = [
('version', AttCertVersion),
('holder', Holder),
('issuer', AttCertIssuer),
('signature', SignedDigestAlgorithm),
('serial_number', Integer),
('att_cert_validity_period', AttCertValidityPeriod),
('attributes', AttCertAttributes),
('issuer_unique_id', OctetBitString, {'optional': True}),
('extensions', Extensions, {'optional': True}),
]
class AttributeCertificateV2(Sequence):
# Handle the situation where a V2 cert is encoded as V1
_bad_tag = 1
_fields = [
('ac_info', AttributeCertificateInfoV2),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
]
class OtherCertificateFormat(Sequence):
_fields = [
('other_cert_format', ObjectIdentifier),
('other_cert', Any),
]
class CertificateChoices(Choice):
_alternatives = [
('certificate', Certificate),
('extended_certificate', ExtendedCertificate, {'implicit': 0}),
('v1_attr_cert', AttributeCertificateV1, {'implicit': 1}),
('v2_attr_cert', AttributeCertificateV2, {'implicit': 2}),
('other', OtherCertificateFormat, {'implicit': 3}),
]
def validate(self, class_, tag, contents):
"""
Ensures that the class and tag specified exist as an alternative. This
custom version fixes parsing broken encodings there a V2 attribute
# certificate is encoded as a V1
:param class_:
The integer class_ from the encoded value header
:param tag:
The integer tag from the encoded value header
:param contents:
A byte string of the contents of the value - used when the object
is explicitly tagged
:raises:
ValueError - when value is not a valid alternative
"""
super(CertificateChoices, self).validate(class_, tag, contents)
if self._choice == 2:
if AttCertVersion.load(Sequence.load(contents)[0].dump()).native == 'v2':
self._choice = 3
class CertificateSet(SetOf):
_child_spec = CertificateChoices
class ContentInfo(Sequence):
_fields = [
('content_type', ContentType),
('content', Any, {'explicit': 0, 'optional': True}),
]
_oid_pair = ('content_type', 'content')
_oid_specs = {}
class SetOfContentInfo(SetOf):
_child_spec = ContentInfo
class EncapsulatedContentInfo(Sequence):
_fields = [
('content_type', ContentType),
('content', ParsableOctetString, {'explicit': 0, 'optional': True}),
]
_oid_pair = ('content_type', 'content')
_oid_specs = {}
class IssuerAndSerialNumber(Sequence):
_fields = [
('issuer', Name),
('serial_number', Integer),
]
class SignerIdentifier(Choice):
_alternatives = [
('issuer_and_serial_number', IssuerAndSerialNumber),
('subject_key_identifier', OctetString, {'implicit': 0}),
]
class DigestAlgorithms(SetOf):
_child_spec = DigestAlgorithm
class CertificateRevocationLists(SetOf):
_child_spec = CertificateList
class SCVPReqRes(Sequence):
_fields = [
('request', ContentInfo, {'explicit': 0, 'optional': True}),
('response', ContentInfo),
]
class OtherRevInfoFormatId(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.16.2': 'ocsp_response',
'1.3.6.1.5.5.7.16.4': 'scvp',
}
class OtherRevocationInfoFormat(Sequence):
_fields = [
('other_rev_info_format', OtherRevInfoFormatId),
('other_rev_info', Any),
]
_oid_pair = ('other_rev_info_format', 'other_rev_info')
_oid_specs = {
'ocsp_response': OCSPResponse,
'scvp': SCVPReqRes,
}
class RevocationInfoChoice(Choice):
_alternatives = [
('crl', CertificateList),
('other', OtherRevocationInfoFormat, {'implicit': 1}),
]
class RevocationInfoChoices(SetOf):
_child_spec = RevocationInfoChoice
class SignerInfo(Sequence):
_fields = [
('version', CMSVersion),
('sid', SignerIdentifier),
('digest_algorithm', DigestAlgorithm),
('signed_attrs', CMSAttributes, {'implicit': 0, 'optional': True}),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetString),
('unsigned_attrs', CMSAttributes, {'implicit': 1, 'optional': True}),
]
class SignerInfos(SetOf):
_child_spec = SignerInfo
class SignedData(Sequence):
_fields = [
('version', CMSVersion),
('digest_algorithms', DigestAlgorithms),
('encap_content_info', None),
('certificates', CertificateSet, {'implicit': 0, 'optional': True}),
('crls', RevocationInfoChoices, {'implicit': 1, 'optional': True}),
('signer_infos', SignerInfos),
]
def _encap_content_info_spec(self):
# If the encap_content_info is version v1, then this could be a PKCS#7
# structure, or a CMS structure. CMS wraps the encoded value in an
# Octet String tag.
# If the version is greater than 1, it is definite CMS
if self['version'].native != 'v1':
return EncapsulatedContentInfo
# Otherwise, the ContentInfo spec from PKCS#7 will be compatible with
# CMS v1 (which only allows Data, an Octet String) and PKCS#7, which
# allows Any
return ContentInfo
_spec_callbacks = {
'encap_content_info': _encap_content_info_spec
}
class OriginatorInfo(Sequence):
_fields = [
('certs', CertificateSet, {'implicit': 0, 'optional': True}),
('crls', RevocationInfoChoices, {'implicit': 1, 'optional': True}),
]
class RecipientIdentifier(Choice):
_alternatives = [
('issuer_and_serial_number', IssuerAndSerialNumber),
('subject_key_identifier', OctetString, {'implicit': 0}),
]
class KeyEncryptionAlgorithmId(ObjectIdentifier):
_map = {
'1.2.840.113549.1.1.1': 'rsa',
'2.16.840.1.101.3.4.1.5': 'aes128_wrap',
'2.16.840.1.101.3.4.1.8': 'aes128_wrap_pad',
'2.16.840.1.101.3.4.1.25': 'aes192_wrap',
'2.16.840.1.101.3.4.1.28': 'aes192_wrap_pad',
'2.16.840.1.101.3.4.1.45': 'aes256_wrap',
'2.16.840.1.101.3.4.1.48': 'aes256_wrap_pad',
}
class KeyEncryptionAlgorithm(_ForceNullParameters, Sequence):
_fields = [
('algorithm', KeyEncryptionAlgorithmId),
('parameters', Any, {'optional': True}),
]
class KeyTransRecipientInfo(Sequence):
_fields = [
('version', CMSVersion),
('rid', RecipientIdentifier),
('key_encryption_algorithm', KeyEncryptionAlgorithm),
('encrypted_key', OctetString),
]
class OriginatorIdentifierOrKey(Choice):
_alternatives = [
('issuer_and_serial_number', IssuerAndSerialNumber),
('subject_key_identifier', OctetString, {'implicit': 0}),
('originator_key', PublicKeyInfo, {'implicit': 1}),
]
class OtherKeyAttribute(Sequence):
_fields = [
('key_attr_id', ObjectIdentifier),
('key_attr', Any),
]
class RecipientKeyIdentifier(Sequence):
_fields = [
('subject_key_identifier', OctetString),
('date', GeneralizedTime, {'optional': True}),
('other', OtherKeyAttribute, {'optional': True}),
]
class KeyAgreementRecipientIdentifier(Choice):
_alternatives = [
('issuer_and_serial_number', IssuerAndSerialNumber),
('r_key_id', RecipientKeyIdentifier, {'implicit': 0}),
]
class RecipientEncryptedKey(Sequence):
_fields = [
('rid', KeyAgreementRecipientIdentifier),
('encrypted_key', OctetString),
]
class RecipientEncryptedKeys(SequenceOf):
_child_spec = RecipientEncryptedKey
class KeyAgreeRecipientInfo(Sequence):
_fields = [
('version', CMSVersion),
('originator', OriginatorIdentifierOrKey, {'explicit': 0}),
('ukm', OctetString, {'explicit': 1, 'optional': True}),
('key_encryption_algorithm', KeyEncryptionAlgorithm),
('recipient_encrypted_keys', RecipientEncryptedKeys),
]
class KEKIdentifier(Sequence):
_fields = [
('key_identifier', OctetString),
('date', GeneralizedTime, {'optional': True}),
('other', OtherKeyAttribute, {'optional': True}),
]
class KEKRecipientInfo(Sequence):
_fields = [
('version', CMSVersion),
('kekid', KEKIdentifier),
('key_encryption_algorithm', KeyEncryptionAlgorithm),
('encrypted_key', OctetString),
]
class PasswordRecipientInfo(Sequence):
_fields = [
('version', CMSVersion),
('key_derivation_algorithm', KdfAlgorithm, {'implicit': 0, 'optional': True}),
('key_encryption_algorithm', KeyEncryptionAlgorithm),
('encrypted_key', OctetString),
]
class OtherRecipientInfo(Sequence):
_fields = [
('ori_type', ObjectIdentifier),
('ori_value', Any),
]
class RecipientInfo(Choice):
_alternatives = [
('ktri', KeyTransRecipientInfo),
('kari', KeyAgreeRecipientInfo, {'implicit': 1}),
('kekri', KEKRecipientInfo, {'implicit': 2}),
('pwri', PasswordRecipientInfo, {'implicit': 3}),
('ori', OtherRecipientInfo, {'implicit': 4}),
]
class RecipientInfos(SetOf):
_child_spec = RecipientInfo
class EncryptedContentInfo(Sequence):
_fields = [
('content_type', ContentType),
('content_encryption_algorithm', EncryptionAlgorithm),
('encrypted_content', OctetString, {'implicit': 0, 'optional': True}),
]
class EnvelopedData(Sequence):
_fields = [
('version', CMSVersion),
('originator_info', OriginatorInfo, {'implicit': 0, 'optional': True}),
('recipient_infos', RecipientInfos),
('encrypted_content_info', EncryptedContentInfo),
('unprotected_attrs', CMSAttributes, {'implicit': 1, 'optional': True}),
]
class SignedAndEnvelopedData(Sequence):
_fields = [
('version', CMSVersion),
('recipient_infos', RecipientInfos),
('digest_algorithms', DigestAlgorithms),
('encrypted_content_info', EncryptedContentInfo),
('certificates', CertificateSet, {'implicit': 0, 'optional': True}),
('crls', CertificateRevocationLists, {'implicit': 1, 'optional': True}),
('signer_infos', SignerInfos),
]
class DigestedData(Sequence):
_fields = [
('version', CMSVersion),
('digest_algorithm', DigestAlgorithm),
('encap_content_info', None),
('digest', OctetString),
]
def _encap_content_info_spec(self):
# If the encap_content_info is version v1, then this could be a PKCS#7
# structure, or a CMS structure. CMS wraps the encoded value in an
# Octet String tag.
# If the version is greater than 1, it is definite CMS
if self['version'].native != 'v1':
return EncapsulatedContentInfo
# Otherwise, the ContentInfo spec from PKCS#7 will be compatible with
# CMS v1 (which only allows Data, an Octet String) and PKCS#7, which
# allows Any
return ContentInfo
_spec_callbacks = {
'encap_content_info': _encap_content_info_spec
}
class EncryptedData(Sequence):
_fields = [
('version', CMSVersion),
('encrypted_content_info', EncryptedContentInfo),
('unprotected_attrs', CMSAttributes, {'implicit': 1, 'optional': True}),
]
class AuthenticatedData(Sequence):
_fields = [
('version', CMSVersion),
('originator_info', OriginatorInfo, {'implicit': 0, 'optional': True}),
('recipient_infos', RecipientInfos),
('mac_algorithm', HmacAlgorithm),
('digest_algorithm', DigestAlgorithm, {'implicit': 1, 'optional': True}),
# This does not require the _spec_callbacks approach of SignedData and
# DigestedData since AuthenticatedData was not part of PKCS#7
('encap_content_info', EncapsulatedContentInfo),
('auth_attrs', CMSAttributes, {'implicit': 2, 'optional': True}),
('mac', OctetString),
('unauth_attrs', CMSAttributes, {'implicit': 3, 'optional': True}),
]
class AuthEnvelopedData(Sequence):
_fields = [
('version', CMSVersion),
('originator_info', OriginatorInfo, {'implicit': 0, 'optional': True}),
('recipient_infos', RecipientInfos),
('auth_encrypted_content_info', EncryptedContentInfo),
('auth_attrs', CMSAttributes, {'implicit': 1, 'optional': True}),
('mac', OctetString),
('unauth_attrs', CMSAttributes, {'implicit': 2, 'optional': True}),
]
class CompressionAlgorithmId(ObjectIdentifier):
_map = {
'1.2.840.113549.1.9.16.3.8': 'zlib',
}
class CompressionAlgorithm(Sequence):
_fields = [
('algorithm', CompressionAlgorithmId),
('parameters', Any, {'optional': True}),
]
class CompressedData(Sequence):
_fields = [
('version', CMSVersion),
('compression_algorithm', CompressionAlgorithm),
('encap_content_info', EncapsulatedContentInfo),
]
_decompressed = None
@property
def decompressed(self):
if self._decompressed is None:
if zlib is None:
raise SystemError('The zlib module is not available')
self._decompressed = zlib.decompress(self['encap_content_info']['content'].native)
return self._decompressed
ContentInfo._oid_specs = {
'data': OctetString,
'signed_data': SignedData,
'enveloped_data': EnvelopedData,
'signed_and_enveloped_data': SignedAndEnvelopedData,
'digested_data': DigestedData,
'encrypted_data': EncryptedData,
'authenticated_data': AuthenticatedData,
'compressed_data': CompressedData,
'authenticated_enveloped_data': AuthEnvelopedData,
}
EncapsulatedContentInfo._oid_specs = {
'signed_data': SignedData,
'enveloped_data': EnvelopedData,
'signed_and_enveloped_data': SignedAndEnvelopedData,
'digested_data': DigestedData,
'encrypted_data': EncryptedData,
'authenticated_data': AuthenticatedData,
'compressed_data': CompressedData,
'authenticated_enveloped_data': AuthEnvelopedData,
}
CMSAttribute._oid_specs = {
'content_type': SetOfContentType,
'message_digest': SetOfOctetString,
'signing_time': SetOfTime,
'counter_signature': SignerInfos,
'signature_time_stamp_token': SetOfContentInfo,
'cms_algorithm_protection': SetOfCMSAlgorithmProtection,
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,536 @@
# coding: utf-8
"""
ASN.1 type classes for certificate revocation lists (CRL). Exports the
following items:
- CertificateList()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import hashlib
from .algos import SignedDigestAlgorithm
from .core import (
Boolean,
Enumerated,
GeneralizedTime,
Integer,
ObjectIdentifier,
OctetBitString,
ParsableOctetString,
Sequence,
SequenceOf,
)
from .x509 import (
AuthorityInfoAccessSyntax,
AuthorityKeyIdentifier,
CRLDistributionPoints,
DistributionPointName,
GeneralNames,
Name,
ReasonFlags,
Time,
)
# The structures in this file are taken from https://tools.ietf.org/html/rfc5280
class Version(Integer):
_map = {
0: 'v1',
1: 'v2',
2: 'v3',
}
class IssuingDistributionPoint(Sequence):
_fields = [
('distribution_point', DistributionPointName, {'explicit': 0, 'optional': True}),
('only_contains_user_certs', Boolean, {'implicit': 1, 'default': False}),
('only_contains_ca_certs', Boolean, {'implicit': 2, 'default': False}),
('only_some_reasons', ReasonFlags, {'implicit': 3, 'optional': True}),
('indirect_crl', Boolean, {'implicit': 4, 'default': False}),
('only_contains_attribute_certs', Boolean, {'implicit': 5, 'default': False}),
]
class TBSCertListExtensionId(ObjectIdentifier):
_map = {
'2.5.29.18': 'issuer_alt_name',
'2.5.29.20': 'crl_number',
'2.5.29.27': 'delta_crl_indicator',
'2.5.29.28': 'issuing_distribution_point',
'2.5.29.35': 'authority_key_identifier',
'2.5.29.46': 'freshest_crl',
'1.3.6.1.5.5.7.1.1': 'authority_information_access',
}
class TBSCertListExtension(Sequence):
_fields = [
('extn_id', TBSCertListExtensionId),
('critical', Boolean, {'default': False}),
('extn_value', ParsableOctetString),
]
_oid_pair = ('extn_id', 'extn_value')
_oid_specs = {
'issuer_alt_name': GeneralNames,
'crl_number': Integer,
'delta_crl_indicator': Integer,
'issuing_distribution_point': IssuingDistributionPoint,
'authority_key_identifier': AuthorityKeyIdentifier,
'freshest_crl': CRLDistributionPoints,
'authority_information_access': AuthorityInfoAccessSyntax,
}
class TBSCertListExtensions(SequenceOf):
_child_spec = TBSCertListExtension
class CRLReason(Enumerated):
_map = {
0: 'unspecified',
1: 'key_compromise',
2: 'ca_compromise',
3: 'affiliation_changed',
4: 'superseded',
5: 'cessation_of_operation',
6: 'certificate_hold',
8: 'remove_from_crl',
9: 'privilege_withdrawn',
10: 'aa_compromise',
}
@property
def human_friendly(self):
"""
:return:
A unicode string with revocation description that is suitable to
show to end-users. Starts with a lower case letter and phrased in
such a way that it makes sense after the phrase "because of" or
"due to".
"""
return {
'unspecified': 'an unspecified reason',
'key_compromise': 'a compromised key',
'ca_compromise': 'the CA being compromised',
'affiliation_changed': 'an affiliation change',
'superseded': 'certificate supersession',
'cessation_of_operation': 'a cessation of operation',
'certificate_hold': 'a certificate hold',
'remove_from_crl': 'removal from the CRL',
'privilege_withdrawn': 'privilege withdrawl',
'aa_compromise': 'the AA being compromised',
}[self.native]
class CRLEntryExtensionId(ObjectIdentifier):
_map = {
'2.5.29.21': 'crl_reason',
'2.5.29.23': 'hold_instruction_code',
'2.5.29.24': 'invalidity_date',
'2.5.29.29': 'certificate_issuer',
}
class CRLEntryExtension(Sequence):
_fields = [
('extn_id', CRLEntryExtensionId),
('critical', Boolean, {'default': False}),
('extn_value', ParsableOctetString),
]
_oid_pair = ('extn_id', 'extn_value')
_oid_specs = {
'crl_reason': CRLReason,
'hold_instruction_code': ObjectIdentifier,
'invalidity_date': GeneralizedTime,
'certificate_issuer': GeneralNames,
}
class CRLEntryExtensions(SequenceOf):
_child_spec = CRLEntryExtension
class RevokedCertificate(Sequence):
_fields = [
('user_certificate', Integer),
('revocation_date', Time),
('crl_entry_extensions', CRLEntryExtensions, {'optional': True}),
]
_processed_extensions = False
_critical_extensions = None
_crl_reason_value = None
_invalidity_date_value = None
_certificate_issuer_value = None
_issuer_name = False
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['crl_entry_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def crl_reason_value(self):
"""
This extension indicates the reason that a certificate was revoked.
:return:
None or a CRLReason object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._crl_reason_value
@property
def invalidity_date_value(self):
"""
This extension indicates the suspected date/time the private key was
compromised or the certificate became invalid. This would usually be
before the revocation date, which is when the CA processed the
revocation.
:return:
None or a GeneralizedTime object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._invalidity_date_value
@property
def certificate_issuer_value(self):
"""
This extension indicates the issuer of the certificate in question,
and is used in indirect CRLs. CRL entries without this extension are
for certificates issued from the last seen issuer.
:return:
None or an x509.GeneralNames object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._certificate_issuer_value
@property
def issuer_name(self):
"""
:return:
None, or an asn1crypto.x509.Name object for the issuer of the cert
"""
if self._issuer_name is False:
self._issuer_name = None
if self.certificate_issuer_value:
for general_name in self.certificate_issuer_value:
if general_name.name == 'directory_name':
self._issuer_name = general_name.chosen
break
return self._issuer_name
class RevokedCertificates(SequenceOf):
_child_spec = RevokedCertificate
class TbsCertList(Sequence):
_fields = [
('version', Version, {'optional': True}),
('signature', SignedDigestAlgorithm),
('issuer', Name),
('this_update', Time),
('next_update', Time, {'optional': True}),
('revoked_certificates', RevokedCertificates, {'optional': True}),
('crl_extensions', TBSCertListExtensions, {'explicit': 0, 'optional': True}),
]
class CertificateList(Sequence):
_fields = [
('tbs_cert_list', TbsCertList),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
]
_processed_extensions = False
_critical_extensions = None
_issuer_alt_name_value = None
_crl_number_value = None
_delta_crl_indicator_value = None
_issuing_distribution_point_value = None
_authority_key_identifier_value = None
_freshest_crl_value = None
_authority_information_access_value = None
_issuer_cert_urls = None
_delta_crl_distribution_points = None
_sha1 = None
_sha256 = None
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['tbs_cert_list']['crl_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def issuer_alt_name_value(self):
"""
This extension allows associating one or more alternative names with
the issuer of the CRL.
:return:
None or an x509.GeneralNames object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._issuer_alt_name_value
@property
def crl_number_value(self):
"""
This extension adds a monotonically increasing number to the CRL and is
used to distinguish different versions of the CRL.
:return:
None or an Integer object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._crl_number_value
@property
def delta_crl_indicator_value(self):
"""
This extension indicates a CRL is a delta CRL, and contains the CRL
number of the base CRL that it is a delta from.
:return:
None or an Integer object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._delta_crl_indicator_value
@property
def issuing_distribution_point_value(self):
"""
This extension includes information about what types of revocations
and certificates are part of the CRL.
:return:
None or an IssuingDistributionPoint object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._issuing_distribution_point_value
@property
def authority_key_identifier_value(self):
"""
This extension helps in identifying the public key with which to
validate the authenticity of the CRL.
:return:
None or an AuthorityKeyIdentifier object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._authority_key_identifier_value
@property
def freshest_crl_value(self):
"""
This extension is used in complete CRLs to indicate where a delta CRL
may be located.
:return:
None or a CRLDistributionPoints object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._freshest_crl_value
@property
def authority_information_access_value(self):
"""
This extension is used to provide a URL with which to download the
certificate used to sign this CRL.
:return:
None or an AuthorityInfoAccessSyntax object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._authority_information_access_value
@property
def issuer(self):
"""
:return:
An asn1crypto.x509.Name object for the issuer of the CRL
"""
return self['tbs_cert_list']['issuer']
@property
def authority_key_identifier(self):
"""
:return:
None or a byte string of the key_identifier from the authority key
identifier extension
"""
if not self.authority_key_identifier_value:
return None
return self.authority_key_identifier_value['key_identifier'].native
@property
def issuer_cert_urls(self):
"""
:return:
A list of unicode strings that are URLs that should contain either
an individual DER-encoded X.509 certificate, or a DER-encoded CMS
message containing multiple certificates
"""
if self._issuer_cert_urls is None:
self._issuer_cert_urls = []
if self.authority_information_access_value:
for entry in self.authority_information_access_value:
if entry['access_method'].native == 'ca_issuers':
location = entry['access_location']
if location.name != 'uniform_resource_identifier':
continue
url = location.native
if url.lower()[0:7] == 'http://':
self._issuer_cert_urls.append(url)
return self._issuer_cert_urls
@property
def delta_crl_distribution_points(self):
"""
Returns delta CRL URLs - only applies to complete CRLs
:return:
A list of zero or more DistributionPoint objects
"""
if self._delta_crl_distribution_points is None:
self._delta_crl_distribution_points = []
if self.freshest_crl_value is not None:
for distribution_point in self.freshest_crl_value:
distribution_point_name = distribution_point['distribution_point']
# RFC 5280 indicates conforming CA should not use the relative form
if distribution_point_name.name == 'name_relative_to_crl_issuer':
continue
# This library is currently only concerned with HTTP-based CRLs
for general_name in distribution_point_name.chosen:
if general_name.name == 'uniform_resource_identifier':
self._delta_crl_distribution_points.append(distribution_point)
return self._delta_crl_distribution_points
@property
def signature(self):
"""
:return:
A byte string of the signature
"""
return self['signature'].native
@property
def sha1(self):
"""
:return:
The SHA1 hash of the DER-encoded bytes of this certificate list
"""
if self._sha1 is None:
self._sha1 = hashlib.sha1(self.dump()).digest()
return self._sha1
@property
def sha256(self):
"""
:return:
The SHA-256 hash of the DER-encoded bytes of this certificate list
"""
if self._sha256 is None:
self._sha256 = hashlib.sha256(self.dump()).digest()
return self._sha256

View file

@ -0,0 +1,96 @@
# coding: utf-8
"""
ASN.1 type classes for certificate signing requests (CSR). Exports the
following items:
- CertificatationRequest()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import SignedDigestAlgorithm
from .core import (
Any,
Integer,
ObjectIdentifier,
OctetBitString,
Sequence,
SetOf,
)
from .keys import PublicKeyInfo
from .x509 import DirectoryString, Extensions, Name
# The structures in this file are taken from https://tools.ietf.org/html/rfc2986
# and https://tools.ietf.org/html/rfc2985
class Version(Integer):
_map = {
0: 'v1',
}
class CSRAttributeType(ObjectIdentifier):
_map = {
'1.2.840.113549.1.9.7': 'challenge_password',
'1.2.840.113549.1.9.9': 'extended_certificate_attributes',
'1.2.840.113549.1.9.14': 'extension_request',
}
class SetOfDirectoryString(SetOf):
_child_spec = DirectoryString
class Attribute(Sequence):
_fields = [
('type', ObjectIdentifier),
('values', SetOf, {'spec': Any}),
]
class SetOfAttributes(SetOf):
_child_spec = Attribute
class SetOfExtensions(SetOf):
_child_spec = Extensions
class CRIAttribute(Sequence):
_fields = [
('type', CSRAttributeType),
('values', Any),
]
_oid_pair = ('type', 'values')
_oid_specs = {
'challenge_password': SetOfDirectoryString,
'extended_certificate_attributes': SetOfAttributes,
'extension_request': SetOfExtensions,
}
class CRIAttributes(SetOf):
_child_spec = CRIAttribute
class CertificationRequestInfo(Sequence):
_fields = [
('version', Version),
('subject', Name),
('subject_pk_info', PublicKeyInfo),
('attributes', CRIAttributes, {'implicit': 0, 'optional': True}),
]
class CertificationRequest(Sequence):
_fields = [
('certification_request_info', CertificationRequestInfo),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,652 @@
# coding: utf-8
"""
ASN.1 type classes for the online certificate status protocol (OCSP). Exports
the following items:
- OCSPRequest()
- OCSPResponse()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import DigestAlgorithm, SignedDigestAlgorithm
from .core import (
Boolean,
Choice,
Enumerated,
GeneralizedTime,
IA5String,
Integer,
Null,
ObjectIdentifier,
OctetBitString,
OctetString,
ParsableOctetString,
Sequence,
SequenceOf,
)
from .crl import AuthorityInfoAccessSyntax, CRLReason
from .keys import PublicKeyAlgorithm
from .x509 import Certificate, GeneralName, GeneralNames, Name
# The structures in this file are taken from https://tools.ietf.org/html/rfc6960
class Version(Integer):
_map = {
0: 'v1'
}
class CertId(Sequence):
_fields = [
('hash_algorithm', DigestAlgorithm),
('issuer_name_hash', OctetString),
('issuer_key_hash', OctetString),
('serial_number', Integer),
]
class ServiceLocator(Sequence):
_fields = [
('issuer', Name),
('locator', AuthorityInfoAccessSyntax),
]
class RequestExtensionId(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.48.1.7': 'service_locator',
}
class RequestExtension(Sequence):
_fields = [
('extn_id', RequestExtensionId),
('critical', Boolean, {'default': False}),
('extn_value', ParsableOctetString),
]
_oid_pair = ('extn_id', 'extn_value')
_oid_specs = {
'service_locator': ServiceLocator,
}
class RequestExtensions(SequenceOf):
_child_spec = RequestExtension
class Request(Sequence):
_fields = [
('req_cert', CertId),
('single_request_extensions', RequestExtensions, {'explicit': 0, 'optional': True}),
]
_processed_extensions = False
_critical_extensions = None
_service_locator_value = None
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['single_request_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def service_locator_value(self):
"""
This extension is used when communicating with an OCSP responder that
acts as a proxy for OCSP requests
:return:
None or a ServiceLocator object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._service_locator_value
class Requests(SequenceOf):
_child_spec = Request
class ResponseType(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response',
}
class AcceptableResponses(SequenceOf):
_child_spec = ResponseType
class PreferredSignatureAlgorithm(Sequence):
_fields = [
('sig_identifier', SignedDigestAlgorithm),
('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
]
class PreferredSignatureAlgorithms(SequenceOf):
_child_spec = PreferredSignatureAlgorithm
class TBSRequestExtensionId(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.48.1.2': 'nonce',
'1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
'1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
}
class TBSRequestExtension(Sequence):
_fields = [
('extn_id', TBSRequestExtensionId),
('critical', Boolean, {'default': False}),
('extn_value', ParsableOctetString),
]
_oid_pair = ('extn_id', 'extn_value')
_oid_specs = {
'nonce': OctetString,
'acceptable_responses': AcceptableResponses,
'preferred_signature_algorithms': PreferredSignatureAlgorithms,
}
class TBSRequestExtensions(SequenceOf):
_child_spec = TBSRequestExtension
class TBSRequest(Sequence):
_fields = [
('version', Version, {'explicit': 0, 'default': 'v1'}),
('requestor_name', GeneralName, {'explicit': 1, 'optional': True}),
('request_list', Requests),
('request_extensions', TBSRequestExtensions, {'explicit': 2, 'optional': True}),
]
class Certificates(SequenceOf):
_child_spec = Certificate
class Signature(Sequence):
_fields = [
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
('certs', Certificates, {'explicit': 0, 'optional': True}),
]
class OCSPRequest(Sequence):
_fields = [
('tbs_request', TBSRequest),
('optional_signature', Signature, {'explicit': 0, 'optional': True}),
]
_processed_extensions = False
_critical_extensions = None
_nonce_value = None
_acceptable_responses_value = None
_preferred_signature_algorithms_value = None
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['tbs_request']['request_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def nonce_value(self):
"""
This extension is used to prevent replay attacks by including a unique,
random value with each request/response pair
:return:
None or an OctetString object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._nonce_value
@property
def acceptable_responses_value(self):
"""
This extension is used to allow the client and server to communicate
with alternative response formats other than just basic_ocsp_response,
although no other formats are defined in the standard.
:return:
None or an AcceptableResponses object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._acceptable_responses_value
@property
def preferred_signature_algorithms_value(self):
"""
This extension is used by the client to define what signature algorithms
are preferred, including both the hash algorithm and the public key
algorithm, with a level of detail down to even the public key algorithm
parameters, such as curve name.
:return:
None or a PreferredSignatureAlgorithms object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._preferred_signature_algorithms_value
class OCSPResponseStatus(Enumerated):
_map = {
0: 'successful',
1: 'malformed_request',
2: 'internal_error',
3: 'try_later',
5: 'sign_required',
6: 'unauthorized',
}
class ResponderId(Choice):
_alternatives = [
('by_name', Name, {'explicit': 1}),
('by_key', OctetString, {'explicit': 2}),
]
class RevokedInfo(Sequence):
_fields = [
('revocation_time', GeneralizedTime),
('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}),
]
class CertStatus(Choice):
_alternatives = [
('good', Null, {'implicit': 0}),
('revoked', RevokedInfo, {'implicit': 1}),
('unknown', Null, {'implicit': 2}),
]
class CrlId(Sequence):
_fields = [
('crl_url', IA5String, {'explicit': 0, 'optional': True}),
('crl_num', Integer, {'explicit': 1, 'optional': True}),
('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}),
]
class SingleResponseExtensionId(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.48.1.3': 'crl',
'1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
# These are CRLEntryExtension values from
# https://tools.ietf.org/html/rfc5280
'2.5.29.21': 'crl_reason',
'2.5.29.24': 'invalidity_date',
'2.5.29.29': 'certificate_issuer',
# https://tools.ietf.org/html/rfc6962.html#page-13
'1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list',
}
class SingleResponseExtension(Sequence):
_fields = [
('extn_id', SingleResponseExtensionId),
('critical', Boolean, {'default': False}),
('extn_value', ParsableOctetString),
]
_oid_pair = ('extn_id', 'extn_value')
_oid_specs = {
'crl': CrlId,
'archive_cutoff': GeneralizedTime,
'crl_reason': CRLReason,
'invalidity_date': GeneralizedTime,
'certificate_issuer': GeneralNames,
'signed_certificate_timestamp_list': OctetString,
}
class SingleResponseExtensions(SequenceOf):
_child_spec = SingleResponseExtension
class SingleResponse(Sequence):
_fields = [
('cert_id', CertId),
('cert_status', CertStatus),
('this_update', GeneralizedTime),
('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}),
('single_extensions', SingleResponseExtensions, {'explicit': 1, 'optional': True}),
]
_processed_extensions = False
_critical_extensions = None
_crl_value = None
_archive_cutoff_value = None
_crl_reason_value = None
_invalidity_date_value = None
_certificate_issuer_value = None
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['single_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def crl_value(self):
"""
This extension is used to locate the CRL that a certificate's revocation
is contained within.
:return:
None or a CrlId object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._crl_value
@property
def archive_cutoff_value(self):
"""
This extension is used to indicate the date at which an archived
(historical) certificate status entry will no longer be available.
:return:
None or a GeneralizedTime object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._archive_cutoff_value
@property
def crl_reason_value(self):
"""
This extension indicates the reason that a certificate was revoked.
:return:
None or a CRLReason object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._crl_reason_value
@property
def invalidity_date_value(self):
"""
This extension indicates the suspected date/time the private key was
compromised or the certificate became invalid. This would usually be
before the revocation date, which is when the CA processed the
revocation.
:return:
None or a GeneralizedTime object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._invalidity_date_value
@property
def certificate_issuer_value(self):
"""
This extension indicates the issuer of the certificate in question.
:return:
None or an x509.GeneralNames object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._certificate_issuer_value
class Responses(SequenceOf):
_child_spec = SingleResponse
class ResponseDataExtensionId(ObjectIdentifier):
_map = {
'1.3.6.1.5.5.7.48.1.2': 'nonce',
'1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
}
class ResponseDataExtension(Sequence):
_fields = [
('extn_id', ResponseDataExtensionId),
('critical', Boolean, {'default': False}),
('extn_value', ParsableOctetString),
]
_oid_pair = ('extn_id', 'extn_value')
_oid_specs = {
'nonce': OctetString,
'extended_revoke': Null,
}
class ResponseDataExtensions(SequenceOf):
_child_spec = ResponseDataExtension
class ResponseData(Sequence):
_fields = [
('version', Version, {'explicit': 0, 'default': 'v1'}),
('responder_id', ResponderId),
('produced_at', GeneralizedTime),
('responses', Responses),
('response_extensions', ResponseDataExtensions, {'explicit': 1, 'optional': True}),
]
class BasicOCSPResponse(Sequence):
_fields = [
('tbs_response_data', ResponseData),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
('certs', Certificates, {'explicit': 0, 'optional': True}),
]
class ResponseBytes(Sequence):
_fields = [
('response_type', ResponseType),
('response', ParsableOctetString),
]
_oid_pair = ('response_type', 'response')
_oid_specs = {
'basic_ocsp_response': BasicOCSPResponse,
}
class OCSPResponse(Sequence):
_fields = [
('response_status', OCSPResponseStatus),
('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}),
]
_processed_extensions = False
_critical_extensions = None
_nonce_value = None
_extended_revoke_value = None
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['response_bytes']['response'].parsed['tbs_response_data']['response_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def nonce_value(self):
"""
This extension is used to prevent replay attacks on the request/response
exchange
:return:
None or an OctetString object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._nonce_value
@property
def extended_revoke_value(self):
"""
This extension is used to signal that the responder will return a
"revoked" status for non-issued certificates.
:return:
None or a Null object (if present)
"""
if self._processed_extensions is False:
self._set_extensions()
return self._extended_revoke_value
@property
def basic_ocsp_response(self):
"""
A shortcut into the BasicOCSPResponse sequence
:return:
None or an asn1crypto.ocsp.BasicOCSPResponse object
"""
return self['response_bytes']['response'].parsed
@property
def response_data(self):
"""
A shortcut into the parsed, ResponseData sequence
:return:
None or an asn1crypto.ocsp.ResponseData object
"""
return self['response_bytes']['response'].parsed['tbs_response_data']

View file

@ -0,0 +1,289 @@
# coding: utf-8
"""
Functions for parsing and dumping using the ASN.1 DER encoding. Exports the
following items:
- emit()
- parse()
- peek()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import sys
from ._types import byte_cls, chr_cls, type_name
from .util import int_from_bytes, int_to_bytes
_PY2 = sys.version_info <= (3,)
_INSUFFICIENT_DATA_MESSAGE = 'Insufficient data - %s bytes requested but only %s available'
def emit(class_, method, tag, contents):
"""
Constructs a byte string of an ASN.1 DER-encoded value
This is typically not useful. Instead, use one of the standard classes from
asn1crypto.core, or construct a new class with specific fields, and call the
.dump() method.
:param class_:
An integer ASN.1 class value: 0 (universal), 1 (application),
2 (context), 3 (private)
:param method:
An integer ASN.1 method value: 0 (primitive), 1 (constructed)
:param tag:
An integer ASN.1 tag value
:param contents:
A byte string of the encoded byte contents
:return:
A byte string of the ASN.1 DER value (header and contents)
"""
if not isinstance(class_, int):
raise TypeError('class_ must be an integer, not %s' % type_name(class_))
if class_ < 0 or class_ > 3:
raise ValueError('class_ must be one of 0, 1, 2 or 3, not %s' % class_)
if not isinstance(method, int):
raise TypeError('method must be an integer, not %s' % type_name(method))
if method < 0 or method > 1:
raise ValueError('method must be 0 or 1, not %s' % method)
if not isinstance(tag, int):
raise TypeError('tag must be an integer, not %s' % type_name(tag))
if tag < 0:
raise ValueError('tag must be greater than zero, not %s' % tag)
if not isinstance(contents, byte_cls):
raise TypeError('contents must be a byte string, not %s' % type_name(contents))
return _dump_header(class_, method, tag, contents) + contents
def parse(contents, strict=False):
"""
Parses a byte string of ASN.1 BER/DER-encoded data.
This is typically not useful. Instead, use one of the standard classes from
asn1crypto.core, or construct a new class with specific fields, and call the
.load() class method.
:param contents:
A byte string of BER/DER-encoded data
:param strict:
A boolean indicating if trailing data should be forbidden - if so, a
ValueError will be raised when trailing data exists
:raises:
ValueError - when the contents do not contain an ASN.1 header or are truncated in some way
TypeError - when contents is not a byte string
:return:
A 6-element tuple:
- 0: integer class (0 to 3)
- 1: integer method
- 2: integer tag
- 3: byte string header
- 4: byte string content
- 5: byte string trailer
"""
if not isinstance(contents, byte_cls):
raise TypeError('contents must be a byte string, not %s' % type_name(contents))
contents_len = len(contents)
info, consumed = _parse(contents, contents_len)
if strict and consumed != contents_len:
raise ValueError('Extra data - %d bytes of trailing data were provided' % (contents_len - consumed))
return info
def peek(contents):
"""
Parses a byte string of ASN.1 BER/DER-encoded data to find the length
This is typically used to look into an encoded value to see how long the
next chunk of ASN.1-encoded data is. Primarily it is useful when a
value is a concatenation of multiple values.
:param contents:
A byte string of BER/DER-encoded data
:raises:
ValueError - when the contents do not contain an ASN.1 header or are truncated in some way
TypeError - when contents is not a byte string
:return:
An integer with the number of bytes occupied by the ASN.1 value
"""
if not isinstance(contents, byte_cls):
raise TypeError('contents must be a byte string, not %s' % type_name(contents))
info, consumed = _parse(contents, len(contents))
return consumed
def _parse(encoded_data, data_len, pointer=0, lengths_only=False):
"""
Parses a byte string into component parts
:param encoded_data:
A byte string that contains BER-encoded data
:param data_len:
The integer length of the encoded data
:param pointer:
The index in the byte string to parse from
:param lengths_only:
A boolean to cause the call to return a 2-element tuple of the integer
number of bytes in the header and the integer number of bytes in the
contents. Internal use only.
:return:
A 2-element tuple:
- 0: A tuple of (class_, method, tag, header, content, trailer)
- 1: An integer indicating how many bytes were consumed
"""
if data_len < pointer + 2:
raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (2, data_len - pointer))
start = pointer
first_octet = ord(encoded_data[pointer]) if _PY2 else encoded_data[pointer]
pointer += 1
tag = first_octet & 31
# Base 128 length using 8th bit as continuation indicator
if tag == 31:
tag = 0
while True:
num = ord(encoded_data[pointer]) if _PY2 else encoded_data[pointer]
pointer += 1
tag *= 128
tag += num & 127
if num >> 7 == 0:
break
length_octet = ord(encoded_data[pointer]) if _PY2 else encoded_data[pointer]
pointer += 1
if length_octet >> 7 == 0:
if lengths_only:
return (pointer, pointer + (length_octet & 127))
contents_end = pointer + (length_octet & 127)
else:
length_octets = length_octet & 127
if length_octets:
pointer += length_octets
contents_end = pointer + int_from_bytes(encoded_data[pointer - length_octets:pointer], signed=False)
if lengths_only:
return (pointer, contents_end)
else:
# To properly parse indefinite length values, we need to scan forward
# parsing headers until we find a value with a length of zero. If we
# just scanned looking for \x00\x00, nested indefinite length values
# would not work.
contents_end = pointer
# Unfortunately we need to understand the contents of the data to
# properly scan forward, which bleeds some representation info into
# the parser. This condition handles the unused bits byte in
# constructed bit strings.
if tag == 3:
contents_end += 1
while contents_end < data_len:
sub_header_end, contents_end = _parse(encoded_data, data_len, contents_end, lengths_only=True)
if contents_end == sub_header_end and encoded_data[contents_end - 2:contents_end] == b'\x00\x00':
break
if lengths_only:
return (pointer, contents_end)
if contents_end > data_len:
raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (contents_end, data_len))
return (
(
first_octet >> 6,
(first_octet >> 5) & 1,
tag,
encoded_data[start:pointer],
encoded_data[pointer:contents_end - 2],
b'\x00\x00'
),
contents_end
)
if contents_end > data_len:
raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (contents_end, data_len))
return (
(
first_octet >> 6,
(first_octet >> 5) & 1,
tag,
encoded_data[start:pointer],
encoded_data[pointer:contents_end],
b''
),
contents_end
)
def _dump_header(class_, method, tag, contents):
"""
Constructs the header bytes for an ASN.1 object
:param class_:
An integer ASN.1 class value: 0 (universal), 1 (application),
2 (context), 3 (private)
:param method:
An integer ASN.1 method value: 0 (primitive), 1 (constructed)
:param tag:
An integer ASN.1 tag value
:param contents:
A byte string of the encoded byte contents
:return:
A byte string of the ASN.1 DER header
"""
header = b''
id_num = 0
id_num |= class_ << 6
id_num |= method << 5
if tag >= 31:
header += chr_cls(id_num | 31)
while tag > 0:
continuation_bit = 0x80 if tag > 0x7F else 0
header += chr_cls(continuation_bit | (tag & 0x7F))
tag = tag >> 7
else:
header += chr_cls(id_num | tag)
length = len(contents)
if length <= 127:
header += chr_cls(length)
else:
length_bytes = int_to_bytes(length)
header += chr_cls(0x80 | len(length_bytes))
header += length_bytes
return header

View file

@ -0,0 +1,84 @@
# coding: utf-8
"""
ASN.1 type classes for PDF signature structures. Adds extra oid mapping and
value parsing to asn1crypto.x509.Extension() and asn1crypto.xms.CMSAttribute().
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .cms import CMSAttributeType, CMSAttribute
from .core import (
Boolean,
Integer,
Null,
ObjectIdentifier,
OctetString,
Sequence,
SequenceOf,
SetOf,
)
from .crl import CertificateList
from .ocsp import OCSPResponse
from .x509 import (
Extension,
ExtensionId,
GeneralName,
KeyPurposeId,
)
class AdobeArchiveRevInfo(Sequence):
_fields = [
('version', Integer)
]
class AdobeTimestamp(Sequence):
_fields = [
('version', Integer),
('location', GeneralName),
('requires_auth', Boolean, {'optional': True, 'default': False}),
]
class OtherRevInfo(Sequence):
_fields = [
('type', ObjectIdentifier),
('value', OctetString),
]
class SequenceOfCertificateList(SequenceOf):
_child_spec = CertificateList
class SequenceOfOCSPResponse(SequenceOf):
_child_spec = OCSPResponse
class SequenceOfOtherRevInfo(SequenceOf):
_child_spec = OtherRevInfo
class RevocationInfoArchival(Sequence):
_fields = [
('crl', SequenceOfCertificateList, {'explicit': 0, 'optional': True}),
('ocsp', SequenceOfOCSPResponse, {'explicit': 1, 'optional': True}),
('other_rev_info', SequenceOfOtherRevInfo, {'explicit': 2, 'optional': True}),
]
class SetOfRevocationInfoArchival(SetOf):
_child_spec = RevocationInfoArchival
ExtensionId._map['1.2.840.113583.1.1.9.2'] = 'adobe_archive_rev_info'
ExtensionId._map['1.2.840.113583.1.1.9.1'] = 'adobe_timestamp'
ExtensionId._map['1.2.840.113583.1.1.10'] = 'adobe_ppklite_credential'
Extension._oid_specs['adobe_archive_rev_info'] = AdobeArchiveRevInfo
Extension._oid_specs['adobe_timestamp'] = AdobeTimestamp
Extension._oid_specs['adobe_ppklite_credential'] = Null
KeyPurposeId._map['1.2.840.113583.1.1.5'] = 'pdf_signing'
CMSAttributeType._map['1.2.840.113583.1.1.8'] = 'adobe_revocation_info_archival'
CMSAttribute._oid_specs['adobe_revocation_info_archival'] = SetOfRevocationInfoArchival

View file

@ -0,0 +1,222 @@
# coding: utf-8
"""
Encoding DER to PEM and decoding PEM to DER. Exports the following items:
- armor()
- detect()
- unarmor()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import base64
import re
import sys
from ._errors import unwrap
from ._types import type_name as _type_name, str_cls, byte_cls
if sys.version_info < (3,):
from cStringIO import StringIO as BytesIO
else:
from io import BytesIO
def detect(byte_string):
"""
Detect if a byte string seems to contain a PEM-encoded block
:param byte_string:
A byte string to look through
:return:
A boolean, indicating if a PEM-encoded block is contained in the byte
string
"""
if not isinstance(byte_string, byte_cls):
raise TypeError(unwrap(
'''
byte_string must be a byte string, not %s
''',
_type_name(byte_string)
))
return byte_string.find(b'-----BEGIN') != -1 or byte_string.find(b'---- BEGIN') != -1
def armor(type_name, der_bytes, headers=None):
"""
Armors a DER-encoded byte string in PEM
:param type_name:
A unicode string that will be capitalized and placed in the header
and footer of the block. E.g. "CERTIFICATE", "PRIVATE KEY", etc. This
will appear as "-----BEGIN CERTIFICATE-----" and
"-----END CERTIFICATE-----".
:param der_bytes:
A byte string to be armored
:param headers:
An OrderedDict of the header lines to write after the BEGIN line
:return:
A byte string of the PEM block
"""
if not isinstance(der_bytes, byte_cls):
raise TypeError(unwrap(
'''
der_bytes must be a byte string, not %s
''' % _type_name(der_bytes)
))
if not isinstance(type_name, str_cls):
raise TypeError(unwrap(
'''
type_name must be a unicode string, not %s
''',
_type_name(type_name)
))
type_name = type_name.upper().encode('ascii')
output = BytesIO()
output.write(b'-----BEGIN ')
output.write(type_name)
output.write(b'-----\n')
if headers:
for key in headers:
output.write(key.encode('ascii'))
output.write(b': ')
output.write(headers[key].encode('ascii'))
output.write(b'\n')
output.write(b'\n')
b64_bytes = base64.b64encode(der_bytes)
b64_len = len(b64_bytes)
i = 0
while i < b64_len:
output.write(b64_bytes[i:i + 64])
output.write(b'\n')
i += 64
output.write(b'-----END ')
output.write(type_name)
output.write(b'-----\n')
return output.getvalue()
def _unarmor(pem_bytes):
"""
Convert a PEM-encoded byte string into one or more DER-encoded byte strings
:param pem_bytes:
A byte string of the PEM-encoded data
:raises:
ValueError - when the pem_bytes do not appear to be PEM-encoded bytes
:return:
A generator of 3-element tuples in the format: (object_type, headers,
der_bytes). The object_type is a unicode string of what is between
"-----BEGIN " and "-----". Examples include: "CERTIFICATE",
"PUBLIC KEY", "PRIVATE KEY". The headers is a dict containing any lines
in the form "Name: Value" that are right after the begin line.
"""
if not isinstance(pem_bytes, byte_cls):
raise TypeError(unwrap(
'''
pem_bytes must be a byte string, not %s
''',
_type_name(pem_bytes)
))
# Valid states include: "trash", "headers", "body"
state = 'trash'
headers = {}
base64_data = b''
object_type = None
found_start = False
found_end = False
for line in pem_bytes.splitlines(False):
if line == b'':
continue
if state == "trash":
# Look for a starting line since some CA cert bundle show the cert
# into in a parsed format above each PEM block
type_name_match = re.match(b'^(?:---- |-----)BEGIN ([A-Z0-9 ]+)(?: ----|-----)', line)
if not type_name_match:
continue
object_type = type_name_match.group(1).decode('ascii')
found_start = True
state = 'headers'
continue
if state == 'headers':
if line.find(b':') == -1:
state = 'body'
else:
decoded_line = line.decode('ascii')
name, value = decoded_line.split(':', 1)
headers[name] = value.strip()
continue
if state == 'body':
if line[0:5] in (b'-----', b'---- '):
der_bytes = base64.b64decode(base64_data)
yield (object_type, headers, der_bytes)
state = 'trash'
headers = {}
base64_data = b''
object_type = None
found_end = True
continue
base64_data += line
if not found_start or not found_end:
raise ValueError(unwrap(
'''
pem_bytes does not appear to contain PEM-encoded data - no
BEGIN/END combination found
'''
))
def unarmor(pem_bytes, multiple=False):
"""
Convert a PEM-encoded byte string into a DER-encoded byte string
:param pem_bytes:
A byte string of the PEM-encoded data
:param multiple:
If True, function will return a generator
:raises:
ValueError - when the pem_bytes do not appear to be PEM-encoded bytes
:return:
A 3-element tuple (object_name, headers, der_bytes). The object_name is
a unicode string of what is between "-----BEGIN " and "-----". Examples
include: "CERTIFICATE", "PUBLIC KEY", "PRIVATE KEY". The headers is a
dict containing any lines in the form "Name: Value" that are right
after the begin line.
"""
generator = _unarmor(pem_bytes)
if not multiple:
return next(generator)
return generator

View file

@ -0,0 +1,193 @@
# coding: utf-8
"""
ASN.1 type classes for PKCS#12 files. Exports the following items:
- CertBag()
- CrlBag()
- Pfx()
- SafeBag()
- SecretBag()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import DigestInfo
from .cms import ContentInfo, SignedData
from .core import (
Any,
BMPString,
Integer,
ObjectIdentifier,
OctetString,
ParsableOctetString,
Sequence,
SequenceOf,
SetOf,
)
from .keys import PrivateKeyInfo, EncryptedPrivateKeyInfo
from .x509 import Certificate, KeyPurposeId
# The structures in this file are taken from https://tools.ietf.org/html/rfc7292
class MacData(Sequence):
_fields = [
('mac', DigestInfo),
('mac_salt', OctetString),
('iterations', Integer, {'default': 1}),
]
class Version(Integer):
_map = {
3: 'v3'
}
class AttributeType(ObjectIdentifier):
_map = {
# https://tools.ietf.org/html/rfc2985#page-18
'1.2.840.113549.1.9.20': 'friendly_name',
'1.2.840.113549.1.9.21': 'local_key_id',
# https://support.microsoft.com/en-us/kb/287547
'1.3.6.1.4.1.311.17.1': 'microsoft_local_machine_keyset',
# https://github.com/frohoff/jdk8u-dev-jdk/blob/master/src/share/classes/sun/security/pkcs12/PKCS12KeyStore.java
# this is a set of OIDs, representing key usage, the usual value is a SET of one element OID 2.5.29.37.0
'2.16.840.1.113894.746875.1.1': 'trusted_key_usage',
}
class SetOfAny(SetOf):
_child_spec = Any
class SetOfBMPString(SetOf):
_child_spec = BMPString
class SetOfOctetString(SetOf):
_child_spec = OctetString
class SetOfKeyPurposeId(SetOf):
_child_spec = KeyPurposeId
class Attribute(Sequence):
_fields = [
('type', AttributeType),
('values', None),
]
_oid_specs = {
'friendly_name': SetOfBMPString,
'local_key_id': SetOfOctetString,
'microsoft_csp_name': SetOfBMPString,
'trusted_key_usage': SetOfKeyPurposeId,
}
def _values_spec(self):
return self._oid_specs.get(self['type'].native, SetOfAny)
_spec_callbacks = {
'values': _values_spec
}
class Attributes(SetOf):
_child_spec = Attribute
class Pfx(Sequence):
_fields = [
('version', Version),
('auth_safe', ContentInfo),
('mac_data', MacData, {'optional': True})
]
_authenticated_safe = None
@property
def authenticated_safe(self):
if self._authenticated_safe is None:
content = self['auth_safe']['content']
if isinstance(content, SignedData):
content = content['content_info']['content']
self._authenticated_safe = AuthenticatedSafe.load(content.native)
return self._authenticated_safe
class AuthenticatedSafe(SequenceOf):
_child_spec = ContentInfo
class BagId(ObjectIdentifier):
_map = {
'1.2.840.113549.1.12.10.1.1': 'key_bag',
'1.2.840.113549.1.12.10.1.2': 'pkcs8_shrouded_key_bag',
'1.2.840.113549.1.12.10.1.3': 'cert_bag',
'1.2.840.113549.1.12.10.1.4': 'crl_bag',
'1.2.840.113549.1.12.10.1.5': 'secret_bag',
'1.2.840.113549.1.12.10.1.6': 'safe_contents',
}
class CertId(ObjectIdentifier):
_map = {
'1.2.840.113549.1.9.22.1': 'x509',
'1.2.840.113549.1.9.22.2': 'sdsi',
}
class CertBag(Sequence):
_fields = [
('cert_id', CertId),
('cert_value', ParsableOctetString, {'explicit': 0}),
]
_oid_pair = ('cert_id', 'cert_value')
_oid_specs = {
'x509': Certificate,
}
class CrlBag(Sequence):
_fields = [
('crl_id', ObjectIdentifier),
('crl_value', OctetString, {'explicit': 0}),
]
class SecretBag(Sequence):
_fields = [
('secret_type_id', ObjectIdentifier),
('secret_value', OctetString, {'explicit': 0}),
]
class SafeContents(SequenceOf):
pass
class SafeBag(Sequence):
_fields = [
('bag_id', BagId),
('bag_value', Any, {'explicit': 0}),
('bag_attributes', Attributes, {'optional': True}),
]
_oid_pair = ('bag_id', 'bag_value')
_oid_specs = {
'key_bag': PrivateKeyInfo,
'pkcs8_shrouded_key_bag': EncryptedPrivateKeyInfo,
'cert_bag': CertBag,
'crl_bag': CrlBag,
'secret_bag': SecretBag,
'safe_contents': SafeContents
}
SafeContents._child_spec = SafeBag

View file

@ -0,0 +1,310 @@
# coding: utf-8
"""
ASN.1 type classes for the time stamp protocol (TSP). Exports the following
items:
- TimeStampReq()
- TimeStampResp()
Also adds TimeStampedData() support to asn1crypto.cms.ContentInfo(),
TimeStampedData() and TSTInfo() support to
asn1crypto.cms.EncapsulatedContentInfo() and some oids and value parsers to
asn1crypto.cms.CMSAttribute().
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import DigestAlgorithm
from .cms import (
CMSAttribute,
CMSAttributeType,
ContentInfo,
ContentType,
EncapsulatedContentInfo,
)
from .core import (
Any,
BitString,
Boolean,
Choice,
GeneralizedTime,
IA5String,
Integer,
ObjectIdentifier,
OctetString,
Sequence,
SequenceOf,
SetOf,
UTF8String,
)
from .crl import CertificateList
from .x509 import (
Attributes,
CertificatePolicies,
GeneralName,
GeneralNames,
)
# The structures in this file are based on https://tools.ietf.org/html/rfc3161,
# https://tools.ietf.org/html/rfc4998, https://tools.ietf.org/html/rfc5544,
# https://tools.ietf.org/html/rfc5035, https://tools.ietf.org/html/rfc2634
class Version(Integer):
_map = {
0: 'v0',
1: 'v1',
2: 'v2',
3: 'v3',
4: 'v4',
5: 'v5',
}
class MessageImprint(Sequence):
_fields = [
('hash_algorithm', DigestAlgorithm),
('hashed_message', OctetString),
]
class Accuracy(Sequence):
_fields = [
('seconds', Integer, {'optional': True}),
('millis', Integer, {'implicit': 0, 'optional': True}),
('micros', Integer, {'implicit': 1, 'optional': True}),
]
class Extension(Sequence):
_fields = [
('extn_id', ObjectIdentifier),
('critical', Boolean, {'default': False}),
('extn_value', OctetString),
]
class Extensions(SequenceOf):
_child_spec = Extension
class TSTInfo(Sequence):
_fields = [
('version', Version),
('policy', ObjectIdentifier),
('message_imprint', MessageImprint),
('serial_number', Integer),
('gen_time', GeneralizedTime),
('accuracy', Accuracy, {'optional': True}),
('ordering', Boolean, {'default': False}),
('nonce', Integer, {'optional': True}),
('tsa', GeneralName, {'explicit': 0, 'optional': True}),
('extensions', Extensions, {'implicit': 1, 'optional': True}),
]
class TimeStampReq(Sequence):
_fields = [
('version', Version),
('message_imprint', MessageImprint),
('req_policy', ObjectIdentifier, {'optional': True}),
('nonce', Integer, {'optional': True}),
('cert_req', Boolean, {'default': False}),
('extensions', Extensions, {'implicit': 0, 'optional': True}),
]
class PKIStatus(Integer):
_map = {
0: 'granted',
1: 'granted_with_mods',
2: 'rejection',
3: 'waiting',
4: 'revocation_warning',
5: 'revocation_notification',
}
class PKIFreeText(SequenceOf):
_child_spec = UTF8String
class PKIFailureInfo(BitString):
_map = {
0: 'bad_alg',
2: 'bad_request',
5: 'bad_data_format',
14: 'time_not_available',
15: 'unaccepted_policy',
16: 'unaccepted_extensions',
17: 'add_info_not_available',
25: 'system_failure',
}
class PKIStatusInfo(Sequence):
_fields = [
('status', PKIStatus),
('status_string', PKIFreeText, {'optional': True}),
('fail_info', PKIFailureInfo, {'optional': True}),
]
class TimeStampResp(Sequence):
_fields = [
('status', PKIStatusInfo),
('time_stamp_token', ContentInfo),
]
class MetaData(Sequence):
_fields = [
('hash_protected', Boolean),
('file_name', UTF8String, {'optional': True}),
('media_type', IA5String, {'optional': True}),
('other_meta_data', Attributes, {'optional': True}),
]
class TimeStampAndCRL(SequenceOf):
_fields = [
('time_stamp', EncapsulatedContentInfo),
('crl', CertificateList, {'optional': True}),
]
class TimeStampTokenEvidence(SequenceOf):
_child_spec = TimeStampAndCRL
class DigestAlgorithms(SequenceOf):
_child_spec = DigestAlgorithm
class EncryptionInfo(Sequence):
_fields = [
('encryption_info_type', ObjectIdentifier),
('encryption_info_value', Any),
]
class PartialHashtree(SequenceOf):
_child_spec = OctetString
class PartialHashtrees(SequenceOf):
_child_spec = PartialHashtree
class ArchiveTimeStamp(Sequence):
_fields = [
('digest_algorithm', DigestAlgorithm, {'implicit': 0, 'optional': True}),
('attributes', Attributes, {'implicit': 1, 'optional': True}),
('reduced_hashtree', PartialHashtrees, {'implicit': 2, 'optional': True}),
('time_stamp', ContentInfo),
]
class ArchiveTimeStampSequence(SequenceOf):
_child_spec = ArchiveTimeStamp
class EvidenceRecord(Sequence):
_fields = [
('version', Version),
('digest_algorithms', DigestAlgorithms),
('crypto_infos', Attributes, {'implicit': 0, 'optional': True}),
('encryption_info', EncryptionInfo, {'implicit': 1, 'optional': True}),
('archive_time_stamp_sequence', ArchiveTimeStampSequence),
]
class OtherEvidence(Sequence):
_fields = [
('oe_type', ObjectIdentifier),
('oe_value', Any),
]
class Evidence(Choice):
_alternatives = [
('tst_evidence', TimeStampTokenEvidence, {'implicit': 0}),
('ers_evidence', EvidenceRecord, {'implicit': 1}),
('other_evidence', OtherEvidence, {'implicit': 2}),
]
class TimeStampedData(Sequence):
_fields = [
('version', Version),
('data_uri', IA5String, {'optional': True}),
('meta_data', MetaData, {'optional': True}),
('content', OctetString, {'optional': True}),
('temporal_evidence', Evidence),
]
class IssuerSerial(Sequence):
_fields = [
('issuer', GeneralNames),
('serial_number', Integer),
]
class ESSCertID(Sequence):
_fields = [
('cert_hash', OctetString),
('issuer_serial', IssuerSerial, {'optional': True}),
]
class ESSCertIDs(SequenceOf):
_child_spec = ESSCertID
class SigningCertificate(Sequence):
_fields = [
('certs', ESSCertIDs),
('policies', CertificatePolicies, {'optional': True}),
]
class SetOfSigningCertificates(SetOf):
_child_spec = SigningCertificate
class ESSCertIDv2(Sequence):
_fields = [
('hash_algorithm', DigestAlgorithm, {'default': {'algorithm': 'sha256'}}),
('cert_hash', OctetString),
('issuer_serial', IssuerSerial, {'optional': True}),
]
class ESSCertIDv2s(SequenceOf):
_child_spec = ESSCertIDv2
class SigningCertificateV2(Sequence):
_fields = [
('certs', ESSCertIDv2s),
('policies', CertificatePolicies, {'optional': True}),
]
class SetOfSigningCertificatesV2(SetOf):
_child_spec = SigningCertificateV2
EncapsulatedContentInfo._oid_specs['tst_info'] = TSTInfo
EncapsulatedContentInfo._oid_specs['timestamped_data'] = TimeStampedData
ContentInfo._oid_specs['timestamped_data'] = TimeStampedData
ContentType._map['1.2.840.113549.1.9.16.1.4'] = 'tst_info'
ContentType._map['1.2.840.113549.1.9.16.1.31'] = 'timestamped_data'
CMSAttributeType._map['1.2.840.113549.1.9.16.2.12'] = 'signing_certificate'
CMSAttribute._oid_specs['signing_certificate'] = SetOfSigningCertificates
CMSAttributeType._map['1.2.840.113549.1.9.16.2.47'] = 'signing_certificate_v2'
CMSAttribute._oid_specs['signing_certificate_v2'] = SetOfSigningCertificatesV2

View file

@ -0,0 +1,712 @@
# coding: utf-8
"""
Miscellaneous data helpers, including functions for converting integers to and
from bytes and UTC timezone. Exports the following items:
- OrderedDict()
- int_from_bytes()
- int_to_bytes()
- timezone.utc
- inet_ntop()
- inet_pton()
- uri_to_iri()
- iri_to_uri()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import math
import sys
from datetime import datetime, date, time
from ._errors import unwrap
from ._iri import iri_to_uri, uri_to_iri # noqa
from ._ordereddict import OrderedDict # noqa
from ._types import type_name
if sys.platform == 'win32':
from ._inet import inet_ntop, inet_pton
else:
from socket import inet_ntop, inet_pton # noqa
# Python 2
if sys.version_info <= (3,):
from datetime import timedelta, tzinfo
py2 = True
def int_to_bytes(value, signed=False, width=None):
"""
Converts an integer to a byte string
:param value:
The integer to convert
:param signed:
If the byte string should be encoded using two's complement
:param width:
None == auto, otherwise an integer of the byte width for the return
value
:return:
A byte string
"""
# Handle negatives in two's complement
is_neg = False
if signed and value < 0:
is_neg = True
bits = int(math.ceil(len('%x' % abs(value)) / 2.0) * 8)
value = (value + (1 << bits)) % (1 << bits)
hex_str = '%x' % value
if len(hex_str) & 1:
hex_str = '0' + hex_str
output = hex_str.decode('hex')
if signed and not is_neg and ord(output[0:1]) & 0x80:
output = b'\x00' + output
if width is not None:
if is_neg:
pad_char = b'\xFF'
else:
pad_char = b'\x00'
output = (pad_char * (width - len(output))) + output
elif is_neg and ord(output[0:1]) & 0x80 == 0:
output = b'\xFF' + output
return output
def int_from_bytes(value, signed=False):
"""
Converts a byte string to an integer
:param value:
The byte string to convert
:param signed:
If the byte string should be interpreted using two's complement
:return:
An integer
"""
if value == b'':
return 0
num = long(value.encode("hex"), 16) # noqa
if not signed:
return num
# Check for sign bit and handle two's complement
if ord(value[0:1]) & 0x80:
bit_len = len(value) * 8
return num - (1 << bit_len)
return num
class utc(tzinfo): # noqa
def tzname(self, _):
return b'UTC+00:00'
def utcoffset(self, _):
return timedelta(0)
def dst(self, _):
return timedelta(0)
class timezone(): # noqa
utc = utc()
# Python 3
else:
from datetime import timezone # noqa
py2 = False
def int_to_bytes(value, signed=False, width=None):
"""
Converts an integer to a byte string
:param value:
The integer to convert
:param signed:
If the byte string should be encoded using two's complement
:param width:
None == auto, otherwise an integer of the byte width for the return
value
:return:
A byte string
"""
if width is None:
if signed:
if value < 0:
bits_required = abs(value + 1).bit_length()
else:
bits_required = value.bit_length()
if bits_required % 8 == 0:
bits_required += 1
else:
bits_required = value.bit_length()
width = math.ceil(bits_required / 8) or 1
return value.to_bytes(width, byteorder='big', signed=signed)
def int_from_bytes(value, signed=False):
"""
Converts a byte string to an integer
:param value:
The byte string to convert
:param signed:
If the byte string should be interpreted using two's complement
:return:
An integer
"""
return int.from_bytes(value, 'big', signed=signed)
_DAYS_PER_MONTH_YEAR_0 = {
1: 31,
2: 29, # Year 0 was a leap year
3: 31,
4: 30,
5: 31,
6: 30,
7: 31,
8: 31,
9: 30,
10: 31,
11: 30,
12: 31
}
class extended_date(object):
"""
A datetime.date-like object that can represent the year 0. This is just
to handle 0000-01-01 found in some certificates.
"""
year = None
month = None
day = None
def __init__(self, year, month, day):
"""
:param year:
The integer 0
:param month:
An integer from 1 to 12
:param day:
An integer from 1 to 31
"""
if year != 0:
raise ValueError('year must be 0')
if month < 1 or month > 12:
raise ValueError('month is out of range')
if day < 0 or day > _DAYS_PER_MONTH_YEAR_0[month]:
raise ValueError('day is out of range')
self.year = year
self.month = month
self.day = day
def _format(self, format):
"""
Performs strftime(), always returning a unicode string
:param format:
A strftime() format string
:return:
A unicode string of the formatted date
"""
format = format.replace('%Y', '0000')
# Year 0 is 1BC and a leap year. Leap years repeat themselves
# every 28 years. Because of adjustments and the proleptic gregorian
# calendar, the simplest way to format is to substitute year 2000.
temp = date(2000, self.month, self.day)
if '%c' in format:
c_out = temp.strftime('%c')
# Handle full years
c_out = c_out.replace('2000', '0000')
c_out = c_out.replace('%', '%%')
format = format.replace('%c', c_out)
if '%x' in format:
x_out = temp.strftime('%x')
# Handle formats such as 08/16/2000 or 16.08.2000
x_out = x_out.replace('2000', '0000')
x_out = x_out.replace('%', '%%')
format = format.replace('%x', x_out)
return temp.strftime(format)
def isoformat(self):
"""
Formats the date as %Y-%m-%d
:return:
The date formatted to %Y-%m-%d as a unicode string in Python 3
and a byte string in Python 2
"""
return self.strftime('0000-%m-%d')
def strftime(self, format):
"""
Formats the date using strftime()
:param format:
The strftime() format string
:return:
The formatted date as a unicode string in Python 3 and a byte
string in Python 2
"""
output = self._format(format)
if py2:
return output.encode('utf-8')
return output
def replace(self, year=None, month=None, day=None):
"""
Returns a new datetime.date or asn1crypto.util.extended_date
object with the specified components replaced
:return:
A datetime.date or asn1crypto.util.extended_date object
"""
if year is None:
year = self.year
if month is None:
month = self.month
if day is None:
day = self.day
if year > 0:
cls = date
else:
cls = extended_date
return cls(
year,
month,
day
)
def __str__(self):
if py2:
return self.__bytes__()
else:
return self.__unicode__()
def __bytes__(self):
return self.__unicode__().encode('utf-8')
def __unicode__(self):
return self._format('%Y-%m-%d')
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.__cmp__(other) == 0
def __ne__(self, other):
return not self.__eq__(other)
def _comparison_error(self, other):
raise TypeError(unwrap(
'''
An asn1crypto.util.extended_date object can only be compared to
an asn1crypto.util.extended_date or datetime.date object, not %s
''',
type_name(other)
))
def __cmp__(self, other):
if isinstance(other, date):
return -1
if not isinstance(other, self.__class__):
self._comparison_error(other)
st = (
self.year,
self.month,
self.day
)
ot = (
other.year,
other.month,
other.day
)
if st < ot:
return -1
if st > ot:
return 1
return 0
def __lt__(self, other):
return self.__cmp__(other) < 0
def __le__(self, other):
return self.__cmp__(other) <= 0
def __gt__(self, other):
return self.__cmp__(other) > 0
def __ge__(self, other):
return self.__cmp__(other) >= 0
class extended_datetime(object):
"""
A datetime.datetime-like object that can represent the year 0. This is just
to handle 0000-01-01 found in some certificates.
"""
year = None
month = None
day = None
hour = None
minute = None
second = None
microsecond = None
tzinfo = None
def __init__(self, year, month, day, hour=0, minute=0, second=0, microsecond=0, tzinfo=None):
"""
:param year:
The integer 0
:param month:
An integer from 1 to 12
:param day:
An integer from 1 to 31
:param hour:
An integer from 0 to 23
:param minute:
An integer from 0 to 59
:param second:
An integer from 0 to 59
:param microsecond:
An integer from 0 to 999999
"""
if year != 0:
raise ValueError('year must be 0')
if month < 1 or month > 12:
raise ValueError('month is out of range')
if day < 0 or day > _DAYS_PER_MONTH_YEAR_0[month]:
raise ValueError('day is out of range')
if hour < 0 or hour > 23:
raise ValueError('hour is out of range')
if minute < 0 or minute > 59:
raise ValueError('minute is out of range')
if second < 0 or second > 59:
raise ValueError('second is out of range')
if microsecond < 0 or microsecond > 999999:
raise ValueError('microsecond is out of range')
self.year = year
self.month = month
self.day = day
self.hour = hour
self.minute = minute
self.second = second
self.microsecond = microsecond
self.tzinfo = tzinfo
def date(self):
"""
:return:
An asn1crypto.util.extended_date of the date
"""
return extended_date(self.year, self.month, self.day)
def time(self):
"""
:return:
A datetime.time object of the time
"""
return time(self.hour, self.minute, self.second, self.microsecond, self.tzinfo)
def utcoffset(self):
"""
:return:
None or a datetime.timedelta() of the offset from UTC
"""
if self.tzinfo is None:
return None
return self.tzinfo.utcoffset(self.replace(year=2000))
def dst(self):
"""
:return:
None or a datetime.timedelta() of the daylight savings time offset
"""
if self.tzinfo is None:
return None
return self.tzinfo.dst(self.replace(year=2000))
def tzname(self):
"""
:return:
None or the name of the timezone as a unicode string in Python 3
and a byte string in Python 2
"""
if self.tzinfo is None:
return None
return self.tzinfo.tzname(self.replace(year=2000))
def _format(self, format):
"""
Performs strftime(), always returning a unicode string
:param format:
A strftime() format string
:return:
A unicode string of the formatted datetime
"""
format = format.replace('%Y', '0000')
# Year 0 is 1BC and a leap year. Leap years repeat themselves
# every 28 years. Because of adjustments and the proleptic gregorian
# calendar, the simplest way to format is to substitute year 2000.
temp = datetime(
2000,
self.month,
self.day,
self.hour,
self.minute,
self.second,
self.microsecond,
self.tzinfo
)
if '%c' in format:
c_out = temp.strftime('%c')
# Handle full years
c_out = c_out.replace('2000', '0000')
c_out = c_out.replace('%', '%%')
format = format.replace('%c', c_out)
if '%x' in format:
x_out = temp.strftime('%x')
# Handle formats such as 08/16/2000 or 16.08.2000
x_out = x_out.replace('2000', '0000')
x_out = x_out.replace('%', '%%')
format = format.replace('%x', x_out)
return temp.strftime(format)
def isoformat(self, sep='T'):
"""
Formats the date as "%Y-%m-%d %H:%M:%S" with the sep param between the
date and time portions
:param set:
A single character of the separator to place between the date and
time
:return:
The formatted datetime as a unicode string in Python 3 and a byte
string in Python 2
"""
if self.microsecond == 0:
return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S' % sep)
return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S.%%f' % sep)
def strftime(self, format):
"""
Formats the date using strftime()
:param format:
The strftime() format string
:return:
The formatted date as a unicode string in Python 3 and a byte
string in Python 2
"""
output = self._format(format)
if py2:
return output.encode('utf-8')
return output
def replace(self, year=None, month=None, day=None, hour=None, minute=None,
second=None, microsecond=None, tzinfo=None):
"""
Returns a new datetime.datetime or asn1crypto.util.extended_datetime
object with the specified components replaced
:return:
A datetime.datetime or asn1crypto.util.extended_datetime object
"""
if year is None:
year = self.year
if month is None:
month = self.month
if day is None:
day = self.day
if hour is None:
hour = self.hour
if minute is None:
minute = self.minute
if second is None:
second = self.second
if microsecond is None:
microsecond = self.microsecond
if tzinfo is None:
tzinfo = self.tzinfo
if year > 0:
cls = datetime
else:
cls = extended_datetime
return cls(
year,
month,
day,
hour,
minute,
second,
microsecond,
tzinfo
)
def __str__(self):
if py2:
return self.__bytes__()
else:
return self.__unicode__()
def __bytes__(self):
return self.__unicode__().encode('utf-8')
def __unicode__(self):
format = '%Y-%m-%d %H:%M:%S'
if self.microsecond != 0:
format += '.%f'
return self._format(format)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.__cmp__(other) == 0
def __ne__(self, other):
return not self.__eq__(other)
def _comparison_error(self, other):
"""
Raises a TypeError about the other object not being suitable for
comparison
:param other:
The object being compared to
"""
raise TypeError(unwrap(
'''
An asn1crypto.util.extended_datetime object can only be compared to
an asn1crypto.util.extended_datetime or datetime.datetime object,
not %s
''',
type_name(other)
))
def __cmp__(self, other):
so = self.utcoffset()
oo = other.utcoffset()
if (so is not None and oo is None) or (so is None and oo is not None):
raise TypeError("can't compare offset-naive and offset-aware datetimes")
if isinstance(other, datetime):
return -1
if not isinstance(other, self.__class__):
self._comparison_error(other)
st = (
self.year,
self.month,
self.day,
self.hour,
self.minute,
self.second,
self.microsecond,
so
)
ot = (
other.year,
other.month,
other.day,
other.hour,
other.minute,
other.second,
other.microsecond,
oo
)
if st < ot:
return -1
if st > ot:
return 1
return 0
def __lt__(self, other):
return self.__cmp__(other) < 0
def __le__(self, other):
return self.__cmp__(other) <= 0
def __gt__(self, other):
return self.__cmp__(other) > 0
def __ge__(self, other):
return self.__cmp__(other) >= 0

View file

@ -0,0 +1,6 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
__version__ = '0.24.0'
__version_info__ = (0, 24, 0)

File diff suppressed because it is too large Load diff