diff --git a/oml/localnodes.py b/oml/localnodes.py index c601a23..1bcff99 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -3,7 +3,6 @@ from __future__ import division import json -import logging import socket import struct import thread @@ -16,6 +15,7 @@ import state import db import user.models +import logging logger = logging.getLogger('oml.localnodes') def can_connect(data): @@ -49,14 +49,18 @@ class LocalNodesBase(Thread): self.start() def get_packet(self): - message = json.dumps({ - 'username': preferences.get('username', 'anonymous'), - 'host': self.host, - 'port': server['node_port'], - 'cert': server['cert'] - }) - sig = sk.sign(message, encoding='base64') - packet = json.dumps([sig, USER_ID, message]) + self.host = self.get_ip() + if self.host: + message = json.dumps({ + 'username': preferences.get('username', 'anonymous'), + 'host': self.host, + 'port': server['node_port'], + 'cert': server['cert'] + }) + sig = sk.sign(message, encoding='base64') + packet = json.dumps([sig, USER_ID, message]) + else: + packet = None return packet def get_socket(self): @@ -66,9 +70,11 @@ class LocalNodesBase(Thread): pass def receive(self): + last = time.mktime(time.localtime()) while self._active: try: s = self.get_socket() + s.settimeout(2) s.bind(('', self._PORT)) while self._active: data, addr = s.recvfrom(1024) @@ -78,11 +84,15 @@ class LocalNodesBase(Thread): data = self.verify(data) if data: self.update_node(data) + except socket.timeout: + now = time.mktime(time.localtime()) + if now - last > 60: + last = now + thread.start_new_thread(self.send, ()) except: logger.debug('receive failed. restart later', exc_info=1) time.sleep(10) - def verify(self, data): try: packet = json.loads(data) @@ -129,7 +139,6 @@ class LocalNodesBase(Thread): pass def run(self): - self.host = self.get_ip() self.send() self.receive() @@ -150,14 +159,15 @@ class LocalNodes4(LocalNodesBase): def send(self): packet = self.get_packet() - sockaddr = (self._BROADCAST, self._PORT) - s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) - s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) - try: - s.sendto(packet + '\0', sockaddr) - except: - logger.debug('LocalNodes4.send failed', exc_info=1) - s.close() + if packet: + sockaddr = (self._BROADCAST, self._PORT) + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) + try: + s.sendto(packet + '\0', sockaddr) + except: + logger.debug('LocalNodes4.send failed', exc_info=1) + s.close() def get_socket(self): s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) @@ -175,20 +185,21 @@ class LocalNodes6(LocalNodesBase): _BROADCAST = "ff02::1" def send(self): - logger.debug('send6') packet = self.get_packet() - ttl = struct.pack('@i', self._TTL) - address = self._BROADCAST + get_interface() - addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) - addr = addrs[0] - (family, socktype, proto, canonname, sockaddr) = addr - s = socket.socket(family, socktype, proto) - s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) - try: - s.sendto(packet + '\0', sockaddr) - except: - logger.debug('LocalNodes6.send failed', exc_info=1) - s.close() + if packet: + logger.debug('send6 %s', packet) + ttl = struct.pack('@i', self._TTL) + address = self._BROADCAST + get_interface() + addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) + addr = addrs[0] + (family, socktype, proto, canonname, sockaddr) = addr + s = socket.socket(family, socktype, proto) + s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) + try: + s.sendto(packet + '\0', sockaddr) + except: + logger.debug('LocalNodes6.send failed', exc_info=1) + s.close() def get_socket(self): s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) @@ -203,6 +214,7 @@ class LocalNodes6(LocalNodesBase): class LocalNodes(object): + _active = True _nodes4 = None _nodes6 = None @@ -213,12 +225,21 @@ class LocalNodes(object): self._nodes4 = LocalNodes4(self._nodes) self._nodes6 = LocalNodes6(self._nodes) + def cleanup(self): + if self._active: + for id in self._nodes.keys(): + if not can_connect(self._nodes[id]): + del self._nodes[id] + if not self._active: + break + def get(self, user_id): if user_id in self._nodes: if can_connect(self._nodes[user_id]): return self._nodes[user_id] def join(self): + self._active = False if self._nodes4: self._nodes4.join() if self._nodes6: diff --git a/oml/nodes.py b/oml/nodes.py index c0364a1..7399caf 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -358,10 +358,16 @@ class Nodes(Thread): self._q = Queue() self._running = True self._local = LocalNodes() + self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000) + self._cleanup.start() Thread.__init__(self) self.daemon = True self.start() + def cleanup(self): + if self._running: + self._local.cleanup() + def queue(self, *args): self._q.put(list(args)) @@ -396,7 +402,9 @@ class Nodes(Thread): while self._running: args = self._q.get() if args: - if args[0] == 'add': + if args[0] == 'cleanup': + self.cleanup() + elif args[0] == 'add': self._add(args[1]) else: self._call(*args)