# phantasmobile/app/item/tasks.py
#
# 91 lines
# 3 KiB
# Python

import os
import tempfile
import requests
from celery.schedules import crontab
from django.conf import settings
from django.utils import timezone
from ..signalbot import rpc
from ..telegrambot import rpc as telegram_rpc
from ..celery import app
from . import models
def _fetch_thumbnail(url, timeout=30):
    """Download *url* into a temporary ``.jpg`` file and return its path.

    Returns ``None`` when the download fails (network error, timeout or a
    non-2xx response) so the caller can still announce without a preview
    image. The caller owns the returned file and must delete it.
    """
    try:
        # A timeout keeps a stalled thumbnail host from blocking the worker.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as exc:
        print("failed to fetch thumbnail", url, exc)
        return None
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        tmp.write(response.content)
    return tmp.name


@app.task(queue="default", ignore_result=True)
def announce_items():
    """Announce published items that have not been announced yet.

    Selects items whose ``published`` time is set and not in the future and
    whose ``announced`` is unset, oldest first. Each item's link is sent to
    the Signal announce group and, when ``TELEGRAM_ANNOUNCE_CHANNEL`` is
    configured, to that Telegram channel as well. The item is saved with
    ``announced`` set only after the sends returned, so an item whose send
    raised is picked up again on the next run.

    Does nothing when ``settings.SIGNAL_ANNOUNCE_GROUP`` is missing or empty.

    NOTE(review): because of that early return, Telegram announcements only
    happen when the Signal group is configured too. This matches the previous
    behaviour; confirm whether Telegram-only deployments should announce.
    """
    from html import escape

    # Missing settings are treated as "not configured" rather than raising.
    signal_group = getattr(settings, "SIGNAL_ANNOUNCE_GROUP", None)
    if not signal_group:
        return
    telegram_channel = getattr(settings, "TELEGRAM_ANNOUNCE_CHANNEL", None)

    now = timezone.now()
    pending = (
        models.Item.objects.exclude(published=None)
        .filter(announced=None)
        .filter(published__lte=now)
        .order_by("published")
    )
    for item in pending:
        item.announced = now
        link = settings.BASE_URL + item.get_absolute_url()
        image = item.data.get("thumbnail")
        if image:
            # Replace the thumbnail URL with a local file path (or None if
            # the download failed).
            image = _fetch_thumbnail(image)
        try:
            rpc.send(
                link,
                group=signal_group,
                preview_url=link,
                preview_title=item.title,
                preview_image=image,
                preview_description="",
            )
            if telegram_channel:
                # Assumes telegram_rpc.send uses Telegram's HTML parse mode,
                # as the markup implies; text and attribute values must then
                # have <, > and & escaped.
                safe_title = escape(str(item.title))
                safe_link = escape(link)
                telegram_rpc.send(
                    f'{safe_title}\n<a href="{safe_link}">{safe_link}</a>',
                    channel=telegram_channel,
                    preview_image=image,
                )
            item.save()
        finally:
            # Remove the temporary thumbnail even when a send raised.
            if image:
                os.unlink(image)
@app.task(queue="default", ignore_result=True)
def notify_telegram(message):
    """Send *message* to the configured Telegram announce channel.

    Does nothing when ``settings.TELEGRAM_ANNOUNCE_CHANNEL`` is missing or
    empty.
    """
    # A default avoids AttributeError when the setting is not defined at all.
    channel = getattr(settings, "TELEGRAM_ANNOUNCE_CHANNEL", None)
    if not channel:
        return
    telegram_rpc.send(message, channel=channel)
@app.task(queue="default", ignore_result=True)
def notify_moderators(id, link):
    """Tell the Signal moderators group about a new comment.

    Looks up the comment with primary key *id*; does nothing if it does not
    exist. On a successful send, the timestamp from the response is stored in
    ``comment.data["moderator_ts"]`` and the comment is saved. If the comment
    is already published, a reaction is added to the moderator message and a
    Telegram notification is queued through ``notify_telegram``.

    *link* is the URL included in both messages.
    """
    from html import escape

    comment = models.Comment.objects.filter(id=id).first()
    if not comment:
        return

    message = "%s commented on %s (%s)\n\n%s" % (
        comment.name, comment.item.title, link, comment.text
    )
    group = settings.SIGNAL_MODERATORS_GROUP
    r = rpc.send(message, group=group)
    if not r or "timestamp" not in r:
        print("failed to notify", r)
        return

    timestamp = r["timestamp"]
    comment.data["moderator_ts"] = timestamp
    comment.save()

    # NOTE(review): the source's indentation was lost; the Telegram
    # notification is assumed to apply to published comments only, together
    # with the reaction. Confirm against the original layout.
    if comment.published:
        rpc.send_reaction(settings.SIGNAL_ACCOUNT, timestamp, "🎉", group=group)
        # The commenter name is user input; escape everything interpolated
        # into the markup. Assumes the Telegram side uses HTML parse mode,
        # as the <b>/<a> tags imply.
        safe_link = escape(str(link))
        msg = '%s commented on <b>%s</b> at <a href="%s">%s</a>' % (
            escape(str(comment.name)),
            escape(str(comment.item.title)),
            safe_link,
            safe_link,
        )
        notify_telegram.delay(msg)
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the recurring announcement job on *sender*.

    Connected to the app's ``on_after_finalize`` signal. Schedules
    ``announce_items`` with a crontab of ``minute="*/2"``. Extra keyword
    arguments passed by the signal are accepted and ignored.
    """
    every_other_minute = crontab(minute="*/2")
    announce_signature = announce_items.s()
    sender.add_periodic_task(every_other_minute, announce_signature)