From 68383b8834bfc04dc7c0cb669960f6ec38120c65 Mon Sep 17 00:00:00 2001 From: j Date: Tue, 1 Dec 2015 00:26:35 +0100 Subject: [PATCH] report bandwidth, only pull changes --- oml/bandwidth.py | 26 +++++++++++++++++++ oml/changelog.py | 4 +-- oml/node/nodeapi.py | 2 ++ oml/nodes.py | 47 +++++++++++++++++++++-------------- oml/server.py | 2 ++ oml/state.py | 1 + oml/update.py | 2 +- static/js/connectionButton.js | 20 +++++++++++---- 8 files changed, 78 insertions(+), 26 deletions(-) create mode 100644 oml/bandwidth.py diff --git a/oml/bandwidth.py b/oml/bandwidth.py new file mode 100644 index 0000000..d63ea08 --- /dev/null +++ b/oml/bandwidth.py @@ -0,0 +1,26 @@ +import state +from websocket import trigger_event + +class Bandwidth(object): + up = 0 + down = 0 + _last = {} + + def __init__(self): + self.update() + + def update(self): + bandwidth = {'up': self.up, 'down': self.down} + if bandwidth != self._last: + trigger_event('bandwidth', bandwidth) + self._last = bandwidth + self.up = 0 + self.down = 0 + state.main.call_later(1, self.update) + + def download(self, amount): + self.down += amount * 8 + + def upload(self, amount): + self.up += amount * 8 + diff --git a/oml/changelog.py b/oml/changelog.py index 36f78af..9abe826 100644 --- a/oml/changelog.py +++ b/oml/changelog.py @@ -58,8 +58,8 @@ class Changelog(db.Model): _data = _data.encode() state.db.session.add(c) state.db.session.commit() - if state.nodes: - state.nodes.queue('peered', 'pushChanges', [c.json()]) + #if state.nodes: + # state.nodes.queue('peered', 'pushChanges', [c.json()]) @classmethod def apply_changes(cls, user, changes): diff --git a/oml/node/nodeapi.py b/oml/node/nodeapi.py index af9357e..10648fa 100644 --- a/oml/node/nodeapi.py +++ b/oml/node/nodeapi.py @@ -34,6 +34,8 @@ def api_pullChanges(remote_id, user_id=None, from_=None, to=None): return [c.json() for c in qs] def api_pushChanges(user_id, changes): + logger.debug('pushChanges no longer used, ignored') + return True user = User.get(user_id) if not Changelog.apply_changes(user, changes): logger.debug('FAILED TO APPLY CHANGE') diff --git a/oml/nodes.py b/oml/nodes.py index 5738aca..290de75 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -244,7 +244,7 @@ class Node(Thread): else: #fixme, what about cancel/reject peering here? self.peering('removePeering') - if self.online: + if self.peered and self.online: self.pullChanges() except: logger.debug('failed to connect to %s', self.user_id) @@ -316,24 +316,30 @@ class Node(Thread): return False if r.getcode() == 200: try: + fileobj = r if r.headers.get('content-encoding', None) == 'gzip': - content = gzip.GzipFile(fileobj=r).read() - else: - content = b'' - ct = datetime.utcnow() - for chunk in iter(lambda: r.read(16*1024), b''): - content += chunk - if (datetime.utcnow() - ct).total_seconds() > 1: - ct = datetime.utcnow() - t = Transfer.get(item.id) - t.progress = len(content) / item.info['size'] - t.save() - trigger_event('transfer', { - 'id': item.id, 'progress': t.progress - }) - ''' - content = r.read() - ''' + fileobj = gzip.GzipFile(fileobj=r) + content = b'' + ct = datetime.utcnow() + size = 0 + for chunk in iter(lambda: fileobj.read(16*1024), b''): + content += chunk + size += len(chunk) + since_ct = (datetime.utcnow() - ct).total_seconds() + if since_ct > 1: + ct = datetime.utcnow() + t = Transfer.get(item.id) + t.progress = len(content) / item.info['size'] + t.save() + trigger_event('transfer', { + 'id': item.id, 'progress': t.progress + }) + if state.bandwidth: + state.bandwidth.download(size/since_ct) + size = 0 + ''' + content = fileobj.read() + ''' t2 = datetime.utcnow() duration = (t2-t1).total_seconds() @@ -468,3 +474,8 @@ def check_nodes(): if not state.nodes.is_online(u.id): logger.debug('queued peering message for %s trying to connect...', u.id) state.nodes.queue('add', u.id) + for u in user.models.User.query.filter_by(peered=True): + if state.nodes.is_online(u.id): + u.pullChanges() + else: + u.go_online() diff --git a/oml/server.py b/oml/server.py index 3fd66af..8526cb3 100644 --- a/oml/server.py +++ b/oml/server.py @@ -110,6 +110,8 @@ def run(): import downloads import nodes import tor + import bandwidth + state.bandwidth = bandwidth.Bandwidth() state.tor = tor.Tor() state.node = node.server.start() state.downloads = downloads.Downloads() diff --git a/oml/state.py b/oml/state.py index 3181842..344dec9 100644 --- a/oml/state.py +++ b/oml/state.py @@ -1,3 +1,4 @@ +bandwidth = None host = None main = None nodes = False diff --git a/oml/update.py b/oml/update.py index 1d04bfa..b11c743 100644 --- a/oml/update.py +++ b/oml/update.py @@ -184,7 +184,7 @@ def getVersion(data): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True) stdout, stderr = p.communicate() new = stdout.strip()[:40] - response['update'] = current != new + response['update'] = len(new) == 40 and current != new else: if not os.path.exists(os.path.join(settings.updates_path, 'release.json')): return response diff --git a/static/js/connectionButton.js b/static/js/connectionButton.js index 1ddcd7a..9c930e1 100644 --- a/static/js/connectionButton.js +++ b/static/js/connectionButton.js @@ -10,24 +10,34 @@ oml.ui.connectionButton = function() { }) .bindEvent({ // ... - }); + }), + bandwidth; /* - oml.ui.statusIcon(oml.user.online ? 'connected' : 'disconnected') + oml.ui.statusIcon(oml.user) .css({float: 'left'}) .appendTo(that); */ - Ox.Element() + function formatBandwidth(up, down) { + return '↓'+Ox.formatValue(down, 'b')+' / ↑'+Ox.formatValue(up, 'b')+''; + } + + bandwidth = Ox.Element() .addClass('OxLight') .css({ float: 'left', marginTop: '2px', fontSize: '9px' }) - .html('↓0K/↑0K') + .html(formatBandwidth(0, 0)) .appendTo(that); + oml.bindEvent({ + bandwidth: function(data) { + bandwidth.html(formatBandwidth(data.up, data.down)); + } + }); return that; -}; \ No newline at end of file +};