From 8a571a10a5543c059575189737fed444dfa6005f Mon Sep 17 00:00:00 2001 From: j Date: Thu, 11 Feb 2016 19:56:12 +0530 Subject: [PATCH] implement priority queue --- oml/tasks.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/oml/tasks.py b/oml/tasks.py index 15577c9..8e662b5 100644 --- a/oml/tasks.py +++ b/oml/tasks.py @@ -2,7 +2,7 @@ # vi:si:et:sw=4:sts=4:ts=4 import os import json -from queue import Queue +from queue import PriorityQueue from threading import Thread from websocket import trigger_event @@ -17,23 +17,23 @@ class Tasks(Thread): def __init__(self): self._taskspath = os.path.join(settings.data_path, 'tasks.json') - self.q = Queue() + self.q = PriorityQueue() Thread.__init__(self) self.daemon = True self.start() - self.queue('scan') - self.load_tasks() def run(self): + self.load_tasks() + self.queue('scan') import item.scan from item.models import sync_metadata, get_preview, get_cover from user.models import export_list, update_user_peering shutdown = False while not shutdown: - m = self.q.get() + p, m = self.q.get() if m: if state.shutdown: - self._tasks.append(m) + self._tasks.append((p, m)) else: try: action, data = m @@ -86,12 +86,20 @@ class Tasks(Thread): json.dump(self._tasks, f) def join(self): - self.q.put(None) + self.q.put((1000, None)) r = Thread.join(self) self.save_tasks() return r - def queue(self, action, data=None): + def queue(self, action, data=None, priority=None): + if priority is None: + priority = 100 + if action in ('getcover', 'getpreview'): + priority += 1 + if action in ('import', 'export'): + priority -= 1 + if action in ('peering', 'changelibrarypath'): + priority -= 2 if not state.shutdown: - logger.debug('%s queued', action) - self.q.put((action, data)) + #logger.debug('queue: %s (%s)', action, data) + self.q.put((priority, (action, data)))