diff --git a/pandora/taskqueue/models.py b/pandora/taskqueue/models.py index 672982283..60b4cd410 100644 --- a/pandora/taskqueue/models.py +++ b/pandora/taskqueue/models.py @@ -38,12 +38,12 @@ def get_tasks(username): return tasks class Task(models.Model): - DONE = ['finished', 'failed', 'cancelled'] + DONE = ['finished', 'failed', 'canceled'] created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) - # 'queued|uploading|processing|finished|failed|cancelled', + # 'queued|uploading|processing|finished|failed|canceled' status = models.CharField(default='unknown', max_length=32) started = models.DateTimeField(null=True) ended = models.DateTimeField(null=True) @@ -103,20 +103,41 @@ class Task(models.Model): return True return False - def update_from_queue(self, save=True): + def get_job(self): c = celery.task.control.inspect() active = c.active(safe=True) - status = 'unknown' if active: for queue in active: for job in active[queue]: - if job.get('name') in ('item.tasks.update_timeline', 'archive.tasks.download_media'): - args = job.get('args', []) - if args and args[0] == self.item.public_id: - if job.get('time_start'): - status = 'processing' - else: - status = 'queued' + name = job.get('name') + args = job.get('args', []) + if args: + if name in ( + 'item.tasks.update_timeline', + 'archive.tasks.download_media' + ): + if args[0] == self.item.public_id: + return job + elif name in ( + 'archive.tasks.process_stream', + 'archive.tasks.extract_stream', + 'archive.tasks.extract_derivatives', + ): + id = args[0] + if self.item.files.filter(id=id).count(): + return job + + def update_from_queue(self, save=True): + status = 'unknown' + job = self.get_job() + if job: + if job.get('name') in ('item.tasks.update_timeline', 'archive.tasks.download_media'): + args = job.get('args', []) + if args and args[0] == self.item.public_id: + if job.get('time_start'): + status = 'processing' + else: + status = 'queued' if status != self.status: self.status = status if save: @@ -125,12 +146,32 @@ class Task(models.Model): return False def cancel(self): - self.status = 'cancelled' - self.save() # FIXME: actually cancel task + if self.status == 'pending': + for f in self.item.files.filter(wanted=True, available=False): + f.delete() + elif self.status == 'uploading': + for f in self.item.files.filter(wanted=True, available=False): + f.delete() + for f in self.item.files.filter(uploading=True): + f.delete() + elif self.status in ('processing', 'queued'): + job = self.get_job() + if job: + print(job) + r = celery.task.control.revoke(job['id']) + print(r) + for f in self.item.files.filter(encoding=True): + f.delete() + if not self.item.files.count() and settings.CONFIG.get('itemRequiresVideo'): + print('delete item') + + self.status = 'canceled' + self.ended = datetime.now() + self.save() def json(self): - if self.status != 'cancelled': + if self.status != 'canceled': self.update() return { 'started': self.started,