From 611fc2b3737f2d92d8e24846eed7339424f6e138 Mon Sep 17 00:00:00 2001 From: j Date: Sat, 2 Feb 2019 14:21:25 +0530 Subject: [PATCH] ping nodes to update online status --- oml/nodes.py | 73 +++++++++++++++++++++------------------------------- 1 file changed, 30 insertions(+), 43 deletions(-) diff --git a/oml/nodes.py b/oml/nodes.py index a9b9096..5ba54ee 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -1,16 +1,17 @@ # -*- coding: utf-8 -*- - +from io import BytesIO from queue import Queue from threading import Thread -import json -from io import BytesIO import gzip -import urllib.request, urllib.error, urllib.parse +import json import os -import time import socket import socks +import time +import urllib.error +import urllib.parse +import urllib.request import ox from tornado.ioloop import PeriodicCallback @@ -42,6 +43,8 @@ class Node(Thread): self.user_id = user_id self._opener = get_opener(self.user_id) self._q = Queue() + self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval']) + self._pingcb.start() Thread.__init__(self) self.daemon = True self.start() @@ -52,10 +55,15 @@ class Node(Thread): action = self._q.get() if state.shutdown: break - if action == 'send_response': - self._send_response() - elif action == 'ping': + if action == 'ping': self.online = self.can_connect() + elif action == 'send_response': + if self.online: + self._send_response() + else: + if not self._q.qsize(): + time.sleep(5) + self.send_response() elif isinstance(action, list) and len(action) == 2: if self.online: self._call(action[0], *action[1]) @@ -216,34 +224,26 @@ class Node(Thread): def _send_response(self): with db.session(): u = user.models.User.get(self.user_id) - send_response = u and u.peered or u.queued if u: user_pending = u.pending user_peered = u.peered user_queued = u.queued + else: + user_queued = False if DEBUG_NODES: logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) - if send_response: - try: - self.online = self.can_connect() - except: - if DEBUG_NODES: - logger.debug('failed to connect to %s', self.user_id) - self.online = False - if self.online: - if DEBUG_NODES: - logger.debug('connected to %s', self.url) - if user_queued: - if DEBUG_NODES: - logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) - if user_pending == 'sent': - self.peering('requestPeering') - elif user_pending == '' and user_peered: - self.peering('acceptPeering') - else: - #fixme, what about cancel/reject peering here? - self.peering('removePeering') + if user_queued: + if DEBUG_NODES: + logger.debug('connected to %s', self.url) + logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) + if user_pending == 'sent': + self.peering('requestPeering') + elif user_pending == '' and user_peered: + self.peering('acceptPeering') + else: + #fixme, what about cancel/reject peering here? + self.peering('removePeering') def trigger_status(self): if self.online is not None: @@ -600,7 +600,7 @@ class Nodes(Thread): if state.shutdown: break node = self._nodes.get(u['id']) - if node: + if node and node.is_online(): node.pullChanges() self._pulling = False @@ -614,8 +614,6 @@ class Nodes(Thread): def publish_node(): update_online() - state.check_nodes = PeriodicCallback(check_nodes, 120000) - state.check_nodes.start() state._online = PeriodicCallback(update_online, 60000) state._online.start() @@ -630,14 +628,3 @@ def update_online(): if state.online: for node in list(state.nodes._nodes.values()): node.trigger_status() - -def check_nodes(): - if state.online: - ids = [] - with db.session(): - for u in user.models.User.query.filter_by(queued=True): - ids.append(u.id) - for id in ids: - if not state.nodes.is_online(id): - logger.debug('queued peering message for %s trying to connect...', id) - state.nodes.queue('add', id, True)