diff --git a/oml/downloads.py b/oml/downloads.py index cb02811..8e62375 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -1,17 +1,17 @@ # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 - +import os from threading import Thread import time +from sqlitedict import SqliteDict + import db import state import settings import update -from websocket import trigger_event - import logging logger = logging.getLogger(__name__) @@ -21,6 +21,8 @@ class Downloads(Thread): Thread.__init__(self) self.daemon = True self.start() + self._dbpath = os.path.join(settings.data_path, 'transfers.db') + self.transfers = SqliteDict(self._dbpath, tablename='transfers', autocommit=False) def download_updates(self): now = int(time.mktime(time.gmtime())) @@ -32,18 +34,19 @@ class Downloads(Thread): def download_next(self): import item.models self.download_updates() - for t in item.models.Transfer.query.filter( - item.models.Transfer.added!=None, - item.models.Transfer.progress<1).order_by(item.models.Transfer.added): + downloads = list(self.transfers.items()) + downloads.sort(key=lambda t: t[1].get('added')) + for itemid, t in downloads: if state.shutdown: return False - for u in t.item.users: - if state.shutdown: - return False - if state.nodes.is_online(u.id): - logger.debug('DOWNLOAD %s %s', t.item, u) - r = state.nodes.download(u.id, t.item) - return True + if t.get('added') and t.get('progress', -1) < 1: + i = item.models.Item.get(itemid) + for u in i.users: + if state.shutdown: + return False + if state.nodes.is_online(u.id): + logger.debug('DOWNLOAD %s %s', i, u) + r = state.nodes.download(u.id, i) return False def run(self): @@ -55,6 +58,8 @@ class Downloads(Thread): self.wait(10) def join(self): + self.transfers.commit() + self.transfers.close() return Thread.join(self) def wait_online(self): diff --git a/oml/item/api.py b/oml/item/api.py index 9b94478..8f711a7 100644 --- a/oml/item/api.py +++ b/oml/item/api.py @@ -271,11 +271,9 @@ def cancelDownloads(data): ids = data['ids'] if ids: for item in models.Item.query.filter(models.Item.id.in_(ids)): - t = models.Transfer.get(item.id) + t = state.downloads.transfers.get(item.id) if t: - t.progress = None - t.added = None - t.save() + del state.downloads.transfers[item.id] p = state.user() if p in item.users: item.users.remove(p) diff --git a/oml/item/models.py b/oml/item/models.py index 5a5d054..7a1ac1e 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -120,11 +120,11 @@ class Item(db.Model): j['timesaccessed'] = self.timesaccessed j['accessed'] = self.accessed j['added'] = self.added - t = Transfer.get(self.id) - if t: - j['transferadded'] = t.added - j['transferprogress'] = t.progress - + if state.downloads: + t = state.downloads.transfers.get(self.id) + if t: + j['transferadded'] = t['added'] + j['transferprogress'] = t['progress'] # unused and slow #j['users'] = list(map(str, list(self.users))) @@ -261,9 +261,12 @@ class Item(db.Model): def update_mediastate(self): # available, unavailable, transferring - t = Transfer.get(self.id) - if t and t.added and t.progress < 1: - self.info['mediastate'] = 'transferring' + if state.downloads: + t = state.downloads.transfers.get(self.id) + if t and t.get('added') and t.get('progress', 0) < 1: + self.info['mediastate'] = 'transferring' + else: + self.info['mediastate'] = 'available' if self.files.count() else 'unavailable' else: self.info['mediastate'] = 'available' if self.files.count() else 'unavailable' @@ -286,7 +289,8 @@ class Item(db.Model): def delete(self, commit=True): Sort.query.filter_by(item_id=self.id).delete() - Transfer.query.filter_by(item_id=self.id).delete() + if state.downloads and self.id in state.downloads.transfers: + del state.downloads.transfers[self.id] state.db.session.delete(self) icons.clear('cover:%s' % self.id) icons.clear('preview:%s' % self.id) @@ -491,11 +495,10 @@ class Item(db.Model): def queue_download(self): u = state.user() if not u in self.users: - t = Transfer.get_or_create(self.id) - if not t.added: - t.added = datetime.utcnow() - t.progress = 0 - t.save() + state.downloads.transfers[self.id] = { + 'added': datetime.utcnow(), + 'progress': 0 + } logger.debug('queue %s for download', self.id) self.add_user(u) @@ -525,9 +528,8 @@ class Item(db.Model): self.meta[key] = f.info[key] if u not in self.users: self.add_user(u) - t = Transfer.get_or_create(self.id) - t.progress = 1 - t.save() + if state.downloads and self.id in state.downloads.transfers: + del state.downloads.transfers[self.id] self.added = datetime.utcnow() Changelog.record(u, 'additem', self.id, f.info) Changelog.record(u, 'edititem', self.id, self.meta) @@ -541,9 +543,8 @@ class Item(db.Model): return True else: logger.debug('TRIED TO SAVE EXISTING FILE!!!') - t = Transfer.get_or_create(self.id) - t.progress = 1 - t.save() + if state.downloads and self.id in state.downloads.transfers: + del state.downloads.transfers[self.id] self.update() return False @@ -566,7 +567,8 @@ class Item(db.Model): else: self.added = None self.update() - Transfer.query.filter_by(item_id=self.id).delete() + if self.id in state.downloads.transfers: + del state.downloads.transfers[self.id] Changelog.record(user, 'removeitem', self.id) class Sort(db.Model): diff --git a/oml/library.py b/oml/library.py index 3b6832f..775f19f 100644 --- a/oml/library.py +++ b/oml/library.py @@ -173,6 +173,7 @@ class Peer(object): def join(self): #self.library.join() + self.library.commit() self.library.close() self.sync_info() diff --git a/oml/nodes.py b/oml/nodes.py index f7a575f..cdb9d9c 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -290,7 +290,6 @@ class Node(Thread): } def download(self, item): - from item.models import Transfer self.resolve() url = '%s/get/%s' % (self.url, item.id) t1 = datetime.utcnow() @@ -316,19 +315,22 @@ class Node(Thread): since_ct = (datetime.utcnow() - ct).total_seconds() if since_ct > 1: ct = datetime.utcnow() - t = Transfer.get(item.id) - if not t.added: + if state.shutdown: + return False + t = state.downloads.transfers.get(item.id) + if not t: # transfer was canceled trigger_event('transfer', { 'id': item.id, 'progress': -1 }) return False else: - t.progress = size / item.info['size'] - t.save() + t['progress'] = size / item.info['size'] + state.downloads.transfers[item.id] = t trigger_event('transfer', { - 'id': item.id, 'progress': t.progress + 'id': item.id, 'progress': t['progress'] }) + state.downloads.transfers[item.id] = t if state.bandwidth: while not state.bandwidth.download(chunk_size) and not state.shutdown: time.sleep(0.1) diff --git a/oml/update.py b/oml/update.py index 25318ea..d0c94aa 100644 --- a/oml/update.py +++ b/oml/update.py @@ -518,7 +518,8 @@ def migrate_10(): def migrate_11(): with db.session() as session: - from user.models import User, Metadata + from user.models import User, Metadata, List + from item.models import Transfer from changelog import Changelog import utils for u in User.query.filter_by(peered=True): @@ -526,6 +527,9 @@ def migrate_11(): last = Changelog.query.filter_by(user_id=u.id).order_by('-revision').first() if last: peer.info['revision'] = last.revision + for l in List.query.filter_by(user_id=u.id): + if l.name: + peer.info['lists'][l.lame] = [i.id for i in l.get_items()] for m in Metadata.query.filter_by(user_id=u.id): peer.library[m.item_id] = { 'meta': dict(m.data), @@ -538,4 +542,16 @@ def migrate_11(): Changelog.query.filter_by(user_id=u.id).delete() Metadata.query.filter_by(user_id=u.id).delete() session.commit() + import state + import downloads + state.downloads = downloads.Downloads() + for t in Transfer.query: + if t.added: + state.downloads.transfers[t.item_id] = { + 'added': t.added, + 'progress': t.progress + } + state.db.session.add(t) + state.db.session.commit() + downloads.join() return 11