queue peering requests and send again

This commit is contained in:
j 2014-05-18 05:01:24 +02:00
parent 255bb6ce5c
commit e4ca454c41
11 changed files with 157 additions and 97 deletions

View file

@ -19,8 +19,8 @@ Development
On Linux you need to always install python-imaging python-lxml ghostscript:
apt-get install \
python-imaging python-lxml ghostscript
apt-get install python-imaging python-lxml ghostscript poppler-utils
Now checkout the source and prepare for use:

View file

@ -0,0 +1,26 @@
"""empty message
Revision ID: 3169519dc1e5
Revises: 1a7c813a17c2
Create Date: 2014-05-18 03:28:03.950996
"""
# revision identifiers, used by Alembic.
revision = '3169519dc1e5'
down_revision = '1a7c813a17c2'
from alembic import op
import sqlalchemy as sa
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.add_column('user', sa.Column('queued', sa.Boolean(), nullable=True))
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.drop_column('user', 'queued')
### end Alembic commands ###

View file

@ -11,8 +11,9 @@ import sys
import thread
from threading import Thread
from settings import preferences, server, USER_ID, sk
from utils import valid, get_public_ipv6
from settings import preferences, server, USER_ID, sk
import state
logger = logging.getLogger('oml.localnodes')
@ -132,6 +133,7 @@ class LocalNodes(Thread):
u.info['username'] = data['username']
u.info['local'] = data
u.save()
state.nodes.queue('add', u.id)
self.send()
def run(self):

View file

@ -15,6 +15,9 @@ import stdnum.isbn
import settings
from utils import normalize_isbn, find_isbns
import logging
logger = logging.getLogger('oml.meta.pdf')
def cover(pdf):
if sys.platform == 'darwin':
return ql_cover(pdf)
@ -86,9 +89,7 @@ def info(pdf):
if value and _key not in data:
data[_key] = value
except:
print 'FAILED TO PARSE', pdf
import traceback
print traceback.print_exc()
logger.debug('FAILED TO PARSE %s', pdf, exc_info=1)
if 'identifier' in data:
value = normalize_isbn(data['identifier'])

View file

@ -10,7 +10,7 @@ import state
from websocket import trigger_event
import logging
logger = logging.getLogger('oml.node.api')
logger = logging.getLogger('oml.node.nodeapi')
def api_pullChanges(app, remote_id, user_id=None, from_=None, to=None):
if user_id and not from_ and not to:
@ -69,6 +69,7 @@ def api_acceptPeering(app, user_id, username, message):
user.info['message'] = message
user.update_peering(True, username)
trigger_event('peering', user.json())
state.nodes.queue('add', user.id)
return True
return False

View file

@ -5,6 +5,7 @@ import os
import tornado
from tornado.web import Application
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback
import settings
@ -14,7 +15,7 @@ import user
import json
from utils import valid, get_public_ipv6
import api
import nodeapi
import cert
import logging
@ -46,7 +47,7 @@ class NodeHandler(tornado.web.RequestHandler):
content = {}
if valid(key, data, sig):
action, args = json.loads(data)
logger.debug('%s action %s %s', key, action, args)
logger.debug('NODE action %s %s (%s)', action, args, key)
if action == 'ping':
content = {
'ip': request.remote_addr
@ -57,13 +58,17 @@ class NodeHandler(tornado.web.RequestHandler):
if action in (
'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering'
) or (u and u.peered):
content = getattr(api, 'api_' + action)(self.app, key, *args)
content = getattr(nodeapi, 'api_' + action)(self.app, key, *args)
else:
logger.debug('PEER %s IS UNKNOWN SEND 403', key)
self.set_status(403)
content = {
'status': 'not peered'
}
if u and u.pending:
logger.debug('ignore request from pending peer[%s] %s (%s)', key, action, args)
content = {}
else:
logger.debug('PEER %s IS UNKNOWN SEND 403', key)
self.set_status(403)
content = {
'status': 'not peered'
}
content = json.dumps(content)
sig = settings.sk.sign(content, encoding='base64')
self.set_header('X-Ed25519-Signature', sig)
@ -103,13 +108,28 @@ class ShareHandler(tornado.web.RequestHandler):
self.finish()
def publish_node():
def publish_node(app):
host = get_public_ipv6()
state.online = directory.put(settings.sk, {
'host': host,
'port': settings.server['node_port'],
'cert': settings.server['cert']
})
if state.online:
with app.app_context():
for u in user.models.User.query.filter_by(queued=True):
logger.debug('adding queued node... %s', u.id)
state.nodes.queue('add', u.id)
state.check_nodes = PeriodicCallback(lambda: check_nodes(app), 60000)
state.check_nodes.start()
def check_nodes(app):
if state.online:
with app.app_context():
for u in user.models.User.query.filter_by(queued=True):
if not state.nodes.check_online(u.id):
logger.debug('queued peering message for %s trying to connect...', u.id)
state.nodes.queue('add', u.id)
def start(app):
application = Application([
@ -124,5 +144,5 @@ def start(app):
"keyfile": settings.ssl_key_path
})
http_server.listen(settings.server['node_port'], settings.server['node_address'])
state.main.add_callback(publish_node)
state.main.add_callback(publish_node, app)
return http_server

View file

@ -5,6 +5,7 @@ from __future__ import division
from Queue import Queue
from threading import Thread
import json
import socket
from datetime import datetime
import os
@ -40,6 +41,7 @@ class Node(object):
key = str(user.id)
self.vk = ed25519.VerifyingKey(key, encoding=ENCODING)
self.go_online()
logger.debug('new Node %s online=%s', self.user_id, self.online)
@property
def url(self):
@ -114,9 +116,7 @@ class Node(object):
self.online = False
return None
except:
logger.debug('unknown url error')
import traceback
print traceback.print_exc()
logger.debug('unknown url error', exc_info=1)
self.online = False
return None
data = r.read()
@ -140,19 +140,40 @@ class Node(object):
def user(self):
return user.models.User.get_or_create(self.user_id)
def can_connect(self):
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.settimeout(1)
s.connect((self.host, self.port))
s.close()
return True
except:
pass
return False
def go_online(self):
self.resolve()
if self.user.peered:
u = self.user
if u.peered or u.queued:
try:
self.online = False
logger.debug('type to connect to %s', self.user_id)
self.pullChanges()
if self.can_connect():
self.online = True
if u.queued:
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
if u.pending == 'sent':
self.peering('requestPeering')
elif u.pending == '' and u.peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
if self.online:
self.pullChanges()
logger.debug('connected to %s', self.user_id)
self.online = True
except:
import traceback
traceback.print_exc()
logger.debug('failed to connect to %s', self.user_id)
logger.debug('failed to connect to %s', self.user_id, exc_info=1)
self.online = False
else:
self.online = False
@ -183,44 +204,22 @@ class Node(object):
r = False
logger.debug('pushedChanges %s %s', r, self.user_id)
def requestPeering(self, message):
p = self.user
p.pending = 'sent'
p.save()
r = self.request('requestPeering', settings.preferences['username'], message)
return True
def peering(self, action):
u = self.user
if action in ('requestPeering', 'acceptPeering'):
r = self.request(action, settings.preferences['username'], u.info.get('message'))
else:
r = self.request(action, u.info.get('message'))
if r:
u.queued = False
if 'message' in u.info:
del u.info['message']
u.save()
else:
logger.debug('peering failed? %s %s', action, r)
def acceptPeering(self, message):
logger.debug('run acceptPeering %s', message)
r = self.request('acceptPeering', settings.preferences['username'], message)
logger.debug('result %s', r)
p = self.user
p.update_peering(True)
self.go_online()
return True
def rejectPeering(self, message):
logger.debug('rejectPeering %s', self.user)
p = self.user
p.update_peering(False)
r = self.request('rejectPeering', message)
self.online = False
return True
def removePeering(self, message):
logger.debug('removePeering %s', self.user)
p = self.user
if p.peered:
p.update_peering(False)
r = self.request('removePeering', message)
self.online = False
return True
def cancelPeering(self, message):
p = self.user
p.update_peering(False)
self.online = False
r = self.request('cancelPeering', message)
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
self.online = False
return True
def download(self, item):
@ -272,6 +271,7 @@ class Node(object):
class Nodes(Thread):
_nodes = {}
_local = None
def __init__(self, app):
self._app = app
@ -303,18 +303,14 @@ class Nodes(Thread):
for node in nodes:
getattr(node, action)(*args)
def _add_node(self, user_id):
def _add(self, user_id):
if user_id not in self._nodes:
from user.models import User
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
'''
else:
self._nodes[user_id].online = True
trigger_event('status', {
'id': user_id,
'status': 'online'
})
'''
logger.debug('bring existing node online %s', user_id)
if not self._nodes[user_id].online:
self._nodes[user_id].go_online()
def run(self):
with self._app.app_context():
@ -322,7 +318,7 @@ class Nodes(Thread):
args = self._q.get()
if args:
if args[0] == 'add':
self._add_node(args[1])
self._add(args[1])
else:
self._call(*args)

View file

@ -191,9 +191,13 @@ def requestPeering(request):
if len(data.get('id', '')) != 43:
logger.debug('invalid user id')
return {}
p = models.User.get_or_create(data['id'])
state.nodes.queue('add', p.id)
state.nodes.queue(p.id, 'requestPeering', data.get('message', ''))
u = models.User.get_or_create(data['id'])
u.pending = 'sent'
u.queued = True
u.info['message'] = data.get('message', '')
u.save()
state.nodes.queue('add', u.id)
state.nodes.queue(u.id, 'peering', 'requestPeering')
return {}
actions.register(requestPeering, cache=False)
@ -204,9 +208,11 @@ def acceptPeering(request):
logger.debug('invalid user id')
return {}
logger.debug('acceptPeering... %s', data)
p = models.User.get_or_create(data['id'])
state.nodes.queue('add', p.id)
state.nodes.queue(p.id, 'acceptPeering', data.get('message', ''))
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(True)
state.nodes.queue('add', u.id)
state.nodes.queue(u.id, 'peering', 'acceptPeering')
return {}
actions.register(acceptPeering, cache=False)
@ -216,9 +222,11 @@ def rejectPeering(request):
if len(data.get('id', '')) != 43:
logger.debug('invalid user id')
return {}
p = models.User.get_or_create(data['id'])
state.nodes.queue('add', p.id)
state.nodes.queue(p.id, 'rejectPeering', data.get('message', ''))
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.queue('add', u.id)
state.nodes.queue(u.id, 'peering', 'rejectPeering')
return {}
actions.register(rejectPeering, cache=False)
@ -229,8 +237,10 @@ def removePeering(request):
logger.debug('invalid user id')
return {}
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.queue('add', u.id)
state.nodes.queue(u.id, 'removePeering', data.get('message', ''))
state.nodes.queue(u.id, 'peering', 'removePeering')
return {}
actions.register(removePeering, cache=False)
@ -240,9 +250,11 @@ def cancelPeering(request):
if len(data.get('id', '')) != 43:
logger.debug('invalid user id')
return {}
p = models.User.get_or_create(data['id'])
state.nodes.queue('add', p.id)
state.nodes.queue(p.id, 'cancelPeering', data.get('message', ''))
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.queue('add', u.id)
state.nodes.queue(u.id, 'peering', 'cancelPeering')
return {}
actions.register(cancelPeering, cache=False)

View file

@ -26,6 +26,7 @@ class User(db.Model):
nickname = db.Column(db.String(256))
pending = db.Column(db.String(64)) # sent|received
queued = db.Column(db.Boolean())
peered = db.Column(db.Boolean())
online = db.Column(db.Boolean())
@ -69,6 +70,7 @@ class User(db.Model):
def update_peering(self, peered, username=None):
was_peering = self.peered
self.queued = True
if peered:
self.pending = ''
if username:
@ -175,16 +177,18 @@ class List(db.Model):
from item.models import Item
for item_id in items:
i = Item.get(item_id)
self.items.append(i)
if self.user_id == settings.USER_ID:
i.queue_download()
i.update()
if i:
self.items.append(i)
if self.user_id == settings.USER_ID:
i.queue_download()
i.update()
db.session.add(self)
db.session.commit()
for item_id in items:
i = Item.get(item_id)
i.update_lists()
db.session.add(i)
if i:
i.update_lists()
db.session.add(i)
db.session.commit()
if self.user_id == settings.USER_ID:
Changelog.record(self.user, 'addlistitems', self.name, items)

View file

@ -2,8 +2,6 @@
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
import logging
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop
from Queue import Queue
@ -14,6 +12,7 @@ from oxflask.shortcuts import json_dumps
import state
import logging
logger = logging.getLogger('oml.websocket')
class Background:
@ -82,6 +81,4 @@ def trigger_event(event, data):
try:
ws.post([event, data])
except:
import traceback
traceback.print_exc()
logger.debug('failed to send to ws %s %s %s', ws, event, data)
logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)

View file

@ -269,6 +269,7 @@ oml.ui.identifyDialog = function(data) {
});
getMetadata(id.id, data.value, function() {
// ...
updateIdButtons();
});
}
}
@ -503,4 +504,4 @@ oml.ui.identifyDialog = function(data) {
return that;
};
};