openmedialibrary/oml/nodes.py

355 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from Queue import Queue
from threading import Thread
import json
import socket
from StringIO import StringIO
import gzip
import urllib2
from datetime import datetime
import os
import ox
import ed25519
from tornado.ioloop import PeriodicCallback
import settings
import user.models
from changelog import Changelog
import directory
from websocket import trigger_event
from localnodes import LocalNodes
from ssl_request import get_opener
import logging
logger = logging.getLogger('oml.nodes')
ENCODING='base64'
class Node(object):
_cert = None
online = False
download_speed = 0
TIMEOUT = 5
def __init__(self, nodes, user):
self._nodes = nodes
self._app = nodes._app
self.user_id = user.id
key = str(user.id)
self.vk = ed25519.VerifyingKey(key, encoding=ENCODING)
self.go_online()
logger.debug('new Node %s online=%s', self.user_id, self.online)
self._ping = PeriodicCallback(self.ping, 120000)
self._ping.start()
@property
def url(self):
local = self.get_local()
if local:
url = 'https://[%s]:%s' % (local['host'], local['port'])
elif not self.host:
return None
else:
if ':' in self.host:
url = 'https://[%s]:%s' % (self.host, self.port)
else:
url = 'https://%s:%s' % (self.host, self.port)
return url
def resolve(self):
r = directory.get(self.vk)
if r:
self.host = r['host']
if 'port' in r:
self.port = r['port']
if r['cert'] != self._cert:
self._cert = r['cert']
self._opener = get_opener(self._cert)
else:
self.host = None
self.port = 9851
def get_local(self):
if self._nodes and self._nodes._local:
local = self._nodes._local.get(self.user_id)
if local and local['cert'] != self._cert:
self._cert = local['cert']
self._opener = get_opener(self._cert)
return local
return None
def request(self, action, *args):
url = self.url
if not url:
self.resolve()
url = self.url
if not self.url:
logger.debug('unable to find host %s', self.user_id)
self.online = False
return None
content = json.dumps([action, args])
sig = settings.sk.sign(content, encoding=ENCODING)
headers = {
'User-Agent': settings.USER_AGENT,
'Accept': 'text/plain',
'Accept-Encoding': 'gzip',
'Content-Type': 'application/json',
'X-Ed25519-Key': settings.USER_ID,
'X-Ed25519-Signature': sig,
}
self._opener.addheaders = zip(headers.keys(), headers.values())
try:
r = self._opener.open(url, data=content, timeout=self.TIMEOUT)
except urllib2.HTTPError as e:
if e.code == 403:
logger.debug('REMOTE ENDED PEERING')
if self.user.peered:
self.user.update_peering(False)
self.online = False
return
logger.debug('urllib2.HTTPError %s %s', e, e.code)
self.online = False
return None
except urllib2.URLError as e:
logger.debug('urllib2.URLError %s', e)
self.online = False
return None
except:
logger.debug('unknown url error', exc_info=1)
self.online = False
return None
data = r.read()
if r.headers.get('content-encoding', None) == 'gzip':
data = gzip.GzipFile(fileobj=StringIO(data)).read()
sig = r.headers.get('X-Ed25519-Signature')
if sig and self._valid(data, sig):
response = json.loads(data)
else:
logger.debug('invalid signature %s', data)
response = None
return response
def _valid(self, data, sig):
try:
self.vk.verify(sig, data, encoding=ENCODING)
#except ed25519.BadSignatureError:
except:
return False
return True
@property
def user(self):
return user.models.User.get_or_create(self.user_id)
def can_connect(self):
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.settimeout(1)
s.connect((self.host, self.port))
s.close()
return True
except:
pass
return False
def ping(self):
with self._app.app_context():
if self.online:
self.online = self.can_connect()
else:
self.go_online()
def go_online(self):
self.resolve()
u = self.user
if u.peered or u.queued:
try:
self.online = False
logger.debug('type to connect to %s at [%s]:%s', self.user_id, self.host, self.port)
if self.can_connect():
self.online = True
if u.queued:
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
if u.pending == 'sent':
self.peering('requestPeering')
elif u.pending == '' and u.peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
if self.online:
self.pullChanges()
logger.debug('connected to %s', self.user_id)
except:
logger.debug('failed to connect to %s', self.user_id, exc_info=1)
self.online = False
else:
self.online = False
trigger_event('status', {
'id': self.user_id,
'online': self.online
})
def pullChanges(self):
with self._app.app_context():
last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first()
from_revision = last.revision + 1 if last else 0
changes = self.request('pullChanges', from_revision)
if not changes:
return False
return Changelog.apply_changes(self.user, changes)
def pushChanges(self, changes):
logger.debug('pushing changes to %s %s', self.user_id, changes)
try:
r = self.request('pushChanges', changes)
except:
self.online = False
trigger_event('status', {
'id': self.user_id,
'online': self.online
})
r = False
logger.debug('pushedChanges %s %s', r, self.user_id)
def peering(self, action):
u = self.user
if action in ('requestPeering', 'acceptPeering'):
r = self.request(action, settings.preferences['username'], u.info.get('message'))
else:
r = self.request(action, u.info.get('message'))
if r != None:
u.queued = False
if 'message' in u.info:
del u.info['message']
u.save()
else:
logger.debug('peering failed? %s %s', action, r)
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
self.online = False
return True
def download(self, item):
url = '%s/get/%s' % (self.url, item.id)
headers = {
'User-Agent': settings.USER_AGENT,
}
t1 = datetime.now()
logger.debug('download %s', url)
'''
r = requests.get(url, headers=headers)
if r.status_code == 200:
content = r.content
'''
self._opener.addheaders = zip(headers.keys(), headers.values())
r = self._opener.open(url, timeout=self.TIMEOUT*2)
if r.getcode() == 200:
if r.headers.get('content-encoding', None) == 'gzip':
content = gzip.GzipFile(fileobj=r).read()
else:
'''
content = ''
for chunk in iter(lambda: r.read(1024*1024), ''):
content += chunk
item.transferprogress = len(content) / item.info['size']
item.save()
trigger_event('transfer', {
'id': item.id, 'progress': item.transferprogress
})
'''
content = r.read()
t2 = datetime.now()
duration = (t2-t1).total_seconds()
if duration:
self.download_speed = len(content) / duration
logger.debug('SPEED %s', ox.format_bits(self.download_speed))
return item.save_file(content)
else:
logger.debug('FAILED %s', url)
return False
def download_upgrade(self, release):
for module in release['modules']:
path = os.path.join(settings.update_path, release['modules'][module]['name'])
if not os.path.exists(path):
url = '%s/oml/%s' % (self.url, release['modules'][module]['name'])
sha1 = release['modules'][module]['sha1']
headers = {
'User-Agent': settings.USER_AGENT,
}
self._opener.addheaders = zip(headers.keys(), headers.values())
r = self._opener.open(url)
if r.getcode() == 200:
with open(path, 'w') as fd:
fd.write(r.read())
if (ox.sha1sum(path) != sha1):
logger.error('invalid update!')
os.unlink(path)
return False
else:
return False
class Nodes(Thread):
_nodes = {}
_local = None
def __init__(self, app):
self._app = app
self._q = Queue()
self._running = True
self._local = LocalNodes(app)
Thread.__init__(self)
self.daemon = True
self.start()
def queue(self, *args):
self._q.put(list(args))
def check_online(self, id):
return id in self._nodes and self._nodes[id].online
def download(self, id, item):
return id in self._nodes and self._nodes[id].download(item)
def _call(self, target, action, *args):
if target == 'all':
nodes = self._nodes.values()
elif target == 'peered':
nodes = [n for n in self._nodes.values() if n.user.peered]
elif target == 'online':
nodes = [n for n in self._nodes.values() if n.online]
else:
nodes = [self._nodes[target]]
for node in nodes:
getattr(node, action)(*args)
def _add(self, user_id):
if user_id not in self._nodes:
from user.models import User
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
else:
if not self._nodes[user_id].online:
self._nodes[user_id].go_online()
def run(self):
with self._app.app_context():
while self._running:
args = self._q.get()
if args:
if args[0] == 'add':
self._add(args[1])
else:
self._call(*args)
def join(self):
self._running = False
self._q.put(None)
return Thread.join(self)