switch to pull based updates
This commit is contained in:
parent
077ea2bd84
commit
75164a8399
4 changed files with 58 additions and 26 deletions
67
oml/nodes.py
67
oml/nodes.py
|
|
@ -36,6 +36,7 @@ ENCODING='base64'
|
|||
class Node(Thread):
|
||||
_running = True
|
||||
host = None
|
||||
local = None
|
||||
online = False
|
||||
download_speed = 0
|
||||
TIMEOUT = 5
|
||||
|
|
@ -44,13 +45,12 @@ class Node(Thread):
|
|||
self._nodes = nodes
|
||||
self.user_id = user.id
|
||||
self._opener = get_opener(self.user_id)
|
||||
logger.debug('new Node %s online=%s', self.user_id, self.online)
|
||||
self._q = Queue()
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.start()
|
||||
self._ping = PeriodicCallback(self.ping, 120000)
|
||||
self._ping.start()
|
||||
self._pull = PeriodicCallback(self.pull, 60000)
|
||||
self._pull.start()
|
||||
self.ping()
|
||||
|
||||
def run(self):
|
||||
|
|
@ -58,18 +58,27 @@ class Node(Thread):
|
|||
action = self._q.get()
|
||||
if not self._running:
|
||||
break
|
||||
if action == 'go_online' or not self.online:
|
||||
self._go_online()
|
||||
else:
|
||||
if action == 'go_online':
|
||||
if not self.online:
|
||||
self._go_online()
|
||||
elif action == 'ping':
|
||||
self.online = self.can_connect()
|
||||
elif action == 'pull':
|
||||
self.online = self.can_connect()
|
||||
self.pullChanges()
|
||||
else:
|
||||
logger.debug('unknown action %s', action)
|
||||
|
||||
def join(self):
|
||||
self._running = False
|
||||
self.ping()
|
||||
self._q.put('')
|
||||
#return Thread.join(self)
|
||||
|
||||
def pull(self):
|
||||
self._q.put('pull')
|
||||
|
||||
def ping(self):
|
||||
self._q.put('')
|
||||
self._q.put('ping')
|
||||
|
||||
def go_online(self):
|
||||
self._q.put('go_online')
|
||||
|
|
@ -123,7 +132,7 @@ class Node(Thread):
|
|||
logger.debug('request %s%s', action, args)
|
||||
self.resolve()
|
||||
url = self.url
|
||||
if not self.url:
|
||||
if not url:
|
||||
logger.debug('unable to find host %s', self.user_id)
|
||||
self.online = False
|
||||
return None
|
||||
|
|
@ -202,8 +211,9 @@ class Node(Thread):
|
|||
return user.models.User.get_or_create(self.user_id)
|
||||
|
||||
def can_connect(self):
|
||||
self.resolve()
|
||||
url = self.url
|
||||
try:
|
||||
url = self.url
|
||||
if url:
|
||||
headers = {
|
||||
'User-Agent': settings.USER_AGENT,
|
||||
|
|
@ -219,24 +229,23 @@ class Node(Thread):
|
|||
return False
|
||||
c = r.read()
|
||||
logger.debug('can connect to: %s (%s)', url, self.user.nickname)
|
||||
if self.user.peered:
|
||||
self.pullChanges()
|
||||
return True
|
||||
except:
|
||||
logger.debug('can not connect to: %s (%s)', url, self.user.nickname)
|
||||
pass
|
||||
return False
|
||||
|
||||
def is_online(self):
|
||||
return self.online or self.get_local() != None
|
||||
|
||||
def _go_online(self):
|
||||
self.resolve()
|
||||
u = self.user
|
||||
if u.peered or u.queued:
|
||||
logger.debug('go_online peered=%s queued=%s %s [%s]:%s (%s)', u.peered, u.queued, u.id, self.local, self.port, u.nickname)
|
||||
logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
|
||||
try:
|
||||
self.online = False
|
||||
if self.can_connect():
|
||||
self.online = self.can_connect()
|
||||
if self.online:
|
||||
logger.debug('connected to %s', self.url)
|
||||
self.online = True
|
||||
if u.queued:
|
||||
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
|
||||
if u.pending == 'sent':
|
||||
|
|
@ -246,8 +255,6 @@ class Node(Thread):
|
|||
else:
|
||||
#fixme, what about cancel/reject peering here?
|
||||
self.peering('removePeering')
|
||||
if u.peered and self.online:
|
||||
self.pullChanges()
|
||||
except:
|
||||
logger.debug('failed to connect to %s', self.user_id)
|
||||
self.online = False
|
||||
|
|
@ -262,13 +269,23 @@ class Node(Thread):
|
|||
})
|
||||
|
||||
def pullChanges(self):
|
||||
if not self.online:
|
||||
return True
|
||||
last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first()
|
||||
from_revision = last.revision + 1 if last else 0
|
||||
logger.debug('pullChanges %s from %s', self.user.name, from_revision)
|
||||
changes = self.request('pullChanges', from_revision)
|
||||
try:
|
||||
changes = self.request('pullChanges', from_revision)
|
||||
except:
|
||||
self.online = False
|
||||
self.trigger_status()
|
||||
logger.debug('%s went offline', self.user.name)
|
||||
return False
|
||||
logger.debug('changes: %s', changes)
|
||||
if not changes:
|
||||
return False
|
||||
return Changelog.apply_changes(self.user, changes)
|
||||
with db.session():
|
||||
r = Changelog.apply_changes(self.user, changes)
|
||||
return r
|
||||
|
||||
def pushChanges(self, changes):
|
||||
logger.debug('pushing changes to %s %s', self.user_id, changes)
|
||||
|
|
@ -303,6 +320,7 @@ class Node(Thread):
|
|||
|
||||
def download(self, item):
|
||||
from item.models import Transfer
|
||||
self.resolve()
|
||||
url = '%s/get/%s' % (self.url, item.id)
|
||||
headers = {
|
||||
'X-Node-Protocol': settings.NODE_PROTOCOL,
|
||||
|
|
@ -342,6 +360,9 @@ class Node(Thread):
|
|||
'''
|
||||
content = fileobj.read()
|
||||
'''
|
||||
if state.bandwidth:
|
||||
state.bandwidth.download(size/since_ct)
|
||||
size = 0
|
||||
|
||||
t2 = datetime.utcnow()
|
||||
duration = (t2-t1).total_seconds()
|
||||
|
|
@ -399,7 +420,7 @@ class Nodes(Thread):
|
|||
self._q.put(list(args))
|
||||
|
||||
def is_online(self, id):
|
||||
return id in self._nodes and self._nodes[id].online
|
||||
return id in self._nodes and self._nodes[id].is_online()
|
||||
|
||||
def download(self, id, item):
|
||||
return id in self._nodes and self._nodes[id].download(item)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue