From bea8efe6c498a029b34975f914a58b4c3ebc3441 Mon Sep 17 00:00:00 2001 From: j Date: Sat, 24 May 2014 11:39:45 +0200 Subject: [PATCH] local nodes --- oml/localnodes.py | 163 +++++++++++++++++++++++++--------------------- 1 file changed, 90 insertions(+), 73 deletions(-) diff --git a/oml/localnodes.py b/oml/localnodes.py index 2f89009..49eebff 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -30,17 +30,14 @@ def can_connect(data): logger.debug('can_connect failed') return False -class LocalNodes(Thread): - _active = True - _nodes = {} +class LocalNodesBase(Thread): - _MODE = 6 - _BROADCAST = "ff02::1" - _BROADCAST4 = "239.255.255.250" _PORT = 9851 - TTL = 1 + _TTL = 1 - def __init__(self, app): + def __init__(self, app, nodes): + self._active = True + self._nodes = nodes self._app = app Thread.__init__(self) if not server['localnode_discovery']: @@ -59,49 +56,15 @@ class LocalNodes(Thread): packet = json.dumps([sig, USER_ID, message]) return packet - def send(self): - if not server['localnode_discovery']: - return - if self._MODE == 4: - return self.send4() - packet = self.get_packet() - ttl = struct.pack('@i', self.TTL) - address = self._BROADCAST + get_interface() - addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) - addr = addrs[0] - (family, socktype, proto, canonname, sockaddr) = addr - s = socket.socket(family, socktype, proto) - s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) - s.sendto(packet + '\0', sockaddr) - s.close() + def get_socket(self): + pass - def send4(self): - logger.debug('send4') - packet = self.get_packet() - sockaddr = (self._BROADCAST4, self._PORT) - s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) - s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) - s.sendto(packet + '\0', sockaddr) - s.close() - logger.debug('sent4') - ''' - try: - s.sendto(packet + '\0', sockaddr) - s.close() - except: - logger.debug('send failed %s', ) - return - ''' + def send(self): + pass def receive(self): - if self._MODE == 4: - return self.receive4() - s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s = self.get_socket() s.bind(('', self._PORT)) - group_bin = socket.inet_pton(socket.AF_INET6, self._BROADCAST) + '\0'*4 - s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_bin) - while self._active: data, addr = s.recvfrom(1024) while data[-1] == '\0': @@ -110,24 +73,6 @@ class LocalNodes(Thread): if data: self.update_node(data) - def receive4(self): - logger.debug('receive4') - s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST4), socket.INADDR_ANY) - s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - - s.bind(('', self._PORT)) - while self._active: - data, addr = s.recvfrom(1024) - logger.debug('receive4') - while data[-1] == '\0': - data = data[:-1] # Strip trailing \0's - logger.debug('receive4 %s', data) - data = self.verify(data) - if data: - self.update_node(data) - def verify(self, data): try: packet = json.loads(data) @@ -149,9 +94,8 @@ class LocalNodes(Thread): if data['id'] != USER_ID: if data['id'] not in self._nodes: thread.start_new_thread(self.new_node, (data, )) - #else: - # print 'UPDATE NODE', data - self._nodes[data['id']] = data + elif can_connect(data): + self._nodes[data['id']] = data def get(self, user_id): if user_id in self._nodes: @@ -171,15 +115,88 @@ class LocalNodes(Thread): state.nodes.queue('add', u.id) self.send() + def get_ip(self): + pass + def run(self): - self.host = get_public_ipv6() - if not self.host: - logger.debug('no ipv6 detected, fall back to local ipv4 sharing') - self.host = get_local_ipv4() - self._MODE = 4 + self.host = self.get_ip() self.send() self.receive() def join(self): self._active = False return Thread.join(self) + +class LocalNodes4(LocalNodesBase): + + _BROADCAST = "239.255.255.250" + _TTL = 1 + + def send(self): + packet = self.get_packet() + sockaddr = (self._BROADCAST, self._PORT) + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL) + s.sendto(packet + '\0', sockaddr) + s.close() + + def get_socket(self): + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST), socket.INADDR_ANY) + s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + return s + + def get_ip(self): + return get_local_ipv4() + +class LocalNodes6(LocalNodesBase): + + _BROADCAST = "ff02::1" + + def send(self): + logger.debug('send6') + packet = self.get_packet() + ttl = struct.pack('@i', self._TTL) + address = self._BROADCAST + get_interface() + addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) + addr = addrs[0] + (family, socktype, proto, canonname, sockaddr) = addr + s = socket.socket(family, socktype, proto) + s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl) + s.sendto(packet + '\0', sockaddr) + s.close() + + def get_socket(self): + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + group_bin = socket.inet_pton(socket.AF_INET6, self._BROADCAST) + '\0'*4 + s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_bin) + return s + + def get_ip(self): + return get_public_ipv6() + +class LocalNodes(object): + + _nodes4 = None + _nodes6 = None + + def __init__(self, app): + self._nodes = {} + self._app = app + if not server['localnode_discovery']: + return + self._nodes4 = LocalNodes4(app, self._nodes) + self._nodes6 = LocalNodes6(app, self._nodes) + + def get(self, user_id): + if user_id in self._nodes: + if can_connect(self._nodes[user_id]): + return self._nodes[user_id] + + def join(self): + if self._nodes4: + self._nodes4.join() + if self._nodes6: + self._nodes6.join()