"""Extract subtitles from item media with whisper-timestamped.

Items are taken from a "queue" list, transcribed, and the resulting
subtitles are handed to ``annotation.tasks.add_annotations``.
"""
import logging
import os
import shutil
import signal
import subprocess
import tempfile
import time

import ox
import ox.iso
from django.conf import settings

from annotation import tasks
from item import utils
from itemlist.models import List
from item.models import Item
from user.models import User

logger = logging.getLogger(__name__)


def extract_subtitles(item, user, layer, translate, gpu=False):
    """Run whisper-timestamped on the first stream of *item* and queue the
    detected subtitles as annotations.

    Args:
        item: item whose first stream is transcribed.
        user: user the annotations are attributed to (``user.username``).
        layer: id of the annotation layer to add the subtitles to.
        translate: mapping (or other container) of source language codes
            that should be translated; only membership is checked here.
            May be ``None``/empty to disable translation.
        gpu: when falsy, ``--fp16 False`` is passed to whisper.

    Returns:
        ``False`` if the item has no media or the whisper command failed,
        ``True`` otherwise (including when no subtitles were detected).
    """
    language = None
    # Guard against a missing *or empty* language list.
    if "language" in item.data and item.data["language"]:
        language = ox.iso.langTo2Code(item.data["language"][0])

    streams = item.streams()
    if not streams:
        logger.error("skip item without media %s", item.public_id)
        return False
    src = streams[0].media.path

    cmd = ["/opt/whisper-timestamped/bin/whisper_timestamped"]
    if language:
        cmd += ["--language", language]
    if translate and language in translate:
        cmd += ["--task", "translate"]
        # The translate task is treated as producing English output.
        language = "en"
    if not gpu:
        cmd += ["--fp16", "False"]

    # The context manager removes the directory on every exit path.
    with tempfile.TemporaryDirectory() as tmp:
        cmd += [
            "-f", "srt",
            "--accurate",
            "--output_dir", tmp,
            src,
        ]
        try:
            subprocess.check_output(cmd)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt must
            # propagate so the polling loop in main() can be stopped.
            logger.exception(
                "failed to extract subtitles from item %s\n%s",
                item.public_id, cmd
            )
            return False

        annotations = []
        for name in os.listdir(tmp):
            # Skip the per-word output, only the regular srt is wanted.
            if name.endswith(".srt") and "words.srt" not in name:
                annotations = ox.srt.load(os.path.join(tmp, name))

    if not annotations:
        logger.error("no subtitles detected %s", item.public_id)
        return True

    if language and language != "en":
        # NOTE(review): the original format string had a single placeholder
        # for two arguments (TypeError); the language-tagging markup is
        # assumed to be a <span lang="..."> wrapper - confirm.
        for annotation in annotations:
            annotation["value"] = '<span lang="%s">%s</span>' % (
                language, annotation["value"]
            )

    tasks.add_annotations.delay({
        'item': item.public_id,
        'layer': layer,
        'user': user.username,
        'annotations': annotations
    })
    return True


def main(**kwargs):
    """Poll the queue list forever and extract subtitles for its items.

    Keyword Args:
        user: username owning both lists and the created annotations.
        queue: name of the list holding items to process.
        done: name of the list that receives processed items.
        layer: optional layer id; defaults to the configured subtitle layer.
        translate: optional comma separated ``source:target`` pairs, e.g.
            ``"de:en,fr:en"``.
        gpu: optional flag forwarded to :func:`extract_subtitles`.

    Runs until interrupted with Ctrl-C; sleeps five minutes whenever a
    pass over the queue processed nothing.
    """
    user = User.objects.get(username=kwargs['user'])
    queue = List.objects.get(user=user, name=kwargs['queue'])
    done = List.objects.get(user=user, name=kwargs['done'])
    layer = kwargs.get("layer")
    translate = kwargs.get("translate")
    if translate:
        translate = dict(tt.split(':') for tt in translate.split(','))
    if not layer:
        layer = utils.get_by_key(settings.CONFIG['layers'], 'isSubtitles', True)
        if layer:
            layer = layer["id"]
        else:
            logger.error("no layer defined and config has no subtitle layer")
            return
    try:
        while True:
            wait = True
            for item in queue.get_items(queue.user).all():
                if extract_subtitles(item, user, layer, translate, kwargs.get("gpu")):
                    # Move the item from the queue to the done list.
                    queue.items.remove(item)
                    done.items.add(item)
                    wait = False
            if wait:
                time.sleep(5*60)
    except KeyboardInterrupt:
        pass