implement bandwidth throttling, fixes #189
This commit is contained in:
parent
defbbf290f
commit
9f3374a7dc
3 changed files with 76 additions and 27 deletions
|
@ -1,3 +1,6 @@
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
import settings
|
||||||
import state
|
import state
|
||||||
from websocket import trigger_event
|
from websocket import trigger_event
|
||||||
|
|
||||||
|
@ -5,25 +8,76 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class Bandwidth(object):
|
class Bandwidth(object):
|
||||||
up = 0
|
|
||||||
down = 0
|
|
||||||
_last = {}
|
_last = {}
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
self._upload = Bucket('uploadRate')
|
||||||
|
self._download = Bucket('downloadRate')
|
||||||
self.update()
|
self.update()
|
||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
bandwidth = {'up': self.up, 'down': self.down}
|
bandwidth = {
|
||||||
|
'up': self._upload.rate,
|
||||||
|
'down': self._download.rate
|
||||||
|
}
|
||||||
if bandwidth != self._last:
|
if bandwidth != self._last:
|
||||||
trigger_event('bandwidth', bandwidth)
|
trigger_event('bandwidth', bandwidth)
|
||||||
self._last = bandwidth
|
self._last = bandwidth
|
||||||
self.up = 0
|
|
||||||
self.down = 0
|
|
||||||
state.main.call_later(1, self.update)
|
state.main.call_later(1, self.update)
|
||||||
|
|
||||||
def download(self, amount):
|
def download(self, amount):
|
||||||
self.down += amount
|
return self._download.consume(amount)
|
||||||
|
|
||||||
def upload(self, amount):
|
def upload(self, amount):
|
||||||
self.up += amount
|
return self._upload.consume(amount)
|
||||||
|
|
||||||
|
|
||||||
|
class Bucket(object):
|
||||||
|
_rate = 0
|
||||||
|
fill_rate = None
|
||||||
|
|
||||||
|
def __init__(self, pref):
|
||||||
|
self._pref = pref
|
||||||
|
self.update()
|
||||||
|
self.timestamp = time()
|
||||||
|
self.rate_timestamp = time()
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
rate = settings.preferences.get(self._pref, '')
|
||||||
|
if rate and rate.isdigit():
|
||||||
|
rate = max(float(rate) * 1024, 0)
|
||||||
|
if rate != self.fill_rate:
|
||||||
|
self.capacity = max(rate, 2*16*1024)
|
||||||
|
self._tokens = rate
|
||||||
|
self.fill_rate = rate
|
||||||
|
else:
|
||||||
|
self.fill_rate = None
|
||||||
|
|
||||||
|
def consume(self, tokens):
|
||||||
|
self.update()
|
||||||
|
if self.fill_rate is None:
|
||||||
|
self._rate += tokens
|
||||||
|
return True
|
||||||
|
if tokens <= self.tokens:
|
||||||
|
self._tokens -= tokens
|
||||||
|
self._rate += tokens
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_tokens(self):
|
||||||
|
now = time()
|
||||||
|
if self._tokens < self.capacity:
|
||||||
|
delta = self.fill_rate * (now - self.timestamp)
|
||||||
|
self._tokens = min(self.capacity, self._tokens + delta)
|
||||||
|
self.timestamp = now
|
||||||
|
return self._tokens
|
||||||
|
tokens = property(get_tokens)
|
||||||
|
|
||||||
|
def get_rate(self):
|
||||||
|
now = time()
|
||||||
|
rate = self._rate * (now - self.rate_timestamp)
|
||||||
|
self._rate = 0
|
||||||
|
self.rate_timestamp = now
|
||||||
|
return rate
|
||||||
|
rate = property(get_rate)
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# vi:si:et:sw=4:sts=4:ts=4
|
# vi:si:et:sw=4:sts=4:ts=4
|
||||||
from datetime import datetime
|
|
||||||
from socketserver import ThreadingMixIn
|
from socketserver import ThreadingMixIn
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import base64
|
import base64
|
||||||
|
@ -12,6 +11,7 @@ import json
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
import socketserver
|
import socketserver
|
||||||
|
import time
|
||||||
|
|
||||||
from Crypto.PublicKey import RSA
|
from Crypto.PublicKey import RSA
|
||||||
from Crypto.Util.asn1 import DerSequence
|
from Crypto.Util.asn1 import DerSequence
|
||||||
|
@ -70,6 +70,7 @@ class TLSTCPServer(socketserver.TCPServer):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class NodeServer(ThreadingMixIn, TLSTCPServer):
|
class NodeServer(ThreadingMixIn, TLSTCPServer):
|
||||||
|
_running = True
|
||||||
allow_reuse_address = True
|
allow_reuse_address = True
|
||||||
|
|
||||||
|
|
||||||
|
@ -151,19 +152,18 @@ class Handler(http.server.SimpleHTTPRequestHandler):
|
||||||
self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL)
|
self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL)
|
||||||
self.send_header('Content-Length', str(os.path.getsize(path)))
|
self.send_header('Content-Length', str(os.path.getsize(path)))
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
ct = datetime.utcnow()
|
chunk_size = 16*1024
|
||||||
with open(path, 'rb') as f:
|
with open(path, 'rb') as f:
|
||||||
size = 0
|
size = 0
|
||||||
while 1:
|
while 1:
|
||||||
data = f.read(16384)
|
data = f.read(chunk_size)
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
size += len(data)
|
size += len(data)
|
||||||
self.wfile.write(data)
|
self.wfile.write(data)
|
||||||
if state.bandwidth:
|
if state.bandwidth:
|
||||||
since_ct = (datetime.utcnow() - ct).total_seconds()
|
while not state.bandwidth.upload(chunk_size) and self.server._running:
|
||||||
state.bandwidth.upload(size/since_ct)
|
time.sleep(0.1)
|
||||||
size = 0
|
|
||||||
else:
|
else:
|
||||||
self.send_response(200, 'OK')
|
self.send_response(200, 'OK')
|
||||||
self.send_header('Content-type', 'text/plain')
|
self.send_header('Content-type', 'text/plain')
|
||||||
|
@ -278,6 +278,7 @@ class Server(Thread):
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.http_server:
|
if self.http_server:
|
||||||
|
self.http_server._running = False
|
||||||
self.http_server.shutdown()
|
self.http_server.shutdown()
|
||||||
self.http_server.socket.close()
|
self.http_server.socket.close()
|
||||||
return Thread.join(self)
|
return Thread.join(self)
|
||||||
|
|
22
oml/nodes.py
22
oml/nodes.py
|
@ -312,7 +312,7 @@ class Node(Thread):
|
||||||
logger.debug('download %s', url)
|
logger.debug('download %s', url)
|
||||||
self._opener.addheaders = list(zip(self.headers.keys(), self.headers.values()))
|
self._opener.addheaders = list(zip(self.headers.keys(), self.headers.values()))
|
||||||
try:
|
try:
|
||||||
r = self._opener.open(url, timeout=self.TIMEOUT*2)
|
r = self._opener.open(url, timeout=self.TIMEOUT*5)
|
||||||
except:
|
except:
|
||||||
logger.debug('openurl failed %s', url, exc_info=1)
|
logger.debug('openurl failed %s', url, exc_info=1)
|
||||||
return False
|
return False
|
||||||
|
@ -324,7 +324,8 @@ class Node(Thread):
|
||||||
content = b''
|
content = b''
|
||||||
ct = datetime.utcnow()
|
ct = datetime.utcnow()
|
||||||
size = 0
|
size = 0
|
||||||
for chunk in iter(lambda: fileobj.read(16*1024), b''):
|
chunk_size = 16*1024
|
||||||
|
for chunk in iter(lambda: fileobj.read(chunk_size), b''):
|
||||||
content += chunk
|
content += chunk
|
||||||
size += len(chunk)
|
size += len(chunk)
|
||||||
since_ct = (datetime.utcnow() - ct).total_seconds()
|
since_ct = (datetime.utcnow() - ct).total_seconds()
|
||||||
|
@ -335,25 +336,18 @@ class Node(Thread):
|
||||||
# transfer was canceled
|
# transfer was canceled
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
t.progress = len(content) / item.info['size']
|
t.progress = size / item.info['size']
|
||||||
t.save()
|
t.save()
|
||||||
trigger_event('transfer', {
|
trigger_event('transfer', {
|
||||||
'id': item.id, 'progress': t.progress
|
'id': item.id, 'progress': t.progress
|
||||||
})
|
})
|
||||||
if state.bandwidth:
|
if state.bandwidth:
|
||||||
state.bandwidth.download(size/since_ct)
|
while not state.bandwidth.download(chunk_size) and self._running:
|
||||||
size = 0
|
time.sleep(0.1)
|
||||||
'''
|
|
||||||
content = fileobj.read()
|
|
||||||
'''
|
|
||||||
if state.bandwidth:
|
|
||||||
state.bandwidth.download(size/since_ct)
|
|
||||||
size = 0
|
|
||||||
|
|
||||||
t2 = datetime.utcnow()
|
t2 = datetime.utcnow()
|
||||||
duration = (t2-t1).total_seconds()
|
duration = (t2-t1).total_seconds()
|
||||||
if duration:
|
if duration:
|
||||||
self.download_speed = len(content) / duration
|
self.download_speed = size / duration
|
||||||
return item.save_file(content)
|
return item.save_file(content)
|
||||||
except:
|
except:
|
||||||
logger.debug('download failed %s', url, exc_info=1)
|
logger.debug('download failed %s', url, exc_info=1)
|
||||||
|
|
Loading…
Reference in a new issue