openmedialibrary/oml/localnodes.py

177 lines
6.2 KiB
Python
Raw Normal View History

2014-05-12 12:57:47 +00:00
# -*- coding: utf-8 -*-
2014-09-02 22:32:44 +00:00
2024-06-08 12:31:46 +00:00
import asyncio
2014-05-17 14:26:59 +00:00
import socket
2014-05-12 12:57:47 +00:00
import netifaces
2016-03-14 13:31:56 +00:00
from zeroconf import (
2024-06-08 12:31:46 +00:00
ServiceBrowser, ServiceInfo, ServiceStateChange
2016-03-14 13:31:56 +00:00
)
2024-06-08 12:31:46 +00:00
from zeroconf.asyncio import AsyncZeroconf
2016-03-14 13:31:56 +00:00
from tornado.ioloop import PeriodicCallback
import settings
2014-05-18 03:01:24 +00:00
import state
from tor_request import get_opener
2014-05-12 12:57:47 +00:00
import logging
2015-11-29 14:56:38 +00:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
2014-05-17 14:26:59 +00:00
2016-03-14 13:31:56 +00:00
2014-05-12 12:57:47 +00:00
def can_connect(data):
    """Check whether the local node described by *data* is reachable.

    Opens ``https://host:port`` (IPv6 hosts are bracketed) through the opener
    returned by ``get_opener(data['id'])`` with a one second timeout and
    compares the ``X-Node-Protocol`` response header with
    ``settings.NODE_PROTOCOL``.

    data: mapping with at least the keys ``'id'``, ``'host'`` and ``'port'``.

    Returns True when the request succeeds and the protocol versions match,
    False on a version mismatch or on any error (this is a best-effort probe
    and never raises for ordinary failures).
    """
    try:
        opener = get_opener(data['id'])
        headers = {
            'User-Agent': settings.USER_AGENT,
            'X-Node-Protocol': settings.NODE_PROTOCOL,
            'Accept-Encoding': 'gzip',
        }
        if ':' in data['host']:
            # A colon in the host means an IPv6 literal; it must be bracketed.
            url = 'https://[{host}]:{port}'.format(**data)
        else:
            url = 'https://{host}:{port}'.format(**data)
        opener.addheaders = list(headers.items())
        opener.timeout = 1
        r = opener.open(url)
        try:
            version = r.headers.get('X-Node-Protocol', None)
            if version != settings.NODE_PROTOCOL:
                logger.debug('version does not match local: %s remote %s (%s)', settings.NODE_PROTOCOL, version, data['id'])
                return False
            # The body is read but not used (presumably to finish the request).
            r.read()
            return True
        finally:
            # Release the connection; tolerate response objects without close().
            close = getattr(r, 'close', None)
            if callable(close):
                close()
    except Exception:
        # Deliberately silent: an unreachable peer is the common case here.
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return False
def get_broadcast_interfaces():
    """Return the unique IPv4 addresses of broadcast-capable interfaces.

    An address is kept when its netmask is not ``255.255.255.255`` and it
    has a non-empty ``broadcast`` entry. The result is a list without
    duplicates; its order is whatever the intermediate set yields.
    """
    found = set()
    for name in netifaces.interfaces():
        ipv4_entries = netifaces.ifaddresses(name).get(socket.AF_INET, [])
        for entry in ipv4_entries:
            if entry.get('netmask') == '255.255.255.255':
                continue
            if not entry.get('broadcast'):
                continue
            found.add(entry['addr'])
    return list(found)
2016-03-14 13:31:56 +00:00
class LocalNodes(dict):
    """Registry of peers discovered on the local network via zeroconf.

    The mapping goes from a peer's user id to a dict holding ``'id'``,
    ``'host'``, ``'port'`` and every TXT property the peer announced.
    One ``AsyncZeroconf`` instance and one ``ServiceBrowser`` are kept per
    local address returned by ``get_broadcast_interfaces()``.
    """

    service_type = '_oml._tcp.local.'
    local_info = None
    local_ips = None
    # Class-level defaults so close()/__del__ work even when __init__
    # returned early (discovery disabled) and setup() never ran.
    zeroconf = None
    username = None

    def __init__(self):
        """Start discovery unless ``localnode_discovery`` is disabled."""
        super().__init__()
        if not settings.server.get('localnode_discovery'):
            return
        self.setup()
        # Re-check addresses and username once per minute.
        self._ip_changed = PeriodicCallback(self._update_if_ip_changed, 60000)
        state.main.add_callback(self._ip_changed.start)

    def setup(self):
        """Create zeroconf instances, announce this node and start browsing.

        Must be called while an asyncio event loop is running, because the
        service registration is scheduled as a task on that loop.
        """
        self.local_ips = get_broadcast_interfaces()
        # Set synchronously so _update_if_ip_changed can compare it even if
        # the registration task has not run yet.
        self.username = settings.preferences.get('username', 'anonymous')
        self.zeroconf = {ip: AsyncZeroconf(interfaces=[ip]) for ip in self.local_ips}
        # Keep a reference: the event loop only holds weak references to tasks.
        self._register_task = asyncio.create_task(self.register_service())
        self.browse()

    def _update_if_ip_changed(self):
        """Restart discovery when the local addresses or the username changed."""
        local_ips = get_broadcast_interfaces()
        username = settings.preferences.get('username', 'anonymous')
        # Compare as sets: get_broadcast_interfaces() does not guarantee order.
        unchanged = set(local_ips) == set(self.local_ips or ())
        if unchanged and self.username == username:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: keep the synchronous path.
            asyncio.run(self.close())
            self.setup()
            return
        # asyncio.run() cannot be used inside a running loop, so schedule the
        # restart instead; skip if a previous restart is still in progress.
        pending = getattr(self, '_restart_task', None)
        if pending is not None and not pending.done():
            return
        self._restart_task = loop.create_task(self._restart())

    async def _restart(self):
        """Close everything, then set discovery up again."""
        await self.close()
        self.setup()

    def browse(self):
        """Start one ServiceBrowser per zeroconf instance."""
        self.browser = {
            ip: ServiceBrowser(self.zeroconf[ip].zeroconf, self.service_type, handlers=[self.on_service_state_change])
            for ip in self.zeroconf
        }

    async def register_service(self):
        """Announce this node on every zeroconf instance.

        A previous announcement recorded in ``self.local_info`` is
        unregistered first. Afterwards ``self.local_info`` holds one
        ``(local_ip, ServiceInfo)`` pair per announced address.
        """
        # Snapshot: close() may reset self.zeroconf while this coroutine awaits.
        zeroconf = self.zeroconf or {}
        if self.local_info:
            for local_ip, local_info in self.local_info:
                try:
                    await zeroconf[local_ip].async_unregister_service(local_info)
                except Exception:
                    logger.debug('exception unregistering service', exc_info=True)
            self.local_info = None

        local_name = socket.gethostname().partition('.')[0] + '.local.'
        port = settings.server['node_port']
        self.local_info = []
        self.username = settings.preferences.get('username', 'anonymous')
        desc = {
            'username': self.username,
            'id': settings.USER_ID,
        }
        tasks = []
        # Iterate the addresses that actually have a zeroconf instance rather
        # than re-querying the interfaces, which may have changed since setup().
        for i, local_ip in enumerate(list(zeroconf)):
            if i:
                # Every address after the first gets an indexed service name.
                name = '%s [%s].%s' % (desc['username'], i, self.service_type)
            else:
                name = '%s.%s' % (desc['username'], self.service_type)
            addresses = [socket.inet_aton(local_ip)]
            local_info = ServiceInfo(self.service_type, name, port, 0, 0, desc, local_name, addresses=addresses)
            tasks.append(zeroconf[local_ip].async_register_service(local_info))
            self.local_info.append((local_ip, local_info))
        await asyncio.gather(*tasks)

    def __del__(self):
        """Best-effort cleanup when the object is garbage collected.

        close() is a coroutine, so it can only be scheduled when an event
        loop is running in the current thread; otherwise nothing is done.
        """
        if not (self.local_info or self.zeroconf):
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        self._close_task = loop.create_task(self.close())

    async def close(self):
        """Unregister the announced services, close zeroconf, forget all peers."""
        zeroconf = self.zeroconf
        local_info = self.local_info
        # Reset state first so a concurrent caller does not close twice.
        self.local_info = None
        self.zeroconf = None
        self.clear()

        unregister = []
        for local_ip, info in local_info or ():
            try:
                unregister.append(zeroconf[local_ip].async_unregister_service(info))
            except Exception:
                logger.debug('exception closing zeroconf', exc_info=True)
        closing = []
        for local_ip in zeroconf or ():
            try:
                closing.append(zeroconf[local_ip].async_close())
            except Exception:
                logger.debug('exception closing zeroconf', exc_info=True)

        # Unregister before closing; collect errors so one failing instance
        # does not prevent the others from being closed.
        for batch in (unregister, closing):
            results = await asyncio.gather(*batch, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logger.debug('exception closing zeroconf', exc_info=result)

    def on_service_state_change(self, zeroconf, service_type, name, state_change):
        """ServiceBrowser handler: track peers that appear and disappear.

        Services without an ``id`` property, and this node's own
        announcement, are ignored. Only ``Added`` and ``Removed`` state
        changes are acted upon.
        """
        info = zeroconf.get_service_info(service_type, name)
        if not info or b'id' not in info.properties:
            return
        node_id = info.properties[b'id'].decode()
        if node_id == settings.USER_ID:
            return
        if state_change is ServiceStateChange.Added:
            if not info.addresses:
                logger.debug('ignore %s: no address announced', node_id)
                return
            new = node_id not in self
            node = {
                'id': node_id,
                'host': socket.inet_ntoa(info.addresses[0]),
                'port': info.port
            }
            for key, value in info.properties.items():
                # Properties announced without a value are stored as None.
                node[key.decode()] = value.decode() if value is not None else None
            self[node_id] = node
            logger.debug(
                '%s: %s [%s] (%s:%s)',
                'add' if new else 'update',
                node.get('username', 'anon'),
                node_id,
                node['host'],
                node['port']
            )
            if state.tasks and node_id in self:
                state.tasks.queue('addlocalinfo', self[node_id])
        elif state_change is ServiceStateChange.Removed:
            logger.debug('remove: %s', node_id)
            self.pop(node_id, None)
            if state.tasks:
                state.tasks.queue('removelocalinfo', node_id)

    def get_data(self, user_id):
        """Return the stored info for *user_id* if that peer is reachable.

        Returns None when the peer is unknown or ``can_connect`` fails.
        """
        data = self.get(user_id)
        if data and can_connect(data):
            return data
        return None