openmedialibrary/oml/ssl_request.py

84 lines
3.3 KiB
Python
Raw Normal View History

2014-09-02 22:32:44 +00:00
import http.client
2014-05-14 09:57:11 +00:00
import socket
2014-09-02 22:32:44 +00:00
import urllib.request, urllib.error, urllib.parse
2014-05-14 09:57:11 +00:00
import ssl
import hashlib
2014-05-17 14:26:59 +00:00
import logging
logger = logging.getLogger('oml.ssl_request')
2014-05-14 09:57:11 +00:00
2014-09-02 22:32:44 +00:00
class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError):
    """Raised when a peer's TLS certificate does not match the pinned fingerprint.

    Derives from both ``http.client.HTTPException`` and
    ``urllib.error.URLError`` so that it can be caught by handlers written
    for either family.

    Attributes:
        fingerprint: the locally expected fingerprint, as given by the caller.
        cert_fingerprint: hex SHA-1 digest of the certificate that was
            presented (digest of empty input when no certificate is known).
        reason: short description of why validation failed.
    """

    def __init__(self, fingerprint, cert, reason):
        """Store the expected fingerprint and digest the offending certificate.

        Args:
            fingerprint: expected fingerprint (hex string) or None.
            cert: DER certificate as bytes. ``str`` and ``None`` are also
                accepted so that callers without a certificate at hand
                (e.g. a failed handshake) can pass ``''``.
            reason: description of the failure.
        """
        http.client.HTTPException.__init__(self)
        self.fingerprint = fingerprint
        # hashlib only accepts bytes-like input on Python 3; normalise first
        # so that building the exception can never raise TypeError itself.
        if cert is None:
            cert = b''
        elif isinstance(cert, str):
            cert = cert.encode('utf-8')
        self.cert_fingerprint = hashlib.sha1(cert).hexdigest()
        self.reason = reason

    def __str__(self):
        return ('%s (local) != %s (remote) (%s)\n' %
                (self.fingerprint, self.cert_fingerprint, self.reason))
2014-09-02 22:32:44 +00:00
class CertValidatingHTTPSConnection(http.client.HTTPConnection):
    """HTTPS connection that pins the peer certificate by fingerprint.

    The TLS handshake itself does not verify the certificate chain
    (``cert_reqs`` is ``ssl.CERT_NONE``); instead, when a fingerprint was
    supplied, the DER certificate presented by the peer is hashed and
    compared against it after the handshake.
    """

    default_port = http.client.HTTPS_PORT

    def __init__(self, host, port=None, fingerprint=None, strict=None, **kwargs):
        """Create the connection object without opening a socket.

        Args:
            host: peer host name or address.
            port: peer port; defaults to ``default_port``.
            fingerprint: expected hex digest of the peer certificate
                (40, 64 or 128 hex characters for SHA-1/256/512), or a
                falsy value to skip the fingerprint check.
            strict: accepted for backward compatibility and ignored.
                ``HTTPConnection`` dropped this parameter in Python 3.4;
                forwarding it positionally would land in ``timeout``.
            **kwargs: passed on to ``http.client.HTTPConnection``
                (for example ``timeout`` or ``source_address``).
        """
        http.client.HTTPConnection.__init__(self, host, port, **kwargs)
        self.fingerprint = fingerprint
        # Chain validation is intentionally off: trust comes from the
        # fingerprint comparison done in connect(), not from a CA.
        self.cert_reqs = ssl.CERT_NONE

    def _ValidateCertificateFingerprint(self, cert):
        """Return True if *cert* (DER bytes) hashes to ``self.fingerprint``.

        The hash algorithm is selected by the length of the expected
        fingerprint. Returns False when there is nothing to compare
        (no fingerprint or no certificate) or the length is unsupported.
        """
        if not self.fingerprint or cert is None:
            return False
        length = len(self.fingerprint)
        if length == 40:
            digest = hashlib.sha1(cert).hexdigest()
        elif length == 64:
            digest = hashlib.sha256(cert).hexdigest()
        elif length == 128:
            digest = hashlib.sha512(cert).hexdigest()
        else:
            logger.error('unkown fingerprint length %s (%s)', self.fingerprint, length)
            return False
        # hexdigest() is lower-case; accept upper-case pinned fingerprints too.
        return digest == self.fingerprint.lower()

    def connect(self):
        """Open the TCP connection, perform the TLS handshake and check the pin.

        Raises:
            InvalidCertificateException: a fingerprint was configured and the
                peer certificate does not match it.
            OSError: the TCP connection or the TLS handshake failed.
        """
        sock = socket.create_connection(
            (self.host, self.port), self.timeout, self.source_address)
        try:
            # ssl.wrap_socket() was removed in Python 3.12; build an explicit
            # context instead. check_hostname must be disabled before the
            # verify mode may be relaxed.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = self.cert_reqs
            self.sock = context.wrap_socket(sock)
            if self.fingerprint:
                cert = self.sock.getpeercert(binary_form=True)
                if not self._ValidateCertificateFingerprint(cert):
                    raise InvalidCertificateException(self.fingerprint, cert or b'',
                                                      'fingerprint mismatch')
        except Exception:
            # Do not leak the socket (or keep an untrusted one around).
            (self.sock or sock).close()
            self.sock = None
            raise
2014-05-14 09:57:11 +00:00
2014-09-02 22:32:44 +00:00
class VerifiedHTTPSHandler(urllib.request.HTTPSHandler):
    """urllib handler that opens https URLs via CertValidatingHTTPSConnection.

    Keyword arguments given to the constructor (typically ``fingerprint``)
    are forwarded to every connection object the handler creates.
    """

    def __init__(self, **kwargs):
        """Remember *kwargs* as default arguments for each new connection."""
        # Only the generic AbstractHTTPHandler initialisation is run here.
        urllib.request.AbstractHTTPHandler.__init__(self)
        self._connection_args = kwargs

    def https_open(self, req):
        """Open *req* over a fingerprint-validating HTTPS connection.

        Raises:
            InvalidCertificateException: the TLS layer reported a generic
                SSL failure (``ssl.SSL_ERROR_SSL``) during the request.
            urllib.error.URLError: any other failure, re-raised unchanged.
        """
        def http_class_wrapper(host, **kwargs):
            # Per-request keywords from urllib override the handler defaults.
            full_kwargs = dict(self._connection_args)
            full_kwargs.update(kwargs)
            # urllib always supplies ``timeout``; it is dropped before the
            # connection object is constructed.
            full_kwargs.pop('timeout', None)
            return CertValidatingHTTPSConnection(host, **full_kwargs)

        try:
            return self.do_open(http_class_wrapper, req)
        except urllib.error.URLError as e:
            reason = e.reason
            if (isinstance(reason, ssl.SSLError) and reason.args
                    and reason.args[0] == ssl.SSL_ERROR_SSL):
                detail = reason.args[1] if len(reason.args) > 1 else str(reason)
                # No peer certificate is available on this path, so an empty
                # byte string stands in for it.
                raise InvalidCertificateException(
                    self._connection_args.get('fingerprint'), b'', detail) from e
            raise

    https_request = urllib.request.HTTPSHandler.do_request_
2014-05-14 09:57:11 +00:00
def get_opener(fingerprint):
    """Build a urllib opener whose https requests use VerifiedHTTPSHandler.

    Args:
        fingerprint: value passed to the handler as its ``fingerprint``
            keyword argument.

    Returns:
        The opener produced by ``urllib.request.build_opener``.
    """
    return urllib.request.build_opener(
        VerifiedHTTPSHandler(fingerprint=fingerprint)
    )