commit 1b5de1882a0c9c6573410ab16b41c0bfbaa51f3b Author: j Date: Sun Jul 7 15:32:37 2024 +0100 pandora_transcribe diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/README.md b/README.md new file mode 100644 index 0000000..633507c --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# pandora_transcribe + +use whisper_timestamped to add automatic transcriptions to pan.do/ra + +## installation + + cd /srv/pandora/pandora + git clone https://code.0x2620.org/0x2620/pandora_transcribe transcribe + +add "transcribe" to LOCAL_APPS in local_setttings.py + + +## configuration + + add a user called subtitles and create 2 lists for that user: Queue and Transcribed + alternatively see options for `./manage.py transcribe` to use another user or list name + +## run + + in a terminal run + ./manage.py transcribe + + +## install a service + +copy systemd/service/pandora-transcribe.service to /etc/systemd/system/pandora-transcribe.service and + + systemctl enable --now andora-transcribe.service + + + diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps.py b/apps.py new file mode 100644 index 0000000..65031ce --- /dev/null +++ b/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class WhisperConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = 'transcribe' diff --git a/management/commands/transcribe.py b/management/commands/transcribe.py new file mode 100644 index 0000000..6cf3a2e --- /dev/null +++ b/management/commands/transcribe.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + +from django.core.management.base import BaseCommand +from django.conf import settings +from django.db import transaction + +import app.monkey_patch +from ... import transcribe + +class Command(BaseCommand): + help = 'transcribe items with whisper_timestamped' + + def add_arguments(self, parser): + parser.add_argument('--user', type=str, dest='user', + default='subtitles', help='user for subtitles (default: subtitles)') + parser.add_argument('--queue', type=str, dest='queue', + default='Queue', help='name of incoming list (default: Queue)') + parser.add_argument('--done', type=str, dest='done', + default='Transcribed', help='name of incoming list (default: Transcribed)') + parser.add_argument('--layer', type=str, dest='layer', + default=None, help='import into layer (default subtitle layer)') + parser.add_argument('--translate', type=str, dest='translate', + default='', help='list of languages to translate: (i.e. hi:en,de:en') + parser.add_argument('--gpu', action='store_true', dest='gpu', + default=False, help='user GPU (default: disabled)') + + def handle(self, **kwargs): + transcribe.main(**kwargs) diff --git a/transcribe.py b/transcribe.py new file mode 100644 index 0000000..9793553 --- /dev/null +++ b/transcribe.py @@ -0,0 +1,109 @@ +import logging +import os +import shutil +import signal +import subprocess +import tempfile + +import ox +import ox.iso + +from django.conf import settings + +from annotation import tasks +from item.models import Item +from itemlist.models import List +from item import utils + + +logger = logging.getLogger(__name__) + +def extract_subtitles(item, user, layer, translate, gpu=False): + if "language" not in item.data: + logger.error("skip item without language %s", item.public_id) + return False + language = ox.iso.langTo2Code(item.data["language"][0]) + if not language: + logger.error("skip item with unknown language %s: %s", item.public_id, item.data["language"]) + return False + if not item.streams(): + logger.error("skip item without media %s: %s", item.public_id) + return False + src = item.streams()[0].media.path + + tmp = tempfile.mkdtemp() + cmd = [ + "/opt/whisper-timestamped/bin/whisper_timestamped", + "--language", language, + ] + if translate and language in translate: + cmd += [ + '--task', 'translate' + ] + if not gpu: + cmd += [ + "--fp16", "False", + ] + + cmd += [ + "-f", "srt", + "--accurate", + "--output_dir", tmp, + src, + ] + try: + subprocess.check_output(cmd) + except: + logger.error("failed to extract subtitles from item %s\n%s", item.public_id, cmd) + shutil.rmtree(tmp) + return False + annotations = [] + for f in os.listdir(tmp): + if f.endswith(".srt") and "words.srt" not in f: + srt = os.path.join(tmp, f) + annotations = ox.srt.load(srt) + if not annotations: + logger.error("no subtitles detected %s", item.public_id) + return True + if language != "en": + for annotation in annotations: + annotation["value"] = '%s' % (language, annotation["value"]) + + tasks.add_annotations.delay({ + 'item': item.public_id, + 'layer': layer, + 'user': user.username, + 'annotations': annotations + }) + shutil.rmtree(tmp) + return True + + +def main(**kwargs): + queue = List.objects.get(kwargs['queue']) + done = List.objects.get(kwargs['done']) + user = User.objects.get(kwargs['user']) + layer = kwargs.get("layer") + translate = kwargs.get("translate") + if translate: + translate = dict([tt.split(':') for tt in translate.split(',')]) + if not layer: + layer = utils.get_by_key(settings.CONFIG['layers'], 'isSubtitles', True) + if layer: + layer = layer["id"] + else: + logger.error("no layer defined and config has no subtitle layer") + return + try: + while True: + wait = True + for item in queue.get_items(queue.user): + if extract_subtitles(item, user, layer, translate, kwargs.get("gpu")): + queue.items.remove(item) + done.items.remove(item) + wait = False + if wait: + time.sleep(5*60) + except KeyboardInterrupt: + pass +