cleanup download, download from local nodes if offline, cache per download users syndb only at startup
This commit is contained in:
parent
ba4f311fe1
commit
988d030f92
4 changed files with 45 additions and 42 deletions
|
@ -45,22 +45,24 @@ class Downloads(Thread):
|
||||||
if itemid not in self.transfers:
|
if itemid not in self.transfers:
|
||||||
continue
|
continue
|
||||||
if t.get('added') and t.get('progress', -1) < 1:
|
if t.get('added') and t.get('progress', -1) < 1:
|
||||||
i = item.models.Item.get(itemid)
|
if not 'users' in t:
|
||||||
for u in i.users:
|
i = item.models.Item.get(itemid)
|
||||||
|
t['users'] = [u.id for u in i.users]
|
||||||
|
for uid in t['users']:
|
||||||
if state.shutdown:
|
if state.shutdown:
|
||||||
return False
|
return False
|
||||||
if state.nodes.is_online(u.id):
|
if state.nodes.is_online(uid):
|
||||||
logger.debug('DOWNLOAD %s %s', i, u)
|
logger.debug('DOWNLOAD %s %s', i, uid)
|
||||||
r = state.nodes.download(u.id, i)
|
if state.nodes.download(uid, i):
|
||||||
|
break
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.wait(10)
|
self.wait(10)
|
||||||
while not state.shutdown:
|
while not state.shutdown:
|
||||||
if self.wait_online():
|
with db.session():
|
||||||
with db.session():
|
self.download_next()
|
||||||
self.download_next()
|
self.wait(10)
|
||||||
self.wait(10)
|
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self.transfers.commit()
|
self.transfers.commit()
|
||||||
|
|
|
@ -565,6 +565,7 @@ class Item(db.Model):
|
||||||
if state.downloads and self.id in state.downloads.transfers:
|
if state.downloads and self.id in state.downloads.transfers:
|
||||||
del state.downloads.transfers[self.id]
|
del state.downloads.transfers[self.id]
|
||||||
self.update()
|
self.update()
|
||||||
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def remove_file(self):
|
def remove_file(self):
|
||||||
|
|
64
oml/nodes.py
64
oml/nodes.py
|
@ -8,7 +8,6 @@ import json
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import gzip
|
import gzip
|
||||||
import urllib.request, urllib.error, urllib.parse
|
import urllib.request, urllib.error, urllib.parse
|
||||||
from datetime import datetime
|
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import socket
|
import socket
|
||||||
|
@ -306,31 +305,30 @@ class Node(Thread):
|
||||||
return False
|
return False
|
||||||
if r.getcode() == 200:
|
if r.getcode() == 200:
|
||||||
try:
|
try:
|
||||||
fileobj = r
|
|
||||||
if r.headers.get('content-encoding', None) == 'gzip':
|
if r.headers.get('content-encoding', None) == 'gzip':
|
||||||
fileobj = gzip.GzipFile(fileobj=r)
|
fileobj = gzip.GzipFile(fileobj=r)
|
||||||
content = b''
|
else:
|
||||||
ct = datetime.utcnow()
|
fileobj = r
|
||||||
size = 0
|
content = []
|
||||||
|
ct = time.time()
|
||||||
|
size = item.info['size']
|
||||||
|
received = 0
|
||||||
chunk_size = 16*1024
|
chunk_size = 16*1024
|
||||||
for chunk in iter(lambda: fileobj.read(chunk_size), b''):
|
for chunk in iter(lambda: fileobj.read(chunk_size), b''):
|
||||||
content += chunk
|
content.append(chunk)
|
||||||
size += len(chunk)
|
received += len(chunk)
|
||||||
since_ct = (datetime.utcnow() - ct).total_seconds()
|
if time.time() - ct > 1:
|
||||||
if since_ct > 1:
|
ct = time.time()
|
||||||
ct = datetime.utcnow()
|
|
||||||
if state.shutdown:
|
if state.shutdown:
|
||||||
return False
|
return False
|
||||||
t = state.downloads.transfers.get(item.id)
|
t = state.downloads.transfers.get(item.id)
|
||||||
if not t:
|
if not t: # transfer was canceled
|
||||||
# transfer was canceled
|
|
||||||
trigger_event('transfer', {
|
trigger_event('transfer', {
|
||||||
'id': item.id, 'progress': -1
|
'id': item.id, 'progress': -1
|
||||||
})
|
})
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
t['progress'] = size / item.info['size']
|
t['progress'] = received / size
|
||||||
state.downloads.transfers[item.id] = t
|
|
||||||
trigger_event('transfer', {
|
trigger_event('transfer', {
|
||||||
'id': item.id, 'progress': t['progress']
|
'id': item.id, 'progress': t['progress']
|
||||||
})
|
})
|
||||||
|
@ -338,7 +336,7 @@ class Node(Thread):
|
||||||
if state.bandwidth:
|
if state.bandwidth:
|
||||||
while not state.bandwidth.download(chunk_size) and not state.shutdown:
|
while not state.bandwidth.download(chunk_size) and not state.shutdown:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
return item.save_file(content)
|
return item.save_file(b''.join(content))
|
||||||
except:
|
except:
|
||||||
logger.debug('download failed %s', url, exc_info=True)
|
logger.debug('download failed %s', url, exc_info=True)
|
||||||
return False
|
return False
|
||||||
|
@ -368,9 +366,10 @@ class Node(Thread):
|
||||||
code = r.getcode()
|
code = r.getcode()
|
||||||
if code == 200:
|
if code == 200:
|
||||||
try:
|
try:
|
||||||
fileobj = r
|
|
||||||
if r.headers.get('content-encoding', None) == 'gzip':
|
if r.headers.get('content-encoding', None) == 'gzip':
|
||||||
fileobj = gzip.GzipFile(fileobj=r)
|
fileobj = gzip.GzipFile(fileobj=r)
|
||||||
|
else:
|
||||||
|
fileobj = r
|
||||||
content = fileobj.read()
|
content = fileobj.read()
|
||||||
key = 'preview:' + item_id
|
key = 'preview:' + item_id
|
||||||
icons[key] = content
|
icons[key] = content
|
||||||
|
@ -431,12 +430,28 @@ class Nodes(Thread):
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
library.sync_db()
|
||||||
|
self.queue('pull')
|
||||||
|
while not state.shutdown:
|
||||||
|
args = self._q.get()
|
||||||
|
if args:
|
||||||
|
if args[0] == 'cleanup':
|
||||||
|
self.cleanup()
|
||||||
|
elif args[0] == 'add':
|
||||||
|
self._add(*args[1:])
|
||||||
|
elif args[0] == 'pull':
|
||||||
|
self._pull()
|
||||||
|
else:
|
||||||
|
self._call(*args)
|
||||||
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
if not state.shutdown and self._local:
|
if not state.shutdown and self._local:
|
||||||
self._local.cleanup()
|
self._local.cleanup()
|
||||||
|
|
||||||
def pull(self):
|
def pull(self):
|
||||||
if state.online and not self._pulling:
|
if not self._pulling:
|
||||||
self.queue('pull')
|
self.queue('pull')
|
||||||
|
|
||||||
def queue(self, *args):
|
def queue(self, *args):
|
||||||
|
@ -482,7 +497,6 @@ class Nodes(Thread):
|
||||||
if state.activity and state.activity.get('activity') == 'import':
|
if state.activity and state.activity.get('activity') == 'import':
|
||||||
return
|
return
|
||||||
self._pulling = True
|
self._pulling = True
|
||||||
library.sync_db()
|
|
||||||
if state.shutdown:
|
if state.shutdown:
|
||||||
return
|
return
|
||||||
users = []
|
users = []
|
||||||
|
@ -499,20 +513,6 @@ class Nodes(Thread):
|
||||||
node.pullChanges()
|
node.pullChanges()
|
||||||
self._pulling = False
|
self._pulling = False
|
||||||
|
|
||||||
def run(self):
|
|
||||||
self.queue('pull')
|
|
||||||
while not state.shutdown:
|
|
||||||
args = self._q.get()
|
|
||||||
if args:
|
|
||||||
if args[0] == 'cleanup':
|
|
||||||
self.cleanup()
|
|
||||||
elif args[0] == 'add':
|
|
||||||
self._add(*args[1:])
|
|
||||||
elif args[0] == 'pull':
|
|
||||||
self._pull()
|
|
||||||
else:
|
|
||||||
self._call(*args)
|
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self._q.put(None)
|
self._q.put(None)
|
||||||
for node in list(self._nodes.values()):
|
for node in list(self._nodes.values()):
|
||||||
|
|
|
@ -318,7 +318,7 @@ class List(db.Model):
|
||||||
for item_id in items:
|
for item_id in items:
|
||||||
i = Item.get(item_id)
|
i = Item.get(item_id)
|
||||||
if i:
|
if i:
|
||||||
if i.info['mediastate'] != 'available' and self.user_id == settings.USER_ID:
|
if self.user_id == settings.USER_ID and i.info.get('mediastate') != 'available':
|
||||||
i.queue_download()
|
i.queue_download()
|
||||||
if i not in self.items:
|
if i not in self.items:
|
||||||
self.items.append(i)
|
self.items.append(i)
|
||||||
|
|
Loading…
Reference in a new issue