commit fabac5da4ac00f23fc465662890829aadb179afc Author: j Date: Tue Aug 26 21:07:33 2014 +0200 peerlink diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..131c935 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.pyc +._* +.*.swp +build +dist +config diff --git a/example/server.py b/example/server.py new file mode 100644 index 0000000..794411e --- /dev/null +++ b/example/server.py @@ -0,0 +1,104 @@ +#!/usr/bin/python + +import json +import os +import shutil +import SimpleHTTPServer +import socket +import SocketServer +import sys +import urllib2 + +peers = {} + +NETBASE='http://[::1]:8842/' +NAME = 'chat' + +def remote(peer, action, data): + url = NETBASE + '%s/%s/%s' % (peer, NAME, action) + if data and not isinstance(data, str): + data = json.dumps(data) + opener = urllib2.build_opener() + req = urllib2.Request(url, data=data, headers={ + 'Content-Type': 'application/json', + 'User-Agent': 'ChatServer/0.0' + }) + response = opener.open(req) + return response.read() + + +def add_service(name, url): + add = NETBASE + 'add' + urllib2.urlopen(add, json.dumps({'name': name, 'url': url})) + +class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler): + + def do_GET(self): + print 'GET', self.path + path = os.path.join('static', self.path[1:] if self.path != '/' else 'index.html') + if os.path.exists(path): + with open(path) as fd: + shutil.copyfileobj(fd, self.wfile) + + def _remote_request(self, action, data): + response = {} + if action == 'message': + pass + elif action == 'ping': + print self.headers + response['userid'] = self.headers.getheader('From') + response['remote ping'] = True + response['data'] = data + return response + + def _request(self, action, data): + response = {} + if action == 'test': + response['test'] = 'ok' + response['data'] = data + elif action in ('ping', 'pong'): + id = data['id'] + del data['id'] + response = remote(id, action, data) + return response + + def do_POST(self): + print 'POST', self.path + length = int(self.headers.getheader('content-length')) + body = self.rfile.read(length) + data = json.loads(body) + if self.path.startswith('/remote'): + action = self.path.split('/')[2] + response = self._remote_request(action, data) + else: + action = self.path.split('/')[1] + response = self._request(action, data) + + response = json.dumps(response, indent=2) + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', str(len(response))) + self.end_headers() + self.wfile.write(response) + +class Server(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + ''' + IPv4/IPv6 Dual Stack + ''' + address_family = socket.AF_INET6 + allow_reuse_address = True + + def server_bind(self): + self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) + SocketServer.TCPServer.server_bind(self) + +if __name__ == '__main__': + if len(sys.argv) == 2: + port = int(sys.argv[1]) + else: + port = 8000 + print "listening on port", port + url = 'http://127.0.0.1:%s/remote/' % port + add_service(NAME, url) + httpd = Server(("", port), Handler) + httpd.serve_forever() diff --git a/example/static/index.html b/example/static/index.html new file mode 100644 index 0000000..b063a6c --- /dev/null +++ b/example/static/index.html @@ -0,0 +1,32 @@ + + + + + + + + + + + + + diff --git a/peerlink/__init__.py b/peerlink/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/peerlink/__main__.py b/peerlink/__main__.py new file mode 100644 index 0000000..6bc39a0 --- /dev/null +++ b/peerlink/__main__.py @@ -0,0 +1,2 @@ +import server +server.run() diff --git a/peerlink/directory.py b/peerlink/directory.py new file mode 100644 index 0000000..7844435 --- /dev/null +++ b/peerlink/directory.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +# DHT placeholder +from __future__ import division + +import json + +import ed25519 +import requests + +import settings + +import logging +logger = logging.getLogger('net.directory') + + +base = settings.server['directory_service'] + +def get(vk): + if isinstance(vk, str): + id = vk + else: + id = vk.to_ascii(encoding='base64') + url ='%s/%s' % (base, id) + headers = { + 'User-Agent': settings.USER_AGENT + } + r = requests.get(url, headers=headers) + sig = r.headers.get('X-Ed25519-Signature') + data = r.content + if sig and data: + vk = ed25519.VerifyingKey(id, encoding='base64') + try: + vk.verify(sig, data, encoding='base64') + data = json.loads(data) + except ed25519.BadSignatureError: + logger.debug('invalid signature') + + data = None + return data + +def put(sk, data): + id = sk.get_verifying_key().to_ascii(encoding='base64') + data = json.dumps(data) + sig = sk.sign(data, encoding='base64') + url ='%s/%s' % (base, id) + headers = { + 'User-Agent': settings.USER_AGENT, + 'X-Ed25519-Signature': sig + } + try: + r = requests.put(url, data, headers=headers, timeout=2) + except: + import traceback + logger.info('directory.put failed: %s', data) + traceback.print_exc() + return False + return r.status_code == 200 diff --git a/peerlink/localnodes.py b/peerlink/localnodes.py new file mode 100644 index 0000000..48a7d23 --- /dev/null +++ b/peerlink/localnodes.py @@ -0,0 +1,221 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division + +from threading import Thread +import json +import socket +import struct +import thread +import time + +from settings import server, USER_ID, sk, ENCODING +from utils import valid, get_public_ipv6, get_local_ipv4, get_interface + +import logging +logger = logging.getLogger('localnodes') + + +def can_connect(data): + try: + if ':' in data['host']: + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + else: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(1) + s.connect((data['host'], data['port'])) + s.close() + return True + except: + pass + logger.debug('can_connect failed') + return False + +class LocalNodesBase(Thread): + + _PORT = 9851 + _TTL = 1 + + def __init__(self, nodes): + self._socket = None + self._active = True + self._nodes = nodes + Thread.__init__(self) + if not server['localnode_discovery']: + return + self.daemon = True + self.start() + + def get_packet(self): + message = json.dumps({ + 'host': self.host, + 'port': server['node_port'], + 'cert': server['cert'] + }) + sig = sk.sign(message, encoding=ENCODING) + packet = json.dumps([sig, USER_ID, message]) + return packet + + def get_socket(self): + pass + + def send(self): + pass + + def receive(self): + last = time.mktime(time.localtime()) + while self._active: + try: + s = self.get_socket() + s.settimeout(2) + s.bind(('', self._PORT)) + while self._active: + data, addr = s.recvfrom(1024) + if self._active: + while data[-1] == '\0': + data = data[:-1] # Strip trailing \0's + data = self.verify(data) + if data: + self.update_node(data) + except socket.timeout: + now = time.mktime(time.localtime()) + if now - last > 60: + last = now + thread.start_new_thread(self.send, ()) + except: + logger.debug('receive failed. restart later', exc_info=1) + time.sleep(10) + + def verify(self, data): + try: + packet = json.loads(data) + except: + return None + if len(packet) == 3: + sig, user_id, data = packet + if valid(user_id, data, sig): + message = json.loads(data) + message['id'] = user_id + for key in ['id', 'host', 'port', 'cert']: + if key not in message: + return None + return message + + def update_node(self, data): + #logger.debug('update node %s', data) + if data['id'] != USER_ID: + if data['id'] not in self._nodes: + thread.start_new_thread(self.new_node, (data, )) + elif can_connect(data): + self._nodes[data['id']] = data + + def get(self, user_id): + if user_id in self._nodes: + if can_connect(self._nodes[user_id]): + return self._nodes[user_id] + + def new_node(self, data): + logger.debug('new node %s', data) + if can_connect(data): + self._nodes[data['id']] = data + self.send() + + def get_ip(self): + pass + + def run(self): + self.host = self.get_ip() + self.send() + self.receive() + + def join(self): + self._active = False + if self._socket: + try: + self._socket.shutdown(socket.SHUT_RDWR) + except: + pass + self._socket.close() + return Thread.join(self) + +class LocalNodes4(LocalNodesBase): + + _BROADCAST = "239.255.255.250" + _TTL = 1 + + def send(self): + logger.debug('send4') + packet = self.get_packet() + sockaddr = (self._BROADCAST, self._PORT) + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) + try: + s.sendto(packet + '\0', sockaddr) + except: + logger.debug('LocalNodes4.send failed', exc_info=1) + s.close() + + def get_socket(self): + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST), socket.INADDR_ANY) + s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + self._socket = s + return s + + def get_ip(self): + return get_local_ipv4() + +class LocalNodes6(LocalNodesBase): + + _BROADCAST = "ff02::1" + + def send(self): + logger.debug('send6') + packet = self.get_packet() + ttl = struct.pack('@i', self._TTL) + address = self._BROADCAST + get_interface() + addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) + addr = addrs[0] + (family, socktype, proto, canonname, sockaddr) = addr + s = socket.socket(family, socktype, proto) + s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) + try: + s.sendto(packet + '\0', sockaddr) + except: + logger.debug('LocalNodes6.send failed', exc_info=1) + s.close() + + def get_socket(self): + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + group_bin = socket.inet_pton(socket.AF_INET6, self._BROADCAST) + '\0'*4 + s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_bin) + self._socket = s + return s + + def get_ip(self): + return get_public_ipv6() + +class LocalNodes(object): + + _nodes4 = None + _nodes6 = None + + def __init__(self): + self._nodes = {} + if not server['localnode_discovery']: + return + self._nodes4 = LocalNodes4(self._nodes) + self._nodes6 = LocalNodes6(self._nodes) + + def get(self, user_id): + if user_id in self._nodes: + if can_connect(self._nodes[user_id]): + return self._nodes[user_id] + + def join(self): + if self._nodes4: + self._nodes4.join() + if self._nodes6: + self._nodes6.join() diff --git a/peerlink/nodes.py b/peerlink/nodes.py new file mode 100644 index 0000000..62acf87 --- /dev/null +++ b/peerlink/nodes.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division + +import directory +from localnodes import LocalNodes + +import logging +logger = logging.getLogger('lookup') + + +class Nodes(object): + _nodes = {} + _local = None + + def __init__(self): + self._local = LocalNodes() + + def get(self, user_id): + # local nodes + node = self._local.get(user_id) + # local cache + if user_id in self._nodes: + node = self._nodes[user_id] + # directory + if not node: + try: + node = directory.get(user_id) + except: + logger.debug('directory failed', exc_info=1) + node = None + if node: + node['url'] = self._url(node) + self._nodes[user_id] = node + return node + + def _url(self, node): + host = node['host'] + port = node['port'] + if ':' in host: + url = 'https://[%s]:%s' % (host, port) + else: + url = 'https://%s:%s' % (host, port) + return url + + def url(self, user_id): + node = self.get(user_id) + if node: + url = node['url'] + else: + url = None + logger.debug('resolved %s -> %s', user_id, url) + return url + + def fingerprint(self, user_id): + node = self.get(user_id) + if node: + return node['cert'] + return None + diff --git a/peerlink/nodeserver.py b/peerlink/nodeserver.py new file mode 100644 index 0000000..b681042 --- /dev/null +++ b/peerlink/nodeserver.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 + +import json + +import tornado.web +from tornado.httpserver import HTTPServer +from tornado.ioloop import PeriodicCallback + +from proxy import ProxyHandler +from utils import get_public_ipv6, valid +import directory +import settings +import state + +import logging +logger = logging.getLogger('nodeserver') + + +class NodeHandler(ProxyHandler): + + def validate_request(self): + if 'From' in self.request.headers: + del self.request.headers['From'] + sig = self.request.headers.get('X-Ed25519-Signature') + key = self.request.headers.get('X-Ed25519-Key') + if sig or key: + try: + is_valid = valid(key, self.request.body, sig) + except: + is_valid = False + if is_valid: + self.request.headers['From'] = key + return is_valid + return True + + @tornado.web.asynchronous + def _handle_response(self, response): + # sign json responses from local services + if not response.error and \ + response.headers.get('Content-Type') == 'application/json': + if response.body: + response.data = response.body.read() + response.body = None + sig = settings.sk.sign(response.data, encoding=settings.ENCODING) + response.headers['X-Ed25519-Key'] = settings.USER_ID + response.headers['X-Ed25519-Signature'] = sig + return ProxyHandler._handle_response(self, response) + + def remote_url(self): + if not self.validate_request(): + url = None + self.set_status(403) + self.write(json.dumps({'status': 'Invalid Signature'})) + self.finish() + return None + try: + service, uri = self.request.uri[1:].split('/', 1) + except: + logger.debug('Invalid Request %s', self.request.uri) + return None + if service in settings.services: + url = settings.services[service] + uri + else: + url = None + self.set_status(404) + self.write(json.dumps({'status': 'unknown app'})) + self.finish() + return url + +def publish_node(): + update_online() + state._online = PeriodicCallback(update_online, 60000) + state._online.start() + +def update_online(): + host = get_public_ipv6() + if not host: + state.online = False + else: + if host != state.host: + state.host = host + online = directory.put(settings.sk, { + 'host': host, + 'port': settings.server['node_port'], + 'cert': settings.server['cert'] + }) + state.online = online + +def start(): + application = tornado.web.Application([ + (r".*", NodeHandler), + ], gzip=True) + http_server = HTTPServer(application, ssl_options={ + "certfile": settings.tls_cert_path, + "keyfile": settings.tls_key_path + }) + http_server.listen(settings.server['node_port'], settings.server['node_address']) + state.main.add_callback(publish_node) + return http_server diff --git a/peerlink/pdict.py b/peerlink/pdict.py new file mode 100644 index 0000000..020d5a4 --- /dev/null +++ b/peerlink/pdict.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 + +import os +import json + +class pdict(dict): + def __init__(self, path, defaults=None): + self._path = None + self._defaults = defaults + if os.path.exists(path): + with open(path) as fd: + _data = json.load(fd) + for key in _data: + self[key] = _data[key] + self._path = path + + def _save(self): + if self._path: + with open(self._path, 'w') as fd: + json.dump(self, fd, indent=1) + + def get(self, key, default=None): + if default == None and self._defaults: + default = self._defaults.get(key) + return dict.get(self, key, default) + + def __getitem__(self, key): + if key not in self and self._defaults and key in self._defaults: + return self._defaults[key] + return dict.__getitem__(self, key) + + def __setitem__(self, key, value): + dict.__setitem__(self, key, value) + self._save() + + def __delitem__(self, key): + dict.__delitem__(self, key) + self._save() + diff --git a/peerlink/proxy.py b/peerlink/proxy.py new file mode 100644 index 0000000..3e5ff68 --- /dev/null +++ b/peerlink/proxy.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 + +import tornado.web +import tornado.httpclient +import tornado.gen + +from utils import run_async +import tls + +import logging +logger = logging.getLogger('proxy') + + +class ProxyHandler(tornado.web.RequestHandler): + SUPPORTED_METHODS = ['GET', 'POST', 'PUT', 'DELETE'] + + @tornado.web.asynchronous + def _handle_response(self, response): + if response.error: + logger.debug('ERROR %s', str(response.error)) + self.set_status(response.code) + if response.body: + self.write(response.body) + else: + self.write('Internal server error:\n' + str(response.error)) + else: + if response.code: + self.set_status(response.code) + allowed_headers = ( + 'X-Ed25519-Key', + 'X-Ed25519-Signature', + 'Accept-Ranges', + 'Cache-Control', + 'Connection', + 'Content-Encoding', + 'Content-Length', + 'Content-Range', + 'Content-Type', + 'Date', + 'ETag', + 'Last-Modified', + 'Location', + 'Range', + 'Server', + 'Vary' + ) + for header in allowed_headers: + v = response.headers.get(header) + if v: + self.set_header(header, v) + + ignored = set(response.headers.keys()) - set([h.lower() for h in allowed_headers]) + if ignored: + print 'IGNORED', ignored + + if response.data: + self.write(response.data) + elif response.body: + chunk = True + while chunk: + chunk = response.body.read(4096) + self.write(chunk) + self.finish() + + @run_async + def _fetch_response(self, url, fingerprint, callback): + response = tls.read(url, self.request.body, self.request.headers, fingerprint) + callback(response) + + def remote_url(self): + self.set_status(500) + self.write('Internal server error:\n') + self.finish() + return None + + @tornado.web.asynchronous + @tornado.gen.coroutine + def proxy(self): + url = self.remote_url() + if isinstance(url, tuple): + url, fingerprint = url + else: + fingerprint = None + if url: + logger.debug('request to %s', url) + response = yield tornado.gen.Task(self._fetch_response, url, fingerprint) + self._handle_response(response) + + @tornado.web.asynchronous + def get(self): + return self.proxy() + + @tornado.web.asynchronous + def post(self): + return self.proxy() + + @tornado.web.asynchronous + def put(self): + return self.proxy() + + @tornado.web.asynchronous + def delete(self): + return self.proxy() + diff --git a/peerlink/server.py b/peerlink/server.py new file mode 100644 index 0000000..282c51d --- /dev/null +++ b/peerlink/server.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division, print_function + +import json +import os +import signal +import sys + +from tornado.httpserver import HTTPServer +from tornado.ioloop import IOLoop +from tornado.web import Application +import tornado + +from proxy import ProxyHandler +import nodes +import nodeserver +import settings +import state + +import logging +logger = logging.getLogger('server') + + +class ServiceHandler(tornado.web.RequestHandler): + + def post(self, action): + data = json.loads(self.request.body) + if action == 'add': + settings.services[data['name']] = data['url'] + response = json.dumps({'status': 200}) + elif action == 'remove': + if data['name'] in settings.services: + del settings.services[data['name']] + response = json.dumps({'status': 200}) + else: + self.set_status(500) + response = 'Unsupported action' + self.write(response) + self.finish() + +class RequestHandler(ProxyHandler): + + def sign(self): + if self.request.body: + sig = settings.sk.sign(self.request.body, encoding=settings.ENCODING) + self.request.headers['X-Ed25519-Key'] = settings.USER_ID + self.request.headers['X-Ed25519-Signature'] = sig + + def remote_url(self): + try: + user_id, uri = self.request.uri[1:].split('/', 1) + except: + return None + node = state.nodes.get(user_id) + if node: + self.sign() + url = node['url'] + '/' + uri + return url, node['cert'] + else: + self.set_status(404) + self.write(json.dumps({'status': 'unknown peer'})) + self.finish() + return None + +def run(): + root_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')) + os.chdir(root_dir) + PID = sys.argv[1] if len(sys.argv) > 1 else None + + if not PID: + logging.basicConfig(level=logging.DEBUG) + + options = { + 'debug': False, + } + handlers = [ + (r'/(add|remove)', ServiceHandler), + (r".*", RequestHandler), + ] + + http_server = HTTPServer(Application(handlers, **options)) + + http_server.listen(settings.server['port'], settings.server['address']) + + if PID: + with open(PID, 'w') as pid: + pid.write('%s' % os.getpid()) + + state.main = IOLoop.instance() + state.node = nodeserver.start() + def start_node(): + state.nodes = nodes.Nodes() + state.main.add_callback(start_node) + + if ':' in settings.server['address']: + host = '[%s]' % settings.server['address'] + elif not settings.server['address']: + host = '[::1]' + else: + host = settings.server['address'] + url = 'http://%s:%s/' % (host, settings.server['port']) + print('open browser at %s' % url) + + def shutdown(): + state.node.stop() + http_server.stop() + + signal.signal(signal.SIGTERM, shutdown) + + try: + state.main.start() + except: + print('shutting down...') + shutdown() diff --git a/peerlink/settings.py b/peerlink/settings.py new file mode 100644 index 0000000..376a979 --- /dev/null +++ b/peerlink/settings.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 + +import os +import ed25519 + +from pdict import pdict + + +base_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')) + +config_path = os.path.normpath(os.path.join(base_dir, 'config')) +if not os.path.exists(config_path): + os.makedirs(config_path) + +nodes_path = os.path.join(config_path, 'nodes.db') +key_path = os.path.join(config_path, 'node.key') +tls_cert_path = os.path.join(config_path, 'node.tls.crt') +tls_key_path = os.path.join(config_path, 'node.tls.key') + + +defaults = { + "address": "::1", + "port": 8842, + "node_address": "", + "node_port": 8851, + "cert": "", + "directory_service": "http://[2a01:4f8:120:3201::3]:25519", + "localnode_discovery": True +} +server = pdict(os.path.join(config_path, 'settings.json'), defaults) +services = pdict(os.path.join(config_path, 'services.json'), {}) + +if os.path.exists(key_path): + with open(key_path) as fd: + sk = ed25519.SigningKey(fd.read()) + vk = sk.get_verifying_key() +else: + sk, vk = ed25519.create_keypair() + with open(key_path, 'w') as fd: + os.chmod(key_path, 0600) + fd.write(sk.to_bytes()) + os.chmod(key_path, 0400) + +ENCODING='base64' +USER_ID = vk.to_ascii(encoding=ENCODING) + +if not os.path.exists(tls_cert_path): + import tls + server['cert'] = tls.generate_tls() + + +VERSION="0.0" +USER_AGENT = 'PeerLink/%s' % VERSION + +TIMEOUT = 5 diff --git a/peerlink/state.py b/peerlink/state.py new file mode 100644 index 0000000..1b24495 --- /dev/null +++ b/peerlink/state.py @@ -0,0 +1,2 @@ +main = None +host = None diff --git a/peerlink/tls.py b/peerlink/tls.py new file mode 100644 index 0000000..f227067 --- /dev/null +++ b/peerlink/tls.py @@ -0,0 +1,170 @@ +import httplib +import socket +import urllib2 +import ssl +import hashlib +import os + +import OpenSSL +from utils import valid + +import settings +from settings import ENCODING + +import logging +logger = logging.getLogger('tls') + +def get_fingerprint(): + with open(settings.tls_cert_path) as fd: + data = fd.read() + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, data) + return hashlib.sha1(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)).hexdigest() + +def generate_tls(): + key = OpenSSL.crypto.PKey() + key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + with open(settings.tls_key_path, 'wb') as fd: + os.chmod(settings.tls_key_path, 0600) + fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)) + os.chmod(settings.tls_key_path, 0400) + + ca = OpenSSL.crypto.X509() + ca.set_version(2) + ca.set_serial_number(1) + ca.get_subject().CN = settings.USER_ID + ca.gmtime_adj_notBefore(0) + ca.gmtime_adj_notAfter(24 * 60 * 60) + ca.set_issuer(ca.get_subject()) + ca.set_pubkey(key) + ca.add_extensions([ + OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"), + OpenSSL.crypto.X509Extension("nsCertType", True, "sslCA"), + OpenSSL.crypto.X509Extension("extendedKeyUsage", True, + "serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"), + OpenSSL.crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"), + OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca), + ]) + ca.sign(key, "sha1") + with open(settings.tls_cert_path, 'wb') as fd: + fd.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) + return get_fingerprint() + +class InvalidCertificateException(httplib.HTTPException, urllib2.URLError): + def __init__(self, fingerprint, cert, reason): + httplib.HTTPException.__init__(self) + self.fingerprint = fingerprint + self.cert_fingerprint = hashlib.sha1(cert).hexdigest() + self.reason = reason + + def __str__(self): + return ('%s (local) != %s (remote) (%s)\n' % + (self.fingerprint, self.cert_fingerprint, self.reason)) + +class CertValidatingHTTPSConnection(httplib.HTTPConnection): + default_port = httplib.HTTPS_PORT + + def __init__(self, host, port=None, fingerprint=None, strict=None, **kwargs): + httplib.HTTPConnection.__init__(self, host, port, strict, **kwargs) + self.fingerprint = fingerprint + if self.fingerprint: + self.cert_reqs = ssl.CERT_REQUIRED + else: + self.cert_reqs = ssl.CERT_NONE + self.cert_reqs = ssl.CERT_NONE + + def _ValidateCertificateFingerprint(self, cert): + fingerprint = hashlib.sha1(cert).hexdigest() + return fingerprint == self.fingerprint + + def connect(self): + sock = socket.create_connection((self.host, self.port)) + self.sock = ssl.wrap_socket(sock, cert_reqs=self.cert_reqs) + #if self.cert_reqs & ssl.CERT_REQUIRED: + if self.fingerprint: + cert = self.sock.getpeercert(binary_form=True) + if not self._ValidateCertificateFingerprint(cert): + raise InvalidCertificateException(self.fingerprint, cert, + 'fingerprint mismatch') + #logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version) + +class VerifiedHTTPSHandler(urllib2.HTTPSHandler): + def __init__(self, **kwargs): + urllib2.AbstractHTTPHandler.__init__(self) + self._connection_args = kwargs + + def https_open(self, req): + def http_class_wrapper(host, **kwargs): + full_kwargs = dict(self._connection_args) + full_kwargs.update(kwargs) + return CertValidatingHTTPSConnection(host, **full_kwargs) + + try: + return self.do_open(http_class_wrapper, req) + except urllib2.URLError, e: + if type(e.reason) == ssl.SSLError and e.reason.args[0] == 1: + raise InvalidCertificateException(self.fingerprint, '', + e.reason.args[1]) + raise + + https_request = urllib2.HTTPSHandler.do_request_ + +def get_opener(fingerprint): + handler = VerifiedHTTPSHandler(fingerprint=fingerprint) + opener = urllib2.build_opener(handler) + return opener + +class Response(object): + headers = {} + error = None + body = None + data = None + user = None + code = 200 + +def read(url, body=None, headers={}, fingerprint=None): + if not body: + body = None + opener = get_opener(fingerprint) + headers = dict(headers) + request = urllib2.Request(url, data=body, headers=headers) + response = Response() + logger.debug('open %s [%s]', url, fingerprint) + logger.debug('headers: %s', headers) + try: + r = opener.open(request, timeout=settings.TIMEOUT) + except urllib2.HTTPError as e: + response.code = e.code + if e.code >= 500: + logger.debug('urllib2.HTTPError %s %s', e, e.code) + response.error = e + else: + response.headers = e.headers + response.body = e.read() + return response + except urllib2.URLError as e: + logger.debug('urllib2.URLError %s', e) + response.error = e + response.code = 500 + return response + except: + logger.debug('unknown url error', exc_info=1) + response.error = 'unkown url error' + response.code = 500 + return response + + response.headers = r.headers + response.code = r.getcode() + sig = r.headers.get('X-Ed25519-Signature') + if sig: + key = r.headers.get('X-Ed25519-Key') + data = r.read() + if valid(key, data, sig): + response.data = data + response.user = key + else: + response.error = 'Invalid Signature' + response.code = 500 + else: + response.body = r + logger.debug('response headers: %s', dict(r.headers)) + return response diff --git a/peerlink/utils.py b/peerlink/utils.py new file mode 100644 index 0000000..9717ca3 --- /dev/null +++ b/peerlink/utils.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division + +import os +import sys +import socket +import time +from datetime import datetime +import subprocess +from threading import Thread +from functools import wraps + +import ed25519 + +import logging +logger = logging.getLogger('oml.utils') + +from settings import ENCODING + +def valid(key, value, sig): + ''' + validate that value was signed by key + ''' + vk = ed25519.VerifyingKey(str(key), encoding=ENCODING) + try: + vk.verify(str(sig), str(value), encoding=ENCODING) + #except ed25519.BadSignatureError: + except: + return False + return True + +def get_public_ipv6(): + try: + host = ('2a01:4f8:120:3201::3', 25519) + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s.settimeout(1) + s.connect(host) + ip = s.getsockname()[0] + s.close() + except: + ip = None + return ip + +def get_interface(): + interface = '' + if sys.platform == 'darwin' or sys.platform.startswith('freebsd'): + #cmd = ['/usr/sbin/netstat', '-rn'] + cmd = ['/sbin/route', '-n', 'get', 'default'] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True) + stdout, stderr = p.communicate() + interface = [[p.strip() + for p in s.split(':', 1)] + for s in stdout.strip().split('\n') if 'interface' in s] + if interface: + interface = '%%%s' % interface[0][1] + else: + interface = '' + return interface + +def get_local_ipv4(): + ip = None + if sys.platform == 'darwin' or sys.platform.startswith('freebsd'): + cmd = ['/sbin/route', '-n', 'get', 'default'] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True) + stdout, stderr = p.communicate() + interface = [[p.strip() for p in s.split(':', 1)] + for s in stdout.strip().split('\n') if 'interface' in s] + if interface: + interface = interface[0][1] + cmd = ['ifconfig', interface] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True) + stdout, stderr = p.communicate() + ips = [l for l in stdout.split('\n') if 'inet ' in l] + if ips: + ip = ips[0].strip().split(' ')[1] + else: + cmd = ['ip', 'route', 'show'] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True) + stdout, stderr = p.communicate() + local = [l for l in stdout.split('\n') if 'default' in l] + if local: + dev = local[0].split(' ')[4] + local_ip = [l for l in stdout.split('\n') + if dev in l and not 'default' in l and 'src' in l] + ip = [p for p in local_ip[0].split(' ')[1:] if '.' in p][0] + return ip + +def remove_empty_folders(prefix): + empty = [] + for root, folders, files in os.walk(prefix): + if not folders and not files: + empty.append(root) + for folder in empty: + remove_empty_tree(folder) + +def remove_empty_tree(leaf): + while leaf: + if not os.path.exists(leaf): + leaf = os.path.dirname(leaf) + elif os.path.isdir(leaf) and not os.listdir(leaf): + logger.debug('rmdir %s', leaf) + os.rmdir(leaf) + else: + break + +utc_0 = int(time.mktime(datetime(1970, 01, 01).timetuple())) + +def datetime2ts(dt): + return int(time.mktime(dt.utctimetuple())) - utc_0 + +def ts2datetime(ts): + return datetime.utcfromtimestamp(float(ts)) + +def run_async(func): + @wraps(func) + def async_func(*args, **kwargs): + func_hl = Thread(target = func, args = args, kwargs = kwargs) + func_hl.start() + return func_hl + + return async_func +