From e4ca454c418e9597765a14772344d3d7b5b923ac Mon Sep 17 00:00:00 2001 From: j Date: Sun, 18 May 2014 05:01:24 +0200 Subject: [PATCH] queue peering requests and send again --- README.md | 4 +- migrations/versions/3169519dc1e5_.py | 26 +++++++ oml/localnodes.py | 4 +- oml/media/pdf.py | 7 +- oml/node/{api.py => nodeapi.py} | 3 +- oml/node/server.py | 40 +++++++--- oml/nodes.py | 106 +++++++++++++-------------- oml/user/api.py | 38 ++++++---- oml/user/models.py | 16 ++-- oml/websocket.py | 7 +- static/js/identifyDialog.js | 3 +- 11 files changed, 157 insertions(+), 97 deletions(-) create mode 100644 migrations/versions/3169519dc1e5_.py rename oml/node/{api.py => nodeapi.py} (97%) diff --git a/README.md b/README.md index b98ee06..517e9ae 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,8 @@ Development On Linux you need to always install python-imaging python-lxml ghostscript: - apt-get install \ - python-imaging python-lxml ghostscript + apt-get install python-imaging python-lxml ghostscript poppler-utils + Now checkout the source and prepare for use: diff --git a/migrations/versions/3169519dc1e5_.py b/migrations/versions/3169519dc1e5_.py new file mode 100644 index 0000000..8424a9c --- /dev/null +++ b/migrations/versions/3169519dc1e5_.py @@ -0,0 +1,26 @@ +"""empty message + +Revision ID: 3169519dc1e5 +Revises: 1a7c813a17c2 +Create Date: 2014-05-18 03:28:03.950996 + +""" + +# revision identifiers, used by Alembic. +revision = '3169519dc1e5' +down_revision = '1a7c813a17c2' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.add_column('user', sa.Column('queued', sa.Boolean(), nullable=True)) + ### end Alembic commands ### + + +def downgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.drop_column('user', 'queued') + ### end Alembic commands ### diff --git a/oml/localnodes.py b/oml/localnodes.py index d8c5c28..c2e7e3a 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -11,8 +11,9 @@ import sys import thread from threading import Thread -from settings import preferences, server, USER_ID, sk from utils import valid, get_public_ipv6 +from settings import preferences, server, USER_ID, sk +import state logger = logging.getLogger('oml.localnodes') @@ -132,6 +133,7 @@ class LocalNodes(Thread): u.info['username'] = data['username'] u.info['local'] = data u.save() + state.nodes.queue('add', u.id) self.send() def run(self): diff --git a/oml/media/pdf.py b/oml/media/pdf.py index 4ffcc4e..b95c2bb 100644 --- a/oml/media/pdf.py +++ b/oml/media/pdf.py @@ -15,6 +15,9 @@ import stdnum.isbn import settings from utils import normalize_isbn, find_isbns +import logging +logger = logging.getLogger('oml.meta.pdf') + def cover(pdf): if sys.platform == 'darwin': return ql_cover(pdf) @@ -86,9 +89,7 @@ def info(pdf): if value and _key not in data: data[_key] = value except: - print 'FAILED TO PARSE', pdf - import traceback - print traceback.print_exc() + logger.debug('FAILED TO PARSE %s', pdf, exc_info=1) if 'identifier' in data: value = normalize_isbn(data['identifier']) diff --git a/oml/node/api.py b/oml/node/nodeapi.py similarity index 97% rename from oml/node/api.py rename to oml/node/nodeapi.py index 937f40a..d2ed686 100644 --- a/oml/node/api.py +++ b/oml/node/nodeapi.py @@ -10,7 +10,7 @@ import state from websocket import trigger_event import logging -logger = logging.getLogger('oml.node.api') +logger = logging.getLogger('oml.node.nodeapi') def api_pullChanges(app, remote_id, user_id=None, from_=None, to=None): if user_id and not from_ and not to: @@ -69,6 +69,7 @@ def api_acceptPeering(app, user_id, username, message): user.info['message'] = message user.update_peering(True, username) trigger_event('peering', user.json()) + state.nodes.queue('add', user.id) return True return False diff --git a/oml/node/server.py b/oml/node/server.py index d26727d..c42482a 100644 --- a/oml/node/server.py +++ b/oml/node/server.py @@ -5,6 +5,7 @@ import os import tornado from tornado.web import Application from tornado.httpserver import HTTPServer +from tornado.ioloop import PeriodicCallback import settings @@ -14,7 +15,7 @@ import user import json from utils import valid, get_public_ipv6 -import api +import nodeapi import cert import logging @@ -46,7 +47,7 @@ class NodeHandler(tornado.web.RequestHandler): content = {} if valid(key, data, sig): action, args = json.loads(data) - logger.debug('%s action %s %s', key, action, args) + logger.debug('NODE action %s %s (%s)', action, args, key) if action == 'ping': content = { 'ip': request.remote_addr @@ -57,13 +58,17 @@ class NodeHandler(tornado.web.RequestHandler): if action in ( 'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering' ) or (u and u.peered): - content = getattr(api, 'api_' + action)(self.app, key, *args) + content = getattr(nodeapi, 'api_' + action)(self.app, key, *args) else: - logger.debug('PEER %s IS UNKNOWN SEND 403', key) - self.set_status(403) - content = { - 'status': 'not peered' - } + if u and u.pending: + logger.debug('ignore request from pending peer[%s] %s (%s)', key, action, args) + content = {} + else: + logger.debug('PEER %s IS UNKNOWN SEND 403', key) + self.set_status(403) + content = { + 'status': 'not peered' + } content = json.dumps(content) sig = settings.sk.sign(content, encoding='base64') self.set_header('X-Ed25519-Signature', sig) @@ -103,13 +108,28 @@ class ShareHandler(tornado.web.RequestHandler): self.finish() -def publish_node(): +def publish_node(app): host = get_public_ipv6() state.online = directory.put(settings.sk, { 'host': host, 'port': settings.server['node_port'], 'cert': settings.server['cert'] }) + if state.online: + with app.app_context(): + for u in user.models.User.query.filter_by(queued=True): + logger.debug('adding queued node... %s', u.id) + state.nodes.queue('add', u.id) + state.check_nodes = PeriodicCallback(lambda: check_nodes(app), 60000) + state.check_nodes.start() + +def check_nodes(app): + if state.online: + with app.app_context(): + for u in user.models.User.query.filter_by(queued=True): + if not state.nodes.check_online(u.id): + logger.debug('queued peering message for %s trying to connect...', u.id) + state.nodes.queue('add', u.id) def start(app): application = Application([ @@ -124,5 +144,5 @@ def start(app): "keyfile": settings.ssl_key_path }) http_server.listen(settings.server['node_port'], settings.server['node_address']) - state.main.add_callback(publish_node) + state.main.add_callback(publish_node, app) return http_server diff --git a/oml/nodes.py b/oml/nodes.py index fe23c12..c47109e 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -5,6 +5,7 @@ from __future__ import division from Queue import Queue from threading import Thread import json +import socket from datetime import datetime import os @@ -40,6 +41,7 @@ class Node(object): key = str(user.id) self.vk = ed25519.VerifyingKey(key, encoding=ENCODING) self.go_online() + logger.debug('new Node %s online=%s', self.user_id, self.online) @property def url(self): @@ -114,9 +116,7 @@ class Node(object): self.online = False return None except: - logger.debug('unknown url error') - import traceback - print traceback.print_exc() + logger.debug('unknown url error', exc_info=1) self.online = False return None data = r.read() @@ -140,19 +140,40 @@ class Node(object): def user(self): return user.models.User.get_or_create(self.user_id) + def can_connect(self): + try: + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s.settimeout(1) + s.connect((self.host, self.port)) + s.close() + return True + except: + pass + return False + def go_online(self): self.resolve() - if self.user.peered: + u = self.user + if u.peered or u.queued: try: self.online = False logger.debug('type to connect to %s', self.user_id) - self.pullChanges() + if self.can_connect(): + self.online = True + if u.queued: + logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered) + if u.pending == 'sent': + self.peering('requestPeering') + elif u.pending == '' and u.peered: + self.peering('acceptPeering') + else: + #fixme, what about cancel/reject peering here? + self.peering('removePeering') + if self.online: + self.pullChanges() logger.debug('connected to %s', self.user_id) - self.online = True except: - import traceback - traceback.print_exc() - logger.debug('failed to connect to %s', self.user_id) + logger.debug('failed to connect to %s', self.user_id, exc_info=1) self.online = False else: self.online = False @@ -183,44 +204,22 @@ class Node(object): r = False logger.debug('pushedChanges %s %s', r, self.user_id) - def requestPeering(self, message): - p = self.user - p.pending = 'sent' - p.save() - r = self.request('requestPeering', settings.preferences['username'], message) - return True + def peering(self, action): + u = self.user + if action in ('requestPeering', 'acceptPeering'): + r = self.request(action, settings.preferences['username'], u.info.get('message')) + else: + r = self.request(action, u.info.get('message')) + if r: + u.queued = False + if 'message' in u.info: + del u.info['message'] + u.save() + else: + logger.debug('peering failed? %s %s', action, r) - def acceptPeering(self, message): - logger.debug('run acceptPeering %s', message) - r = self.request('acceptPeering', settings.preferences['username'], message) - logger.debug('result %s', r) - p = self.user - p.update_peering(True) - self.go_online() - return True - - def rejectPeering(self, message): - logger.debug('rejectPeering %s', self.user) - p = self.user - p.update_peering(False) - r = self.request('rejectPeering', message) - self.online = False - return True - - def removePeering(self, message): - logger.debug('removePeering %s', self.user) - p = self.user - if p.peered: - p.update_peering(False) - r = self.request('removePeering', message) - self.online = False - return True - - def cancelPeering(self, message): - p = self.user - p.update_peering(False) - self.online = False - r = self.request('cancelPeering', message) + if action in ('cancelPeering', 'rejectPeering', 'removePeering'): + self.online = False return True def download(self, item): @@ -272,6 +271,7 @@ class Node(object): class Nodes(Thread): _nodes = {} + _local = None def __init__(self, app): self._app = app @@ -303,18 +303,14 @@ class Nodes(Thread): for node in nodes: getattr(node, action)(*args) - def _add_node(self, user_id): + def _add(self, user_id): if user_id not in self._nodes: from user.models import User self._nodes[user_id] = Node(self, User.get_or_create(user_id)) - ''' else: - self._nodes[user_id].online = True - trigger_event('status', { - 'id': user_id, - 'status': 'online' - }) - ''' + logger.debug('bring existing node online %s', user_id) + if not self._nodes[user_id].online: + self._nodes[user_id].go_online() def run(self): with self._app.app_context(): @@ -322,7 +318,7 @@ class Nodes(Thread): args = self._q.get() if args: if args[0] == 'add': - self._add_node(args[1]) + self._add(args[1]) else: self._call(*args) diff --git a/oml/user/api.py b/oml/user/api.py index ba0b593..2793206 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -191,9 +191,13 @@ def requestPeering(request): if len(data.get('id', '')) != 43: logger.debug('invalid user id') return {} - p = models.User.get_or_create(data['id']) - state.nodes.queue('add', p.id) - state.nodes.queue(p.id, 'requestPeering', data.get('message', '')) + u = models.User.get_or_create(data['id']) + u.pending = 'sent' + u.queued = True + u.info['message'] = data.get('message', '') + u.save() + state.nodes.queue('add', u.id) + state.nodes.queue(u.id, 'peering', 'requestPeering') return {} actions.register(requestPeering, cache=False) @@ -204,9 +208,11 @@ def acceptPeering(request): logger.debug('invalid user id') return {} logger.debug('acceptPeering... %s', data) - p = models.User.get_or_create(data['id']) - state.nodes.queue('add', p.id) - state.nodes.queue(p.id, 'acceptPeering', data.get('message', '')) + u = models.User.get_or_create(data['id']) + u.info['message'] = data.get('message', '') + u.update_peering(True) + state.nodes.queue('add', u.id) + state.nodes.queue(u.id, 'peering', 'acceptPeering') return {} actions.register(acceptPeering, cache=False) @@ -216,9 +222,11 @@ def rejectPeering(request): if len(data.get('id', '')) != 43: logger.debug('invalid user id') return {} - p = models.User.get_or_create(data['id']) - state.nodes.queue('add', p.id) - state.nodes.queue(p.id, 'rejectPeering', data.get('message', '')) + u = models.User.get_or_create(data['id']) + u.info['message'] = data.get('message', '') + u.update_peering(False) + state.nodes.queue('add', u.id) + state.nodes.queue(u.id, 'peering', 'rejectPeering') return {} actions.register(rejectPeering, cache=False) @@ -229,8 +237,10 @@ def removePeering(request): logger.debug('invalid user id') return {} u = models.User.get_or_create(data['id']) + u.info['message'] = data.get('message', '') + u.update_peering(False) state.nodes.queue('add', u.id) - state.nodes.queue(u.id, 'removePeering', data.get('message', '')) + state.nodes.queue(u.id, 'peering', 'removePeering') return {} actions.register(removePeering, cache=False) @@ -240,9 +250,11 @@ def cancelPeering(request): if len(data.get('id', '')) != 43: logger.debug('invalid user id') return {} - p = models.User.get_or_create(data['id']) - state.nodes.queue('add', p.id) - state.nodes.queue(p.id, 'cancelPeering', data.get('message', '')) + u = models.User.get_or_create(data['id']) + u.info['message'] = data.get('message', '') + u.update_peering(False) + state.nodes.queue('add', u.id) + state.nodes.queue(u.id, 'peering', 'cancelPeering') return {} actions.register(cancelPeering, cache=False) diff --git a/oml/user/models.py b/oml/user/models.py index c5f8d39..52d50d2 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -26,6 +26,7 @@ class User(db.Model): nickname = db.Column(db.String(256)) pending = db.Column(db.String(64)) # sent|received + queued = db.Column(db.Boolean()) peered = db.Column(db.Boolean()) online = db.Column(db.Boolean()) @@ -69,6 +70,7 @@ class User(db.Model): def update_peering(self, peered, username=None): was_peering = self.peered + self.queued = True if peered: self.pending = '' if username: @@ -175,16 +177,18 @@ class List(db.Model): from item.models import Item for item_id in items: i = Item.get(item_id) - self.items.append(i) - if self.user_id == settings.USER_ID: - i.queue_download() - i.update() + if i: + self.items.append(i) + if self.user_id == settings.USER_ID: + i.queue_download() + i.update() db.session.add(self) db.session.commit() for item_id in items: i = Item.get(item_id) - i.update_lists() - db.session.add(i) + if i: + i.update_lists() + db.session.add(i) db.session.commit() if self.user_id == settings.USER_ID: Changelog.record(self.user, 'addlistitems', self.name, items) diff --git a/oml/websocket.py b/oml/websocket.py index 78aa8e6..90c7900 100644 --- a/oml/websocket.py +++ b/oml/websocket.py @@ -2,8 +2,6 @@ # vi:si:et:sw=4:sts=4:ts=4 from __future__ import division -import logging - from tornado.websocket import WebSocketHandler from tornado.ioloop import IOLoop from Queue import Queue @@ -14,6 +12,7 @@ from oxflask.shortcuts import json_dumps import state +import logging logger = logging.getLogger('oml.websocket') class Background: @@ -82,6 +81,4 @@ def trigger_event(event, data): try: ws.post([event, data]) except: - import traceback - traceback.print_exc() - logger.debug('failed to send to ws %s %s %s', ws, event, data) + logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1) diff --git a/static/js/identifyDialog.js b/static/js/identifyDialog.js index 0f3ce6a..c12adfc 100644 --- a/static/js/identifyDialog.js +++ b/static/js/identifyDialog.js @@ -269,6 +269,7 @@ oml.ui.identifyDialog = function(data) { }); getMetadata(id.id, data.value, function() { // ... + updateIdButtons(); }); } } @@ -503,4 +504,4 @@ oml.ui.identifyDialog = function(data) { return that; -}; \ No newline at end of file +};