accept peering requests based on prefs
This commit is contained in:
parent
c89fd8e326
commit
954491759e
4 changed files with 27 additions and 30 deletions
|
@ -6,6 +6,7 @@ from changelog import Changelog
|
||||||
from user.models import User
|
from user.models import User
|
||||||
from websocket import trigger_event
|
from websocket import trigger_event
|
||||||
import state
|
import state
|
||||||
|
import settings
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -26,10 +27,11 @@ def api_pullChanges(remote_id, user_id=None, from_=None, to=None):
|
||||||
|
|
||||||
def api_requestPeering(user_id, username, message):
|
def api_requestPeering(user_id, username, message):
|
||||||
user = User.get_or_create(user_id)
|
user = User.get_or_create(user_id)
|
||||||
if not user.info:
|
|
||||||
user.info = {}
|
|
||||||
if not user.peered:
|
if not user.peered:
|
||||||
if user.pending == 'sent':
|
pref = settings.preferences.get('receivedRequests')
|
||||||
|
if pref == 'reject':
|
||||||
|
return True
|
||||||
|
if user.pending == 'sent' or pref == 'accept':
|
||||||
user.info['message'] = message
|
user.info['message'] = message
|
||||||
user.update_peering(True, username)
|
user.update_peering(True, username)
|
||||||
user.update_name()
|
user.update_name()
|
||||||
|
@ -40,6 +42,8 @@ def api_requestPeering(user_id, username, message):
|
||||||
user.update_name()
|
user.update_name()
|
||||||
user.save()
|
user.save()
|
||||||
trigger_event('peering.request', user.json())
|
trigger_event('peering.request', user.json())
|
||||||
|
if user.peered:
|
||||||
|
state.nodes.queue('add', user.id, True)
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -47,13 +51,11 @@ def api_acceptPeering(user_id, username, message):
|
||||||
user = User.get(user_id)
|
user = User.get(user_id)
|
||||||
logger.debug('incoming acceptPeering event: pending: %s', user.pending)
|
logger.debug('incoming acceptPeering event: pending: %s', user.pending)
|
||||||
if user and user.pending == 'sent':
|
if user and user.pending == 'sent':
|
||||||
if not user.info:
|
|
||||||
user.info = {}
|
|
||||||
user.info['username'] = username
|
user.info['username'] = username
|
||||||
user.info['message'] = message
|
user.info['message'] = message
|
||||||
user.update_name()
|
user.update_name()
|
||||||
user.update_peering(True, username)
|
user.update_peering(True, username)
|
||||||
state.nodes.queue('add', user.id)
|
state.nodes.queue('add', user.id, True)
|
||||||
trigger_event('peering.accept', user.json())
|
trigger_event('peering.accept', user.json())
|
||||||
return True
|
return True
|
||||||
elif user and user.peered:
|
elif user and user.peered:
|
||||||
|
@ -63,8 +65,6 @@ def api_acceptPeering(user_id, username, message):
|
||||||
def api_rejectPeering(user_id, message):
|
def api_rejectPeering(user_id, message):
|
||||||
user = User.get(user_id)
|
user = User.get(user_id)
|
||||||
if user:
|
if user:
|
||||||
if not user.info:
|
|
||||||
user.info = {}
|
|
||||||
user.info['message'] = message
|
user.info['message'] = message
|
||||||
user.update_peering(False)
|
user.update_peering(False)
|
||||||
trigger_event('peering.reject', user.json())
|
trigger_event('peering.reject', user.json())
|
||||||
|
|
31
oml/nodes.py
31
oml/nodes.py
|
@ -61,9 +61,8 @@ class Node(Thread):
|
||||||
action = self._q.get()
|
action = self._q.get()
|
||||||
if not self._running:
|
if not self._running:
|
||||||
break
|
break
|
||||||
if action == 'go_online':
|
if action == 'send_response':
|
||||||
if not self.online:
|
self._send_response()
|
||||||
self._go_online()
|
|
||||||
elif action == 'ping':
|
elif action == 'ping':
|
||||||
self.online = self.can_connect()
|
self.online = self.can_connect()
|
||||||
elif action == 'pull':
|
elif action == 'pull':
|
||||||
|
@ -88,9 +87,6 @@ class Node(Thread):
|
||||||
if state.online:
|
if state.online:
|
||||||
self._q.put('ping')
|
self._q.put('ping')
|
||||||
|
|
||||||
def go_online(self):
|
|
||||||
self._q.put('go_online')
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def url(self):
|
def url(self):
|
||||||
url = None
|
url = None
|
||||||
|
@ -234,11 +230,14 @@ class Node(Thread):
|
||||||
def is_online(self):
|
def is_online(self):
|
||||||
return self.online or self.get_local() != None
|
return self.online or self.get_local() != None
|
||||||
|
|
||||||
def _go_online(self):
|
def send_response(self):
|
||||||
|
self._q.put('send_response')
|
||||||
|
|
||||||
|
def _send_response(self):
|
||||||
with db.session():
|
with db.session():
|
||||||
u = user.models.User.get_or_create(self.user_id)
|
u = user.models.User.get_or_create(self.user_id)
|
||||||
if u.peered or u.queued:
|
if u.peered or u.queued:
|
||||||
logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
|
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
|
||||||
try:
|
try:
|
||||||
self.online = self.can_connect()
|
self.online = self.can_connect()
|
||||||
if self.online:
|
if self.online:
|
||||||
|
@ -255,8 +254,6 @@ class Node(Thread):
|
||||||
except:
|
except:
|
||||||
logger.debug('failed to connect to %s', self.user_id)
|
logger.debug('failed to connect to %s', self.user_id)
|
||||||
self.online = False
|
self.online = False
|
||||||
else:
|
|
||||||
self.online = False
|
|
||||||
|
|
||||||
def trigger_status(self):
|
def trigger_status(self):
|
||||||
if self.online is not None:
|
if self.online is not None:
|
||||||
|
@ -299,8 +296,6 @@ class Node(Thread):
|
||||||
logger.debug('peering failed? %s %s', action, r)
|
logger.debug('peering failed? %s %s', action, r)
|
||||||
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
|
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
|
||||||
self.online = False
|
self.online = False
|
||||||
else:
|
|
||||||
self.go_online()
|
|
||||||
trigger_event('peering.%s'%action.replace('Peering', ''), u.json())
|
trigger_event('peering.%s'%action.replace('Peering', ''), u.json())
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -430,7 +425,7 @@ class Nodes(Thread):
|
||||||
self.queue('add', u.id)
|
self.queue('add', u.id)
|
||||||
for u in user.models.User.query.filter_by(queued=True):
|
for u in user.models.User.query.filter_by(queued=True):
|
||||||
logger.debug('adding queued node... %s', u.id)
|
logger.debug('adding queued node... %s', u.id)
|
||||||
self.queue('add', u.id)
|
self.queue('add', u.id, True)
|
||||||
self._local = LocalNodes()
|
self._local = LocalNodes()
|
||||||
self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
|
self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
|
||||||
self._cleanup.start()
|
self._cleanup.start()
|
||||||
|
@ -459,12 +454,14 @@ class Nodes(Thread):
|
||||||
elif target == 'online':
|
elif target == 'online':
|
||||||
nodes = [n for n in list(self._nodes.values()) if n.online]
|
nodes = [n for n in list(self._nodes.values()) if n.online]
|
||||||
else:
|
else:
|
||||||
|
if not target in self._nodes:
|
||||||
|
self._add(target)
|
||||||
nodes = [self._nodes[target]]
|
nodes = [self._nodes[target]]
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
r = getattr(node, action)(*args)
|
r = getattr(node, action)(*args)
|
||||||
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
|
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
|
||||||
|
|
||||||
def _add(self, user_id):
|
def _add(self, user_id, send_response=False):
|
||||||
if user_id not in self._nodes:
|
if user_id not in self._nodes:
|
||||||
from user.models import User
|
from user.models import User
|
||||||
with db.session():
|
with db.session():
|
||||||
|
@ -472,6 +469,8 @@ class Nodes(Thread):
|
||||||
else:
|
else:
|
||||||
if not self._nodes[user_id].online:
|
if not self._nodes[user_id].online:
|
||||||
self._nodes[user_id].ping()
|
self._nodes[user_id].ping()
|
||||||
|
if send_response:
|
||||||
|
self._nodes[user_id].send_response()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while self._running:
|
while self._running:
|
||||||
|
@ -480,7 +479,7 @@ class Nodes(Thread):
|
||||||
if args[0] == 'cleanup':
|
if args[0] == 'cleanup':
|
||||||
self.cleanup()
|
self.cleanup()
|
||||||
elif args[0] == 'add':
|
elif args[0] == 'add':
|
||||||
self._add(args[1])
|
self._add(*args[1:])
|
||||||
else:
|
else:
|
||||||
self._call(*args)
|
self._call(*args)
|
||||||
|
|
||||||
|
@ -523,4 +522,4 @@ def check_nodes():
|
||||||
for u in user.models.User.query.filter_by(queued=True):
|
for u in user.models.User.query.filter_by(queued=True):
|
||||||
if not state.nodes.is_online(u.id):
|
if not state.nodes.is_online(u.id):
|
||||||
logger.debug('queued peering message for %s trying to connect...', u.id)
|
logger.debug('queued peering message for %s trying to connect...', u.id)
|
||||||
state.nodes.queue('add', u.id)
|
state.nodes.queue('add', u.id, True)
|
||||||
|
|
|
@ -336,7 +336,6 @@ def requestPeering(data):
|
||||||
u.info['nickname'] = data.get('nickname', '')
|
u.info['nickname'] = data.get('nickname', '')
|
||||||
u.update_name()
|
u.update_name()
|
||||||
u.save()
|
u.save()
|
||||||
state.nodes.queue('add', u.id)
|
|
||||||
state.nodes.queue(u.id, 'peering', 'requestPeering')
|
state.nodes.queue(u.id, 'peering', 'requestPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(requestPeering, cache=False)
|
actions.register(requestPeering, cache=False)
|
||||||
|
@ -356,7 +355,6 @@ def acceptPeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(True)
|
u.update_peering(True)
|
||||||
state.nodes.queue('add', u.id)
|
|
||||||
state.nodes.queue(u.id, 'peering', 'acceptPeering')
|
state.nodes.queue(u.id, 'peering', 'acceptPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(acceptPeering, cache=False)
|
actions.register(acceptPeering, cache=False)
|
||||||
|
@ -375,7 +373,6 @@ def rejectPeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(False)
|
u.update_peering(False)
|
||||||
state.nodes.queue('add', u.id)
|
|
||||||
state.nodes.queue(u.id, 'peering', 'rejectPeering')
|
state.nodes.queue(u.id, 'peering', 'rejectPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(rejectPeering, cache=False)
|
actions.register(rejectPeering, cache=False)
|
||||||
|
@ -394,7 +391,6 @@ def removePeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(False)
|
u.update_peering(False)
|
||||||
state.nodes.queue('add', u.id)
|
|
||||||
state.nodes.queue(u.id, 'peering', 'removePeering')
|
state.nodes.queue(u.id, 'peering', 'removePeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(removePeering, cache=False)
|
actions.register(removePeering, cache=False)
|
||||||
|
@ -411,7 +407,6 @@ def cancelPeering(data):
|
||||||
u = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
u.info['message'] = data.get('message', '')
|
u.info['message'] = data.get('message', '')
|
||||||
u.update_peering(False)
|
u.update_peering(False)
|
||||||
state.nodes.queue('add', u.id)
|
|
||||||
state.nodes.queue(u.id, 'peering', 'cancelPeering')
|
state.nodes.queue(u.id, 'peering', 'cancelPeering')
|
||||||
return {}
|
return {}
|
||||||
actions.register(cancelPeering, cache=False)
|
actions.register(cancelPeering, cache=False)
|
||||||
|
|
|
@ -43,7 +43,10 @@ class User(db.Model):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get(cls, id):
|
def get(cls, id):
|
||||||
return cls.query.filter_by(id=id).first()
|
user = cls.query.filter_by(id=id).first()
|
||||||
|
if user and not user.info:
|
||||||
|
user.info = {}
|
||||||
|
return user
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_or_create(cls, id):
|
def get_or_create(cls, id):
|
||||||
|
|
Loading…
Reference in a new issue