openmedialibrary/oml/nodes.py

332 lines
10 KiB
Python
Raw Normal View History

2014-05-04 17:26:43 +00:00
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from Queue import Queue
from threading import Thread
import json
from datetime import datetime
import os
import ox
import ed25519
2014-05-14 09:57:11 +00:00
import urllib2
2014-05-04 17:26:43 +00:00
import settings
import user.models
from changelog import Changelog
import directory
from websocket import trigger_event
2014-05-12 12:57:47 +00:00
from localnodes import LocalNodes
2014-05-14 09:57:11 +00:00
from ssl_request import get_opener
2014-05-04 17:26:43 +00:00
2014-05-17 14:26:59 +00:00
import logging
logger = logging.getLogger('oml.nodes')
2014-05-04 17:26:43 +00:00
# Encoding name handed to ed25519 whenever keys are loaded and signatures are
# created or verified in this module.
ENCODING='base64'
class Node(object):
    """A remote peer, identified by the Ed25519 public key in ``user.id``.

    A node is reached either through an entry published by the local-network
    discovery object (``nodes._local``) or through the address returned by
    ``directory.get``. Requests are signed with ``settings.sk`` and responses
    are verified against the peer's verifying key.
    """

    # Certificate the current opener was built for (see resolve/get_local).
    _cert = None
    # Opener returned by get_opener(); None until a certificate is known.
    _opener = None
    # Address defaults, so ``url`` works even if the directory record has no
    # 'port' entry or resolve() has not stored anything yet.
    host = None
    port = 9851
    online = False
    download_speed = 0

    def __init__(self, nodes, user):
        """Bind the node to its ``Nodes`` owner and try to connect.

        nodes -- owning Nodes instance (provides ``_app`` and ``_local``)
        user  -- user object; ``user.id`` is the peer's encoded public key
        """
        self._nodes = nodes
        self._app = nodes._app
        self.user_id = user.id
        self.vk = ed25519.VerifyingKey(str(user.id), encoding=ENCODING)
        self.go_online()

    @property
    def url(self):
        """Base https URL of the peer, or None if no address is known.

        A locally discovered address wins over the directory address. Local
        hosts are always wrapped in brackets; directory hosts only when they
        contain a colon (IPv6 literal).
        """
        local = self.get_local()
        if local:
            return 'https://[%s]:%s' % (local['host'], local['port'])
        if not self.host:
            return None
        if ':' in self.host:
            return 'https://[%s]:%s' % (self.host, self.port)
        return 'https://%s:%s' % (self.host, self.port)

    def resolve(self):
        """Refresh host, port and certificate from the directory service."""
        r = directory.get(self.vk)
        if r:
            self.host = r['host']
            if 'port' in r:
                self.port = r['port']
            if r['cert'] != self._cert:
                # certificate changed: build an opener for the new one
                self._cert = r['cert']
                self._opener = get_opener(self._cert)
        else:
            self.host = None
            self.port = 9851

    def get_local(self):
        """Return the local-network record for this peer, or None.

        As a side effect the opener is rebuilt when the record carries a
        certificate different from the one currently in use.
        """
        if self._nodes and self._nodes._local:
            local = self._nodes._local.get(self.user_id)
            if local and local['cert'] != self._cert:
                self._cert = local['cert']
                self._opener = get_opener(self._cert)
            return local
        return None

    def request(self, action, *args):
        """Send a signed ``[action, args]`` call to the peer.

        Returns the decoded JSON response, or None when the peer could not
        be reached, answered with an HTTP error, or the response signature
        did not verify.
        """
        return self._request(action, args)[1]

    def _request(self, action, args):
        """Do the work of request(); return ``(ok, response)``.

        ``ok`` is True only if a validly signed response was received, which
        lets callers tell a failed request apart from an empty response.
        """
        url = self.url
        if not url:
            self.resolve()
            url = self.url
        if not url:
            logger.debug('unable to find host %s', self.user_id)
            self.online = False
            return False, None
        if self._opener is None:
            # an address is known but no certificate was ever provided
            logger.debug('no opener available for %s', self.user_id)
            self.online = False
            return False, None
        content = json.dumps([action, args])
        headers = {
            'User-Agent': settings.USER_AGENT,
            'Accept': 'text/plain',
            'Accept-Encoding': 'gzip',
            'Content-Type': 'application/json',
            'X-Ed25519-Key': settings.USER_ID,
            'X-Ed25519-Signature': settings.sk.sign(content, encoding=ENCODING),
        }
        self._opener.addheaders = list(headers.items())
        try:
            r = self._opener.open(url, data=content)
            try:
                data = r.read()
                sig = r.headers.get('X-Ed25519-Signature')
            finally:
                r.close()
        except urllib2.HTTPError as e:
            if e.code == 403:
                logger.debug('REMOTE ENDED PEERING')
                peer = self.user
                if peer.peered:
                    peer.update_peering(False)
            else:
                logger.debug('urllib2.HTTPError %s %s', e, e.code)
            self.online = False
            return False, None
        except urllib2.URLError as e:
            logger.debug('urllib2.URLError %s', e)
            self.online = False
            return False, None
        except Exception:
            # anything else raised while talking to the peer; keep the
            # traceback in the log instead of printing it
            logger.debug('unknown url error', exc_info=True)
            self.online = False
            return False, None
        if sig and self._valid(data, sig):
            return True, json.loads(data)
        logger.debug('invalid signature %s', data)
        return False, None

    def _valid(self, data, sig):
        """Return True if ``sig`` is the peer's signature over ``data``."""
        try:
            self.vk.verify(sig, data, encoding=ENCODING)
        except Exception:
            # bad signature as well as undecodable signature data
            return False
        return True

    @property
    def user(self):
        """User record for this peer (looked up on every access)."""
        return user.models.User.get_or_create(self.user_id)

    def go_online(self):
        """Resolve the peer, pull its changes if peered, publish the status.

        The node only counts as online when the pull request actually got a
        validly signed response; a request that failed without raising
        leaves it offline.
        """
        self.resolve()
        self.online = False
        if self.user.peered:
            try:
                logger.debug('type to connect to %s', self.user_id)
                reached = self._pull()[0]
                if reached:
                    logger.debug('connected to %s', self.user_id)
                    self.online = True
                else:
                    logger.debug('failed to connect to %s', self.user_id)
            except Exception:
                logger.debug('failed to connect to %s', self.user_id,
                             exc_info=True)
                self.online = False
        trigger_event('status', {
            'id': self.user_id,
            'status': 'online' if self.online else 'offline'
        })

    def _pull(self):
        """Fetch and apply changes newer than the last stored revision.

        Returns ``(reached, applied)``: whether the peer answered, and the
        result of ``Changelog.apply_changes`` (False if nothing to apply).
        """
        with self._app.app_context():
            last = Changelog.query.filter_by(
                user_id=self.user_id).order_by('-revision').first()
            from_revision = last.revision + 1 if last else 0
            reached, changes = self._request('pullChanges', (from_revision,))
            if not changes:
                return reached, False
            return reached, Changelog.apply_changes(self.user, changes)

    def pullChanges(self):
        """Pull new changes from the peer.

        Returns False if the request failed or there was nothing new,
        otherwise the result of ``Changelog.apply_changes``.
        """
        return self._pull()[1]

    def pushChanges(self, changes):
        """Send ``changes`` to the peer; mark it offline if sending raises."""
        logger.debug('pushing changes to %s %s', self.user_id, changes)
        try:
            r = self.request('pushChanges', changes)
        except Exception:
            self.online = False
            trigger_event('status', {
                'id': self.user_id,
                'status': 'offline'
            })
            r = False
        logger.debug('pushedChanges %s %s', r, self.user_id)

    def requestPeering(self, message):
        """Mark peering as pending ('sent') and ask the peer. Returns True."""
        p = self.user
        p.pending = 'sent'
        p.save()
        self.request('requestPeering', settings.preferences['username'], message)
        return True

    def acceptPeering(self, message):
        """Accept a peering request, then try to go online. Returns True."""
        logger.debug('run acceptPeering %s', message)
        r = self.request('acceptPeering', settings.preferences['username'], message)
        logger.debug('result %s', r)
        p = self.user
        p.update_peering(True)
        self.go_online()
        return True

    def rejectPeering(self, message):
        """Reject a peering request and notify the peer. Returns True."""
        logger.debug('rejectPeering %s', self.user)
        p = self.user
        p.update_peering(False)
        self.request('rejectPeering', message)
        self.online = False
        return True

    def removePeering(self, message):
        """End peering; the peer is only notified if it was peered."""
        logger.debug('removePeering %s', self.user)
        p = self.user
        if p.peered:
            p.update_peering(False)
            self.request('removePeering', message)
        self.online = False
        return True

    def cancelPeering(self, message):
        """Withdraw a peering request and notify the peer. Returns True."""
        p = self.user
        p.update_peering(False)
        self.online = False
        self.request('cancelPeering', message)
        return True

    def download(self, item):
        """Fetch ``item`` from the peer and store it via ``item.save_file``.

        Returns the result of ``item.save_file`` on success, and False when
        no address/opener is known or the response code is not 200. Errors
        raised by the opener itself are not caught here.
        """
        base = self.url
        if not base or self._opener is None:
            # nothing to connect to; do not build a 'None/get/...' URL
            logger.debug('unable to find host %s', self.user_id)
            return False
        url = '%s/get/%s' % (base, item.id)
        headers = {
            'User-Agent': settings.USER_AGENT,
        }
        t1 = datetime.now()
        logger.debug('download %s', url)
        self._opener.addheaders = list(headers.items())
        r = self._opener.open(url)
        try:
            content = r.read() if r.getcode() == 200 else None
        finally:
            r.close()
        if content is None:
            logger.debug('FAILED %s', url)
            return False
        duration = (datetime.now() - t1).total_seconds()
        if duration:
            # keep the previous value when the clock shows no elapsed time
            self.download_speed = len(content) / duration
            logger.debug('SPEED %s', ox.format_bits(self.download_speed))
        return item.save_file(content)

    def download_upgrade(self, release):
        """Download every module of ``release`` missing in the update path.

        Each file is checked against the sha1 listed in the release and
        removed again on mismatch. Returns False as soon as one module
        cannot be fetched or verified; otherwise falls through and returns
        None (unchanged from the original contract).
        """
        for module in release['modules'].values():
            path = os.path.join(settings.update_path, module['name'])
            if os.path.exists(path):
                continue
            base = self.url
            if not base or self._opener is None:
                logger.debug('unable to find host %s', self.user_id)
                return False
            url = '%s/oml/%s' % (base, module['name'])
            sha1 = module['sha1']
            headers = {
                'User-Agent': settings.USER_AGENT,
            }
            self._opener.addheaders = list(headers.items())
            r = self._opener.open(url)
            try:
                if r.getcode() != 200:
                    return False
                # payload is binary: write it unmodified on every platform
                with open(path, 'wb') as fd:
                    fd.write(r.read())
            finally:
                r.close()
            if ox.sha1sum(path) != sha1:
                logger.error('invalid update!')
                os.unlink(path)
                return False
class Nodes(Thread):
    """Daemon thread that owns all Node objects and runs queued actions.

    Work is submitted with queue(); the thread takes items off the queue one
    at a time inside the application context.
    """

    def __init__(self, app):
        """Create the queue and local discovery, then start the thread."""
        self._app = app
        self._q = Queue()
        self._running = True
        # per-instance registry (user id -> Node); a class-level dict would
        # be shared between every Nodes instance
        self._nodes = {}
        self._local = LocalNodes(app)
        Thread.__init__(self)
        self.daemon = True
        self.start()

    def queue(self, *args):
        """Schedule ``('add', user_id)`` or ``(target, action, *args)``."""
        self._q.put(list(args))

    def check_online(self, id):
        """Return True if node ``id`` is known and currently online."""
        return id in self._nodes and self._nodes[id].online

    def download(self, id, item):
        """Download ``item`` from node ``id``; False if the node is unknown."""
        return id in self._nodes and self._nodes[id].download(item)

    def _call(self, target, action, *args):
        """Invoke method ``action`` with ``args`` on the selected nodes.

        ``target`` is 'all', 'peered', 'online' or a single node id; an
        unknown id raises KeyError.
        """
        if target == 'all':
            nodes = self._nodes.values()
        elif target == 'peered':
            nodes = [n for n in self._nodes.values() if n.user.peered]
        elif target == 'online':
            nodes = [n for n in self._nodes.values() if n.online]
        else:
            nodes = [self._nodes[target]]
        for node in nodes:
            getattr(node, action)(*args)

    def _add_node(self, user_id):
        """Create the Node for ``user_id`` unless it already exists."""
        if user_id not in self._nodes:
            self._nodes[user_id] = Node(
                self, user.models.User.get_or_create(user_id))

    def run(self):
        """Process queued items until join() is called.

        A failing item is logged and skipped so that one bad action (unknown
        target, network error, ...) does not stop the worker for good.
        """
        with self._app.app_context():
            while self._running:
                args = self._q.get()
                if not args:
                    # None is the wake-up sentinel sent by join()
                    continue
                try:
                    if args[0] == 'add':
                        self._add_node(args[1])
                    else:
                        self._call(*args)
                except Exception:
                    logger.error('failed to process %s', args, exc_info=True)

    def join(self, timeout=None):
        """Ask the worker to stop and wait for it, like ``Thread.join``."""
        self._running = False
        # wake the worker in case it is blocked on an empty queue
        self._q.put(None)
        return Thread.join(self, timeout)