win32 platform
This commit is contained in:
commit
c1666978b2
1122 changed files with 348397 additions and 0 deletions
|
|
@ -0,0 +1,10 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from cryptography.hazmat.backends.openssl.backend import backend
|
||||
|
||||
|
||||
__all__ = ["backend"]
|
||||
2299
Lib/site-packages/cryptography/hazmat/backends/openssl/backend.py
Normal file
2299
Lib/site-packages/cryptography/hazmat/backends/openssl/backend.py
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -0,0 +1,213 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
|
||||
from cryptography.hazmat.primitives import ciphers
|
||||
from cryptography.hazmat.primitives.ciphers import modes
|
||||
|
||||
|
||||
@utils.register_interface(ciphers.CipherContext)
|
||||
@utils.register_interface(ciphers.AEADCipherContext)
|
||||
@utils.register_interface(ciphers.AEADEncryptionContext)
|
||||
class _CipherContext(object):
|
||||
_ENCRYPT = 1
|
||||
_DECRYPT = 0
|
||||
|
||||
def __init__(self, backend, cipher, mode, operation):
|
||||
self._backend = backend
|
||||
self._cipher = cipher
|
||||
self._mode = mode
|
||||
self._operation = operation
|
||||
self._tag = None
|
||||
|
||||
if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
|
||||
self._block_size = self._cipher.block_size
|
||||
else:
|
||||
self._block_size = 1
|
||||
|
||||
ctx = self._backend._lib.EVP_CIPHER_CTX_new()
|
||||
ctx = self._backend._ffi.gc(
|
||||
ctx, self._backend._lib.EVP_CIPHER_CTX_free
|
||||
)
|
||||
|
||||
registry = self._backend._cipher_registry
|
||||
try:
|
||||
adapter = registry[type(cipher), type(mode)]
|
||||
except KeyError:
|
||||
raise UnsupportedAlgorithm(
|
||||
"cipher {0} in {1} mode is not supported "
|
||||
"by this backend.".format(
|
||||
cipher.name, mode.name if mode else mode),
|
||||
_Reasons.UNSUPPORTED_CIPHER
|
||||
)
|
||||
|
||||
evp_cipher = adapter(self._backend, cipher, mode)
|
||||
if evp_cipher == self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"cipher {0} in {1} mode is not supported "
|
||||
"by this backend.".format(
|
||||
cipher.name, mode.name if mode else mode),
|
||||
_Reasons.UNSUPPORTED_CIPHER
|
||||
)
|
||||
|
||||
if isinstance(mode, modes.ModeWithInitializationVector):
|
||||
iv_nonce = mode.initialization_vector
|
||||
elif isinstance(mode, modes.ModeWithNonce):
|
||||
iv_nonce = mode.nonce
|
||||
else:
|
||||
iv_nonce = self._backend._ffi.NULL
|
||||
# begin init with cipher and operation type
|
||||
res = self._backend._lib.EVP_CipherInit_ex(ctx, evp_cipher,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
operation)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
# set the key length to handle variable key ciphers
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
|
||||
ctx, len(cipher.key)
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
if isinstance(mode, modes.GCM):
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx, self._backend._lib.EVP_CTRL_GCM_SET_IVLEN,
|
||||
len(iv_nonce), self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
if operation == self._DECRYPT:
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx, self._backend._lib.EVP_CTRL_GCM_SET_TAG,
|
||||
len(mode.tag), mode.tag
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
# pass key/iv
|
||||
res = self._backend._lib.EVP_CipherInit_ex(
|
||||
ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
cipher.key,
|
||||
iv_nonce,
|
||||
operation
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
# We purposely disable padding here as it's handled higher up in the
|
||||
# API.
|
||||
self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
|
||||
self._ctx = ctx
|
||||
|
||||
def update(self, data):
|
||||
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
|
||||
# to SIGABRT if you call update with an empty byte string. This can be
|
||||
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
|
||||
# should be taken only when length is zero and mode is not GCM because
|
||||
# AES GCM can return improper tag values if you don't call update
|
||||
# with empty plaintext when authenticating AAD for ...reasons.
|
||||
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
|
||||
return b""
|
||||
|
||||
buf = self._backend._ffi.new("unsigned char[]",
|
||||
len(data) + self._block_size - 1)
|
||||
outlen = self._backend._ffi.new("int *")
|
||||
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
|
||||
len(data))
|
||||
self._backend.openssl_assert(res != 0)
|
||||
return self._backend._ffi.buffer(buf)[:outlen[0]]
|
||||
|
||||
def finalize(self):
|
||||
# OpenSSL 1.0.1 on Ubuntu 12.04 (and possibly other distributions)
|
||||
# appears to have a bug where you must make at least one call to update
|
||||
# even if you are only using authenticate_additional_data or the
|
||||
# GCM tag will be wrong. An (empty) call to update resolves this
|
||||
# and is harmless for all other versions of OpenSSL.
|
||||
if isinstance(self._mode, modes.GCM):
|
||||
self.update(b"")
|
||||
|
||||
buf = self._backend._ffi.new("unsigned char[]", self._block_size)
|
||||
outlen = self._backend._ffi.new("int *")
|
||||
res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
|
||||
if res == 0:
|
||||
errors = self._backend._consume_errors()
|
||||
|
||||
if not errors and isinstance(self._mode, modes.GCM):
|
||||
raise InvalidTag
|
||||
|
||||
self._backend.openssl_assert(
|
||||
errors[0][1:] == (
|
||||
self._backend._lib.ERR_LIB_EVP,
|
||||
self._backend._lib.EVP_F_EVP_ENCRYPTFINAL_EX,
|
||||
self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
|
||||
) or errors[0][1:] == (
|
||||
self._backend._lib.ERR_LIB_EVP,
|
||||
self._backend._lib.EVP_F_EVP_DECRYPTFINAL_EX,
|
||||
self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
|
||||
)
|
||||
)
|
||||
raise ValueError(
|
||||
"The length of the provided data is not a multiple of "
|
||||
"the block length."
|
||||
)
|
||||
|
||||
if (isinstance(self._mode, modes.GCM) and
|
||||
self._operation == self._ENCRYPT):
|
||||
block_byte_size = self._block_size // 8
|
||||
tag_buf = self._backend._ffi.new(
|
||||
"unsigned char[]", block_byte_size
|
||||
)
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
self._ctx, self._backend._lib.EVP_CTRL_GCM_GET_TAG,
|
||||
block_byte_size, tag_buf
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._tag = self._backend._ffi.buffer(tag_buf)[:]
|
||||
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return self._backend._ffi.buffer(buf)[:outlen[0]]
|
||||
|
||||
def authenticate_additional_data(self, data):
|
||||
outlen = self._backend._ffi.new("int *")
|
||||
res = self._backend._lib.EVP_CipherUpdate(
|
||||
self._ctx, self._backend._ffi.NULL, outlen, data, len(data)
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
tag = utils.read_only_property("_tag")
|
||||
|
||||
|
||||
@utils.register_interface(ciphers.CipherContext)
|
||||
class _AESCTRCipherContext(object):
|
||||
"""
|
||||
This is needed to provide support for AES CTR mode in OpenSSL 0.9.8. It can
|
||||
be removed when we drop 0.9.8 support (RHEL5 extended life ends 2020).
|
||||
"""
|
||||
def __init__(self, backend, cipher, mode):
|
||||
self._backend = backend
|
||||
|
||||
self._key = self._backend._ffi.new("AES_KEY *")
|
||||
res = self._backend._lib.AES_set_encrypt_key(
|
||||
cipher.key, len(cipher.key) * 8, self._key
|
||||
)
|
||||
self._backend.openssl_assert(res == 0)
|
||||
self._ecount = self._backend._ffi.new("char[]", 16)
|
||||
self._nonce = self._backend._ffi.new("char[16]", mode.nonce)
|
||||
self._num = self._backend._ffi.new("unsigned int *", 0)
|
||||
|
||||
def update(self, data):
|
||||
buf = self._backend._ffi.new("unsigned char[]", len(data))
|
||||
self._backend._lib.AES_ctr128_encrypt(
|
||||
data, buf, len(data), self._key, self._nonce,
|
||||
self._ecount, self._num
|
||||
)
|
||||
return self._backend._ffi.buffer(buf)[:]
|
||||
|
||||
def finalize(self):
|
||||
self._key = None
|
||||
self._ecount = None
|
||||
self._nonce = None
|
||||
self._num = None
|
||||
return b""
|
||||
|
|
@ -0,0 +1,80 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature, UnsupportedAlgorithm, _Reasons
|
||||
)
|
||||
from cryptography.hazmat.primitives import constant_time, interfaces
|
||||
from cryptography.hazmat.primitives.ciphers.modes import CBC
|
||||
|
||||
|
||||
@utils.register_interface(interfaces.MACContext)
|
||||
class _CMACContext(object):
|
||||
def __init__(self, backend, algorithm, ctx=None):
|
||||
if not backend.cmac_algorithm_supported(algorithm):
|
||||
raise UnsupportedAlgorithm("This backend does not support CMAC.",
|
||||
_Reasons.UNSUPPORTED_CIPHER)
|
||||
|
||||
self._backend = backend
|
||||
self._key = algorithm.key
|
||||
self._algorithm = algorithm
|
||||
self._output_length = algorithm.block_size // 8
|
||||
|
||||
if ctx is None:
|
||||
registry = self._backend._cipher_registry
|
||||
adapter = registry[type(algorithm), CBC]
|
||||
|
||||
evp_cipher = adapter(self._backend, algorithm, CBC)
|
||||
|
||||
ctx = self._backend._lib.CMAC_CTX_new()
|
||||
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free)
|
||||
|
||||
self._backend._lib.CMAC_Init(
|
||||
ctx, self._key, len(self._key),
|
||||
evp_cipher, self._backend._ffi.NULL
|
||||
)
|
||||
|
||||
self._ctx = ctx
|
||||
|
||||
algorithm = utils.read_only_property("_algorithm")
|
||||
|
||||
def update(self, data):
|
||||
res = self._backend._lib.CMAC_Update(self._ctx, data, len(data))
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
def finalize(self):
|
||||
buf = self._backend._ffi.new("unsigned char[]", self._output_length)
|
||||
length = self._backend._ffi.new("size_t *", self._output_length)
|
||||
res = self._backend._lib.CMAC_Final(
|
||||
self._ctx, buf, length
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
self._ctx = None
|
||||
|
||||
return self._backend._ffi.buffer(buf)[:]
|
||||
|
||||
def copy(self):
|
||||
copied_ctx = self._backend._lib.CMAC_CTX_new()
|
||||
copied_ctx = self._backend._ffi.gc(
|
||||
copied_ctx, self._backend._lib.CMAC_CTX_free
|
||||
)
|
||||
res = self._backend._lib.CMAC_CTX_copy(
|
||||
copied_ctx, self._ctx
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return _CMACContext(
|
||||
self._backend, self._algorithm, ctx=copied_ctx
|
||||
)
|
||||
|
||||
def verify(self, signature):
|
||||
digest = self.finalize()
|
||||
if not constant_time.bytes_eq(digest, signature):
|
||||
raise InvalidSignature("Signature did not match digest.")
|
||||
218
Lib/site-packages/cryptography/hazmat/backends/openssl/dsa.py
Normal file
218
Lib/site-packages/cryptography/hazmat/backends/openssl/dsa.py
Normal file
|
|
@ -0,0 +1,218 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends.openssl.utils import _truncate_digest
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import (
|
||||
AsymmetricSignatureContext, AsymmetricVerificationContext, dsa
|
||||
)
|
||||
|
||||
|
||||
def _truncate_digest_for_dsa(dsa_cdata, digest, backend):
|
||||
"""
|
||||
This function truncates digests that are longer than a given DS
|
||||
key's length so they can be signed. OpenSSL does this for us in
|
||||
1.0.0c+ and it isn't needed in 0.9.8, but that leaves us with three
|
||||
releases (1.0.0, 1.0.0a, and 1.0.0b) where this is a problem. This
|
||||
truncation is not required in 0.9.8 because DSA is limited to SHA-1.
|
||||
"""
|
||||
|
||||
order_bits = backend._lib.BN_num_bits(dsa_cdata.q)
|
||||
return _truncate_digest(digest, order_bits)
|
||||
|
||||
|
||||
@utils.register_interface(AsymmetricVerificationContext)
|
||||
class _DSAVerificationContext(object):
|
||||
def __init__(self, backend, public_key, signature, algorithm):
|
||||
self._backend = backend
|
||||
self._public_key = public_key
|
||||
self._signature = signature
|
||||
self._algorithm = algorithm
|
||||
|
||||
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
|
||||
|
||||
def update(self, data):
|
||||
self._hash_ctx.update(data)
|
||||
|
||||
def verify(self):
|
||||
data_to_verify = self._hash_ctx.finalize()
|
||||
|
||||
data_to_verify = _truncate_digest_for_dsa(
|
||||
self._public_key._dsa_cdata, data_to_verify, self._backend
|
||||
)
|
||||
|
||||
# The first parameter passed to DSA_verify is unused by OpenSSL but
|
||||
# must be an integer.
|
||||
res = self._backend._lib.DSA_verify(
|
||||
0, data_to_verify, len(data_to_verify), self._signature,
|
||||
len(self._signature), self._public_key._dsa_cdata)
|
||||
|
||||
if res != 1:
|
||||
self._backend._consume_errors()
|
||||
raise InvalidSignature
|
||||
|
||||
|
||||
@utils.register_interface(AsymmetricSignatureContext)
|
||||
class _DSASignatureContext(object):
|
||||
def __init__(self, backend, private_key, algorithm):
|
||||
self._backend = backend
|
||||
self._private_key = private_key
|
||||
self._algorithm = algorithm
|
||||
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
|
||||
|
||||
def update(self, data):
|
||||
self._hash_ctx.update(data)
|
||||
|
||||
def finalize(self):
|
||||
data_to_sign = self._hash_ctx.finalize()
|
||||
data_to_sign = _truncate_digest_for_dsa(
|
||||
self._private_key._dsa_cdata, data_to_sign, self._backend
|
||||
)
|
||||
sig_buf_len = self._backend._lib.DSA_size(self._private_key._dsa_cdata)
|
||||
sig_buf = self._backend._ffi.new("unsigned char[]", sig_buf_len)
|
||||
buflen = self._backend._ffi.new("unsigned int *")
|
||||
|
||||
# The first parameter passed to DSA_sign is unused by OpenSSL but
|
||||
# must be an integer.
|
||||
res = self._backend._lib.DSA_sign(
|
||||
0, data_to_sign, len(data_to_sign), sig_buf,
|
||||
buflen, self._private_key._dsa_cdata)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0])
|
||||
|
||||
return self._backend._ffi.buffer(sig_buf)[:buflen[0]]
|
||||
|
||||
|
||||
@utils.register_interface(dsa.DSAParametersWithNumbers)
|
||||
class _DSAParameters(object):
|
||||
def __init__(self, backend, dsa_cdata):
|
||||
self._backend = backend
|
||||
self._dsa_cdata = dsa_cdata
|
||||
|
||||
def parameter_numbers(self):
|
||||
return dsa.DSAParameterNumbers(
|
||||
p=self._backend._bn_to_int(self._dsa_cdata.p),
|
||||
q=self._backend._bn_to_int(self._dsa_cdata.q),
|
||||
g=self._backend._bn_to_int(self._dsa_cdata.g)
|
||||
)
|
||||
|
||||
def generate_private_key(self):
|
||||
return self._backend.generate_dsa_private_key(self)
|
||||
|
||||
|
||||
@utils.register_interface(dsa.DSAPrivateKeyWithSerialization)
|
||||
class _DSAPrivateKey(object):
|
||||
def __init__(self, backend, dsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._dsa_cdata = dsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
self._key_size = self._backend._lib.BN_num_bits(self._dsa_cdata.p)
|
||||
|
||||
key_size = utils.read_only_property("_key_size")
|
||||
|
||||
def signer(self, signature_algorithm):
|
||||
return _DSASignatureContext(self._backend, self, signature_algorithm)
|
||||
|
||||
def private_numbers(self):
|
||||
return dsa.DSAPrivateNumbers(
|
||||
public_numbers=dsa.DSAPublicNumbers(
|
||||
parameter_numbers=dsa.DSAParameterNumbers(
|
||||
p=self._backend._bn_to_int(self._dsa_cdata.p),
|
||||
q=self._backend._bn_to_int(self._dsa_cdata.q),
|
||||
g=self._backend._bn_to_int(self._dsa_cdata.g)
|
||||
),
|
||||
y=self._backend._bn_to_int(self._dsa_cdata.pub_key)
|
||||
),
|
||||
x=self._backend._bn_to_int(self._dsa_cdata.priv_key)
|
||||
)
|
||||
|
||||
def public_key(self):
|
||||
dsa_cdata = self._backend._lib.DSA_new()
|
||||
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
|
||||
dsa_cdata = self._backend._ffi.gc(
|
||||
dsa_cdata, self._backend._lib.DSA_free
|
||||
)
|
||||
dsa_cdata.p = self._backend._lib.BN_dup(self._dsa_cdata.p)
|
||||
dsa_cdata.q = self._backend._lib.BN_dup(self._dsa_cdata.q)
|
||||
dsa_cdata.g = self._backend._lib.BN_dup(self._dsa_cdata.g)
|
||||
dsa_cdata.pub_key = self._backend._lib.BN_dup(self._dsa_cdata.pub_key)
|
||||
evp_pkey = self._backend._dsa_cdata_to_evp_pkey(dsa_cdata)
|
||||
return _DSAPublicKey(self._backend, dsa_cdata, evp_pkey)
|
||||
|
||||
def parameters(self):
|
||||
dsa_cdata = self._backend._lib.DSA_new()
|
||||
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
|
||||
dsa_cdata = self._backend._ffi.gc(
|
||||
dsa_cdata, self._backend._lib.DSA_free
|
||||
)
|
||||
dsa_cdata.p = self._backend._lib.BN_dup(self._dsa_cdata.p)
|
||||
dsa_cdata.q = self._backend._lib.BN_dup(self._dsa_cdata.q)
|
||||
dsa_cdata.g = self._backend._lib.BN_dup(self._dsa_cdata.g)
|
||||
return _DSAParameters(self._backend, dsa_cdata)
|
||||
|
||||
def private_bytes(self, encoding, format, encryption_algorithm):
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self._evp_pkey,
|
||||
self._dsa_cdata
|
||||
)
|
||||
|
||||
|
||||
@utils.register_interface(dsa.DSAPublicKeyWithSerialization)
|
||||
class _DSAPublicKey(object):
|
||||
def __init__(self, backend, dsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._dsa_cdata = dsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
self._key_size = self._backend._lib.BN_num_bits(self._dsa_cdata.p)
|
||||
|
||||
key_size = utils.read_only_property("_key_size")
|
||||
|
||||
def verifier(self, signature, signature_algorithm):
|
||||
if not isinstance(signature, bytes):
|
||||
raise TypeError("signature must be bytes.")
|
||||
|
||||
return _DSAVerificationContext(
|
||||
self._backend, self, signature, signature_algorithm
|
||||
)
|
||||
|
||||
def public_numbers(self):
|
||||
return dsa.DSAPublicNumbers(
|
||||
parameter_numbers=dsa.DSAParameterNumbers(
|
||||
p=self._backend._bn_to_int(self._dsa_cdata.p),
|
||||
q=self._backend._bn_to_int(self._dsa_cdata.q),
|
||||
g=self._backend._bn_to_int(self._dsa_cdata.g)
|
||||
),
|
||||
y=self._backend._bn_to_int(self._dsa_cdata.pub_key)
|
||||
)
|
||||
|
||||
def parameters(self):
|
||||
dsa_cdata = self._backend._lib.DSA_new()
|
||||
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
|
||||
dsa_cdata = self._backend._ffi.gc(
|
||||
dsa_cdata, self._backend._lib.DSA_free
|
||||
)
|
||||
dsa_cdata.p = self._backend._lib.BN_dup(self._dsa_cdata.p)
|
||||
dsa_cdata.q = self._backend._lib.BN_dup(self._dsa_cdata.q)
|
||||
dsa_cdata.g = self._backend._lib.BN_dup(self._dsa_cdata.g)
|
||||
return _DSAParameters(self._backend, dsa_cdata)
|
||||
|
||||
def public_bytes(self, encoding, format):
|
||||
if format is serialization.PublicFormat.PKCS1:
|
||||
raise ValueError(
|
||||
"DSA public keys do not support PKCS1 serialization"
|
||||
)
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
self._evp_pkey,
|
||||
None
|
||||
)
|
||||
304
Lib/site-packages/cryptography/hazmat/backends/openssl/ec.py
Normal file
304
Lib/site-packages/cryptography/hazmat/backends/openssl/ec.py
Normal file
|
|
@ -0,0 +1,304 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature, UnsupportedAlgorithm, _Reasons
|
||||
)
|
||||
from cryptography.hazmat.backends.openssl.utils import _truncate_digest
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import (
|
||||
AsymmetricSignatureContext, AsymmetricVerificationContext, ec
|
||||
)
|
||||
|
||||
|
||||
def _truncate_digest_for_ecdsa(ec_key_cdata, digest, backend):
|
||||
"""
|
||||
This function truncates digests that are longer than a given elliptic
|
||||
curve key's length so they can be signed. Since elliptic curve keys are
|
||||
much shorter than RSA keys many digests (e.g. SHA-512) may require
|
||||
truncation.
|
||||
"""
|
||||
|
||||
_lib = backend._lib
|
||||
_ffi = backend._ffi
|
||||
|
||||
group = _lib.EC_KEY_get0_group(ec_key_cdata)
|
||||
|
||||
with backend._tmp_bn_ctx() as bn_ctx:
|
||||
order = _lib.BN_CTX_get(bn_ctx)
|
||||
backend.openssl_assert(order != _ffi.NULL)
|
||||
|
||||
res = _lib.EC_GROUP_get_order(group, order, bn_ctx)
|
||||
backend.openssl_assert(res == 1)
|
||||
|
||||
order_bits = _lib.BN_num_bits(order)
|
||||
|
||||
return _truncate_digest(digest, order_bits)
|
||||
|
||||
|
||||
def _ec_key_curve_sn(backend, ec_key):
|
||||
group = backend._lib.EC_KEY_get0_group(ec_key)
|
||||
backend.openssl_assert(group != backend._ffi.NULL)
|
||||
|
||||
nid = backend._lib.EC_GROUP_get_curve_name(group)
|
||||
# The following check is to find EC keys with unnamed curves and raise
|
||||
# an error for now.
|
||||
if nid == backend._lib.NID_undef:
|
||||
raise NotImplementedError(
|
||||
"ECDSA certificates with unnamed curves are unsupported "
|
||||
"at this time"
|
||||
)
|
||||
|
||||
curve_name = backend._lib.OBJ_nid2sn(nid)
|
||||
backend.openssl_assert(curve_name != backend._ffi.NULL)
|
||||
|
||||
sn = backend._ffi.string(curve_name).decode('ascii')
|
||||
return sn
|
||||
|
||||
|
||||
def _mark_asn1_named_ec_curve(backend, ec_cdata):
|
||||
"""
|
||||
Set the named curve flag on the EC_KEY. This causes OpenSSL to
|
||||
serialize EC keys along with their curve OID which makes
|
||||
deserialization easier.
|
||||
"""
|
||||
|
||||
backend._lib.EC_KEY_set_asn1_flag(
|
||||
ec_cdata, backend._lib.OPENSSL_EC_NAMED_CURVE
|
||||
)
|
||||
|
||||
|
||||
def _sn_to_elliptic_curve(backend, sn):
|
||||
try:
|
||||
return ec._CURVE_TYPES[sn]()
|
||||
except KeyError:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{0} is not a supported elliptic curve".format(sn),
|
||||
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE
|
||||
)
|
||||
|
||||
|
||||
@utils.register_interface(AsymmetricSignatureContext)
|
||||
class _ECDSASignatureContext(object):
|
||||
def __init__(self, backend, private_key, algorithm):
|
||||
self._backend = backend
|
||||
self._private_key = private_key
|
||||
self._digest = hashes.Hash(algorithm, backend)
|
||||
|
||||
def update(self, data):
|
||||
self._digest.update(data)
|
||||
|
||||
def finalize(self):
|
||||
ec_key = self._private_key._ec_key
|
||||
|
||||
digest = self._digest.finalize()
|
||||
|
||||
digest = _truncate_digest_for_ecdsa(ec_key, digest, self._backend)
|
||||
|
||||
max_size = self._backend._lib.ECDSA_size(ec_key)
|
||||
self._backend.openssl_assert(max_size > 0)
|
||||
|
||||
sigbuf = self._backend._ffi.new("char[]", max_size)
|
||||
siglen_ptr = self._backend._ffi.new("unsigned int[]", 1)
|
||||
res = self._backend._lib.ECDSA_sign(
|
||||
0,
|
||||
digest,
|
||||
len(digest),
|
||||
sigbuf,
|
||||
siglen_ptr,
|
||||
ec_key
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return self._backend._ffi.buffer(sigbuf)[:siglen_ptr[0]]
|
||||
|
||||
|
||||
@utils.register_interface(AsymmetricVerificationContext)
|
||||
class _ECDSAVerificationContext(object):
|
||||
def __init__(self, backend, public_key, signature, algorithm):
|
||||
self._backend = backend
|
||||
self._public_key = public_key
|
||||
self._signature = signature
|
||||
self._digest = hashes.Hash(algorithm, backend)
|
||||
|
||||
def update(self, data):
|
||||
self._digest.update(data)
|
||||
|
||||
def verify(self):
|
||||
ec_key = self._public_key._ec_key
|
||||
|
||||
digest = self._digest.finalize()
|
||||
|
||||
digest = _truncate_digest_for_ecdsa(ec_key, digest, self._backend)
|
||||
|
||||
res = self._backend._lib.ECDSA_verify(
|
||||
0,
|
||||
digest,
|
||||
len(digest),
|
||||
self._signature,
|
||||
len(self._signature),
|
||||
ec_key
|
||||
)
|
||||
if res != 1:
|
||||
self._backend._consume_errors()
|
||||
raise InvalidSignature
|
||||
return True
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurvePrivateKeyWithSerialization)
|
||||
class _EllipticCurvePrivateKey(object):
|
||||
def __init__(self, backend, ec_key_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
_mark_asn1_named_ec_curve(backend, ec_key_cdata)
|
||||
self._ec_key = ec_key_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
sn = _ec_key_curve_sn(backend, ec_key_cdata)
|
||||
self._curve = _sn_to_elliptic_curve(backend, sn)
|
||||
|
||||
curve = utils.read_only_property("_curve")
|
||||
|
||||
def signer(self, signature_algorithm):
|
||||
if isinstance(signature_algorithm, ec.ECDSA):
|
||||
return _ECDSASignatureContext(
|
||||
self._backend, self, signature_algorithm.algorithm
|
||||
)
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"Unsupported elliptic curve signature algorithm.",
|
||||
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)
|
||||
|
||||
def exchange(self, algorithm, peer_public_key):
|
||||
if not (
|
||||
self._backend.elliptic_curve_exchange_algorithm_supported(
|
||||
algorithm, self.curve
|
||||
)
|
||||
):
|
||||
raise UnsupportedAlgorithm(
|
||||
"This backend does not support the ECDH algorithm.",
|
||||
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM
|
||||
)
|
||||
|
||||
if peer_public_key.curve.name != self.curve.name:
|
||||
raise ValueError(
|
||||
"peer_public_key and self are not on the same curve"
|
||||
)
|
||||
|
||||
group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
|
||||
z_len = (self._backend._lib.EC_GROUP_get_degree(group) + 7) // 8
|
||||
self._backend.openssl_assert(z_len > 0)
|
||||
z_buf = self._backend._ffi.new("uint8_t[]", z_len)
|
||||
peer_key = self._backend._lib.EC_KEY_get0_public_key(
|
||||
peer_public_key._ec_key
|
||||
)
|
||||
|
||||
r = self._backend._lib.ECDH_compute_key(
|
||||
z_buf, z_len, peer_key, self._ec_key, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(r > 0)
|
||||
return self._backend._ffi.buffer(z_buf)[:z_len]
|
||||
|
||||
def public_key(self):
|
||||
group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
|
||||
self._backend.openssl_assert(group != self._backend._ffi.NULL)
|
||||
|
||||
curve_nid = self._backend._lib.EC_GROUP_get_curve_name(group)
|
||||
|
||||
public_ec_key = self._backend._lib.EC_KEY_new_by_curve_name(curve_nid)
|
||||
self._backend.openssl_assert(public_ec_key != self._backend._ffi.NULL)
|
||||
public_ec_key = self._backend._ffi.gc(
|
||||
public_ec_key, self._backend._lib.EC_KEY_free
|
||||
)
|
||||
|
||||
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
|
||||
self._backend.openssl_assert(point != self._backend._ffi.NULL)
|
||||
|
||||
res = self._backend._lib.EC_KEY_set_public_key(public_ec_key, point)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
evp_pkey = self._backend._ec_cdata_to_evp_pkey(public_ec_key)
|
||||
|
||||
return _EllipticCurvePublicKey(self._backend, public_ec_key, evp_pkey)
|
||||
|
||||
def private_numbers(self):
|
||||
bn = self._backend._lib.EC_KEY_get0_private_key(self._ec_key)
|
||||
private_value = self._backend._bn_to_int(bn)
|
||||
return ec.EllipticCurvePrivateNumbers(
|
||||
private_value=private_value,
|
||||
public_numbers=self.public_key().public_numbers()
|
||||
)
|
||||
|
||||
def private_bytes(self, encoding, format, encryption_algorithm):
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self._evp_pkey,
|
||||
self._ec_key
|
||||
)
|
||||
|
||||
|
||||
@utils.register_interface(ec.EllipticCurvePublicKeyWithSerialization)
|
||||
class _EllipticCurvePublicKey(object):
|
||||
def __init__(self, backend, ec_key_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
_mark_asn1_named_ec_curve(backend, ec_key_cdata)
|
||||
self._ec_key = ec_key_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
sn = _ec_key_curve_sn(backend, ec_key_cdata)
|
||||
self._curve = _sn_to_elliptic_curve(backend, sn)
|
||||
|
||||
curve = utils.read_only_property("_curve")
|
||||
|
||||
def verifier(self, signature, signature_algorithm):
|
||||
if not isinstance(signature, bytes):
|
||||
raise TypeError("signature must be bytes.")
|
||||
|
||||
if isinstance(signature_algorithm, ec.ECDSA):
|
||||
return _ECDSAVerificationContext(
|
||||
self._backend, self, signature, signature_algorithm.algorithm
|
||||
)
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"Unsupported elliptic curve signature algorithm.",
|
||||
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)
|
||||
|
||||
def public_numbers(self):
|
||||
set_func, get_func, group = (
|
||||
self._backend._ec_key_determine_group_get_set_funcs(self._ec_key)
|
||||
)
|
||||
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
|
||||
self._backend.openssl_assert(point != self._backend._ffi.NULL)
|
||||
|
||||
with self._backend._tmp_bn_ctx() as bn_ctx:
|
||||
bn_x = self._backend._lib.BN_CTX_get(bn_ctx)
|
||||
bn_y = self._backend._lib.BN_CTX_get(bn_ctx)
|
||||
|
||||
res = get_func(group, point, bn_x, bn_y, bn_ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
x = self._backend._bn_to_int(bn_x)
|
||||
y = self._backend._bn_to_int(bn_y)
|
||||
|
||||
return ec.EllipticCurvePublicNumbers(
|
||||
x=x,
|
||||
y=y,
|
||||
curve=self._curve
|
||||
)
|
||||
|
||||
def public_bytes(self, encoding, format):
|
||||
if format is serialization.PublicFormat.PKCS1:
|
||||
raise ValueError(
|
||||
"EC public keys do not support PKCS1 serialization"
|
||||
)
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
self._evp_pkey,
|
||||
None
|
||||
)
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
|
||||
@utils.register_interface(hashes.HashContext)
|
||||
class _HashContext(object):
|
||||
def __init__(self, backend, algorithm, ctx=None):
|
||||
self._algorithm = algorithm
|
||||
|
||||
self._backend = backend
|
||||
|
||||
if ctx is None:
|
||||
ctx = self._backend._lib.EVP_MD_CTX_create()
|
||||
ctx = self._backend._ffi.gc(ctx,
|
||||
self._backend._lib.EVP_MD_CTX_destroy)
|
||||
evp_md = self._backend._lib.EVP_get_digestbyname(
|
||||
algorithm.name.encode("ascii"))
|
||||
if evp_md == self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{0} is not a supported hash on this backend.".format(
|
||||
algorithm.name),
|
||||
_Reasons.UNSUPPORTED_HASH
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md,
|
||||
self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
self._ctx = ctx
|
||||
|
||||
algorithm = utils.read_only_property("_algorithm")
|
||||
|
||||
def copy(self):
|
||||
copied_ctx = self._backend._lib.EVP_MD_CTX_create()
|
||||
copied_ctx = self._backend._ffi.gc(
|
||||
copied_ctx, self._backend._lib.EVP_MD_CTX_destroy
|
||||
)
|
||||
res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
|
||||
|
||||
def update(self, data):
|
||||
res = self._backend._lib.EVP_DigestUpdate(self._ctx, data, len(data))
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
def finalize(self):
|
||||
buf = self._backend._ffi.new("unsigned char[]",
|
||||
self._backend._lib.EVP_MAX_MD_SIZE)
|
||||
outlen = self._backend._ffi.new("unsigned int *")
|
||||
res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._backend.openssl_assert(outlen[0] == self.algorithm.digest_size)
|
||||
res = self._backend._lib.EVP_MD_CTX_cleanup(self._ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return self._backend._ffi.buffer(buf)[:outlen[0]]
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature, UnsupportedAlgorithm, _Reasons
|
||||
)
|
||||
from cryptography.hazmat.primitives import constant_time, hashes, interfaces
|
||||
|
||||
|
||||
@utils.register_interface(interfaces.MACContext)
|
||||
@utils.register_interface(hashes.HashContext)
|
||||
class _HMACContext(object):
|
||||
def __init__(self, backend, key, algorithm, ctx=None):
|
||||
self._algorithm = algorithm
|
||||
self._backend = backend
|
||||
|
||||
if ctx is None:
|
||||
ctx = self._backend._ffi.new("HMAC_CTX *")
|
||||
self._backend._lib.HMAC_CTX_init(ctx)
|
||||
ctx = self._backend._ffi.gc(
|
||||
ctx, self._backend._lib.HMAC_CTX_cleanup
|
||||
)
|
||||
evp_md = self._backend._lib.EVP_get_digestbyname(
|
||||
algorithm.name.encode('ascii'))
|
||||
if evp_md == self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{0} is not a supported hash on this backend.".format(
|
||||
algorithm.name),
|
||||
_Reasons.UNSUPPORTED_HASH
|
||||
)
|
||||
res = self._backend._lib.Cryptography_HMAC_Init_ex(
|
||||
ctx, key, len(key), evp_md, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
self._ctx = ctx
|
||||
self._key = key
|
||||
|
||||
algorithm = utils.read_only_property("_algorithm")
|
||||
|
||||
def copy(self):
|
||||
copied_ctx = self._backend._ffi.new("HMAC_CTX *")
|
||||
self._backend._lib.HMAC_CTX_init(copied_ctx)
|
||||
copied_ctx = self._backend._ffi.gc(
|
||||
copied_ctx, self._backend._lib.HMAC_CTX_cleanup
|
||||
)
|
||||
res = self._backend._lib.Cryptography_HMAC_CTX_copy(
|
||||
copied_ctx, self._ctx
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
return _HMACContext(
|
||||
self._backend, self._key, self.algorithm, ctx=copied_ctx
|
||||
)
|
||||
|
||||
def update(self, data):
|
||||
res = self._backend._lib.Cryptography_HMAC_Update(
|
||||
self._ctx, data, len(data)
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
def finalize(self):
|
||||
buf = self._backend._ffi.new("unsigned char[]",
|
||||
self._backend._lib.EVP_MAX_MD_SIZE)
|
||||
outlen = self._backend._ffi.new("unsigned int *")
|
||||
res = self._backend._lib.Cryptography_HMAC_Final(
|
||||
self._ctx, buf, outlen
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._backend.openssl_assert(outlen[0] == self.algorithm.digest_size)
|
||||
self._backend._lib.HMAC_CTX_cleanup(self._ctx)
|
||||
return self._backend._ffi.buffer(buf)[:outlen[0]]
|
||||
|
||||
def verify(self, signature):
|
||||
digest = self.finalize()
|
||||
if not constant_time.bytes_eq(digest, signature):
|
||||
raise InvalidSignature("Signature did not match digest.")
|
||||
605
Lib/site-packages/cryptography/hazmat/backends/openssl/rsa.py
Normal file
605
Lib/site-packages/cryptography/hazmat/backends/openssl/rsa.py
Normal file
|
|
@ -0,0 +1,605 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
import math
|
||||
|
||||
from cryptography import utils
|
||||
from cryptography.exceptions import (
|
||||
AlreadyFinalized, InvalidSignature, UnsupportedAlgorithm, _Reasons
|
||||
)
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import (
|
||||
AsymmetricSignatureContext, AsymmetricVerificationContext, rsa
|
||||
)
|
||||
from cryptography.hazmat.primitives.asymmetric.padding import (
|
||||
AsymmetricPadding, MGF1, OAEP, PKCS1v15, PSS
|
||||
)
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import (
|
||||
RSAPrivateKeyWithSerialization, RSAPublicKeyWithSerialization
|
||||
)
|
||||
|
||||
|
||||
def _get_rsa_pss_salt_length(pss, key_size, digest_size):
|
||||
salt = pss._salt_length
|
||||
|
||||
if salt is MGF1.MAX_LENGTH or salt is PSS.MAX_LENGTH:
|
||||
# bit length - 1 per RFC 3447
|
||||
emlen = int(math.ceil((key_size - 1) / 8.0))
|
||||
salt_length = emlen - digest_size - 2
|
||||
assert salt_length >= 0
|
||||
return salt_length
|
||||
else:
|
||||
return salt
|
||||
|
||||
|
||||
def _enc_dec_rsa(backend, key, data, padding):
|
||||
if not isinstance(padding, AsymmetricPadding):
|
||||
raise TypeError("Padding must be an instance of AsymmetricPadding.")
|
||||
|
||||
if isinstance(padding, PKCS1v15):
|
||||
padding_enum = backend._lib.RSA_PKCS1_PADDING
|
||||
elif isinstance(padding, OAEP):
|
||||
padding_enum = backend._lib.RSA_PKCS1_OAEP_PADDING
|
||||
if not isinstance(padding._mgf, MGF1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"Only MGF1 is supported by this backend.",
|
||||
_Reasons.UNSUPPORTED_MGF
|
||||
)
|
||||
|
||||
if not isinstance(padding._mgf._algorithm, hashes.SHA1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"This backend supports only SHA1 inside MGF1 when "
|
||||
"using OAEP.",
|
||||
_Reasons.UNSUPPORTED_HASH
|
||||
)
|
||||
|
||||
if padding._label is not None and padding._label != b"":
|
||||
raise ValueError("This backend does not support OAEP labels.")
|
||||
|
||||
if not isinstance(padding._algorithm, hashes.SHA1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"This backend only supports SHA1 when using OAEP.",
|
||||
_Reasons.UNSUPPORTED_HASH
|
||||
)
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{0} is not supported by this backend.".format(
|
||||
padding.name
|
||||
),
|
||||
_Reasons.UNSUPPORTED_PADDING
|
||||
)
|
||||
|
||||
if backend._lib.Cryptography_HAS_PKEY_CTX:
|
||||
return _enc_dec_rsa_pkey_ctx(backend, key, data, padding_enum)
|
||||
else:
|
||||
return _enc_dec_rsa_098(backend, key, data, padding_enum)
|
||||
|
||||
|
||||
def _enc_dec_rsa_pkey_ctx(backend, key, data, padding_enum):
|
||||
if isinstance(key, _RSAPublicKey):
|
||||
init = backend._lib.EVP_PKEY_encrypt_init
|
||||
crypt = backend._lib.Cryptography_EVP_PKEY_encrypt
|
||||
else:
|
||||
init = backend._lib.EVP_PKEY_decrypt_init
|
||||
crypt = backend._lib.Cryptography_EVP_PKEY_decrypt
|
||||
|
||||
pkey_ctx = backend._lib.EVP_PKEY_CTX_new(
|
||||
key._evp_pkey, backend._ffi.NULL
|
||||
)
|
||||
backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
|
||||
pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
|
||||
res = init(pkey_ctx)
|
||||
backend.openssl_assert(res == 1)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(
|
||||
pkey_ctx, padding_enum)
|
||||
backend.openssl_assert(res > 0)
|
||||
buf_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
|
||||
backend.openssl_assert(buf_size > 0)
|
||||
outlen = backend._ffi.new("size_t *", buf_size)
|
||||
buf = backend._ffi.new("char[]", buf_size)
|
||||
res = crypt(pkey_ctx, buf, outlen, data, len(data))
|
||||
if res <= 0:
|
||||
_handle_rsa_enc_dec_error(backend, key)
|
||||
|
||||
return backend._ffi.buffer(buf)[:outlen[0]]
|
||||
|
||||
|
||||
def _enc_dec_rsa_098(backend, key, data, padding_enum):
|
||||
if isinstance(key, _RSAPublicKey):
|
||||
crypt = backend._lib.RSA_public_encrypt
|
||||
else:
|
||||
crypt = backend._lib.RSA_private_decrypt
|
||||
|
||||
key_size = backend._lib.RSA_size(key._rsa_cdata)
|
||||
backend.openssl_assert(key_size > 0)
|
||||
buf = backend._ffi.new("unsigned char[]", key_size)
|
||||
res = crypt(len(data), data, buf, key._rsa_cdata, padding_enum)
|
||||
if res < 0:
|
||||
_handle_rsa_enc_dec_error(backend, key)
|
||||
|
||||
return backend._ffi.buffer(buf)[:res]
|
||||
|
||||
|
||||
def _handle_rsa_enc_dec_error(backend, key):
|
||||
errors = backend._consume_errors()
|
||||
assert errors
|
||||
assert errors[0].lib == backend._lib.ERR_LIB_RSA
|
||||
if isinstance(key, _RSAPublicKey):
|
||||
assert (errors[0].reason ==
|
||||
backend._lib.RSA_R_DATA_TOO_LARGE_FOR_KEY_SIZE)
|
||||
raise ValueError(
|
||||
"Data too long for key size. Encrypt less data or use a "
|
||||
"larger key size."
|
||||
)
|
||||
else:
|
||||
decoding_errors = [
|
||||
backend._lib.RSA_R_BLOCK_TYPE_IS_NOT_01,
|
||||
backend._lib.RSA_R_BLOCK_TYPE_IS_NOT_02,
|
||||
backend._lib.RSA_R_OAEP_DECODING_ERROR,
|
||||
]
|
||||
if backend._lib.Cryptography_HAS_RSA_R_PKCS_DECODING_ERROR:
|
||||
decoding_errors.append(backend._lib.RSA_R_PKCS_DECODING_ERROR)
|
||||
|
||||
assert errors[0].reason in decoding_errors
|
||||
raise ValueError("Decryption failed.")
|
||||
|
||||
|
||||
@utils.register_interface(AsymmetricSignatureContext)
|
||||
class _RSASignatureContext(object):
|
||||
def __init__(self, backend, private_key, padding, algorithm):
|
||||
self._backend = backend
|
||||
self._private_key = private_key
|
||||
|
||||
if not isinstance(padding, AsymmetricPadding):
|
||||
raise TypeError("Expected provider of AsymmetricPadding.")
|
||||
|
||||
self._pkey_size = self._backend._lib.EVP_PKEY_size(
|
||||
self._private_key._evp_pkey
|
||||
)
|
||||
self._backend.openssl_assert(self._pkey_size > 0)
|
||||
|
||||
if isinstance(padding, PKCS1v15):
|
||||
if self._backend._lib.Cryptography_HAS_PKEY_CTX:
|
||||
self._finalize_method = self._finalize_pkey_ctx
|
||||
self._padding_enum = self._backend._lib.RSA_PKCS1_PADDING
|
||||
else:
|
||||
self._finalize_method = self._finalize_pkcs1
|
||||
elif isinstance(padding, PSS):
|
||||
if not isinstance(padding._mgf, MGF1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"Only MGF1 is supported by this backend.",
|
||||
_Reasons.UNSUPPORTED_MGF
|
||||
)
|
||||
|
||||
# Size of key in bytes - 2 is the maximum
|
||||
# PSS signature length (salt length is checked later)
|
||||
if self._pkey_size - algorithm.digest_size - 2 < 0:
|
||||
raise ValueError("Digest too large for key size. Use a larger "
|
||||
"key.")
|
||||
|
||||
if not self._backend._mgf1_hash_supported(padding._mgf._algorithm):
|
||||
raise UnsupportedAlgorithm(
|
||||
"When OpenSSL is older than 1.0.1 then only SHA1 is "
|
||||
"supported with MGF1.",
|
||||
_Reasons.UNSUPPORTED_HASH
|
||||
)
|
||||
|
||||
if self._backend._lib.Cryptography_HAS_PKEY_CTX:
|
||||
self._finalize_method = self._finalize_pkey_ctx
|
||||
self._padding_enum = self._backend._lib.RSA_PKCS1_PSS_PADDING
|
||||
else:
|
||||
self._finalize_method = self._finalize_pss
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{0} is not supported by this backend.".format(padding.name),
|
||||
_Reasons.UNSUPPORTED_PADDING
|
||||
)
|
||||
|
||||
self._padding = padding
|
||||
self._algorithm = algorithm
|
||||
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
|
||||
|
||||
def update(self, data):
|
||||
self._hash_ctx.update(data)
|
||||
|
||||
def finalize(self):
|
||||
evp_md = self._backend._lib.EVP_get_digestbyname(
|
||||
self._algorithm.name.encode("ascii"))
|
||||
self._backend.openssl_assert(evp_md != self._backend._ffi.NULL)
|
||||
|
||||
return self._finalize_method(evp_md)
|
||||
|
||||
def _finalize_pkey_ctx(self, evp_md):
|
||||
pkey_ctx = self._backend._lib.EVP_PKEY_CTX_new(
|
||||
self._private_key._evp_pkey, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(pkey_ctx != self._backend._ffi.NULL)
|
||||
pkey_ctx = self._backend._ffi.gc(pkey_ctx,
|
||||
self._backend._lib.EVP_PKEY_CTX_free)
|
||||
res = self._backend._lib.EVP_PKEY_sign_init(pkey_ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_signature_md(
|
||||
pkey_ctx, evp_md)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_rsa_padding(
|
||||
pkey_ctx, self._padding_enum)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
if isinstance(self._padding, PSS):
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_rsa_pss_saltlen(
|
||||
pkey_ctx,
|
||||
_get_rsa_pss_salt_length(
|
||||
self._padding,
|
||||
self._private_key.key_size,
|
||||
self._hash_ctx.algorithm.digest_size
|
||||
)
|
||||
)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
|
||||
if self._backend._lib.Cryptography_HAS_MGF1_MD:
|
||||
# MGF1 MD is configurable in OpenSSL 1.0.1+
|
||||
mgf1_md = self._backend._lib.EVP_get_digestbyname(
|
||||
self._padding._mgf._algorithm.name.encode("ascii"))
|
||||
self._backend.openssl_assert(
|
||||
mgf1_md != self._backend._ffi.NULL
|
||||
)
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(
|
||||
pkey_ctx, mgf1_md
|
||||
)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
data_to_sign = self._hash_ctx.finalize()
|
||||
buflen = self._backend._ffi.new("size_t *")
|
||||
res = self._backend._lib.EVP_PKEY_sign(
|
||||
pkey_ctx,
|
||||
self._backend._ffi.NULL,
|
||||
buflen,
|
||||
data_to_sign,
|
||||
len(data_to_sign)
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
buf = self._backend._ffi.new("unsigned char[]", buflen[0])
|
||||
res = self._backend._lib.EVP_PKEY_sign(
|
||||
pkey_ctx, buf, buflen, data_to_sign, len(data_to_sign))
|
||||
if res != 1:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors[0].lib == self._backend._lib.ERR_LIB_RSA
|
||||
reason = None
|
||||
if (errors[0].reason ==
|
||||
self._backend._lib.RSA_R_DATA_TOO_LARGE_FOR_KEY_SIZE):
|
||||
reason = ("Salt length too long for key size. Try using "
|
||||
"MAX_LENGTH instead.")
|
||||
else:
|
||||
assert (errors[0].reason ==
|
||||
self._backend._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY)
|
||||
reason = "Digest too large for key size. Use a larger key."
|
||||
assert reason is not None
|
||||
raise ValueError(reason)
|
||||
|
||||
return self._backend._ffi.buffer(buf)[:]
|
||||
|
||||
def _finalize_pkcs1(self, evp_md):
|
||||
if self._hash_ctx._ctx is None:
|
||||
raise AlreadyFinalized("Context has already been finalized.")
|
||||
|
||||
sig_buf = self._backend._ffi.new("char[]", self._pkey_size)
|
||||
sig_len = self._backend._ffi.new("unsigned int *")
|
||||
res = self._backend._lib.EVP_SignFinal(
|
||||
self._hash_ctx._ctx._ctx,
|
||||
sig_buf,
|
||||
sig_len,
|
||||
self._private_key._evp_pkey
|
||||
)
|
||||
self._hash_ctx.finalize()
|
||||
if res == 0:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors[0].lib == self._backend._lib.ERR_LIB_RSA
|
||||
assert (errors[0].reason ==
|
||||
self._backend._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY)
|
||||
raise ValueError("Digest too large for key size. Use a larger "
|
||||
"key.")
|
||||
|
||||
return self._backend._ffi.buffer(sig_buf)[:sig_len[0]]
|
||||
|
||||
def _finalize_pss(self, evp_md):
|
||||
data_to_sign = self._hash_ctx.finalize()
|
||||
padded = self._backend._ffi.new("unsigned char[]", self._pkey_size)
|
||||
res = self._backend._lib.RSA_padding_add_PKCS1_PSS(
|
||||
self._private_key._rsa_cdata,
|
||||
padded,
|
||||
data_to_sign,
|
||||
evp_md,
|
||||
_get_rsa_pss_salt_length(
|
||||
self._padding,
|
||||
self._private_key.key_size,
|
||||
len(data_to_sign)
|
||||
)
|
||||
)
|
||||
if res != 1:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors[0].lib == self._backend._lib.ERR_LIB_RSA
|
||||
assert (errors[0].reason ==
|
||||
self._backend._lib.RSA_R_DATA_TOO_LARGE_FOR_KEY_SIZE)
|
||||
raise ValueError("Salt length too long for key size. Try using "
|
||||
"MAX_LENGTH instead.")
|
||||
|
||||
sig_buf = self._backend._ffi.new("char[]", self._pkey_size)
|
||||
sig_len = self._backend._lib.RSA_private_encrypt(
|
||||
self._pkey_size,
|
||||
padded,
|
||||
sig_buf,
|
||||
self._private_key._rsa_cdata,
|
||||
self._backend._lib.RSA_NO_PADDING
|
||||
)
|
||||
self._backend.openssl_assert(sig_len != -1)
|
||||
return self._backend._ffi.buffer(sig_buf)[:sig_len]
|
||||
|
||||
|
||||
@utils.register_interface(AsymmetricVerificationContext)
|
||||
class _RSAVerificationContext(object):
|
||||
def __init__(self, backend, public_key, signature, padding, algorithm):
|
||||
self._backend = backend
|
||||
self._public_key = public_key
|
||||
self._signature = signature
|
||||
|
||||
if not isinstance(padding, AsymmetricPadding):
|
||||
raise TypeError("Expected provider of AsymmetricPadding.")
|
||||
|
||||
self._pkey_size = self._backend._lib.EVP_PKEY_size(
|
||||
self._public_key._evp_pkey
|
||||
)
|
||||
self._backend.openssl_assert(self._pkey_size > 0)
|
||||
|
||||
if isinstance(padding, PKCS1v15):
|
||||
if self._backend._lib.Cryptography_HAS_PKEY_CTX:
|
||||
self._verify_method = self._verify_pkey_ctx
|
||||
self._padding_enum = self._backend._lib.RSA_PKCS1_PADDING
|
||||
else:
|
||||
self._verify_method = self._verify_pkcs1
|
||||
elif isinstance(padding, PSS):
|
||||
if not isinstance(padding._mgf, MGF1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"Only MGF1 is supported by this backend.",
|
||||
_Reasons.UNSUPPORTED_MGF
|
||||
)
|
||||
|
||||
# Size of key in bytes - 2 is the maximum
|
||||
# PSS signature length (salt length is checked later)
|
||||
if self._pkey_size - algorithm.digest_size - 2 < 0:
|
||||
raise ValueError(
|
||||
"Digest too large for key size. Check that you have the "
|
||||
"correct key and digest algorithm."
|
||||
)
|
||||
|
||||
if not self._backend._mgf1_hash_supported(padding._mgf._algorithm):
|
||||
raise UnsupportedAlgorithm(
|
||||
"When OpenSSL is older than 1.0.1 then only SHA1 is "
|
||||
"supported with MGF1.",
|
||||
_Reasons.UNSUPPORTED_HASH
|
||||
)
|
||||
|
||||
if self._backend._lib.Cryptography_HAS_PKEY_CTX:
|
||||
self._verify_method = self._verify_pkey_ctx
|
||||
self._padding_enum = self._backend._lib.RSA_PKCS1_PSS_PADDING
|
||||
else:
|
||||
self._verify_method = self._verify_pss
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{0} is not supported by this backend.".format(padding.name),
|
||||
_Reasons.UNSUPPORTED_PADDING
|
||||
)
|
||||
|
||||
self._padding = padding
|
||||
self._algorithm = algorithm
|
||||
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
|
||||
|
||||
def update(self, data):
|
||||
self._hash_ctx.update(data)
|
||||
|
||||
def verify(self):
|
||||
evp_md = self._backend._lib.EVP_get_digestbyname(
|
||||
self._algorithm.name.encode("ascii"))
|
||||
self._backend.openssl_assert(evp_md != self._backend._ffi.NULL)
|
||||
|
||||
self._verify_method(evp_md)
|
||||
|
||||
def _verify_pkey_ctx(self, evp_md):
|
||||
pkey_ctx = self._backend._lib.EVP_PKEY_CTX_new(
|
||||
self._public_key._evp_pkey, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(pkey_ctx != self._backend._ffi.NULL)
|
||||
pkey_ctx = self._backend._ffi.gc(pkey_ctx,
|
||||
self._backend._lib.EVP_PKEY_CTX_free)
|
||||
res = self._backend._lib.EVP_PKEY_verify_init(pkey_ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_signature_md(
|
||||
pkey_ctx, evp_md)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_rsa_padding(
|
||||
pkey_ctx, self._padding_enum)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
if isinstance(self._padding, PSS):
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_rsa_pss_saltlen(
|
||||
pkey_ctx,
|
||||
_get_rsa_pss_salt_length(
|
||||
self._padding,
|
||||
self._public_key.key_size,
|
||||
self._hash_ctx.algorithm.digest_size
|
||||
)
|
||||
)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
if self._backend._lib.Cryptography_HAS_MGF1_MD:
|
||||
# MGF1 MD is configurable in OpenSSL 1.0.1+
|
||||
mgf1_md = self._backend._lib.EVP_get_digestbyname(
|
||||
self._padding._mgf._algorithm.name.encode("ascii"))
|
||||
self._backend.openssl_assert(
|
||||
mgf1_md != self._backend._ffi.NULL
|
||||
)
|
||||
res = self._backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(
|
||||
pkey_ctx, mgf1_md
|
||||
)
|
||||
self._backend.openssl_assert(res > 0)
|
||||
|
||||
data_to_verify = self._hash_ctx.finalize()
|
||||
res = self._backend._lib.EVP_PKEY_verify(
|
||||
pkey_ctx,
|
||||
self._signature,
|
||||
len(self._signature),
|
||||
data_to_verify,
|
||||
len(data_to_verify)
|
||||
)
|
||||
# The previous call can return negative numbers in the event of an
|
||||
# error. This is not a signature failure but we need to fail if it
|
||||
# occurs.
|
||||
self._backend.openssl_assert(res >= 0)
|
||||
if res == 0:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors
|
||||
raise InvalidSignature
|
||||
|
||||
def _verify_pkcs1(self, evp_md):
|
||||
if self._hash_ctx._ctx is None:
|
||||
raise AlreadyFinalized("Context has already been finalized.")
|
||||
|
||||
res = self._backend._lib.EVP_VerifyFinal(
|
||||
self._hash_ctx._ctx._ctx,
|
||||
self._signature,
|
||||
len(self._signature),
|
||||
self._public_key._evp_pkey
|
||||
)
|
||||
self._hash_ctx.finalize()
|
||||
# The previous call can return negative numbers in the event of an
|
||||
# error. This is not a signature failure but we need to fail if it
|
||||
# occurs.
|
||||
self._backend.openssl_assert(res >= 0)
|
||||
if res == 0:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors
|
||||
raise InvalidSignature
|
||||
|
||||
def _verify_pss(self, evp_md):
|
||||
buf = self._backend._ffi.new("unsigned char[]", self._pkey_size)
|
||||
res = self._backend._lib.RSA_public_decrypt(
|
||||
len(self._signature),
|
||||
self._signature,
|
||||
buf,
|
||||
self._public_key._rsa_cdata,
|
||||
self._backend._lib.RSA_NO_PADDING
|
||||
)
|
||||
if res != self._pkey_size:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors
|
||||
raise InvalidSignature
|
||||
|
||||
data_to_verify = self._hash_ctx.finalize()
|
||||
res = self._backend._lib.RSA_verify_PKCS1_PSS(
|
||||
self._public_key._rsa_cdata,
|
||||
data_to_verify,
|
||||
evp_md,
|
||||
buf,
|
||||
_get_rsa_pss_salt_length(
|
||||
self._padding,
|
||||
self._public_key.key_size,
|
||||
len(data_to_verify)
|
||||
)
|
||||
)
|
||||
if res != 1:
|
||||
errors = self._backend._consume_errors()
|
||||
assert errors
|
||||
raise InvalidSignature
|
||||
|
||||
|
||||
@utils.register_interface(RSAPrivateKeyWithSerialization)
|
||||
class _RSAPrivateKey(object):
|
||||
def __init__(self, backend, rsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._rsa_cdata = rsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
self._key_size = self._backend._lib.BN_num_bits(self._rsa_cdata.n)
|
||||
|
||||
key_size = utils.read_only_property("_key_size")
|
||||
|
||||
def signer(self, padding, algorithm):
|
||||
return _RSASignatureContext(self._backend, self, padding, algorithm)
|
||||
|
||||
def decrypt(self, ciphertext, padding):
|
||||
key_size_bytes = int(math.ceil(self.key_size / 8.0))
|
||||
if key_size_bytes != len(ciphertext):
|
||||
raise ValueError("Ciphertext length must be equal to key size.")
|
||||
|
||||
return _enc_dec_rsa(self._backend, self, ciphertext, padding)
|
||||
|
||||
def public_key(self):
|
||||
ctx = self._backend._lib.RSA_new()
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
ctx = self._backend._ffi.gc(ctx, self._backend._lib.RSA_free)
|
||||
ctx.e = self._backend._lib.BN_dup(self._rsa_cdata.e)
|
||||
ctx.n = self._backend._lib.BN_dup(self._rsa_cdata.n)
|
||||
res = self._backend._lib.RSA_blinding_on(ctx, self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
evp_pkey = self._backend._rsa_cdata_to_evp_pkey(ctx)
|
||||
return _RSAPublicKey(self._backend, ctx, evp_pkey)
|
||||
|
||||
def private_numbers(self):
|
||||
return rsa.RSAPrivateNumbers(
|
||||
p=self._backend._bn_to_int(self._rsa_cdata.p),
|
||||
q=self._backend._bn_to_int(self._rsa_cdata.q),
|
||||
d=self._backend._bn_to_int(self._rsa_cdata.d),
|
||||
dmp1=self._backend._bn_to_int(self._rsa_cdata.dmp1),
|
||||
dmq1=self._backend._bn_to_int(self._rsa_cdata.dmq1),
|
||||
iqmp=self._backend._bn_to_int(self._rsa_cdata.iqmp),
|
||||
public_numbers=rsa.RSAPublicNumbers(
|
||||
e=self._backend._bn_to_int(self._rsa_cdata.e),
|
||||
n=self._backend._bn_to_int(self._rsa_cdata.n),
|
||||
)
|
||||
)
|
||||
|
||||
def private_bytes(self, encoding, format, encryption_algorithm):
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self._evp_pkey,
|
||||
self._rsa_cdata
|
||||
)
|
||||
|
||||
|
||||
@utils.register_interface(RSAPublicKeyWithSerialization)
|
||||
class _RSAPublicKey(object):
|
||||
def __init__(self, backend, rsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._rsa_cdata = rsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
self._key_size = self._backend._lib.BN_num_bits(self._rsa_cdata.n)
|
||||
|
||||
key_size = utils.read_only_property("_key_size")
|
||||
|
||||
def verifier(self, signature, padding, algorithm):
|
||||
if not isinstance(signature, bytes):
|
||||
raise TypeError("signature must be bytes.")
|
||||
|
||||
return _RSAVerificationContext(
|
||||
self._backend, self, signature, padding, algorithm
|
||||
)
|
||||
|
||||
def encrypt(self, plaintext, padding):
|
||||
return _enc_dec_rsa(self._backend, self, plaintext, padding)
|
||||
|
||||
def public_numbers(self):
|
||||
return rsa.RSAPublicNumbers(
|
||||
e=self._backend._bn_to_int(self._rsa_cdata.e),
|
||||
n=self._backend._bn_to_int(self._rsa_cdata.n),
|
||||
)
|
||||
|
||||
def public_bytes(self, encoding, format):
|
||||
return self._backend._public_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
self._evp_pkey,
|
||||
self._rsa_cdata
|
||||
)
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
import six
|
||||
|
||||
|
||||
def _truncate_digest(digest, order_bits):
|
||||
digest_len = len(digest)
|
||||
|
||||
if 8 * digest_len > order_bits:
|
||||
digest_len = (order_bits + 7) // 8
|
||||
digest = digest[:digest_len]
|
||||
|
||||
if 8 * digest_len > order_bits:
|
||||
rshift = 8 - (order_bits & 0x7)
|
||||
assert 0 < rshift < 8
|
||||
|
||||
mask = 0xFF >> rshift << rshift
|
||||
|
||||
# Set the bottom rshift bits to 0
|
||||
digest = digest[:-1] + six.int2byte(six.indexbytes(digest, -1) & mask)
|
||||
|
||||
return digest
|
||||
1067
Lib/site-packages/cryptography/hazmat/backends/openssl/x509.py
Normal file
1067
Lib/site-packages/cryptography/hazmat/backends/openssl/x509.py
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue