From 9340c0e578960da7f406f0037c044909420f68a7 Mon Sep 17 00:00:00 2001 From: j Date: Wed, 15 Dec 2021 19:22:49 +0000 Subject: [PATCH] update to new celery api --- etc/systemd/system/pandora-cron.service | 2 +- pandora/annotation/tasks.py | 8 +++---- pandora/app/celery.py | 8 ------- pandora/app/tasks.py | 9 ++++--- pandora/archive/tasks.py | 18 +++++++------- pandora/document/tasks.py | 6 ++--- pandora/event/tasks.py | 12 +++++++--- pandora/item/tasks.py | 32 ++++++++++++++----------- pandora/log/tasks.py | 9 +++++-- pandora/person/tasks.py | 6 ++--- pandora/place/tasks.py | 12 +++++++--- pandora/sequence/tasks.py | 4 ++-- pandora/settings.py | 6 +++++ pandora/translation/tasks.py | 11 ++++++--- pandora/tv/tasks.py | 9 +++++-- pandora/user/tasks.py | 15 ++++++++---- requirements.txt | 1 + 17 files changed, 103 insertions(+), 65 deletions(-) diff --git a/etc/systemd/system/pandora-cron.service b/etc/systemd/system/pandora-cron.service index 9f1ef157b..7c6b0028d 100644 --- a/etc/systemd/system/pandora-cron.service +++ b/etc/systemd/system/pandora-cron.service @@ -11,7 +11,7 @@ PIDFile=/run/pandora/cron.pid WorkingDirectory=/srv/pandora/pandora ExecStart=/srv/pandora/bin/celery \ -A app beat \ - -s /run/pandora/celerybeat-schedule \ + --scheduler django_celery_beat.schedulers:DatabaseScheduler \ --pidfile /run/pandora/cron.pid \ -l INFO ExecReload=/bin/kill -HUP $MAINPID diff --git a/pandora/annotation/tasks.py b/pandora/annotation/tasks.py index 8054c6e3e..298695c3c 100644 --- a/pandora/annotation/tasks.py +++ b/pandora/annotation/tasks.py @@ -5,12 +5,12 @@ from django.contrib.auth import get_user_model from django.db import transaction import ox -from celery.task import task +from app.celery import app from .models import Annotation -@task(ignore_results=False, queue='default') +@app.task(ignore_results=False, queue='default') def add_annotations(data): from item.models import Item from entity.models import Entity @@ -51,7 +51,7 @@ def add_annotations(data): annotation.item.update_facets() return True -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_item(id, force=False): from item.models import Item from clip.models import Clip @@ -72,7 +72,7 @@ def update_item(id, force=False): a.item.save() -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_annotations(layers, value): items = {} diff --git a/pandora/app/celery.py b/pandora/app/celery.py index 710d0d0e3..c43cc2501 100644 --- a/pandora/app/celery.py +++ b/pandora/app/celery.py @@ -6,16 +6,8 @@ root_dir = os.path.normpath(os.path.abspath(os.path.dirname(__file__))) root_dir = os.path.dirname(root_dir) os.chdir(root_dir) -# set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') app = Celery('pandora') - -# Using a string here means the worker doesn't have to serialize -# the configuration object to child processes. -# - namespace='CELERY' means all celery-related configuration keys -# should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') - -# Load task modules from all registered Django app configs. app.autodiscover_tasks() diff --git a/pandora/app/tasks.py b/pandora/app/tasks.py index 03b267783..51a286e4e 100644 --- a/pandora/app/tasks.py +++ b/pandora/app/tasks.py @@ -2,13 +2,16 @@ import datetime -from celery.task import periodic_task +from app.celery import app from celery.schedules import crontab - -@periodic_task(run_every=crontab(hour=6, minute=0), queue='encoding') +@app.task(queue='encoding') def cron(**kwargs): from django.db import transaction from django.contrib.sessions.models import Session Session.objects.filter(expire_date__lt=datetime.datetime.now()).delete() transaction.commit() + +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(crontab(hour=6, minute=0), cron.s()) diff --git a/pandora/archive/tasks.py b/pandora/archive/tasks.py index b9f379d2d..9e061b385 100644 --- a/pandora/archive/tasks.py +++ b/pandora/archive/tasks.py @@ -2,7 +2,6 @@ from glob import glob -from celery.task import task from django.conf import settings from django.db import transaction from django.db.models import Q @@ -10,6 +9,7 @@ from django.db.models import Q from item.models import Item from item.tasks import update_poster, update_timeline from taskqueue.models import Task +from app.celery import app from . import models from . import extract @@ -69,7 +69,7 @@ def update_or_create_instance(volume, f): instance.file.item.update_wanted() return instance -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_files(user, volume, files): user = models.User.objects.get(username=user) volume, created = models.Volume.objects.get_or_create(user=user, name=volume) @@ -101,7 +101,7 @@ def update_files(user, volume, files): Task.start(i, user) update_timeline.delay(i.public_id) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_info(user, info): user = models.User.objects.get(username=user) files = models.File.objects.filter(oshash__in=list(info)) @@ -115,7 +115,7 @@ def update_info(user, info): Task.start(i, user) update_timeline.delay(i.public_id) -@task(queue="encoding") +@app.task(queue="encoding") def process_stream(fileId): ''' process uploaded stream @@ -141,7 +141,7 @@ def process_stream(fileId): Task.finish(file.item) return True -@task(queue="encoding") +@app.task(queue="encoding") def extract_stream(fileId): ''' extract stream from direct upload @@ -170,7 +170,7 @@ def extract_stream(fileId): models.File.objects.filter(id=fileId).update(encoding=False) Task.finish(file.item) -@task(queue="encoding") +@app.task(queue="encoding") def extract_derivatives(fileId, rebuild=False): file = models.File.objects.get(id=fileId) streams = file.streams.filter(source=None) @@ -178,7 +178,7 @@ def extract_derivatives(fileId, rebuild=False): streams[0].extract_derivatives(rebuild) return True -@task(queue="encoding") +@app.task(queue="encoding") def update_stream(id): s = models.Stream.objects.get(pk=id) if not glob("%s*" % s.timeline_prefix): @@ -200,11 +200,11 @@ def update_stream(id): c.update_calculated_values() c.save() -@task(queue="encoding") +@app.task(queue="encoding") def download_media(item_id, url, referer=None): return external.download(item_id, url, referer) -@task(queue='default') +@app.task(queue='default') def move_media(data, user): from changelog.models import add_changelog from item.models import get_item, Item, ItemSort diff --git a/pandora/document/tasks.py b/pandora/document/tasks.py index dfd97d1c6..fcfb5576d 100644 --- a/pandora/document/tasks.py +++ b/pandora/document/tasks.py @@ -1,7 +1,7 @@ import ox -from celery.task import task +from app.celery import app -@task(queue="encoding") +@app.task(queue="encoding") def extract_fulltext(id): from . import models d = models.Document.objects.get(id=id) @@ -11,7 +11,7 @@ def extract_fulltext(id): page.update_fulltext() -@task(queue='default') +@app.task(queue='default') def bulk_edit(data, username): from django.db import transaction from . import models diff --git a/pandora/event/tasks.py b/pandora/event/tasks.py index 234dd5a76..46acdda29 100644 --- a/pandora/event/tasks.py +++ b/pandora/event/tasks.py @@ -1,20 +1,26 @@ # -*- coding: utf-8 -*- -from celery.task import task +from app.celery import app from .models import Event ''' -@periodic_task(run_every=crontab(hour=7, minute=30), queue='encoding') +from celery.schedules import crontab + +@app.task(ignore_results=True, queue='encoding') def update_all_matches(**kwargs): ids = [e['id'] for e in Event.objects.all().values('id')] for i in ids: e = Event.objects.get(pk=i) e.update_matches() + +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(crontab(hour=7, minute=30), update_all_matches.s()) ''' -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_matches(eventId): event = Event.objects.get(pk=eventId) event.update_matches() diff --git a/pandora/item/tasks.py b/pandora/item/tasks.py index 7bfa24366..a754a7936 100644 --- a/pandora/item/tasks.py +++ b/pandora/item/tasks.py @@ -7,7 +7,8 @@ import os import random import logging -from celery.task import task, periodic_task +from app.celery import app +from celery.schedules import crontab from django.conf import settings from django.db import connection, transaction from django.db.models import Q @@ -19,14 +20,17 @@ from taskqueue.models import Task logger = logging.getLogger('pandora.' + __name__) - -@periodic_task(run_every=timedelta(days=1), queue='encoding') +@app.task(queue='encoding') def cronjob(**kwargs): if limit_rate('item.tasks.cronjob', 8 * 60 * 60): update_random_sort() update_random_clip_sort() clear_cache.delay() +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(timedelta(days=1), cronjob.s()) + def update_random_sort(): from . import models if list(filter(lambda f: f['id'] == 'random', settings.CONFIG['itemKeys'])): @@ -54,7 +58,7 @@ def update_random_clip_sort(): cursor.execute(row) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_clips(public_id): from . import models try: @@ -63,7 +67,7 @@ def update_clips(public_id): return item.clips.all().update(user=item.user.id) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_poster(public_id): from . import models try: @@ -81,7 +85,7 @@ def update_poster(public_id): icon=item.icon.name ) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_file_paths(public_id): from . import models try: @@ -90,7 +94,7 @@ def update_file_paths(public_id): return item.update_file_paths() -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_external(public_id): from . import models try: @@ -99,7 +103,7 @@ def update_external(public_id): return item.update_external() -@task(queue="encoding") +@app.task(queue="encoding") def update_timeline(public_id): from . import models try: @@ -109,7 +113,7 @@ def update_timeline(public_id): item.update_timeline(async_=False) Task.finish(item) -@task(queue="encoding") +@app.task(queue="encoding") def rebuild_timeline(public_id): from . import models i = models.Item.objects.get(public_id=public_id) @@ -117,7 +121,7 @@ def rebuild_timeline(public_id): s.make_timeline() i.update_timeline(async_=False) -@task(queue="encoding") +@app.task(queue="encoding") def load_subtitles(public_id): from . import models try: @@ -130,7 +134,7 @@ def load_subtitles(public_id): item.update_facets() -@task(queue="encoding") +@app.task(queue="encoding") def extract_clip(public_id, in_, out, resolution, format, track=None): from . import models try: @@ -142,7 +146,7 @@ def extract_clip(public_id, in_, out, resolution, format, track=None): return False -@task(queue="encoding") +@app.task(queue="encoding") def clear_cache(days=60): import subprocess path = os.path.join(settings.MEDIA_ROOT, 'media') @@ -156,7 +160,7 @@ def clear_cache(days=60): subprocess.check_output(cmd) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_sitemap(base_url): from . import models sitemap = os.path.abspath(os.path.join(settings.MEDIA_ROOT, 'sitemap.xml.gz')) @@ -356,7 +360,7 @@ def update_sitemap(base_url): f.write(data) -@task(queue='default') +@app.task(queue='default') def bulk_edit(data, username): from django.db import transaction from . import models diff --git a/pandora/log/tasks.py b/pandora/log/tasks.py index 768fd7310..bdd9cd1f1 100644 --- a/pandora/log/tasks.py +++ b/pandora/log/tasks.py @@ -2,10 +2,15 @@ from datetime import timedelta, datetime -from celery.task import periodic_task +from app.celery import app +from celery.schedules import crontab from . import models -@periodic_task(run_every=timedelta(days=1), queue='encoding') +@app.task(queue='encoding') def cronjob(**kwargs): models.Log.objects.filter(modified__lt=datetime.now()-timedelta(days=30)).delete() + +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(timedelta(days=1), cronjob.s()) diff --git a/pandora/person/tasks.py b/pandora/person/tasks.py index a94ed2737..4dcb7bad8 100644 --- a/pandora/person/tasks.py +++ b/pandora/person/tasks.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- -from celery.task import task from . import models +from app.celery import app -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_itemsort(id): try: p = models.Person.objects.get(pk=id) @@ -13,7 +13,7 @@ def update_itemsort(id): except models.Person.DoesNotExist: pass -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_file_paths(id): from item.models import Item, ItemFind p = models.Person.objects.get(pk=id) diff --git a/pandora/place/tasks.py b/pandora/place/tasks.py index 3feb88dd1..96926528f 100644 --- a/pandora/place/tasks.py +++ b/pandora/place/tasks.py @@ -1,20 +1,26 @@ # -*- coding: utf-8 -*- -from celery.task import task +from app.celery import app from . import models ''' -@periodic_task(run_every=crontab(hour=6, minute=30), queue='encoding') +from celery.schedules import crontab + +@app.task(queue='encoding') def update_all_matches(**kwargs): ids = [p['id'] for p in models.Place.objects.all().values('id')] for i in ids: p = models.Place.objects.get(pk=i) p.update_matches() + +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(crontab(hour=6, minute=30), update_all_matches.s()) ''' -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_matches(id): place = models.Place.objects.get(pk=id) place.update_matches() diff --git a/pandora/sequence/tasks.py b/pandora/sequence/tasks.py index b90c0d5a4..0747bf8c8 100644 --- a/pandora/sequence/tasks.py +++ b/pandora/sequence/tasks.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- from django.db import connection, transaction -from celery.task import task +from app.celery import app import item.models from . import extract -@task(ignore_results=True, queue='encoding') +@app.task(ignore_results=True, queue='encoding') def get_sequences(public_id): from . import models i = item.models.Item.objects.get(public_id=public_id) diff --git a/pandora/settings.py b/pandora/settings.py index 8f97c89c3..67412f9cf 100644 --- a/pandora/settings.py +++ b/pandora/settings.py @@ -123,6 +123,7 @@ INSTALLED_APPS = ( 'django_extensions', 'django_celery_results', + 'django_celery_beat', 'app', 'log', 'annotation', @@ -184,6 +185,9 @@ CACHES = { } } +DEFAULT_AUTO_FIELD = "django.db.models.AutoField" + + AUTH_PROFILE_MODULE = 'user.UserProfile' AUTH_CHECK_USERNAME = True FFMPEG = 'ffmpeg' @@ -209,6 +213,8 @@ CELERY_RESULT_BACKEND = 'django-db' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TIMEZONE = 'UTC' +CELERY_ENABLE_UTC = True CELERY_BROKER_URL = 'amqp://pandora:box@localhost:5672//pandora' diff --git a/pandora/translation/tasks.py b/pandora/translation/tasks.py index 8b0be30b2..9e9143d7a 100644 --- a/pandora/translation/tasks.py +++ b/pandora/translation/tasks.py @@ -2,17 +2,22 @@ from datetime import timedelta, datetime -from celery.task import task, periodic_task from django.conf import settings from app.utils import limit_rate +from app.celery import app +from celery.schedules import crontab -@periodic_task(run_every=timedelta(days=1), queue='encoding') +@app.task(queue='encoding') def cronjob(**kwargs): if limit_rate('translations.tasks.cronjob', 8 * 60 * 60): load_translations() -@task(ignore_results=True, queue='encoding') +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(timedelta(days=1), cronjob.s()) + +@app.task(ignore_results=True, queue='encoding') def load_translations(): from .models import load_itemkey_translations, load_translations load_translations() diff --git a/pandora/tv/tasks.py b/pandora/tv/tasks.py index 20a9df943..35dcf4163 100644 --- a/pandora/tv/tasks.py +++ b/pandora/tv/tasks.py @@ -2,17 +2,22 @@ from datetime import datetime, timedelta -from celery.task import periodic_task +from app.celery import app +from celery.schedules import crontab from app.utils import limit_rate from . import models -@periodic_task(run_every=timedelta(days=1), queue='encoding') +@app.task(queue='encoding') def update_program(**kwargs): if limit_rate('tv.tasks.update_program', 8 * 60 * 60): for c in models.Channel.objects.all(): c.update_program() old = datetime.now() - timedelta(days=180) models.Program.objects.filter(created__lt=old).delete() + +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(timedelta(days=1), update_program.s()) diff --git a/pandora/user/tasks.py b/pandora/user/tasks.py index c1da304c7..ad3869c86 100644 --- a/pandora/user/tasks.py +++ b/pandora/user/tasks.py @@ -4,17 +4,22 @@ from datetime import timedelta from itertools import zip_longest import json -from celery.task import task, periodic_task +from celery.schedules import crontab +from app.celery import app from app.utils import limit_rate from app.models import Settings from .statistics import Statistics -@periodic_task(run_every=timedelta(hours=1), queue='encoding') +@app.task(queue='encoding') def cronjob(**kwargs): if limit_rate('user.tasks.cronjob', 30 * 60): update_statistics() +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task(timedelta(hours=1), cronjob.s()) + def update_statistics(): from . import models @@ -31,7 +36,7 @@ def update_statistics(): stats.add(u.json()) Settings.set('statistics', stats) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def parse_data(key): from . import models try: @@ -41,7 +46,7 @@ def parse_data(key): session_data.parse_data() session_data.save() -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_numberoflists(username): from . import models user = models.User.objects.get(username=username) @@ -51,7 +56,7 @@ def update_numberoflists(username): numberoflists=user.lists.count() ) -@task(ignore_results=True, queue='default') +@app.task(ignore_results=True, queue='default') def update_numberofcollections(username): from . import models user = models.User.objects.get(username=username) diff --git a/requirements.txt b/requirements.txt index 0860ebfb3..66f20ff05 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ simplejson chardet celery<5.0,>4.3 django-celery-results<2 +django-celery-beat django-extensions==2.2.9 gunicorn==20.0.4 html5lib