openmedialibrary/oml/localnodes.py

271 lines
8.2 KiB
Python
Raw Normal View History

2014-05-12 12:57:47 +00:00
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
2014-09-02 22:32:44 +00:00
2014-05-12 12:57:47 +00:00
import json
2014-05-17 14:26:59 +00:00
import socket
2014-05-12 12:57:47 +00:00
import struct
2014-09-02 22:32:44 +00:00
import _thread
2014-05-17 14:26:59 +00:00
from threading import Thread
import time
2014-05-12 12:57:47 +00:00
from utils import valid, get_public_ipv6, get_local_ipv4, get_interface
2014-05-18 03:01:24 +00:00
from settings import preferences, server, USER_ID, sk
import state
2014-08-09 16:33:59 +00:00
import db
import user.models
from tor_request import get_opener
import settings
2014-05-12 12:57:47 +00:00
import logging
2015-11-29 14:56:38 +00:00
logger = logging.getLogger(__name__)
2014-05-17 14:26:59 +00:00
2014-05-12 12:57:47 +00:00
def can_connect(data):
try:
opener = get_opener(data['id'])
headers = {
'User-Agent': settings.USER_AGENT,
'X-Node-Protocol': settings.NODE_PROTOCOL,
'Accept-Encoding': 'gzip',
}
if ':' in data['host']:
url = 'https://[{host}]:{port}'.format(**data)
else:
url = 'https://{host}:{port}'.format(**data)
opener.addheaders = list(zip(headers.keys(), headers.values()))
opener.timeout = 1
r = opener.open(url)
version = r.headers.get('X-Node-Protocol', None)
if version != settings.NODE_PROTOCOL:
logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version)
return False
c = r.read()
2014-05-12 12:57:47 +00:00
return True
except:
2015-12-01 10:51:58 +00:00
pass
#logger.debug('failed to connect to local node %s', data, exc_info=1)
2014-05-12 12:57:47 +00:00
return False
2014-05-24 09:39:45 +00:00
class LocalNodesBase(Thread):
2014-05-12 12:57:47 +00:00
_PORT = 9851
2014-05-24 09:39:45 +00:00
_TTL = 1
_TIMEOUT = 30
2014-05-12 12:57:47 +00:00
2014-08-09 16:33:59 +00:00
def __init__(self, nodes):
2014-08-09 18:32:41 +00:00
self._socket = None
2014-05-24 09:39:45 +00:00
self._active = True
self._nodes = nodes
2014-05-12 12:57:47 +00:00
Thread.__init__(self)
2014-05-14 23:28:49 +00:00
if not server['localnode_discovery']:
return
2014-05-12 12:57:47 +00:00
self.daemon = True
self.start()
def get_packet(self):
self.host = self.get_ip()
if self.host:
message = json.dumps({
'id': USER_ID,
'username': preferences.get('username', 'anonymous'),
'host': self.host,
'port': server['node_port']
})
packet = message.encode()
else:
packet = None
return packet
2014-05-16 17:08:10 +00:00
2014-05-24 09:39:45 +00:00
def get_socket(self):
pass
2014-05-12 12:57:47 +00:00
2014-05-24 09:39:45 +00:00
def send(self):
pass
2014-05-12 12:57:47 +00:00
def receive(self):
last = time.mktime(time.localtime())
2014-05-12 12:57:47 +00:00
while self._active:
try:
s = self.get_socket()
s.settimeout(self._TIMEOUT)
s.bind(('', self._PORT))
while self._active:
data, addr = s.recvfrom(1024)
2014-08-09 18:32:41 +00:00
if self._active:
2014-09-09 14:28:59 +00:00
while data[-1] == 0:
2014-08-09 18:32:41 +00:00
data = data[:-1] # Strip trailing \0's
data = self.verify(data)
if data:
self.update_node(data)
except socket.timeout:
pass
except:
if self._active:
logger.debug('receive failed. restart later', exc_info=1)
time.sleep(10)
finally:
2014-09-02 22:33:42 +00:00
if self._active:
now = time.mktime(time.localtime())
if now - last > 60:
last = now
2014-09-02 22:32:44 +00:00
_thread.start_new_thread(self.send, ())
2014-05-13 10:58:49 +00:00
def verify(self, data):
2014-05-12 12:57:47 +00:00
try:
message = json.loads(data.decode())
2014-05-12 12:57:47 +00:00
except:
return None
for key in ['id', 'username', 'host', 'port']:
if key not in message:
return None
return message
2014-05-12 12:57:47 +00:00
def update_node(self, data):
#fixme use local link address
#print addr
if data['id'] != USER_ID:
if data['id'] not in self._nodes:
2014-09-02 22:32:44 +00:00
_thread.start_new_thread(self.new_node, (data, ))
2014-05-24 09:39:45 +00:00
elif can_connect(data):
self._nodes[data['id']] = data
2014-05-12 12:57:47 +00:00
def get(self, user_id):
if user_id in self._nodes:
if can_connect(self._nodes[user_id]):
return self._nodes[user_id]
def new_node(self, data):
2014-05-17 14:26:59 +00:00
logger.debug('NEW NODE %s', data)
2014-05-12 12:57:47 +00:00
if can_connect(data):
self._nodes[data['id']] = data
2014-08-09 16:33:59 +00:00
with db.session():
u = user.models.User.get(data['id'])
if u:
u.info['username'] = data['username']
u.info['local'] = data
u.update_name()
u.save()
state.nodes.queue('add', u.id)
2014-05-12 12:57:47 +00:00
self.send()
2015-12-02 21:05:23 +00:00
2014-05-24 09:39:45 +00:00
def get_ip(self):
pass
2014-05-12 12:57:47 +00:00
def run(self):
2014-05-16 17:08:10 +00:00
self.send()
2014-05-12 12:57:47 +00:00
self.receive()
def join(self):
self._active = False
2014-08-09 18:32:41 +00:00
if self._socket:
try:
self._socket.shutdown(socket.SHUT_RDWR)
2014-09-09 14:28:59 +00:00
except OSError:
2014-08-09 18:32:41 +00:00
pass
self._socket.close()
2014-05-12 12:57:47 +00:00
return Thread.join(self)
2014-05-24 09:39:45 +00:00
class LocalNodes4(LocalNodesBase):
_BROADCAST = "239.255.255.250"
_TTL = 1
def send(self):
packet = self.get_packet()
if packet:
#logger.debug('send4 %s', packet)
sockaddr = (self._BROADCAST, self._PORT)
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL)
try:
2014-09-02 22:32:44 +00:00
s.sendto(packet + b'\0', sockaddr)
except:
logger.debug('LocalNodes4.send failed', exc_info=1)
s.close()
2014-05-24 09:39:45 +00:00
def get_socket(self):
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
2014-05-24 09:39:45 +00:00
mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST), socket.INADDR_ANY)
s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
2014-08-09 18:32:41 +00:00
self._socket = s
2014-05-24 09:39:45 +00:00
return s
def get_ip(self):
return get_local_ipv4()
class LocalNodes6(LocalNodesBase):
_BROADCAST = "ff02::1"
def send(self):
packet = self.get_packet()
if packet:
#logger.debug('send6 %s', packet)
ttl = struct.pack('@i', self._TTL)
address = self._BROADCAST + get_interface()
addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM)
addr = addrs[0]
(family, socktype, proto, canonname, sockaddr) = addr
s = socket.socket(family, socktype, proto)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl)
try:
2014-09-02 22:32:44 +00:00
s.sendto(packet + b'\0', sockaddr)
except:
logger.debug('LocalNodes6.send failed', exc_info=1)
s.close()
2014-05-24 09:39:45 +00:00
def get_socket(self):
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
2014-09-02 22:32:44 +00:00
group_bin = socket.inet_pton(socket.AF_INET6, self._BROADCAST) + b'\0'*4
2014-05-24 09:39:45 +00:00
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_bin)
2014-08-09 18:32:41 +00:00
self._socket = s
2014-05-24 09:39:45 +00:00
return s
def get_ip(self):
return get_public_ipv6()
class LocalNodes(object):
_active = True
2014-05-24 09:39:45 +00:00
_nodes4 = None
_nodes6 = None
2014-08-09 16:33:59 +00:00
def __init__(self):
2014-05-24 09:39:45 +00:00
self._nodes = {}
if not server['localnode_discovery']:
return
2014-08-09 16:33:59 +00:00
self._nodes4 = LocalNodes4(self._nodes)
#self._nodes6 = LocalNodes6(self._nodes)
2014-05-24 09:39:45 +00:00
def cleanup(self):
if self._active:
2014-09-02 22:32:44 +00:00
for id in list(self._nodes.keys()):
if not can_connect(self._nodes[id]):
2015-11-29 14:56:38 +00:00
with db.session():
u = user.models.User.get(id)
if u and 'local' in u.info:
del u.info['local']
u.save()
del self._nodes[id]
if not self._active:
break
2014-05-24 09:39:45 +00:00
def get(self, user_id):
if user_id in self._nodes:
if can_connect(self._nodes[user_id]):
return self._nodes[user_id]
def join(self):
self._active = False
2014-05-24 09:39:45 +00:00
if self._nodes4:
self._nodes4.join()
if self._nodes6:
self._nodes6.join()