diff --git a/oml/item/models.py b/oml/item/models.py index 3922b4f..149a678 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -538,7 +538,7 @@ class Item(db.Model): add_record('additem', self.id, f.info) add_record('edititem', self.id, self.meta) for l in self.lists.filter_by(user_id=settings.USER_ID): - if l.name != '' and l.name != 'Inbox': + if l.name != '': add_record('addlistitems', l.name, [self.id]) self.update() f.move() diff --git a/oml/library.py b/oml/library.py index 362826d..11f9103 100644 --- a/oml/library.py +++ b/oml/library.py @@ -130,22 +130,16 @@ class Peer(object): self.info['lists'][new['name']] = self.info['lists'].pop(name) else: self.info['lists'][new['name']] = [] - if name in self.info['listorder']: - self.info['listorder'] = [new['name'] if n == name else n for n in self.info['listorder']] elif action == 'orderlists': self.info['listorder'] = args[0] elif action == 'removelist': name = args[0] if name in self.info['lists']: del self.info['lists'][name] - if name in self.info['listorder']: - self.info['listorder'] = [n for n in self.info['listorder'] if n != name] elif action == 'addlistitems': name, ids = args if name not in self.info['lists']: self.info['lists'][name] = [] - if name not in self.info['listorder']: - self.info['listorder'].append(name) self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids)) elif action == 'removelistitems': name, ids = args @@ -214,9 +208,8 @@ class Peer(object): def remove(self): self.join() - for path in (self._dbpath, self._logpath, self._infopath): - if os.path.exists(path): - os.unlink(path) + os.unlink(self._dbpath) + os.unlink(self._infopath) def sync_db(self): import item.models @@ -334,8 +327,6 @@ class Peer(object): if state.shutdown: break state.db.session.commit() - if update_items: - logger.debug('updated %s items', len(update_items)) ids = set(self.library.keys()) changed = False for name, l in self.info.get('lists', {}).items(): diff --git a/oml/localnodes.py b/oml/localnodes.py index 20562e7..f3fa7ec 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -153,7 +153,5 @@ class LocalNodes(dict): state.tasks.queue('removelocalinfo', id) def get(self, user_id): - data = super().get(user_id) - if data and can_connect(data): - return data - return None + if user_id in self and can_connect(self[user_id]): + return self[user_id] if user_id in self else None diff --git a/oml/nodes.py b/oml/nodes.py index c5cfbfe..e3621d9 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -1,17 +1,16 @@ # -*- coding: utf-8 -*- -from io import BytesIO + from queue import Queue from threading import Thread -import gzip import json +from io import BytesIO +import gzip +import urllib.request, urllib.error, urllib.parse import os +import time import socket import socks -import time -import urllib.error -import urllib.parse -import urllib.request import ox from tornado.ioloop import PeriodicCallback @@ -38,13 +37,12 @@ class Node(Thread): _online = None TIMEOUT = 5 - def __init__(self, nodes, user_id): + def __init__(self, nodes, user): self._nodes = nodes - self.user_id = user_id + self.user = user + self.user_id = user.id self._opener = get_opener(self.user_id) self._q = Queue() - self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval']) - state.main.add_callback(self._pingcb.start) Thread.__init__(self) self.daemon = True self.start() @@ -55,22 +53,10 @@ class Node(Thread): action = self._q.get() if state.shutdown: break - if action == 'ping': + if action == 'send_response': + self._send_response() + elif action == 'ping': self.online = self.can_connect() - elif action == 'send_response': - if self.online: - self._send_response() - else: - if not self._q.qsize(): - time.sleep(5) - self.send_response() - elif isinstance(action, list) and len(action) == 2: - if self.online: - self._call(action[0], *action[1]) - else: - if not self._q.qsize(): - time.sleep(5) - self.queue(action[0], *action[1]) else: logger.debug('unknown action %s', action) @@ -82,14 +68,6 @@ class Node(Thread): if state.online or self.get_local(): self._q.put('ping') - def queue(self, action, *args): - logger.debug('queue node action %s->%s%s', self.user_id, action, args) - self._q.put([action, args]) - - def _call(self, action, *args): - r = getattr(self, action)(*args) - logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r) - @property def url(self): if self.local: @@ -224,26 +202,34 @@ class Node(Thread): def _send_response(self): with db.session(): u = user.models.User.get(self.user_id) + send_response = u and u.peered or u.queued if u: user_pending = u.pending user_peered = u.peered user_queued = u.queued - else: - user_queued = False if DEBUG_NODES: logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) - if user_queued: - if DEBUG_NODES: - logger.debug('connected to %s', self.url) - logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) - if user_pending == 'sent': - self.peering('requestPeering') - elif user_pending == '' and user_peered: - self.peering('acceptPeering') - else: - #fixme, what about cancel/reject peering here? - self.peering('removePeering') + if send_response: + try: + self.online = self.can_connect() + except: + if DEBUG_NODES: + logger.debug('failed to connect to %s', self.user_id) + self.online = False + if self.online: + if DEBUG_NODES: + logger.debug('connected to %s', self.url) + if user_queued: + if DEBUG_NODES: + logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) + if user_pending == 'sent': + self.peering('requestPeering') + elif user_pending == '' and user_peered: + self.peering('acceptPeering') + else: + #fixme, what about cancel/reject peering here? + self.peering('removePeering') def trigger_status(self): if self.online is not None: @@ -335,7 +321,6 @@ class Node(Thread): return False def peering(self, action): - pull_changes = False with db.session(): u = user.models.User.get_or_create(self.user_id) user_info = u.info @@ -350,8 +335,6 @@ class Node(Thread): if 'message' in u.info: del u.info['message'] u.save() - if action == 'acceptPeering': - pull_changes = True else: logger.debug('peering failed? %s %s', action, r) if action in ('cancelPeering', 'rejectPeering', 'removePeering'): @@ -359,8 +342,6 @@ class Node(Thread): with db.session(): u = user.models.User.get(self.user_id) trigger_event('peering.%s' % action.replace('Peering', ''), u.json()) - if pull_changes: - self.pullChanges() return True headers = { @@ -512,7 +493,7 @@ class Nodes(Thread): self.queue('add', u.id, True) self.local = LocalNodes() self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval']) - state.main.add_callback(self._pullcb.start) + self._pullcb.start() Thread.__init__(self) self.daemon = True self.start() @@ -533,16 +514,9 @@ class Nodes(Thread): def queue(self, *args): if args: - logger.debug('queue "%s", %s entries in queue', args, self._q.qsize()) + logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize()) self._q.put(list(args)) - def peer_queue(self, peer, action, *args): - if peer not in self._nodes: - self._add(peer) - elif not self._nodes[peer].is_online(): - self._nodes[peer].ping() - self._nodes[peer].queue(action, *args) - def is_online(self, id): return id in self._nodes and self._nodes[id].is_online() @@ -558,27 +532,22 @@ class Nodes(Thread): if target == 'all': nodes = list(self._nodes.values()) elif target == 'peered': - ids = [] - with db.session(): - from user.models import User - for u in User.query.filter(User.id != settings.USER_ID).filter_by(peered=True).all(): - ids.append(u.id) - nodes = [n for n in list(self._nodes.values()) if n.user_id in ids] + nodes = [n for n in list(self._nodes.values()) if n.user.peered] elif target == 'online': nodes = [n for n in list(self._nodes.values()) if n.online] else: - if target not in self._nodes: + if not target in self._nodes: self._add(target) nodes = [self._nodes[target]] for node in nodes: - node._call(action, *args) + r = getattr(node, action)(*args) + logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r) def _add(self, user_id, send_response=False): if user_id not in self._nodes: from user.models import User with db.session(): - User.get_or_create(user_id) - self._nodes[user_id] = Node(self, user_id) + self._nodes[user_id] = Node(self, User.get_or_create(user_id)) else: self._nodes[user_id].ping() if send_response: @@ -606,7 +575,7 @@ class Nodes(Thread): if state.shutdown: break node = self._nodes.get(u['id']) - if node and node.is_online(): + if node: node.pullChanges() self._pulling = False @@ -620,6 +589,8 @@ class Nodes(Thread): def publish_node(): update_online() + state.check_nodes = PeriodicCallback(check_nodes, 120000) + state.check_nodes.start() state._online = PeriodicCallback(update_online, 60000) state._online.start() @@ -634,3 +605,11 @@ def update_online(): if state.online: for node in list(state.nodes._nodes.values()): node.trigger_status() + +def check_nodes(): + if state.online: + with db.session(): + for u in user.models.User.query.filter_by(queued=True): + if not state.nodes.is_online(u.id): + logger.debug('queued peering message for %s trying to connect...', u.id) + state.nodes.queue('add', u.id, True) diff --git a/oml/queryparser.py b/oml/queryparser.py index 939611b..033e068 100644 --- a/oml/queryparser.py +++ b/oml/queryparser.py @@ -128,11 +128,7 @@ class Parser(object): elif k == 'list': nickname, name = v.split(':', 1) if nickname: - try: - u = self._user.query.filter_by(nickname=nickname, peered=True).one() - except sqlalchemy.orm.exc.NoResultFound: - ids = [] - return self.in_ids(ids, exclude) + u = self._user.query.filter_by(nickname=nickname).one() else: u = self._user.query.filter_by(id=settings.USER_ID).one() if not name: diff --git a/oml/settings.py b/oml/settings.py index 1d5d9ca..4ef0843 100644 --- a/oml/settings.py +++ b/oml/settings.py @@ -95,4 +95,4 @@ FULLTEXT_SUPPORT = fulltext.platform_supported() if not FULLTEXT_SUPPORT: config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext'] -DB_VERSION = 17 +DB_VERSION = 15 diff --git a/oml/setup.py b/oml/setup.py index 69024d0..b243a00 100644 --- a/oml/setup.py +++ b/oml/setup.py @@ -43,11 +43,11 @@ CREATE TABLE user ( peered BOOLEAN, online BOOLEAN, PRIMARY KEY (id), + UNIQUE (nickname), CHECK (queued IN (0, 1)), CHECK (peered IN (0, 1)), CHECK (online IN (0, 1)) ); -CREATE INDEX ix_user_nichname ON user (nichname); CREATE TABLE metadata ( created DATETIME, modified DATETIME, diff --git a/oml/update.py b/oml/update.py index e51ed08..07cddb4 100644 --- a/oml/update.py +++ b/oml/update.py @@ -373,10 +373,6 @@ class Update(Thread): db_version = migrate_13() if db_version < 15: db_version = migrate_15() - if db_version < 16: - db_version = migrate_16() - if db_version < 17: - db_version = migrate_17() settings.server['db_version'] = db_version def run(self): @@ -441,7 +437,7 @@ def migrate_5(): 'DROP INDEX IF EXISTS user_metadata_index', 'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)', 'UPDATE sort SET sharemetadata = 0', - ]) + ]), with db.session() as session: import user.models for m in user.models.Metadata.query: @@ -635,42 +631,3 @@ def migrate_15(): del u.info['local'] u.save() return 15 - -def migrate_16(): - db.run_sql([ - '''CREATE TABLE user2 ( - created DATETIME, - modified DATETIME, - id VARCHAR(43) NOT NULL, - info BLOB, - nickname VARCHAR(256), - pending VARCHAR(64), - queued BOOLEAN, - peered BOOLEAN, - online BOOLEAN, - PRIMARY KEY (id), - CHECK (queued IN (0, 1)), - CHECK (peered IN (0, 1)), - CHECK (online IN (0, 1)) - )''', - '''INSERT INTO user2 (created, modified, id, info, nickname, pending, queued, peered, online) - SELECT created, modified, id, info, nickname, pending, queued, peered, online FROM user''', - 'DROP TABLE user', - 'ALTER TABLE user2 RENAME TO user', - 'CREATE INDEX IF NOT EXISTS ix_user_nickname ON user (nickname)' - ]) - return 16 - -def migrate_17(): - from user.models import List, User - from changelog import add_record - with db.session(): - l = List.get(':Public') - if not l: - add_record('removelist', 'Public') - lists = [] - for l in List.query.filter_by(user_id=settings.USER_ID).order_by('index_'): - if l.type == 'static' and l.name not in ('', 'Inbox'): - lists.append(l.name) - add_record('orderlists', lists) - return 17 diff --git a/oml/user/api.py b/oml/user/api.py index b3aff56..be190ad 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -422,7 +422,7 @@ def requestPeering(data): u.info['nickname'] = data.get('nickname', '') u.update_name() u.save() - state.nodes.peer_queue(u.id, 'peering', 'requestPeering') + state.nodes.queue(u.id, 'peering', 'requestPeering') return {} actions.register(requestPeering, cache=False) @@ -441,7 +441,7 @@ def acceptPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(True) - state.nodes.peer_queue(u.id, 'peering', 'acceptPeering') + state.nodes.queue(u.id, 'peering', 'acceptPeering') return {} actions.register(acceptPeering, cache=False) @@ -459,7 +459,7 @@ def rejectPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.peer_queue(u.id, 'peering', 'rejectPeering') + state.nodes.queue(u.id, 'peering', 'rejectPeering') return {} actions.register(rejectPeering, cache=False) @@ -474,11 +474,10 @@ def removePeering(data): if len(data.get('id', '')) not in (16, 43): logger.debug('invalid user id') return {} - u = models.User.get(data['id'], for_udpate=True) - if u: - u.info['message'] = data.get('message', '') - u.update_peering(False) - state.nodes.peer_queue(u.id, 'peering', 'removePeering') + u = models.User.get_or_create(data['id']) + u.info['message'] = data.get('message', '') + u.update_peering(False) + state.nodes.queue(u.id, 'peering', 'removePeering') return {} actions.register(removePeering, cache=False) @@ -494,7 +493,7 @@ def cancelPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.peer_queue(u.id, 'peering', 'cancelPeering') + state.nodes.queue(u.id, 'peering', 'cancelPeering') return {} actions.register(cancelPeering, cache=False) diff --git a/oml/user/models.py b/oml/user/models.py index 77ff81e..3d7284a 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -30,7 +30,7 @@ class User(db.Model): id = sa.Column(sa.String(43), primary_key=True) info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler))) - nickname = sa.Column(sa.String(256), index=True) + nickname = sa.Column(sa.String(256), unique=True) pending = sa.Column(sa.String(64)) # sent|received queued = sa.Column(sa.Boolean()) @@ -339,7 +339,7 @@ class List(db.Model): state.db.session.add(self) if commit: state.db.session.commit() - if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items: + if self.user_id == settings.USER_ID and self.name != '' and available_items: add_record('addlistitems', self.name, available_items) def get_items(self): @@ -368,7 +368,7 @@ class List(db.Model): q = list_items.delete().where(list_items.columns['list_id'].is_(self.id)) state.db.session.execute(q) if not self._query: - if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'): + if self.user_id == settings.USER_ID and self.name != '': add_record('removelist', self.name) state.db.session.delete(self) if commit: diff --git a/oml/websocket.py b/oml/websocket.py index 9178bb8..27a3d4e 100644 --- a/oml/websocket.py +++ b/oml/websocket.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- -from tornado.websocket import WebSocketHandler, WebSocketClosedError + +from tornado.websocket import WebSocketHandler from tornado.ioloop import IOLoop -from tornado.iostream import StreamClosedError import json from oxtornado import json_dumps @@ -49,11 +49,7 @@ class Handler(WebSocketHandler): #websocket calls def on_message(self, message): - try: - action, data = json.loads(message) - except json.decoder.JSONDecodeError: - logger.debug('invalid websocket message: %s', message) - return + action, data = json.loads(message) if state.tasks: state.tasks.queue(action, data) @@ -66,17 +62,7 @@ class Handler(WebSocketHandler): if self.ws_connection is None: self.on_close() else: - state.main.add_callback(lambda: self._write_message(message)) - - async def _write_message(self, message): - try: - task = self.write_message(message) - await task - except StreamClosedError as e: - self.on_close() - except WebSocketClosedError as e: - self.on_close() - + state.main.add_callback(lambda: self.write_message(message)) def trigger_event(event, data): #if len(state.websockets):