add task queue api

This commit is contained in:
j 2016-08-17 14:37:59 +02:00
parent 0eb873d2cb
commit e1cacdb67a
15 changed files with 257 additions and 7 deletions

View file

@ -20,6 +20,7 @@ import ox.iso
from item import utils from item import utils
import item.models import item.models
from person.models import get_name_sort from person.models import get_name_sort
from taskqueue.models import Task
from chunk import save_chunk from chunk import save_chunk
import extract import extract
@ -231,6 +232,7 @@ class File(models.Model):
self.item = qs[0] self.item = qs[0]
if not self.item: if not self.item:
self.item = item.models.get_item(info, user) self.item = item.models.get_item(info, user)
Task.start(self.item, user)
for key in self.AV_INFO + self.PATH_INFO: for key in self.AV_INFO + self.PATH_INFO:
if key in info: if key in info:
self.info[key] = info[key] self.info[key] = info[key]

View file

@ -9,6 +9,8 @@ from django.db.models import Q
from item.models import Item from item.models import Item
from item.tasks import update_poster from item.tasks import update_poster
from taskqueue.models import Task
import models import models
import extract import extract
import external import external
@ -96,6 +98,7 @@ def update_files(user, volume, files):
i.update_selected() i.update_selected()
for i in update_timeline: for i in update_timeline:
i = Item.objects.get(public_id=i) i = Item.objects.get(public_id=i)
Tasks.start(i, user)
i.update_timeline() i.update_timeline()
@task(ignore_results=True, queue='default') @task(ignore_results=True, queue='default')
@ -130,6 +133,7 @@ def process_stream(fileId):
file.item.update_timeline() file.item.update_timeline()
if file.item.rendered: if file.item.rendered:
file.item.save() file.item.save()
Task.finish(file.item)
models.File.objects.filter(id=fileId).update(encoding=False) models.File.objects.filter(id=fileId).update(encoding=False)
return True return True
@ -158,6 +162,7 @@ def extract_stream(fileId):
file.item.update_timeline() file.item.update_timeline()
update_poster(file.item.public_id) update_poster(file.item.public_id)
file.extract_tracks() file.extract_tracks()
Task.finish(file.item)
models.File.objects.filter(id=fileId).update(encoding=False) models.File.objects.filter(id=fileId).update(encoding=False)
@task(queue="encoding") @task(queue="encoding")

View file

@ -20,6 +20,7 @@ from item.views import parse_query
import item.tasks import item.tasks
from oxdjango.api import actions from oxdjango.api import actions
from changelog.models import add_changelog from changelog.models import add_changelog
from taskqueue.models import Task
from . import models from . import models
from . import queue from . import queue
@ -280,6 +281,7 @@ def firefogg_upload(request):
f.save() f.save()
if f.item.rendered and f.selected: if f.item.rendered and f.selected:
Item.objects.filter(id=f.item.id).update(rendered=False) Item.objects.filter(id=f.item.id).update(rendered=False)
Task.start(f.item, request.user)
response = { response = {
'uploadUrl': '/api/upload/?id=%s&profile=%s' % (f.oshash, profile), 'uploadUrl': '/api/upload/?id=%s&profile=%s' % (f.oshash, profile),
'url': request.build_absolute_uri('/%s' % f.item.public_id), 'url': request.build_absolute_uri('/%s' % f.item.public_id),
@ -331,6 +333,7 @@ def direct_upload(request):
Item.objects.filter(id=file.item.id).update(rendered=False) Item.objects.filter(id=file.item.id).update(rendered=False)
file.uploading = True file.uploading = True
file.save() file.save()
Task.start(file.item, request.user)
upload_url = request.build_absolute_uri('/api/upload/direct/?id=%s' % file.oshash) upload_url = request.build_absolute_uri('/api/upload/direct/?id=%s' % file.oshash)
return render_to_json_response({ return render_to_json_response({
'uploadUrl': upload_url, 'uploadUrl': upload_url,
@ -423,6 +426,7 @@ def moveMedia(request, data):
else: else:
c.rendered = False c.rendered = False
c.save() c.save()
Task.start(c, request.user)
item.tasks.update_timeline.delay(public_id) item.tasks.update_timeline.delay(public_id)
response = json_response(text='updated') response = json_response(text='updated')
response['data']['item'] = i.public_id response['data']['item'] = i.public_id
@ -470,6 +474,10 @@ def editMedia(request, data):
if key == 'language' and (f.is_video or f.is_audio): if key == 'language' and (f.is_video or f.is_audio):
save_items.add(f.item.id) save_items.add(f.item.id)
if key == 'part' and (f.is_video or f.is_audio): if key == 'part' and (f.is_video or f.is_audio):
if f.item.rendered:
f.item.rendered = False
f.item.save()
Task.start(f.item, request.user)
update_timeline.add(f.item.id) update_timeline.add(f.item.id)
update = True update = True
if update: if update:

View file

@ -62,6 +62,7 @@
"canReadText": {"guest": 0, "member": 0, "friend": 1, "staff": 1, "admin": 1}, "canReadText": {"guest": 0, "member": 0, "friend": 1, "staff": 1, "admin": 1},
"canRemoveItems": {"admin": true}, "canRemoveItems": {"admin": true},
"canSeeAccessed": {"staff": true, "admin": true}, "canSeeAccessed": {"staff": true, "admin": true},
"canSeeAllTasks": {"staff": true, "admin": true},
"canSeeDebugMenu": {"staff": true, "admin": true}, "canSeeDebugMenu": {"staff": true, "admin": true},
"canSeeExtraItemViews": {"staff": true, "admin": true}, "canSeeExtraItemViews": {"staff": true, "admin": true},
"canSeeMedia": {"staff": true, "admin": true}, "canSeeMedia": {"staff": true, "admin": true},

View file

@ -64,6 +64,7 @@
"canReadText": {"guest": 0, "member": 0, "researcher": 1, "staff": 1, "admin": 1}, "canReadText": {"guest": 0, "member": 0, "researcher": 1, "staff": 1, "admin": 1},
"canRemoveItems": {"staff": true, "admin": true}, "canRemoveItems": {"staff": true, "admin": true},
"canSeeAccessed": {"researcher": true, "staff": true, "admin": true}, "canSeeAccessed": {"researcher": true, "staff": true, "admin": true},
"canSeeAllTasks": {"staff": true, "admin": true},
"canSeeDebugMenu": {"researcher": true, "staff": true, "admin": true}, "canSeeDebugMenu": {"researcher": true, "staff": true, "admin": true},
"canSeeExtraItemViews": {"researcher": true, "staff": true, "admin": true}, "canSeeExtraItemViews": {"researcher": true, "staff": true, "admin": true},
"canSeeMedia": {"researcher": true, "staff": true, "admin": true}, "canSeeMedia": {"researcher": true, "staff": true, "admin": true},

View file

@ -62,6 +62,7 @@
"canReadText": {"guest": 0, "member": 0, "staff": 1, "admin": 1}, "canReadText": {"guest": 0, "member": 0, "staff": 1, "admin": 1},
"canRemoveItems": {"admin": true}, "canRemoveItems": {"admin": true},
"canSeeAccessed": {"staff": true, "admin": true}, "canSeeAccessed": {"staff": true, "admin": true},
"canSeeAllTasks": {"staff": true, "admin": true},
"canSeeDebugMenu": {"staff": true, "admin": true}, "canSeeDebugMenu": {"staff": true, "admin": true},
"canSeeExtraItemViews": {"staff": true, "admin": true}, "canSeeExtraItemViews": {"staff": true, "admin": true},
"canSeeMedia": {"staff": true, "admin": true}, "canSeeMedia": {"staff": true, "admin": true},

View file

@ -66,6 +66,7 @@ examples (config.SITENAME.jsonc) that are part of this pan.do/ra distribution.
"canReadText": {"guest": 0, "member": 0, "staff": 1, "admin": 1}, "canReadText": {"guest": 0, "member": 0, "staff": 1, "admin": 1},
"canRemoveItems": {"admin": true}, "canRemoveItems": {"admin": true},
"canSeeAccessed": {"staff": true, "admin": true}, "canSeeAccessed": {"staff": true, "admin": true},
"canSeeAllTasks": {"staff": true, "admin": true},
"canSeeDebugMenu": {"staff": true, "admin": true}, "canSeeDebugMenu": {"staff": true, "admin": true},
"canSeeExtraItemViews": {"staff": true, "admin": true}, "canSeeExtraItemViews": {"staff": true, "admin": true},
"canSeeMedia": {"staff": true, "admin": true}, "canSeeMedia": {"staff": true, "admin": true},

View file

@ -13,6 +13,7 @@ from celery.task import task, periodic_task
import models import models
from text.models import Text from text.models import Text
from taskqueue.models import Task
@periodic_task(run_every=timedelta(days=1), queue='encoding') @periodic_task(run_every=timedelta(days=1), queue='encoding')
@ -79,6 +80,7 @@ def update_external(public_id):
def update_timeline(public_id): def update_timeline(public_id):
item = models.Item.objects.get(public_id=public_id) item = models.Item.objects.get(public_id=public_id)
item.update_timeline(async=False) item.update_timeline(async=False)
Task.finish(item)
@task(queue="encoding") @task(queue="encoding")
def rebuild_timeline(public_id): def rebuild_timeline(public_id):
@ -117,7 +119,7 @@ def update_sitemap(base_url):
# This date should be in W3C Datetime format, can be %Y-%m-%d # This date should be in W3C Datetime format, can be %Y-%m-%d
lastmod = ET.SubElement(url, "lastmod") lastmod = ET.SubElement(url, "lastmod")
lastmod.text = datetime.now().strftime("%Y-%m-%d") lastmod.text = datetime.now().strftime("%Y-%m-%d")
# priority of page on site values 0.1 - 1.0 # priority of page on site values 0.1 - 1.0
priority = ET.SubElement(url, "priority") priority = ET.SubElement(url, "priority")
priority.text = '1.0' priority.text = '1.0'
@ -150,8 +152,8 @@ def update_sitemap(base_url):
priority.text = '1.0' priority.text = '1.0'
if i.rendered and i.level <= can_play: if i.rendered and i.level <= can_play:
video = ET.SubElement(url, "video:video") video = ET.SubElement(url, "video:video")
#el = ET.SubElement(video, "video:content_loc") # el = ET.SubElement(video, "video:content_loc")
#el.text = absolute_url("%s/video" % i.public_id) # el.text = absolute_url("%s/video" % i.public_id)
el = ET.SubElement(video, "video:player_loc") el = ET.SubElement(video, "video:player_loc")
el.attrib['allow_embed'] = 'no' el.attrib['allow_embed'] = 'no'
el.text = absolute_url("%s/player" % i.public_id) el.text = absolute_url("%s/player" % i.public_id)
@ -188,7 +190,7 @@ def update_sitemap(base_url):
priority = ET.SubElement(url, "priority") priority = ET.SubElement(url, "priority")
priority.text = '1.0' priority.text = '1.0'
for t in Text.objects.filter(Q(status='featured')|Q(status='public')): for t in Text.objects.filter(Q(status='featured') | Q(status='public')):
url = ET.SubElement(urlset, "url") url = ET.SubElement(urlset, "url")
# URL of the page. This URL must begin with the protocol (such as http) # URL of the page. This URL must begin with the protocol (such as http)
loc = ET.SubElement(url, "loc") loc = ET.SubElement(url, "loc")

View file

@ -140,7 +140,8 @@ INSTALLED_APPS = (
'tv', 'tv',
'document', 'document',
'entity', 'entity',
'websocket' 'websocket',
'taskqueue',
) )
# Log errors into db # Log errors into db

View file

View file

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.4 on 2016-08-17 10:23
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('item', '0002_auto_20160219_1734'),
]
operations = [
migrations.CreateModel(
name='Task',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(auto_now_add=True)),
('modified', models.DateTimeField(auto_now=True)),
('status', models.CharField(default=b'unknown', max_length=32)),
('started', models.DateTimeField(null=True)),
('ended', models.DateTimeField(null=True)),
('item', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='tasks', to='item.Item')),
('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='tasks', to=settings.AUTH_USER_MODEL)),
],
),
]

View file

141
pandora/taskqueue/models.py Normal file
View file

@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division, print_function
from datetime import datetime, timedelta
from time import time
from celery.backends import default_backend
from celery.utils import get_full_cls_name
from django.contrib.auth.models import User
from django.db import models
from django.db.models import Q
import celery.task.control
import kombu.five
import ox
def get_tasks(username):
from item.models import Item
tasks = []
# remove finished tasks
yesterday = datetime.now() - timedelta(days=1)
Task.objects.filter(status__in=Task.DONE, ended__lt=yesterday).delete()
# add task for that might be missing
for i in Item.objects.filter(rendered=False).exclude(files__id=None):
task, created = Task.objects.get_or_create(item=i)
if created:
task.started = i.modified
task.update()
qs = Task.objects.all()
if username:
qs = qs.filter(user__username=username)
for task in qs:
tasks.append(task.json())
return tasks
class Task(models.Model):
DONE = ['finished', 'failed', 'cancelled']
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
# 'queued|uploading|processing|finished|failed|cancelled',
status = models.CharField(default='unknown', max_length=32)
started = models.DateTimeField(null=True)
ended = models.DateTimeField(null=True)
item = models.ForeignKey("item.Item", related_name='tasks')
user = models.ForeignKey(User, related_name='tasks', null=True)
def __unicode__(self):
return "%s [%s]" % (self.item.public_id, self.status)
@property
def public_id(self):
return ox.toAZ(self.id)
@classmethod
def get(cls, id):
return cls.objects.get(pk=ox.fromAZ(id))
@classmethod
def start(cls, item, user):
task, created = cls.objects.get_or_create(item=item)
if task.update(save=False) or created:
task.user = user
task.started = datetime.now()
task.ended = None
task.save()
@classmethod
def finish(cls, item):
task, created = cls.objects.get_or_create(item=item)
task.update()
if task.status in task.DONE and not task.ended:
task.ended = datetime.now()
task.save()
def update(self, save=True):
if self.item.files.filter(wanted=True, available=False).count():
status = 'pending'
elif self.item.files.filter(uploading=True).count():
status = 'uploading'
elif self.item.files.filter(queued=True).count():
status = 'queued'
elif self.item.files.filter(encoding=True).count():
status = 'processing'
elif self.item.files.filter(failed=True).count():
status = 'failed'
elif self.item.rendered:
status = 'finished'
else:
status = 'unknown'
if status != self.status:
self.status = status
if save:
self.save()
return True
return False
def update_from_queue(self, save=True):
c = celery.task.control.inspect()
active = c.active(safe=True)
status = 'unknown'
if active:
for queue in active:
for job in active[queue]:
if job.get('name') in ('item.tasks.update_timeline', ):
args = job.get('args', [])
if args and args[0] == self.item.public_id:
if job.get('time_start'):
status = 'processing'
else:
status = 'queued'
if status != self.status:
self.status = status
if save:
self.save()
return True
return False
def cancel(self):
self.state = 'cancelled'
self.save()
# FIXME: actually cancel task
def json(self):
if self.state != 'cancelled':
self.update()
return {
'started': self.started,
'ended': self.ended,
'status': self.status,
'title': self.item.get('title'),
'item': self.item.public_id,
'user': self.user and self.user.username or '',
'id': self.public_id,
}

View file

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
import ox
from oxdjango.decorators import login_required_json
from oxdjango.shortcuts import render_to_json_response, get_object_or_404_json, json_response
from oxdjango.api import actions
from . import models
@login_required_json
def getTasks(request, data):
'''
get list of tasks
takes {
user: ''
}
returns {
[{
started: 0,
finished: 0,
status: 'queued|uploading|processing|finished|failed|cancelled',
title: '',
item: 'itemID',
id: 'taskID'
}]
}
'''
user = data.get('user', '')
if user != request.user.username and not request.user.profile.capability('canSeeAllTasks'):
response = json_response(status=403, text='permission denied')
else:
response = json_response(status=200, text='ok')
response['data']['items'] = models.get_tasks(user)
return render_to_json_response(response)
actions.register(getTasks, cache=False)
@login_required_json
def cancelTask(request, data):
response = json_response(status=200, text='ok')
ids = data['id']
if not isinstance(ids, list):
ids = [ids]
for id in ids:
task = models.Task.get(id)
if task.user != request.user and not request.user.profile.capability('canSeeAllTasks'):
response = json_response(status=403, text='permission denied')
return render_to_json_response(response)
else:
task.cancel()
return render_to_json_response(response)
actions.register(cancelTask, cache=False)

View file

@ -158,7 +158,7 @@ pandora.ui.tasksDialog = function() {
pandora.api.getTasks($checkbox.value() ? {} : { pandora.api.getTasks($checkbox.value() ? {} : {
user: pandora.user.username user: pandora.user.username
}, function(result) { }, function(result) {
$list.options({items: result.data}) $list.options({items: result.data.items})
updateButton() updateButton()
}); });
timeout = setTimeout(getItems, 15000); timeout = setTimeout(getItems, 15000);