From 7c1e5c691a308df6b6f49eef92e420e49e6eddd7 Mon Sep 17 00:00:00 2001 From: j Date: Thu, 26 Nov 2015 01:26:10 +0100 Subject: [PATCH] use tor hidden service instead of ed25515 as peer id --- README.md | 6 +- install | 5 + oml/changelog.py | 79 +++------ oml/downloads.py | 2 +- oml/item/models.py | 1 - oml/item/scan.py | 1 - oml/localnodes.py | 51 +++--- oml/node/cert.py | 3 +- oml/node/server.py | 364 ++++++++++++++++++++++++---------------- oml/node/sslsocket.py | 305 +++++++++++++++++++++++++++++++++ oml/nodes.py | 99 ++++++----- oml/server.py | 24 ++- oml/settings.py | 9 +- oml/setup.py | 13 ++ oml/ssl_request.py | 69 ++++---- oml/state.py | 11 +- oml/tor.py | 196 ++++++++++++++++++++++ oml/tor_request.py | 122 ++++++++++++++ oml/user/api.py | 10 +- oml/utils.py | 84 ++++++++++ requirements-shared.txt | 2 + requirements.txt | 5 +- static/js/utils.js | 2 +- 23 files changed, 1139 insertions(+), 324 deletions(-) create mode 100644 oml/node/sslsocket.py create mode 100644 oml/tor.py create mode 100644 oml/tor_request.py diff --git a/README.md b/README.md index 8abd657..f28e09f 100644 --- a/README.md +++ b/README.md @@ -39,10 +39,10 @@ To update to latest version: ./ctl update -On Linux you need a working python3 installation with pillow, python-lxml and poppler-utils: - - apt-get install python3.4 python3-pil python3-lxml poppler-utils +On Linux you need a working python3.4 installation with pillow, python-lxml, pyOpenSSL and pyCrypto and popler-utils: + apt-get install python3.4 python3-pil python3-lxml \ + python3-pyopenssl python3-crypto poppler-utils Platform ---------- diff --git a/install b/install index 807dbe0..43f23b0 100755 --- a/install +++ b/install @@ -70,6 +70,11 @@ class Install(Thread): except: apt_packages += ' python3-openssl' dnf_packages += ' python3-pyOpenSSL' + try: + import Crypto + except: + apt_packages += ' python3-crypto' + dnf_packages += ' python3-pyCrypto' if not has_bin('pdftocairo'): apt_packages += ' poppler-utils' diff --git a/oml/changelog.py b/oml/changelog.py index 2e7e497..03f811d 100644 --- a/oml/changelog.py +++ b/oml/changelog.py @@ -7,7 +7,7 @@ import json import sqlalchemy as sa -from utils import valid, datetime2ts, ts2datetime +from utils import datetime2ts, ts2datetime from websocket import trigger_event import db import settings @@ -56,7 +56,6 @@ class Changelog(db.Model): c.data = json.dumps([action] + list(args), ensure_ascii=False) _data = str(c.revision) + str(c.timestamp) + c.data _data = _data.encode() - c.sig = settings.sk.sign(_data, encoding='base64').decode() state.db.session.add(c) state.db.session.commit() if state.nodes: @@ -64,45 +63,38 @@ class Changelog(db.Model): @classmethod def apply_changes(cls, user, changes): + trigger = changes for change in changes: - if not Changelog.apply_change(user, change, trigger=False): + if not cls.apply_change(user, change, trigger=False): logger.debug('FAIL %s', change) + trigger = False break return False - if changes: + if trigger: trigger_event('change', {}); return True @classmethod - def apply_change(cls, user, change, rebuild=False, trigger=True): - revision, timestamp, sig, data = change - last = Changelog.query.filter_by(user_id=user.id).order_by('-revision').first() + def apply_change(cls, user, change, trigger=True): + revision, timestamp, data = change + last = cls.query.filter_by(user_id=user.id).order_by('-revision').first() next_revision = last.revision + 1 if last else 0 if revision == next_revision: - _data = str(revision) + str(timestamp) + data - _data = _data.encode() - if rebuild: - sig = settings.sk.sign(_data, encoding='base64').decode() - if valid(user.id, _data, sig): - c = cls() - c.created = datetime.utcnow() - c.timestamp = timestamp - c.user_id = user.id - c.revision = revision - c.data = data - c.sig = sig - args = json.loads(data) - logger.debug('apply change from %s: %s', user.name, args) - if getattr(c, 'action_' + args[0])(user, timestamp, *args[1:]): - logger.debug('change applied') - state.db.session.add(c) - state.db.session.commit() - if trigger: - trigger_event('change', {}); - return True - else: - logger.debug('INVLAID SIGNATURE ON CHANGE %s', change) - raise Exception('invalid signature') + c = cls() + c.created = datetime.utcnow() + c.timestamp = timestamp + c.user_id = user.id + c.revision = revision + c.data = data + args = json.loads(data) + logger.debug('apply change from %s: %s', user.name, args) + if getattr(c, 'action_' + args[0])(user, timestamp, *args[1:]): + logger.debug('change applied') + state.db.session.add(c) + state.db.session.commit() + if trigger: + trigger_event('change', {}); + return True else: logger.debug('revsion does not match! got %s expecting %s', revision, next_revision) return False @@ -110,26 +102,9 @@ class Changelog(db.Model): def __repr__(self): return self.data - def verify(self): - _data = str(self.revision) + str(self.timestamp) + self.data - _data = _data.encode() - return valid(self.user_id, _data, self.sig.encode()) - - @classmethod - def _rebuild(cls): - for c in cls.query.filter_by(user_id=settings.USER_ID): - _data = str(c.revision) + str(c.timestamp) + c.data - _data = _data.encode() - c.sig = settings.sk.sign(_data, encoding='base64') - state.db.session.add(c) - state.db.session.commit() - def json(self): timestamp = self.timestamp or datetime2ts(self.created) - sig = self.sig - if isinstance(sig, bytes): - sig = sig.decode() - return [self.revision, timestamp, sig, self.data] + return [self.revision, timestamp, self.data] @classmethod def restore(cls, user_id, path=None): @@ -161,9 +136,9 @@ class Changelog(db.Model): if not i: i = Item.get_or_create(itemid, info) i.modified = ts2datetime(timestamp) - if user not in i.users: - i.users.append(user) - i.update() + if user not in i.users: + i.users.append(user) + i.update() return True def action_edititem(self, user, timestamp, itemid, meta): diff --git a/oml/downloads.py b/oml/downloads.py index e3469c9..2994a54 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -89,7 +89,7 @@ class ScrapeThread(Thread): with db.session(): while self._running: if not self.scrape_queue(): - time.sleep(10) + time.sleep(1) def join(self): self._running = False diff --git a/oml/item/models.py b/oml/item/models.py index 1cb672a..595d9b3 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -395,7 +395,6 @@ class Item(db.Model): def remove_file(self): for f in self.files.all(): path = f.fullpath() - logger.debug('remove file %s', path) if os.path.exists(path): os.unlink(path) remove_empty_folders(os.path.dirname(path)) diff --git a/oml/item/scan.py b/oml/item/scan.py index 475245f..5e5b392 100644 --- a/oml/item/scan.py +++ b/oml/item/scan.py @@ -44,7 +44,6 @@ def add_file(id, f, prefix, from_=None): user = state.user() path = f[len(prefix):] data = media.metadata(f, from_) - print(path) file = File.get_or_create(id, data, path) item = file.item if 'primaryid' in file.info: diff --git a/oml/localnodes.py b/oml/localnodes.py index fa67abf..fc5f985 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -14,23 +14,38 @@ from settings import preferences, server, USER_ID, sk import state import db import user.models +from tor_request import get_opener +import settings import logging logger = logging.getLogger('oml.localnodes') def can_connect(data): try: + opener = get_opener(data['id']) + headers = { + 'User-Agent': settings.USER_AGENT, + 'X-Node-Protocol': settings.NODE_PROTOCOL, + 'Accept-Encoding': 'gzip', + } if ':' in data['host']: - s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + url = 'https://[{host}]:{port}'.format(**data) else: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.settimeout(1) - s.connect((data['host'], data['port'])) - s.close() + url = 'https://{host}:{port}'.format(**data) + opener.addheaders = list(zip(headers.keys(), headers.values())) + opener.timeout = 1 + logger.debug('try connection %s', url) + r = opener.open(url) + version = r.headers.get('X-Node-Protocol', None) + if version != settings.NODE_PROTOCOL: + logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) + return False + c = r.read() + logger.debug('can connect to local node') return True except: + logger.debug('can_connect failed', exc_info=1) pass - logger.debug('can_connect failed') return False class LocalNodesBase(Thread): @@ -53,13 +68,12 @@ class LocalNodesBase(Thread): self.host = self.get_ip() if self.host: message = json.dumps({ + 'id': USER_ID, 'username': preferences.get('username', 'anonymous'), 'host': self.host, - 'port': server['node_port'], - 'cert': server['cert'] + 'port': server['node_port'] }) - sig = sk.sign(message.encode(), encoding='base64').decode() - packet = json.dumps([sig, USER_ID, message]).encode() + packet = message.encode() else: packet = None return packet @@ -100,18 +114,13 @@ class LocalNodesBase(Thread): def verify(self, data): try: - packet = json.loads(data.decode()) + message = json.loads(data.decode()) except: return None - if len(packet) == 3: - sig, user_id, data = packet - if valid(user_id, data, sig): - message = json.loads(data) - message['id'] = user_id - for key in ['id', 'username', 'host', 'port', 'cert']: - if key not in message: - return None - return message + for key in ['id', 'username', 'host', 'port']: + if key not in message: + return None + return message def update_node(self, data): #fixme use local link address @@ -233,7 +242,7 @@ class LocalNodes(object): if not server['localnode_discovery']: return self._nodes4 = LocalNodes4(self._nodes) - self._nodes6 = LocalNodes6(self._nodes) + #self._nodes6 = LocalNodes6(self._nodes) def cleanup(self): if self._active: diff --git a/oml/node/cert.py b/oml/node/cert.py index af9c43a..64cb741 100644 --- a/oml/node/cert.py +++ b/oml/node/cert.py @@ -8,7 +8,6 @@ import OpenSSL import settings - def get_fingerprint(): with open(settings.ssl_cert_path) as fd: data = fd.read() @@ -17,7 +16,7 @@ def get_fingerprint(): def generate_ssl(): key = OpenSSL.crypto.PKey() - key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024) with open(settings.ssl_key_path, 'wb') as fd: os.chmod(settings.ssl_key_path, 0o600) fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)) diff --git a/oml/node/server.py b/oml/node/server.py index e921a4b..6d00ee3 100644 --- a/oml/node/server.py +++ b/oml/node/server.py @@ -1,37 +1,165 @@ # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 - -import os - -import tornado -from tornado.web import Application -from tornado.httpserver import HTTPServer -from tornado.ioloop import PeriodicCallback - -from oxtornado import run_async -from utils import valid, get_public_ipv6 -from websocket import trigger_event -from . import cert +from socketserver import ThreadingMixIn +from threading import Thread +import base64 import db -import directory +import gzip +import hashlib +import http.server +import io import json -from . import nodeapi +import os +import socket +import socketserver + +from Crypto.PublicKey import RSA +from Crypto.Util.asn1 import DerSequence +from OpenSSL.crypto import dump_privatekey, FILETYPE_ASN1 +from OpenSSL.SSL import ( + Context, Connection, TLSv1_2_METHOD, + VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE +) + import settings import state import user +from . import nodeapi +from .sslsocket import fileobject + import logging logger = logging.getLogger('oml.node.server') -class NodeHandler(tornado.web.RequestHandler): +def get_service_id(key): + ''' + service_id is the first half of the sha1 of the rsa public key encoded in base32 + ''' + # compute sha1 of public key and encode first half in base32 + pub_der = DerSequence() + pub_der.decode(dump_privatekey(FILETYPE_ASN1, key)) + public_key = RSA.construct((pub_der._seq[1], pub_der._seq[2])).exportKey('DER')[22:] + service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode() + return service_id - def initialize(self): - pass +class TLSTCPServer(socketserver.TCPServer): - @tornado.web.asynchronous - @tornado.gen.coroutine - def post(self): + def _accept(self, connection, x509, errnum, errdepth, ok): + # client_id is validated in request + return True + + def __init__(self, server_address, HandlerClass, bind_and_activate=True): + socketserver.TCPServer.__init__(self, server_address, HandlerClass) + ctx = Context(TLSv1_2_METHOD) + ctx.use_privatekey_file (settings.ssl_key_path) + ctx.use_certificate_file(settings.ssl_cert_path) + # only allow clients with cert: + ctx.set_verify(VERIFY_PEER | VERIFY_CLIENT_ONCE | VERIFY_FAIL_IF_NO_PEER_CERT, self._accept) + #ctx.set_verify(VERIFY_PEER | VERIFY_CLIENT_ONCE, self._accept) + self.socket = Connection(ctx, socket.socket(self.address_family, self.socket_type)) + if bind_and_activate: + self.server_bind() + self.server_activate() + + def shutdown_request(self,request): + try: + request.shutdown() + except: + pass + +class NodeServer(ThreadingMixIn, TLSTCPServer): + allow_reuse_address = True + + +def api_call(action, user_id, args): + with db.session(): + u = user.models.User.get(user_id) + if action in ( + 'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering' + ) or (u and u.peered): + content = getattr(nodeapi, 'api_' + action)(user_id, *args) + else: + if u and u.pending: + logger.debug('ignore request from pending peer[%s] %s (%s)', + user_id, action, args) + content = {} + else: + content = None + return content + +class Handler(http.server.SimpleHTTPRequestHandler): + + def setup(self): + self.connection = self.request + self.rfile = fileobject(self.connection, 'rb', self.rbufsize) + self.wfile = fileobject(self.connection, 'wb', self.wbufsize) + + def version_string(self): + return settings.USER_AGENT + + def do_HEAD(self): + return self.do_GET() + + def do_GET(self): + import item.models + id = self.path.split('/')[-1] if self.path.startswith('/get/') else None + if id and len(id) == 32 and id.isalnum(): + with db.session(): + i = item.models.Item.get(id) + if not i: + self.send_response(404, 'Not Found') + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(b'404 - Not Found') + return + path = i.get_path() + mimetype = { + 'epub': 'application/epub+zip', + 'pdf': 'application/pdf', + 'txt': 'text/plain', + }.get(path.split('.')[-1], None) + self.send_response(200, 'OK') + self.send_header('Content-Type', mimetype) + self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL) + self.send_header('Content-Length', str(os.path.getsize(path))) + self.end_headers() + logger.debug('GET file %s', id) + with open(path, 'rb') as f: + while 1: + data = f.read(16384) + if not data: + break + self.wfile.write(data) + else: + self.send_response(200, 'OK') + self.send_header('Content-type', 'text/plain') + self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL) + self.end_headers() + self.wfile.write('Open Media Library\n'.encode()) + + def gzip_data(self, data): + encoding = self.headers.get('Accept-Encoding') + if encoding.find('gzip') != -1: + self.send_header('Content-Encoding', 'gzip') + bytes_io = io.BytesIO() + gzip_file = gzip.GzipFile(fileobj=bytes_io, mode='wb') + gzip_file.write(data) + gzip_file.close() + result = bytes_io.getvalue() + bytes_io.close() + return result + else: + return data + + def gunzip_data(self, data): + bytes_io = io.BytesIO(data) + gzip_file = gzip.GzipFile(fileobj=bytes_io, mode='rb') + result = gzip_file.read() + gzip_file.close() + return result + + def do_POST(self): ''' API pullChanges [userid] from [to] @@ -43,141 +171,85 @@ class NodeHandler(tornado.web.RequestHandler): ping responds public ip ''' - key = str(self.request.headers['X-Ed25519-Key']) - sig = str(self.request.headers['X-Ed25519-Signature']) - data = self.request.body - content = {} + x509 = self.connection.get_peer_certificate() + user_id = get_service_id(x509.get_pubkey()) if x509 else None - self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL) - if self.request.headers.get('X-Node-Protocol', None) > settings.NODE_PROTOCOL: + content = {} + try: + content_len = int(self.headers.get('content-length', 0)) + data = self.rfile.read(content_len) + if self.headers.get('Content-Encoding') == 'gzip': + data = self.gunzip_data(data) + except: + logger.debug('invalid request', exc_info=1) + response_status = (500, 'invalid request') + self.write_response(response_status, content) + return + + response_status = (200, 'OK') + if self.headers.get('X-Node-Protocol', '') > settings.NODE_PROTOCOL: state.update_required = True - if self.request.headers.get('X-Node-Protocol', None) != settings.NODE_PROTOCOL: + if self.headers.get('X-Node-Protocol', '') != settings.NODE_PROTOCOL: + logger.debug('protocol missmatch %s vs %s', + self.headers.get('X-Node-Protocol', ''), settings.NODE_PROTOCOL) + logger.debug('headers %s', self.headers) content = settings.release else: - if valid(key, data, sig): + try: action, args = json.loads(data.decode('utf-8')) - logger.debug('NODE action %s %s (%s)', action, args, key) - if action == 'ping': - content = { - 'ip': self.request.remote_addr - } - else: - content = yield tornado.gen.Task(api_call, action, key, args) - if content is None: - content = {'status': 'not peered'} - logger.debug('PEER %s IS UNKNOWN SEND 403', key) - self.set_status(403) - content = json.dumps(content).encode('utf-8') - sig = settings.sk.sign(content, encoding='base64') - self.set_header('X-Ed25519-Signature', sig) - self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL) - self.write(content) - - def get(self): - self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL) - if self.request.headers.get('X-Node-Protocol', None) > settings.NODE_PROTOCOL: - state.update_required = True - self.write('Open Media Library') - -@run_async -def api_call(action, key, args, callback): - with db.session(): - u = user.models.User.get(key) - if action in ( - 'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering' - ) or (u and u.peered): - content = getattr(nodeapi, 'api_' + action)(key, *args) - else: - if u and u.pending: - logger.debug('ignore request from pending peer[%s] %s (%s)', key, action, args) - content = {} - else: - content = None - callback(content) - -class ShareHandler(tornado.web.RequestHandler): - - def initialize(self): - pass - - def get(self, id): - import item.models - with db.session(): - i = item.models.Item.get(id) - if not i: - self.set_status(404) + except: + logger.debug('invalid data: %s', data, exc_info=1) + response_status = (500, 'invalid request') + content = { + 'status': 'invalid request' + } + self.write_response(response_status, content) return - path = i.get_path() - mimetype = { - 'epub': 'application/epub+zip', - 'pdf': 'application/pdf', - 'txt': 'text/plain', - }.get(path.split('.')[-1], None) - self.set_header('Content-Type', mimetype) - logger.debug('GET file %s', id) - with open(path, 'rb') as f: - while 1: - data = f.read(16384) - if not data: - break - self.write(data) + logger.debug('NODE action %s %s (%s)', action, args, user_id) + if action == 'ping': + content = { + 'status': 'ok' + } + else: + content = api_call(action, user_id, args) + if content is None: + content = {'status': 'not peered'} + logger.debug('PEER %s IS UNKNOWN SEND 403', user_id) + response_status = (403, 'UNKNOWN USER') + content = {} + else: + logger.debug('RESPONSE %s: %s', action, content) + self.write_response(response_status, content) -def publish_node(): - update_online() - if state.online: - with db.session(): - for u in user.models.User.query.filter_by(queued=True): - logger.debug('adding queued node... %s', u.id) - state.nodes.queue('add', u.id) - state.check_nodes = PeriodicCallback(check_nodes, 120000) - state.check_nodes.start() - state._online = PeriodicCallback(update_online, 60000) - state._online.start() + def write_response(self, response_status, content): + self.send_response(*response_status) + self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL) + self.send_header('Content-Type', 'application/json') + content = json.dumps(content, ensure_ascii=False).encode('utf-8') + content = self.gzip_data(content) + self.send_header('Content-Length', str(len(content))) + self.end_headers() + self.wfile.write(content) -def update_online(): - host = get_public_ipv6() - if not host: - if state.online: - state.online = False - trigger_event('status', { - 'id': settings.USER_ID, - 'online': state.online - }) - else: - if host != state.host: - state.host = host - online = directory.put(settings.sk, { - 'host': host, - 'port': settings.server['node_port'], - 'cert': settings.server['cert'] - }) - if online != state.online: - state.online = online - trigger_event('status', { - 'id': settings.USER_ID, - 'online': state.online - }) +class Server(Thread): + http_server = None -def check_nodes(): - if state.online: - with db.session(): - for u in user.models.User.query.filter_by(queued=True): - if not state.nodes.is_online(u.id): - logger.debug('queued peering message for %s trying to connect...', u.id) - state.nodes.queue('add', u.id) + def __init__(self): + Thread.__init__(self) + address = (settings.server['node_address'], settings.server['node_port']) + self.http_server = NodeServer(address, Handler) + self.daemon = True + self.start() + + def run(self): + self.http_server.serve_forever() + + def stop(self): + if self.http_server: + self.http_server.shutdown() + self.http_server.socket.close() + return Thread.join(self) def start(): - application = Application([ - (r"/get/(.*)", ShareHandler), - (r".*", NodeHandler), - ], gzip=True) - if not os.path.exists(settings.ssl_cert_path): - settings.server['cert'] = cert.generate_ssl() + return Server() - http_server = HTTPServer(application, ssl_options={ - "certfile": settings.ssl_cert_path, - "keyfile": settings.ssl_key_path - }) - http_server.listen(settings.server['node_port'], settings.server['node_address']) - state.main.add_callback(publish_node) - return http_server diff --git a/oml/node/sslsocket.py b/oml/node/sslsocket.py new file mode 100644 index 0000000..65a03d4 --- /dev/null +++ b/oml/node/sslsocket.py @@ -0,0 +1,305 @@ +from io import BytesIO +from socket import error +from errno import EINTR + +# Based on socket._fileobject from python2.7 +class fileobject(object): + """Faux file object attached to a socket object.""" + + default_bufsize = 8192 + name = "" + + __slots__ = ["mode", "bufsize", "softspace", + # "closed" is a property, see below + "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf", "_wbuf_len", + "_close"] + + def __init__(self, sock, mode='rb', bufsize=-1, close=False): + self._sock = sock + self.mode = mode # Not actually used in this version + if bufsize < 0: + bufsize = self.default_bufsize + self.bufsize = bufsize + self.softspace = False + # _rbufsize is the suggested recv buffer size. It is *strictly* + # obeyed within readline() for recv calls. If it is larger than + # default_bufsize it will be used for recv calls within read(). + if bufsize == 0: + self._rbufsize = 1 + elif bufsize == 1: + self._rbufsize = self.default_bufsize + else: + self._rbufsize = bufsize + self._wbufsize = bufsize + # We use BytesIO for the read buffer to avoid holding a list + # of variously sized string objects which have been known to + # fragment the heap due to how they are malloc()ed and often + # realloc()ed down much smaller than their original allocation. + self._rbuf = BytesIO() + self._wbuf = [] # A list of strings + self._wbuf_len = 0 + self._close = close + + def _getclosed(self): + return self._sock is None + closed = property(_getclosed, doc="True if the file is closed") + + def close(self): + try: + if self._sock: + self.flush() + finally: + if self._close: + self._sock.close() + self._sock = None + + def __del__(self): + try: + self.close() + except: + # close() may fail if __init__ didn't complete + pass + + def flush(self): + if self._wbuf: + data = b"".join(self._wbuf) + self._wbuf = [] + self._wbuf_len = 0 + buffer_size = max(self._rbufsize, self.default_bufsize) + data_size = len(data) + write_offset = 0 + view = memoryview(data) + try: + while write_offset < data_size: + self._sock.sendall(view[write_offset:write_offset+buffer_size]) + write_offset += buffer_size + finally: + if write_offset < data_size: + remainder = data[write_offset:] + del view, data # explicit free + self._wbuf.append(remainder) + self._wbuf_len = len(remainder) + + def fileno(self): + return self._sock.fileno() + + def write(self, data): + data = bytes(data) # XXX Should really reject non-string non-buffers + if not data: + return + self._wbuf.append(data) + self._wbuf_len += len(data) + if (self._wbufsize == 0 or + (self._wbufsize == 1 and b'\n' in data) or + (self._wbufsize > 1 and self._wbuf_len >= self._wbufsize)): + self.flush() + + def writelines(self, list): + # XXX We could do better here for very long lists + # XXX Should really reject non-string non-buffers + lines = filter(None, map(bytes, list)) + self._wbuf_len += sum(map(len, lines)) + self._wbuf.extend(lines) + if (self._wbufsize <= 1 or + self._wbuf_len >= self._wbufsize): + self.flush() + + def read(self, size=-1): + # Use max, disallow tiny reads in a loop as they are very inefficient. + # We never leave read() with any leftover data from a new recv() call + # in our internal buffer. + rbufsize = max(self._rbufsize, self.default_bufsize) + # Our use of BytesIO rather than lists of string objects returned by + # recv() minimizes memory usage and fragmentation that occurs when + # rbufsize is large compared to the typical return value of recv(). + buf = self._rbuf + buf.seek(0, 2) # seek end + if size < 0: + # Read until EOF + self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. + while True: + try: + data = self._sock.recv(rbufsize) + except error as e: + if e.args[0] == EINTR: + continue + raise + if not data: + break + buf.write(data) + return buf.getvalue() + else: + # Read until size bytes or EOF seen, whichever comes first + buf_len = buf.tell() + if buf_len >= size: + # Already have size bytes in our buffer? Extract and return. + buf.seek(0) + rv = buf.read(size) + self._rbuf = BytesIO() + self._rbuf.write(buf.read()) + return rv + + self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. + while True: + left = size - buf_len + # recv() will malloc the amount of memory given as its + # parameter even though it often returns much less data + # than that. The returned data string is short lived + # as we copy it into a BytesIO and free it. This avoids + # fragmentation issues on many platforms. + try: + data = self._sock.recv(left) + except error as e: + if e.args[0] == EINTR: + continue + raise + if not data: + break + n = len(data) + if n == size and not buf_len: + # Shortcut. Avoid buffer data copies when: + # - We have no data in our buffer. + # AND + # - Our call to recv returned exactly the + # number of bytes we were asked to read. + return data + if n == left: + buf.write(data) + del data # explicit free + break + assert n <= left, "recv(%d) returned %d bytes" % (left, n) + buf.write(data) + buf_len += n + del data # explicit free + #assert buf_len == buf.tell() + return buf.getvalue() + + def readline(self, size=-1): + buf = self._rbuf + buf.seek(0, 2) # seek end + if buf.tell() > 0: + # check if we already have it in our buffer + buf.seek(0) + bline = buf.readline(size) + if bline.endswith(b'\n') or len(bline) == size: + self._rbuf = BytesIO() + self._rbuf.write(buf.read()) + return bline + del bline + if size < 0: + # Read until \n or EOF, whichever comes first + if self._rbufsize <= 1: + # Speed up unbuffered case + buf.seek(0) + buffers = [buf.read()] + self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. + data = None + recv = self._sock.recv + while True: + try: + while data != b"\n": + data = recv(1) + if not data: + break + buffers.append(data) + except error as e: + # The try..except to catch EINTR was moved outside the + # recv loop to avoid the per byte overhead. + if e.args[0] == EINTR: + continue + raise + break + return "".join(buffers) + + buf.seek(0, 2) # seek end + self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. + while True: + try: + data = self._sock.recv(self._rbufsize) + except error as e: + if e.args[0] == EINTR: + continue + raise + if not data: + break + nl = data.find(b'\n') + if nl >= 0: + nl += 1 + buf.write(data[:nl]) + self._rbuf.write(data[nl:]) + del data + break + buf.write(data) + return buf.getvalue() + else: + # Read until size bytes or \n or EOF seen, whichever comes first + buf.seek(0, 2) # seek end + buf_len = buf.tell() + if buf_len >= size: + buf.seek(0) + rv = buf.read(size) + self._rbuf = BytesIO() + self._rbuf.write(buf.read()) + return rv + self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. + while True: + try: + data = self._sock.recv(self._rbufsize) + except error as e: + if e.args[0] == EINTR: + continue + raise + if not data: + break + left = size - buf_len + # did we just receive a newline? + nl = data.find(b'\n', 0, left) + if nl >= 0: + nl += 1 + # save the excess data to _rbuf + self._rbuf.write(data[nl:]) + if buf_len: + buf.write(data[:nl]) + break + else: + # Shortcut. Avoid data copy through buf when returning + # a substring of our first recv(). + return data[:nl] + n = len(data) + if n == size and not buf_len: + # Shortcut. Avoid data copy through buf when + # returning exactly all of our first recv(). + return data + if n >= left: + buf.write(data[:left]) + self._rbuf.write(data[left:]) + break + buf.write(data) + buf_len += n + #assert buf_len == buf.tell() + return buf.getvalue() + + def readlines(self, sizehint=0): + total = 0 + list = [] + while True: + line = self.readline() + if not line: + break + list.append(line) + total += len(line) + if sizehint and total >= sizehint: + break + return list + + # Iterator protocols + + def __iter__(self): + return self + + def next(self): + line = self.readline() + if not line: + raise StopIteration + return line + diff --git a/oml/nodes.py b/oml/nodes.py index 3d18df0..2550d6d 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -24,7 +24,7 @@ from changelog import Changelog import directory from websocket import trigger_event from localnodes import LocalNodes -from ssl_request import get_opener +from tor_request import get_opener import state import db @@ -35,7 +35,6 @@ ENCODING='base64' class Node(Thread): _running = True - _cert = None host = None online = False download_speed = 0 @@ -44,8 +43,7 @@ class Node(Thread): def __init__(self, nodes, user): self._nodes = nodes self.user_id = user.id - key = user.id.encode() - self.vk = ed25519.VerifyingKey(key, encoding=ENCODING) + self._opener = get_opener(self.user_id) logger.debug('new Node %s online=%s', self.user_id, self.online) self._q = Queue() Thread.__init__(self) @@ -78,65 +76,50 @@ class Node(Thread): @property def url(self): - if self.host: - if ':' in self.host: - url = 'https://[%s]:%s' % (self.host, self.port) + if self.local: + if ':' in self.local: + url = 'https://[%s]:%s' % (self.local, self.port) else: - url = 'https://%s:%s' % (self.host, self.port) + url = 'https://%s:%s' % (self.local, self.port) else: - url = None + url = 'https://%s.onion:9851' % self.user_id return url def resolve(self): logger.debug('resolve node') r = self.get_local() - if not r: - try: - r = directory.get(self.vk) - except: - logger.debug('directory failed', exc_info=1) - r = None if r: - self.host = r['host'] + self.local = r['host'] if 'port' in r: self.port = r['port'] - if r['cert'] != self._cert: - self._cert = r['cert'] - self._opener = get_opener(self._cert) else: - self.host = None + self.local = None self.port = 9851 def get_local(self): if self._nodes and self._nodes._local: - local = self._nodes._local.get(self.user_id) - if local and local['cert'] != self._cert: - self._cert = local['cert'] - self._opener = get_opener(self._cert) - return local + return self._nodes._local.get(self.user_id) return None def request(self, action, *args): - url = self.url - if not url: - self.resolve() + logger.debug('request %s%s', action, args) + self.resolve() url = self.url if not self.url: logger.debug('unable to find host %s', self.user_id) self.online = False return None + logger.debug('url=%s', url) content = json.dumps([action, args]).encode() - sig = settings.sk.sign(content, encoding=ENCODING).decode() + #sig = settings.sk.sign(content, encoding=ENCODING).decode() headers = { 'User-Agent': settings.USER_AGENT, 'X-Node-Protocol': settings.NODE_PROTOCOL, 'Accept': 'text/plain', 'Accept-Encoding': 'gzip', 'Content-Type': 'application/json', - 'X-Ed25519-Key': settings.USER_ID, - 'X-Ed25519-Signature': sig, } - self._opener.addheaders = list(zip(list(headers.keys()), list(headers.values()))) + self._opener.addheaders = list(zip(headers.keys(), headers.values())) logger.debug('headers: %s', self._opener.addheaders) try: self._opener.timeout = self.TIMEOUT @@ -173,12 +156,15 @@ class Node(Thread): state.update_required = True return None - sig = r.headers.get('X-Ed25519-Signature') + ''' + sig = r.headers.get('X-Node-Signature') if sig and self._valid(data, sig): response = json.loads(data.decode('utf-8')) else: logger.debug('invalid signature %s', data) response = None + ''' + response = json.loads(data.decode('utf-8')) logger.debug('response: %s', response) return response @@ -206,7 +192,7 @@ class Node(Thread): 'X-Node-Protocol': settings.NODE_PROTOCOL, 'Accept-Encoding': 'gzip', } - self._opener.addheaders = list(zip(list(headers.keys()), list(headers.values()))) + self._opener.addheaders = list(zip(headers.keys(), headers.values())) self._opener.timeout = 1 r = self._opener.open(url) version = r.headers.get('X-Node-Protocol', None) @@ -217,19 +203,19 @@ class Node(Thread): logger.debug('can connect to: %s (%s)', url, self.user.nickname) return True except: - logger.debug('can not connect to: %s (%s)', url, self.user.nickname) + logger.debug('can not connect to: %s (%s)', url, self.user.nickname, exc_info=1) pass return False def _go_online(self): self.resolve() u = self.user - if (u.peered or u.queued) and self.host: - logger.debug('go_online peered=%s queued=%s %s [%s]:%s (%s)', u.peered, u.queued, u.id, self.host, self.port, u.nickname) + if u.peered or u.queued: + logger.debug('go_online peered=%s queued=%s %s [%s]:%s (%s)', u.peered, u.queued, u.id, self.local, self.port, u.nickname) try: self.online = False if self.can_connect(): - logger.debug('connected to [%s]:%s', self.host, self.port) + logger.debug('connected to %s', self.url) self.online = True if u.queued: logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered) @@ -299,11 +285,12 @@ class Node(Thread): from item.models import Transfer url = '%s/get/%s' % (self.url, item.id) headers = { + 'X-Node-Protocol': settings.NODE_PROTOCOL, 'User-Agent': settings.USER_AGENT, } t1 = datetime.utcnow() logger.debug('download %s', url) - self._opener.addheaders = zip(headers.keys(), headers.values()) + self._opener.addheaders = list(zip(headers.keys(), headers.values())) try: r = self._opener.open(url, timeout=self.TIMEOUT*2) except: @@ -352,7 +339,7 @@ class Node(Thread): headers = { 'User-Agent': settings.USER_AGENT, } - self._opener.addheaders = list(zip(list(headers.keys()), list(headers.values()))) + self._opener.addheaders = list(zip(headers.keys(), headers.values())) r = self._opener.open(url) if r.getcode() == 200: with open(path, 'w') as fd: @@ -379,7 +366,7 @@ class Nodes(Thread): self.start() def cleanup(self): - if self._running: + if self._running and self._local: self._local.cleanup() def queue(self, *args): @@ -401,7 +388,8 @@ class Nodes(Thread): else: nodes = [self._nodes[target]] for node in nodes: - getattr(node, action)(*args) + r = getattr(node, action)(*args) + logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r) def _add(self, user_id): if user_id not in self._nodes: @@ -428,5 +416,30 @@ class Nodes(Thread): self._q.put(None) for node in list(self._nodes.values()): node.join() - self._local.join() + if self._local: + self._local.join() return Thread.join(self) + +def publish_node(): + update_online() + state.check_nodes = PeriodicCallback(check_nodes, 120000) + state.check_nodes.start() + state._online = PeriodicCallback(update_online, 60000) + state._online.start() + +def update_online(): + online = state.tor and state.tor.is_online() + if online != state.online: + state.online = online + trigger_event('status', { + 'id': settings.USER_ID, + 'online': state.online + }) + +def check_nodes(): + if state.online: + with db.session(): + for u in user.models.User.query.filter_by(queued=True): + if not state.nodes.is_online(u.id): + logger.debug('queued peering message for %s trying to connect...', u.id) + state.nodes.queue('add', u.id) diff --git a/oml/server.py b/oml/server.py index 3c78a6b..7a45b7a 100644 --- a/oml/server.py +++ b/oml/server.py @@ -96,14 +96,23 @@ def run(): import user import downloads import nodes + import tor + state.tor = tor.Tor() state.node = node.server.start() - state.nodes = nodes.Nodes() state.downloads = downloads.Downloads() state.scraping = downloads.ScrapeThread() + state.nodes = nodes.Nodes() def add_users(): - with db.session(): - for p in user.models.User.query.filter_by(peered=True): - state.nodes.queue('add', p.id) + if not state.tor.is_online(): + state.main.add_callback(add_users) + else: + with db.session(): + for u in user.models.User.query.filter_by(peered=True): + state.nodes.queue('add', u.id) + for u in user.models.User.query.filter_by(queued=True): + logger.debug('adding queued node... %s', u.id) + state.nodes.queue('add', u.id) + nodes.publish_node() state.main.add_callback(add_users) state.main.add_callback(start_node) if ':' in settings.server['address']: @@ -117,6 +126,8 @@ def run(): logger.debug('Starting OML %s at %s', settings.VERSION, url) def shutdown(): + if state.tor: + state.tor._shutdown = True if state.downloads: logger.debug('shutdown downloads') state.downloads.join() @@ -131,6 +142,11 @@ def run(): if state.nodes: logger.debug('shutdown nodes') state.nodes.join() + if state.node: + state.node.stop() + if state.tor: + logger.debug('shutdown tor') + state.tor.shutdown() if PID and os.path.exists(PID): logger.debug('remove %s', PID) os.unlink(PID) diff --git a/oml/settings.py b/oml/settings.py index eb034e4..e402b08 100644 --- a/oml/settings.py +++ b/oml/settings.py @@ -6,6 +6,7 @@ import os import ed25519 from pdict import pdict +from utils import get_user_id base_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')) static_path = os.path.join(base_dir, 'static') @@ -22,7 +23,7 @@ log_path = os.path.join(config_path, 'debug.log') icons_db_path = os.path.join(config_path, 'icons.db') key_path = os.path.join(config_path, 'node.key') ssl_cert_path = os.path.join(config_path, 'node.ssl.crt') -ssl_key_path = os.path.join(config_path, 'node.ssl.key') +ssl_key_path = os.path.join(config_path, 'tor', 'private_key') if os.path.exists(oml_config_path): @@ -64,7 +65,9 @@ else: fd.write(sk.to_bytes()) os.chmod(key_path, 0o400) -USER_ID = vk.to_ascii(encoding='base64').decode() +USER_ID = get_user_id(ssl_key_path, ssl_cert_path) +OLD_USER_ID = vk.to_ascii(encoding='base64').decode() + OML_UPDATE_KEY='K55EZpPYbP3X+3mA66cztlw1sSaUMqGwfTDKQyP2qOU' if 'modules' in release and 'openmedialibrary' in release['modules']: @@ -72,7 +75,7 @@ if 'modules' in release and 'openmedialibrary' in release['modules']: else: MINOR_VERSION = 'git' -NODE_PROTOCOL="0.1" +NODE_PROTOCOL="0.2" VERSION="%s.%s" % (NODE_PROTOCOL, MINOR_VERSION) diff --git a/oml/setup.py b/oml/setup.py index c830094..7828d65 100644 --- a/oml/setup.py +++ b/oml/setup.py @@ -200,6 +200,7 @@ def upgrade_db(old, new=None): if old <= '20140526-118-d451eb3' and new > '20140526-118-d451eb3': import item.models item.models.Find.query.filter_by(key='list').delete() + if old <= '20140527-120-3cb9819': run_sql('CREATE INDEX ix_find_findvalue ON find (findvalue)') @@ -211,6 +212,18 @@ def upgrade_db(old, new=None): FOREIGN KEY(item_id) REFERENCES item (id) )''') run_sql('CREATE INDEX idx_scrape_added ON scrape (added)') + if old <= '20151118-346-7e86e68': + old_key = os.path.join(settings.config_path, 'node.ssl.key') + if os.path.exists(old_key): + os.unlink(old_key) + statements = [ + "UPDATE user SET id = '{nid}' WHERE id = '{oid}'", + "UPDATE list SET user_id = '{nid}' WHERE user_id = '{oid}'", + "UPDATE useritem SET user_id = '{nid}' WHERE user_id = '{oid}'", + "UPDATE changelog SET user_id = '{nid}' WHERE user_id = '{oid}'", + ] + for sql in statements: + run_sql(sql.format(oid=settings.OLD_USER_ID, nid=settings.USER_ID)) def create_default_lists(user_id=None): with db.session(): diff --git a/oml/ssl_request.py b/oml/ssl_request.py index 0bb2a06..664b9c7 100644 --- a/oml/ssl_request.py +++ b/oml/ssl_request.py @@ -6,24 +6,34 @@ import http.client import urllib.request, urllib.error, urllib.parse import hashlib import logging +import base64 +from OpenSSL import crypto + logger = logging.getLogger('oml.ssl_request') +def get_service_id(cert): + # compute sha1 of public key and encode first half in base32 + key = crypto.load_certificate(crypto.FILETYPE_ASN1, cert).get_pubkey() + public_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, key)[22:] + service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower() + return service_id + class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError): - def __init__(self, fingerprint, cert, reason): + def __init__(self, service_id, cert, reason): http.client.HTTPException.__init__(self) - self._fingerprint = fingerprint - self._cert_fingerprint = hashlib.sha1(cert).hexdigest() + self._service_id = service_id + self._cert_service_id = get_service_id(cert) self.reason = reason def __str__(self): return ('%s (local) != %s (remote) (%s)\n' % - (self._fingerprint, self._cert_fingerprint, self.reason)) + (self._service_id, self._cert_service_id, self.reason)) -class FingerprintHTTPSConnection(http.client.HTTPSConnection): +class ServiceIdHTTPSConnection(http.client.HTTPSConnection): - def __init__(self, host, port=None, fingerprint=None, check_hostname=None, context=None, **kwargs): - self._fingerprint = fingerprint - if self._fingerprint: + def __init__(self, host, port=None, service_id=None, check_hostname=None, context=None, **kwargs): + self._service_id = service_id + if self._service_id: check_hostname = False # dont fial for older verions of python # without ssl._create_default_https_context @@ -37,45 +47,36 @@ class FingerprintHTTPSConnection(http.client.HTTPSConnection): http.client.HTTPSConnection.__init__(self, host, port, check_hostname=check_hostname, context=context, **kwargs) - def _check_fingerprint(self, cert): - if len(self._fingerprint) == 40: - fingerprint = hashlib.sha1(cert).hexdigest() - elif len(self._fingerprint) == 64: - fingerprint = hashlib.sha256(cert).hexdigest() - elif len(self._fingerprint) == 128: - fingerprint = hashlib.sha512(cert).hexdigest() - else: - logging.error('unkown _fingerprint length %s (%s)', - self._fingerprint, len(self._fingerprint)) - return False - logger.debug('ssl fingerprint: %s (match: %s)', fingerprint, fingerprint == self._fingerprint) - if fingerprint != self._fingerprint: - logger.debug('expected fingerprint: %s', self._fingerprint) - return fingerprint == self._fingerprint + def _check_service_id(self, cert): + service_id = get_service_id(cert) + logger.debug('ssl service_id: %s (match: %s)', service_id, service_id == self._service_id) + if service_id != self._service_id: + logger.debug('expected service_id: %s', self._service_id) + return service_id == self._service_id def connect(self): http.client.HTTPSConnection.connect(self) - if self._fingerprint: + if self._service_id: cert = self.sock.getpeercert(binary_form=True) - if not self._check_fingerprint(cert): - raise InvalidCertificateException(self._fingerprint, cert, - 'fingerprint mismatch') + if not self._check_service_id(cert): + raise InvalidCertificateException(self._service_id, cert, + 'service_id mismatch') #logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version) -class FingerprintHTTPSHandler(urllib.request.HTTPSHandler): +class ServiceIdHTTPSHandler(urllib.request.HTTPSHandler): - def __init__(self, debuglevel=0, context=None, check_hostname=None, fingerprint=None): + def __init__(self, debuglevel=0, context=None, check_hostname=None, service_id=None): urllib.request.AbstractHTTPHandler.__init__(self, debuglevel) self._context = context self._check_hostname = check_hostname - self._fingerprint = fingerprint + self._service_id = service_id def https_open(self, req): - return self.do_open(FingerprintHTTPSConnection, req, + return self.do_open(ServiceIdHTTPSConnection, req, context=self._context, check_hostname=self._check_hostname, - fingerprint=self._fingerprint) + service_id=self._service_id) -def get_opener(fingerprint): - handler = FingerprintHTTPSHandler(fingerprint=fingerprint) +def get_opener(service_id): + handler = ServiceIdHTTPSHandler(service_id=service_id) opener = urllib.request.build_opener(handler) return opener diff --git a/oml/state.py b/oml/state.py index d1a2b06..3181842 100644 --- a/oml/state.py +++ b/oml/state.py @@ -1,9 +1,10 @@ -websockets = [] -nodes = False -tasks = False -main = None -online = False host = None +main = None +nodes = False +online = False +tasks = False +tor = False +websockets = [] activity = {} diff --git a/oml/tor.py b/oml/tor.py new file mode 100644 index 0000000..a3bdb89 --- /dev/null +++ b/oml/tor.py @@ -0,0 +1,196 @@ +import os +import subprocess +from threading import Thread +import distutils + +import stem +from stem.control import Controller +import settings + +import logging +import state +import time + +logger = logging.getLogger('oml.tor') + +class TorDaemon(Thread): + def __init__(self): + self._status = [] + Thread.__init__(self) + self.daemon = True + self.start() + + def create_torrc(self): + defaults = os.path.join(settings.config_path, 'torrc-defaults') + torrc = os.path.join(settings.config_path, 'torrc') + if not os.path.exists(defaults): + with open(defaults, 'w') as fd: + fd.write(''' +AvoidDiskWrites 1 +# Where to send logging messages. Format is minSeverity[-maxSeverity] +# (stderr|stdout|syslog|file FILENAME). +Log notice stdout +SocksPort 9830 +ControlPort 9831 +CookieAuthentication 1 + '''.strip()) + if not os.path.exists(torrc): + with open(torrc, 'w') as fd: + fd.write(''' +DataDirectory {base}/TorData +DirReqStatistics 0 + '''.strip().format(base=settings.config_path)) + return defaults, torrc + + def get_tor(self): + def cmd_exists(cmd): + return subprocess.call("type " + cmd, shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0 + for path in ( + '/Applications/TorBrowser.app/TorBrowser/Tor/tor', + ): + if os.path.isfile(path) and os.access(path, os.X_OK): + return path + start = os.path.expanduser('~/.local/share/applications/start-tor-browser.desktop') + if os.path.exists(start): + with open(start) as fd: + e = [line for line in fd.read().split('\n') if line.startswith('Exec')] + if e: + try: + base = os.path.dirname(e[0].split('"')[1]) + path = os.path.join(base, 'TorBrowser', 'Tor', 'tor') + if os.path.isfile(path) and os.access(path, os.X_OK): + return path + except: + pass + return distutils.spawn.find_executable('tor') + + def run(self): + defaults, torrc = self.create_torrc() + tor = self.get_tor() + if not tor: + self._status.append('No tor binary found. Please install TorBrowser or tor') + else: + cmd = [tor, '--defaults-torrc', defaults, '-f', torrc] + self.p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True) + for line in self.p.stdout: + self._status.append(line) + self.p = None + + def shutdown(self): + if self.p: + self.p.kill() + + def status(self, max_lines=50): + return ''.join(self._status[-max_lines:]) + +class Tor(object): + _shutdown = False + connected = False + controller = None + daemon = None + socks_port = 9150 + + def __init__(self): + if not self.connect(): + self.reconnect() + + def connect(self): + self.connected = False + self.dir = os.path.join(settings.config_path, 'tor') + connected = False + for port in (9831, 9151): + try: + self.controller = Controller.from_port('127.0.0.1', port) + connected = True + break + except stem.SocketError: + pass + if not connected: + if not self.daemon: + logger.debug("start own tor process") + self.daemon = TorDaemon() + logger.debug("daemon %s", self.daemon) + return self.connect() + logger.debug("Failed to connect to system or own tor process.") + return False + try: + self.controller.authenticate() + except stem.connection.MissingPassword: + logger.debug("TOR requires a password") + return False + except stem.connection.PasswordAuthFailed: + logger.debug("invalid tor password") + return False + self.controller.add_event_listener(self.event_listener) + self.controller.add_status_listener(self.status_listener) + self.connected = True + self.socks_port = int(self.controller.get_conf('SocksPort').split(' ')[0]) + self.publish() + state.online = True + return True + + def reconnect(self): + if not self.connect(): + if state.main: + state.main.call_later(1, self.reconnect) + + def status_listener(self, controller, status, timestamp): + if status == 'Closed': + if not self._shutdown: + self.connected = False + state.online = False + self.reconnect() + else: + logger.debug('unknonw change %s', status) + + def event_listener(self, event): + print('EVENT', event) + + def shutdown(self): + self._shutdown = True + try: + self.unpublish() + if self.controller: + #self.controller.remove_event_listener(self.connection_change) + self.controller.close() + if self.daemon: + self.daemon.shutdown() + except: + logger.debug('shutdown exception', exc_info=1) + pass + self.connected = False + + def publish(self): + logger.debug("publish tor node") + if not self.connected: + return False + controller = self.controller + logger.debug("FIXME: dont remove/add service if already defined") + controller.remove_hidden_service(self.dir) + result = controller.create_hidden_service( + self.dir, + settings.server_defaults['node_port'], + target_port=settings.server['node_port'] + ) + logger.debug('published node as https://%s:%s', result.hostname, settings.server_defaults['node_port']) + ''' + with open(settings.ssl_key_path) as fd: + key_content = fd.read() + ports = {9851: settings.server['node_port']} + response = controller.create_ephemeral_hidden_service(ports, + key_type='RSA1024', key_content=key_content, + detached=True, await_publication = True) + logger.debug('published node as https://%s.onion:%s', + settings.USER_ID, settings.server_defaults['node_port']) + ''' + + def unpublish(self): + if not self.connected: + return False + if self.controller: + self.controller.remove_hidden_service(self.dir) + state.online = False + + def is_online(self): + return self.connected and self.controller.is_alive() diff --git a/oml/tor_request.py b/oml/tor_request.py new file mode 100644 index 0000000..2ed6b2e --- /dev/null +++ b/oml/tor_request.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 + +import ssl +import http.client +import urllib.request, urllib.error, urllib.parse +import logging + +import socks +import socket + +import settings +import state +from utils import get_service_id, get_local_ipv4 + +logger = logging.getLogger('oml.tor_request') + +class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError): + def __init__(self, service_id, cert, reason): + http.client.HTTPException.__init__(self) + self._service_id = service_id + self._cert_service_id = get_service_id(cert=cert) + self.reason = reason + + def __str__(self): + return ('%s (local) != %s (remote) (%s)\n' % + (self._service_id, self._cert_service_id, self.reason)) + +def is_local(host): + local_net = get_local_ipv4()[:-2] + return host.startswith('127.0.0.1') or host.startswith(local_net) + +def getaddrinfo(*args): + return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))] + +def create_tor_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, + source_address=None): + host, port = address + err = None + af = socket.AF_INET + socktype = socket.SOCK_STREAM + proto = 6 + sa = address + sock = None + try: + sock = socks.socksocket(af, socktype, proto) + if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: + sock.settimeout(timeout) + sock.set_proxy(socks.SOCKS5, "localhost", state.tor.socks_port, True) + if source_address: + sock.bind(source_address) + sock.connect(sa) + return sock + + except socket.error as _: + err = _ + if sock is not None: + sock.close() + + if err is not None: + raise err + else: + raise sock.error("getaddrinfo returns an empty list") + +class TorHTTPSConnection(http.client.HTTPSConnection): + + def __init__(self, host, port=None, service_id=None, check_hostname=None, context=None, **kwargs): + self._service_id = service_id + if self._service_id: + context = ssl._create_default_https_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + context.load_cert_chain(settings.ssl_cert_path, settings.ssl_key_path) + context.load_default_certs() + http.client.HTTPSConnection.__init__(self, host, port, + check_hostname=check_hostname, context=context, **kwargs) + + if not is_local(host): + self._create_connection = create_tor_connection + + def _check_service_id(self, cert): + service_id = get_service_id(cert=cert) + logger.debug('ssl service_id: %s (match: %s)', service_id, service_id == self._service_id) + if service_id != self._service_id: + logger.debug('expected service_id: %s', self._service_id) + return service_id == self._service_id + + def connect(self): + http.client.HTTPSConnection.connect(self) + if self._service_id: + cert = self.sock.getpeercert(binary_form=True) + if not self._check_service_id(cert): + raise InvalidCertificateException(self._service_id, cert, + 'service_id mismatch') + #logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version) + +class TorHTTPSHandler(urllib.request.HTTPSHandler): + def __init__(self, debuglevel=0, context=None, check_hostname=None, service_id=None): + urllib.request.AbstractHTTPHandler.__init__(self, debuglevel) + self._context = context + self._check_hostname = check_hostname + self._service_id = service_id + + def https_open(self, req): + return self.do_open(TorHTTPSConnection, req, + context=self._context, check_hostname=self._check_hostname, + service_id=self._service_id) + +class TorHTTPConnection(http.client.HTTPConnection): + def __init__(self, host, port=None, **kwargs): + http.client.HTTPConnection.__init__(self, host, port, **kwargs) + if not is_local(host): + self._create_connection = create_tor_connection + +class TorHTTPHandler(urllib.request.HTTPHandler): + def http_open(self, req): + return self.do_open(TorHTTPConnection, req) + +def get_opener(service_id=None): + handler = TorHTTPSHandler(service_id=service_id) + opener = urllib.request.build_opener(handler, TorHTTPHandler()) + return opener diff --git a/oml/user/api.py b/oml/user/api.py index 13bcd13..0206e70 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -297,7 +297,7 @@ def requestPeering(data): nickname (optional) } ''' - if len(data.get('id', '')) != 43: + if len(data.get('id', '')) != 16: logger.debug('invalid user id') return {} u = models.User.get_or_create(data['id']) @@ -321,7 +321,7 @@ def acceptPeering(data): message } ''' - if len(data.get('id', '')) != 43: + if len(data.get('id', '')) != 16: logger.debug('invalid user id') return {} logger.debug('acceptPeering... %s', data) @@ -341,7 +341,7 @@ def rejectPeering(data): message } ''' - if len(data.get('id', '')) != 43: + if len(data.get('id', '')) != 16: logger.debug('invalid user id') return {} u = models.User.get_or_create(data['id']) @@ -360,7 +360,7 @@ def removePeering(data): message } ''' - if len(data.get('id', '')) != 43: + if len(data.get('id', '')) != 16: logger.debug('invalid user id') return {} u = models.User.get_or_create(data['id']) @@ -377,7 +377,7 @@ def cancelPeering(data): takes { } ''' - if len(data.get('id', '')) != 43: + if len(data.get('id', '')) != 16: logger.debug('invalid user id') return {} u = models.User.get_or_create(data['id']) diff --git a/oml/utils.py b/oml/utils.py index 9f9b312..521929b 100644 --- a/oml/utils.py +++ b/oml/utils.py @@ -12,11 +12,22 @@ import socket import io import gzip import time +import hashlib from datetime import datetime import subprocess +import base64 import ox import ed25519 +from OpenSSL.crypto import ( + load_privatekey, load_certificate, + dump_privatekey, dump_certificate, + FILETYPE_ASN1, FILETYPE_PEM, PKey, TYPE_RSA, + X509, X509Extension +) +from Crypto.PublicKey import RSA +from Crypto.Util.asn1 import DerSequence + from meta.utils import normalize_isbn, find_isbns @@ -128,6 +139,79 @@ def valid(key, value, sig): return False return True +def get_user_id(private_key, cert_path): + if os.path.exists(private_key): + with open(private_key) as fd: + key = load_privatekey(FILETYPE_PEM, fd.read()) + if key.bits() != 1024: + os.unlink(private_key) + else: + user_id = get_service_id(private_key) + if not os.path.exists(private_key): + if os.path.exists(cert_path): + os.unlink(cert_path) + folder = os.path.dirname(private_key) + if not os.path.exists(folder): + os.makedirs(folder) + os.chmod(folder, 0o700) + key = PKey() + key.generate_key(TYPE_RSA, 1024) + with open(private_key, 'wb') as fd: + os.chmod(private_key, 0o600) + fd.write(dump_privatekey(FILETYPE_PEM, key)) + os.chmod(private_key, 0o400) + user_id = get_service_id(private_key) + if not os.path.exists(cert_path): + ca = X509() + ca.set_version(2) + ca.set_serial_number(1) + ca.get_subject().CN = user_id + ca.gmtime_adj_notBefore(0) + ca.gmtime_adj_notAfter(24 * 60 * 60) + ca.set_issuer(ca.get_subject()) + ca.set_pubkey(key) + ca.add_extensions([ + X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"), + X509Extension(b"nsCertType", True, b"sslCA"), + X509Extension(b"extendedKeyUsage", True, + b"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"), + X509Extension(b"keyUsage", False, b"keyCertSign, cRLSign"), + X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=ca), + ]) + ca.sign(key, "sha256") + with open(cert_path, 'wb') as fd: + fd.write(dump_certificate(FILETYPE_PEM, ca)) + return user_id + +def get_service_id(private_key_file=None, cert=None): + ''' + service_id is the first half of the sha1 of the rsa public key encoded in base32 + ''' + if private_key_file: + with open(private_key_file, 'rb') as fd: + private_key = fd.read() + public_key = RSA.importKey(private_key).publickey().exportKey('DER')[22:] + # compute sha1 of public key and encode first half in base32 + service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode() + ''' + # compute public key from priate key and export in DER format + # ignoring the SPKI header(22 bytes) + key = load_privatekey(FILETYPE_PEM, private_key) + cert = X509() + cert.set_pubkey(key) + public_key = dump_privatekey(FILETYPE_ASN1, cert.get_pubkey())[22:] + # compute sha1 of public key and encode first half in base32 + service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode() + ''' + elif cert: + # compute sha1 of public key and encode first half in base32 + key = load_certificate(FILETYPE_ASN1, cert).get_pubkey() + pub_der = DerSequence() + pub_der.decode(dump_privatekey(FILETYPE_ASN1, key)) + public_key = RSA.construct((pub_der._seq[1], pub_der._seq[2])).exportKey('DER')[22:] + service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode() + return service_id + def get_public_ipv6(): try: host = ('2a01:4f8:120:3201::3', 25519) diff --git a/requirements-shared.txt b/requirements-shared.txt index 37bd0ff..a8fe629 100644 --- a/requirements-shared.txt +++ b/requirements-shared.txt @@ -6,3 +6,5 @@ html5lib git+http://git.0x2620.org/python-ox.git#egg=python-ox python-stdnum==0.9 PyPDF2==1.23 +pysocks +stem diff --git a/requirements.txt b/requirements.txt index 77292f7..bec0502 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ lxml simplejson -ed25519>=1.3 +ed25519>=1.4 SQLAlchemy==0.9.7 -pyopenssl>=0.14 +pyopenssl>=0.15 +pyCrypto>=2.6.1 diff --git a/static/js/utils.js b/static/js/utils.js index affb331..40cf851 100644 --- a/static/js/utils.js +++ b/static/js/utils.js @@ -990,7 +990,7 @@ oml.updateFilterMenus = function() { }; oml.validatePublicKey = function(value) { - return /^[A-Za-z0-9+\/]{43}$/.test(value); + return /^[a-z0-9+\/]{16}$/.test(value); }; oml.updateDebugMenu = function() {