nodes
This commit is contained in:
parent
1436b14003
commit
75a14fed1e
18 changed files with 251 additions and 121 deletions
|
|
@ -151,7 +151,8 @@ class Changelog(db.Model):
|
|||
if not i:
|
||||
i = Item.get_or_create(itemid, info)
|
||||
i.modified = datetime.fromtimestamp(float(timestamp))
|
||||
i.users.append(user)
|
||||
if user not in i.users:
|
||||
i.users.append(user)
|
||||
i.update()
|
||||
return True
|
||||
|
||||
|
|
|
|||
|
|
@ -70,7 +70,6 @@ def api_acceptPeering(app, user_id, username, message):
|
|||
user.info['username'] = username
|
||||
user.info['message'] = message
|
||||
user.update_peering(True, username)
|
||||
trigger_event('peering.accept', user.json())
|
||||
state.nodes.queue('add', user.id)
|
||||
return True
|
||||
return False
|
||||
|
|
|
|||
71
oml/nodes.py
71
oml/nodes.py
|
|
@ -11,6 +11,7 @@ import gzip
|
|||
import urllib2
|
||||
from datetime import datetime
|
||||
import os
|
||||
import time
|
||||
|
||||
import ox
|
||||
import ed25519
|
||||
|
|
@ -30,7 +31,8 @@ logger = logging.getLogger('oml.nodes')
|
|||
|
||||
ENCODING='base64'
|
||||
|
||||
class Node(object):
|
||||
class Node(Thread):
|
||||
_running = True
|
||||
_cert = None
|
||||
online = False
|
||||
download_speed = 0
|
||||
|
|
@ -42,10 +44,36 @@ class Node(object):
|
|||
self.user_id = user.id
|
||||
key = str(user.id)
|
||||
self.vk = ed25519.VerifyingKey(key, encoding=ENCODING)
|
||||
self.go_online()
|
||||
logger.debug('new Node %s online=%s', self.user_id, self.online)
|
||||
self._q = Queue()
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.start()
|
||||
self._ping = PeriodicCallback(self.ping, 120000)
|
||||
self._ping.start()
|
||||
self.ping()
|
||||
|
||||
def run(self):
|
||||
with self._app.app_context():
|
||||
while self._running:
|
||||
action = self._q.get()
|
||||
if not self._running:
|
||||
break
|
||||
if action == 'go_online' or not self.online:
|
||||
self._go_online()
|
||||
else:
|
||||
self.online = self.can_connect()
|
||||
|
||||
def join(self):
|
||||
self._running = False
|
||||
self.ping()
|
||||
return Thread.join(self)
|
||||
|
||||
def ping(self):
|
||||
self._q.put('')
|
||||
|
||||
def go_online(self):
|
||||
self._q.put('go_online')
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
|
|
@ -148,30 +176,26 @@ class Node(object):
|
|||
|
||||
def can_connect(self):
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
||||
s.settimeout(1)
|
||||
s.connect((self.host, self.port))
|
||||
s.close()
|
||||
logger.debug('try to connect to %s', self.url)
|
||||
r = self._opener.open(self.url, timeout=1)
|
||||
c = r.read()
|
||||
logger.debug('ok')
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
logger.debug('failed')
|
||||
return False
|
||||
|
||||
def ping(self):
|
||||
with self._app.app_context():
|
||||
if self.online:
|
||||
self.online = self.can_connect()
|
||||
else:
|
||||
self.go_online()
|
||||
|
||||
def go_online(self):
|
||||
def _go_online(self):
|
||||
self.resolve()
|
||||
u = self.user
|
||||
logger.debug('go_online peer=%s queued=%s (%s)', u.peered, u.queued, u.id)
|
||||
if u.peered or u.queued:
|
||||
try:
|
||||
self.online = False
|
||||
logger.debug('type to connect to %s at [%s]:%s', self.user_id, self.host, self.port)
|
||||
logger.debug('try to connect to %s at [%s]:%s', self.user_id, self.host, self.port)
|
||||
if self.can_connect():
|
||||
logger.debug('connected to [%s]:%s', self.host, self.port)
|
||||
self.online = True
|
||||
if u.queued:
|
||||
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
|
||||
|
|
@ -184,12 +208,14 @@ class Node(object):
|
|||
self.peering('removePeering')
|
||||
if self.online:
|
||||
self.pullChanges()
|
||||
logger.debug('connected to %s', self.user_id)
|
||||
except:
|
||||
logger.debug('failed to connect to %s', self.user_id, exc_info=1)
|
||||
self.online = False
|
||||
else:
|
||||
self.online = False
|
||||
self.trigger_status()
|
||||
|
||||
def trigger_status(self):
|
||||
trigger_event('status', {
|
||||
'id': self.user_id,
|
||||
'online': self.online
|
||||
|
|
@ -210,10 +236,7 @@ class Node(object):
|
|||
r = self.request('pushChanges', changes)
|
||||
except:
|
||||
self.online = False
|
||||
trigger_event('status', {
|
||||
'id': self.user_id,
|
||||
'online': self.online
|
||||
})
|
||||
self.trigger_status()
|
||||
r = False
|
||||
logger.debug('pushedChanges %s %s', r, self.user_id)
|
||||
|
||||
|
|
@ -230,9 +253,11 @@ class Node(object):
|
|||
u.save()
|
||||
else:
|
||||
logger.debug('peering failed? %s %s', action, r)
|
||||
|
||||
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
|
||||
self.online = False
|
||||
else:
|
||||
self.go_online()
|
||||
trigger_event('peering.%s'%action.replace('Peering', ''), u.json())
|
||||
return True
|
||||
|
||||
def download(self, item):
|
||||
|
|
@ -336,7 +361,7 @@ class Nodes(Thread):
|
|||
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
|
||||
else:
|
||||
if not self._nodes[user_id].online:
|
||||
self._nodes[user_id].go_online()
|
||||
self._nodes[user_id].ping()
|
||||
|
||||
def run(self):
|
||||
with self._app.app_context():
|
||||
|
|
@ -351,4 +376,6 @@ class Nodes(Thread):
|
|||
def join(self):
|
||||
self._running = False
|
||||
self._q.put(None)
|
||||
for node in self._nodes.values():
|
||||
node.join()
|
||||
return Thread.join(self)
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ class CertValidatingHTTPSConnection(httplib.HTTPConnection):
|
|||
if not self._ValidateCertificateFingerprint(cert):
|
||||
raise InvalidCertificateException(self.fingerprint, cert,
|
||||
'fingerprint mismatch')
|
||||
logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version)
|
||||
#logger.debug('CIPHER %s VERSION %s', self.sock.cipher(), self.sock.ssl_version)
|
||||
|
||||
class VerifiedHTTPSHandler(urllib2.HTTPSHandler):
|
||||
def __init__(self, **kwargs):
|
||||
|
|
|
|||
|
|
@ -76,7 +76,6 @@ class User(db.Model):
|
|||
|
||||
def update_peering(self, peered, username=None):
|
||||
was_peering = self.peered
|
||||
self.queued = True
|
||||
if peered:
|
||||
self.pending = ''
|
||||
if username:
|
||||
|
|
@ -90,6 +89,7 @@ class User(db.Model):
|
|||
if not was_peering:
|
||||
Changelog.record(state.user(), 'addpeer', self.id, self.nickname)
|
||||
self.peered = True
|
||||
self.queued = True
|
||||
self.save()
|
||||
else:
|
||||
self.pending = ''
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue