diff --git a/peerlink/tls.py b/peerlink/link.py similarity index 71% rename from peerlink/tls.py rename to peerlink/link.py index f227067..11adf36 100644 --- a/peerlink/tls.py +++ b/peerlink/link.py @@ -3,51 +3,14 @@ import socket import urllib2 import ssl import hashlib -import os -import OpenSSL from utils import valid import settings -from settings import ENCODING import logging -logger = logging.getLogger('tls') +logger = logging.getLogger('link') -def get_fingerprint(): - with open(settings.tls_cert_path) as fd: - data = fd.read() - cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, data) - return hashlib.sha1(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)).hexdigest() - -def generate_tls(): - key = OpenSSL.crypto.PKey() - key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) - with open(settings.tls_key_path, 'wb') as fd: - os.chmod(settings.tls_key_path, 0600) - fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)) - os.chmod(settings.tls_key_path, 0400) - - ca = OpenSSL.crypto.X509() - ca.set_version(2) - ca.set_serial_number(1) - ca.get_subject().CN = settings.USER_ID - ca.gmtime_adj_notBefore(0) - ca.gmtime_adj_notAfter(24 * 60 * 60) - ca.set_issuer(ca.get_subject()) - ca.set_pubkey(key) - ca.add_extensions([ - OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"), - OpenSSL.crypto.X509Extension("nsCertType", True, "sslCA"), - OpenSSL.crypto.X509Extension("extendedKeyUsage", True, - "serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"), - OpenSSL.crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"), - OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca), - ]) - ca.sign(key, "sha1") - with open(settings.tls_cert_path, 'wb') as fd: - fd.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) - return get_fingerprint() class InvalidCertificateException(httplib.HTTPException, urllib2.URLError): def __init__(self, fingerprint, cert, reason): @@ -121,7 +84,9 @@ class Response(object): user = None code = 200 -def read(url, body=None, headers={}, fingerprint=None): +def read(url, body=None, headers={}, fingerprint=None, timeout=None): + if not timeout: + timeout = settings.TIMEOUT if not body: body = None opener = get_opener(fingerprint) @@ -131,7 +96,7 @@ def read(url, body=None, headers={}, fingerprint=None): logger.debug('open %s [%s]', url, fingerprint) logger.debug('headers: %s', headers) try: - r = opener.open(request, timeout=settings.TIMEOUT) + r = opener.open(request, timeout=timeout) except urllib2.HTTPError as e: response.code = e.code if e.code >= 500: @@ -168,3 +133,18 @@ def read(url, body=None, headers={}, fingerprint=None): response.body = r logger.debug('response headers: %s', dict(r.headers)) return response + +def node_url(node): + host = node['host'] + port = node['port'] + if ':' in host: + url = 'https://[%s]:%s' % (host, port) + else: + url = 'https://%s:%s' % (host, port) + return url + +def can_connect(data): + r = read(node_url(data), fingerprint=data['cert'], timeout=1) + if r.error: + return False + return True diff --git a/peerlink/localnodes.py b/peerlink/localnodes.py index 48a7d23..cc0c483 100644 --- a/peerlink/localnodes.py +++ b/peerlink/localnodes.py @@ -9,6 +9,7 @@ import struct import thread import time +from link import can_connect from settings import server, USER_ID, sk, ENCODING from utils import valid, get_public_ipv6, get_local_ipv4, get_interface @@ -16,21 +17,6 @@ import logging logger = logging.getLogger('localnodes') -def can_connect(data): - try: - if ':' in data['host']: - s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - else: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.settimeout(1) - s.connect((data['host'], data['port'])) - s.close() - return True - except: - pass - logger.debug('can_connect failed') - return False - class LocalNodesBase(Thread): _PORT = 9851 @@ -46,21 +32,48 @@ class LocalNodesBase(Thread): self.daemon = True self.start() + def get(self, user_id): + if user_id in self._nodes: + if can_connect(self._nodes[user_id]): + return self._nodes[user_id] + + def get_ip(self): + pass + def get_packet(self): - message = json.dumps({ - 'host': self.host, - 'port': server['node_port'], - 'cert': server['cert'] - }) - sig = sk.sign(message, encoding=ENCODING) - packet = json.dumps([sig, USER_ID, message]) + self.host = self.get_ip() + if self.host: + message = json.dumps({ + 'host': self.host, + 'port': server['node_port'], + 'cert': server['cert'] + }) + sig = sk.sign(message, encoding=ENCODING) + packet = json.dumps([sig, USER_ID, message]) + else: + packet = None return packet def get_socket(self): pass - def send(self): - pass + def join(self): + self._active = False + ''' + if self._socket: + try: + self._socket.shutdown(socket.SHUT_RDWR) + except: + pass + self._socket.close() + ''' + return Thread.join(self) + + def new_node(self, data): + logger.debug('new node %s', data) + if can_connect(data): + self._nodes[data['id']] = data + self.send() def receive(self): last = time.mktime(time.localtime()) @@ -86,6 +99,21 @@ class LocalNodesBase(Thread): logger.debug('receive failed. restart later', exc_info=1) time.sleep(10) + def run(self): + self.send() + self.receive() + + def send(self): + pass + + def update_node(self, data): + #logger.debug('update node %s', data) + if data['id'] != USER_ID: + if data['id'] not in self._nodes: + thread.start_new_thread(self.new_node, (data, )) + elif can_connect(data): + self._nodes[data['id']] = data + def verify(self, data): try: packet = json.loads(data) @@ -101,59 +129,14 @@ class LocalNodesBase(Thread): return None return message - def update_node(self, data): - #logger.debug('update node %s', data) - if data['id'] != USER_ID: - if data['id'] not in self._nodes: - thread.start_new_thread(self.new_node, (data, )) - elif can_connect(data): - self._nodes[data['id']] = data - - def get(self, user_id): - if user_id in self._nodes: - if can_connect(self._nodes[user_id]): - return self._nodes[user_id] - - def new_node(self, data): - logger.debug('new node %s', data) - if can_connect(data): - self._nodes[data['id']] = data - self.send() - - def get_ip(self): - pass - - def run(self): - self.host = self.get_ip() - self.send() - self.receive() - - def join(self): - self._active = False - if self._socket: - try: - self._socket.shutdown(socket.SHUT_RDWR) - except: - pass - self._socket.close() - return Thread.join(self) class LocalNodes4(LocalNodesBase): _BROADCAST = "239.255.255.250" _TTL = 1 - def send(self): - logger.debug('send4') - packet = self.get_packet() - sockaddr = (self._BROADCAST, self._PORT) - s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) - s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) - try: - s.sendto(packet + '\0', sockaddr) - except: - logger.debug('LocalNodes4.send failed', exc_info=1) - s.close() + def get_ip(self): + return get_local_ipv4() def get_socket(self): s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) @@ -163,28 +146,26 @@ class LocalNodes4(LocalNodesBase): self._socket = s return s - def get_ip(self): - return get_local_ipv4() + def send(self): + packet = self.get_packet() + if packet: + #logger.debug('send4') + sockaddr = (self._BROADCAST, self._PORT) + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) + try: + s.sendto(packet + '\0', sockaddr) + except: + logger.debug('LocalNodes4.send failed', exc_info=1) + s.close() + class LocalNodes6(LocalNodesBase): _BROADCAST = "ff02::1" - def send(self): - logger.debug('send6') - packet = self.get_packet() - ttl = struct.pack('@i', self._TTL) - address = self._BROADCAST + get_interface() - addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) - addr = addrs[0] - (family, socktype, proto, canonname, sockaddr) = addr - s = socket.socket(family, socktype, proto) - s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) - try: - s.sendto(packet + '\0', sockaddr) - except: - logger.debug('LocalNodes6.send failed', exc_info=1) - s.close() + def get_ip(self): + return get_public_ipv6() def get_socket(self): s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) @@ -194,11 +175,26 @@ class LocalNodes6(LocalNodesBase): self._socket = s return s - def get_ip(self): - return get_public_ipv6() + def send(self): + packet = self.get_packet() + if packet: + logger.debug('send6 %s', packet) + ttl = struct.pack('@i', self._TTL) + address = self._BROADCAST + get_interface() + addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) + addr = addrs[0] + (family, socktype, proto, canonname, sockaddr) = addr + s = socket.socket(family, socktype, proto) + s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) + try: + s.sendto(packet + '\0', sockaddr) + except: + logger.debug('LocalNodes6.send failed', exc_info=1) + s.close() class LocalNodes(object): + _active = True _nodes4 = None _nodes6 = None @@ -209,12 +205,24 @@ class LocalNodes(object): self._nodes4 = LocalNodes4(self._nodes) self._nodes6 = LocalNodes6(self._nodes) + def cleanup(self): + if self._active: + for id in self._nodes.keys(): + if not can_connect(self._nodes[id]): + del self._nodes[id] + if not self._active: + break + def get(self, user_id): if user_id in self._nodes: if can_connect(self._nodes[user_id]): return self._nodes[user_id] + def info(self): + return self._nodes.keys() + def join(self): + self._active = False if self._nodes4: self._nodes4.join() if self._nodes6: diff --git a/peerlink/nodes.py b/peerlink/nodes.py index 62acf87..7a1633f 100644 --- a/peerlink/nodes.py +++ b/peerlink/nodes.py @@ -2,46 +2,88 @@ # vi:si:et:sw=4:sts=4:ts=4 from __future__ import division +from threading import Thread +from Queue import Queue + +from tornado.ioloop import PeriodicCallback + import directory from localnodes import LocalNodes +from link import can_connect, node_url import logging logger = logging.getLogger('lookup') -class Nodes(object): - _nodes = {} +class Nodes(Thread): + + _active = True _local = None + _nodes = {} def __init__(self): self._local = LocalNodes() + Thread.__init__(self) + self._q = Queue() + self.daemon = True + self._cleanup = PeriodicCallback(lambda: self._q.put(''), 120000) + self._cleanup.start() + self.start() - def get(self, user_id): - # local nodes - node = self._local.get(user_id) - # local cache - if user_id in self._nodes: - node = self._nodes[user_id] - # directory - if not node: - try: - node = directory.get(user_id) - except: - logger.debug('directory failed', exc_info=1) - node = None + def cleanup(self): + if self._active: + self._local.cleanup() + for id in self._nodes.keys(): + if id in self._local._nodes: + del self._nodes[id] + if not can_connect(self._nodes[id]): + del self._nodes[id] + if not self._active: + break + + def fingerprint(self, id): + node = self.get(id) if node: - node['url'] = self._url(node) - self._nodes[user_id] = node + return node['cert'] + return None + + def get(self, id): + # check local nodes + node = self._local.get(id) + if not node: + # check local cache + node = self._nodes.get(id) + # lookup directory + if not node: + try: + node = directory.get(id) + except: + logger.debug('directory failed', exc_info=1) + node = None + if node: + self._nodes[id] = node + if node: + node['url'] = node_url(node) return node - def _url(self, node): - host = node['host'] - port = node['port'] - if ':' in host: - url = 'https://[%s]:%s' % (host, port) - else: - url = 'https://%s:%s' % (host, port) - return url + def info(self): + l = self._local.info() + return { + 'local': l, + 'nodes': sorted(set(self._nodes.keys() + l)) + } + + def join(self): + self._active = False + self._q.put('') + self._local.join() + return Thread.join(self) + + def run(self): + while self._active: + self._q.get() + if self._active: + self.cleanup() def url(self, user_id): node = self.get(user_id) @@ -51,10 +93,3 @@ class Nodes(object): url = None logger.debug('resolved %s -> %s', user_id, url) return url - - def fingerprint(self, user_id): - node = self.get(user_id) - if node: - return node['cert'] - return None - diff --git a/peerlink/nodeserver.py b/peerlink/nodeserver.py index b681042..c1143b5 100644 --- a/peerlink/nodeserver.py +++ b/peerlink/nodeserver.py @@ -3,9 +3,9 @@ import json -import tornado.web from tornado.httpserver import HTTPServer from tornado.ioloop import PeriodicCallback +import tornado.web from proxy import ProxyHandler from utils import get_public_ipv6, valid @@ -68,6 +68,12 @@ class NodeHandler(ProxyHandler): self.finish() return url +class StaticHandler(tornado.web.RequestHandler): + + def get(self): + self.write('') + self.finish() + def publish_node(): update_online() state._online = PeriodicCallback(update_online, 60000) @@ -89,6 +95,7 @@ def update_online(): def start(): application = tornado.web.Application([ + (r"/", StaticHandler), (r".*", NodeHandler), ], gzip=True) http_server = HTTPServer(application, ssl_options={ diff --git a/peerlink/proxy.py b/peerlink/proxy.py index 3e5ff68..c535a5c 100644 --- a/peerlink/proxy.py +++ b/peerlink/proxy.py @@ -6,7 +6,7 @@ import tornado.httpclient import tornado.gen from utils import run_async -import tls +import link import logging logger = logging.getLogger('proxy') @@ -65,7 +65,7 @@ class ProxyHandler(tornado.web.RequestHandler): @run_async def _fetch_response(self, url, fingerprint, callback): - response = tls.read(url, self.request.body, self.request.headers, fingerprint) + response = link.read(url, self.request.body, self.request.headers, fingerprint) callback(response) def remote_url(self): diff --git a/peerlink/server.py b/peerlink/server.py index 282c51d..58212c5 100644 --- a/peerlink/server.py +++ b/peerlink/server.py @@ -22,6 +22,25 @@ import logging logger = logging.getLogger('server') +def render_json(handler, response): + response = json.dumps(response, indent=2) + handler.set_header('Content-Type', 'application/json') + handler.set_header('Content-Length', str(len(response))) + handler.write(response) + handler.finish() + +class StatusHandler(tornado.web.RequestHandler): + + def get(self, action): + response = {} + if action == 'info': + response['id'] = settings.USER_ID + response['online'] = state.online + response.update(state.nodes.info()) + else: + response['error'] = 'unknown action' + return render_json(self, response) + class ServiceHandler(tornado.web.RequestHandler): def post(self, action): @@ -35,7 +54,7 @@ class ServiceHandler(tornado.web.RequestHandler): response = json.dumps({'status': 200}) else: self.set_status(500) - response = 'Unsupported action' + response = json.dumps({'error': 'unknown action'}) self.write(response) self.finish() @@ -58,9 +77,12 @@ class RequestHandler(ProxyHandler): url = node['url'] + '/' + uri return url, node['cert'] else: - self.set_status(404) - self.write(json.dumps({'status': 'unknown peer'})) - self.finish() + if state.online: + self.set_status(404) + render_json(self, {'status': 'unknown peer'}) + else: + self.set_status(500) + render_json(self, {'status': 'offline'}) return None def run(): @@ -75,6 +97,7 @@ def run(): 'debug': False, } handlers = [ + (r'/(info)', StatusHandler), (r'/(add|remove)', ServiceHandler), (r".*", RequestHandler), ] @@ -100,9 +123,10 @@ def run(): else: host = settings.server['address'] url = 'http://%s:%s/' % (host, settings.server['port']) - print('open browser at %s' % url) + print('peerlink runnig at %s' % url) def shutdown(): + state.nodes.join() state.node.stop() http_server.stop() diff --git a/peerlink/settings.py b/peerlink/settings.py index 376a979..c340b97 100644 --- a/peerlink/settings.py +++ b/peerlink/settings.py @@ -46,9 +46,8 @@ ENCODING='base64' USER_ID = vk.to_ascii(encoding=ENCODING) if not os.path.exists(tls_cert_path): - import tls - server['cert'] = tls.generate_tls() - + import utils + server['cert'] = utils.create_tls_certificate() VERSION="0.0" USER_AGENT = 'PeerLink/%s' % VERSION diff --git a/peerlink/utils.py b/peerlink/utils.py index 9717ca3..37f5434 100644 --- a/peerlink/utils.py +++ b/peerlink/utils.py @@ -2,50 +2,27 @@ # vi:si:et:sw=4:sts=4:ts=4 from __future__ import division -import os -import sys -import socket -import time -from datetime import datetime -import subprocess -from threading import Thread from functools import wraps +from threading import Thread +import hashlib +import os +import socket +import subprocess +import sys +from urlparse import urlparse import ed25519 +import OpenSSL + +import settings import logging -logger = logging.getLogger('oml.utils') +logger = logging.getLogger('utils') -from settings import ENCODING - -def valid(key, value, sig): - ''' - validate that value was signed by key - ''' - vk = ed25519.VerifyingKey(str(key), encoding=ENCODING) - try: - vk.verify(str(sig), str(value), encoding=ENCODING) - #except ed25519.BadSignatureError: - except: - return False - return True - -def get_public_ipv6(): - try: - host = ('2a01:4f8:120:3201::3', 25519) - s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - s.settimeout(1) - s.connect(host) - ip = s.getsockname()[0] - s.close() - except: - ip = None - return ip def get_interface(): interface = '' if sys.platform == 'darwin' or sys.platform.startswith('freebsd'): - #cmd = ['/usr/sbin/netstat', '-rn'] cmd = ['/sbin/route', '-n', 'get', 'default'] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True) stdout, stderr = p.communicate() @@ -86,31 +63,18 @@ def get_local_ipv4(): ip = [p for p in local_ip[0].split(' ')[1:] if '.' in p][0] return ip -def remove_empty_folders(prefix): - empty = [] - for root, folders, files in os.walk(prefix): - if not folders and not files: - empty.append(root) - for folder in empty: - remove_empty_tree(folder) - -def remove_empty_tree(leaf): - while leaf: - if not os.path.exists(leaf): - leaf = os.path.dirname(leaf) - elif os.path.isdir(leaf) and not os.listdir(leaf): - logger.debug('rmdir %s', leaf) - os.rmdir(leaf) - else: - break - -utc_0 = int(time.mktime(datetime(1970, 01, 01).timetuple())) - -def datetime2ts(dt): - return int(time.mktime(dt.utctimetuple())) - utc_0 - -def ts2datetime(ts): - return datetime.utcfromtimestamp(float(ts)) +def get_public_ipv6(): + n = urlparse(settings.server['directory_service']) + host = (n.hostname, n.port) + try: + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s.settimeout(1) + s.connect(host) + ip = s.getsockname()[0] + s.close() + except: + ip = None + return ip def run_async(func): @wraps(func) @@ -121,3 +85,51 @@ def run_async(func): return async_func +# ed25519 utils +def valid(key, value, sig): + ''' + validate that value was signed by key + ''' + vk = ed25519.VerifyingKey(str(key), encoding=settings.ENCODING) + try: + vk.verify(str(sig), str(value), encoding=settings.ENCODING) + #except ed25519.BadSignatureError: + except: + return False + return True + +# tls utils +def get_fingerprint(): + with open(settings.tls_cert_path) as fd: + data = fd.read() + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, data) + return hashlib.sha1(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)).hexdigest() + +def create_tls_certificate(): + key = OpenSSL.crypto.PKey() + key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + with open(settings.tls_key_path, 'wb') as fd: + os.chmod(settings.tls_key_path, 0600) + fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)) + os.chmod(settings.tls_key_path, 0400) + + ca = OpenSSL.crypto.X509() + ca.set_version(2) + ca.set_serial_number(1) + ca.get_subject().CN = settings.USER_ID + ca.gmtime_adj_notBefore(0) + ca.gmtime_adj_notAfter(24 * 60 * 60) + ca.set_issuer(ca.get_subject()) + ca.set_pubkey(key) + ca.add_extensions([ + OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"), + OpenSSL.crypto.X509Extension("nsCertType", True, "sslCA"), + OpenSSL.crypto.X509Extension("extendedKeyUsage", True, + "serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"), + OpenSSL.crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"), + OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca), + ]) + ca.sign(key, "sha1") + with open(settings.tls_cert_path, 'wb') as fd: + fd.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)) + return get_fingerprint()