ping nodes to update online status
This commit is contained in:
parent
265a4791fe
commit
611fc2b373
1 changed files with 30 additions and 43 deletions
73
oml/nodes.py
73
oml/nodes.py
|
@ -1,16 +1,17 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from io import BytesIO
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import json
|
|
||||||
from io import BytesIO
|
|
||||||
import gzip
|
import gzip
|
||||||
import urllib.request, urllib.error, urllib.parse
|
import json
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
import socket
|
import socket
|
||||||
import socks
|
import socks
|
||||||
|
import time
|
||||||
|
import urllib.error
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
import ox
|
import ox
|
||||||
from tornado.ioloop import PeriodicCallback
|
from tornado.ioloop import PeriodicCallback
|
||||||
|
@ -42,6 +43,8 @@ class Node(Thread):
|
||||||
self.user_id = user_id
|
self.user_id = user_id
|
||||||
self._opener = get_opener(self.user_id)
|
self._opener = get_opener(self.user_id)
|
||||||
self._q = Queue()
|
self._q = Queue()
|
||||||
|
self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval'])
|
||||||
|
self._pingcb.start()
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
self.start()
|
self.start()
|
||||||
|
@ -52,10 +55,15 @@ class Node(Thread):
|
||||||
action = self._q.get()
|
action = self._q.get()
|
||||||
if state.shutdown:
|
if state.shutdown:
|
||||||
break
|
break
|
||||||
if action == 'send_response':
|
if action == 'ping':
|
||||||
self._send_response()
|
|
||||||
elif action == 'ping':
|
|
||||||
self.online = self.can_connect()
|
self.online = self.can_connect()
|
||||||
|
elif action == 'send_response':
|
||||||
|
if self.online:
|
||||||
|
self._send_response()
|
||||||
|
else:
|
||||||
|
if not self._q.qsize():
|
||||||
|
time.sleep(5)
|
||||||
|
self.send_response()
|
||||||
elif isinstance(action, list) and len(action) == 2:
|
elif isinstance(action, list) and len(action) == 2:
|
||||||
if self.online:
|
if self.online:
|
||||||
self._call(action[0], *action[1])
|
self._call(action[0], *action[1])
|
||||||
|
@ -216,34 +224,26 @@ class Node(Thread):
|
||||||
def _send_response(self):
|
def _send_response(self):
|
||||||
with db.session():
|
with db.session():
|
||||||
u = user.models.User.get(self.user_id)
|
u = user.models.User.get(self.user_id)
|
||||||
send_response = u and u.peered or u.queued
|
|
||||||
if u:
|
if u:
|
||||||
user_pending = u.pending
|
user_pending = u.pending
|
||||||
user_peered = u.peered
|
user_peered = u.peered
|
||||||
user_queued = u.queued
|
user_queued = u.queued
|
||||||
|
else:
|
||||||
|
user_queued = False
|
||||||
if DEBUG_NODES:
|
if DEBUG_NODES:
|
||||||
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
|
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
|
||||||
|
|
||||||
if send_response:
|
if user_queued:
|
||||||
try:
|
if DEBUG_NODES:
|
||||||
self.online = self.can_connect()
|
logger.debug('connected to %s', self.url)
|
||||||
except:
|
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
|
||||||
if DEBUG_NODES:
|
if user_pending == 'sent':
|
||||||
logger.debug('failed to connect to %s', self.user_id)
|
self.peering('requestPeering')
|
||||||
self.online = False
|
elif user_pending == '' and user_peered:
|
||||||
if self.online:
|
self.peering('acceptPeering')
|
||||||
if DEBUG_NODES:
|
else:
|
||||||
logger.debug('connected to %s', self.url)
|
#fixme, what about cancel/reject peering here?
|
||||||
if user_queued:
|
self.peering('removePeering')
|
||||||
if DEBUG_NODES:
|
|
||||||
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
|
|
||||||
if user_pending == 'sent':
|
|
||||||
self.peering('requestPeering')
|
|
||||||
elif user_pending == '' and user_peered:
|
|
||||||
self.peering('acceptPeering')
|
|
||||||
else:
|
|
||||||
#fixme, what about cancel/reject peering here?
|
|
||||||
self.peering('removePeering')
|
|
||||||
|
|
||||||
def trigger_status(self):
|
def trigger_status(self):
|
||||||
if self.online is not None:
|
if self.online is not None:
|
||||||
|
@ -600,7 +600,7 @@ class Nodes(Thread):
|
||||||
if state.shutdown:
|
if state.shutdown:
|
||||||
break
|
break
|
||||||
node = self._nodes.get(u['id'])
|
node = self._nodes.get(u['id'])
|
||||||
if node:
|
if node and node.is_online():
|
||||||
node.pullChanges()
|
node.pullChanges()
|
||||||
self._pulling = False
|
self._pulling = False
|
||||||
|
|
||||||
|
@ -614,8 +614,6 @@ class Nodes(Thread):
|
||||||
|
|
||||||
def publish_node():
|
def publish_node():
|
||||||
update_online()
|
update_online()
|
||||||
state.check_nodes = PeriodicCallback(check_nodes, 120000)
|
|
||||||
state.check_nodes.start()
|
|
||||||
state._online = PeriodicCallback(update_online, 60000)
|
state._online = PeriodicCallback(update_online, 60000)
|
||||||
state._online.start()
|
state._online.start()
|
||||||
|
|
||||||
|
@ -630,14 +628,3 @@ def update_online():
|
||||||
if state.online:
|
if state.online:
|
||||||
for node in list(state.nodes._nodes.values()):
|
for node in list(state.nodes._nodes.values()):
|
||||||
node.trigger_status()
|
node.trigger_status()
|
||||||
|
|
||||||
def check_nodes():
|
|
||||||
if state.online:
|
|
||||||
ids = []
|
|
||||||
with db.session():
|
|
||||||
for u in user.models.User.query.filter_by(queued=True):
|
|
||||||
ids.append(u.id)
|
|
||||||
for id in ids:
|
|
||||||
if not state.nodes.is_online(id):
|
|
||||||
logger.debug('queued peering message for %s trying to connect...', id)
|
|
||||||
state.nodes.queue('add', id, True)
|
|
||||||
|
|
Loading…
Reference in a new issue