diff --git a/oml/downloads.py b/oml/downloads.py index 388cdbd..f15cf19 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -45,22 +45,24 @@ class Downloads(Thread): if itemid not in self.transfers: continue if t.get('added') and t.get('progress', -1) < 1: - i = item.models.Item.get(itemid) - for u in i.users: + if not 'users' in t: + i = item.models.Item.get(itemid) + t['users'] = [u.id for u in i.users] + for uid in t['users']: if state.shutdown: return False - if state.nodes.is_online(u.id): - logger.debug('DOWNLOAD %s %s', i, u) - r = state.nodes.download(u.id, i) + if state.nodes.is_online(uid): + logger.debug('DOWNLOAD %s %s', i, uid) + if state.nodes.download(uid, i): + break return False def run(self): self.wait(10) while not state.shutdown: - if self.wait_online(): - with db.session(): - self.download_next() - self.wait(10) + with db.session(): + self.download_next() + self.wait(10) def join(self): self.transfers.commit() diff --git a/oml/item/models.py b/oml/item/models.py index 82aa153..4d52ac4 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -565,6 +565,7 @@ class Item(db.Model): if state.downloads and self.id in state.downloads.transfers: del state.downloads.transfers[self.id] self.update() + return True return False def remove_file(self): diff --git a/oml/nodes.py b/oml/nodes.py index 3508ac6..88d1aaf 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -8,7 +8,6 @@ import json from io import BytesIO import gzip import urllib.request, urllib.error, urllib.parse -from datetime import datetime import os import time import socket @@ -306,31 +305,30 @@ class Node(Thread): return False if r.getcode() == 200: try: - fileobj = r if r.headers.get('content-encoding', None) == 'gzip': fileobj = gzip.GzipFile(fileobj=r) - content = b'' - ct = datetime.utcnow() - size = 0 + else: + fileobj = r + content = [] + ct = time.time() + size = item.info['size'] + received = 0 chunk_size = 16*1024 for chunk in iter(lambda: fileobj.read(chunk_size), b''): - content += chunk - size += len(chunk) - since_ct = (datetime.utcnow() - ct).total_seconds() - if since_ct > 1: - ct = datetime.utcnow() + content.append(chunk) + received += len(chunk) + if time.time() - ct > 1: + ct = time.time() if state.shutdown: return False t = state.downloads.transfers.get(item.id) - if not t: - # transfer was canceled + if not t: # transfer was canceled trigger_event('transfer', { 'id': item.id, 'progress': -1 }) return False else: - t['progress'] = size / item.info['size'] - state.downloads.transfers[item.id] = t + t['progress'] = received / size trigger_event('transfer', { 'id': item.id, 'progress': t['progress'] }) @@ -338,7 +336,7 @@ class Node(Thread): if state.bandwidth: while not state.bandwidth.download(chunk_size) and not state.shutdown: time.sleep(0.1) - return item.save_file(content) + return item.save_file(b''.join(content)) except: logger.debug('download failed %s', url, exc_info=True) return False @@ -368,9 +366,10 @@ class Node(Thread): code = r.getcode() if code == 200: try: - fileobj = r if r.headers.get('content-encoding', None) == 'gzip': fileobj = gzip.GzipFile(fileobj=r) + else: + fileobj = r content = fileobj.read() key = 'preview:' + item_id icons[key] = content @@ -431,12 +430,28 @@ class Nodes(Thread): self.daemon = True self.start() + def run(self): + library.sync_db() + self.queue('pull') + while not state.shutdown: + args = self._q.get() + if args: + if args[0] == 'cleanup': + self.cleanup() + elif args[0] == 'add': + self._add(*args[1:]) + elif args[0] == 'pull': + self._pull() + else: + self._call(*args) + + def cleanup(self): if not state.shutdown and self._local: self._local.cleanup() def pull(self): - if state.online and not self._pulling: + if not self._pulling: self.queue('pull') def queue(self, *args): @@ -482,7 +497,6 @@ class Nodes(Thread): if state.activity and state.activity.get('activity') == 'import': return self._pulling = True - library.sync_db() if state.shutdown: return users = [] @@ -499,20 +513,6 @@ class Nodes(Thread): node.pullChanges() self._pulling = False - def run(self): - self.queue('pull') - while not state.shutdown: - args = self._q.get() - if args: - if args[0] == 'cleanup': - self.cleanup() - elif args[0] == 'add': - self._add(*args[1:]) - elif args[0] == 'pull': - self._pull() - else: - self._call(*args) - def join(self): self._q.put(None) for node in list(self._nodes.values()): diff --git a/oml/user/models.py b/oml/user/models.py index 703d2cc..d0b4b24 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -318,7 +318,7 @@ class List(db.Model): for item_id in items: i = Item.get(item_id) if i: - if i.info['mediastate'] != 'available' and self.user_id == settings.USER_ID: + if self.user_id == settings.USER_ID and i.info.get('mediastate') != 'available': i.queue_download() if i not in self.items: self.items.append(i)