openmedialibrary/oml/nodes.py

298 lines
8.9 KiB
Python
Raw Normal View History

2014-05-04 17:26:43 +00:00
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from Queue import Queue
from threading import Thread
import json
from datetime import datetime
import os
import ox
import ed25519
import requests
import settings
import user.models
from changelog import Changelog
import directory
from websocket import trigger_event
2014-05-12 12:57:47 +00:00
from localnodes import LocalNodes
2014-05-04 17:26:43 +00:00
ENCODING='base64'
class Node(object):
online = False
download_speed = 0
2014-05-12 12:57:47 +00:00
def __init__(self, nodes, user):
self._nodes = nodes
self._app = nodes._app
2014-05-04 17:26:43 +00:00
self.user_id = user.id
key = str(user.id)
self.vk = ed25519.VerifyingKey(key, encoding=ENCODING)
self.go_online()
@property
def url(self):
2014-05-12 12:57:47 +00:00
local = self.get_local()
if local:
url = 'http://[%s]:%s' % (local['host'], local['port'])
print 'using local peer discovery to access node', url
2014-05-13 10:36:02 +00:00
elif not self.host:
return None
2014-05-04 17:26:43 +00:00
else:
2014-05-12 12:57:47 +00:00
if ':' in self.host:
url = 'http://[%s]:%s' % (self.host, self.port)
else:
url = 'http://%s:%s' % (self.host, self.port)
2014-05-04 17:26:43 +00:00
return url
2014-05-13 10:36:02 +00:00
def resolve(self):
2014-05-04 17:26:43 +00:00
r = directory.get(self.vk)
if r:
self.host = r['host']
if 'port' in r:
self.port = r['port']
else:
self.host = None
self.port = 9851
2014-05-12 12:57:47 +00:00
def get_local(self):
if self._nodes and self._nodes._local:
return self._nodes._local.get(self.user_id)
return None
2014-05-04 17:26:43 +00:00
def request(self, action, *args):
2014-05-13 10:36:02 +00:00
url = self.url
if not url:
self.resolve()
url = self.url
if not self.url:
print 'unable to find host', self.user_id
self.online = False
2014-05-04 17:26:43 +00:00
return None
content = json.dumps([action, args])
sig = settings.sk.sign(content, encoding=ENCODING)
headers = {
'User-Agent': settings.USER_AGENT,
'Accept': 'text/plain',
'Accept-Encoding': 'gzip',
'Content-Type': 'application/json',
'X-Ed25519-Key': settings.USER_ID,
'X-Ed25519-Signature': sig,
}
2014-05-13 10:36:02 +00:00
r = requests.post(url, data=content, headers=headers)
2014-05-04 17:26:43 +00:00
if r.status_code == 403:
print 'REMOTE ENDED PEERING'
if self.user.peered:
self.user.update_peering(False)
2014-05-13 10:36:02 +00:00
self.online = False
2014-05-04 17:26:43 +00:00
data = r.content
sig = r.headers.get('X-Ed25519-Signature')
if sig and self._valid(data, sig):
response = json.loads(data)
else:
2014-05-13 10:36:02 +00:00
print 'invalid signature', data
2014-05-04 17:26:43 +00:00
response = None
return response
def _valid(self, data, sig):
try:
self.vk.verify(sig, data, encoding=ENCODING)
#except ed25519.BadSignatureError:
except:
return False
return True
@property
def user(self):
return user.models.User.get_or_create(self.user_id)
def go_online(self):
2014-05-13 10:36:02 +00:00
self.resolve()
2014-05-04 17:26:43 +00:00
if self.user.peered:
try:
self.online = False
print 'type to connect to', self.user_id
self.pullChanges()
print 'connected to', self.user_id
self.online = True
except:
import traceback
traceback.print_exc()
print 'failed to connect to', self.user_id
self.online = False
else:
self.online = False
trigger_event('status', {
'id': self.user_id,
'status': 'online' if self.online else 'offline'
})
def pullChanges(self):
with self._app.app_context():
last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first()
from_revision = last.revision + 1 if last else 0
changes = self.request('pullChanges', from_revision)
if not changes:
return False
for change in changes:
if not Changelog.apply_change(self.user, change):
print 'FAIL', change
break
return False
return True
def pushChanges(self, changes):
print 'pushing changes to', self.user_id, changes
try:
r = self.request('pushChanges', changes)
except:
self.online = False
trigger_event('status', {
'id': self.user_id,
'status': 'offline'
})
r = False
print r
def requestPeering(self, message):
p = self.user
p.pending = 'sent'
p.save()
r = self.request('requestPeering', settings.preferences['username'], message)
return True
def acceptPeering(self, message):
r = self.request('acceptPeering', settings.preferences['username'], message)
p = self.user
p.update_peering(True)
self.go_online()
return True
def rejectPeering(self, message):
2014-05-13 10:36:02 +00:00
print 'reject peering!!!', self.user
2014-05-04 17:26:43 +00:00
p = self.user
p.update_peering(False)
2014-05-13 10:36:02 +00:00
r = self.request('rejectPeering', message)
print 'reject peering!!!', r
self.online = False
2014-05-04 17:26:43 +00:00
return True
def removePeering(self, message):
2014-05-12 23:43:27 +00:00
print 'remove peering!!!', self.user
2014-05-04 17:26:43 +00:00
p = self.user
2014-05-13 10:36:02 +00:00
if p.peered:
p.update_peering(False)
r = self.request('removePeering', message)
self.online = False
2014-05-12 23:43:27 +00:00
return True
def cancelPeering(self, message):
p = self.user
p.update_peering(False)
2014-05-13 10:36:02 +00:00
self.online = False
r = self.request('cancelPeering', message)
2014-05-04 17:26:43 +00:00
return True
def download(self, item):
url = '%s/get/%s' % (self.url, item.id)
headers = {
'User-Agent': settings.USER_AGENT,
}
t1 = datetime.now()
print 'GET', url
r = requests.get(url, headers=headers)
if r.status_code == 200:
t2 = datetime.now()
duration = (t2-t1).total_seconds()
if duration:
self.download_speed = len(r.content) / duration
print 'SPEED', ox.format_bits(self.download_speed)
return item.save_file(r.content)
else:
print 'FAILED', url
return False
def download_upgrade(self):
for module in settings.release['modules']:
path = os.path.join(settings.update_path, settings.release['modules'][module]['name'])
if not os.path.exists(path):
url = '%s/oml/%s' % (self.url, settings.release['modules'][module]['name'])
sha1 = settings.release['modules'][module]['sha1']
headers = {
'User-Agent': settings.USER_AGENT,
}
r = requests.get(url, headers=headers)
if r.status_code == 200:
with open(path, 'w') as fd:
fd.write(r.content)
if (ox.sha1sum(path) != sha1):
print 'invalid update!'
os.unlink(path)
return False
else:
return False
class Nodes(Thread):
_nodes = {}
def __init__(self, app):
self._app = app
self._q = Queue()
self._running = True
2014-05-12 12:57:47 +00:00
self._local = LocalNodes(app)
2014-05-04 17:26:43 +00:00
Thread.__init__(self)
self.daemon = True
self.start()
def queue(self, *args):
self._q.put(list(args))
def check_online(self, id):
return id in self._nodes and self._nodes[id].online
def download(self, id, item):
return id in self._nodes and self._nodes[id].download(item)
def _call(self, target, action, *args):
if target == 'all':
nodes = self._nodes.values()
elif target == 'online':
nodes = [n for n in self._nodes.values() if n.online]
else:
nodes = [self._nodes[target]]
for node in nodes:
getattr(node, action)(*args)
def _add_node(self, user_id):
if user_id not in self._nodes:
from user.models import User
2014-05-12 12:57:47 +00:00
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
2014-05-13 10:36:02 +00:00
'''
2014-05-04 17:26:43 +00:00
else:
self._nodes[user_id].online = True
trigger_event('status', {
'id': user_id,
'status': 'online'
})
2014-05-13 10:36:02 +00:00
'''
2014-05-04 17:26:43 +00:00
def run(self):
with self._app.app_context():
while self._running:
args = self._q.get()
if args:
if args[0] == 'add':
self._add_node(args[1])
else:
self._call(*args)
def join(self):
self._running = False
self._q.put(None)
return Thread.join(self)