move transfers into sqlitedict
This commit is contained in:
parent
9a9185d3d5
commit
9d7a553b95
6 changed files with 69 additions and 45 deletions
|
@ -1,17 +1,17 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# vi:si:et:sw=4:sts=4:ts=4
|
||||
|
||||
|
||||
import os
|
||||
from threading import Thread
|
||||
import time
|
||||
|
||||
from sqlitedict import SqliteDict
|
||||
|
||||
import db
|
||||
import state
|
||||
import settings
|
||||
import update
|
||||
|
||||
from websocket import trigger_event
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -21,6 +21,8 @@ class Downloads(Thread):
|
|||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.start()
|
||||
self._dbpath = os.path.join(settings.data_path, 'transfers.db')
|
||||
self.transfers = SqliteDict(self._dbpath, tablename='transfers', autocommit=False)
|
||||
|
||||
def download_updates(self):
|
||||
now = int(time.mktime(time.gmtime()))
|
||||
|
@ -32,18 +34,19 @@ class Downloads(Thread):
|
|||
def download_next(self):
|
||||
import item.models
|
||||
self.download_updates()
|
||||
for t in item.models.Transfer.query.filter(
|
||||
item.models.Transfer.added!=None,
|
||||
item.models.Transfer.progress<1).order_by(item.models.Transfer.added):
|
||||
downloads = list(self.transfers.items())
|
||||
downloads.sort(key=lambda t: t[1].get('added'))
|
||||
for itemid, t in downloads:
|
||||
if state.shutdown:
|
||||
return False
|
||||
for u in t.item.users:
|
||||
if t.get('added') and t.get('progress', -1) < 1:
|
||||
i = item.models.Item.get(itemid)
|
||||
for u in i.users:
|
||||
if state.shutdown:
|
||||
return False
|
||||
if state.nodes.is_online(u.id):
|
||||
logger.debug('DOWNLOAD %s %s', t.item, u)
|
||||
r = state.nodes.download(u.id, t.item)
|
||||
return True
|
||||
logger.debug('DOWNLOAD %s %s', i, u)
|
||||
r = state.nodes.download(u.id, i)
|
||||
return False
|
||||
|
||||
def run(self):
|
||||
|
@ -55,6 +58,8 @@ class Downloads(Thread):
|
|||
self.wait(10)
|
||||
|
||||
def join(self):
|
||||
self.transfers.commit()
|
||||
self.transfers.close()
|
||||
return Thread.join(self)
|
||||
|
||||
def wait_online(self):
|
||||
|
|
|
@ -271,11 +271,9 @@ def cancelDownloads(data):
|
|||
ids = data['ids']
|
||||
if ids:
|
||||
for item in models.Item.query.filter(models.Item.id.in_(ids)):
|
||||
t = models.Transfer.get(item.id)
|
||||
t = state.downloads.transfers.get(item.id)
|
||||
if t:
|
||||
t.progress = None
|
||||
t.added = None
|
||||
t.save()
|
||||
del state.downloads.transfers[item.id]
|
||||
p = state.user()
|
||||
if p in item.users:
|
||||
item.users.remove(p)
|
||||
|
|
|
@ -120,11 +120,11 @@ class Item(db.Model):
|
|||
j['timesaccessed'] = self.timesaccessed
|
||||
j['accessed'] = self.accessed
|
||||
j['added'] = self.added
|
||||
t = Transfer.get(self.id)
|
||||
if state.downloads:
|
||||
t = state.downloads.transfers.get(self.id)
|
||||
if t:
|
||||
j['transferadded'] = t.added
|
||||
j['transferprogress'] = t.progress
|
||||
|
||||
j['transferadded'] = t['added']
|
||||
j['transferprogress'] = t['progress']
|
||||
# unused and slow
|
||||
#j['users'] = list(map(str, list(self.users)))
|
||||
|
||||
|
@ -261,11 +261,14 @@ class Item(db.Model):
|
|||
|
||||
def update_mediastate(self):
|
||||
# available, unavailable, transferring
|
||||
t = Transfer.get(self.id)
|
||||
if t and t.added and t.progress < 1:
|
||||
if state.downloads:
|
||||
t = state.downloads.transfers.get(self.id)
|
||||
if t and t.get('added') and t.get('progress', 0) < 1:
|
||||
self.info['mediastate'] = 'transferring'
|
||||
else:
|
||||
self.info['mediastate'] = 'available' if self.files.count() else 'unavailable'
|
||||
else:
|
||||
self.info['mediastate'] = 'available' if self.files.count() else 'unavailable'
|
||||
|
||||
def update(self, modified=None, commit=True):
|
||||
self.update_mediastate()
|
||||
|
@ -286,7 +289,8 @@ class Item(db.Model):
|
|||
|
||||
def delete(self, commit=True):
|
||||
Sort.query.filter_by(item_id=self.id).delete()
|
||||
Transfer.query.filter_by(item_id=self.id).delete()
|
||||
if state.downloads and self.id in state.downloads.transfers:
|
||||
del state.downloads.transfers[self.id]
|
||||
state.db.session.delete(self)
|
||||
icons.clear('cover:%s' % self.id)
|
||||
icons.clear('preview:%s' % self.id)
|
||||
|
@ -491,11 +495,10 @@ class Item(db.Model):
|
|||
def queue_download(self):
|
||||
u = state.user()
|
||||
if not u in self.users:
|
||||
t = Transfer.get_or_create(self.id)
|
||||
if not t.added:
|
||||
t.added = datetime.utcnow()
|
||||
t.progress = 0
|
||||
t.save()
|
||||
state.downloads.transfers[self.id] = {
|
||||
'added': datetime.utcnow(),
|
||||
'progress': 0
|
||||
}
|
||||
logger.debug('queue %s for download', self.id)
|
||||
self.add_user(u)
|
||||
|
||||
|
@ -525,9 +528,8 @@ class Item(db.Model):
|
|||
self.meta[key] = f.info[key]
|
||||
if u not in self.users:
|
||||
self.add_user(u)
|
||||
t = Transfer.get_or_create(self.id)
|
||||
t.progress = 1
|
||||
t.save()
|
||||
if state.downloads and self.id in state.downloads.transfers:
|
||||
del state.downloads.transfers[self.id]
|
||||
self.added = datetime.utcnow()
|
||||
Changelog.record(u, 'additem', self.id, f.info)
|
||||
Changelog.record(u, 'edititem', self.id, self.meta)
|
||||
|
@ -541,9 +543,8 @@ class Item(db.Model):
|
|||
return True
|
||||
else:
|
||||
logger.debug('TRIED TO SAVE EXISTING FILE!!!')
|
||||
t = Transfer.get_or_create(self.id)
|
||||
t.progress = 1
|
||||
t.save()
|
||||
if state.downloads and self.id in state.downloads.transfers:
|
||||
del state.downloads.transfers[self.id]
|
||||
self.update()
|
||||
return False
|
||||
|
||||
|
@ -566,7 +567,8 @@ class Item(db.Model):
|
|||
else:
|
||||
self.added = None
|
||||
self.update()
|
||||
Transfer.query.filter_by(item_id=self.id).delete()
|
||||
if self.id in state.downloads.transfers:
|
||||
del state.downloads.transfers[self.id]
|
||||
Changelog.record(user, 'removeitem', self.id)
|
||||
|
||||
class Sort(db.Model):
|
||||
|
|
|
@ -173,6 +173,7 @@ class Peer(object):
|
|||
|
||||
def join(self):
|
||||
#self.library.join()
|
||||
self.library.commit()
|
||||
self.library.close()
|
||||
self.sync_info()
|
||||
|
||||
|
|
14
oml/nodes.py
14
oml/nodes.py
|
@ -290,7 +290,6 @@ class Node(Thread):
|
|||
}
|
||||
|
||||
def download(self, item):
|
||||
from item.models import Transfer
|
||||
self.resolve()
|
||||
url = '%s/get/%s' % (self.url, item.id)
|
||||
t1 = datetime.utcnow()
|
||||
|
@ -316,19 +315,22 @@ class Node(Thread):
|
|||
since_ct = (datetime.utcnow() - ct).total_seconds()
|
||||
if since_ct > 1:
|
||||
ct = datetime.utcnow()
|
||||
t = Transfer.get(item.id)
|
||||
if not t.added:
|
||||
if state.shutdown:
|
||||
return False
|
||||
t = state.downloads.transfers.get(item.id)
|
||||
if not t:
|
||||
# transfer was canceled
|
||||
trigger_event('transfer', {
|
||||
'id': item.id, 'progress': -1
|
||||
})
|
||||
return False
|
||||
else:
|
||||
t.progress = size / item.info['size']
|
||||
t.save()
|
||||
t['progress'] = size / item.info['size']
|
||||
state.downloads.transfers[item.id] = t
|
||||
trigger_event('transfer', {
|
||||
'id': item.id, 'progress': t.progress
|
||||
'id': item.id, 'progress': t['progress']
|
||||
})
|
||||
state.downloads.transfers[item.id] = t
|
||||
if state.bandwidth:
|
||||
while not state.bandwidth.download(chunk_size) and not state.shutdown:
|
||||
time.sleep(0.1)
|
||||
|
|
|
@ -518,7 +518,8 @@ def migrate_10():
|
|||
|
||||
def migrate_11():
|
||||
with db.session() as session:
|
||||
from user.models import User, Metadata
|
||||
from user.models import User, Metadata, List
|
||||
from item.models import Transfer
|
||||
from changelog import Changelog
|
||||
import utils
|
||||
for u in User.query.filter_by(peered=True):
|
||||
|
@ -526,6 +527,9 @@ def migrate_11():
|
|||
last = Changelog.query.filter_by(user_id=u.id).order_by('-revision').first()
|
||||
if last:
|
||||
peer.info['revision'] = last.revision
|
||||
for l in List.query.filter_by(user_id=u.id):
|
||||
if l.name:
|
||||
peer.info['lists'][l.lame] = [i.id for i in l.get_items()]
|
||||
for m in Metadata.query.filter_by(user_id=u.id):
|
||||
peer.library[m.item_id] = {
|
||||
'meta': dict(m.data),
|
||||
|
@ -538,4 +542,16 @@ def migrate_11():
|
|||
Changelog.query.filter_by(user_id=u.id).delete()
|
||||
Metadata.query.filter_by(user_id=u.id).delete()
|
||||
session.commit()
|
||||
import state
|
||||
import downloads
|
||||
state.downloads = downloads.Downloads()
|
||||
for t in Transfer.query:
|
||||
if t.added:
|
||||
state.downloads.transfers[t.item_id] = {
|
||||
'added': t.added,
|
||||
'progress': t.progress
|
||||
}
|
||||
state.db.session.add(t)
|
||||
state.db.session.commit()
|
||||
downloads.join()
|
||||
return 11
|
||||
|
|
Loading…
Reference in a new issue