2015-01-08 16:01:03 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
from datetime import datetime
|
2023-07-27 13:35:53 +00:00
|
|
|
from time import time, monotonic
|
2016-04-30 12:15:06 +00:00
|
|
|
|
2023-07-27 13:35:53 +00:00
|
|
|
from app.celery import app
|
2015-01-08 16:01:03 +00:00
|
|
|
|
|
|
|
from .models import File
|
|
|
|
|
|
|
|
|
|
|
|
def parse_job(job):
    """Build a serializable status dict for a single celery encoding job.

    ``job`` is an entry from ``inspect().active()`` / ``.reserved()``; its
    first task argument is expected to be a File id.
    """
    media_file = File.objects.get(id=job['args'][0])
    info = {
        'id': job['id'],
        'item': media_file.item.public_id,
        'file': media_file.oshash
    }
    if job['time_start']:
        # celery reports time_start on the monotonic clock; convert it to
        # wall-clock time by offsetting against the current monotonic value
        started = datetime.fromtimestamp(time() - (monotonic() - job['time_start']))
        info['started'] = started
        info['running'] = (datetime.now() - started).total_seconds()
    if media_file.encoding:
        info['status'] = media_file.encoding_status()
    return info
|
|
|
|
|
|
|
|
def status():
    """Return parsed status dicts for all active and reserved encoding jobs
    on the ``celery@pandora-encoding`` worker.

    Returns an empty list when no worker responds.
    """
    jobs = []
    encoding_jobs = ('archive.tasks.extract_stream', 'archive.tasks.process_stream')
    c = app.control.inspect()
    # inspect() calls return None (not {}) when no worker replies in time;
    # fall back to an empty dict so .get() cannot raise AttributeError
    active = (c.active(safe=True) or {}).get('celery@pandora-encoding', [])
    reserved = (c.reserved(safe=True) or {}).get('celery@pandora-encoding', [])
    for job in active + reserved:
        if job['name'] in encoding_jobs:
            jobs.append(parse_job(job))
    return jobs
|
|
|
|
|
|
|
|
|
|
|
|
def _restart_file(f, check):
    """Re-dispatch processing for one File; append it to ``check`` when
    there is nothing to restart it from (no streams and no source data)."""
    if f.streams.all().count():
        f.process_stream()
    elif f.data:
        f.extract_stream()
    else:
        print('not sure what to do with', f)
        check.append(f)


def fill_queue():
    """Re-dispatch files marked queued/encoding that are not currently known
    to the encoding worker.

    Returns the list of files that could not be restarted and need manual
    inspection.
    """
    s = status()
    # status() returns a flat list of job dicts (not a dict keyed by
    # 'active'/'queued'); collect the oshashes already on the worker
    in_queue = [job['file'] for job in s]
    check = []
    for f in File.objects.filter(queued=True).exclude(oshash__in=in_queue):
        _restart_file(f, check)
        in_queue.append(f.oshash)
    for f in File.objects.filter(encoding=True).exclude(oshash__in=in_queue):
        _restart_file(f, check)
    return check
|
|
|
|
|
|
|
|
|
|
|
|
def get_celery_worker_status():
    """Probe celery workers via ``inspect().stats()``.

    Returns the per-worker stats dict on success, otherwise a dict with a
    single ``"ERROR"`` key describing what went wrong.
    """
    ERROR_KEY = "ERROR"
    try:
        stats = app.control.inspect().stats()
        if not stats:
            stats = {ERROR_KEY: 'No running Celery workers were found.'}
        return stats
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        # e.args[0] carries the errno; errorcode maps it to its symbolic name
        connection_refused = len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED'
        if connection_refused:
            msg += ' Check that the RabbitMQ server is running.'
        return {ERROR_KEY: msg}
    except ImportError as e:
        return {ERROR_KEY: str(e)}
|