From c95e22869cf52121e2815870d12dcb6e588bf25b Mon Sep 17 00:00:00 2001 From: j Date: Sat, 2 Feb 2019 12:43:37 +0530 Subject: [PATCH] queue peering requests per node --- oml/localnodes.py | 6 ++++-- oml/nodes.py | 50 ++++++++++++++++++++++++++++++++++++---------- oml/queryparser.py | 6 +++++- oml/user/api.py | 10 +++++----- 4 files changed, 53 insertions(+), 19 deletions(-) diff --git a/oml/localnodes.py b/oml/localnodes.py index f3fa7ec..20562e7 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -153,5 +153,7 @@ class LocalNodes(dict): state.tasks.queue('removelocalinfo', id) def get(self, user_id): - if user_id in self and can_connect(self[user_id]): - return self[user_id] if user_id in self else None + data = super().get(user_id) + if data and can_connect(data): + return data + return None diff --git a/oml/nodes.py b/oml/nodes.py index e3621d9..a9b9096 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -37,10 +37,9 @@ class Node(Thread): _online = None TIMEOUT = 5 - def __init__(self, nodes, user): + def __init__(self, nodes, user_id): self._nodes = nodes - self.user = user - self.user_id = user.id + self.user_id = user_id self._opener = get_opener(self.user_id) self._q = Queue() Thread.__init__(self) @@ -57,6 +56,13 @@ class Node(Thread): self._send_response() elif action == 'ping': self.online = self.can_connect() + elif isinstance(action, list) and len(action) == 2: + if self.online: + self._call(action[0], *action[1]) + else: + if not self._q.qsize(): + time.sleep(5) + self.queue(action[0], *action[1]) else: logger.debug('unknown action %s', action) @@ -68,6 +74,14 @@ class Node(Thread): if state.online or self.get_local(): self._q.put('ping') + def queue(self, action, *args): + logger.debug('queue node action %s->%s%s', self.user_id, action, args) + self._q.put([action, args]) + + def _call(self, action, *args): + r = getattr(self, action)(*args) + logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r) + @property def url(self): if self.local: @@ -321,6 +335,7 @@ class Node(Thread): return False def peering(self, action): + pull_changes = False with db.session(): u = user.models.User.get_or_create(self.user_id) user_info = u.info @@ -335,6 +350,8 @@ class Node(Thread): if 'message' in u.info: del u.info['message'] u.save() + if action == 'acceptPeering': + pull_changes = True else: logger.debug('peering failed? %s %s', action, r) if action in ('cancelPeering', 'rejectPeering', 'removePeering'): @@ -342,6 +359,8 @@ class Node(Thread): with db.session(): u = user.models.User.get(self.user_id) trigger_event('peering.%s' % action.replace('Peering', ''), u.json()) + if pull_changes: + self.pullChanges() return True headers = { @@ -514,9 +533,15 @@ class Nodes(Thread): def queue(self, *args): if args: - logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize()) + logger.debug('queue "%s", %s entries in queue', args, self._q.qsize()) self._q.put(list(args)) + def peer_queue(self, peer, action, *args): + if peer not in self._nodes: + self._add(peer) + self._nodes[peer].queue(action, *args) + + def is_online(self, id): return id in self._nodes and self._nodes[id].is_online() @@ -536,18 +561,18 @@ class Nodes(Thread): elif target == 'online': nodes = [n for n in list(self._nodes.values()) if n.online] else: - if not target in self._nodes: + if target not in self._nodes: self._add(target) nodes = [self._nodes[target]] for node in nodes: - r = getattr(node, action)(*args) - logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r) + node._call(action, *args) def _add(self, user_id, send_response=False): if user_id not in self._nodes: from user.models import User with db.session(): - self._nodes[user_id] = Node(self, User.get_or_create(user_id)) + User.get_or_create(user_id) + self._nodes[user_id] = Node(self, user_id) else: self._nodes[user_id].ping() if send_response: @@ -608,8 +633,11 @@ def update_online(): def check_nodes(): if state.online: + ids = [] with db.session(): for u in user.models.User.query.filter_by(queued=True): - if not state.nodes.is_online(u.id): - logger.debug('queued peering message for %s trying to connect...', u.id) - state.nodes.queue('add', u.id, True) + ids.append(u.id) + for id in ids: + if not state.nodes.is_online(id): + logger.debug('queued peering message for %s trying to connect...', id) + state.nodes.queue('add', id, True) diff --git a/oml/queryparser.py b/oml/queryparser.py index 033e068..2810729 100644 --- a/oml/queryparser.py +++ b/oml/queryparser.py @@ -128,7 +128,11 @@ class Parser(object): elif k == 'list': nickname, name = v.split(':', 1) if nickname: - u = self._user.query.filter_by(nickname=nickname).one() + try: + u = self._user.query.filter_by(nickname=nickname).one() + except sqlalchemy.orm.exc.NoResultFound: + ids = [] + return self.in_ids(ids, exclude) else: u = self._user.query.filter_by(id=settings.USER_ID).one() if not name: diff --git a/oml/user/api.py b/oml/user/api.py index be190ad..f63b8a0 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -422,7 +422,7 @@ def requestPeering(data): u.info['nickname'] = data.get('nickname', '') u.update_name() u.save() - state.nodes.queue(u.id, 'peering', 'requestPeering') + state.nodes.peer_queue(u.id, 'peering', 'requestPeering') return {} actions.register(requestPeering, cache=False) @@ -441,7 +441,7 @@ def acceptPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(True) - state.nodes.queue(u.id, 'peering', 'acceptPeering') + state.nodes.peer_queue(u.id, 'peering', 'acceptPeering') return {} actions.register(acceptPeering, cache=False) @@ -459,7 +459,7 @@ def rejectPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue(u.id, 'peering', 'rejectPeering') + state.nodes.peer_queue(u.id, 'peering', 'rejectPeering') return {} actions.register(rejectPeering, cache=False) @@ -477,7 +477,7 @@ def removePeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue(u.id, 'peering', 'removePeering') + state.nodes.peer_queue(u.id, 'peering', 'removePeering') return {} actions.register(removePeering, cache=False) @@ -493,7 +493,7 @@ def cancelPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue(u.id, 'peering', 'cancelPeering') + state.nodes.peer_queue(u.id, 'peering', 'cancelPeering') return {} actions.register(cancelPeering, cache=False)