use tor hidden service instead of ed25519 as peer id

This commit is contained in:
j 2015-11-26 01:26:10 +01:00
parent cc258fb5ee
commit 7c1e5c691a
23 changed files with 1139 additions and 324 deletions

View file

@ -39,10 +39,10 @@ To update to latest version:
./ctl update
On Linux you need a working python3 installation with pillow, python-lxml and poppler-utils:
apt-get install python3.4 python3-pil python3-lxml poppler-utils
On Linux you need a working python3.4 installation with pillow, python-lxml, pyOpenSSL, pyCrypto and poppler-utils:
apt-get install python3.4 python3-pil python3-lxml \
python3-pyopenssl python3-crypto poppler-utils
Platform
----------

View file

@ -70,6 +70,11 @@ class Install(Thread):
except:
apt_packages += ' python3-openssl'
dnf_packages += ' python3-pyOpenSSL'
try:
import Crypto
except:
apt_packages += ' python3-crypto'
dnf_packages += ' python3-pyCrypto'
if not has_bin('pdftocairo'):
apt_packages += ' poppler-utils'

View file

@ -7,7 +7,7 @@ import json
import sqlalchemy as sa
from utils import valid, datetime2ts, ts2datetime
from utils import datetime2ts, ts2datetime
from websocket import trigger_event
import db
import settings
@ -56,7 +56,6 @@ class Changelog(db.Model):
c.data = json.dumps([action] + list(args), ensure_ascii=False)
_data = str(c.revision) + str(c.timestamp) + c.data
_data = _data.encode()
c.sig = settings.sk.sign(_data, encoding='base64').decode()
state.db.session.add(c)
state.db.session.commit()
if state.nodes:
@ -64,45 +63,38 @@ class Changelog(db.Model):
@classmethod
def apply_changes(cls, user, changes):
trigger = changes
for change in changes:
if not Changelog.apply_change(user, change, trigger=False):
if not cls.apply_change(user, change, trigger=False):
logger.debug('FAIL %s', change)
trigger = False
break
return False
if changes:
if trigger:
trigger_event('change', {});
return True
@classmethod
def apply_change(cls, user, change, rebuild=False, trigger=True):
revision, timestamp, sig, data = change
last = Changelog.query.filter_by(user_id=user.id).order_by('-revision').first()
def apply_change(cls, user, change, trigger=True):
revision, timestamp, data = change
last = cls.query.filter_by(user_id=user.id).order_by('-revision').first()
next_revision = last.revision + 1 if last else 0
if revision == next_revision:
_data = str(revision) + str(timestamp) + data
_data = _data.encode()
if rebuild:
sig = settings.sk.sign(_data, encoding='base64').decode()
if valid(user.id, _data, sig):
c = cls()
c.created = datetime.utcnow()
c.timestamp = timestamp
c.user_id = user.id
c.revision = revision
c.data = data
c.sig = sig
args = json.loads(data)
logger.debug('apply change from %s: %s', user.name, args)
if getattr(c, 'action_' + args[0])(user, timestamp, *args[1:]):
logger.debug('change applied')
state.db.session.add(c)
state.db.session.commit()
if trigger:
trigger_event('change', {});
return True
else:
logger.debug('INVLAID SIGNATURE ON CHANGE %s', change)
raise Exception('invalid signature')
c = cls()
c.created = datetime.utcnow()
c.timestamp = timestamp
c.user_id = user.id
c.revision = revision
c.data = data
args = json.loads(data)
logger.debug('apply change from %s: %s', user.name, args)
if getattr(c, 'action_' + args[0])(user, timestamp, *args[1:]):
logger.debug('change applied')
state.db.session.add(c)
state.db.session.commit()
if trigger:
trigger_event('change', {});
return True
else:
logger.debug('revsion does not match! got %s expecting %s', revision, next_revision)
return False
@ -110,26 +102,9 @@ class Changelog(db.Model):
def __repr__(self):
return self.data
def verify(self):
_data = str(self.revision) + str(self.timestamp) + self.data
_data = _data.encode()
return valid(self.user_id, _data, self.sig.encode())
@classmethod
def _rebuild(cls):
for c in cls.query.filter_by(user_id=settings.USER_ID):
_data = str(c.revision) + str(c.timestamp) + c.data
_data = _data.encode()
c.sig = settings.sk.sign(_data, encoding='base64')
state.db.session.add(c)
state.db.session.commit()
def json(self):
timestamp = self.timestamp or datetime2ts(self.created)
sig = self.sig
if isinstance(sig, bytes):
sig = sig.decode()
return [self.revision, timestamp, sig, self.data]
return [self.revision, timestamp, self.data]
@classmethod
def restore(cls, user_id, path=None):
@ -161,9 +136,9 @@ class Changelog(db.Model):
if not i:
i = Item.get_or_create(itemid, info)
i.modified = ts2datetime(timestamp)
if user not in i.users:
i.users.append(user)
i.update()
if user not in i.users:
i.users.append(user)
i.update()
return True
def action_edititem(self, user, timestamp, itemid, meta):

View file

@ -89,7 +89,7 @@ class ScrapeThread(Thread):
with db.session():
while self._running:
if not self.scrape_queue():
time.sleep(10)
time.sleep(1)
def join(self):
self._running = False

View file

@ -395,7 +395,6 @@ class Item(db.Model):
def remove_file(self):
for f in self.files.all():
path = f.fullpath()
logger.debug('remove file %s', path)
if os.path.exists(path):
os.unlink(path)
remove_empty_folders(os.path.dirname(path))

View file

@ -44,7 +44,6 @@ def add_file(id, f, prefix, from_=None):
user = state.user()
path = f[len(prefix):]
data = media.metadata(f, from_)
print(path)
file = File.get_or_create(id, data, path)
item = file.item
if 'primaryid' in file.info:

View file

@ -14,23 +14,38 @@ from settings import preferences, server, USER_ID, sk
import state
import db
import user.models
from tor_request import get_opener
import settings
import logging
logger = logging.getLogger('oml.localnodes')
def can_connect(data):
try:
opener = get_opener(data['id'])
headers = {
'User-Agent': settings.USER_AGENT,
'X-Node-Protocol': settings.NODE_PROTOCOL,
'Accept-Encoding': 'gzip',
}
if ':' in data['host']:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
url = 'https://[{host}]:{port}'.format(**data)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(1)
s.connect((data['host'], data['port']))
s.close()
url = 'https://{host}:{port}'.format(**data)
opener.addheaders = list(zip(headers.keys(), headers.values()))
opener.timeout = 1
logger.debug('try connection %s', url)
r = opener.open(url)
version = r.headers.get('X-Node-Protocol', None)
if version != settings.NODE_PROTOCOL:
logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version)
return False
c = r.read()
logger.debug('can connect to local node')
return True
except:
logger.debug('can_connect failed', exc_info=1)
pass
logger.debug('can_connect failed')
return False
class LocalNodesBase(Thread):
@ -53,13 +68,12 @@ class LocalNodesBase(Thread):
self.host = self.get_ip()
if self.host:
message = json.dumps({
'id': USER_ID,
'username': preferences.get('username', 'anonymous'),
'host': self.host,
'port': server['node_port'],
'cert': server['cert']
'port': server['node_port']
})
sig = sk.sign(message.encode(), encoding='base64').decode()
packet = json.dumps([sig, USER_ID, message]).encode()
packet = message.encode()
else:
packet = None
return packet
@ -100,18 +114,13 @@ class LocalNodesBase(Thread):
def verify(self, data):
try:
packet = json.loads(data.decode())
message = json.loads(data.decode())
except:
return None
if len(packet) == 3:
sig, user_id, data = packet
if valid(user_id, data, sig):
message = json.loads(data)
message['id'] = user_id
for key in ['id', 'username', 'host', 'port', 'cert']:
if key not in message:
return None
return message
for key in ['id', 'username', 'host', 'port']:
if key not in message:
return None
return message
def update_node(self, data):
#fixme use local link address
@ -233,7 +242,7 @@ class LocalNodes(object):
if not server['localnode_discovery']:
return
self._nodes4 = LocalNodes4(self._nodes)
self._nodes6 = LocalNodes6(self._nodes)
#self._nodes6 = LocalNodes6(self._nodes)
def cleanup(self):
if self._active:

View file

@ -8,7 +8,6 @@ import OpenSSL
import settings
def get_fingerprint():
with open(settings.ssl_cert_path) as fd:
data = fd.read()
@ -17,7 +16,7 @@ def get_fingerprint():
def generate_ssl():
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024)
with open(settings.ssl_key_path, 'wb') as fd:
os.chmod(settings.ssl_key_path, 0o600)
fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))

View file

@ -1,37 +1,165 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import tornado
from tornado.web import Application
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback
from oxtornado import run_async
from utils import valid, get_public_ipv6
from websocket import trigger_event
from . import cert
from socketserver import ThreadingMixIn
from threading import Thread
import base64
import db
import directory
import gzip
import hashlib
import http.server
import io
import json
from . import nodeapi
import os
import socket
import socketserver
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
from OpenSSL.crypto import dump_privatekey, FILETYPE_ASN1
from OpenSSL.SSL import (
Context, Connection, TLSv1_2_METHOD,
VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
)
import settings
import state
import user
from . import nodeapi
from .sslsocket import fileobject
import logging
logger = logging.getLogger('oml.node.server')
class NodeHandler(tornado.web.RequestHandler):
def get_service_id(key):
    """
    Return the tor hidden service id for an RSA *key*: the first half
    (10 bytes) of the SHA1 digest of the DER-encoded public key,
    base32-encoded in lower case.
    """
    der = DerSequence()
    der.decode(dump_privatekey(FILETYPE_ASN1, key))
    # entries 1 and 2 of the DER sequence are modulus and public exponent;
    # [22:] strips the SubjectPublicKeyInfo header to get the bare key
    public_key = RSA.construct((der._seq[1], der._seq[2])).exportKey('DER')[22:]
    digest = hashlib.sha1(public_key).digest()
    return base64.b32encode(digest[:10]).lower().decode()
def initialize(self):
pass
class TLSTCPServer(socketserver.TCPServer):
@tornado.web.asynchronous
@tornado.gen.coroutine
def post(self):
def _accept(self, connection, x509, errnum, errdepth, ok):
# client_id is validated in request
return True
def __init__(self, server_address, HandlerClass, bind_and_activate=True):
socketserver.TCPServer.__init__(self, server_address, HandlerClass)
ctx = Context(TLSv1_2_METHOD)
ctx.use_privatekey_file (settings.ssl_key_path)
ctx.use_certificate_file(settings.ssl_cert_path)
# only allow clients with cert:
ctx.set_verify(VERIFY_PEER | VERIFY_CLIENT_ONCE | VERIFY_FAIL_IF_NO_PEER_CERT, self._accept)
#ctx.set_verify(VERIFY_PEER | VERIFY_CLIENT_ONCE, self._accept)
self.socket = Connection(ctx, socket.socket(self.address_family, self.socket_type))
if bind_and_activate:
self.server_bind()
self.server_activate()
def shutdown_request(self,request):
try:
request.shutdown()
except:
pass
class NodeServer(ThreadingMixIn, TLSTCPServer):
allow_reuse_address = True
def api_call(action, user_id, args):
    """Dispatch a node API request for *user_id*.

    Peering actions are always allowed; every other action requires an
    already peered user.  Returns the handler result, {} for pending
    peers (request ignored), or None for unknown/unpeered users.
    """
    peering_actions = (
        'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering'
    )
    with db.session():
        u = user.models.User.get(user_id)
        if action in peering_actions or (u and u.peered):
            return getattr(nodeapi, 'api_' + action)(user_id, *args)
        if u and u.pending:
            logger.debug('ignore request from pending peer[%s] %s (%s)',
                user_id, action, args)
            return {}
        return None
class Handler(http.server.SimpleHTTPRequestHandler):
    def setup(self):
        # Wrap the raw connection in buffered file-like objects for
        # rfile/wfile.  NOTE(review): presumably needed because the
        # pyOpenSSL Connection lacks makefile() — confirm.
        self.connection = self.request
        self.rfile = fileobject(self.connection, 'rb', self.rbufsize)
        self.wfile = fileobject(self.connection, 'wb', self.wbufsize)
    def version_string(self):
        # Advertise the OML user agent instead of the default
        # BaseHTTPRequestHandler server string.
        return settings.USER_AGENT
    def do_HEAD(self):
        # HEAD is served exactly like GET.  NOTE(review): the body is not
        # suppressed for HEAD requests — confirm this is intended.
        return self.do_GET()
    def do_GET(self):
        """Serve /get/<id> item files; any other path gets a plain banner."""
        import item.models
        # only /get/ paths ending in a 32-char alphanumeric id are item requests
        id = self.path.split('/')[-1] if self.path.startswith('/get/') else None
        if id and len(id) == 32 and id.isalnum():
            with db.session():
                i = item.models.Item.get(id)
                if not i:
                    self.send_response(404, 'Not Found')
                    self.send_header('Content-type', 'text/plain')
                    self.end_headers()
                    self.wfile.write(b'404 - Not Found')
                    return
                path = i.get_path()
                # mimetype is None for unknown extensions
                mimetype = {
                    'epub': 'application/epub+zip',
                    'pdf': 'application/pdf',
                    'txt': 'text/plain',
                }.get(path.split('.')[-1], None)
                self.send_response(200, 'OK')
                self.send_header('Content-Type', mimetype)
                self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL)
                self.send_header('Content-Length', str(os.path.getsize(path)))
                self.end_headers()
                logger.debug('GET file %s', id)
                # stream the file in 16k chunks
                with open(path, 'rb') as f:
                    while 1:
                        data = f.read(16384)
                        if not data:
                            break
                        self.wfile.write(data)
        else:
            self.send_response(200, 'OK')
            self.send_header('Content-type', 'text/plain')
            self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL)
            self.end_headers()
            self.wfile.write('Open Media Library\n'.encode())
def gzip_data(self, data):
encoding = self.headers.get('Accept-Encoding')
if encoding.find('gzip') != -1:
self.send_header('Content-Encoding', 'gzip')
bytes_io = io.BytesIO()
gzip_file = gzip.GzipFile(fileobj=bytes_io, mode='wb')
gzip_file.write(data)
gzip_file.close()
result = bytes_io.getvalue()
bytes_io.close()
return result
else:
return data
def gunzip_data(self, data):
bytes_io = io.BytesIO(data)
gzip_file = gzip.GzipFile(fileobj=bytes_io, mode='rb')
result = gzip_file.read()
gzip_file.close()
return result
def do_POST(self):
'''
API
pullChanges [userid] from [to]
@ -43,141 +171,85 @@ class NodeHandler(tornado.web.RequestHandler):
ping responds public ip
'''
key = str(self.request.headers['X-Ed25519-Key'])
sig = str(self.request.headers['X-Ed25519-Signature'])
data = self.request.body
content = {}
x509 = self.connection.get_peer_certificate()
user_id = get_service_id(x509.get_pubkey()) if x509 else None
self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL)
if self.request.headers.get('X-Node-Protocol', None) > settings.NODE_PROTOCOL:
content = {}
try:
content_len = int(self.headers.get('content-length', 0))
data = self.rfile.read(content_len)
if self.headers.get('Content-Encoding') == 'gzip':
data = self.gunzip_data(data)
except:
logger.debug('invalid request', exc_info=1)
response_status = (500, 'invalid request')
self.write_response(response_status, content)
return
response_status = (200, 'OK')
if self.headers.get('X-Node-Protocol', '') > settings.NODE_PROTOCOL:
state.update_required = True
if self.request.headers.get('X-Node-Protocol', None) != settings.NODE_PROTOCOL:
if self.headers.get('X-Node-Protocol', '') != settings.NODE_PROTOCOL:
logger.debug('protocol missmatch %s vs %s',
self.headers.get('X-Node-Protocol', ''), settings.NODE_PROTOCOL)
logger.debug('headers %s', self.headers)
content = settings.release
else:
if valid(key, data, sig):
try:
action, args = json.loads(data.decode('utf-8'))
logger.debug('NODE action %s %s (%s)', action, args, key)
if action == 'ping':
content = {
'ip': self.request.remote_addr
}
else:
content = yield tornado.gen.Task(api_call, action, key, args)
if content is None:
content = {'status': 'not peered'}
logger.debug('PEER %s IS UNKNOWN SEND 403', key)
self.set_status(403)
content = json.dumps(content).encode('utf-8')
sig = settings.sk.sign(content, encoding='base64')
self.set_header('X-Ed25519-Signature', sig)
self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL)
self.write(content)
def get(self):
self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL)
if self.request.headers.get('X-Node-Protocol', None) > settings.NODE_PROTOCOL:
state.update_required = True
self.write('Open Media Library')
@run_async
def api_call(action, key, args, callback):
with db.session():
u = user.models.User.get(key)
if action in (
'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering'
) or (u and u.peered):
content = getattr(nodeapi, 'api_' + action)(key, *args)
else:
if u and u.pending:
logger.debug('ignore request from pending peer[%s] %s (%s)', key, action, args)
content = {}
else:
content = None
callback(content)
class ShareHandler(tornado.web.RequestHandler):
def initialize(self):
pass
def get(self, id):
import item.models
with db.session():
i = item.models.Item.get(id)
if not i:
self.set_status(404)
except:
logger.debug('invalid data: %s', data, exc_info=1)
response_status = (500, 'invalid request')
content = {
'status': 'invalid request'
}
self.write_response(response_status, content)
return
path = i.get_path()
mimetype = {
'epub': 'application/epub+zip',
'pdf': 'application/pdf',
'txt': 'text/plain',
}.get(path.split('.')[-1], None)
self.set_header('Content-Type', mimetype)
logger.debug('GET file %s', id)
with open(path, 'rb') as f:
while 1:
data = f.read(16384)
if not data:
break
self.write(data)
logger.debug('NODE action %s %s (%s)', action, args, user_id)
if action == 'ping':
content = {
'status': 'ok'
}
else:
content = api_call(action, user_id, args)
if content is None:
content = {'status': 'not peered'}
logger.debug('PEER %s IS UNKNOWN SEND 403', user_id)
response_status = (403, 'UNKNOWN USER')
content = {}
else:
logger.debug('RESPONSE %s: %s', action, content)
self.write_response(response_status, content)
def publish_node():
update_online()
if state.online:
with db.session():
for u in user.models.User.query.filter_by(queued=True):
logger.debug('adding queued node... %s', u.id)
state.nodes.queue('add', u.id)
state.check_nodes = PeriodicCallback(check_nodes, 120000)
state.check_nodes.start()
state._online = PeriodicCallback(update_online, 60000)
state._online.start()
def write_response(self, response_status, content):
self.send_response(*response_status)
self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL)
self.send_header('Content-Type', 'application/json')
content = json.dumps(content, ensure_ascii=False).encode('utf-8')
content = self.gzip_data(content)
self.send_header('Content-Length', str(len(content)))
self.end_headers()
self.wfile.write(content)
def update_online():
host = get_public_ipv6()
if not host:
if state.online:
state.online = False
trigger_event('status', {
'id': settings.USER_ID,
'online': state.online
})
else:
if host != state.host:
state.host = host
online = directory.put(settings.sk, {
'host': host,
'port': settings.server['node_port'],
'cert': settings.server['cert']
})
if online != state.online:
state.online = online
trigger_event('status', {
'id': settings.USER_ID,
'online': state.online
})
class Server(Thread):
http_server = None
def check_nodes():
if state.online:
with db.session():
for u in user.models.User.query.filter_by(queued=True):
if not state.nodes.is_online(u.id):
logger.debug('queued peering message for %s trying to connect...', u.id)
state.nodes.queue('add', u.id)
def __init__(self):
Thread.__init__(self)
address = (settings.server['node_address'], settings.server['node_port'])
self.http_server = NodeServer(address, Handler)
self.daemon = True
self.start()
def run(self):
self.http_server.serve_forever()
def stop(self):
if self.http_server:
self.http_server.shutdown()
self.http_server.socket.close()
return Thread.join(self)
def start():
application = Application([
(r"/get/(.*)", ShareHandler),
(r".*", NodeHandler),
], gzip=True)
if not os.path.exists(settings.ssl_cert_path):
settings.server['cert'] = cert.generate_ssl()
return Server()
http_server = HTTPServer(application, ssl_options={
"certfile": settings.ssl_cert_path,
"keyfile": settings.ssl_key_path
})
http_server.listen(settings.server['node_port'], settings.server['node_address'])
state.main.add_callback(publish_node)
return http_server

305
oml/node/sslsocket.py Normal file
View file

@ -0,0 +1,305 @@
from io import BytesIO
from socket import error
from errno import EINTR
# Based on socket._fileobject from python2.7
class fileobject(object):
    """Faux file object attached to a socket object.

    Buffered, file-like wrapper (read/readline/write/flush) around a
    socket-like object exposing recv/sendall/close/fileno.  Ported from
    Python 2.7's socket._fileobject; all data is bytes.

    Fixes applied to the original port:
    - readline() in the unbuffered case joined bytes with a str separator
      (``"".join``) — TypeError on Python 3; now ``b"".join``.
    - writelines() consumed its ``filter`` iterator while summing lengths,
      so nothing was ever buffered on Python 3; now materialized as a list.
    - the iterator protocol used the Python 2 ``next`` name; ``__next__``
      added (the old name is kept as an alias).
    """

    default_bufsize = 8192
    name = "<socket>"

    __slots__ = ["mode", "bufsize", "softspace",
                 # "closed" is a property, see below
                 "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf", "_wbuf_len",
                 "_close"]

    def __init__(self, sock, mode='rb', bufsize=-1, close=False):
        self._sock = sock
        self.mode = mode  # Not actually used in this version
        if bufsize < 0:
            bufsize = self.default_bufsize
        self.bufsize = bufsize
        self.softspace = False
        # _rbufsize is the suggested recv buffer size.  It is *strictly*
        # obeyed within readline() for recv calls.  If it is larger than
        # default_bufsize it will be used for recv calls within read().
        if bufsize == 0:
            self._rbufsize = 1
        elif bufsize == 1:
            self._rbufsize = self.default_bufsize
        else:
            self._rbufsize = bufsize
        self._wbufsize = bufsize
        # We use BytesIO for the read buffer to avoid holding a list
        # of variously sized string objects which have been known to
        # fragment the heap due to how they are malloc()ed and often
        # realloc()ed down much smaller than their original allocation.
        self._rbuf = BytesIO()
        self._wbuf = []  # A list of bytes objects
        self._wbuf_len = 0
        self._close = close

    def _getclosed(self):
        return self._sock is None
    closed = property(_getclosed, doc="True if the file is closed")

    def close(self):
        try:
            if self._sock:
                self.flush()
        finally:
            if self._close:
                self._sock.close()
            self._sock = None

    def __del__(self):
        try:
            self.close()
        except:
            # close() may fail if __init__ didn't complete
            pass

    def flush(self):
        """Send all buffered write data; re-buffer the unsent tail on error."""
        if self._wbuf:
            data = b"".join(self._wbuf)
            self._wbuf = []
            self._wbuf_len = 0
            buffer_size = max(self._rbufsize, self.default_bufsize)
            data_size = len(data)
            write_offset = 0
            view = memoryview(data)
            try:
                while write_offset < data_size:
                    self._sock.sendall(view[write_offset:write_offset + buffer_size])
                    write_offset += buffer_size
            finally:
                if write_offset < data_size:
                    remainder = data[write_offset:]
                    del view, data  # explicit free
                    self._wbuf.append(remainder)
                    self._wbuf_len = len(remainder)

    def fileno(self):
        return self._sock.fileno()

    def write(self, data):
        data = bytes(data)  # XXX Should really reject non-string non-buffers
        if not data:
            return
        self._wbuf.append(data)
        self._wbuf_len += len(data)
        # flush when unbuffered, line-buffered with a newline, or full
        if (self._wbufsize == 0 or
            (self._wbufsize == 1 and b'\n' in data) or
            (self._wbufsize > 1 and self._wbuf_len >= self._wbufsize)):
            self.flush()

    def writelines(self, list):
        # XXX We could do better here for very long lists
        # XXX Should really reject non-string non-buffers
        # materialize: a filter iterator would be exhausted by sum() below
        lines = [line for line in map(bytes, list) if line]
        self._wbuf_len += sum(map(len, lines))
        self._wbuf.extend(lines)
        if (self._wbufsize <= 1 or
            self._wbuf_len >= self._wbufsize):
            self.flush()

    def read(self, size=-1):
        """Read up to *size* bytes (all data until EOF when size < 0)."""
        # Use max, disallow tiny reads in a loop as they are very inefficient.
        # We never leave read() with any leftover data from a new recv() call
        # in our internal buffer.
        rbufsize = max(self._rbufsize, self.default_bufsize)
        # Our use of BytesIO rather than lists of string objects returned by
        # recv() minimizes memory usage and fragmentation that occurs when
        # rbufsize is large compared to the typical return value of recv().
        buf = self._rbuf
        buf.seek(0, 2)  # seek end
        if size < 0:
            # Read until EOF
            self._rbuf = BytesIO()  # reset _rbuf.  we consume it via buf.
            while True:
                try:
                    data = self._sock.recv(rbufsize)
                except error as e:
                    if e.args[0] == EINTR:
                        continue
                    raise
                if not data:
                    break
                buf.write(data)
            return buf.getvalue()
        else:
            # Read until size bytes or EOF seen, whichever comes first
            buf_len = buf.tell()
            if buf_len >= size:
                # Already have size bytes in our buffer?  Extract and return.
                buf.seek(0)
                rv = buf.read(size)
                self._rbuf = BytesIO()
                self._rbuf.write(buf.read())
                return rv
            self._rbuf = BytesIO()  # reset _rbuf.  we consume it via buf.
            while True:
                left = size - buf_len
                # recv() will malloc the amount of memory given as its
                # parameter even though it often returns much less data
                # than that.  The returned data string is short lived
                # as we copy it into a BytesIO and free it.  This avoids
                # fragmentation issues on many platforms.
                try:
                    data = self._sock.recv(left)
                except error as e:
                    if e.args[0] == EINTR:
                        continue
                    raise
                if not data:
                    break
                n = len(data)
                if n == size and not buf_len:
                    # Shortcut.  Avoid buffer data copies when:
                    # - We have no data in our buffer.
                    # AND
                    # - Our call to recv returned exactly the
                    #   number of bytes we were asked to read.
                    return data
                if n == left:
                    buf.write(data)
                    del data  # explicit free
                    break
                assert n <= left, "recv(%d) returned %d bytes" % (left, n)
                buf.write(data)
                buf_len += n
                del data  # explicit free
                #assert buf_len == buf.tell()
            return buf.getvalue()

    def readline(self, size=-1):
        """Read one line (up to *size* bytes when size >= 0)."""
        buf = self._rbuf
        buf.seek(0, 2)  # seek end
        if buf.tell() > 0:
            # check if we already have it in our buffer
            buf.seek(0)
            bline = buf.readline(size)
            if bline.endswith(b'\n') or len(bline) == size:
                self._rbuf = BytesIO()
                self._rbuf.write(buf.read())
                return bline
            del bline
        if size < 0:
            # Read until \n or EOF, whichever comes first
            if self._rbufsize <= 1:
                # Speed up unbuffered case
                buf.seek(0)
                buffers = [buf.read()]
                self._rbuf = BytesIO()  # reset _rbuf.  we consume it via buf.
                data = None
                recv = self._sock.recv
                while True:
                    try:
                        while data != b"\n":
                            data = recv(1)
                            if not data:
                                break
                            buffers.append(data)
                    except error as e:
                        # The try..except to catch EINTR was moved outside the
                        # recv loop to avoid the per byte overhead.
                        if e.args[0] == EINTR:
                            continue
                        raise
                    break
                # bytes separator: these are bytes chunks (py3 fix)
                return b"".join(buffers)
            buf.seek(0, 2)  # seek end
            self._rbuf = BytesIO()  # reset _rbuf.  we consume it via buf.
            while True:
                try:
                    data = self._sock.recv(self._rbufsize)
                except error as e:
                    if e.args[0] == EINTR:
                        continue
                    raise
                if not data:
                    break
                nl = data.find(b'\n')
                if nl >= 0:
                    nl += 1
                    buf.write(data[:nl])
                    self._rbuf.write(data[nl:])
                    del data
                    break
                buf.write(data)
            return buf.getvalue()
        else:
            # Read until size bytes or \n or EOF seen, whichever comes first
            buf.seek(0, 2)  # seek end
            buf_len = buf.tell()
            if buf_len >= size:
                buf.seek(0)
                rv = buf.read(size)
                self._rbuf = BytesIO()
                self._rbuf.write(buf.read())
                return rv
            self._rbuf = BytesIO()  # reset _rbuf.  we consume it via buf.
            while True:
                try:
                    data = self._sock.recv(self._rbufsize)
                except error as e:
                    if e.args[0] == EINTR:
                        continue
                    raise
                if not data:
                    break
                left = size - buf_len
                # did we just receive a newline?
                nl = data.find(b'\n', 0, left)
                if nl >= 0:
                    nl += 1
                    # save the excess data to _rbuf
                    self._rbuf.write(data[nl:])
                    if buf_len:
                        buf.write(data[:nl])
                        break
                    else:
                        # Shortcut.  Avoid data copy through buf when returning
                        # a substring of our first recv().
                        return data[:nl]
                n = len(data)
                if n == size and not buf_len:
                    # Shortcut.  Avoid data copy through buf when
                    # returning exactly all of our first recv().
                    return data
                if n >= left:
                    buf.write(data[:left])
                    self._rbuf.write(data[left:])
                    break
                buf.write(data)
                buf_len += n
                #assert buf_len == buf.tell()
            return buf.getvalue()

    def readlines(self, sizehint=0):
        total = 0
        list = []
        while True:
            line = self.readline()
            if not line:
                break
            list.append(line)
            total += len(line)
            if sizehint and total >= sizehint:
                break
        return list

    # Iterator protocols

    def __iter__(self):
        return self

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    # keep the Python 2 spelling for backwards compatibility
    next = __next__

View file

@ -24,7 +24,7 @@ from changelog import Changelog
import directory
from websocket import trigger_event
from localnodes import LocalNodes
from ssl_request import get_opener
from tor_request import get_opener
import state
import db
@ -35,7 +35,6 @@ ENCODING='base64'
class Node(Thread):
_running = True
_cert = None
host = None
online = False
download_speed = 0
@ -44,8 +43,7 @@ class Node(Thread):
def __init__(self, nodes, user):
self._nodes = nodes
self.user_id = user.id
key = user.id.encode()
self.vk = ed25519.VerifyingKey(key, encoding=ENCODING)
self._opener = get_opener(self.user_id)
logger.debug('new Node %s online=%s', self.user_id, self.online)
self._q = Queue()
Thread.__init__(self)
@ -78,65 +76,50 @@ class Node(Thread):
@property
def url(self):
if self.host:
if ':' in self.host:
url = 'https://[%s]:%s' % (self.host, self.port)
if self.local:
if ':' in self.local:
url = 'https://[%s]:%s' % (self.local, self.port)
else:
url = 'https://%s:%s' % (self.host, self.port)
url = 'https://%s:%s' % (self.local, self.port)
else:
url = None
url = 'https://%s.onion:9851' % self.user_id
return url
def resolve(self):
logger.debug('resolve node')
r = self.get_local()
if not r:
try:
r = directory.get(self.vk)
except:
logger.debug('directory failed', exc_info=1)
r = None
if r:
self.host = r['host']
self.local = r['host']
if 'port' in r:
self.port = r['port']
if r['cert'] != self._cert:
self._cert = r['cert']
self._opener = get_opener(self._cert)
else:
self.host = None
self.local = None
self.port = 9851
def get_local(self):
if self._nodes and self._nodes._local:
local = self._nodes._local.get(self.user_id)
if local and local['cert'] != self._cert:
self._cert = local['cert']
self._opener = get_opener(self._cert)
return local
return self._nodes._local.get(self.user_id)
return None
def request(self, action, *args):
url = self.url
if not url:
self.resolve()
logger.debug('request %s%s', action, args)
self.resolve()
url = self.url
if not self.url:
logger.debug('unable to find host %s', self.user_id)
self.online = False
return None
logger.debug('url=%s', url)
content = json.dumps([action, args]).encode()
sig = settings.sk.sign(content, encoding=ENCODING).decode()
#sig = settings.sk.sign(content, encoding=ENCODING).decode()
headers = {
'User-Agent': settings.USER_AGENT,
'X-Node-Protocol': settings.NODE_PROTOCOL,
'Accept': 'text/plain',
'Accept-Encoding': 'gzip',
'Content-Type': 'application/json',
'X-Ed25519-Key': settings.USER_ID,
'X-Ed25519-Signature': sig,
}
self._opener.addheaders = list(zip(list(headers.keys()), list(headers.values())))
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
logger.debug('headers: %s', self._opener.addheaders)
try:
self._opener.timeout = self.TIMEOUT
@ -173,12 +156,15 @@ class Node(Thread):
state.update_required = True
return None
sig = r.headers.get('X-Ed25519-Signature')
'''
sig = r.headers.get('X-Node-Signature')
if sig and self._valid(data, sig):
response = json.loads(data.decode('utf-8'))
else:
logger.debug('invalid signature %s', data)
response = None
'''
response = json.loads(data.decode('utf-8'))
logger.debug('response: %s', response)
return response
@ -206,7 +192,7 @@ class Node(Thread):
'X-Node-Protocol': settings.NODE_PROTOCOL,
'Accept-Encoding': 'gzip',
}
self._opener.addheaders = list(zip(list(headers.keys()), list(headers.values())))
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
self._opener.timeout = 1
r = self._opener.open(url)
version = r.headers.get('X-Node-Protocol', None)
@ -217,19 +203,19 @@ class Node(Thread):
logger.debug('can connect to: %s (%s)', url, self.user.nickname)
return True
except:
logger.debug('can not connect to: %s (%s)', url, self.user.nickname)
logger.debug('can not connect to: %s (%s)', url, self.user.nickname, exc_info=1)
pass
return False
def _go_online(self):
self.resolve()
u = self.user
if (u.peered or u.queued) and self.host:
logger.debug('go_online peered=%s queued=%s %s [%s]:%s (%s)', u.peered, u.queued, u.id, self.host, self.port, u.nickname)
if u.peered or u.queued:
logger.debug('go_online peered=%s queued=%s %s [%s]:%s (%s)', u.peered, u.queued, u.id, self.local, self.port, u.nickname)
try:
self.online = False
if self.can_connect():
logger.debug('connected to [%s]:%s', self.host, self.port)
logger.debug('connected to %s', self.url)
self.online = True
if u.queued:
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
@ -299,11 +285,12 @@ class Node(Thread):
from item.models import Transfer
url = '%s/get/%s' % (self.url, item.id)
headers = {
'X-Node-Protocol': settings.NODE_PROTOCOL,
'User-Agent': settings.USER_AGENT,
}
t1 = datetime.utcnow()
logger.debug('download %s', url)
self._opener.addheaders = zip(headers.keys(), headers.values())
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
try:
r = self._opener.open(url, timeout=self.TIMEOUT*2)
except:
@ -352,7 +339,7 @@ class Node(Thread):
headers = {
'User-Agent': settings.USER_AGENT,
}
self._opener.addheaders = list(zip(list(headers.keys()), list(headers.values())))
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
r = self._opener.open(url)
if r.getcode() == 200:
with open(path, 'w') as fd:
@ -379,7 +366,7 @@ class Nodes(Thread):
self.start()
def cleanup(self):
if self._running:
if self._running and self._local:
self._local.cleanup()
def queue(self, *args):
@ -401,7 +388,8 @@ class Nodes(Thread):
else:
nodes = [self._nodes[target]]
for node in nodes:
getattr(node, action)(*args)
r = getattr(node, action)(*args)
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
def _add(self, user_id):
if user_id not in self._nodes:
@ -428,5 +416,30 @@ class Nodes(Thread):
self._q.put(None)
for node in list(self._nodes.values()):
node.join()
self._local.join()
if self._local:
self._local.join()
return Thread.join(self)
def publish_node():
    """Publish this node's status and start the periodic node checks."""
    update_online()
    checker = PeriodicCallback(check_nodes, 120000)
    state.check_nodes = checker
    checker.start()
    online_poller = PeriodicCallback(update_online, 60000)
    state._online = online_poller
    online_poller.start()
def update_online():
    """Sync state.online with the tor connection and broadcast on change."""
    online = state.tor and state.tor.is_online()
    if online == state.online:
        return
    state.online = online
    trigger_event('status', {
        'id': settings.USER_ID,
        'online': state.online
    })
def check_nodes():
    """Retry connecting to every user whose peering request is still queued."""
    if not state.online:
        return
    with db.session():
        queued_users = user.models.User.query.filter_by(queued=True)
        for u in queued_users:
            if state.nodes.is_online(u.id):
                continue
            logger.debug('queued peering message for %s trying to connect...', u.id)
            state.nodes.queue('add', u.id)

View file

@ -96,14 +96,23 @@ def run():
import user
import downloads
import nodes
import tor
state.tor = tor.Tor()
state.node = node.server.start()
state.nodes = nodes.Nodes()
state.downloads = downloads.Downloads()
state.scraping = downloads.ScrapeThread()
state.nodes = nodes.Nodes()
def add_users():
with db.session():
for p in user.models.User.query.filter_by(peered=True):
state.nodes.queue('add', p.id)
if not state.tor.is_online():
state.main.add_callback(add_users)
else:
with db.session():
for u in user.models.User.query.filter_by(peered=True):
state.nodes.queue('add', u.id)
for u in user.models.User.query.filter_by(queued=True):
logger.debug('adding queued node... %s', u.id)
state.nodes.queue('add', u.id)
nodes.publish_node()
state.main.add_callback(add_users)
state.main.add_callback(start_node)
if ':' in settings.server['address']:
@ -117,6 +126,8 @@ def run():
logger.debug('Starting OML %s at %s', settings.VERSION, url)
def shutdown():
if state.tor:
state.tor._shutdown = True
if state.downloads:
logger.debug('shutdown downloads')
state.downloads.join()
@ -131,6 +142,11 @@ def run():
if state.nodes:
logger.debug('shutdown nodes')
state.nodes.join()
if state.node:
state.node.stop()
if state.tor:
logger.debug('shutdown tor')
state.tor.shutdown()
if PID and os.path.exists(PID):
logger.debug('remove %s', PID)
os.unlink(PID)

View file

@ -6,6 +6,7 @@ import os
import ed25519
from pdict import pdict
from utils import get_user_id
base_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..'))
static_path = os.path.join(base_dir, 'static')
@ -22,7 +23,7 @@ log_path = os.path.join(config_path, 'debug.log')
icons_db_path = os.path.join(config_path, 'icons.db')
key_path = os.path.join(config_path, 'node.key')
ssl_cert_path = os.path.join(config_path, 'node.ssl.crt')
ssl_key_path = os.path.join(config_path, 'node.ssl.key')
ssl_key_path = os.path.join(config_path, 'tor', 'private_key')
if os.path.exists(oml_config_path):
@ -64,7 +65,9 @@ else:
fd.write(sk.to_bytes())
os.chmod(key_path, 0o400)
USER_ID = vk.to_ascii(encoding='base64').decode()
USER_ID = get_user_id(ssl_key_path, ssl_cert_path)
OLD_USER_ID = vk.to_ascii(encoding='base64').decode()
OML_UPDATE_KEY='K55EZpPYbP3X+3mA66cztlw1sSaUMqGwfTDKQyP2qOU'
if 'modules' in release and 'openmedialibrary' in release['modules']:
@ -72,7 +75,7 @@ if 'modules' in release and 'openmedialibrary' in release['modules']:
else:
MINOR_VERSION = 'git'
NODE_PROTOCOL="0.1"
NODE_PROTOCOL="0.2"
VERSION="%s.%s" % (NODE_PROTOCOL, MINOR_VERSION)

View file

@ -200,6 +200,7 @@ def upgrade_db(old, new=None):
if old <= '20140526-118-d451eb3' and new > '20140526-118-d451eb3':
import item.models
item.models.Find.query.filter_by(key='list').delete()
if old <= '20140527-120-3cb9819':
run_sql('CREATE INDEX ix_find_findvalue ON find (findvalue)')
@ -211,6 +212,18 @@ def upgrade_db(old, new=None):
FOREIGN KEY(item_id) REFERENCES item (id)
)''')
run_sql('CREATE INDEX idx_scrape_added ON scrape (added)')
if old <= '20151118-346-7e86e68':
old_key = os.path.join(settings.config_path, 'node.ssl.key')
if os.path.exists(old_key):
os.unlink(old_key)
statements = [
"UPDATE user SET id = '{nid}' WHERE id = '{oid}'",
"UPDATE list SET user_id = '{nid}' WHERE user_id = '{oid}'",
"UPDATE useritem SET user_id = '{nid}' WHERE user_id = '{oid}'",
"UPDATE changelog SET user_id = '{nid}' WHERE user_id = '{oid}'",
]
for sql in statements:
run_sql(sql.format(oid=settings.OLD_USER_ID, nid=settings.USER_ID))
def create_default_lists(user_id=None):
with db.session():

View file

@ -6,24 +6,34 @@ import http.client
import urllib.request, urllib.error, urllib.parse
import hashlib
import logging
import base64
from OpenSSL import crypto
logger = logging.getLogger('oml.ssl_request')
def get_service_id(cert):
# compute sha1 of public key and encode first half in base32
key = crypto.load_certificate(crypto.FILETYPE_ASN1, cert).get_pubkey()
public_key = crypto.dump_privatekey(crypto.FILETYPE_ASN1, key)[22:]
service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower()
return service_id
class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError):
def __init__(self, fingerprint, cert, reason):
def __init__(self, service_id, cert, reason):
http.client.HTTPException.__init__(self)
self._fingerprint = fingerprint
self._cert_fingerprint = hashlib.sha1(cert).hexdigest()
self._service_id = service_id
self._cert_service_id = get_service_id(cert)
self.reason = reason
def __str__(self):
return ('%s (local) != %s (remote) (%s)\n' %
(self._fingerprint, self._cert_fingerprint, self.reason))
(self._service_id, self._cert_service_id, self.reason))
class FingerprintHTTPSConnection(http.client.HTTPSConnection):
class ServiceIdHTTPSConnection(http.client.HTTPSConnection):
def __init__(self, host, port=None, fingerprint=None, check_hostname=None, context=None, **kwargs):
self._fingerprint = fingerprint
if self._fingerprint:
def __init__(self, host, port=None, service_id=None, check_hostname=None, context=None, **kwargs):
self._service_id = service_id
if self._service_id:
check_hostname = False
# dont fial for older verions of python
# without ssl._create_default_https_context
@ -37,45 +47,36 @@ class FingerprintHTTPSConnection(http.client.HTTPSConnection):
http.client.HTTPSConnection.__init__(self, host, port,
check_hostname=check_hostname, context=context, **kwargs)
def _check_fingerprint(self, cert):
if len(self._fingerprint) == 40:
fingerprint = hashlib.sha1(cert).hexdigest()
elif len(self._fingerprint) == 64:
fingerprint = hashlib.sha256(cert).hexdigest()
elif len(self._fingerprint) == 128:
fingerprint = hashlib.sha512(cert).hexdigest()
else:
logging.error('unkown _fingerprint length %s (%s)',
self._fingerprint, len(self._fingerprint))
return False
logger.debug('ssl fingerprint: %s (match: %s)', fingerprint, fingerprint == self._fingerprint)
if fingerprint != self._fingerprint:
logger.debug('expected fingerprint: %s', self._fingerprint)
return fingerprint == self._fingerprint
def _check_service_id(self, cert):
service_id = get_service_id(cert)
logger.debug('ssl service_id: %s (match: %s)', service_id, service_id == self._service_id)
if service_id != self._service_id:
logger.debug('expected service_id: %s', self._service_id)
return service_id == self._service_id
def connect(self):
http.client.HTTPSConnection.connect(self)
if self._fingerprint:
if self._service_id:
cert = self.sock.getpeercert(binary_form=True)
if not self._check_fingerprint(cert):
raise InvalidCertificateException(self._fingerprint, cert,
'fingerprint mismatch')
if not self._check_service_id(cert):
raise InvalidCertificateException(self._service_id, cert,
'service_id mismatch')
#logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version)
class FingerprintHTTPSHandler(urllib.request.HTTPSHandler):
class ServiceIdHTTPSHandler(urllib.request.HTTPSHandler):
def __init__(self, debuglevel=0, context=None, check_hostname=None, fingerprint=None):
def __init__(self, debuglevel=0, context=None, check_hostname=None, service_id=None):
urllib.request.AbstractHTTPHandler.__init__(self, debuglevel)
self._context = context
self._check_hostname = check_hostname
self._fingerprint = fingerprint
self._service_id = service_id
def https_open(self, req):
return self.do_open(FingerprintHTTPSConnection, req,
return self.do_open(ServiceIdHTTPSConnection, req,
context=self._context, check_hostname=self._check_hostname,
fingerprint=self._fingerprint)
service_id=self._service_id)
def get_opener(fingerprint):
handler = FingerprintHTTPSHandler(fingerprint=fingerprint)
def get_opener(service_id):
handler = ServiceIdHTTPSHandler(service_id=service_id)
opener = urllib.request.build_opener(handler)
return opener

View file

@ -1,9 +1,10 @@
websockets = []
nodes = False
tasks = False
main = None
online = False
host = None
main = None
nodes = False
online = False
tasks = False
tor = False
websockets = []
activity = {}

196
oml/tor.py Normal file
View file

@ -0,0 +1,196 @@
import distutils
import distutils.spawn
import logging
import os
import subprocess
import time
from threading import Thread

import stem
from stem.control import Controller

import settings
import state
logger = logging.getLogger('oml.tor')
class TorDaemon(Thread):
    """Background thread running a private tor process.

    Used when no system or TorBrowser tor control port is reachable.
    stdout of the tor process is collected and exposed via status().
    """

    def __init__(self):
        # fix: p must exist before run() spawns tor, otherwise shutdown()
        # raises AttributeError when no tor binary was found
        self.p = None
        self._status = []  # collected stdout lines / error messages
        Thread.__init__(self)
        self.daemon = True
        self.start()

    def create_torrc(self):
        """Write torrc-defaults/torrc into the config dir if missing.

        Returns (defaults_path, torrc_path).
        """
        defaults = os.path.join(settings.config_path, 'torrc-defaults')
        torrc = os.path.join(settings.config_path, 'torrc')
        if not os.path.exists(defaults):
            with open(defaults, 'w') as fd:
                fd.write('''
AvoidDiskWrites 1
# Where to send logging messages. Format is minSeverity[-maxSeverity]
# (stderr|stdout|syslog|file FILENAME).
Log notice stdout
SocksPort 9830
ControlPort 9831
CookieAuthentication 1
'''.strip())
        if not os.path.exists(torrc):
            with open(torrc, 'w') as fd:
                fd.write('''
DataDirectory {base}/TorData
DirReqStatistics 0
'''.strip().format(base=settings.config_path))
        return defaults, torrc

    def get_tor(self):
        """Locate a tor binary: TorBrowser locations first, then $PATH."""
        for path in (
            '/Applications/TorBrowser.app/TorBrowser/Tor/tor',
        ):
            if os.path.isfile(path) and os.access(path, os.X_OK):
                return path
        # linux TorBrowser: parse the Exec line of its .desktop launcher
        start = os.path.expanduser('~/.local/share/applications/start-tor-browser.desktop')
        if os.path.exists(start):
            with open(start) as fd:
                e = [line for line in fd.read().split('\n') if line.startswith('Exec')]
            if e:
                try:
                    base = os.path.dirname(e[0].split('"')[1])
                    path = os.path.join(base, 'TorBrowser', 'Tor', 'tor')
                    if os.path.isfile(path) and os.access(path, os.X_OK):
                        return path
                except IndexError:
                    # Exec line had no quoted path - fall through to $PATH
                    pass
        return distutils.spawn.find_executable('tor')

    def run(self):
        defaults, torrc = self.create_torrc()
        tor = self.get_tor()
        if not tor:
            self._status.append('No tor binary found. Please install TorBrowser or tor')
            return
        cmd = [tor, '--defaults-torrc', defaults, '-f', torrc]
        self.p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
        # stream tor's stdout into the status buffer until it exits
        for line in self.p.stdout:
            self._status.append(line)
        self.p = None

    def shutdown(self):
        """Kill the tor subprocess if it is running."""
        if self.p:
            self.p.kill()

    def status(self, max_lines=50):
        """Return the last max_lines collected output lines as one string."""
        return ''.join(self._status[-max_lines:])
class Tor(object):
    # Wrapper around a tor control connection: attaches to a running tor
    # (our own TorDaemon on 9831 or TorBrowser on 9151), publishes the node
    # as a hidden service and keeps state.online in sync.
    _shutdown = False    # set before closing so status_listener does not reconnect
    connected = False    # True while authenticated to a control port
    controller = None    # stem Controller instance once connected
    daemon = None        # our own TorDaemon, only started if no controller was found
    socks_port = 9150    # overwritten with the controller's actual SocksPort in connect()

    def __init__(self):
        # keep retrying in the background if tor is not up yet
        if not self.connect():
            self.reconnect()

    def connect(self):
        """Try to attach to a tor control port; returns True on success."""
        self.connected = False
        self.dir = os.path.join(settings.config_path, 'tor')
        connected = False
        # 9831 is our own tor (see TorDaemon torrc), 9151 is TorBrowser's default
        for port in (9831, 9151):
            try:
                self.controller = Controller.from_port('127.0.0.1', port)
                connected = True
                break
            except stem.SocketError:
                pass
        if not connected:
            if not self.daemon:
                logger.debug("start own tor process")
                self.daemon = TorDaemon()
                logger.debug("daemon %s", self.daemon)
                # retry once now that our own tor is starting; further
                # retries happen via reconnect()
                return self.connect()
            logger.debug("Failed to connect to system or own tor process.")
            return False
        try:
            self.controller.authenticate()
        except stem.connection.MissingPassword:
            logger.debug("TOR requires a password")
            return False
        except stem.connection.PasswordAuthFailed:
            logger.debug("invalid tor password")
            return False
        self.controller.add_event_listener(self.event_listener)
        self.controller.add_status_listener(self.status_listener)
        self.connected = True
        # SocksPort may be configured as 'port [flags...]', only take the port
        self.socks_port = int(self.controller.get_conf('SocksPort').split(' ')[0])
        self.publish()
        state.online = True
        return True

    def reconnect(self):
        # retry once a second on the main loop until connect() succeeds
        if not self.connect():
            if state.main:
                state.main.call_later(1, self.reconnect)

    def status_listener(self, controller, status, timestamp):
        # called by stem when the control connection status changes
        if status == 'Closed':
            if not self._shutdown:
                self.connected = False
                state.online = False
                self.reconnect()
        else:
            logger.debug('unknonw change %s', status)

    def event_listener(self, event):
        # NOTE(review): debug print of raw controller events - presumably
        # temporary; consider logger.debug instead
        print('EVENT', event)

    def shutdown(self):
        """Remove the hidden service and close controller and own daemon."""
        self._shutdown = True
        try:
            self.unpublish()
            if self.controller:
                #self.controller.remove_event_listener(self.connection_change)
                self.controller.close()
            if self.daemon:
                self.daemon.shutdown()
        except:
            # best-effort shutdown: log and continue
            logger.debug('shutdown exception', exc_info=1)
            pass
        self.connected = False

    def publish(self):
        """(Re)create the hidden service mapping the public node port to the
        local node port."""
        logger.debug("publish tor node")
        if not self.connected:
            return False
        controller = self.controller
        logger.debug("FIXME: dont remove/add service if already defined")
        controller.remove_hidden_service(self.dir)
        result = controller.create_hidden_service(
            self.dir,
            settings.server_defaults['node_port'],
            target_port=settings.server['node_port']
        )
        logger.debug('published node as https://%s:%s', result.hostname, settings.server_defaults['node_port'])
        '''
        with open(settings.ssl_key_path) as fd:
            key_content = fd.read()
        ports = {9851: settings.server['node_port']}
        response = controller.create_ephemeral_hidden_service(ports,
            key_type='RSA1024', key_content=key_content,
            detached=True, await_publication = True)
        logger.debug('published node as https://%s.onion:%s',
            settings.USER_ID, settings.server_defaults['node_port'])
        '''

    def unpublish(self):
        # tear down the hidden service; no-op when not connected
        if not self.connected:
            return False
        if self.controller:
            self.controller.remove_hidden_service(self.dir)
        state.online = False

    def is_online(self):
        # online means authenticated and the controller socket is still alive
        return self.connected and self.controller.is_alive()

122
oml/tor_request.py Normal file
View file

@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import ssl
import http.client
import urllib.request, urllib.error, urllib.parse
import logging
import socks
import socket
import settings
import state
from utils import get_service_id, get_local_ipv4
logger = logging.getLogger('oml.tor_request')
class InvalidCertificateException(http.client.HTTPException, urllib.error.URLError):
    """Raised when a peer's TLS certificate does not hash to the expected
    tor onion service id."""

    def __init__(self, service_id, cert, reason):
        http.client.HTTPException.__init__(self)
        self._service_id = service_id  # the service id we expected
        self._cert_service_id = get_service_id(cert=cert)  # id derived from the peer cert
        self.reason = reason

    def __str__(self):
        return ('%s (local) != %s (remote) (%s)\n' %
            (self._service_id, self._cert_service_id, self.reason))
def is_local(host):
    """Return True if host is loopback or on our local network prefix.

    NOTE(review): get_local_ipv4()[:-2] strips the last two characters of
    our IPv4 address as a crude prefix - confirm this matches the intended
    subnet for addresses with multi-digit final octets.
    """
    prefixes = ('127.0.0.1', get_local_ipv4()[:-2])
    return host.startswith(prefixes)
def getaddrinfo(*args):
    """DNS-free getaddrinfo replacement: return a single TCP/IPv4 entry with
    the hostname untouched, so the SOCKS proxy (tor) resolves it remotely."""
    host, port = args[0], args[1]
    return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (host, port))]
def create_tor_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None):
    """socket.create_connection work-alike that tunnels through the local
    tor SOCKS5 proxy, letting tor resolve the hostname remotely.

    Returns a connected socket, or raises the socket.error that occurred.
    """
    host, port = address
    err = None
    # plain TCP over IPv4 (proto 6); tor only forwards TCP
    af = socket.AF_INET
    socktype = socket.SOCK_STREAM
    proto = 6
    sa = address
    sock = None
    try:
        sock = socks.socksocket(af, socktype, proto)
        if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(timeout)
        # rdns=True: hand the hostname to the proxy instead of resolving locally
        sock.set_proxy(socks.SOCKS5, "localhost", state.tor.socks_port, True)
        if source_address:
            sock.bind(source_address)
        sock.connect(sa)
        return sock
    except socket.error as _:
        err = _
        if sock is not None:
            sock.close()
    if err is not None:
        raise err
    # fix: this fallback previously did `raise sock.error(...)` with sock
    # being None, which raised AttributeError instead of the intended error
    raise socket.error("getaddrinfo returns an empty list")
class TorHTTPSConnection(http.client.HTTPSConnection):
    """HTTPSConnection that routes non-local hosts through tor and verifies
    the peer by onion service id instead of hostname/CA chain."""

    def __init__(self, host, port=None, service_id=None, check_hostname=None, context=None, **kwargs):
        self._service_id = service_id
        if self._service_id:
            # NOTE(review): ssl._create_default_https_context is a private
            # stdlib alias - confirm it is present on all supported versions
            context = ssl._create_default_https_context()
            context.check_hostname = False
            # disable CA verification; identity is checked via service id in connect()
            context.verify_mode = ssl.CERT_NONE
            # present our own node cert/key so the peer can identify us
            context.load_cert_chain(settings.ssl_cert_path, settings.ssl_key_path)
            context.load_default_certs()
        http.client.HTTPSConnection.__init__(self, host, port,
            check_hostname=check_hostname, context=context, **kwargs)
        if not is_local(host):
            # route the TCP connection through the tor SOCKS proxy
            self._create_connection = create_tor_connection

    def _check_service_id(self, cert):
        # derive the service id from the peer's DER cert and compare
        service_id = get_service_id(cert=cert)
        logger.debug('ssl service_id: %s (match: %s)', service_id, service_id == self._service_id)
        if service_id != self._service_id:
            logger.debug('expected service_id: %s', self._service_id)
        return service_id == self._service_id

    def connect(self):
        http.client.HTTPSConnection.connect(self)
        if self._service_id:
            cert = self.sock.getpeercert(binary_form=True)
            if not self._check_service_id(cert):
                raise InvalidCertificateException(self._service_id, cert,
                    'service_id mismatch')
        #logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version)
class TorHTTPSHandler(urllib.request.HTTPSHandler):
    """urllib HTTPS handler that opens requests via TorHTTPSConnection."""

    def __init__(self, debuglevel=0, context=None, check_hostname=None, service_id=None):
        urllib.request.AbstractHTTPHandler.__init__(self, debuglevel)
        self._context = context
        self._check_hostname = check_hostname
        self._service_id = service_id

    def https_open(self, req):
        return self.do_open(
            TorHTTPSConnection,
            req,
            context=self._context,
            check_hostname=self._check_hostname,
            service_id=self._service_id,
        )
class TorHTTPConnection(http.client.HTTPConnection):
    """HTTPConnection that tunnels non-local hosts through the tor proxy."""

    def __init__(self, host, port=None, **kwargs):
        http.client.HTTPConnection.__init__(self, host, port, **kwargs)
        if is_local(host):
            return
        self._create_connection = create_tor_connection
class TorHTTPHandler(urllib.request.HTTPHandler):
    """urllib HTTP handler that opens requests via TorHTTPConnection
    (proxied through tor for non-local hosts)."""
    def http_open(self, req):
        return self.do_open(TorHTTPConnection, req)
def get_opener(service_id=None):
    """Build a urllib opener sending http and https through tor; https peers
    are verified against service_id."""
    https_handler = TorHTTPSHandler(service_id=service_id)
    http_handler = TorHTTPHandler()
    return urllib.request.build_opener(https_handler, http_handler)

View file

@ -297,7 +297,7 @@ def requestPeering(data):
nickname (optional)
}
'''
if len(data.get('id', '')) != 43:
if len(data.get('id', '')) != 16:
logger.debug('invalid user id')
return {}
u = models.User.get_or_create(data['id'])
@ -321,7 +321,7 @@ def acceptPeering(data):
message
}
'''
if len(data.get('id', '')) != 43:
if len(data.get('id', '')) != 16:
logger.debug('invalid user id')
return {}
logger.debug('acceptPeering... %s', data)
@ -341,7 +341,7 @@ def rejectPeering(data):
message
}
'''
if len(data.get('id', '')) != 43:
if len(data.get('id', '')) != 16:
logger.debug('invalid user id')
return {}
u = models.User.get_or_create(data['id'])
@ -360,7 +360,7 @@ def removePeering(data):
message
}
'''
if len(data.get('id', '')) != 43:
if len(data.get('id', '')) != 16:
logger.debug('invalid user id')
return {}
u = models.User.get_or_create(data['id'])
@ -377,7 +377,7 @@ def cancelPeering(data):
takes {
}
'''
if len(data.get('id', '')) != 43:
if len(data.get('id', '')) != 16:
logger.debug('invalid user id')
return {}
u = models.User.get_or_create(data['id'])

View file

@ -12,11 +12,22 @@ import socket
import io
import gzip
import time
import hashlib
from datetime import datetime
import subprocess
import base64
import ox
import ed25519
from OpenSSL.crypto import (
load_privatekey, load_certificate,
dump_privatekey, dump_certificate,
FILETYPE_ASN1, FILETYPE_PEM, PKey, TYPE_RSA,
X509, X509Extension
)
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
from meta.utils import normalize_isbn, find_isbns
@ -128,6 +139,79 @@ def valid(key, value, sig):
return False
return True
def get_user_id(private_key, cert_path):
    '''
    Return the node's user id (its tor service id), creating the RSA key
    and a matching self signed certificate when missing.

    private_key: path to the PEM RSA private key
    cert_path:   path to the PEM X509 certificate
    '''
    if os.path.exists(private_key):
        with open(private_key) as fd:
            key = load_privatekey(FILETYPE_PEM, fd.read())
        # tor hidden service keys are 1024 bit RSA; regenerate anything else
        if key.bits() != 1024:
            os.unlink(private_key)
        else:
            user_id = get_service_id(private_key)
    if not os.path.exists(private_key):
        # key is (re)generated, so any existing cert is stale
        if os.path.exists(cert_path):
            os.unlink(cert_path)
        folder = os.path.dirname(private_key)
        if not os.path.exists(folder):
            os.makedirs(folder)
            os.chmod(folder, 0o700)
        key = PKey()
        key.generate_key(TYPE_RSA, 1024)
        with open(private_key, 'wb') as fd:
            os.chmod(private_key, 0o600)
            fd.write(dump_privatekey(FILETYPE_PEM, key))
        os.chmod(private_key, 0o400)
        user_id = get_service_id(private_key)
    if not os.path.exists(cert_path):
        # self signed cert with the service id as CN; peers verify the
        # service id (see tor_request), not the CA chain
        ca = X509()
        ca.set_version(2)
        ca.set_serial_number(1)
        ca.get_subject().CN = user_id
        ca.gmtime_adj_notBefore(0)
        # NOTE(review): cert is only valid for 24 hours - peers appear to
        # check the service id with CERT_NONE rather than validity; confirm
        ca.gmtime_adj_notAfter(24 * 60 * 60)
        ca.set_issuer(ca.get_subject())
        ca.set_pubkey(key)
        ca.add_extensions([
            X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
            X509Extension(b"nsCertType", True, b"sslCA"),
            X509Extension(b"extendedKeyUsage", True,
                b"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"),
            X509Extension(b"keyUsage", False, b"keyCertSign, cRLSign"),
            X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=ca),
        ])
        ca.sign(key, "sha256")
        with open(cert_path, 'wb') as fd:
            fd.write(dump_certificate(FILETYPE_PEM, ca))
    return user_id
def get_service_id(private_key_file=None, cert=None):
    '''
    Return the tor hidden-service id for an RSA key: the first half of the
    sha1 of the DER encoded public key, base32 encoded in lower case.

    Pass exactly one of:
      private_key_file: path to a PEM RSA private key
      cert:             a DER encoded X509 certificate

    Raises ValueError if neither argument is given.
    '''
    if private_key_file:
        with open(private_key_file, 'rb') as fd:
            private_key = fd.read()
        # export the public key in DER, skipping the 22 byte SPKI header
        public_key = RSA.importKey(private_key).publickey().exportKey('DER')[22:]
        # compute sha1 of public key and encode first half in base32
        service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode()
    elif cert:
        # extract the public key from the certificate and rebuild the plain
        # RSA DER encoding from (modulus, exponent), again skipping the
        # 22 byte SPKI header
        key = load_certificate(FILETYPE_ASN1, cert).get_pubkey()
        pub_der = DerSequence()
        pub_der.decode(dump_privatekey(FILETYPE_ASN1, key))
        public_key = RSA.construct((pub_der._seq[1], pub_der._seq[2])).exportKey('DER')[22:]
        service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode()
    else:
        # fix: previously fell through to `return service_id` and raised
        # NameError when called without arguments
        raise ValueError('either private_key_file or cert is required')
    return service_id
def get_public_ipv6():
try:
host = ('2a01:4f8:120:3201::3', 25519)

View file

@ -6,3 +6,5 @@ html5lib
git+http://git.0x2620.org/python-ox.git#egg=python-ox
python-stdnum==0.9
PyPDF2==1.23
pysocks
stem

View file

@ -1,5 +1,6 @@
lxml
simplejson
ed25519>=1.3
ed25519>=1.4
SQLAlchemy==0.9.7
pyopenssl>=0.14
pyopenssl>=0.15
pyCrypto>=2.6.1

View file

@ -990,7 +990,7 @@ oml.updateFilterMenus = function() {
};
oml.validatePublicKey = function(value) {
return /^[A-Za-z0-9+\/]{43}$/.test(value);
return /^[a-z0-9+\/]{16}$/.test(value);
};
oml.updateDebugMenu = function() {