Browse Source

use mdns for local peer discovery

j 2 years ago
parent
commit
417195cfd1
7 changed files with 130 additions and 257 deletions
  1. 91
    237
      oml/localnodes.py
  2. 7
    15
      oml/nodes.py
  3. 8
    1
      oml/tasks.py
  4. 2
    2
      oml/user/api.py
  5. 20
    2
      oml/user/models.py
  6. 1
    0
      requirements-shared.txt
  7. 1
    0
      requirements.txt

+ 91
- 237
oml/localnodes.py View File

@@ -1,26 +1,23 @@
1 1
 # -*- coding: utf-8 -*-
2 2
 # vi:si:et:sw=4:sts=4:ts=4
3 3
 
4
-
5
-import json
6 4
 import socket
7
-import struct
8
-import _thread
9
-from threading import Thread
10
-import time
11
-import select
12 5
 
13
-from utils import get_public_ipv6, get_local_ipv4, get_interface
14
-from settings import preferences, server, USER_ID
6
+from zeroconf import (
7
+    get_all_addresses,
8
+    ServiceBrowser, ServiceInfo, ServiceStateChange, Zeroconf
9
+)
10
+from tornado.ioloop import PeriodicCallback
11
+
12
+import settings
15 13
 import state
16
-import db
17
-import user.models
18 14
 from tor_request import get_opener
19
-import settings
15
+from utils import get_local_ipv4
20 16
 
21 17
 import logging
22 18
 logger = logging.getLogger(__name__)
23 19
 
20
+
24 21
 def can_connect(data):
25 22
     try:
26 23
         opener = get_opener(data['id'])
@@ -47,235 +44,92 @@ def can_connect(data):
47 44
         #logger.debug('failed to connect to local node %s', data, exc_info=True)
48 45
     return False
49 46
 
47
+class LocalNodes(dict):
48
+    service_type = '_oml._tcp.local.'
49
+    local_info = None
50
+    local_ips = None
50 51
 
51
-class LocalNodesBase(Thread):
52
-
53
-    _PORT = 9851 
54
-    _TTL = 1
55
-
56
-    def __init__(self, nodes):
57
-        self._socket = None
58
-        self._nodes = nodes
59
-        Thread.__init__(self)
60
-        if not server['localnode_discovery']:
52
+    def __init__(self):
53
+        if not settings.server.get('localnode_discovery'):
61 54
             return
62
-        self.daemon = True
63
-        self.start()
64
-
65
-    def get_packet(self):
66
-        self.host = self.get_ip()
67
-        if self.host:
68
-            message = json.dumps({
69
-                'id': USER_ID,
70
-                'username': preferences.get('username', 'anonymous'),
71
-                'host': self.host,
72
-                'port': server['node_port']
73
-            })
74
-            packet = message.encode()
75
-        else:
76
-            packet = None
77
-        return packet
78
-
79
-    def get_socket(self):
80
-        pass
81
-
82
-    def send(self):
83
-        pass
84
-
85
-    def receive(self):
86
-        last = time.time()
87
-        s = None
88
-        while not s and not state.shutdown:
89
-            try:
90
-                s = self.get_socket()
91
-                s.bind(('', self._PORT))
92
-            except OSError: # no local interface exists
93
-                self.wait(60)
94
-        while not state.shutdown:
95
-            try:
96
-                r, _, _ = select.select([s], [], [], 3)
97
-                if r:
98
-                    data, addr = s.recvfrom(1024)
99
-                    if not state.shutdown:
100
-                        while data[-1] == 0:
101
-                            data = data[:-1] # Strip trailing \0's
102
-                        data = self.verify(data)
103
-                        if data:
104
-                            self.update_node(data)
105
-            except OSError: # no local interface exists
106
-                self.wait(60)
107
-            except:
108
-                if not state.shutdown:
109
-                    logger.debug('receive failed. restart later', exc_info=True)
110
-                self.wait(60)
111
-            finally:
112
-                if not state.shutdown:
113
-                    now = time.time()
114
-                    if now - last > 60:
115
-                        last = now
116
-                        _thread.start_new_thread(self.send, ())
117
-
118
-    def verify(self, data):
119
-        try:
120
-            message = json.loads(data.decode())
121
-        except:
122
-            return None
123
-        for key in ['id', 'username', 'host', 'port']:
124
-            if key not in message:
125
-                return None
126
-        return message
127
-
128
-    def update_node(self, data):
129
-        #fixme use local link address
130
-        #print addr
131
-        if data['id'] != USER_ID:
132
-            if data['id'] not in self._nodes:
133
-                _thread.start_new_thread(self.new_node, (data, ))
134
-            elif can_connect(data):
135
-                self._nodes[data['id']] = data
136
-
137
-    def get(self, user_id):
138
-        if user_id in self._nodes:
139
-            if can_connect(self._nodes[user_id]):
140
-                return self._nodes[user_id]
141
-
142
-    def new_node(self, data):
143
-        logger.debug('NEW NODE %s', data)
144
-        if can_connect(data):
145
-            self._nodes[data['id']] = data
146
-            with db.session():
147
-                u = user.models.User.get(data['id'])
148
-                if u:
149
-                    if u.info['username'] != data['username']:
150
-                        u.info['username'] = data['username']
151
-                        u.update_name()
152
-                    u.info['local'] = data
153
-                    u.save()
154
-                    state.nodes.queue('add', u.id)
155
-            self.send()
156
-
157
-
158
-    def get_ip(self):
159
-        pass
160
-
161
-    def run(self):
162
-        self.send()
163
-        self.receive()
164
-
165
-    def join(self):
166
-        if self._socket:
167
-            try:
168
-                self._socket.shutdown(socket.SHUT_RDWR)
169
-            except OSError:
170
-                pass
171
-            self._socket.close()
172
-        return Thread.join(self)
173
-
174
-    def wait(self, timeout):
175
-        step = min(timeout, 1)
176
-        while not state.shutdown and timeout > 0:
177
-            time.sleep(step)
178
-            timeout -= step
179
-
180
-class LocalNodes4(LocalNodesBase):
181
-
182
-    _BROADCAST = "239.255.255.250"
183
-    _TTL = 1
184
-
185
-    def send(self):
186
-        packet = self.get_packet()
187
-        if packet:
188
-            #logger.debug('send4 %s', packet)
189
-            sockaddr = (self._BROADCAST, self._PORT)
190
-            s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
191
-            s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._TTL)
192
-            try:
193
-                s.sendto(packet + b'\0', sockaddr)
194
-            except OSError:
195
-                pass
196
-            except:
197
-                logger.debug('LocalNodes4.send failed', exc_info=True)
198
-            s.close()
199
-
200
-    def get_socket(self):
201
-        s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
202
-        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
203
-        if hasattr(socket, 'SO_REUSEPORT'):
204
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
205
-        mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST), socket.INADDR_ANY)
206
-        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
207
-        self._socket = s
208
-        return s
209
-
210
-    def get_ip(self):
211
-        return get_local_ipv4()
212
-
213
-class LocalNodes6(LocalNodesBase):
214
-
215
-    _BROADCAST = "ff02::1"
216
-
217
-    def send(self):
218
-        packet = self.get_packet()
219
-        if packet:
220
-            #logger.debug('send6 %s', packet)
221
-            ttl = struct.pack('@i', self._TTL)
222
-            address = self._BROADCAST + get_interface()
223
-            addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM)
224
-            addr = addrs[0]
225
-            (family, socktype, proto, canonname, sockaddr) = addr
226
-            s = socket.socket(family, socktype, proto)
227
-            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl)
55
+        self.setup()
56
+        self._ip_changed = PeriodicCallback(self._update_if_ip_changed, 60000)
57
+
58
+    def setup(self):
59
+        self.local_ips = sorted(get_all_addresses(socket.AF_INET))
60
+        self.zeroconf = Zeroconf()
61
+        self.register_service()
62
+        self.browse()
63
+
64
+    def _update_if_ip_changed(self):
65
+        local_ips = sorted(get_all_addresses(socket.AF_INET))
66
+        if local_ips != self.local_ips:
67
+            self.close()
68
+            self.setup()
69
+
70
+    def browse(self):
71
+        self.browser = ServiceBrowser(self.zeroconf, self.service_type, handlers=[self.on_service_state_change])
72
+
73
+    def register_service(self):
74
+        if self.local_info:
75
+            self.zeroconf.unregister_service(self.local_info)
76
+            self.local_info = None
77
+        local_ip = get_local_ipv4()
78
+        if local_ip:
79
+            local_name = socket.gethostname().partition('.')[0] + '.local.'
80
+
81
+            port = settings.server['node_port']
82
+            desc = {
83
+                'username': settings.preferences.get('username', 'anonymous'),
84
+            }
85
+            self.local_info = ServiceInfo(self.service_type,
86
+                                          '%s.%s' % (settings.USER_ID, self.service_type),
87
+                                          socket.inet_aton(local_ip), port, 0, 0,
88
+                                          desc, local_name)
89
+            self.zeroconf.register_service(self.local_info)
90
+
91
+    def __del__(self):
92
+        self.close()
93
+
94
+    def close(self):
95
+        if self.local_info:
96
+            self.zeroconf.unregister_service(self.local_info)
97
+            self.local_info = None
98
+        if self.zeroconf:
228 99
             try:
229
-                s.sendto(packet + b'\0', sockaddr)
100
+                self.zeroconf.close()
230 101
             except:
231
-                logger.debug('LocalNodes6.send failed', exc_info=True)
232
-            s.close()
233
-
234
-    def get_socket(self):
235
-        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
236
-        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
237
-        if hasattr(socket, 'SO_REUSEPORT'):
238
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
239
-        group_bin = socket.inet_pton(socket.AF_INET6, self._BROADCAST) + b'\0'*4
240
-        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_bin)
241
-        self._socket = s
242
-        return s
243
-
244
-    def get_ip(self):
245
-        return get_public_ipv6()
246
-
247
-class LocalNodes(object):
248
-
249
-    _nodes4 = None
250
-    _nodes6 = None
251
-
252
-    def __init__(self):
253
-        self._nodes = {}
254
-        if not server['localnode_discovery']:
102
+                logger.debug('exception closing zeroconf', exc_info=True)
103
+            self.zeroconf = None
104
+        for id in list(self):
105
+            self.pop(id, None)
106
+
107
+    def on_service_state_change(self, zeroconf, service_type, name, state_change):
108
+        id = name.split('.')[0]
109
+        if id == settings.USER_ID:
255 110
             return
256
-        self._nodes4 = LocalNodes4(self._nodes)
257
-        #self._nodes6 = LocalNodes6(self._nodes)
258
-
259
-    def cleanup(self):
260
-        if not state.shutdown:
261
-            for id in list(self._nodes.keys()):
262
-                if not can_connect(self._nodes[id]):
263
-                    with db.session():
264
-                        u = user.models.User.get(id)
265
-                        if u and 'local' in u.info:
266
-                            del u.info['local']
267
-                            u.save()
268
-                    del self._nodes[id]
269
-                if state.shutdown:
270
-                    break
111
+        if state_change is ServiceStateChange.Added:
112
+            info = zeroconf.get_service_info(service_type, name)
113
+            if info:
114
+                self[id] = {
115
+                    'id': id,
116
+                    'host': socket.inet_ntoa(info.address),
117
+                    'port': info.port
118
+                }
119
+                if info.properties:
120
+                    for key, value in info.properties.items():
121
+                        key = key.decode()
122
+                        self[id][key] = value.decode()
123
+                logger.debug('add localnode: %s', self[id])
124
+                if state.tasks:
125
+                    state.tasks.queue('addlocalinfo', self[id])
126
+        elif state_change is ServiceStateChange.Removed:
127
+            logger.debug('remove localnode: %s', id)
128
+            self.pop(id, None)
129
+            if state.tasks:
130
+                state.tasks.queue('removelocalinfo', id)
271 131
 
272 132
     def get(self, user_id):
273
-        if user_id in self._nodes:
274
-            if can_connect(self._nodes[user_id]):
275
-                return self._nodes[user_id]
276
-
277
-    def join(self):
278
-        if self._nodes4:
279
-            self._nodes4.join()
280
-        if self._nodes6:
281
-            self._nodes6.join()
133
+        if user_id in self:
134
+            if can_connect(self[user_id]):
135
+                return self[user_id]

+ 7
- 15
oml/nodes.py View File

@@ -105,8 +105,8 @@ class Node(Thread):
105 105
             self.port = 9851
106 106
 
107 107
     def get_local(self):
108
-        if self._nodes and self._nodes._local:
109
-            return self._nodes._local.get(self.user_id)
108
+        if self._nodes and self._nodes.local:
109
+            return self._nodes.local.get(self.user_id)
110 110
         return None
111 111
 
112 112
     def request(self, action, *args):
@@ -405,7 +405,7 @@ class Node(Thread):
405 405
 
406 406
 class Nodes(Thread):
407 407
     _nodes = {}
408
-    _local = None
408
+    local = None
409 409
     _pulling = False
410 410
 
411 411
     def __init__(self):
@@ -420,9 +420,7 @@ class Nodes(Thread):
420 420
             for u in user.models.User.query.filter_by(queued=True):
421 421
                 logger.debug('adding queued node... %s', u.id)
422 422
                 self.queue('add', u.id, True)
423
-        self._local = LocalNodes()
424
-        self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
425
-        self._cleanup.start()
423
+        self.local = LocalNodes()
426 424
         self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
427 425
         self._pullcb.start()
428 426
         Thread.__init__(self)
@@ -435,19 +433,13 @@ class Nodes(Thread):
435 433
         while not state.shutdown:
436 434
             args = self._q.get()
437 435
             if args:
438
-                if args[0] == 'cleanup':
439
-                    self.cleanup()
440
-                elif args[0] == 'add':
436
+                if args[0] == 'add':
441 437
                     self._add(*args[1:])
442 438
                 elif args[0] == 'pull':
443 439
                     self._pull()
444 440
                 else:
445 441
                     self._call(*args)
446 442
 
447
-    def cleanup(self):
448
-        if not state.shutdown and self._local:
449
-            self._local.cleanup()
450
-
451 443
     def queue(self, *args):
452 444
         self._q.put(list(args))
453 445
 
@@ -515,8 +507,8 @@ class Nodes(Thread):
515 507
         self._q.put(None)
516 508
         for node in list(self._nodes.values()):
517 509
             node.join()
518
-        if self._local:
519
-            self._local.join()
510
+        if self.local:
511
+            self.local.close()
520 512
         return Thread.join(self)
521 513
 
522 514
 def publish_node():

+ 8
- 1
oml/tasks.py View File

@@ -40,7 +40,10 @@ class Tasks(Thread):
40 40
         self.queue('scan')
41 41
         import item.scan
42 42
         from item.models import sync_metadata, get_preview, get_cover
43
-        from user.models import export_list, update_user_peering
43
+        from user.models import (
44
+                export_list, update_user_peering,
45
+                add_local_info, remove_local_info,
46
+        )
44 47
         shutdown = False
45 48
         while not shutdown:
46 49
             p, m = self.q.get()
@@ -68,6 +71,10 @@ class Tasks(Thread):
68 71
                             update_user_peering(*data)
69 72
                         elif action == 'ping':
70 73
                             trigger_event('pong', data)
74
+                        elif action == 'addlocalinfo':
75
+                            add_local_info(data)
76
+                        elif action == 'removelocalinfo':
77
+                            remove_local_info(data)
71 78
                         elif action == 'scan':
72 79
                             item.scan.run_scan()
73 80
                         elif action == 'scanimport':

+ 2
- 2
oml/user/api.py View File

@@ -123,9 +123,9 @@ def getUsers(data):
123 123
         users.append(u.json())
124 124
         ids.add(u.id)
125 125
     if state.nodes:
126
-        for id in state.nodes._local._nodes:
126
+        for id in state.nodes.local:
127 127
             if id not in ids:
128
-                n = state.nodes._local._nodes[id].copy()
128
+                n = state.nodes.local[id].copy()
129 129
                 n['online'] = True
130 130
                 n['name'] = n['username']
131 131
                 users.append(n)

+ 20
- 2
oml/user/models.py View File

@@ -54,8 +54,8 @@ class User(db.Model):
54 54
         if not user:
55 55
             user = cls(id=id, peered=False, online=False)
56 56
             user.info = {}
57
-            if state.nodes and state.nodes._local and id in state.nodes._local._nodes:
58
-                user.info['local'] = state.nodes._local._nodes[id]
57
+            if state.nodes and state.nodes.local and id in state.nodes.local:
58
+                user.info['local'] = state.nodes.local[id]
59 59
                 user.info['username'] = user.info['local']['username']
60 60
             user.update_name()
61 61
             user.save()
@@ -598,3 +598,21 @@ def update_user_peering(user_id, peered, username=None):
598 598
         if u:
599 599
             u.update_peering(peered, username)
600 600
 
601
+def remove_local_info(id):
602
+    with db.session():
603
+        u = User.get(id)
604
+        if u and 'local' in u.info:
605
+            del u.info['local']
606
+            u.save()
607
+            u.trigger_status()
608
+
609
+def add_local_info(data):
610
+    with db.session():
611
+        u = User.get(data['id'])
612
+        if u:
613
+            if u.info['username'] != data['username']:
614
+                u.info['username'] = data['username']
615
+                u.update_name()
616
+            u.info['local'] = data
617
+            u.save()
618
+            state.nodes.queue('add', u.id)

+ 1
- 0
requirements-shared.txt View File

@@ -9,3 +9,4 @@ PyPDF2==1.25.1
9 9
 pysocks
10 10
 stem
11 11
 sqlitedict==1.4.0
12
+zeroconf

+ 1
- 0
requirements.txt View File

@@ -5,3 +5,4 @@ SQLAlchemy==1.0.12
5 5
 pyopenssl>=0.15
6 6
 pyCrypto>=2.6.1
7 7
 pillow
8
+netifaces