From 9f3374a7dc64922b5286862c3402468641969070 Mon Sep 17 00:00:00 2001 From: j Date: Mon, 18 Jan 2016 21:15:59 +0530 Subject: [PATCH] implement bandwidth throttling, fixes #189 --- oml/bandwidth.py | 68 +++++++++++++++++++++++++++++++++++++++++----- oml/node/server.py | 13 +++++---- oml/nodes.py | 22 ++++++--------- 3 files changed, 76 insertions(+), 27 deletions(-) diff --git a/oml/bandwidth.py b/oml/bandwidth.py index ad7535e..df650fc 100644 --- a/oml/bandwidth.py +++ b/oml/bandwidth.py @@ -1,3 +1,6 @@ +from time import time + +import settings import state from websocket import trigger_event @@ -5,25 +8,76 @@ import logging logger = logging.getLogger(__name__) class Bandwidth(object): - up = 0 - down = 0 _last = {} def __init__(self): + self._upload = Bucket('uploadRate') + self._download = Bucket('downloadRate') self.update() def update(self): - bandwidth = {'up': self.up, 'down': self.down} + bandwidth = { + 'up': self._upload.rate, + 'down': self._download.rate + } if bandwidth != self._last: trigger_event('bandwidth', bandwidth) self._last = bandwidth - self.up = 0 - self.down = 0 state.main.call_later(1, self.update) def download(self, amount): - self.down += amount + return self._download.consume(amount) def upload(self, amount): - self.up += amount + return self._upload.consume(amount) + +class Bucket(object): + _rate = 0 + fill_rate = None + + def __init__(self, pref): + self._pref = pref + self.update() + self.timestamp = time() + self.rate_timestamp = time() + + def update(self): + rate = settings.preferences.get(self._pref, '') + if rate and rate.isdigit(): + rate = max(float(rate) * 1024, 0) + if rate != self.fill_rate: + self.capacity = max(rate, 2*16*1024) + self._tokens = rate + self.fill_rate = rate + else: + self.fill_rate = None + + def consume(self, tokens): + self.update() + if self.fill_rate is None: + self._rate += tokens + return True + if tokens <= self.tokens: + self._tokens -= tokens + self._rate += tokens + else: + return False + return True + + def get_tokens(self): + now = time() + if self._tokens < self.capacity: + delta = self.fill_rate * (now - self.timestamp) + self._tokens = min(self.capacity, self._tokens + delta) + self.timestamp = now + return self._tokens + tokens = property(get_tokens) + + def get_rate(self): + now = time() + rate = self._rate * (now - self.rate_timestamp) + self._rate = 0 + self.rate_timestamp = now + return rate + rate = property(get_rate) diff --git a/oml/node/server.py b/oml/node/server.py index 169a803..6d90a6b 100644 --- a/oml/node/server.py +++ b/oml/node/server.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 -from datetime import datetime from socketserver import ThreadingMixIn from threading import Thread import base64 @@ -12,6 +11,7 @@ import json import os import socket import socketserver +import time from Crypto.PublicKey import RSA from Crypto.Util.asn1 import DerSequence @@ -70,6 +70,7 @@ class TLSTCPServer(socketserver.TCPServer): pass class NodeServer(ThreadingMixIn, TLSTCPServer): + _running = True allow_reuse_address = True @@ -151,19 +152,18 @@ class Handler(http.server.SimpleHTTPRequestHandler): self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL) self.send_header('Content-Length', str(os.path.getsize(path))) self.end_headers() - ct = datetime.utcnow() + chunk_size = 16*1024 with open(path, 'rb') as f: size = 0 while 1: - data = f.read(16384) + data = f.read(chunk_size) if not data: break size += len(data) self.wfile.write(data) if state.bandwidth: - since_ct = (datetime.utcnow() - ct).total_seconds() - state.bandwidth.upload(size/since_ct) - size = 0 + while not state.bandwidth.upload(chunk_size) and self.server._running: + time.sleep(0.1) else: self.send_response(200, 'OK') self.send_header('Content-type', 'text/plain') @@ -278,6 +278,7 @@ class Server(Thread): def stop(self): if self.http_server: + self.http_server._running = False self.http_server.shutdown() self.http_server.socket.close() return Thread.join(self) diff --git a/oml/nodes.py b/oml/nodes.py index 89e8ef5..8782951 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -312,7 +312,7 @@ class Node(Thread): logger.debug('download %s', url) self._opener.addheaders = list(zip(self.headers.keys(), self.headers.values())) try: - r = self._opener.open(url, timeout=self.TIMEOUT*2) + r = self._opener.open(url, timeout=self.TIMEOUT*5) except: logger.debug('openurl failed %s', url, exc_info=1) return False @@ -324,7 +324,8 @@ class Node(Thread): content = b'' ct = datetime.utcnow() size = 0 - for chunk in iter(lambda: fileobj.read(16*1024), b''): + chunk_size = 16*1024 + for chunk in iter(lambda: fileobj.read(chunk_size), b''): content += chunk size += len(chunk) since_ct = (datetime.utcnow() - ct).total_seconds() @@ -335,25 +336,18 @@ class Node(Thread): # transfer was canceled return False else: - t.progress = len(content) / item.info['size'] + t.progress = size / item.info['size'] t.save() trigger_event('transfer', { 'id': item.id, 'progress': t.progress }) - if state.bandwidth: - state.bandwidth.download(size/since_ct) - size = 0 - ''' - content = fileobj.read() - ''' - if state.bandwidth: - state.bandwidth.download(size/since_ct) - size = 0 - + if state.bandwidth: + while not state.bandwidth.download(chunk_size) and self._running: + time.sleep(0.1) t2 = datetime.utcnow() duration = (t2-t1).total_seconds() if duration: - self.download_speed = len(content) / duration + self.download_speed = size / duration return item.save_file(content) except: logger.debug('download failed %s', url, exc_info=1)