diff --git a/oml/node/nodeapi.py b/oml/node/nodeapi.py index d24fbae..db11b90 100644 --- a/oml/node/nodeapi.py +++ b/oml/node/nodeapi.py @@ -6,6 +6,7 @@ from changelog import Changelog from user.models import User from websocket import trigger_event import state +import settings import logging logger = logging.getLogger(__name__) @@ -26,10 +27,11 @@ def api_pullChanges(remote_id, user_id=None, from_=None, to=None): def api_requestPeering(user_id, username, message): user = User.get_or_create(user_id) - if not user.info: - user.info = {} if not user.peered: - if user.pending == 'sent': + pref = settings.preferences.get('receivedRequests') + if pref == 'reject': + return True + if user.pending == 'sent' or pref == 'accept': user.info['message'] = message user.update_peering(True, username) user.update_name() @@ -40,6 +42,8 @@ def api_requestPeering(user_id, username, message): user.update_name() user.save() trigger_event('peering.request', user.json()) + if user.peered: + state.nodes.queue('add', user.id, True) return True return False @@ -47,13 +51,11 @@ def api_acceptPeering(user_id, username, message): user = User.get(user_id) logger.debug('incoming acceptPeering event: pending: %s', user.pending) if user and user.pending == 'sent': - if not user.info: - user.info = {} user.info['username'] = username user.info['message'] = message user.update_name() user.update_peering(True, username) - state.nodes.queue('add', user.id) + state.nodes.queue('add', user.id, True) trigger_event('peering.accept', user.json()) return True elif user and user.peered: @@ -63,8 +65,6 @@ def api_acceptPeering(user_id, username, message): def api_rejectPeering(user_id, message): user = User.get(user_id) if user: - if not user.info: - user.info = {} user.info['message'] = message user.update_peering(False) trigger_event('peering.reject', user.json()) diff --git a/oml/nodes.py b/oml/nodes.py index cc02fd8..4a8feab 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -61,9 +61,8 @@ class Node(Thread): action = self._q.get() if not self._running: break - if action == 'go_online': - if not self.online: - self._go_online() + if action == 'send_response': + self._send_response() elif action == 'ping': self.online = self.can_connect() elif action == 'pull': @@ -88,9 +87,6 @@ class Node(Thread): if state.online: self._q.put('ping') - def go_online(self): - self._q.put('go_online') - @property def url(self): url = None @@ -234,11 +230,14 @@ class Node(Thread): def is_online(self): return self.online or self.get_local() != None - def _go_online(self): + def send_response(self): + self._q.put('send_response') + + def _send_response(self): with db.session(): u = user.models.User.get_or_create(self.user_id) if u.peered or u.queued: - logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) + logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) try: self.online = self.can_connect() if self.online: @@ -255,8 +254,6 @@ class Node(Thread): except: logger.debug('failed to connect to %s', self.user_id) self.online = False - else: - self.online = False def trigger_status(self): if self.online is not None: @@ -299,8 +296,6 @@ class Node(Thread): logger.debug('peering failed? %s %s', action, r) if action in ('cancelPeering', 'rejectPeering', 'removePeering'): self.online = False - else: - self.go_online() trigger_event('peering.%s'%action.replace('Peering', ''), u.json()) return True @@ -430,7 +425,7 @@ class Nodes(Thread): self.queue('add', u.id) for u in user.models.User.query.filter_by(queued=True): logger.debug('adding queued node... %s', u.id) - self.queue('add', u.id) + self.queue('add', u.id, True) self._local = LocalNodes() self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000) self._cleanup.start() @@ -459,12 +454,14 @@ class Nodes(Thread): elif target == 'online': nodes = [n for n in list(self._nodes.values()) if n.online] else: + if not target in self._nodes: + self._add(target) nodes = [self._nodes[target]] for node in nodes: r = getattr(node, action)(*args) logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r) - def _add(self, user_id): + def _add(self, user_id, send_response=False): if user_id not in self._nodes: from user.models import User with db.session(): @@ -472,6 +469,8 @@ class Nodes(Thread): else: if not self._nodes[user_id].online: self._nodes[user_id].ping() + if send_response: + self._nodes[user_id].send_response() def run(self): while self._running: @@ -480,7 +479,7 @@ class Nodes(Thread): if args[0] == 'cleanup': self.cleanup() elif args[0] == 'add': - self._add(args[1]) + self._add(*args[1:]) else: self._call(*args) @@ -523,4 +522,4 @@ def check_nodes(): for u in user.models.User.query.filter_by(queued=True): if not state.nodes.is_online(u.id): logger.debug('queued peering message for %s trying to connect...', u.id) - state.nodes.queue('add', u.id) + state.nodes.queue('add', u.id, True) diff --git a/oml/user/api.py b/oml/user/api.py index 74c44b7..53eb88e 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -336,7 +336,6 @@ def requestPeering(data): u.info['nickname'] = data.get('nickname', '') u.update_name() u.save() - state.nodes.queue('add', u.id) state.nodes.queue(u.id, 'peering', 'requestPeering') return {} actions.register(requestPeering, cache=False) @@ -356,7 +355,6 @@ def acceptPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(True) - state.nodes.queue('add', u.id) state.nodes.queue(u.id, 'peering', 'acceptPeering') return {} actions.register(acceptPeering, cache=False) @@ -375,7 +373,6 @@ def rejectPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue('add', u.id) state.nodes.queue(u.id, 'peering', 'rejectPeering') return {} actions.register(rejectPeering, cache=False) @@ -394,7 +391,6 @@ def removePeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue('add', u.id) state.nodes.queue(u.id, 'peering', 'removePeering') return {} actions.register(removePeering, cache=False) @@ -411,7 +407,6 @@ def cancelPeering(data): u = models.User.get_or_create(data['id']) u.info['message'] = data.get('message', '') u.update_peering(False) - state.nodes.queue('add', u.id) state.nodes.queue(u.id, 'peering', 'cancelPeering') return {} actions.register(cancelPeering, cache=False) diff --git a/oml/user/models.py b/oml/user/models.py index cd9d766..7dd284c 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -43,7 +43,10 @@ class User(db.Model): @classmethod def get(cls, id): - return cls.query.filter_by(id=id).first() + user = cls.query.filter_by(id=id).first() + if user and not user.info: + user.info = {} + return user @classmethod def get_or_create(cls, id):