use python api, add option to join translations, use turbo model

This commit is contained in:
j 2025-01-24 08:56:51 +05:30
parent d91c28db0a
commit 8e2ef4aab6
3 changed files with 164 additions and 31 deletions

View file

@@ -34,7 +34,7 @@ in a terminal run
copy systemd/service/pandora-transcribe.service to /etc/systemd/system/pandora-transcribe.service and
systemctl enable --now andora-transcribe.service
systemctl enable --now pandora-transcribe.service

View file

@@ -23,6 +23,8 @@ class Command(BaseCommand):
default='', help='list of languages to translate (e.g. hi:en,de:en)')
parser.add_argument('--gpu', action='store_true', dest='gpu',
default=False, help='use GPU (default: disabled)')
parser.add_argument('--join-sentences', action='store_true', dest='join_sentences',
default=False, help='make larger annotations (default: disabled)')
def handle(self, **kwargs):
transcribe.main(**kwargs)
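For context, the handler simply forwards the parsed options to transcribe.main(); a hypothetical direct call is sketched below. The user, queue, done and translate values are placeholders, not taken from this diff.

transcribe.main(
    user="pandora",            # placeholder account name
    queue="transcribe",        # list holding items waiting to be transcribed
    done="transcribed",        # list items are moved to once finished
    layer=None,                # falls back to the layer flagged isSubtitles
    translate="hi:en,de:en",   # source:target pairs, comma separated
    gpu=False,
    join_sentences=True,       # the option added in this commit
)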

View file

@@ -20,7 +20,128 @@ from user.models import User
logger = logging.getLogger(__name__)
def extract_subtitles(item, user, layer, translate, gpu=False):
def prepare_annotations(result, join_sentences=False):
if join_sentences:
return prepare_joint_annotations(result)
annotations = []
for segment in result["segments"]:
annotations.append(
{
"in": segment["start"],
"end": segment["end"] + 0.3,
"value": segment["text"],
}
)
return annotations
def prepare_joint_annotations(result, target_length=200):
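# merge whisper's word-level timestamps into sentence-sized annotations of
# roughly target_length characters, ending each one at a sentence-final "."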
abbrevs = ["Mr.", "Mrs.", "Dr."]
ignore = []
phrase_sounds = []
segments = result["segments"]
all_words = []
for s in segments:
all_words.extend(s["words"])
new_segs = []
sentence = ""
for w in all_words:
if not w == all_words[-1]:
next_w = all_words[all_words.index(w) + 1]
else:
# w is the last word, so there is no following word
next_w = None
if sentence == "" and w["text"] in ignore:
continue
if sentence == "":
in_ = w["start"]
# 0th word of a sentence
if w["text"] in ignore and next_w["text"][0].isupper():
continue
if sentence == "The music " and next_w["text"][0] == "The":
sentence = ""
continue
sentence += w["text"] + " "
# if this is a short sentence and the next word starts less than 0.8s away
# and not last word of entire text
if (
w["text"].endswith(".")
and w != all_words[-1]
and (next_w["start"] - w["end"]) < 0.8
and len(sentence) < target_length
and next_w["text"] not in ignore
):
# then do not end this sentence yet
continue
if (
w["text"].endswith(".") and w["text"] not in abbrevs and len(w["text"]) > 2
) or (
w["text"] in ignore
and sentence.strip() == w["text"]
and (w == all_words[-1] or next_w["text"][0].isupper())
):
# end the sentence, delay end a bit
out_ = w["end"] + 0.3
sentence_dict = {"in": in_, "out": out_, "value": sentence.strip()}
new_segs.append(sentence_dict)
sentence = ""
annotations = list(filter(lambda i: i["value"].strip() not in ignore, new_segs))
return annotations
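For illustration only (not part of this commit): the whisper_timestamped result shape these two helpers expect, with made-up timings and text.

result = {
    "segments": [
        {
            "start": 0.0, "end": 0.9, "text": " Hello there.",
            "words": [
                {"text": "Hello", "start": 0.0, "end": 0.4},
                {"text": "there.", "start": 0.5, "end": 0.9},
            ],
        },
        {
            "start": 1.1, "end": 2.1, "text": " General Kenobi.",
            "words": [
                {"text": "General", "start": 1.1, "end": 1.5},
                {"text": "Kenobi.", "start": 1.6, "end": 2.1},
            ],
        },
    ],
}
prepare_annotations(result)                       # two annotations, one per segment
prepare_annotations(result, join_sentences=True)  # one annotation: "Hello there. General Kenobi."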
def extract_subtitles(item, user, layer, translate, gpu=False, join_sentences=False):
if "language" not in item.data:
language = None
else:
language = ox.iso.langTo2Code(item.data["language"][0])
if not item.streams():
logger.error("skip item without media %s: %s", item.public_id)
return False
src = item.streams()[0].media.path
run_py = os.path.join(os.path.dirname(os.path.abspath(__file__)), "run_whisper.py")
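# run run_whisper.py with the interpreter from the whisper-timestamped
# virtualenv and read its JSON result from stdout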
cmd = ["/opt/whisper-timestamped/bin/python", run_py]
if language:
cmd += ["--language", language]
if translate and language in translate:
cmd += ["--translate"]
language = "en"
cmd += [src]
try:
response = subprocess.check_output(cmd)
except (subprocess.CalledProcessError, OSError):
logger.error(
"failed to extract subtitles from item %s\n%s", item.public_id, cmd
)
return False
response = json.loads(response)
annotations = prepare_annotations(response, join_sentences=join_sentences)
if not annotations:
return False
if language and language != "en":
for annotation in annotations:
annotation["value"] = '<span lang="%s">%s</span>' % (
language,
annotation["value"],
)
tasks.add_annotations.delay(
{
"item": item.public_id,
"layer": layer,
"user": user.username,
"annotations": annotations,
}
)
return True
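run_whisper.py itself is not included in this diff; below is a minimal sketch of what such a helper could look like, assuming whisper_timestamped's Python API (load_model/load_audio/transcribe) and an openai-whisper release recent enough to ship the "turbo" model. The flag names mirror the ones passed above; everything else is an assumption, not taken from this commit.

#!/opt/whisper-timestamped/bin/python
# Hypothetical sketch of run_whisper.py: transcribe one file with
# whisper_timestamped's Python API and print the result as JSON so
# extract_subtitles() can parse it from stdout.
import argparse
import json
import sys

import whisper_timestamped as whisper


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--language", default=None)
    parser.add_argument("--translate", action="store_true")
    parser.add_argument("src")
    args = parser.parse_args()

    # "turbo" matches the model the CLI code path below passes to
    # whisper_timestamped; device selection and fp16 handling are omitted here.
    model = whisper.load_model("turbo")
    audio = whisper.load_audio(args.src)
    result = whisper.transcribe(
        model,
        audio,
        language=args.language,
        task="translate" if args.translate else "transcribe",
    )
    json.dump(result, sys.stdout)


if __name__ == "__main__":
    main()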
def extract_subtitles_cmd(item, user, layer, translate, gpu=False):
if "language" not in item.data:
language = None
else:
@@ -31,33 +152,32 @@ def extract_subtitles(item, user, layer, translate, gpu=False):
src = item.streams()[0].media.path
tmp = tempfile.mkdtemp()
cmd = [
"/opt/whisper-timestamped/bin/whisper_timestamped",
]
cmd = ["/opt/whisper-timestamped/bin/whisper_timestamped", "--model", "turbo"]
if language:
cmd += [
"--language", language
]
cmd += ["--language", language]
if translate and language in translate:
cmd += [
'--task', 'translate'
]
language = 'en'
cmd += ["--task", "translate"]
language = "en"
if not gpu:
cmd += [
"--fp16", "False",
"--fp16",
"False",
]
cmd += [
"-f", "srt",
"-f",
"srt",
"--accurate",
"--output_dir", tmp,
"--output_dir",
tmp,
src,
]
try:
subprocess.check_output(cmd)
except:
logger.error("failed to extract subtitles from item %s\n%s", item.public_id, cmd)
logger.error(
"failed to extract subtitles from item %s\n%s", item.public_id, cmd
)
shutil.rmtree(tmp)
return False
annotations = []
@@ -70,28 +190,33 @@ def extract_subtitles(item, user, layer, translate, gpu=False):
return True
if language and language != "en":
for annotation in annotations:
annotation["value"] = '<span lang="%s">%s</span>' % (language, annotation["value"])
annotation["value"] = '<span lang="%s">%s</span>' % (
language,
annotation["value"],
)
tasks.add_annotations.delay({
'item': item.public_id,
'layer': layer,
'user': user.username,
'annotations': annotations
})
tasks.add_annotations.delay(
{
"item": item.public_id,
"layer": layer,
"user": user.username,
"annotations": annotations,
}
)
shutil.rmtree(tmp)
return True
def main(**kwargs):
user = User.objects.get(username=kwargs['user'])
queue = List.objects.get(user=user, name=kwargs['queue'])
done = List.objects.get(user=user, name=kwargs['done'])
user = User.objects.get(username=kwargs["user"])
queue = List.objects.get(user=user, name=kwargs["queue"])
done = List.objects.get(user=user, name=kwargs["done"])
layer = kwargs.get("layer")
translate = kwargs.get("translate")
if translate:
translate = dict([tt.split(':') for tt in translate.split(',')])
translate = dict([tt.split(":") for tt in translate.split(",")])
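# e.g. "hi:en,de:en" -> {"hi": "en", "de": "en"}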
if not layer:
layer = utils.get_by_key(settings.CONFIG['layers'], 'isSubtitles', True)
layer = utils.get_by_key(settings.CONFIG["layers"], "isSubtitles", True)
if layer:
layer = layer["id"]
else:
@@ -101,12 +226,18 @@ def main(**kwargs):
while True:
wait = True
for item in queue.get_items(queue.user).all():
if extract_subtitles(item, user, layer, translate, kwargs.get("gpu")):
if extract_subtitles(
item,
user,
layer,
translate,
kwargs.get("gpu"),
join_sentences=kwargs.get("join_sentences"),
):
done.items.add(item)
queue.items.remove(item)
wait = False
if wait:
time.sleep(5*60)
time.sleep(5 * 60)
except KeyboardInterrupt:
pass