This commit is contained in:
j 2014-08-28 14:51:16 +02:00
parent fabac5da4a
commit 536a99ed22
8 changed files with 297 additions and 232 deletions

View File

@ -3,51 +3,14 @@ import socket
import urllib2
import ssl
import hashlib
import os
import OpenSSL
from utils import valid
import settings
from settings import ENCODING
import logging
logger = logging.getLogger('tls')
logger = logging.getLogger('link')
def get_fingerprint():
with open(settings.tls_cert_path) as fd:
data = fd.read()
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, data)
return hashlib.sha1(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)).hexdigest()
def generate_tls():
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
with open(settings.tls_key_path, 'wb') as fd:
os.chmod(settings.tls_key_path, 0600)
fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
os.chmod(settings.tls_key_path, 0400)
ca = OpenSSL.crypto.X509()
ca.set_version(2)
ca.set_serial_number(1)
ca.get_subject().CN = settings.USER_ID
ca.gmtime_adj_notBefore(0)
ca.gmtime_adj_notAfter(24 * 60 * 60)
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
ca.add_extensions([
OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"),
OpenSSL.crypto.X509Extension("nsCertType", True, "sslCA"),
OpenSSL.crypto.X509Extension("extendedKeyUsage", True,
"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"),
OpenSSL.crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca),
])
ca.sign(key, "sha1")
with open(settings.tls_cert_path, 'wb') as fd:
fd.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
return get_fingerprint()
class InvalidCertificateException(httplib.HTTPException, urllib2.URLError):
def __init__(self, fingerprint, cert, reason):
@ -121,7 +84,9 @@ class Response(object):
user = None
code = 200
def read(url, body=None, headers={}, fingerprint=None):
def read(url, body=None, headers={}, fingerprint=None, timeout=None):
if not timeout:
timeout = settings.TIMEOUT
if not body:
body = None
opener = get_opener(fingerprint)
@ -131,7 +96,7 @@ def read(url, body=None, headers={}, fingerprint=None):
logger.debug('open %s [%s]', url, fingerprint)
logger.debug('headers: %s', headers)
try:
r = opener.open(request, timeout=settings.TIMEOUT)
r = opener.open(request, timeout=timeout)
except urllib2.HTTPError as e:
response.code = e.code
if e.code >= 500:
@ -168,3 +133,18 @@ def read(url, body=None, headers={}, fingerprint=None):
response.body = r
logger.debug('response headers: %s', dict(r.headers))
return response
def node_url(node):
host = node['host']
port = node['port']
if ':' in host:
url = 'https://[%s]:%s' % (host, port)
else:
url = 'https://%s:%s' % (host, port)
return url
def can_connect(data):
r = read(node_url(data), fingerprint=data['cert'], timeout=1)
if r.error:
return False
return True

View File

@ -9,6 +9,7 @@ import struct
import thread
import time
from link import can_connect
from settings import server, USER_ID, sk, ENCODING
from utils import valid, get_public_ipv6, get_local_ipv4, get_interface
@ -16,21 +17,6 @@ import logging
logger = logging.getLogger('localnodes')
def can_connect(data):
try:
if ':' in data['host']:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(1)
s.connect((data['host'], data['port']))
s.close()
return True
except:
pass
logger.debug('can_connect failed')
return False
class LocalNodesBase(Thread):
_PORT = 9851
@ -46,21 +32,48 @@ class LocalNodesBase(Thread):
self.daemon = True
self.start()
def get(self, user_id):
if user_id in self._nodes:
if can_connect(self._nodes[user_id]):
return self._nodes[user_id]
def get_ip(self):
pass
def get_packet(self):
message = json.dumps({
'host': self.host,
'port': server['node_port'],
'cert': server['cert']
})
sig = sk.sign(message, encoding=ENCODING)
packet = json.dumps([sig, USER_ID, message])
self.host = self.get_ip()
if self.host:
message = json.dumps({
'host': self.host,
'port': server['node_port'],
'cert': server['cert']
})
sig = sk.sign(message, encoding=ENCODING)
packet = json.dumps([sig, USER_ID, message])
else:
packet = None
return packet
def get_socket(self):
pass
def send(self):
pass
def join(self):
self._active = False
'''
if self._socket:
try:
self._socket.shutdown(socket.SHUT_RDWR)
except:
pass
self._socket.close()
'''
return Thread.join(self)
def new_node(self, data):
logger.debug('new node %s', data)
if can_connect(data):
self._nodes[data['id']] = data
self.send()
def receive(self):
last = time.mktime(time.localtime())
@ -86,6 +99,21 @@ class LocalNodesBase(Thread):
logger.debug('receive failed. restart later', exc_info=1)
time.sleep(10)
def run(self):
self.send()
self.receive()
def send(self):
pass
def update_node(self, data):
#logger.debug('update node %s', data)
if data['id'] != USER_ID:
if data['id'] not in self._nodes:
thread.start_new_thread(self.new_node, (data, ))
elif can_connect(data):
self._nodes[data['id']] = data
def verify(self, data):
try:
packet = json.loads(data)
@ -101,59 +129,14 @@ class LocalNodesBase(Thread):
return None
return message
def update_node(self, data):
#logger.debug('update node %s', data)
if data['id'] != USER_ID:
if data['id'] not in self._nodes:
thread.start_new_thread(self.new_node, (data, ))
elif can_connect(data):
self._nodes[data['id']] = data
def get(self, user_id):
if user_id in self._nodes:
if can_connect(self._nodes[user_id]):
return self._nodes[user_id]
def new_node(self, data):
logger.debug('new node %s', data)
if can_connect(data):
self._nodes[data['id']] = data
self.send()
def get_ip(self):
pass
def run(self):
self.host = self.get_ip()
self.send()
self.receive()
def join(self):
self._active = False
if self._socket:
try:
self._socket.shutdown(socket.SHUT_RDWR)
except:
pass
self._socket.close()
return Thread.join(self)
class LocalNodes4(LocalNodesBase):
_BROADCAST = "239.255.255.250"
_TTL = 1
def send(self):
logger.debug('send4')
packet = self.get_packet()
sockaddr = (self._BROADCAST, self._PORT)
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL)
try:
s.sendto(packet + '\0', sockaddr)
except:
logger.debug('LocalNodes4.send failed', exc_info=1)
s.close()
def get_ip(self):
return get_local_ipv4()
def get_socket(self):
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
@ -163,28 +146,26 @@ class LocalNodes4(LocalNodesBase):
self._socket = s
return s
def get_ip(self):
return get_local_ipv4()
def send(self):
packet = self.get_packet()
if packet:
#logger.debug('send4')
sockaddr = (self._BROADCAST, self._PORT)
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL)
try:
s.sendto(packet + '\0', sockaddr)
except:
logger.debug('LocalNodes4.send failed', exc_info=1)
s.close()
class LocalNodes6(LocalNodesBase):
_BROADCAST = "ff02::1"
def send(self):
logger.debug('send6')
packet = self.get_packet()
ttl = struct.pack('@i', self._TTL)
address = self._BROADCAST + get_interface()
addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM)
addr = addrs[0]
(family, socktype, proto, canonname, sockaddr) = addr
s = socket.socket(family, socktype, proto)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl)
try:
s.sendto(packet + '\0', sockaddr)
except:
logger.debug('LocalNodes6.send failed', exc_info=1)
s.close()
def get_ip(self):
return get_public_ipv6()
def get_socket(self):
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
@ -194,11 +175,26 @@ class LocalNodes6(LocalNodesBase):
self._socket = s
return s
def get_ip(self):
return get_public_ipv6()
def send(self):
packet = self.get_packet()
if packet:
logger.debug('send6 %s', packet)
ttl = struct.pack('@i', self._TTL)
address = self._BROADCAST + get_interface()
addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM)
addr = addrs[0]
(family, socktype, proto, canonname, sockaddr) = addr
s = socket.socket(family, socktype, proto)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl)
try:
s.sendto(packet + '\0', sockaddr)
except:
logger.debug('LocalNodes6.send failed', exc_info=1)
s.close()
class LocalNodes(object):
_active = True
_nodes4 = None
_nodes6 = None
@ -209,12 +205,24 @@ class LocalNodes(object):
self._nodes4 = LocalNodes4(self._nodes)
self._nodes6 = LocalNodes6(self._nodes)
def cleanup(self):
if self._active:
for id in self._nodes.keys():
if not can_connect(self._nodes[id]):
del self._nodes[id]
if not self._active:
break
def get(self, user_id):
if user_id in self._nodes:
if can_connect(self._nodes[user_id]):
return self._nodes[user_id]
def info(self):
return self._nodes.keys()
def join(self):
self._active = False
if self._nodes4:
self._nodes4.join()
if self._nodes6:

View File

@ -2,46 +2,88 @@
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from threading import Thread
from Queue import Queue
from tornado.ioloop import PeriodicCallback
import directory
from localnodes import LocalNodes
from link import can_connect, node_url
import logging
logger = logging.getLogger('lookup')
class Nodes(object):
_nodes = {}
class Nodes(Thread):
_active = True
_local = None
_nodes = {}
def __init__(self):
self._local = LocalNodes()
Thread.__init__(self)
self._q = Queue()
self.daemon = True
self._cleanup = PeriodicCallback(lambda: self._q.put(''), 120000)
self._cleanup.start()
self.start()
def get(self, user_id):
# local nodes
node = self._local.get(user_id)
# local cache
if user_id in self._nodes:
node = self._nodes[user_id]
# directory
if not node:
try:
node = directory.get(user_id)
except:
logger.debug('directory failed', exc_info=1)
node = None
def cleanup(self):
if self._active:
self._local.cleanup()
for id in self._nodes.keys():
if id in self._local._nodes:
del self._nodes[id]
if not can_connect(self._nodes[id]):
del self._nodes[id]
if not self._active:
break
def fingerprint(self, id):
node = self.get(id)
if node:
node['url'] = self._url(node)
self._nodes[user_id] = node
return node['cert']
return None
def get(self, id):
# check local nodes
node = self._local.get(id)
if not node:
# check local cache
node = self._nodes.get(id)
# lookup directory
if not node:
try:
node = directory.get(id)
except:
logger.debug('directory failed', exc_info=1)
node = None
if node:
self._nodes[id] = node
if node:
node['url'] = node_url(node)
return node
def _url(self, node):
host = node['host']
port = node['port']
if ':' in host:
url = 'https://[%s]:%s' % (host, port)
else:
url = 'https://%s:%s' % (host, port)
return url
def info(self):
l = self._local.info()
return {
'local': l,
'nodes': sorted(set(self._nodes.keys() + l))
}
def join(self):
self._active = False
self._q.put('')
self._local.join()
return Thread.join(self)
def run(self):
while self._active:
self._q.get()
if self._active:
self.cleanup()
def url(self, user_id):
node = self.get(user_id)
@ -51,10 +93,3 @@ class Nodes(object):
url = None
logger.debug('resolved %s -> %s', user_id, url)
return url
def fingerprint(self, user_id):
node = self.get(user_id)
if node:
return node['cert']
return None

View File

@ -3,9 +3,9 @@
import json
import tornado.web
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback
import tornado.web
from proxy import ProxyHandler
from utils import get_public_ipv6, valid
@ -68,6 +68,12 @@ class NodeHandler(ProxyHandler):
self.finish()
return url
class StaticHandler(tornado.web.RequestHandler):
def get(self):
self.write('')
self.finish()
def publish_node():
update_online()
state._online = PeriodicCallback(update_online, 60000)
@ -89,6 +95,7 @@ def update_online():
def start():
application = tornado.web.Application([
(r"/", StaticHandler),
(r".*", NodeHandler),
], gzip=True)
http_server = HTTPServer(application, ssl_options={

View File

@ -6,7 +6,7 @@ import tornado.httpclient
import tornado.gen
from utils import run_async
import tls
import link
import logging
logger = logging.getLogger('proxy')
@ -65,7 +65,7 @@ class ProxyHandler(tornado.web.RequestHandler):
@run_async
def _fetch_response(self, url, fingerprint, callback):
response = tls.read(url, self.request.body, self.request.headers, fingerprint)
response = link.read(url, self.request.body, self.request.headers, fingerprint)
callback(response)
def remote_url(self):

View File

@ -22,6 +22,25 @@ import logging
logger = logging.getLogger('server')
def render_json(handler, response):
response = json.dumps(response, indent=2)
handler.set_header('Content-Type', 'application/json')
handler.set_header('Content-Length', str(len(response)))
handler.write(response)
handler.finish()
class StatusHandler(tornado.web.RequestHandler):
def get(self, action):
response = {}
if action == 'info':
response['id'] = settings.USER_ID
response['online'] = state.online
response.update(state.nodes.info())
else:
response['error'] = 'unknown action'
return render_json(self, response)
class ServiceHandler(tornado.web.RequestHandler):
def post(self, action):
@ -35,7 +54,7 @@ class ServiceHandler(tornado.web.RequestHandler):
response = json.dumps({'status': 200})
else:
self.set_status(500)
response = 'Unsupported action'
response = json.dumps({'error': 'unknown action'})
self.write(response)
self.finish()
@ -58,9 +77,12 @@ class RequestHandler(ProxyHandler):
url = node['url'] + '/' + uri
return url, node['cert']
else:
self.set_status(404)
self.write(json.dumps({'status': 'unknown peer'}))
self.finish()
if state.online:
self.set_status(404)
render_json(self, {'status': 'unknown peer'})
else:
self.set_status(500)
render_json(self, {'status': 'offline'})
return None
def run():
@ -75,6 +97,7 @@ def run():
'debug': False,
}
handlers = [
(r'/(info)', StatusHandler),
(r'/(add|remove)', ServiceHandler),
(r".*", RequestHandler),
]
@ -100,9 +123,10 @@ def run():
else:
host = settings.server['address']
url = 'http://%s:%s/' % (host, settings.server['port'])
print('open browser at %s' % url)
print('peerlink runnig at %s' % url)
def shutdown():
state.nodes.join()
state.node.stop()
http_server.stop()

View File

@ -46,9 +46,8 @@ ENCODING='base64'
USER_ID = vk.to_ascii(encoding=ENCODING)
if not os.path.exists(tls_cert_path):
import tls
server['cert'] = tls.generate_tls()
import utils
server['cert'] = utils.create_tls_certificate()
VERSION="0.0"
USER_AGENT = 'PeerLink/%s' % VERSION

View File

@ -2,50 +2,27 @@
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
import os
import sys
import socket
import time
from datetime import datetime
import subprocess
from threading import Thread
from functools import wraps
from threading import Thread
import hashlib
import os
import socket
import subprocess
import sys
from urlparse import urlparse
import ed25519
import OpenSSL
import settings
import logging
logger = logging.getLogger('oml.utils')
logger = logging.getLogger('utils')
from settings import ENCODING
def valid(key, value, sig):
'''
validate that value was signed by key
'''
vk = ed25519.VerifyingKey(str(key), encoding=ENCODING)
try:
vk.verify(str(sig), str(value), encoding=ENCODING)
#except ed25519.BadSignatureError:
except:
return False
return True
def get_public_ipv6():
try:
host = ('2a01:4f8:120:3201::3', 25519)
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.settimeout(1)
s.connect(host)
ip = s.getsockname()[0]
s.close()
except:
ip = None
return ip
def get_interface():
interface = ''
if sys.platform == 'darwin' or sys.platform.startswith('freebsd'):
#cmd = ['/usr/sbin/netstat', '-rn']
cmd = ['/sbin/route', '-n', 'get', 'default']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True)
stdout, stderr = p.communicate()
@ -86,31 +63,18 @@ def get_local_ipv4():
ip = [p for p in local_ip[0].split(' ')[1:] if '.' in p][0]
return ip
def remove_empty_folders(prefix):
empty = []
for root, folders, files in os.walk(prefix):
if not folders and not files:
empty.append(root)
for folder in empty:
remove_empty_tree(folder)
def remove_empty_tree(leaf):
while leaf:
if not os.path.exists(leaf):
leaf = os.path.dirname(leaf)
elif os.path.isdir(leaf) and not os.listdir(leaf):
logger.debug('rmdir %s', leaf)
os.rmdir(leaf)
else:
break
utc_0 = int(time.mktime(datetime(1970, 01, 01).timetuple()))
def datetime2ts(dt):
return int(time.mktime(dt.utctimetuple())) - utc_0
def ts2datetime(ts):
return datetime.utcfromtimestamp(float(ts))
def get_public_ipv6():
n = urlparse(settings.server['directory_service'])
host = (n.hostname, n.port)
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.settimeout(1)
s.connect(host)
ip = s.getsockname()[0]
s.close()
except:
ip = None
return ip
def run_async(func):
@wraps(func)
@ -121,3 +85,51 @@ def run_async(func):
return async_func
# ed25519 utils
def valid(key, value, sig):
'''
validate that value was signed by key
'''
vk = ed25519.VerifyingKey(str(key), encoding=settings.ENCODING)
try:
vk.verify(str(sig), str(value), encoding=settings.ENCODING)
#except ed25519.BadSignatureError:
except:
return False
return True
# tls utils
def get_fingerprint():
with open(settings.tls_cert_path) as fd:
data = fd.read()
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, data)
return hashlib.sha1(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)).hexdigest()
def create_tls_certificate():
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
with open(settings.tls_key_path, 'wb') as fd:
os.chmod(settings.tls_key_path, 0600)
fd.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
os.chmod(settings.tls_key_path, 0400)
ca = OpenSSL.crypto.X509()
ca.set_version(2)
ca.set_serial_number(1)
ca.get_subject().CN = settings.USER_ID
ca.gmtime_adj_notBefore(0)
ca.gmtime_adj_notAfter(24 * 60 * 60)
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
ca.add_extensions([
OpenSSL.crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:0"),
OpenSSL.crypto.X509Extension("nsCertType", True, "sslCA"),
OpenSSL.crypto.X509Extension("extendedKeyUsage", True,
"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"),
OpenSSL.crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca),
])
ca.sign(key, "sha1")
with open(settings.tls_cert_path, 'wb') as fd:
fd.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
return get_fingerprint()