diff --git a/oml/item/models.py b/oml/item/models.py index 149a678..3922b4f 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -538,7 +538,7 @@ class Item(db.Model): add_record('additem', self.id, f.info) add_record('edititem', self.id, self.meta) for l in self.lists.filter_by(user_id=settings.USER_ID): - if l.name != '': + if l.name != '' and l.name != 'Inbox': add_record('addlistitems', l.name, [self.id]) self.update() f.move() diff --git a/oml/library.py b/oml/library.py index 11f9103..362826d 100644 --- a/oml/library.py +++ b/oml/library.py @@ -130,16 +130,22 @@ class Peer(object): self.info['lists'][new['name']] = self.info['lists'].pop(name) else: self.info['lists'][new['name']] = [] + if name in self.info['listorder']: + self.info['listorder'] = [new['name'] if n == name else n for n in self.info['listorder']] elif action == 'orderlists': self.info['listorder'] = args[0] elif action == 'removelist': name = args[0] if name in self.info['lists']: del self.info['lists'][name] + if name in self.info['listorder']: + self.info['listorder'] = [n for n in self.info['listorder'] if n != name] elif action == 'addlistitems': name, ids = args if name not in self.info['lists']: self.info['lists'][name] = [] + if name not in self.info['listorder']: + self.info['listorder'].append(name) self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids)) elif action == 'removelistitems': name, ids = args @@ -208,8 +214,9 @@ class Peer(object): def remove(self): self.join() - os.unlink(self._dbpath) - os.unlink(self._infopath) + for path in (self._dbpath, self._logpath, self._infopath): + if os.path.exists(path): + os.unlink(path) def sync_db(self): import item.models @@ -327,6 +334,8 @@ class Peer(object): if state.shutdown: break state.db.session.commit() + if update_items: + logger.debug('updated %s items', len(update_items)) ids = set(self.library.keys()) changed = False for name, l in self.info.get('lists', {}).items(): diff --git a/oml/localnodes.py b/oml/localnodes.py index f3fa7ec..20562e7 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -153,5 +153,7 @@ class LocalNodes(dict): state.tasks.queue('removelocalinfo', id) def get(self, user_id): - if user_id in self and can_connect(self[user_id]): - return self[user_id] if user_id in self else None + data = super().get(user_id) + if data and can_connect(data): + return data + return None diff --git a/oml/nodes.py b/oml/nodes.py index e3621d9..c5cfbfe 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -1,16 +1,17 @@ # -*- coding: utf-8 -*- - +from io import BytesIO from queue import Queue from threading import Thread -import json -from io import BytesIO import gzip -import urllib.request, urllib.error, urllib.parse +import json import os -import time import socket import socks +import time +import urllib.error +import urllib.parse +import urllib.request import ox from tornado.ioloop import PeriodicCallback @@ -37,12 +38,13 @@ class Node(Thread): _online = None TIMEOUT = 5 - def __init__(self, nodes, user): + def __init__(self, nodes, user_id): self._nodes = nodes - self.user = user - self.user_id = user.id + self.user_id = user_id self._opener = get_opener(self.user_id) self._q = Queue() + self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval']) + state.main.add_callback(self._pingcb.start) Thread.__init__(self) self.daemon = True self.start() @@ -53,10 +55,22 @@ class Node(Thread): action = self._q.get() if state.shutdown: break - if action == 'send_response': - self._send_response() - elif action == 'ping': + if action == 'ping': self.online = self.can_connect() + elif action == 'send_response': + if self.online: + self._send_response() + else: + if not self._q.qsize(): + time.sleep(5) + self.send_response() + elif isinstance(action, list) and len(action) == 2: + if self.online: + self._call(action[0], *action[1]) + else: + if not self._q.qsize(): + time.sleep(5) + self.queue(action[0], *action[1]) else: logger.debug('unknown action %s', action) @@ -68,6 +82,14 @@ class Node(Thread): if state.online or self.get_local(): self._q.put('ping') + def queue(self, action, *args): + logger.debug('queue node action %s->%s%s', self.user_id, action, args) + self._q.put([action, args]) + + def _call(self, action, *args): + r = getattr(self, action)(*args) + logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r) + @property def url(self): if self.local: @@ -202,34 +224,26 @@ class Node(Thread): def _send_response(self): with db.session(): u = user.models.User.get(self.user_id) - send_response = u and u.peered or u.queued if u: user_pending = u.pending user_peered = u.peered user_queued = u.queued + else: + user_queued = False if DEBUG_NODES: logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) - if send_response: - try: - self.online = self.can_connect() - except: - if DEBUG_NODES: - logger.debug('failed to connect to %s', self.user_id) - self.online = False - if self.online: - if DEBUG_NODES: - logger.debug('connected to %s', self.url) - if user_queued: - if DEBUG_NODES: - logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) - if user_pending == 'sent': - self.peering('requestPeering') - elif user_pending == '' and user_peered: - self.peering('acceptPeering') - else: - #fixme, what about cancel/reject peering here? - self.peering('removePeering') + if user_queued: + if DEBUG_NODES: + logger.debug('connected to %s', self.url) + logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) + if user_pending == 'sent': + self.peering('requestPeering') + elif user_pending == '' and user_peered: + self.peering('acceptPeering') + else: + #fixme, what about cancel/reject peering here? + self.peering('removePeering') def trigger_status(self): if self.online is not None: @@ -321,6 +335,7 @@ class Node(Thread): return False def peering(self, action): + pull_changes = False with db.session(): u = user.models.User.get_or_create(self.user_id) user_info = u.info @@ -335,6 +350,8 @@ class Node(Thread): if 'message' in u.info: del u.info['message'] u.save() + if action == 'acceptPeering': + pull_changes = True else: logger.debug('peering failed? %s %s', action, r) if action in ('cancelPeering', 'rejectPeering', 'removePeering'): @@ -342,6 +359,8 @@ class Node(Thread): with db.session(): u = user.models.User.get(self.user_id) trigger_event('peering.%s' % action.replace('Peering', ''), u.json()) + if pull_changes: + self.pullChanges() return True headers = { @@ -493,7 +512,7 @@ class Nodes(Thread): self.queue('add', u.id, True) self.local = LocalNodes() self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval']) - self._pullcb.start() + state.main.add_callback(self._pullcb.start) Thread.__init__(self) self.daemon = True self.start() @@ -514,9 +533,16 @@ class Nodes(Thread): def queue(self, *args): if args: - logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize()) + logger.debug('queue "%s", %s entries in queue', args, self._q.qsize()) self._q.put(list(args)) + def peer_queue(self, peer, action, *args): + if peer not in self._nodes: + self._add(peer) + elif not self._nodes[peer].is_online(): + self._nodes[peer].ping() + self._nodes[peer].queue(action, *args) + def is_online(self, id): return id in self._nodes and self._nodes[id].is_online() @@ -532,22 +558,27 @@ class Nodes(Thread): if target == 'all': nodes = list(self._nodes.values()) elif target == 'peered': - nodes = [n for n in list(self._nodes.values()) if n.user.peered] + ids = [] + with db.session(): + from user.models import User + for u in User.query.filter(User.id != settings.USER_ID).filter_by(peered=True).all(): + ids.append(u.id) + nodes = [n for n in list(self._nodes.values()) if n.user_id in ids] elif target == 'online': nodes = [n for n in list(self._nodes.values()) if n.online] else: - if not target in self._nodes: + if target not in self._nodes: self._add(target) nodes = [self._nodes[target]] for node in nodes: - r = getattr(node, action)(*args) - logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r) + node._call(action, *args) def _add(self, user_id, send_response=False): if user_id not in self._nodes: from user.models import User with db.session(): - self._nodes[user_id] = Node(self, User.get_or_create(user_id)) + User.get_or_create(user_id) + self._nodes[user_id] = Node(self, user_id) else: self._nodes[user_id].ping() if send_response: @@ -575,7 +606,7 @@ class Nodes(Thread): if state.shutdown: break node = self._nodes.get(u['id']) - if node: + if node and node.is_online(): node.pullChanges() self._pulling = False @@ -589,8 +620,6 @@ class Nodes(Thread): def publish_node(): update_online() - state.check_nodes = PeriodicCallback(check_nodes, 120000) - state.check_nodes.start() state._online = PeriodicCallback(update_online, 60000) state._online.start() @@ -605,11 +634,3 @@ def update_online(): if state.online: for node in list(state.nodes._nodes.values()): node.trigger_status() - -def check_nodes(): - if state.online: - with db.session(): - for u in user.models.User.query.filter_by(queued=True): - if not state.nodes.is_online(u.id): - logger.debug('queued peering message for %s trying to connect...', u.id) - state.nodes.queue('add', u.id, True) diff --git a/oml/queryparser.py b/oml/queryparser.py index 033e068..939611b 100644 --- a/oml/queryparser.py +++ b/oml/queryparser.py @@ -128,7 +128,11 @@ class Parser(object): elif k == 'list': nickname, name = v.split(':', 1) if nickname: - u = self._user.query.filter_by(nickname=nickname).one() + try: + u = self._user.query.filter_by(nickname=nickname, peered=True).one() + except sqlalchemy.orm.exc.NoResultFound: + ids = [] + return self.in_ids(ids, exclude) else: u = self._user.query.filter_by(id=settings.USER_ID).one() if not name: diff --git a/oml/settings.py b/oml/settings.py index 4ef0843..1d5d9ca 100644 --- a/oml/settings.py +++ b/oml/settings.py @@ -95,4 +95,4 @@ FULLTEXT_SUPPORT = fulltext.platform_supported() if not FULLTEXT_SUPPORT: config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext'] -DB_VERSION = 15 +DB_VERSION = 17 diff --git a/oml/setup.py b/oml/setup.py index b243a00..69024d0 100644 --- a/oml/setup.py +++ b/oml/setup.py @@ -43,11 +43,11 @@ CREATE TABLE user ( peered BOOLEAN, online BOOLEAN, PRIMARY KEY (id), - UNIQUE (nickname), CHECK (queued IN (0, 1)), CHECK (peered IN (0, 1)), CHECK (online IN (0, 1)) ); +CREATE INDEX ix_user_nichname ON user (nichname); CREATE TABLE metadata ( created DATETIME, modified DATETIME, diff --git a/oml/update.py b/oml/update.py index 07cddb4..e51ed08 100644 --- a/oml/update.py +++ b/oml/update.py @@ -373,6 +373,10 @@ class Update(Thread): db_version = migrate_13() if db_version < 15: db_version = migrate_15() + if db_version < 16: + db_version = migrate_16() + if db_version < 17: + db_version = migrate_17() settings.server['db_version'] = db_version def run(self): @@ -437,7 +441,7 @@ def migrate_5(): 'DROP INDEX IF EXISTS user_metadata_index', 'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)', 'UPDATE sort SET sharemetadata = 0', - ]), + ]) with db.session() as session: import user.models for m in user.models.Metadata.query: @@ -631,3 +635,42 @@ def migrate_15(): del u.info['local'] u.save() return 15 + +def migrate_16(): + db.run_sql([ + '''CREATE TABLE user2 ( + created DATETIME, + modified DATETIME, + id VARCHAR(43) NOT NULL, + info BLOB, + nickname VARCHAR(256), + pending VARCHAR(64), + queued BOOLEAN, + peered BOOLEAN, + online BOOLEAN, + PRIMARY KEY (id), + CHECK (queued IN (0, 1)), + CHECK (peered IN (0, 1)), + CHECK (online IN (0, 1)) + )''', + '''INSERT INTO user2 (created, modified, id, info, nickname, pending, queued, peered, online) + SELECT created, modified, id, info, nickname, pending, queued, peered, online FROM user''', + 'DROP TABLE user', + 'ALTER TABLE user2 RENAME TO user', + 'CREATE INDEX IF NOT EXISTS ix_user_nickname ON user (nickname)' + ]) + return 16 + +def migrate_17(): + from user.models import List, User + from changelog import add_record + with db.session(): + l = List.get(':Public') + if not l: + add_record('removelist', 'Public') + lists = [] + for l in List.query.filter_by(user_id=settings.USER_ID).order_by('index_'): + if l.type == 'static' and l.name not in ('', 'Inbox'): + lists.append(l.name) + add_record('orderlists', lists) + return 17 diff --git a/oml/user/api.py b/oml/user/api.py index be190ad..b3aff56 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -422,7 +422,7 @@ def requestPeering(data): u.info['nickname'] = data.get('nickname', '') u.update_name() u.save() - state.nodes.queue(u.id, 'peering', 'requestPeering') + state.nodes.peer_queue(u.id, 'peering', 'requestPeering') return {} actions.register(requestPeering, cache=False) @@ -441,7 +441,7 @@ def acceptPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(True) - state.nodes.queue(u.id, 'peering', 'acceptPeering') + state.nodes.peer_queue(u.id, 'peering', 'acceptPeering') return {} actions.register(acceptPeering, cache=False) @@ -459,7 +459,7 @@ def rejectPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue(u.id, 'peering', 'rejectPeering') + state.nodes.peer_queue(u.id, 'peering', 'rejectPeering') return {} actions.register(rejectPeering, cache=False) @@ -474,10 +474,11 @@ def removePeering(data): if len(data.get('id', '')) not in (16, 43): logger.debug('invalid user id') return {} - u = models.User.get_or_create(data['id']) - u.info['message'] = data.get('message', '') - u.update_peering(False) - state.nodes.queue(u.id, 'peering', 'removePeering') + u = models.User.get(data['id'], for_udpate=True) + if u: + u.info['message'] = data.get('message', '') + u.update_peering(False) + state.nodes.peer_queue(u.id, 'peering', 'removePeering') return {} actions.register(removePeering, cache=False) @@ -493,7 +494,7 @@ def cancelPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue(u.id, 'peering', 'cancelPeering') + state.nodes.peer_queue(u.id, 'peering', 'cancelPeering') return {} actions.register(cancelPeering, cache=False) diff --git a/oml/user/models.py b/oml/user/models.py index 3d7284a..77ff81e 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -30,7 +30,7 @@ class User(db.Model): id = sa.Column(sa.String(43), primary_key=True) info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler))) - nickname = sa.Column(sa.String(256), unique=True) + nickname = sa.Column(sa.String(256), index=True) pending = sa.Column(sa.String(64)) # sent|received queued = sa.Column(sa.Boolean()) @@ -339,7 +339,7 @@ class List(db.Model): state.db.session.add(self) if commit: state.db.session.commit() - if self.user_id == settings.USER_ID and self.name != '' and available_items: + if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items: add_record('addlistitems', self.name, available_items) def get_items(self): @@ -368,7 +368,7 @@ class List(db.Model): q = list_items.delete().where(list_items.columns['list_id'].is_(self.id)) state.db.session.execute(q) if not self._query: - if self.user_id == settings.USER_ID and self.name != '': + if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'): add_record('removelist', self.name) state.db.session.delete(self) if commit: diff --git a/oml/websocket.py b/oml/websocket.py index 27a3d4e..9178bb8 100644 --- a/oml/websocket.py +++ b/oml/websocket.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- - -from tornado.websocket import WebSocketHandler +from tornado.websocket import WebSocketHandler, WebSocketClosedError from tornado.ioloop import IOLoop +from tornado.iostream import StreamClosedError import json from oxtornado import json_dumps @@ -49,7 +49,11 @@ class Handler(WebSocketHandler): #websocket calls def on_message(self, message): - action, data = json.loads(message) + try: + action, data = json.loads(message) + except json.decoder.JSONDecodeError: + logger.debug('invalid websocket message: %s', message) + return if state.tasks: state.tasks.queue(action, data) @@ -62,7 +66,17 @@ class Handler(WebSocketHandler): if self.ws_connection is None: self.on_close() else: - state.main.add_callback(lambda: self.write_message(message)) + state.main.add_callback(lambda: self._write_message(message)) + + async def _write_message(self, message): + try: + task = self.write_message(message) + await task + except StreamClosedError as e: + self.on_close() + except WebSocketClosedError as e: + self.on_close() + def trigger_event(event, data): #if len(state.websockets):