From dc2121293e8b3e0126876399d295613dc852f594 Mon Sep 17 00:00:00 2001 From: j Date: Thu, 22 May 2014 16:20:40 +0200 Subject: [PATCH] fall back to ipv4 for local nodes if no ipv6 connection is available --- oml/changelog.py | 5 +-- oml/downloads.py | 7 +--- oml/item/models.py | 10 ++++- oml/localnodes.py | 99 +++++++++++++++++++++++++++++++------------- oml/nodes.py | 68 ++++++++++++++++-------------- oml/user/models.py | 2 +- oml/utils.py | 29 +++++++++++++ static/js/folders.js | 2 +- static/js/utils.js | 1 + 9 files changed, 151 insertions(+), 72 deletions(-) diff --git a/oml/changelog.py b/oml/changelog.py index 52aa028..6786454 100644 --- a/oml/changelog.py +++ b/oml/changelog.py @@ -57,7 +57,7 @@ class Changelog(db.Model): c.sig = settings.sk.sign(_data, encoding='base64') db.session.add(c) db.session.commit() - if state.online: + if state.nodes: state.nodes.queue('peered', 'pushChanges', [c.json()]) @classmethod @@ -192,8 +192,7 @@ class Changelog(db.Model): if i.users: i.update() else: - db.session.delete(i) - db.session.commit() + i.delete() return True def action_addlist(self, user, timestamp, name, query=None): diff --git a/oml/downloads.py b/oml/downloads.py index 6ee0d0f..1f417cf 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -38,11 +38,8 @@ class Downloads(Thread): import item.scan item.scan.run_scan() while self._running: - if state.online: - self.download_next() - time.sleep(0.5) - else: - time.sleep(20) + self.download_next() + time.sleep(0.5) def join(self): self._running = False diff --git a/oml/item/models.py b/oml/item/models.py index 6fb6312..c4483be 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -228,6 +228,13 @@ class Item(db.Model): db.session.add(self) db.session.commit() + def delete(self, commit=True): + db.session.delete(self) + Sort.query.filter_by(item_id=self.id).delete() + Transfer.query.filter_by(item_id=self.id).delete() + if commit: + db.session.commit() + meta_keys = ('title', 'author', 'date', 'publisher', 'edition', 'language') def update_meta(self, data): @@ -397,8 +404,7 @@ class Item(db.Model): l.items.remove(self) db.session.commit() if not self.users: - db.session.delete(self) - Sort.query.filter_by(item_id=self.id).delete() + self.delete() else: self.update() Changelog.record(user, 'removeitem', self.id) diff --git a/oml/localnodes.py b/oml/localnodes.py index c2e7e3a..8e0763c 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -11,7 +11,7 @@ import sys import thread from threading import Thread -from utils import valid, get_public_ipv6 +from utils import valid, get_public_ipv6, get_local_ipv4, get_interface from settings import preferences, server, USER_ID, sk import state @@ -19,34 +19,26 @@ logger = logging.getLogger('oml.localnodes') def can_connect(data): try: - s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + if ':' in data['host']: + s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + else: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(1) s.connect((data['host'], data['port'])) s.close() return True except: pass + logger.debug('can_connect failed') return False -def get_interface(): - interface = '' - if sys.platform == 'darwin': - #cmd = ['/usr/sbin/netstat', '-rn'] - cmd = ['/sbin/route', '-n', 'get', 'default'] - p = subprocess.Popen(cmd, stdout=subprocess.PIPE) - stdout, stderr = p.communicate() - interface = [[p.strip() for p in s.split(':', 1)] for s in stdout.strip().split('\n') if 'interface' in s] - if interface: - interface = '%%%s' % interface[0][1] - else: - interface = '' - return interface - class LocalNodes(Thread): _active = True _nodes = {} + _MODE = 6 _BROADCAST = "ff02::1" + _BROADCAST4 = "239.255.255.250" _PORT = 9851 TTL = 1 @@ -58,10 +50,7 @@ class LocalNodes(Thread): self.daemon = True self.start() - def send(self): - if not server['localnode_discovery']: - return - + def get_packet(self): message = json.dumps({ 'username': preferences.get('username', 'anonymous'), 'host': self.host, @@ -70,10 +59,17 @@ class LocalNodes(Thread): }) sig = sk.sign(message, encoding='base64') packet = json.dumps([sig, USER_ID, message]) + return packet + def send(self): + if not server['localnode_discovery']: + return + if self._MODE == 4: + return self.send4() + packet = self.get_packet() ttl = struct.pack('@i', self.TTL) address = self._BROADCAST + get_interface() - addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6,socket.SOCK_DGRAM) + addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM) addr = addrs[0] (family, socktype, proto, canonname, sockaddr) = addr s = socket.socket(family, socktype, proto) @@ -81,7 +77,27 @@ class LocalNodes(Thread): s.sendto(packet + '\0', sockaddr) s.close() + def send4(self): + logger.debug('send4') + packet = self.get_packet() + sockaddr = (self._BROADCAST4, self._PORT) + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + s.sendto(packet + '\0', sockaddr) + s.close() + logger.debug('sent4') + ''' + try: + s.sendto(packet + '\0', sockaddr) + s.close() + except: + logger.debug('send failed %s', ) + return + ''' + def receive(self): + if self._MODE == 4: + return self.receive4() s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('', self._PORT)) @@ -94,14 +110,25 @@ class LocalNodes(Thread): data = data[:-1] # Strip trailing \0's data = self.verify(data) if data: - #fixme use local link address - #print addr - if data['id'] != USER_ID: - if data['id'] not in self._nodes: - thread.start_new_thread(self.new_node, (data, )) - #else: - # print 'UPDATE NODE', data - self._nodes[data['id']] = data + self.update_node(data) + + def receive4(self): + logger.debug('receive4') + s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST4), socket.INADDR_ANY) + s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + + s.bind(('', self._PORT)) + while self._active: + data, addr = s.recvfrom(1024) + logger.debug('receive4') + while data[-1] == '\0': + data = data[:-1] # Strip trailing \0's + logger.debug('receive4 %s', data) + data = self.verify(data) + if data: + self.update_node(data) def verify(self, data): try: @@ -118,6 +145,16 @@ class LocalNodes(Thread): return None return message + def update_node(self, data): + #fixme use local link address + #print addr + if data['id'] != USER_ID: + if data['id'] not in self._nodes: + thread.start_new_thread(self.new_node, (data, )) + #else: + # print 'UPDATE NODE', data + self._nodes[data['id']] = data + def get(self, user_id): if user_id in self._nodes: if can_connect(self._nodes[user_id]): @@ -138,6 +175,10 @@ class LocalNodes(Thread): def run(self): self.host = get_public_ipv6() + if not self.host: + logger.debug('no ipv6 detected, fall back to local ipv4 sharing') + self.host = get_local_ipv4() + self._MODE = 4 self.send() self.receive() diff --git a/oml/nodes.py b/oml/nodes.py index da6cc96..2c03628 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -78,20 +78,24 @@ class Node(Thread): @property def url(self): - local = self.get_local() - if local: - url = 'https://[%s]:%s' % (local['host'], local['port']) - elif not self.host: - return None - else: + if self.host: if ':' in self.host: url = 'https://[%s]:%s' % (self.host, self.port) else: url = 'https://%s:%s' % (self.host, self.port) + else: + url = None return url def resolve(self): - r = directory.get(self.vk) + logger.debug('resolve node') + r = self.get_local() + if not r: + try: + r = directory.get(self.vk) + except: + logger.debug('directory failed', exc_info=1) + r = None if r: self.host = r['host'] if 'port' in r: @@ -187,31 +191,32 @@ class Node(Thread): def can_connect(self): try: - logger.debug('try to connect to %s', self.url) - headers = { - 'User-Agent': settings.USER_AGENT, - 'X-Node-Protocol': settings.NODE_PROTOCOL, - 'Accept-Encoding': 'gzip', - } - self._opener.addheaders = zip(headers.keys(), headers.values()) - r = self._opener.open(self.url, timeout=1) - version = r.headers.get('X-Node-Protocol', None) - if version != settings.NODE_PROTOCOL: - logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) - return False - c = r.read() - logger.debug('ok') - return True + url = self.url + if url: + logger.debug('try to connect to %s', url) + headers = { + 'User-Agent': settings.USER_AGENT, + 'X-Node-Protocol': settings.NODE_PROTOCOL, + 'Accept-Encoding': 'gzip', + } + self._opener.addheaders = zip(headers.keys(), headers.values()) + r = self._opener.open(url, timeout=1) + version = r.headers.get('X-Node-Protocol', None) + if version != settings.NODE_PROTOCOL: + logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) + return False + c = r.read() + logger.debug('ok') + return True except: pass - logger.debug('failed') return False def _go_online(self): self.resolve() u = self.user logger.debug('go_online peer=%s queued=%s (%s)', u.peered, u.queued, u.id) - if u.peered or u.queued: + if u.peered or u.queued and self.host: try: self.online = False logger.debug('try to connect to %s at [%s]:%s', self.user_id, self.host, self.port) @@ -253,13 +258,14 @@ class Node(Thread): def pushChanges(self, changes): logger.debug('pushing changes to %s %s', self.user_id, changes) - try: - r = self.request('pushChanges', changes) - except: - self.online = False - self.trigger_status() - r = False - logger.debug('pushedChanges %s %s', r, self.user_id) + if self.online: + try: + r = self.request('pushChanges', changes) + except: + self.online = False + self.trigger_status() + r = False + logger.debug('pushedChanges %s %s', r, self.user_id) def peering(self, action): u = self.user diff --git a/oml/user/models.py b/oml/user/models.py index 21cf03b..0586135 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -100,7 +100,7 @@ class User(db.Model): for i in self.items: i.users.remove(self) if not i.users: - db.session.delete(i) + i.delete() else: i.update_lists() Changelog.query.filter_by(user_id=self.id).delete() diff --git a/oml/utils.py b/oml/utils.py index 8809f61..19d85ae 100644 --- a/oml/utils.py +++ b/oml/utils.py @@ -3,6 +3,7 @@ from __future__ import division import os +import sys import Image from StringIO import StringIO import re @@ -12,6 +13,7 @@ import cStringIO import gzip import time from datetime import datetime +import subprocess import ox import ed25519 @@ -121,6 +123,33 @@ def get_public_ipv6(): ip = None return ip +def get_interface(): + interface = '' + if sys.platform == 'darwin': + #cmd = ['/usr/sbin/netstat', '-rn'] + cmd = ['/sbin/route', '-n', 'get', 'default'] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE) + stdout, stderr = p.communicate() + interface = [[p.strip() for p in s.split(':', 1)] for s in stdout.strip().split('\n') if 'interface' in s] + if interface: + interface = '%%%s' % interface[0][1] + else: + interface = '' + return interface + +def get_local_ipv4(): + ip = socket.gethostbyaddr(socket.getfqdn())[-1][0] + if ip == '127.0.0.1': + cmd = ['ip', 'route', 'show'] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE) + stdout, stderr = p.communicate() + local = [l for l in stdout.split('\n') if 'default' in l] + if local: + dev = local[0].split(' ')[4] + local_ip = [l for l in stdout.split('\n') if dev in l and not 'default' in l] + return [p for p in local_ip[0].split(' ')[1:] if '.' in p][0] + return ip + def update_dict(root, data): for key in data: keys = map(lambda part: part.replace('\0', '\\.'), key.replace('\\.', '\0').split('.')) diff --git a/static/js/folders.js b/static/js/folders.js index 6566640..3629f26 100644 --- a/static/js/folders.js +++ b/static/js/folders.js @@ -309,7 +309,7 @@ oml.ui.folders = function() { }) .css({height: items.length * 16 + 'px'}) .size(); - oml.resizeFolders(); + oml.resizeListFolders(); callback && callback(); }); }; diff --git a/static/js/utils.js b/static/js/utils.js index 1d5f604..034ab9d 100644 --- a/static/js/utils.js +++ b/static/js/utils.js @@ -5,6 +5,7 @@ oml.addList = function() { isDuplicate = args.length == 1, isSmart, isFrom, name, callback, list, listData, data, + ui = oml.user.ui, username = oml.user.preferences.username; Ox.Request.clearCache('getLists'); oml.api.getLists(function(result) {