openmedialibrary/oml/tor_request.py
2024-06-10 14:02:41 +01:00

134 lines
4.7 KiB
Python

# -*- coding: utf-8 -*-
import ssl
import http.client
import urllib.request, urllib.error, urllib.parse
import logging
import socks
import socket
import settings
import state
from utils import get_service_id
logger = logging.getLogger(__name__)
class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError):
    """Signal that a remote certificate does not match the expected service id.

    The instance remembers the service id that was expected, the service id
    that ``get_service_id`` derives from the presented certificate, and a
    free-form reason.
    """

    def __init__(self, service_id, cert, reason):
        """Record the expected id, the id derived from *cert*, and *reason*."""
        # Only the HTTPException initialiser is invoked, and without arguments.
        http.client.HTTPException.__init__(self)
        self.reason = reason
        self._service_id = service_id
        self._cert_service_id = get_service_id(cert=cert)

    def __str__(self):
        """Return a one-line description of the mismatch (newline-terminated)."""
        details = (self._service_id, self._cert_service_id, self.reason)
        return '%s (local) != %s (remote) (%s)\n' % details
def is_local(host):
    """Return True when *host* is a dotted-quad IPv4 literal.

    An optional ``:port`` suffix is ignored.  The connection classes in this
    module use the result to decide whether to connect directly (True) or
    through the Tor proxy (False).

    The host part must consist of exactly four all-numeric labels.  Merely
    counting numeric labels would classify a name such as
    ``1.2.3.4.example.onion`` as an IP literal and let it skip the proxy.
    """
    octets = host.split(':')[0].split('.')
    return len(octets) == 4 and all(octet.isdigit() for octet in octets)
def getaddrinfo(*args):
    """Return a single synthetic IPv4/TCP address entry without any lookup.

    The first two positional arguments are taken as host and port and are
    echoed back unchanged as the socket address; further arguments are
    ignored.  The result has the same shape as ``socket.getaddrinfo``'s:
    a list of ``(family, type, proto, canonname, sockaddr)`` tuples.
    """
    host, port = args[0], args[1]
    entry = (socket.AF_INET, socket.SOCK_STREAM, 6, '', (host, port))
    return [entry]
def create_tor_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                          source_address=None):
    """Open a TCP connection to *address* through the local SOCKS5 proxy.

    The signature mirrors ``socket.create_connection`` so the function can be
    installed as the ``_create_connection`` hook of the connection classes in
    this module.

    Args:
        address: ``(host, port)`` tuple handed to the proxied socket's
            ``connect``.
        timeout: Socket timeout; left untouched when it is the stdlib
            "use the global default" sentinel.
        source_address: Optional local address to bind before connecting.

    Returns:
        The connected ``socks.socksocket``.

    Raises:
        socket.error: Any socket-level failure while setting up or connecting.
            The partially set-up socket is closed before the error propagates.
    """
    logger.debug('make tor connection to: %s', address)
    sock = None
    try:
        # IPPROTO_TCP has the value 6, the protocol number used elsewhere here.
        sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM,
                                socket.IPPROTO_TCP)
        if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(timeout)
        # Fall back to port 9150 when no tor state object is available.
        socks_port = state.tor.socks_port if state.tor else 9150
        # NOTE(review): the 4th positional argument is PySocks' ``rdns`` flag
        # (resolve host names on the proxy side) -- confirm against PySocks.
        sock.set_proxy(socks.SOCKS5, "localhost", socks_port, True)
        if source_address:
            sock.bind(source_address)
        sock.connect(address)
        return sock
    except socket.error:
        # Do not leak the half-open socket; re-raise with the original traceback.
        if sock is not None:
            sock.close()
        raise
class TorHTTPSConnection(http.client.HTTPSConnection):
    """HTTPS connection that is routed through Tor and can pin a service id.

    When *service_id* is given, normal CA/hostname verification is switched
    off and the peer is instead authenticated after the handshake by comparing
    the service id derived from its certificate with the expected one.
    Hosts that are not IPv4 literals (see ``is_local``) are connected through
    ``create_tor_connection``.
    """

    def __init__(self, host, port=None, service_id=None, check_hostname=None, context=None, **kwargs):
        """Set up the TLS context and the connection factory.

        Args:
            host, port: Forwarded to ``http.client.HTTPSConnection``.
            service_id: Expected service id of the peer; enables pinning.
            check_hostname: Optional override for ``context.check_hostname``.
            context: Optional ``ssl.SSLContext``; replaced by a dedicated
                context when *service_id* is set.
            **kwargs: Forwarded to ``http.client.HTTPSConnection``.

        Raises:
            ValueError: If *check_hostname* is true while the context does not
                verify certificates.
        """
        self._service_id = service_id
        if self._service_id:
            # NOTE(review): SOURCE's indentation was lost; this setup is taken
            # to apply only when a service id is given -- confirm.
            # create_default_context() always returns a context, so no extra
            # truthiness check is needed before configuring it.
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
            # The peer is validated via its service id in connect(), not by CA.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            context.load_cert_chain(settings.ssl_cert_path, settings.ssl_key_path)
            context.load_default_certs()
            context.set_alpn_protocols(['http/1.1'])
            context.post_handshake_auth = True

        # HTTPSConnection.__init__ dropped its ``check_hostname`` parameter in
        # Python 3.12, so apply the setting to the context ourselves instead
        # of forwarding it (which would raise TypeError there).
        if check_hostname is not None:
            if context is None:
                context = ssl.create_default_context()
                context.set_alpn_protocols(['http/1.1'])
            if check_hostname and context.verify_mode == ssl.CERT_NONE:
                raise ValueError("check_hostname needs a SSL context with "
                                 "either CERT_OPTIONAL or CERT_REQUIRED")
            context.check_hostname = check_hostname

        http.client.HTTPSConnection.__init__(self, host, port,
                                             context=context, **kwargs)

        if not is_local(host):
            self._create_connection = create_tor_connection

    def get_service_id_cert(self):
        """Return the encoded chain certificate named after the service id.

        Walks the chain reported by the TLS object and returns
        ``public_bytes()`` of the first certificate whose first subject
        attribute value equals the expected service id, or ``None`` when no
        certificate matches.
        """
        for cert in self.sock._sslobj.get_verified_chain():
            subject = cert.get_info().get("subject")
            # subject[0][0][1]: value of the first attribute of the first RDN.
            if subject and subject[0][0][1] == self._service_id:
                return cert.public_bytes()
        return None

    def _check_service_id(self, cert):
        """Return True when the service id derived from *cert* is the expected one."""
        service_id = get_service_id(cert=cert)
        matches = service_id == self._service_id
        if not matches:
            logger.debug('service_id mismatch: %s expected: %s', service_id, self._service_id)
        return matches

    def connect(self):
        """Connect and, when a service id is pinned, verify the peer against it.

        Raises:
            InvalidCertificateException: The peer's certificate does not
                belong to the expected service id.
        """
        http.client.HTTPSConnection.connect(self)
        if not self._service_id:
            return
        try:
            cert = self.get_service_id_cert()
            if not self._check_service_id(cert):
                raise InvalidCertificateException(self._service_id, cert, 'service_id mismatch')
        except Exception:
            # Never leave an unverified connection open: a later request() on
            # this object would otherwise reuse the socket without re-checking.
            self.close()
            raise
class TorHTTPSHandler(urllib.request.HTTPSHandler):
    """urllib HTTPS handler that opens requests with ``TorHTTPSConnection``."""

    def __init__(self, debuglevel=0, context=None, check_hostname=None, service_id=None):
        """Store the connection options; only the abstract base is initialised."""
        urllib.request.AbstractHTTPHandler.__init__(self, debuglevel)
        self._service_id = service_id
        self._check_hostname = check_hostname
        self._context = context

    def https_open(self, req):
        """Open *req* over a ``TorHTTPSConnection`` built with the stored options."""
        connection_options = {
            'context': self._context,
            'check_hostname': self._check_hostname,
            'service_id': self._service_id,
        }
        return self.do_open(TorHTTPSConnection, req, **connection_options)
class TorHTTPConnection(http.client.HTTPConnection):
    """Plain HTTP connection that uses the Tor proxy for non-IP-literal hosts."""

    def __init__(self, host, port=None, **kwargs):
        """Initialise the base connection, then pick the connection factory."""
        http.client.HTTPConnection.__init__(self, host, port, **kwargs)
        if is_local(host):
            # IPv4 literals keep the default, direct connection factory.
            return
        self._create_connection = create_tor_connection
class TorHTTPHandler(urllib.request.HTTPHandler):
    """urllib HTTP handler that opens requests with ``TorHTTPConnection``."""

    def http_open(self, req):
        """Open *req* using ``TorHTTPConnection`` as the connection class."""
        connection_class = TorHTTPConnection
        return self.do_open(connection_class, req)
def get_opener(service_id=None):
    """Build a urllib opener whose HTTP and HTTPS traffic uses the Tor handlers.

    *service_id*, when given, is passed to the HTTPS handler.
    """
    https_handler = TorHTTPSHandler(service_id=service_id)
    http_handler = TorHTTPHandler()
    return urllib.request.build_opener(https_handler, http_handler)