update to new celery api

This commit is contained in:
j 2021-12-15 19:22:49 +00:00
parent f71434a1ff
commit 9340c0e578
17 changed files with 103 additions and 65 deletions

View file

@ -11,7 +11,7 @@ PIDFile=/run/pandora/cron.pid
WorkingDirectory=/srv/pandora/pandora WorkingDirectory=/srv/pandora/pandora
ExecStart=/srv/pandora/bin/celery \ ExecStart=/srv/pandora/bin/celery \
-A app beat \ -A app beat \
-s /run/pandora/celerybeat-schedule \ --scheduler django_celery_beat.schedulers:DatabaseScheduler \
--pidfile /run/pandora/cron.pid \ --pidfile /run/pandora/cron.pid \
-l INFO -l INFO
ExecReload=/bin/kill -HUP $MAINPID ExecReload=/bin/kill -HUP $MAINPID

View file

@ -5,12 +5,12 @@ from django.contrib.auth import get_user_model
from django.db import transaction from django.db import transaction
import ox import ox
from celery.task import task from app.celery import app
from .models import Annotation from .models import Annotation
@task(ignore_results=False, queue='default') @app.task(ignore_results=False, queue='default')
def add_annotations(data): def add_annotations(data):
from item.models import Item from item.models import Item
from entity.models import Entity from entity.models import Entity
@ -51,7 +51,7 @@ def add_annotations(data):
annotation.item.update_facets() annotation.item.update_facets()
return True return True
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_item(id, force=False): def update_item(id, force=False):
from item.models import Item from item.models import Item
from clip.models import Clip from clip.models import Clip
@ -72,7 +72,7 @@ def update_item(id, force=False):
a.item.save() a.item.save()
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_annotations(layers, value): def update_annotations(layers, value):
items = {} items = {}

View file

@ -6,16 +6,8 @@ root_dir = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
root_dir = os.path.dirname(root_dir) root_dir = os.path.dirname(root_dir)
os.chdir(root_dir) os.chdir(root_dir)
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
app = Celery('pandora') app = Celery('pandora')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks() app.autodiscover_tasks()

View file

@ -2,13 +2,16 @@
import datetime import datetime
from celery.task import periodic_task from app.celery import app
from celery.schedules import crontab from celery.schedules import crontab
@app.task(queue='encoding')
@periodic_task(run_every=crontab(hour=6, minute=0), queue='encoding')
def cron(**kwargs): def cron(**kwargs):
from django.db import transaction from django.db import transaction
from django.contrib.sessions.models import Session from django.contrib.sessions.models import Session
Session.objects.filter(expire_date__lt=datetime.datetime.now()).delete() Session.objects.filter(expire_date__lt=datetime.datetime.now()).delete()
transaction.commit() transaction.commit()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(crontab(hour=6, minute=0), cron.s())

View file

@ -2,7 +2,6 @@
from glob import glob from glob import glob
from celery.task import task
from django.conf import settings from django.conf import settings
from django.db import transaction from django.db import transaction
from django.db.models import Q from django.db.models import Q
@ -10,6 +9,7 @@ from django.db.models import Q
from item.models import Item from item.models import Item
from item.tasks import update_poster, update_timeline from item.tasks import update_poster, update_timeline
from taskqueue.models import Task from taskqueue.models import Task
from app.celery import app
from . import models from . import models
from . import extract from . import extract
@ -69,7 +69,7 @@ def update_or_create_instance(volume, f):
instance.file.item.update_wanted() instance.file.item.update_wanted()
return instance return instance
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_files(user, volume, files): def update_files(user, volume, files):
user = models.User.objects.get(username=user) user = models.User.objects.get(username=user)
volume, created = models.Volume.objects.get_or_create(user=user, name=volume) volume, created = models.Volume.objects.get_or_create(user=user, name=volume)
@ -101,7 +101,7 @@ def update_files(user, volume, files):
Task.start(i, user) Task.start(i, user)
update_timeline.delay(i.public_id) update_timeline.delay(i.public_id)
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_info(user, info): def update_info(user, info):
user = models.User.objects.get(username=user) user = models.User.objects.get(username=user)
files = models.File.objects.filter(oshash__in=list(info)) files = models.File.objects.filter(oshash__in=list(info))
@ -115,7 +115,7 @@ def update_info(user, info):
Task.start(i, user) Task.start(i, user)
update_timeline.delay(i.public_id) update_timeline.delay(i.public_id)
@task(queue="encoding") @app.task(queue="encoding")
def process_stream(fileId): def process_stream(fileId):
''' '''
process uploaded stream process uploaded stream
@ -141,7 +141,7 @@ def process_stream(fileId):
Task.finish(file.item) Task.finish(file.item)
return True return True
@task(queue="encoding") @app.task(queue="encoding")
def extract_stream(fileId): def extract_stream(fileId):
''' '''
extract stream from direct upload extract stream from direct upload
@ -170,7 +170,7 @@ def extract_stream(fileId):
models.File.objects.filter(id=fileId).update(encoding=False) models.File.objects.filter(id=fileId).update(encoding=False)
Task.finish(file.item) Task.finish(file.item)
@task(queue="encoding") @app.task(queue="encoding")
def extract_derivatives(fileId, rebuild=False): def extract_derivatives(fileId, rebuild=False):
file = models.File.objects.get(id=fileId) file = models.File.objects.get(id=fileId)
streams = file.streams.filter(source=None) streams = file.streams.filter(source=None)
@ -178,7 +178,7 @@ def extract_derivatives(fileId, rebuild=False):
streams[0].extract_derivatives(rebuild) streams[0].extract_derivatives(rebuild)
return True return True
@task(queue="encoding") @app.task(queue="encoding")
def update_stream(id): def update_stream(id):
s = models.Stream.objects.get(pk=id) s = models.Stream.objects.get(pk=id)
if not glob("%s*" % s.timeline_prefix): if not glob("%s*" % s.timeline_prefix):
@ -200,11 +200,11 @@ def update_stream(id):
c.update_calculated_values() c.update_calculated_values()
c.save() c.save()
@task(queue="encoding") @app.task(queue="encoding")
def download_media(item_id, url, referer=None): def download_media(item_id, url, referer=None):
return external.download(item_id, url, referer) return external.download(item_id, url, referer)
@task(queue='default') @app.task(queue='default')
def move_media(data, user): def move_media(data, user):
from changelog.models import add_changelog from changelog.models import add_changelog
from item.models import get_item, Item, ItemSort from item.models import get_item, Item, ItemSort

View file

@ -1,7 +1,7 @@
import ox import ox
from celery.task import task from app.celery import app
@task(queue="encoding") @app.task(queue="encoding")
def extract_fulltext(id): def extract_fulltext(id):
from . import models from . import models
d = models.Document.objects.get(id=id) d = models.Document.objects.get(id=id)
@ -11,7 +11,7 @@ def extract_fulltext(id):
page.update_fulltext() page.update_fulltext()
@task(queue='default') @app.task(queue='default')
def bulk_edit(data, username): def bulk_edit(data, username):
from django.db import transaction from django.db import transaction
from . import models from . import models

View file

@ -1,20 +1,26 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from celery.task import task from app.celery import app
from .models import Event from .models import Event
''' '''
@periodic_task(run_every=crontab(hour=7, minute=30), queue='encoding') from celery.schedules import crontab
@app.task(ignore_results=True, queue='encoding')
def update_all_matches(**kwargs): def update_all_matches(**kwargs):
ids = [e['id'] for e in Event.objects.all().values('id')] ids = [e['id'] for e in Event.objects.all().values('id')]
for i in ids: for i in ids:
e = Event.objects.get(pk=i) e = Event.objects.get(pk=i)
e.update_matches() e.update_matches()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(crontab(hour=7, minute=30), update_all_matches.s())
''' '''
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_matches(eventId): def update_matches(eventId):
event = Event.objects.get(pk=eventId) event = Event.objects.get(pk=eventId)
event.update_matches() event.update_matches()

View file

@ -7,7 +7,8 @@ import os
import random import random
import logging import logging
from celery.task import task, periodic_task from app.celery import app
from celery.schedules import crontab
from django.conf import settings from django.conf import settings
from django.db import connection, transaction from django.db import connection, transaction
from django.db.models import Q from django.db.models import Q
@ -19,14 +20,17 @@ from taskqueue.models import Task
logger = logging.getLogger('pandora.' + __name__) logger = logging.getLogger('pandora.' + __name__)
@app.task(queue='encoding')
@periodic_task(run_every=timedelta(days=1), queue='encoding')
def cronjob(**kwargs): def cronjob(**kwargs):
if limit_rate('item.tasks.cronjob', 8 * 60 * 60): if limit_rate('item.tasks.cronjob', 8 * 60 * 60):
update_random_sort() update_random_sort()
update_random_clip_sort() update_random_clip_sort()
clear_cache.delay() clear_cache.delay()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(days=1), cronjob.s())
def update_random_sort(): def update_random_sort():
from . import models from . import models
if list(filter(lambda f: f['id'] == 'random', settings.CONFIG['itemKeys'])): if list(filter(lambda f: f['id'] == 'random', settings.CONFIG['itemKeys'])):
@ -54,7 +58,7 @@ def update_random_clip_sort():
cursor.execute(row) cursor.execute(row)
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_clips(public_id): def update_clips(public_id):
from . import models from . import models
try: try:
@ -63,7 +67,7 @@ def update_clips(public_id):
return return
item.clips.all().update(user=item.user.id) item.clips.all().update(user=item.user.id)
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_poster(public_id): def update_poster(public_id):
from . import models from . import models
try: try:
@ -81,7 +85,7 @@ def update_poster(public_id):
icon=item.icon.name icon=item.icon.name
) )
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_file_paths(public_id): def update_file_paths(public_id):
from . import models from . import models
try: try:
@ -90,7 +94,7 @@ def update_file_paths(public_id):
return return
item.update_file_paths() item.update_file_paths()
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_external(public_id): def update_external(public_id):
from . import models from . import models
try: try:
@ -99,7 +103,7 @@ def update_external(public_id):
return return
item.update_external() item.update_external()
@task(queue="encoding") @app.task(queue="encoding")
def update_timeline(public_id): def update_timeline(public_id):
from . import models from . import models
try: try:
@ -109,7 +113,7 @@ def update_timeline(public_id):
item.update_timeline(async_=False) item.update_timeline(async_=False)
Task.finish(item) Task.finish(item)
@task(queue="encoding") @app.task(queue="encoding")
def rebuild_timeline(public_id): def rebuild_timeline(public_id):
from . import models from . import models
i = models.Item.objects.get(public_id=public_id) i = models.Item.objects.get(public_id=public_id)
@ -117,7 +121,7 @@ def rebuild_timeline(public_id):
s.make_timeline() s.make_timeline()
i.update_timeline(async_=False) i.update_timeline(async_=False)
@task(queue="encoding") @app.task(queue="encoding")
def load_subtitles(public_id): def load_subtitles(public_id):
from . import models from . import models
try: try:
@ -130,7 +134,7 @@ def load_subtitles(public_id):
item.update_facets() item.update_facets()
@task(queue="encoding") @app.task(queue="encoding")
def extract_clip(public_id, in_, out, resolution, format, track=None): def extract_clip(public_id, in_, out, resolution, format, track=None):
from . import models from . import models
try: try:
@ -142,7 +146,7 @@ def extract_clip(public_id, in_, out, resolution, format, track=None):
return False return False
@task(queue="encoding") @app.task(queue="encoding")
def clear_cache(days=60): def clear_cache(days=60):
import subprocess import subprocess
path = os.path.join(settings.MEDIA_ROOT, 'media') path = os.path.join(settings.MEDIA_ROOT, 'media')
@ -156,7 +160,7 @@ def clear_cache(days=60):
subprocess.check_output(cmd) subprocess.check_output(cmd)
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_sitemap(base_url): def update_sitemap(base_url):
from . import models from . import models
sitemap = os.path.abspath(os.path.join(settings.MEDIA_ROOT, 'sitemap.xml.gz')) sitemap = os.path.abspath(os.path.join(settings.MEDIA_ROOT, 'sitemap.xml.gz'))
@ -356,7 +360,7 @@ def update_sitemap(base_url):
f.write(data) f.write(data)
@task(queue='default') @app.task(queue='default')
def bulk_edit(data, username): def bulk_edit(data, username):
from django.db import transaction from django.db import transaction
from . import models from . import models

View file

@ -2,10 +2,15 @@
from datetime import timedelta, datetime from datetime import timedelta, datetime
from celery.task import periodic_task from app.celery import app
from celery.schedules import crontab
from . import models from . import models
@periodic_task(run_every=timedelta(days=1), queue='encoding') @app.task(queue='encoding')
def cronjob(**kwargs): def cronjob(**kwargs):
models.Log.objects.filter(modified__lt=datetime.now()-timedelta(days=30)).delete() models.Log.objects.filter(modified__lt=datetime.now()-timedelta(days=30)).delete()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(days=1), cronjob.s())

View file

@ -1,11 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from celery.task import task
from . import models from . import models
from app.celery import app
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_itemsort(id): def update_itemsort(id):
try: try:
p = models.Person.objects.get(pk=id) p = models.Person.objects.get(pk=id)
@ -13,7 +13,7 @@ def update_itemsort(id):
except models.Person.DoesNotExist: except models.Person.DoesNotExist:
pass pass
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_file_paths(id): def update_file_paths(id):
from item.models import Item, ItemFind from item.models import Item, ItemFind
p = models.Person.objects.get(pk=id) p = models.Person.objects.get(pk=id)

View file

@ -1,20 +1,26 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from celery.task import task from app.celery import app
from . import models from . import models
''' '''
@periodic_task(run_every=crontab(hour=6, minute=30), queue='encoding') from celery.schedules import crontab
@app.task(queue='encoding')
def update_all_matches(**kwargs): def update_all_matches(**kwargs):
ids = [p['id'] for p in models.Place.objects.all().values('id')] ids = [p['id'] for p in models.Place.objects.all().values('id')]
for i in ids: for i in ids:
p = models.Place.objects.get(pk=i) p = models.Place.objects.get(pk=i)
p.update_matches() p.update_matches()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(crontab(hour=6, minute=30), update_all_matches.s())
''' '''
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_matches(id): def update_matches(id):
place = models.Place.objects.get(pk=id) place = models.Place.objects.get(pk=id)
place.update_matches() place.update_matches()

View file

@ -1,12 +1,12 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from django.db import connection, transaction from django.db import connection, transaction
from celery.task import task from app.celery import app
import item.models import item.models
from . import extract from . import extract
@task(ignore_results=True, queue='encoding') @app.task(ignore_results=True, queue='encoding')
def get_sequences(public_id): def get_sequences(public_id):
from . import models from . import models
i = item.models.Item.objects.get(public_id=public_id) i = item.models.Item.objects.get(public_id=public_id)

View file

@ -123,6 +123,7 @@ INSTALLED_APPS = (
'django_extensions', 'django_extensions',
'django_celery_results', 'django_celery_results',
'django_celery_beat',
'app', 'app',
'log', 'log',
'annotation', 'annotation',
@ -184,6 +185,9 @@ CACHES = {
} }
} }
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
AUTH_PROFILE_MODULE = 'user.UserProfile' AUTH_PROFILE_MODULE = 'user.UserProfile'
AUTH_CHECK_USERNAME = True AUTH_CHECK_USERNAME = True
FFMPEG = 'ffmpeg' FFMPEG = 'ffmpeg'
@ -209,6 +213,8 @@ CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json'] CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_BROKER_URL = 'amqp://pandora:box@localhost:5672//pandora' CELERY_BROKER_URL = 'amqp://pandora:box@localhost:5672//pandora'

View file

@ -2,17 +2,22 @@
from datetime import timedelta, datetime from datetime import timedelta, datetime
from celery.task import task, periodic_task
from django.conf import settings from django.conf import settings
from app.utils import limit_rate from app.utils import limit_rate
from app.celery import app
from celery.schedules import crontab
@periodic_task(run_every=timedelta(days=1), queue='encoding') @app.task(queue='encoding')
def cronjob(**kwargs): def cronjob(**kwargs):
if limit_rate('translations.tasks.cronjob', 8 * 60 * 60): if limit_rate('translations.tasks.cronjob', 8 * 60 * 60):
load_translations() load_translations()
@task(ignore_results=True, queue='encoding') @app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(days=1), cronjob.s())
@app.task(ignore_results=True, queue='encoding')
def load_translations(): def load_translations():
from .models import load_itemkey_translations, load_translations from .models import load_itemkey_translations, load_translations
load_translations() load_translations()

View file

@ -2,17 +2,22 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from celery.task import periodic_task from app.celery import app
from celery.schedules import crontab
from app.utils import limit_rate from app.utils import limit_rate
from . import models from . import models
@periodic_task(run_every=timedelta(days=1), queue='encoding') @app.task(queue='encoding')
def update_program(**kwargs): def update_program(**kwargs):
if limit_rate('tv.tasks.update_program', 8 * 60 * 60): if limit_rate('tv.tasks.update_program', 8 * 60 * 60):
for c in models.Channel.objects.all(): for c in models.Channel.objects.all():
c.update_program() c.update_program()
old = datetime.now() - timedelta(days=180) old = datetime.now() - timedelta(days=180)
models.Program.objects.filter(created__lt=old).delete() models.Program.objects.filter(created__lt=old).delete()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(days=1), update_program.s())

View file

@ -4,17 +4,22 @@ from datetime import timedelta
from itertools import zip_longest from itertools import zip_longest
import json import json
from celery.task import task, periodic_task from celery.schedules import crontab
from app.celery import app
from app.utils import limit_rate from app.utils import limit_rate
from app.models import Settings from app.models import Settings
from .statistics import Statistics from .statistics import Statistics
@periodic_task(run_every=timedelta(hours=1), queue='encoding') @app.task(queue='encoding')
def cronjob(**kwargs): def cronjob(**kwargs):
if limit_rate('user.tasks.cronjob', 30 * 60): if limit_rate('user.tasks.cronjob', 30 * 60):
update_statistics() update_statistics()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(timedelta(hours=1), cronjob.s())
def update_statistics(): def update_statistics():
from . import models from . import models
@ -31,7 +36,7 @@ def update_statistics():
stats.add(u.json()) stats.add(u.json())
Settings.set('statistics', stats) Settings.set('statistics', stats)
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def parse_data(key): def parse_data(key):
from . import models from . import models
try: try:
@ -41,7 +46,7 @@ def parse_data(key):
session_data.parse_data() session_data.parse_data()
session_data.save() session_data.save()
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_numberoflists(username): def update_numberoflists(username):
from . import models from . import models
user = models.User.objects.get(username=username) user = models.User.objects.get(username=username)
@ -51,7 +56,7 @@ def update_numberoflists(username):
numberoflists=user.lists.count() numberoflists=user.lists.count()
) )
@task(ignore_results=True, queue='default') @app.task(ignore_results=True, queue='default')
def update_numberofcollections(username): def update_numberofcollections(username):
from . import models from . import models
user = models.User.objects.get(username=username) user = models.User.objects.get(username=username)

View file

@ -3,6 +3,7 @@ simplejson
chardet chardet
celery<5.0,>4.3 celery<5.0,>4.3
django-celery-results<2 django-celery-results<2
django-celery-beat
django-extensions==2.2.9 django-extensions==2.2.9
gunicorn==20.0.4 gunicorn==20.0.4
html5lib html5lib