# -*- coding: utf-8 -*- import ssl import http.client import urllib.request, urllib.error, urllib.parse import logging import socks import socket import settings import state from utils import get_service_id logger = logging.getLogger(__name__) class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError): def __init__(self, service_id, cert, reason): http.client.HTTPException.__init__(self) self._service_id = service_id self._cert_service_id = get_service_id(cert=cert) self.reason = reason def __str__(self): return ('%s (local) != %s (remote) (%s)\n' % (self._service_id, self._cert_service_id, self.reason)) def is_local(host): return len([p for p in host.split(':')[0].split('.') if p.isdigit()]) == 4 def getaddrinfo(*args): return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))] def create_tor_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): host, port = address err = None af = socket.AF_INET socktype = socket.SOCK_STREAM proto = 6 sa = address sock = None try: sock = socks.socksocket(af, socktype, proto) if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: sock.settimeout(timeout) socks_port = state.tor.socks_port if state.tor else 9150 sock.set_proxy(socks.SOCKS5, "localhost", socks_port, True) if source_address: sock.bind(source_address) sock.connect(sa) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err else: raise sock.error("getaddrinfo returns an empty list") class TorHTTPSConnection(http.client.HTTPSConnection): def __init__(self, host, port=None, service_id=None, check_hostname=None, context=None, **kwargs): self._service_id = service_id if self._service_id: context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) if context: context.check_hostname = False context.verify_mode = ssl.CERT_NONE context.load_cert_chain(settings.ssl_cert_path, settings.ssl_key_path) context.load_default_certs() context.set_alpn_protocols(['http/1.1']) context.post_handshake_auth = True http.client.HTTPSConnection.__init__(self, host, port, check_hostname=check_hostname, context=context, **kwargs) if not is_local(host): self._create_connection = create_tor_connection def get_service_id_cert(self): for cert in self.sock._sslobj.get_verified_chain(): info = cert.get_info() subject = info.get("subject") if subject: CN = subject[0][0][1] if CN == self._service_id: cert = cert.public_bytes() return cert def _check_service_id(self, cert): service_id = get_service_id(cert=cert) if service_id != self._service_id: logger.debug('service_id mismatch: %s expected: %s', service_id, self._service_id) return service_id == self._service_id def connect(self): http.client.HTTPSConnection.connect(self) if self._service_id: cert = self.get_service_id_cert() if not self._check_service_id(cert): raise InvalidCertificateException(self._service_id, cert, 'service_id mismatch') class TorHTTPSHandler(urllib.request.HTTPSHandler): def __init__(self, debuglevel=0, context=None, check_hostname=None, service_id=None): urllib.request.AbstractHTTPHandler.__init__(self, debuglevel) self._context = context self._check_hostname = check_hostname self._service_id = service_id def https_open(self, req): return self.do_open(TorHTTPSConnection, req, context=self._context, check_hostname=self._check_hostname, service_id=self._service_id) class TorHTTPConnection(http.client.HTTPConnection): def __init__(self, host, port=None, **kwargs): http.client.HTTPConnection.__init__(self, host, port, **kwargs) if not is_local(host): self._create_connection = create_tor_connection class TorHTTPHandler(urllib.request.HTTPHandler): def http_open(self, req): return self.do_open(TorHTTPConnection, req) def get_opener(service_id=None): handler = TorHTTPSHandler(service_id=service_id) opener = urllib.request.build_opener(handler, TorHTTPHandler()) return opener