From acd64d3186a9b5b0d493dff78dae2a81294e09dd Mon Sep 17 00:00:00 2001 From: j Date: Thu, 11 Feb 2016 11:44:40 +0530 Subject: [PATCH] save queued task on shutdown, add getcover task --- oml/item/models.py | 28 +++++++++++--- oml/tasks.py | 96 ++++++++++++++++++++++++++++++---------------- 2 files changed, 87 insertions(+), 37 deletions(-) diff --git a/oml/item/models.py b/oml/item/models.py index 00014be..1d5c34d 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -450,6 +450,8 @@ class Item(db.Model): def update_icons(self): if state.online: self.update_cover() + else: + state.tasks.queue('getcover', self.id) self.update_preview() def load_metadata(self): @@ -823,12 +825,28 @@ def update_sort_table(): s.commit() +def get_cover(id): + delay = 60 + if state.online: + #logger.debug('get_cover(%s)', id) + with db.session(): + i = Item.get(id) + if i: + i.update_cover() + else: + state.main.call_later(delay, lambda: state.tasks.queue('getcover', id)) + + def get_preview(id): - #logger.debug('get_preview(%s)', id) - with db.session(): - i = Item.get(id) - if i: - i.get_preview() + delay = 60 + if state.online: + #logger.debug('get_preview(%s)', id) + with db.session(): + i = Item.get(id) + if i: + i.get_preview() + else: + state.main.call_later(delay, lambda: state.tasks.queue('getpreview', id)) def sync_metadata(ids=None): #logger.debug('sync_metadata(%s)', len(ids) if len(ids) > 10 else ids) diff --git a/oml/tasks.py b/oml/tasks.py index 94df188..15577c9 100644 --- a/oml/tasks.py +++ b/oml/tasks.py @@ -1,65 +1,97 @@ # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 - +import os +import json from queue import Queue from threading import Thread from websocket import trigger_event import state +import settings import logging logger = logging.getLogger(__name__) class Tasks(Thread): + _tasks = [] def __init__(self): + self._taskspath = os.path.join(settings.data_path, 'tasks.json') self.q = Queue() Thread.__init__(self) self.daemon = True self.start() self.queue('scan') + self.load_tasks() def run(self): import item.scan - from item.models import sync_metadata, get_preview + from item.models import sync_metadata, get_preview, get_cover from user.models import export_list, update_user_peering - while not state.shutdown: + shutdown = False + while not shutdown: m = self.q.get() - if m and not state.shutdown: - try: - action, data = m - logger.debug('%s start', action) - if action == 'changelibrarypath': - item.scan.change_path(data[0], data[1]) - elif action == 'export': - export_list(data) - elif action == 'getpreview': - get_preview(data) - elif action == 'import': - item.scan.run_import(data) - elif action == 'peering': - update_user_peering(*data) - elif action == 'ping': - trigger_event('pong', data) - elif action == 'scan': - item.scan.run_scan() - elif action == 'scanimport': - item.scan.import_folder() - elif action == 'syncmetadata': - sync_metadata(data) - else: - trigger_event('error', {'error': 'unknown action'}) - logger.debug('%s done', action) - except: - logger.debug('task failed', exc_info=True) + if m: + if state.shutdown: + self._tasks.append(m) + else: + try: + action, data = m + logger.debug('%s start', action) + if action == 'changelibrarypath': + item.scan.change_path(data[0], data[1]) + elif action == 'export': + export_list(data) + elif action == 'getcover': + get_cover(data) + elif action == 'getpreview': + get_preview(data) + elif action == 'import': + item.scan.run_import(data) + elif action == 'peering': + update_user_peering(*data) + elif action == 'ping': + trigger_event('pong', data) + elif action == 'scan': + item.scan.run_scan() + elif action == 'scanimport': + item.scan.import_folder() + elif action == 'syncmetadata': + sync_metadata(data) + else: + trigger_event('error', {'error': 'unknown action'}) + logger.debug('%s done', action) + except: + logger.debug('task failed', exc_info=True) + else: + shutdown = True self.q.task_done() + def load_tasks(self): + if os.path.exists(self._taskspath): + try: + with open(self._taskspath) as f: + tasks = json.load(f) + for task in tasks: + self.q.put(task) + logger.debug('loaded %s tasks', len(tasks)) + except: + logger.debug('failed to load saved tasks', exc_info=True) + os.unlink(self._taskspath) + + def save_tasks(self): + if self._tasks: + logger.debug('saving %s tasks for later', len(self._tasks)) + with open(self._taskspath, 'w') as f: + json.dump(self._tasks, f) + def join(self): self.q.put(None) - return Thread.join(self) + r = Thread.join(self) + self.save_tasks() + return r def queue(self, action, data=None): if not state.shutdown: logger.debug('%s queued', action) self.q.put((action, data)) -