From 6cd2eaf93ab6f072b52ca6a41a58730c6bc7db4f Mon Sep 17 00:00:00 2001
From: j <0x006A@0x2620.org>
Date: Thu, 8 Jan 2015 17:01:03 +0100
Subject: [PATCH] extract some information about currently running encoding
 jobs

---
 pandora/archive/models.py | 15 +++++++
 pandora/archive/queue.py  | 84 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 99 insertions(+)
 create mode 100644 pandora/archive/queue.py

diff --git a/pandora/archive/models.py b/pandora/archive/models.py
index 567df14e..71c904c6 100644
--- a/pandora/archive/models.py
+++ b/pandora/archive/models.py
@@ -495,6 +495,21 @@ class File(models.Model):
             os.unlink(target)
         shutil.rmtree(tmp)
 
+    def encoding_status(self):
+        status = {}
+        if self.encoding:
+            for s in self.streams.all():
+                status[s.name()] = u'done' if s.available else u'encoding'
+            config = settings.CONFIG['video']
+            max_resolution = self.streams.get(source=None).resolution
+            for resolution in sorted(config['resolutions'], reverse=True):
+                if resolution <= max_resolution:
+                    for f in config['formats']:
+                        name = u'%sp.%s' % (resolution, f)
+                        if name not in status:
+                            status[name] = u'queued'
+        return status
+
     def delete(self, *args, **kwargs):
         self.delete_files()
         super(File, self).delete(*args, **kwargs)

diff --git a/pandora/archive/queue.py b/pandora/archive/queue.py
new file mode 100644
index 00000000..f2873226
--- /dev/null
+++ b/pandora/archive/queue.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+# vi:si:et:sw=4:sts=4:ts=4
+from __future__ import division
+
+from datetime import datetime
+import time
+
+import celery.task.control
+
+from .models import File
+
+
+def parse_job(job):
+    f = File.objects.get(id=job['args'][0])
+    r = {
+        'id': job['id'],
+        'item': f.item.public_id,
+        'file': f.oshash
+    }
+    if job['time_start']:
+        r.update({
+            'started': datetime.fromtimestamp(job['time_start']),
+            'running': time.time() - job['time_start'],
+        })
+    if f.encoding:
+        r['status'] = f.encoding_status()
+    return r
+
+def status():
+    status = {
+        'active': [],
+        'queued': [],
+    }
+    encoding_jobs = ('archive.tasks.extract_stream', 'archive.tasks.process_stream')
+    c = celery.task.control.inspect()
+    for job in c.active(safe=True)['pandora-encoding']:
+        if job['name'] in encoding_jobs:
+            status['active'].append(parse_job(job))
+    for job in c.reserved(safe=True)['pandora-encoding']:
+        if job['name'] in encoding_jobs:
+            status['queued'].append(parse_job(job))
+    return status
+
+
+def fill_queue():
+    s = status()
+    in_queue = [f['file'] for f in s['active']] + [f['file'] for f in s['queued']]
+    check = []
+    for f in File.objects.filter(queued=True).exclude(oshash__in=in_queue):
+        if f.streams.all().count():
+            f.process_stream()
+        elif f.data:
+            f.extract_stream()
+        else:
+            print 'not sure what to do with' ,f
+            check.append(f)
+        in_queue.append(f.oshash)
+    for f in File.objects.filter(encoding=True).exclude(oshash__in=in_queue):
+        if f.streams.all().count():
+            f.process_stream()
+        elif f.data:
+            f.extract_stream()
+        else:
+            print 'not sure what to do with' ,f
+            check.append(f)
+    return check
+
+
+def get_celery_worker_status():
+    ERROR_KEY = "ERROR"
+    try:
+        insp = celery.task.control.inspect()
+        d = insp.stats()
+        if not d:
+            d = { ERROR_KEY: 'No running Celery workers were found.' }
+    except IOError as e:
+        from errno import errorcode
+        msg = "Error connecting to the backend: " + str(e)
+        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
+            msg += ' Check that the RabbitMQ server is running.'
+        d = { ERROR_KEY: msg }
+    except ImportError as e:
+        d = { ERROR_KEY: str(e)}
+    return d