phantasmobile/app/item/tasks.py
2023-08-20 21:10:19 +02:00

78 lines
2.6 KiB
Python

import os
import tempfile
import requests
from celery.schedules import crontab
from django.conf import settings
from django.utils import timezone
from ..signalbot import rpc
from ..telegrambot import rpc as telegram_rpc
from ..celery import app
from . import models
@app.task(queue="default", ignore_results=True)
def announce_items():
if not getattr(settings, 'SIGNAL_ANNOUNCE_GROUP'):
return
now = timezone.now()
qs = models.Item.objects.exclude(published=None).filter(announced=None).filter(published__lte=now).order_by('published')
for item in qs:
item.announced = now
link = settings.BASE_URL + item.get_absolute_url()
message = link
image = item.data.get('thumbnail')
description = ""
if image:
f = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
r = requests.get(image)
f.write(r.content)
f.close()
image = f.name
if getattr(settings, "SIGNAL_ANNOUNCE_GROUP"):
r = rpc.send(
message,
group=settings.SIGNAL_ANNOUNCE_GROUP,
preview_url=link,
preview_title=item.title,
preview_image=image,
preview_description=description,
)
if getattr(settings, "TELEGRAM_ANNOUNCE_CHANNEL"):
message = f'{item.title}<br><a href="{link}">{link}</a>'
r = telegram_rpc.send(
message,
channel=settings.TELEGRAM_ANNOUNCE_CHANNEL,
preview_image=image,
)
item.save()
if image:
os.unlink(f.name)
@app.task(queue="default", ignore_results=True)
def notify_moderators(id, link):
comment = models.Comment.objects.filter(id=id).first()
if comment:
message = "%s commented on %s (%s)\n\n%s" % (
comment.name, comment.item.title, link, comment.text
)
r = rpc.send(message, group=settings.SIGNAL_MODERATORS_GROUP)
if r and "timestamp" in r:
comment.data["moderator_ts"] = r["timestamp"]
comment.save()
if comment.published:
account = settings.SIGNAL_ACCOUNT
group = settings.SIGNAL_MODERATORS_GROUP
rpc.send_reaction(
account, comment.data["moderator_ts"], "🎉", group=group
)
else:
print("failed to notify", r)
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(crontab(minute="*/2"), announce_items.s())