queue peering requests per node
This commit is contained in:
parent
148b41087f
commit
c95e22869c
4 changed files with 53 additions and 19 deletions
|
@ -153,5 +153,7 @@ class LocalNodes(dict):
|
||||||
state.tasks.queue('removelocalinfo', id)
|
state.tasks.queue('removelocalinfo', id)
|
||||||
|
|
||||||
def get(self, user_id):
|
def get(self, user_id):
|
||||||
if user_id in self and can_connect(self[user_id]):
|
data = super().get(user_id)
|
||||||
return self[user_id] if user_id in self else None
|
if data and can_connect(data):
|
||||||
|
return data
|
||||||
|
return None
|
||||||
|
|
50
oml/nodes.py
50
oml/nodes.py
|
@ -37,10 +37,9 @@ class Node(Thread):
|
||||||
_online = None
|
_online = None
|
||||||
TIMEOUT = 5
|
TIMEOUT = 5
|
||||||
|
|
||||||
def __init__(self, nodes, user):
|
def __init__(self, nodes, user_id):
|
||||||
self._nodes = nodes
|
self._nodes = nodes
|
||||||
self.user = user
|
self.user_id = user_id
|
||||||
self.user_id = user.id
|
|
||||||
self._opener = get_opener(self.user_id)
|
self._opener = get_opener(self.user_id)
|
||||||
self._q = Queue()
|
self._q = Queue()
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
|
@ -57,6 +56,13 @@ class Node(Thread):
|
||||||
self._send_response()
|
self._send_response()
|
||||||
elif action == 'ping':
|
elif action == 'ping':
|
||||||
self.online = self.can_connect()
|
self.online = self.can_connect()
|
||||||
|
elif isinstance(action, list) and len(action) == 2:
|
||||||
|
if self.online:
|
||||||
|
self._call(action[0], *action[1])
|
||||||
|
else:
|
||||||
|
if not self._q.qsize():
|
||||||
|
time.sleep(5)
|
||||||
|
self.queue(action[0], *action[1])
|
||||||
else:
|
else:
|
||||||
logger.debug('unknown action %s', action)
|
logger.debug('unknown action %s', action)
|
||||||
|
|
||||||
|
@ -68,6 +74,14 @@ class Node(Thread):
|
||||||
if state.online or self.get_local():
|
if state.online or self.get_local():
|
||||||
self._q.put('ping')
|
self._q.put('ping')
|
||||||
|
|
||||||
|
def queue(self, action, *args):
|
||||||
|
logger.debug('queue node action %s->%s%s', self.user_id, action, args)
|
||||||
|
self._q.put([action, args])
|
||||||
|
|
||||||
|
def _call(self, action, *args):
|
||||||
|
r = getattr(self, action)(*args)
|
||||||
|
logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def url(self):
|
def url(self):
|
||||||
if self.local:
|
if self.local:
|
||||||
|
@ -321,6 +335,7 @@ class Node(Thread):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def peering(self, action):
|
def peering(self, action):
|
||||||
|
pull_changes = False
|
||||||
with db.session():
|
with db.session():
|
||||||
u = user.models.User.get_or_create(self.user_id)
|
u = user.models.User.get_or_create(self.user_id)
|
||||||
user_info = u.info
|
user_info = u.info
|
||||||
|
@ -335,6 +350,8 @@ class Node(Thread):
|
||||||
if 'message' in u.info:
|
if 'message' in u.info:
|
||||||
del u.info['message']
|
del u.info['message']
|
||||||
u.save()
|
u.save()
|
||||||
|
if action == 'acceptPeering':
|
||||||
|
pull_changes = True
|
||||||
else:
|
else:
|
||||||
logger.debug('peering failed? %s %s', action, r)
|
logger.debug('peering failed? %s %s', action, r)
|
||||||
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
|
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
|
||||||
|
@ -342,6 +359,8 @@ class Node(Thread):
|
||||||
with db.session():
|
with db.session():
|
||||||
u = user.models.User.get(self.user_id)
|
u = user.models.User.get(self.user_id)
|
||||||
trigger_event('peering.%s' % action.replace('Peering', ''), u.json())
|
trigger_event('peering.%s' % action.replace('Peering', ''), u.json())
|
||||||
|
if pull_changes:
|
||||||
|
self.pullChanges()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
|
@ -514,9 +533,15 @@ class Nodes(Thread):
|
||||||
|
|
||||||
def queue(self, *args):
|
def queue(self, *args):
|
||||||
if args:
|
if args:
|
||||||
logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize())
|
logger.debug('queue "%s", %s entries in queue', args, self._q.qsize())
|
||||||
self._q.put(list(args))
|
self._q.put(list(args))
|
||||||
|
|
||||||
|
def peer_queue(self, peer, action, *args):
|
||||||
|
if peer not in self._nodes:
|
||||||
|
self._add(peer)
|
||||||
|
self._nodes[peer].queue(action, *args)
|
||||||
|
|
||||||
|
|
||||||
def is_online(self, id):
|
def is_online(self, id):
|
||||||
return id in self._nodes and self._nodes[id].is_online()
|
return id in self._nodes and self._nodes[id].is_online()
|
||||||
|
|
||||||
|
@ -536,18 +561,18 @@ class Nodes(Thread):
|
||||||
elif target == 'online':
|
elif target == 'online':
|
||||||
nodes = [n for n in list(self._nodes.values()) if n.online]
|
nodes = [n for n in list(self._nodes.values()) if n.online]
|
||||||
else:
|
else:
|
||||||
if not target in self._nodes:
|
if target not in self._nodes:
|
||||||
self._add(target)
|
self._add(target)
|
||||||
nodes = [self._nodes[target]]
|
nodes = [self._nodes[target]]
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
r = getattr(node, action)(*args)
|
node._call(action, *args)
|
||||||
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
|
|
||||||
|
|
||||||
def _add(self, user_id, send_response=False):
|
def _add(self, user_id, send_response=False):
|
||||||
if user_id not in self._nodes:
|
if user_id not in self._nodes:
|
||||||
from user.models import User
|
from user.models import User
|
||||||
with db.session():
|
with db.session():
|
||||||
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
|
User.get_or_create(user_id)
|
||||||
|
self._nodes[user_id] = Node(self, user_id)
|
||||||
else:
|
else:
|
||||||
self._nodes[user_id].ping()
|
self._nodes[user_id].ping()
|
||||||
if send_response:
|
if send_response:
|
||||||
|
@ -608,8 +633,11 @@ def update_online():
|
||||||
|
|
||||||
def check_nodes():
|
def check_nodes():
|
||||||
if state.online:
|
if state.online:
|
||||||
|
ids = []
|
||||||
with db.session():
|
with db.session():
|
||||||
for u in user.models.User.query.filter_by(queued=True):
|
for u in user.models.User.query.filter_by(queued=True):
|
||||||
if not state.nodes.is_online(u.id):
|
ids.append(u.id)
|
||||||
logger.debug('queued peering message for %s trying to connect...', u.id)
|
for id in ids:
|
||||||
state.nodes.queue('add', u.id, True)
|
if not state.nodes.is_online(id):
|
||||||
|
logger.debug('queued peering message for %s trying to connect...', id)
|
||||||
|
state.nodes.queue('add', id, True)
|
||||||
|
|
|
@ -128,7 +128,11 @@ class Parser(object):
|
||||||
elif k == 'list':
|
elif k == 'list':
|
||||||
nickname, name = v.split(':', 1)
|
nickname, name = v.split(':', 1)
|
||||||
if nickname:
|
if nickname:
|
||||||
|
try:
|
||||||
u = self._user.query.filter_by(nickname=nickname).one()
|
u = self._user.query.filter_by(nickname=nickname).one()
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
ids = []
|
||||||
|
return self.in_ids(ids, exclude)
|
||||||
else:
|
else:
|
||||||
u = self._user.query.filter_by(id=settings.USER_ID).one()
|
u = self._user.query.filter_by(id=settings.USER_ID).one()
|
||||||
if not name:
|
if not name:
|
||||||
|
|
|
@ -422,7 +422,7 @@ def requestPeering(data):
|
||||||
u.info['nickname'] = data.get('nickname', '')
|
u.info['nickname'] = data.get('nickname', '')
|
||||||
u.update_name()
|
u.update_name()
|
||||||
u.save()
|
u.save()
|
||||||
state.nodes.queue(u.id, 'peering', 'requestPeering')
|
state.nodes.peer_queue(u.id, 'peering', 'requestPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(requestPeering, cache=False)
|
actions.register(requestPeering, cache=False)
|
||||||
|
|
||||||
|
@ -441,7 +441,7 @@ def acceptPeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(True)
|
u.update_peering(True)
|
||||||
state.nodes.queue(u.id, 'peering', 'acceptPeering')
|
state.nodes.peer_queue(u.id, 'peering', 'acceptPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(acceptPeering, cache=False)
|
actions.register(acceptPeering, cache=False)
|
||||||
|
|
||||||
|
@ -459,7 +459,7 @@ def rejectPeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(False)
|
u.update_peering(False)
|
||||||
state.nodes.queue(u.id, 'peering', 'rejectPeering')
|
state.nodes.peer_queue(u.id, 'peering', 'rejectPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(rejectPeering, cache=False)
|
actions.register(rejectPeering, cache=False)
|
||||||
|
|
||||||
|
@ -477,7 +477,7 @@ def removePeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(False)
|
u.update_peering(False)
|
||||||
state.nodes.queue(u.id, 'peering', 'removePeering')
|
state.nodes.peer_queue(u.id, 'peering', 'removePeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(removePeering, cache=False)
|
actions.register(removePeering, cache=False)
|
||||||
|
|
||||||
|
@ -493,7 +493,7 @@ def cancelPeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(False)
|
u.update_peering(False)
|
||||||
state.nodes.queue(u.id, 'peering', 'cancelPeering')
|
state.nodes.peer_queue(u.id, 'peering', 'cancelPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(cancelPeering, cache=False)
|
actions.register(cancelPeering, cache=False)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue