backport some localnode improvements from peerlink

This commit is contained in:
j 2014-09-01 12:30:09 +02:00
parent 4749f87d98
commit e4cf322570
2 changed files with 62 additions and 33 deletions

View file

@ -3,7 +3,6 @@
from __future__ import division from __future__ import division
import json import json
import logging
import socket import socket
import struct import struct
import thread import thread
@ -16,6 +15,7 @@ import state
import db import db
import user.models import user.models
import logging
logger = logging.getLogger('oml.localnodes') logger = logging.getLogger('oml.localnodes')
def can_connect(data): def can_connect(data):
@ -49,6 +49,8 @@ class LocalNodesBase(Thread):
self.start() self.start()
def get_packet(self): def get_packet(self):
self.host = self.get_ip()
if self.host:
message = json.dumps({ message = json.dumps({
'username': preferences.get('username', 'anonymous'), 'username': preferences.get('username', 'anonymous'),
'host': self.host, 'host': self.host,
@ -57,6 +59,8 @@ class LocalNodesBase(Thread):
}) })
sig = sk.sign(message, encoding='base64') sig = sk.sign(message, encoding='base64')
packet = json.dumps([sig, USER_ID, message]) packet = json.dumps([sig, USER_ID, message])
else:
packet = None
return packet return packet
def get_socket(self): def get_socket(self):
@ -66,9 +70,11 @@ class LocalNodesBase(Thread):
pass pass
def receive(self): def receive(self):
last = time.mktime(time.localtime())
while self._active: while self._active:
try: try:
s = self.get_socket() s = self.get_socket()
s.settimeout(2)
s.bind(('', self._PORT)) s.bind(('', self._PORT))
while self._active: while self._active:
data, addr = s.recvfrom(1024) data, addr = s.recvfrom(1024)
@ -78,11 +84,15 @@ class LocalNodesBase(Thread):
data = self.verify(data) data = self.verify(data)
if data: if data:
self.update_node(data) self.update_node(data)
except socket.timeout:
now = time.mktime(time.localtime())
if now - last > 60:
last = now
thread.start_new_thread(self.send, ())
except: except:
logger.debug('receive failed. restart later', exc_info=1) logger.debug('receive failed. restart later', exc_info=1)
time.sleep(10) time.sleep(10)
def verify(self, data): def verify(self, data):
try: try:
packet = json.loads(data) packet = json.loads(data)
@ -129,7 +139,6 @@ class LocalNodesBase(Thread):
pass pass
def run(self): def run(self):
self.host = self.get_ip()
self.send() self.send()
self.receive() self.receive()
@ -150,6 +159,7 @@ class LocalNodes4(LocalNodesBase):
def send(self): def send(self):
packet = self.get_packet() packet = self.get_packet()
if packet:
sockaddr = (self._BROADCAST, self._PORT) sockaddr = (self._BROADCAST, self._PORT)
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL)
@ -175,8 +185,9 @@ class LocalNodes6(LocalNodesBase):
_BROADCAST = "ff02::1" _BROADCAST = "ff02::1"
def send(self): def send(self):
logger.debug('send6')
packet = self.get_packet() packet = self.get_packet()
if packet:
logger.debug('send6 %s', packet)
ttl = struct.pack('@i', self._TTL) ttl = struct.pack('@i', self._TTL)
address = self._BROADCAST + get_interface() address = self._BROADCAST + get_interface()
addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM)
@ -203,6 +214,7 @@ class LocalNodes6(LocalNodesBase):
class LocalNodes(object): class LocalNodes(object):
_active = True
_nodes4 = None _nodes4 = None
_nodes6 = None _nodes6 = None
@ -213,12 +225,21 @@ class LocalNodes(object):
self._nodes4 = LocalNodes4(self._nodes) self._nodes4 = LocalNodes4(self._nodes)
self._nodes6 = LocalNodes6(self._nodes) self._nodes6 = LocalNodes6(self._nodes)
def cleanup(self):
if self._active:
for id in self._nodes.keys():
if not can_connect(self._nodes[id]):
del self._nodes[id]
if not self._active:
break
def get(self, user_id): def get(self, user_id):
if user_id in self._nodes: if user_id in self._nodes:
if can_connect(self._nodes[user_id]): if can_connect(self._nodes[user_id]):
return self._nodes[user_id] return self._nodes[user_id]
def join(self): def join(self):
self._active = False
if self._nodes4: if self._nodes4:
self._nodes4.join() self._nodes4.join()
if self._nodes6: if self._nodes6:

View file

@ -358,10 +358,16 @@ class Nodes(Thread):
self._q = Queue() self._q = Queue()
self._running = True self._running = True
self._local = LocalNodes() self._local = LocalNodes()
self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
self._cleanup.start()
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
self.start() self.start()
def cleanup(self):
if self._running:
self._local.cleanup()
def queue(self, *args): def queue(self, *args):
self._q.put(list(args)) self._q.put(list(args))
@ -396,7 +402,9 @@ class Nodes(Thread):
while self._running: while self._running:
args = self._q.get() args = self._q.get()
if args: if args:
if args[0] == 'add': if args[0] == 'cleanup':
self.cleanup()
elif args[0] == 'add':
self._add(args[1]) self._add(args[1])
else: else:
self._call(*args) self._call(*args)