fall back to ipv4 for local nodes if no ipv6 connection is available

This commit is contained in:
j 2014-05-22 16:20:40 +02:00
parent c285022bca
commit dc2121293e
9 changed files with 151 additions and 72 deletions

View file

@ -57,7 +57,7 @@ class Changelog(db.Model):
c.sig = settings.sk.sign(_data, encoding='base64') c.sig = settings.sk.sign(_data, encoding='base64')
db.session.add(c) db.session.add(c)
db.session.commit() db.session.commit()
if state.online: if state.nodes:
state.nodes.queue('peered', 'pushChanges', [c.json()]) state.nodes.queue('peered', 'pushChanges', [c.json()])
@classmethod @classmethod
@ -192,8 +192,7 @@ class Changelog(db.Model):
if i.users: if i.users:
i.update() i.update()
else: else:
db.session.delete(i) i.delete()
db.session.commit()
return True return True
def action_addlist(self, user, timestamp, name, query=None): def action_addlist(self, user, timestamp, name, query=None):

View file

@ -38,11 +38,8 @@ class Downloads(Thread):
import item.scan import item.scan
item.scan.run_scan() item.scan.run_scan()
while self._running: while self._running:
if state.online: self.download_next()
self.download_next() time.sleep(0.5)
time.sleep(0.5)
else:
time.sleep(20)
def join(self): def join(self):
self._running = False self._running = False

View file

@ -228,6 +228,13 @@ class Item(db.Model):
db.session.add(self) db.session.add(self)
db.session.commit() db.session.commit()
def delete(self, commit=True):
db.session.delete(self)
Sort.query.filter_by(item_id=self.id).delete()
Transfer.query.filter_by(item_id=self.id).delete()
if commit:
db.session.commit()
meta_keys = ('title', 'author', 'date', 'publisher', 'edition', 'language') meta_keys = ('title', 'author', 'date', 'publisher', 'edition', 'language')
def update_meta(self, data): def update_meta(self, data):
@ -397,8 +404,7 @@ class Item(db.Model):
l.items.remove(self) l.items.remove(self)
db.session.commit() db.session.commit()
if not self.users: if not self.users:
db.session.delete(self) self.delete()
Sort.query.filter_by(item_id=self.id).delete()
else: else:
self.update() self.update()
Changelog.record(user, 'removeitem', self.id) Changelog.record(user, 'removeitem', self.id)

View file

@ -11,7 +11,7 @@ import sys
import thread import thread
from threading import Thread from threading import Thread
from utils import valid, get_public_ipv6 from utils import valid, get_public_ipv6, get_local_ipv4, get_interface
from settings import preferences, server, USER_ID, sk from settings import preferences, server, USER_ID, sk
import state import state
@ -19,34 +19,26 @@ logger = logging.getLogger('oml.localnodes')
def can_connect(data): def can_connect(data):
try: try:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) if ':' in data['host']:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(1) s.settimeout(1)
s.connect((data['host'], data['port'])) s.connect((data['host'], data['port']))
s.close() s.close()
return True return True
except: except:
pass pass
logger.debug('can_connect failed')
return False return False
def get_interface():
interface = ''
if sys.platform == 'darwin':
#cmd = ['/usr/sbin/netstat', '-rn']
cmd = ['/sbin/route', '-n', 'get', 'default']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
interface = [[p.strip() for p in s.split(':', 1)] for s in stdout.strip().split('\n') if 'interface' in s]
if interface:
interface = '%%%s' % interface[0][1]
else:
interface = ''
return interface
class LocalNodes(Thread): class LocalNodes(Thread):
_active = True _active = True
_nodes = {} _nodes = {}
_MODE = 6
_BROADCAST = "ff02::1" _BROADCAST = "ff02::1"
_BROADCAST4 = "239.255.255.250"
_PORT = 9851 _PORT = 9851
TTL = 1 TTL = 1
@ -58,10 +50,7 @@ class LocalNodes(Thread):
self.daemon = True self.daemon = True
self.start() self.start()
def send(self): def get_packet(self):
if not server['localnode_discovery']:
return
message = json.dumps({ message = json.dumps({
'username': preferences.get('username', 'anonymous'), 'username': preferences.get('username', 'anonymous'),
'host': self.host, 'host': self.host,
@ -70,10 +59,17 @@ class LocalNodes(Thread):
}) })
sig = sk.sign(message, encoding='base64') sig = sk.sign(message, encoding='base64')
packet = json.dumps([sig, USER_ID, message]) packet = json.dumps([sig, USER_ID, message])
return packet
def send(self):
if not server['localnode_discovery']:
return
if self._MODE == 4:
return self.send4()
packet = self.get_packet()
ttl = struct.pack('@i', self.TTL) ttl = struct.pack('@i', self.TTL)
address = self._BROADCAST + get_interface() address = self._BROADCAST + get_interface()
addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6,socket.SOCK_DGRAM) addrs = socket.getaddrinfo(address, self._PORT, socket.AF_INET6, socket.SOCK_DGRAM)
addr = addrs[0] addr = addrs[0]
(family, socktype, proto, canonname, sockaddr) = addr (family, socktype, proto, canonname, sockaddr) = addr
s = socket.socket(family, socktype, proto) s = socket.socket(family, socktype, proto)
@ -81,7 +77,27 @@ class LocalNodes(Thread):
s.sendto(packet + '\0', sockaddr) s.sendto(packet + '\0', sockaddr)
s.close() s.close()
def send4(self):
logger.debug('send4')
packet = self.get_packet()
sockaddr = (self._BROADCAST4, self._PORT)
s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
s.sendto(packet + '\0', sockaddr)
s.close()
logger.debug('sent4')
'''
try:
s.sendto(packet + '\0', sockaddr)
s.close()
except:
logger.debug('send failed %s', )
return
'''
def receive(self): def receive(self):
if self._MODE == 4:
return self.receive4()
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', self._PORT)) s.bind(('', self._PORT))
@ -94,14 +110,25 @@ class LocalNodes(Thread):
data = data[:-1] # Strip trailing \0's data = data[:-1] # Strip trailing \0's
data = self.verify(data) data = self.verify(data)
if data: if data:
#fixme use local link address self.update_node(data)
#print addr
if data['id'] != USER_ID: def receive4(self):
if data['id'] not in self._nodes: logger.debug('receive4')
thread.start_new_thread(self.new_node, (data, )) s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
#else: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# print 'UPDATE NODE', data mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST4), socket.INADDR_ANY)
self._nodes[data['id']] = data s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
s.bind(('', self._PORT))
while self._active:
data, addr = s.recvfrom(1024)
logger.debug('receive4')
while data[-1] == '\0':
data = data[:-1] # Strip trailing \0's
logger.debug('receive4 %s', data)
data = self.verify(data)
if data:
self.update_node(data)
def verify(self, data): def verify(self, data):
try: try:
@ -118,6 +145,16 @@ class LocalNodes(Thread):
return None return None
return message return message
def update_node(self, data):
#fixme use local link address
#print addr
if data['id'] != USER_ID:
if data['id'] not in self._nodes:
thread.start_new_thread(self.new_node, (data, ))
#else:
# print 'UPDATE NODE', data
self._nodes[data['id']] = data
def get(self, user_id): def get(self, user_id):
if user_id in self._nodes: if user_id in self._nodes:
if can_connect(self._nodes[user_id]): if can_connect(self._nodes[user_id]):
@ -138,6 +175,10 @@ class LocalNodes(Thread):
def run(self): def run(self):
self.host = get_public_ipv6() self.host = get_public_ipv6()
if not self.host:
logger.debug('no ipv6 detected, fall back to local ipv4 sharing')
self.host = get_local_ipv4()
self._MODE = 4
self.send() self.send()
self.receive() self.receive()

View file

@ -78,20 +78,24 @@ class Node(Thread):
@property @property
def url(self): def url(self):
local = self.get_local() if self.host:
if local:
url = 'https://[%s]:%s' % (local['host'], local['port'])
elif not self.host:
return None
else:
if ':' in self.host: if ':' in self.host:
url = 'https://[%s]:%s' % (self.host, self.port) url = 'https://[%s]:%s' % (self.host, self.port)
else: else:
url = 'https://%s:%s' % (self.host, self.port) url = 'https://%s:%s' % (self.host, self.port)
else:
url = None
return url return url
def resolve(self): def resolve(self):
r = directory.get(self.vk) logger.debug('resolve node')
r = self.get_local()
if not r:
try:
r = directory.get(self.vk)
except:
logger.debug('directory failed', exc_info=1)
r = None
if r: if r:
self.host = r['host'] self.host = r['host']
if 'port' in r: if 'port' in r:
@ -187,31 +191,32 @@ class Node(Thread):
def can_connect(self): def can_connect(self):
try: try:
logger.debug('try to connect to %s', self.url) url = self.url
headers = { if url:
'User-Agent': settings.USER_AGENT, logger.debug('try to connect to %s', url)
'X-Node-Protocol': settings.NODE_PROTOCOL, headers = {
'Accept-Encoding': 'gzip', 'User-Agent': settings.USER_AGENT,
} 'X-Node-Protocol': settings.NODE_PROTOCOL,
self._opener.addheaders = zip(headers.keys(), headers.values()) 'Accept-Encoding': 'gzip',
r = self._opener.open(self.url, timeout=1) }
version = r.headers.get('X-Node-Protocol', None) self._opener.addheaders = zip(headers.keys(), headers.values())
if version != settings.NODE_PROTOCOL: r = self._opener.open(url, timeout=1)
logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) version = r.headers.get('X-Node-Protocol', None)
return False if version != settings.NODE_PROTOCOL:
c = r.read() logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version)
logger.debug('ok') return False
return True c = r.read()
logger.debug('ok')
return True
except: except:
pass pass
logger.debug('failed')
return False return False
def _go_online(self): def _go_online(self):
self.resolve() self.resolve()
u = self.user u = self.user
logger.debug('go_online peer=%s queued=%s (%s)', u.peered, u.queued, u.id) logger.debug('go_online peer=%s queued=%s (%s)', u.peered, u.queued, u.id)
if u.peered or u.queued: if u.peered or u.queued and self.host:
try: try:
self.online = False self.online = False
logger.debug('try to connect to %s at [%s]:%s', self.user_id, self.host, self.port) logger.debug('try to connect to %s at [%s]:%s', self.user_id, self.host, self.port)
@ -253,13 +258,14 @@ class Node(Thread):
def pushChanges(self, changes): def pushChanges(self, changes):
logger.debug('pushing changes to %s %s', self.user_id, changes) logger.debug('pushing changes to %s %s', self.user_id, changes)
try: if self.online:
r = self.request('pushChanges', changes) try:
except: r = self.request('pushChanges', changes)
self.online = False except:
self.trigger_status() self.online = False
r = False self.trigger_status()
logger.debug('pushedChanges %s %s', r, self.user_id) r = False
logger.debug('pushedChanges %s %s', r, self.user_id)
def peering(self, action): def peering(self, action):
u = self.user u = self.user

View file

@ -100,7 +100,7 @@ class User(db.Model):
for i in self.items: for i in self.items:
i.users.remove(self) i.users.remove(self)
if not i.users: if not i.users:
db.session.delete(i) i.delete()
else: else:
i.update_lists() i.update_lists()
Changelog.query.filter_by(user_id=self.id).delete() Changelog.query.filter_by(user_id=self.id).delete()

View file

@ -3,6 +3,7 @@
from __future__ import division from __future__ import division
import os import os
import sys
import Image import Image
from StringIO import StringIO from StringIO import StringIO
import re import re
@ -12,6 +13,7 @@ import cStringIO
import gzip import gzip
import time import time
from datetime import datetime from datetime import datetime
import subprocess
import ox import ox
import ed25519 import ed25519
@ -121,6 +123,33 @@ def get_public_ipv6():
ip = None ip = None
return ip return ip
def get_interface():
interface = ''
if sys.platform == 'darwin':
#cmd = ['/usr/sbin/netstat', '-rn']
cmd = ['/sbin/route', '-n', 'get', 'default']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
interface = [[p.strip() for p in s.split(':', 1)] for s in stdout.strip().split('\n') if 'interface' in s]
if interface:
interface = '%%%s' % interface[0][1]
else:
interface = ''
return interface
def get_local_ipv4():
ip = socket.gethostbyaddr(socket.getfqdn())[-1][0]
if ip == '127.0.0.1':
cmd = ['ip', 'route', 'show']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
local = [l for l in stdout.split('\n') if 'default' in l]
if local:
dev = local[0].split(' ')[4]
local_ip = [l for l in stdout.split('\n') if dev in l and not 'default' in l]
return [p for p in local_ip[0].split(' ')[1:] if '.' in p][0]
return ip
def update_dict(root, data): def update_dict(root, data):
for key in data: for key in data:
keys = map(lambda part: part.replace('\0', '\\.'), key.replace('\\.', '\0').split('.')) keys = map(lambda part: part.replace('\0', '\\.'), key.replace('\\.', '\0').split('.'))

View file

@ -309,7 +309,7 @@ oml.ui.folders = function() {
}) })
.css({height: items.length * 16 + 'px'}) .css({height: items.length * 16 + 'px'})
.size(); .size();
oml.resizeFolders(); oml.resizeListFolders();
callback && callback(); callback && callback();
}); });
}; };

View file

@ -5,6 +5,7 @@ oml.addList = function() {
isDuplicate = args.length == 1, isDuplicate = args.length == 1,
isSmart, isFrom, name, callback, isSmart, isFrom, name, callback,
list, listData, data, list, listData, data,
ui = oml.user.ui,
username = oml.user.preferences.username; username = oml.user.preferences.username;
Ox.Request.clearCache('getLists'); Ox.Request.clearCache('getLists');
oml.api.getLists(function(result) { oml.api.getLists(function(result) {