openmedialibrary/oml/node/server.py

188 lines
6.1 KiB
Python
Raw Normal View History

2014-05-04 17:26:43 +00:00
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import tornado
2014-05-17 00:14:15 +00:00
from tornado.web import Application
2014-05-04 17:26:43 +00:00
from tornado.httpserver import HTTPServer
2014-05-18 03:01:24 +00:00
from tornado.ioloop import PeriodicCallback
2014-05-04 17:26:43 +00:00
import settings
import directory
import state
import user
import json
2014-05-17 22:18:32 +00:00
from utils import valid, get_public_ipv6
2014-05-18 03:01:24 +00:00
import nodeapi
2014-05-14 09:57:11 +00:00
import cert
2014-05-19 18:12:02 +00:00
from websocket import trigger_event
from oxtornado import run_async
2014-05-04 17:26:43 +00:00
2014-05-17 14:26:59 +00:00
import logging
logger = logging.getLogger('oml.node.server')
2014-05-04 17:26:43 +00:00
class NodeHandler(tornado.web.RequestHandler):
    '''
        Handles node-to-node requests.

        POST bodies are JSON encoded [action, args] pairs; the sender's key
        and signature are read from the X-Ed25519-Key and
        X-Ed25519-Signature headers and every POST response is signed with
        settings.sk.
    '''

    def initialize(self, app):
        # kept so post() can hand the application object to api_call()
        self.app = app

    def _check_protocol(self):
        '''
            Send our X-Node-Protocol header and return the one sent by the
            peer (None if the peer sent none).

            Sets state.update_required if the peer reports a protocol that
            compares greater than ours.
        '''
        self.set_header('X-Node-Protocol', settings.NODE_PROTOCOL)
        protocol = self.request.headers.get('X-Node-Protocol', None)
        # a missing header must not be ordered against the version value:
        # that only works by accident on Python 2 and raises on Python 3
        if protocol is not None and protocol > settings.NODE_PROTOCOL:
            state.update_required = True
        return protocol

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def post(self):
        '''
            API
            pullChanges     [userid] from [to]
            pushChanges     [index, change]
            requestPeering  username message
            acceptPeering   username message
            rejectPeering   message
            removePeering   message

            ping            responds public ip

            Responses:
            - protocol mismatch: settings.release
            - missing or invalid signature: {}
            - unknown peer (api_call returned None): 403 and
              {'status': 'not peered'}
        '''
        headers = self.request.headers
        # .get() instead of [] so that a request without signature headers
        # is handled like a request with a bad signature, not a server error
        key = headers.get('X-Ed25519-Key')
        sig = headers.get('X-Ed25519-Signature')
        signed = key is not None and sig is not None
        data = self.request.body
        content = {}

        if self._check_protocol() != settings.NODE_PROTOCOL:
            content = settings.release
        elif signed and valid(str(key), data, str(sig)):
            key = str(key)
            action, args = json.loads(data)
            logger.debug('NODE action %s %s (%s)', action, args, key)
            if action == 'ping':
                content = {
                    # Tornado exposes the client address as remote_ip
                    'ip': self.request.remote_ip
                }
            else:
                content = yield tornado.gen.Task(api_call, self.app, action, key, args)
                if content is None:
                    content = {'status': 'not peered'}
                    logger.debug('PEER %s IS UNKNOWN SEND 403', key)
                    self.set_status(403)

        content = json.dumps(content)
        sig = settings.sk.sign(content, encoding='base64')
        self.set_header('X-Ed25519-Signature', sig)
        self.write(content)
        self.finish()

    def get(self):
        '''
            Plain text landing response, also used to exchange protocol
            versions via the X-Node-Protocol header.
        '''
        self._check_protocol()
        self.write('Open Media Library')
        self.finish()
@run_async
def api_call(app, action, key, args, callback):
    '''
        Run the nodeapi function for action and pass its result to callback.

        app       application object, used for app.app_context()
        action    name of the action, resolved to nodeapi.api_<action>
        key       key of the requesting user, looked up via User.get
        args      positional arguments passed on to the api function
        callback  called exactly once with the result:
                  - the return value of the api function
                  - {} for pending peers, unknown actions and failed calls
                  - None if the user is neither peered nor pending

        Peering actions are allowed for everyone, all other actions only
        for peered users.
    '''
    content = {}
    try:
        with app.app_context():
            u = user.models.User.get(key)
            if action in (
                'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering'
            ) or (u and u.peered):
                handler = getattr(nodeapi, 'api_' + action, None)
                if handler is None:
                    logger.debug('ignore unknown action %s from %s (%s)', action, key, args)
                else:
                    content = handler(app, key, *args)
            elif u and u.pending:
                logger.debug('ignore request from pending peer[%s] %s (%s)', key, action, args)
            else:
                content = None
    except Exception:
        # without this the callback would never be called and the request
        # waiting on it would never be finished
        logger.exception('api call %s from %s failed (%s)', action, key, args)
        content = {}
    callback(content)
2014-05-04 17:26:43 +00:00
class ShareHandler(tornado.web.RequestHandler):
    '''
        Sends the file belonging to an item.
    '''

    def initialize(self, app):
        # kept for app.app_context() in get()
        self.app = app

    def get(self, id):
        '''
            Write the file of item id to the response.

            Responds with 404 if Item.get(id) returns nothing.
            Content-Type is derived from the file extension, files with
            other extensions are sent as application/octet-stream.
        '''
        with self.app.app_context():
            import item.models
            i = item.models.Item.get(id)
            if not i:
                self.set_status(404)
                self.finish()
                # stop here, there is no item to read a path from
                return
            path = i.get_path()
            extension = path.split('.')[-1]
            # set_header does not accept None, so unknown extensions need
            # a real fallback value
            mimetype = {
                'epub': 'application/epub+zip',
                'pdf': 'application/pdf',
                'txt': 'text/plain',
            }.get(extension, 'application/octet-stream')
            self.set_header('Content-Type', mimetype)
            logger.debug('GET file %s', id)
            with open(path, 'rb') as f:
                while 1:
                    data = f.read(16384)
                    if not data:
                        break
                    self.write(data)
            self.finish()
2014-05-18 03:01:24 +00:00
def publish_node(app):
    '''
        Refresh the online state, queue an 'add' for every user flagged as
        queued if we are online, then start the two periodic callbacks:
        check_nodes(app) every 120000 ms and update_online every 60000 ms.

        The callbacks are kept on state.check_nodes and state._online.
    '''
    update_online()

    if state.online:
        with app.app_context():
            queued_users = user.models.User.query.filter_by(queued=True)
            for peer in queued_users:
                logger.debug('adding queued node... %s', peer.id)
                state.nodes.queue('add', peer.id)

    def run_check_nodes():
        check_nodes(app)

    node_timer = PeriodicCallback(run_check_nodes, 120000)
    state.check_nodes = node_timer
    node_timer.start()

    online_timer = PeriodicCallback(update_online, 60000)
    state._online = online_timer
    online_timer.start()
def update_online():
    '''
        Update state.host / state.online and trigger a 'status' event
        whenever state.online changes.

        Without a public IPv6 address the node is marked offline.
        With an address, the node is published via directory.put if the
        address changed or if the node is currently not online; the result
        of directory.put becomes the new state.online.
    '''
    host = get_public_ipv6()
    if not host:
        if not state.online:
            return
        online = False
    elif host != state.host or not state.online:
        # also publish while offline: state.host is not reset when the
        # address goes away, so comparing hosts alone would never bring
        # the node back online after returning with the same address or
        # after a failed directory.put
        state.host = host
        online = directory.put(settings.sk, {
            'host': host,
            'port': settings.server['node_port'],
            'cert': settings.server['cert']
        })
    else:
        return

    if online != state.online:
        state.online = online
        trigger_event('status', {
            'id': settings.USER_ID,
            'online': state.online
        })
2014-05-18 03:01:24 +00:00
def check_nodes(app):
    '''
        While online, queue an 'add' for every queued user whose node
        state.nodes does not report as online.
    '''
    if not state.online:
        return
    with app.app_context():
        queued_users = user.models.User.query.filter_by(queued=True)
        for peer in queued_users:
            if state.nodes.is_online(peer.id):
                continue
            logger.debug('queued peering message for %s trying to connect...', peer.id)
            state.nodes.queue('add', peer.id)
2014-05-17 00:14:15 +00:00
2014-05-04 17:26:43 +00:00
def start(app):
    '''
        Create the node HTTPS server, listen on the configured node
        address/port and schedule publish_node(app) on state.main.

        An SSL certificate is generated first if settings.ssl_cert_path
        does not exist. Returns the HTTPServer instance.
    '''
    routes = [
        (r"/get/(.*)", ShareHandler, {'app': app}),
        (r".*", NodeHandler, {'app': app}),
    ]
    application = Application(routes, gzip=True)

    cert_exists = os.path.exists(settings.ssl_cert_path)
    if not cert_exists:
        settings.server['cert'] = cert.generate_ssl()

    ssl_options = {
        "certfile": settings.ssl_cert_path,
        "keyfile": settings.ssl_key_path
    }
    http_server = HTTPServer(application, ssl_options=ssl_options)

    port = settings.server['node_port']
    address = settings.server['node_address']
    http_server.listen(port, address)

    state.main.add_callback(publish_node, app)
    return http_server