openmedialibrary/oml/nodes.py

605 lines
21 KiB
Python
Raw Normal View History

2014-05-04 17:26:43 +00:00
# -*- coding: utf-8 -*-
2014-09-02 22:32:44 +00:00
from queue import Queue
2014-05-04 17:26:43 +00:00
from threading import Thread
import json
2014-10-31 14:47:54 +00:00
from io import BytesIO
2014-05-18 23:24:04 +00:00
import gzip
2014-09-02 22:32:44 +00:00
import urllib.request, urllib.error, urllib.parse
2014-05-04 17:26:43 +00:00
import os
2014-05-19 15:00:33 +00:00
import time
2016-02-12 09:00:59 +00:00
import socket
2016-02-18 11:38:41 +00:00
import socks
2014-05-04 17:26:43 +00:00
import ox
2014-05-18 23:24:04 +00:00
from tornado.ioloop import PeriodicCallback
2014-05-04 17:26:43 +00:00
import settings
import user.models
from websocket import trigger_event
2014-05-12 12:57:47 +00:00
from localnodes import LocalNodes
from tor_request import get_opener
2016-02-10 14:02:32 +00:00
from utils import user_sort_key, get_peer
import state
2014-08-09 16:14:14 +00:00
import db
2016-02-10 14:02:32 +00:00
import library
2014-05-04 17:26:43 +00:00
2014-05-17 14:26:59 +00:00
import logging
2015-11-29 14:56:38 +00:00
logger = logging.getLogger(__name__)
2014-05-17 14:26:59 +00:00
2017-06-03 20:50:14 +00:00
DEBUG_NODES = False
2014-05-04 17:26:43 +00:00
2014-05-19 15:00:33 +00:00
class Node(Thread):
2014-09-02 23:09:42 +00:00
host = None
2015-12-01 08:59:52 +00:00
local = None
2015-12-02 21:05:23 +00:00
_online = None
2014-05-18 23:50:05 +00:00
TIMEOUT = 5
2014-05-04 17:26:43 +00:00
2014-05-12 12:57:47 +00:00
def __init__(self, nodes, user):
self._nodes = nodes
self.user = user
2014-05-04 17:26:43 +00:00
self.user_id = user.id
self._opener = get_opener(self.user_id)
2014-05-19 15:00:33 +00:00
self._q = Queue()
Thread.__init__(self)
self.daemon = True
self.start()
2014-05-19 15:00:33 +00:00
def run(self):
2016-02-14 05:53:55 +00:00
self.ping()
while not state.shutdown:
2014-08-09 16:14:14 +00:00
action = self._q.get()
if state.shutdown:
2014-08-09 16:14:14 +00:00
break
2016-01-17 13:12:56 +00:00
if action == 'send_response':
self._send_response()
2015-12-01 08:59:52 +00:00
elif action == 'ping':
self.online = self.can_connect()
else:
logger.debug('unknown action %s', action)
2014-05-19 15:00:33 +00:00
def join(self):
2015-12-01 08:59:52 +00:00
self._q.put('')
2014-08-09 18:32:41 +00:00
#return Thread.join(self)
2014-05-19 15:00:33 +00:00
def ping(self):
2016-09-29 08:35:47 +00:00
if state.online or self.get_local():
2015-12-02 21:05:23 +00:00
self._q.put('ping')
2014-05-19 15:00:33 +00:00
2014-05-04 17:26:43 +00:00
@property
def url(self):
if self.local:
if ':' in self.local:
url = 'https://[%s]:%s' % (self.local, self.port)
2014-05-12 12:57:47 +00:00
else:
url = 'https://%s:%s' % (self.local, self.port)
2016-04-04 23:28:55 +00:00
else:
url = 'https://%s.onion:9851' % self.user_id
2014-05-04 17:26:43 +00:00
return url
2015-12-02 21:05:23 +00:00
@property
def online(self):
return self._online
@online.setter
def online(self, online):
if self._online != online:
self._online = online
self.trigger_status()
else:
self._online = online
2014-05-13 10:36:02 +00:00
def resolve(self):
2015-12-01 10:51:58 +00:00
#logger.debug('resolve node %s', self.user_id)
r = self.get_local()
2014-05-04 17:26:43 +00:00
if r:
self.local = r['host']
2014-05-04 17:26:43 +00:00
if 'port' in r:
self.port = r['port']
else:
self.local = None
2014-05-04 17:26:43 +00:00
self.port = 9851
2014-05-12 12:57:47 +00:00
def get_local(self):
2016-03-14 13:31:56 +00:00
if self._nodes and self._nodes.local:
return self._nodes.local.get(self.user_id)
2014-05-12 12:57:47 +00:00
return None
2014-05-04 17:26:43 +00:00
def request(self, action, *args):
self.resolve()
2014-05-13 10:36:02 +00:00
url = self.url
2016-04-04 23:28:55 +00:00
if self.local:
logger.debug('request:%s(%s:%s): %s%s', self.user_id, self.local, self.port, action, list(args))
else:
logger.debug('request:%s: %s%s', self.user_id, action, list(args))
2014-09-09 10:08:04 +00:00
content = json.dumps([action, args]).encode()
2014-05-04 17:26:43 +00:00
headers = {
'User-Agent': settings.USER_AGENT,
'X-Node-Protocol': settings.NODE_PROTOCOL,
2014-05-04 17:26:43 +00:00
'Accept': 'text/plain',
'Accept-Encoding': 'gzip',
'Content-Type': 'application/json',
}
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
2015-12-02 15:30:37 +00:00
#logger.debug('headers: %s', self._opener.addheaders)
2014-05-14 09:57:11 +00:00
try:
2016-04-04 23:29:49 +00:00
r = self._opener.open(url, data=content, timeout=self.TIMEOUT*12)
2014-09-02 22:32:44 +00:00
except urllib.error.HTTPError as e:
2014-05-14 09:57:11 +00:00
if e.code == 403:
logger.debug('403: %s (%s)', url, self.user_id)
if state.tasks:
state.tasks.queue('peering', (self.user_id, False))
del self._nodes._nodes[self.user_id]
self.online = False
return None
2014-05-17 14:26:59 +00:00
logger.debug('urllib2.HTTPError %s %s', e, e.code)
2014-05-14 09:57:11 +00:00
self.online = False
return None
2014-09-02 22:32:44 +00:00
except urllib.error.URLError as e:
2014-05-17 14:26:59 +00:00
logger.debug('urllib2.URLError %s', e)
2014-05-14 09:57:11 +00:00
self.online = False
return None
2016-07-04 10:06:50 +00:00
except socket.timeout:
logger.debug('timeout %s', url)
self.online = False
return None
2014-05-14 09:57:11 +00:00
except:
2016-01-24 09:13:03 +00:00
logger.debug('unknown url error', exc_info=True)
2014-05-14 09:57:11 +00:00
self.online = False
return None
data = r.read()
2014-05-18 23:24:04 +00:00
if r.headers.get('content-encoding', None) == 'gzip':
2014-10-31 14:47:54 +00:00
data = gzip.GzipFile(fileobj=BytesIO(data)).read()
version = r.headers.get('X-Node-Protocol', None)
if version != settings.NODE_PROTOCOL:
logger.debug('version does not match local: %s remote %s (%s)', settings.NODE_PROTOCOL, version, self.user_id)
self.online = False
if version > settings.NODE_PROTOCOL:
state.update_required = True
return None
response = json.loads(data.decode('utf-8'))
2014-05-04 17:26:43 +00:00
return response
2014-05-18 03:01:24 +00:00
def can_connect(self):
2015-12-01 08:59:52 +00:00
self.resolve()
url = self.url
2016-02-24 07:19:00 +00:00
if not state.online and not self.local:
return False
2014-05-18 03:01:24 +00:00
try:
if url:
headers = {
'User-Agent': settings.USER_AGENT,
'X-Node-Protocol': settings.NODE_PROTOCOL,
'Accept-Encoding': 'gzip',
}
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
2015-12-01 10:51:58 +00:00
self._opener.timeout = 2
2014-09-02 23:09:42 +00:00
r = self._opener.open(url)
version = r.headers.get('X-Node-Protocol', None)
if version != settings.NODE_PROTOCOL:
logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version)
return False
c = r.read()
2016-02-23 09:07:06 +00:00
if DEBUG_NODES:
logger.debug('can connect to: %s', url)
return True
2014-05-18 03:01:24 +00:00
except:
2016-02-23 09:07:06 +00:00
if DEBUG_NODES:
logger.debug('can not connect to: %s', url)
2014-05-18 03:01:24 +00:00
pass
return False
2015-12-01 08:59:52 +00:00
def is_online(self):
2016-09-29 08:35:47 +00:00
return self.online or self.get_local() is not None
2015-12-01 08:59:52 +00:00
2016-01-17 13:12:56 +00:00
def send_response(self):
self._q.put('send_response')
def _send_response(self):
with db.session():
2016-03-01 11:38:58 +00:00
u = user.models.User.get(self.user_id)
if u and u.peered or u.queued:
2016-02-23 09:07:06 +00:00
if DEBUG_NODES:
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
try:
self.online = self.can_connect()
except:
2016-02-23 09:07:06 +00:00
if DEBUG_NODES:
logger.debug('failed to connect to %s', self.user_id)
self.online = False
2016-03-01 11:38:58 +00:00
if self.online:
if DEBUG_NODES:
logger.debug('connected to %s', self.url)
if u.queued:
if DEBUG_NODES:
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
if u.pending == 'sent':
self.peering('requestPeering')
elif u.pending == '' and u.peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
2014-05-19 15:00:33 +00:00
def trigger_status(self):
2015-12-02 21:05:23 +00:00
if self.online is not None:
trigger_event('status', {
'id': self.user_id,
'online': self.online
})
2014-05-04 17:26:43 +00:00
def pullChanges(self):
2016-02-24 07:19:00 +00:00
if state.shutdown:
return
self.online = self.can_connect()
if not self.online or state.shutdown:
return
2017-06-03 20:50:14 +00:00
self.resolve()
2016-02-10 14:02:32 +00:00
peer = get_peer(self.user_id)
2017-06-03 20:50:14 +00:00
path = peer._logpath
if os.path.exists(path):
size = os.path.getsize(path)
else:
size = 0
url = '%s/log' % self.url
if DEBUG_NODES:
logger.debug('pullChanges: %s [%s]', self.user_id, url)
headers = self.headers.copy()
if size:
headers['Range'] = '%s-' % size
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
2016-02-10 14:02:32 +00:00
try:
2017-06-03 20:50:14 +00:00
r = self._opener.open(url, timeout=self.TIMEOUT*60)
except urllib.error.HTTPError as e:
if e.code == 403:
logger.debug('pullChanges 403: %s (%s)', url, self.user_id)
if state.tasks:
state.tasks.queue('peering', (self.user_id, False))
del self._nodes._nodes[self.user_id]
2017-06-03 20:50:14 +00:00
self.online = False
else:
logger.debug('unknown http errpr %s %s (%s)', e.code, url, self.user_id)
return False
except socket.timeout:
logger.debug('timeout %s', url)
return False
except socks.GeneralProxyError:
logger.debug('openurl failed %s', url)
return False
except urllib.error.URLError as e:
logger.debug('openurl failed urllib2.URLError %s', e.reason)
return False
2016-02-10 14:02:32 +00:00
except:
2017-06-03 20:50:14 +00:00
logger.debug('openurl failed %s', url, exc_info=True)
2016-02-10 14:02:32 +00:00
return False
2017-06-03 20:50:14 +00:00
if r.getcode() in (200, 206):
changed = False
chunk_size = 16 * 1024
mode = 'ab' if r.getcode() == 206 else 'wb'
content = b''
try:
if r.headers.get('content-encoding', None) == 'gzip':
fileobj = gzip.GzipFile(fileobj=r)
else:
fileobj = r
for chunk in iter(lambda: fileobj.read(chunk_size), b''):
content += chunk
eol = content.rfind(b'\n') + 1
if eol > 0:
with open(path, mode) as fd:
fd.write(content[:eol])
content = content[eol:]
mode = 'ab'
changed = True
if state.shutdown:
return False
if state.bandwidth:
while not state.bandwidth.download(chunk_size) and not state.shutdown:
time.sleep(0.1)
if content:
with open(path, mode) as fd:
fd.write(content)
changed = True
if changed:
peer.apply_log()
except:
logger.debug('download failed %s', url, exc_info=True)
return False
else:
logger.debug('FAILED %s', url)
2016-02-10 14:02:32 +00:00
return False
2014-05-04 17:26:43 +00:00
2014-05-18 03:01:24 +00:00
def peering(self, action):
with db.session():
u = user.models.User.get_or_create(self.user_id)
if action in ('requestPeering', 'acceptPeering'):
r = self.request(action, settings.preferences['username'], u.info.get('message'))
else:
r = self.request(action, u.info.get('message'))
if r != None:
u.queued = False
if 'message' in u.info:
del u.info['message']
u.save()
else:
logger.debug('peering failed? %s %s', action, r)
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
self.online = False
trigger_event('peering.%s'%action.replace('Peering', ''), u.json())
return True
2014-05-04 17:26:43 +00:00
2016-01-15 07:28:01 +00:00
headers = {
'X-Node-Protocol': settings.NODE_PROTOCOL,
'User-Agent': settings.USER_AGENT,
'Accept-Encoding': 'gzip',
}
2014-05-04 17:26:43 +00:00
def download(self, item):
2015-12-01 08:59:52 +00:00
self.resolve()
2014-05-04 17:26:43 +00:00
url = '%s/get/%s' % (self.url, item.id)
2016-02-23 09:07:06 +00:00
if DEBUG_NODES:
logger.debug('download %s', url)
2016-01-15 07:28:01 +00:00
self._opener.addheaders = list(zip(self.headers.keys(), self.headers.values()))
try:
r = self._opener.open(url, timeout=self.TIMEOUT*5)
2016-02-12 09:00:59 +00:00
except socket.timeout:
logger.debug('timeout %s', url)
return False
2016-02-18 11:38:41 +00:00
except socks.GeneralProxyError:
logger.debug('openurl failed %s', url)
return False
except urllib.error.URLError as e:
logger.debug('openurl failed urllib2.URLError %s', e.reason)
return False
2016-02-20 18:25:06 +00:00
except:
logger.debug('openurl failed %s', url, exc_info=True)
return False
2014-05-14 09:57:11 +00:00
if r.getcode() == 200:
try:
if r.headers.get('content-encoding', None) == 'gzip':
2015-11-30 23:26:35 +00:00
fileobj = gzip.GzipFile(fileobj=r)
else:
fileobj = r
content = []
ct = time.time()
size = item.info['size']
received = 0
chunk_size = 16*1024
for chunk in iter(lambda: fileobj.read(chunk_size), b''):
content.append(chunk)
received += len(chunk)
if time.time() - ct > 1:
ct = time.time()
2016-02-11 15:55:41 +00:00
if state.shutdown:
return False
t = state.downloads.transfers.get(item.id)
if not t: # transfer was canceled
trigger_event('transfer', {
'id': item.id, 'progress': -1
})
return False
else:
t['progress'] = received / size
trigger_event('transfer', {
2016-02-11 15:55:41 +00:00
'id': item.id, 'progress': t['progress']
})
2016-02-11 15:55:41 +00:00
state.downloads.transfers[item.id] = t
if state.bandwidth:
while not state.bandwidth.download(chunk_size) and not state.shutdown:
time.sleep(0.1)
return item.save_file(b''.join(content))
except:
2016-01-24 09:13:03 +00:00
logger.debug('download failed %s', url, exc_info=True)
return False
2014-05-04 17:26:43 +00:00
else:
2014-05-17 14:26:59 +00:00
logger.debug('FAILED %s', url)
2014-05-04 17:26:43 +00:00
return False
def download_preview(self, item_id):
2016-01-15 07:28:01 +00:00
from item.icons import icons
self.resolve()
2016-02-23 09:07:06 +00:00
if DEBUG_NODES:
logger.debug('download preview for %s from %s', item_id, self.url)
url = '%s/preview/%s' % (self.url, item_id)
2016-01-15 07:28:01 +00:00
self._opener.addheaders = list(zip(self.headers.keys(), self.headers.values()))
try:
r = self._opener.open(url, timeout=self.TIMEOUT*2)
2016-02-18 11:38:41 +00:00
except socket.timeout:
logger.debug('timeout %s', url)
return False
except socks.GeneralProxyError:
logger.debug('download failed %s', url)
return False
2016-01-15 07:28:01 +00:00
except:
2016-01-24 10:05:43 +00:00
logger.debug('download failed %s', url, exc_info=True)
2016-02-04 13:07:19 +00:00
self.online = False
2016-01-15 07:28:01 +00:00
return False
code = r.getcode()
if code == 200:
try:
if r.headers.get('content-encoding', None) == 'gzip':
fileobj = gzip.GzipFile(fileobj=r)
else:
fileobj = r
2016-01-15 07:28:01 +00:00
content = fileobj.read()
key = 'preview:' + item_id
2016-01-15 07:28:01 +00:00
icons[key] = content
icons.clear(key+':')
2016-01-15 07:28:01 +00:00
return True
except:
2016-01-24 09:13:03 +00:00
logger.debug('preview download failed %s', url, exc_info=True)
2016-01-15 07:28:01 +00:00
elif code == 404:
pass
else:
logger.debug('FAILED %s', url)
2016-01-24 07:44:43 +00:00
return False
2016-01-15 07:28:01 +00:00
2014-05-14 09:57:11 +00:00
def download_upgrade(self, release):
for module in release['modules']:
path = os.path.join(settings.update_path, release['modules'][module]['name'])
2014-05-04 17:26:43 +00:00
if not os.path.exists(path):
2014-05-14 09:57:11 +00:00
url = '%s/oml/%s' % (self.url, release['modules'][module]['name'])
sha1 = release['modules'][module]['sha1']
2014-05-04 17:26:43 +00:00
headers = {
'User-Agent': settings.USER_AGENT,
}
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
2014-05-14 09:57:11 +00:00
r = self._opener.open(url)
if r.getcode() == 200:
2014-05-04 17:26:43 +00:00
with open(path, 'w') as fd:
2014-05-14 09:57:11 +00:00
fd.write(r.read())
2014-05-04 17:26:43 +00:00
if (ox.sha1sum(path) != sha1):
2014-05-17 14:26:59 +00:00
logger.error('invalid update!')
2014-05-04 17:26:43 +00:00
os.unlink(path)
return False
else:
return False
def upload(self, items):
2019-01-28 10:09:00 +00:00
logger.debug('add items to %s\'s inbox: %s', self.user_id, items)
r = self.request('upload', items)
return bool(r)
2014-05-04 17:26:43 +00:00
class Nodes(Thread):
_nodes = {}
2016-03-14 13:31:56 +00:00
local = None
_pulling = False
2014-05-04 17:26:43 +00:00
2014-08-09 16:33:59 +00:00
def __init__(self):
2014-05-04 17:26:43 +00:00
self._q = Queue()
2015-12-02 21:05:23 +00:00
with db.session():
for u in user.models.User.query.filter_by(peered=True):
if 'local' in u.info:
del u.info['local']
u.save()
self.queue('add', u.id)
2017-06-03 20:50:14 +00:00
get_peer(u.id)
2015-12-02 21:05:23 +00:00
for u in user.models.User.query.filter_by(queued=True):
logger.debug('adding queued node... %s', u.id)
2016-01-17 13:12:56 +00:00
self.queue('add', u.id, True)
2016-03-14 13:31:56 +00:00
self.local = LocalNodes()
2016-02-10 14:02:32 +00:00
self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
self._pullcb.start()
2014-05-04 17:26:43 +00:00
Thread.__init__(self)
self.daemon = True
self.start()
def run(self):
library.sync_db()
self.queue('pull')
while not state.shutdown:
args = self._q.get()
if args:
2016-03-14 13:31:56 +00:00
if args[0] == 'add':
self._add(*args[1:])
elif args[0] == 'pull':
self._pull()
else:
self._call(*args)
2014-05-04 17:26:43 +00:00
def queue(self, *args):
self._q.put(list(args))
def is_online(self, id):
2015-12-01 08:59:52 +00:00
return id in self._nodes and self._nodes[id].is_online()
2014-05-04 17:26:43 +00:00
def download(self, id, item):
return id in self._nodes and self._nodes[id].download(item)
def download_preview(self, id, item):
return id in self._nodes and \
self._nodes[id].is_online() and \
self._nodes[id].download_preview(item)
2014-05-04 17:26:43 +00:00
def _call(self, target, action, *args):
if target == 'all':
2014-09-02 22:32:44 +00:00
nodes = list(self._nodes.values())
2014-05-14 09:57:11 +00:00
elif target == 'peered':
2014-09-02 22:32:44 +00:00
nodes = [n for n in list(self._nodes.values()) if n.user.peered]
2014-05-04 17:26:43 +00:00
elif target == 'online':
2014-09-02 22:32:44 +00:00
nodes = [n for n in list(self._nodes.values()) if n.online]
2014-05-04 17:26:43 +00:00
else:
2016-01-17 13:12:56 +00:00
if not target in self._nodes:
self._add(target)
2014-05-04 17:26:43 +00:00
nodes = [self._nodes[target]]
for node in nodes:
r = getattr(node, action)(*args)
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
2014-05-04 17:26:43 +00:00
2016-01-17 13:12:56 +00:00
def _add(self, user_id, send_response=False):
2014-05-04 17:26:43 +00:00
if user_id not in self._nodes:
from user.models import User
2014-08-09 16:14:14 +00:00
with db.session():
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
2014-05-04 17:26:43 +00:00
else:
2016-02-14 05:53:55 +00:00
self._nodes[user_id].ping()
2016-01-17 13:12:56 +00:00
if send_response:
self._nodes[user_id].send_response()
2014-05-04 17:26:43 +00:00
2016-02-25 07:39:59 +00:00
def pull(self):
if not self._pulling:
self.queue('pull')
def _pull(self):
if not state.sync_enabled or settings.preferences.get('downloadRate') == 0:
return
if state.activity and state.activity.get('activity') == 'import':
return
self._pulling = True
2016-02-24 07:19:00 +00:00
if state.shutdown:
return
users = []
with db.session():
from user.models import User
for u in User.query.filter(User.id!=settings.USER_ID).filter_by(peered=True).all():
2016-02-25 07:39:59 +00:00
users.append(u.json(['id', 'index', 'name']))
users.sort(key=user_sort_key)
for u in users:
if state.shutdown:
break
node = self._nodes.get(u['id'])
if node:
2016-02-24 07:19:00 +00:00
node.pullChanges()
self._pulling = False
2014-05-04 17:26:43 +00:00
def join(self):
self._q.put(None)
2014-09-02 22:32:44 +00:00
for node in list(self._nodes.values()):
2014-05-19 15:00:33 +00:00
node.join()
2016-03-14 13:31:56 +00:00
if self.local:
self.local.close()
2019-01-12 17:41:14 +00:00
return super().join(1)
def publish_node():
update_online()
state.check_nodes = PeriodicCallback(check_nodes, 120000)
state.check_nodes.start()
state._online = PeriodicCallback(update_online, 60000)
state._online.start()
def update_online():
online = state.tor and state.tor.is_online()
if online != state.online:
state.online = online
trigger_event('status', {
'id': settings.USER_ID,
'online': state.online
})
2016-02-13 09:27:55 +00:00
if state.online:
for node in list(state.nodes._nodes.values()):
node.trigger_status()
def check_nodes():
if state.online:
with db.session():
for u in user.models.User.query.filter_by(queued=True):
if not state.nodes.is_online(u.id):
logger.debug('queued peering message for %s trying to connect...', u.id)
2016-01-17 13:12:56 +00:00
state.nodes.queue('add', u.id, True)