diff --git a/oml/bandwidth.py b/oml/bandwidth.py index d63ea08..59dab09 100644 --- a/oml/bandwidth.py +++ b/oml/bandwidth.py @@ -1,6 +1,9 @@ import state from websocket import trigger_event +import logging +logger = logging.getLogger(__name__) + class Bandwidth(object): up = 0 down = 0 diff --git a/oml/node/nodeapi.py b/oml/node/nodeapi.py index 10648fa..522d3cf 100644 --- a/oml/node/nodeapi.py +++ b/oml/node/nodeapi.py @@ -65,8 +65,6 @@ def api_requestPeering(user_id, username, message): def api_acceptPeering(user_id, username, message): user = User.get(user_id) logger.debug('incoming acceptPeering event: pending: %s', user.pending) - if user and user.peered: - return True if user and user.pending == 'sent': if not user.info: user.info = {} @@ -75,6 +73,9 @@ def api_acceptPeering(user_id, username, message): user.update_name() user.update_peering(True, username) state.nodes.queue('add', user.id) + trigger_event('peering.accept', user.json()) + return True + elif user and user.peered: return True return False diff --git a/oml/nodes.py b/oml/nodes.py index 15f5b9d..e6168f8 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -36,6 +36,7 @@ ENCODING='base64' class Node(Thread): _running = True host = None + local = None online = False download_speed = 0 TIMEOUT = 5 @@ -44,13 +45,12 @@ class Node(Thread): self._nodes = nodes self.user_id = user.id self._opener = get_opener(self.user_id) - logger.debug('new Node %s online=%s', self.user_id, self.online) self._q = Queue() Thread.__init__(self) self.daemon = True self.start() - self._ping = PeriodicCallback(self.ping, 120000) - self._ping.start() + self._pull = PeriodicCallback(self.pull, 60000) + self._pull.start() self.ping() def run(self): @@ -58,18 +58,27 @@ class Node(Thread): action = self._q.get() if not self._running: break - if action == 'go_online' or not self.online: - self._go_online() - else: + if action == 'go_online': + if not self.online: + self._go_online() + elif action == 'ping': self.online = self.can_connect() + elif action == 'pull': + self.online = self.can_connect() + self.pullChanges() + else: + logger.debug('unknown action %s', action) def join(self): self._running = False - self.ping() + self._q.put('') #return Thread.join(self) + def pull(self): + self._q.put('pull') + def ping(self): - self._q.put('') + self._q.put('ping') def go_online(self): self._q.put('go_online') @@ -123,7 +132,7 @@ class Node(Thread): logger.debug('request %s%s', action, args) self.resolve() url = self.url - if not self.url: + if not url: logger.debug('unable to find host %s', self.user_id) self.online = False return None @@ -202,8 +211,9 @@ class Node(Thread): return user.models.User.get_or_create(self.user_id) def can_connect(self): + self.resolve() + url = self.url try: - url = self.url if url: headers = { 'User-Agent': settings.USER_AGENT, @@ -219,24 +229,23 @@ class Node(Thread): return False c = r.read() logger.debug('can connect to: %s (%s)', url, self.user.nickname) - if self.user.peered: - self.pullChanges() return True except: logger.debug('can not connect to: %s (%s)', url, self.user.nickname) pass return False + def is_online(self): + return self.online or self.get_local() != None + def _go_online(self): - self.resolve() u = self.user if u.peered or u.queued: - logger.debug('go_online peered=%s queued=%s %s [%s]:%s (%s)', u.peered, u.queued, u.id, self.local, self.port, u.nickname) + logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) try: - self.online = False - if self.can_connect(): + self.online = self.can_connect() + if self.online: logger.debug('connected to %s', self.url) - self.online = True if u.queued: logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered) if u.pending == 'sent': @@ -246,8 +255,6 @@ class Node(Thread): else: #fixme, what about cancel/reject peering here? self.peering('removePeering') - if u.peered and self.online: - self.pullChanges() except: logger.debug('failed to connect to %s', self.user_id) self.online = False @@ -262,13 +269,23 @@ class Node(Thread): }) def pullChanges(self): + if not self.online: + return True last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first() from_revision = last.revision + 1 if last else 0 - logger.debug('pullChanges %s from %s', self.user.name, from_revision) - changes = self.request('pullChanges', from_revision) + try: + changes = self.request('pullChanges', from_revision) + except: + self.online = False + self.trigger_status() + logger.debug('%s went offline', self.user.name) + return False + logger.debug('changes: %s', changes) if not changes: return False - return Changelog.apply_changes(self.user, changes) + with db.session(): + r = Changelog.apply_changes(self.user, changes) + return r def pushChanges(self, changes): logger.debug('pushing changes to %s %s', self.user_id, changes) @@ -303,6 +320,7 @@ class Node(Thread): def download(self, item): from item.models import Transfer + self.resolve() url = '%s/get/%s' % (self.url, item.id) headers = { 'X-Node-Protocol': settings.NODE_PROTOCOL, @@ -342,6 +360,9 @@ class Node(Thread): ''' content = fileobj.read() ''' + if state.bandwidth: + state.bandwidth.download(size/since_ct) + size = 0 t2 = datetime.utcnow() duration = (t2-t1).total_seconds() @@ -399,7 +420,7 @@ class Nodes(Thread): self._q.put(list(args)) def is_online(self, id): - return id in self._nodes and self._nodes[id].online + return id in self._nodes and self._nodes[id].is_online() def download(self, id, item): return id in self._nodes and self._nodes[id].download(item) diff --git a/oml/user/models.py b/oml/user/models.py index 222297b..6999939 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -45,8 +45,13 @@ class User(db.Model): if not user: user = cls(id=id, peered=False, online=False) user.info = {} + if state.nodes and state.nodes._local and id in state.nodes._local._nodes: + user.info['local'] = state.nodes._local._nodes[id] + user.info['username'] = user.info['local']['username'] user.update_name() user.save() + if state.nodes: + state.nodes.queue('add', user.id) return user def save(self): @@ -87,6 +92,8 @@ class User(db.Model): def update_peering(self, peered, username=None): was_peering = self.peered if peered: + logging.debug('update_peering, pending: %s queued: %s', self.pending, self.queued) + self.queued = self.pending != 'sent' self.pending = '' if username: self.info['username'] = username @@ -98,11 +105,11 @@ class User(db.Model): if not was_peering: Changelog.record(state.user(), 'addpeer', self.id, self.nickname) self.peered = True - self.queued = True self.save() else: self.pending = '' self.peered = False + self.queued = False self.update_name() self.save() List.query.filter_by(user_id=self.id).delete()