pandora_p_for_power/generate.py

1671 lines
56 KiB
Python
Raw Normal View History

from pathlib import Path
import hashlib
import math
import os
import time
2026-01-24 13:26:30 +01:00
import shutil
import subprocess
import cv2
import ox
import requests
import fal_client
from byteplussdkarkruntime import Ark
from django.conf import settings
from item.models import Item
2026-01-15 17:02:34 +00:00
from document.models import Document
from archive.models import File, Stream
2026-01-26 09:23:47 +01:00
import itemlist.models
# fal_client reads its API key from the environment.
os.environ["FAL_KEY"] = settings.FAL_KEY

# Maximum segment length (seconds) used when splitting items for generation.
MAX_DURATION = 12

# Shared auth headers for all BytePlus Ark REST calls.
headers = {
    "Authorization": "Bearer " + settings.BYTEPLUSE_TOKEN,
    "Content-Type": "application/json",
}
def public_url(path):
    """Map a path under the local static root to its public URL."""
    static_root = "/srv/pandora/static/"
    return path.replace(static_root, settings.PUBLIC_URL + "static/")
def public_document_url(document):
    """Build a tokenized public URL for a document's source file."""
    doc_id = document.get_id()
    return (
        f"{settings.PUBLIC_URL}documents/{doc_id}/source."
        f"{document.extension}?token={settings.PUBLIC_TOKEN}"
    )
def public_video_url(item):
    """Build a tokenized public download URL for an item's source video."""
    return (
        f"{settings.PUBLIC_URL}{item.public_id}"
        f"/download/source/?token={settings.PUBLIC_TOKEN}"
    )
def public_frame_url(item, position):
    """Build a tokenized public URL for a single frame of an item.

    `position` is interpolated as-is, so both int and float positions keep
    their natural string form in the URL.
    """
    return (
        f"{settings.PUBLIC_URL}{item.public_id}"
        f"/source{position}.png?token={settings.PUBLIC_TOKEN}"
    )
def trim_video(src, dst, frames, start0=False):
    """Copy exactly `frames` frames from `src` to `dst`.

    Frames are taken from the middle of the source, or from the start when
    `start0` is True, so AI-generated clips (which are padded to whole
    seconds) can be trimmed back to the original segment length.

    Args:
        src: path of the input video.
        dst: path of the output video (written with the "avc1" fourcc).
        frames: number of frames to keep.
        start0: take frames from the beginning instead of the center.

    Bug fixes vs. the original:
    - when offset was 0 (start0=True) the 1-based counter dropped the first
      frame, writing frames-1 frames instead of frames;
    - decoding now stops once enough frames are written instead of reading
      (and discarding) the rest of the file.
    """
    cap = cv2.VideoCapture(src)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames_src = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Center the kept span unless the caller wants it anchored at frame 0.
    offset = 0 if start0 else max(0, (frames_src - frames) // 2)
    print(frames_src, frames, offset)
    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    out = cv2.VideoWriter(dst, fourcc, fps, (width, height))
    index = 0
    written = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        index += 1
        if index <= offset:
            continue
        if written >= frames:
            break
        out.write(frame)
        written += 1
    out.release()
    cap.release()
def make_single_character_image(character):
    """Generate a standardized full-body portrait Document for a character.

    Resolves the character's reference Document, asks Seedream for a
    photo-realistic head-to-toe portrait on a white background, and stores
    the result as a new "Single Character" Document.
    """
    character = get_character_document(character, type='Character')
    character_url = public_document_url(character)
    data = {
        "model": "seedream-4-5-251128",
        "size": "2K",
        "watermark": False,
        'image': character_url,
        "prompt": "character from image 1 is standing straight up, full body portrait from head to toe. face, clothing, skin are photo realistic, camera facing straight at character. background is white"
    }
    url = bytedance_image_generation(data)
    # Derive the file extension from the result URL, dropping the query string.
    extension = url.split(".")[-1].split("?")[0]
    if extension == "jpeg":
        extension = "jpg"
    file = Document(user=character.user)
    file.rightslevel = 2
    file.data["title"] = character.data['title'].replace('Character', 'Single Character')
    file.extension = extension
    # Placeholder dimensions until get_info() fills in real metadata.
    file.width = -1
    file.pages = -1
    file.uploading = True
    file.save()
    # NOTE(review): uploading is set again after save() and is still True at
    # the final save() below — confirm whether it should be cleared instead.
    file.uploading = True
    name = "data.%s" % file.extension
    file.file.name = file.path(name)
    ox.net.save_url(url, file.file.path, overwrite=True)
    file.get_info()
    file.get_ratio()
    file.oshash = ox.oshash(file.file.path)
    file.save()
    file.update_sort()
    return file
def make_single_character_image_flux(character):
    """Generate a standardized full-body character portrait via FLUX.

    Same flow as make_single_character_image but uses FLUX image editing
    instead of Seedream; the result is stored as a new
    "FLUX Single Character" Document.
    """
    character = get_character_document(character, type='Character')
    character_url = public_document_url(character)
    prompt = 'character from @image 1 is standing straight up, full body portrait from head to toe. face, clothing, skin are photo realistic, camera facing straight at character. background is white'
    url = flux_edit_image([character_url], prompt)
    # Derive the file extension from the result URL, dropping the query string.
    extension = url.split(".")[-1].split("?")[0]
    if extension == "jpeg":
        extension = "jpg"
    file = Document(user=character.user)
    file.rightslevel = 2
    file.data["title"] = character.data['title'].replace('Character', 'FLUX Single Character')
    file.extension = extension
    # Placeholder dimensions until get_info() fills in real metadata.
    file.width = -1
    file.pages = -1
    file.uploading = True
    file.save()
    # NOTE(review): uploading is set again after save() and is still True at
    # the final save() below — confirm whether it should be cleared instead.
    file.uploading = True
    name = "data.%s" % file.extension
    file.file.name = file.path(name)
    ox.net.save_url(url, file.file.path, overwrite=True)
    file.get_info()
    file.get_ratio()
    file.oshash = ox.oshash(file.file.path)
    file.save()
    file.update_sort()
    return file
def bytedance_task(data):
    """Submit a video generation task to BytePlus Ark and poll until done.

    Missing request fields are filled with Seedance defaults, the model
    name is mapped to its provisioned endpoint id (EP) where configured,
    and the task is polled every 10 seconds until it leaves a pending
    state. Returns the final task status payload.
    """
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/contents/generations/tasks"
    model = "seedance-1-5-pro-251215"
    resolution = "720p"
    defaults = {
        "model": model,
        "generate_audio": False,
        "ratio": "16:9",
        "watermark": False,
        "resolution": resolution,
        "camera_fixed": True,
        "return_last_frame": True,
    }
    for key, value in defaults.items():
        data.setdefault(key, value)
    if data["model"] in EP:
        data["model"] = EP[data["model"]]
    print(data)
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    task_url = url + "/" + response["id"]
    status = requests.get(task_url, headers=headers).json()
    while status["status"] in ("queued", "running", "cancelled"):
        time.sleep(10)
        status = requests.get(task_url, headers=headers).json()
        print(status)
    return status
def bytedance_response(data):
    """POST a (multimodal) request to the Ark responses API and return JSON."""
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/responses"
    data.setdefault("model", "seed-1-8-251228")
    print(data)
    # Map the model name to its provisioned endpoint id where configured.
    if data["model"] in EP:
        data["model"] = EP[data["model"]]
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    return response
def t2v_bytedance(prompt, duration, output):
    """Text-to-video: generate a clip and save it to `output`.

    Duration is rounded up to whole seconds with a minimum of 4. When the
    provider returns a last-frame image it is saved next to the video as
    `output + ".last_frame.png"`. Returns the task status payload.
    """
    nduration = max(4, int(math.ceil(duration)))
    status = bytedance_task({
        "duration": nduration,
        "content": [{"type": "text", "text": prompt}],
    })
    content = status["content"]
    ox.net.save_url(content["video_url"], output, overwrite=True)
    if "last_frame_url" in content:
        ox.net.save_url(
            content["last_frame_url"],
            output + ".last_frame.png",
            overwrite=True,
        )
    return status
def i2v_bytedance(first_frame, prompt, duration, output, last_frame=None, seed=None):
    """Image-to-video: generate a clip anchored on `first_frame`.

    Duration is rounded up to whole seconds with a minimum of 4. An
    optional `last_frame` URL anchors the end of the clip and `seed` pins
    the generation. When the provider returns a last-frame image it is
    saved as `output + ".last_frame.png"`. Returns the task status.
    """
    nduration = max(4, int(math.ceil(duration)))
    content = [
        {"type": "text", "text": prompt},
        {
            "type": "image_url",
            "role": "first_frame",
            "image_url": {"url": first_frame},
        },
    ]
    if last_frame:
        content.append({
            "type": "image_url",
            "role": "last_frame",
            "image_url": {"url": last_frame},
        })
    data = {"duration": nduration, "content": content}
    if seed:
        data["seed"] = seed
    status = bytedance_task(data)
    result = status["content"]
    ox.net.save_url(result["video_url"], output, overwrite=True)
    if "last_frame_url" in result:
        ox.net.save_url(
            result["last_frame_url"],
            output + ".last_frame.png",
            overwrite=True,
        )
    return status
def first_last(first_frame, last_frame, prompt, duration, output):
    """Generate a clip guided by both a first and a last frame.

    Duration is rounded up to whole seconds with a minimum of 4. The video
    is saved to `output`; when the provider returns a last-frame image it
    is stored next to it as `output + ".last_frame.png"`. Returns the task
    status payload.
    """
    nduration = max(4, int(math.ceil(duration)))
    content = [
        {"type": "text", "text": prompt},
        {
            "type": "image_url",
            "role": "first_frame",
            "image_url": {"url": first_frame},
        },
        {
            "type": "image_url",
            "role": "last_frame",
            "image_url": {"url": last_frame},
        },
    ]
    status = bytedance_task({"duration": nduration, "content": content})
    result = status["content"]
    ox.net.save_url(result["video_url"], output, overwrite=True)
    if "last_frame_url" in result:
        ox.net.save_url(
            result["last_frame_url"],
            output + ".last_frame.png",
            overwrite=True,
        )
    return status
def get_item_segments(item, max_duration=MAX_DURATION):
    """Split an item into segment out-points no longer than max_duration.

    Starts from the item's annotated cuts, drops cuts closer than 0.5s
    together, and subdivides any remaining span longer than max_duration.
    Returns a list of out-points in seconds; a subdivision point is wrapped
    as ["c", position] to mark a continuation (no real cut there).
    """
    cuts = item.get("cuts")
    filename = item.files.all()[0].data.path
    input_info = ox.avinfo(filename)
    p = 0
    nc = []
    # Drop cuts less than half a second after the previous kept cut.
    for c in cuts:
        d = c - p
        if d < 0.5:
            continue
        p = c
        nc.append(c)
    # The end of the video is always the final out-point.
    nc = nc + [input_info["duration"]]
    if len(nc) > 3:
        # If the last cut is within 0.5s of the end, merge the final span.
        if nc[-1] - nc[-2] < 0.5:
            nc = nc[:-2] + nc[-1:]
    segments = []
    position = 0
    for out in nc:
        duration = out - position
        while duration > max_duration:
            # Chop over-long spans into max_duration pieces; pieces after
            # the first are marked "c" (continuation of the same shot).
            position += max_duration
            if len(segments):
                segments.append(["c", position])
            else:
                segments.append(position)
            duration = out - position
        else:
            # while/else runs on normal loop exit, so the actual out-point
            # is always appended after any subdivision.
            segments.append(out)
        position = out
    return segments
def join_segments(processed, joined_output):
    """Concatenate the given video files into joined_output.

    The output uses the fps and frame size of the first input; frames from
    every input are copied through unchanged (no re-timing or scaling).
    """
    writer = None
    for filename in processed:
        cap = cv2.VideoCapture(filename)
        if writer is None:
            # Lazily create the writer from the first clip's properties.
            fps = cap.get(cv2.CAP_PROP_FPS)
            size = (
                int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            )
            fourcc = cv2.VideoWriter_fourcc(*"avc1")
            writer = cv2.VideoWriter(joined_output, fourcc, fps, size)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            writer.write(frame)
        cap.release()
    if writer is not None:
        writer.release()
def remake_video(item_id, prompt):
    """Regenerate an item's video segment-by-segment with frame anchors.

    Each segment is re-generated with first_last() using frames from the
    source as first/last anchors (continuation segments chain from the
    previous clip's generated last frame), trimmed back to the original
    segment length, and joined into one file.

    Returns the path of the joined output file.
    """
    item = Item.objects.get(public_id=item_id)
    segments = get_item_segments(item)
    print(segments)
    prompt_hash = hashlib.sha1(prompt.encode()).hexdigest()
    position = n = 0
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    for segment in segments:
        if isinstance(segment, list):
            # ["c", position] marks a continuation (no cut) segment.
            stype, segment = segment
        else:
            stype = "n"
        duration = segment - position
        if stype == "c":
            # Continuation: chain from the last generated frame of the
            # previous clip instead of a source frame.
            first_frame_path = "%s/%06d.mp4.last_frame.png" % (prefix, n - 1)
            first_frame = public_url(first_frame_path)
        else:
            first_frame = "%s%s/source%s.png?token=%s" % (
                settings.PUBLIC_URL,
                item.public_id,
                position,
                settings.PUBLIC_TOKEN,
            )
        # Anchor on a frame two frames (at 24fps) before the segment end.
        last_frame_position = segment - 2 / 24
        last_frame = "%s%s/source%0.3f.png?token=%s" % (
            settings.PUBLIC_URL,
            item.public_id,
            last_frame_position,
            settings.PUBLIC_TOKEN,
        )
        output = "%s/%06d.mp4" % (prefix, n)
        # Generated and trimmed files are cached; existing ones are reused.
        if not os.path.exists(output):
            first_last(first_frame, last_frame, prompt, duration, output)
        trimmed = "%s/%06d_trimmed.mp4" % (prefix, n)
        frames = int(duration * 24)
        if not os.path.exists(trimmed):
            trim_video(output, trimmed, frames, stype == "c")
        processed.append(trimmed)
        position = segment
        n += 1
    joined_output = "%s/joined.mp4" % (prefix,)
    join_segments(processed, joined_output)
    return joined_output
# Mapping from public BytePlus model names to the provisioned Ark endpoint
# ids that requests are actually routed to (used by bytedance_task,
# bytedance_response and bytedance_image_generation).
EP = {
    "seedream-4-5-251128": "ep-20260122071519-pbf7l",
    "seed-1-8-251228": "ep-20260122071243-8qfrk",
    "seedance-1-5-pro-251215": "ep-20260122071613-blmsd",
}
def bytedance_image_generation(data):
    """Run a Seedream image generation request and return the result URL."""
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/images/generations"
    defaults = {
        "model": "seedream-4-5-251128",
        "size": "2560x1440",
        "watermark": False,
    }
    for key, value in defaults.items():
        data.setdefault(key, value)
    # Map the model name to its provisioned endpoint id where configured.
    if data["model"] in EP:
        data["model"] = EP[data["model"]]
    print("prepare_image", data)
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    return response["data"][0]["url"]
def prepare_image(image, prompt, out=None):
    """Run an image through Seedream image generation and save the result.

    Args:
        image: local static path or http(s) URL of the source image.
        prompt: edit/generation prompt.
        out: output path; defaults to `image + ".ai.png"`.

    Returns:
        The path the generated image was saved to.
    """
    # Local static paths must be converted to public URLs first; also
    # recognize https URLs as already remote (the original only checked
    # the "http:" scheme).
    if not image.startswith(("http:", "https:")):
        image = public_url(image)
    data = {
        "prompt": prompt,
        "image": image,
        "size": "2560x1440",
    }
    output_url = bytedance_image_generation(data)
    if out is None:
        out = image + ".ai.png"
    ox.net.save_url(output_url, out, overwrite=True)
    # Bug fix: the original returned the undefined name `r` (NameError);
    # return the output path instead.
    return out
def process_frame(item, prompt, character=None, position=0, seed=None):
    """Generate an AI-edited frame for an item via Seedream.

    Args:
        item: Item or public_id.
        prompt: image edit prompt.
        character: optional Document or image URL used as second input.
        position: frame position (seconds) in the item.
        seed: optional generation seed.

    Returns:
        The new AI image Document (saved and indexed).
    """
    model = "seedream-4-5-251128"
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    if isinstance(character, Document):
        character = public_document_url(character)
    image = public_frame_url(item, position)
    if character is not None:
        # Two inputs: the source frame first, the character reference second.
        image = [image, character]
    data = {
        "model": model,
        "prompt": prompt,
        "image": image,
        "size": "2560x1440",
    }
    if seed:
        data["seed"] = seed
    url = bytedance_image_generation(data)
    img = add_ai_image(item, position, url)
    img.refresh_from_db()
    img.data["model"] = model
    img.data["prompt"] = prompt
    img.data["source"] = item.public_id
    if character:
        # Record the character source URL without its access token.
        img.data["source"] += " " + character.split("?")[0]
    print(img, img.data)
    img.save()
    img.update_sort()
    img.update_find()
    return img
def get_character_document(character, type="Single Character"):
    """Resolve a character code (P1..P5) to its Document.

    Codes are looked up by title ("<type> <code>"); anything else is
    returned unchanged (already a Document or a URL).
    """
    known_codes = ("P1", "P2", "P3", "P4", "P5")
    if character in known_codes:
        title = "%s %s" % (type, character)
        return Document.objects.get(data__title=title)
    return character
# Earlier prompt variant kept for reference; it differs from the active one
# only by a trailing "and so on" clause.
"""
REPLACE_CHARACTER_PROMPT = "Replace the foreground character in image 1 with the character in image 2, keep the posture, clothing, background, light, atmosphere from image 1, but take the facial features and personality from image 2. Make sure the size of the character is adjusted since the new character is a child and make sure the size of the head matches the body. The quality of the image should be the same between foreground and background, adjust the quality of the character to match the background. Use the style of image 1 for the character: if image 1 is a photo make the character a real person, if image 1 is a drawing make the character a drawn character, if image 1 is a comic use a comic character and so on"
"""
# Prompt used by fal_replace_character / replace_character to swap the
# foreground character of image 1 with the reference character of image 2
# while keeping posture, clothing, background and overall style.
REPLACE_CHARACTER_PROMPT = "Replace the foreground character in image 1 with the character in image 2, keep the posture, clothing, background, light, atmosphere from image 1, but take the facial features and personality from image 2. Make sure the size of the character is adjusted since the new character is a child and make sure the size of the head matches the body. The quality of the image should be the same between foreground and background, adjust the quality of the character to match the background. Use the style of image 1 for the character: if image 1 is a photo make the character a real person, if image 1 is a drawing make the character a drawn character, if image 1 is a comic use a comic character"
def fal_replace_character(item, character, position=0):
    """Replace the foreground character in a frame using FLUX image editing.

    Args:
        item: Item or public_id of the source video.
        character: character code (P1..P5), Document, or image URL.
        position: frame position (seconds) to edit.

    Returns:
        The new AI image Document attached to the item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    # FLUX expects @image N references in the prompt.
    prompt = REPLACE_CHARACTER_PROMPT.replace("image 1", "@image 1").replace(
        "image 2", "@image 2"
    )
    if character == "P5":
        # P5 is a teenager, not a child.
        prompt = prompt.replace("child", "teenager")
    character = get_character_document(character)
    # Bug fix: keep the resolved Document separate from its URL. The
    # original overwrote `character` with the URL, which made the later
    # isinstance(character, Document) check dead and the document id was
    # never recorded (replace_character keeps them separate).
    if isinstance(character, Document):
        character_url = public_document_url(character)
    else:
        character_url = character
    image = [public_frame_url(item, position), character_url]
    url = flux_edit_image(image, prompt)
    img = add_ai_image(item, position, url)
    img.refresh_from_db()
    img.data["model"] = "flux-2-pro"
    img.data["prompt"] = prompt
    # Record the sources without the character URL's access token.
    img.data["source"] = item.public_id + " " + character_url.split("?")[0]
    if isinstance(character, Document):
        img.data["character"] = character.get_id()
    else:
        img.data["character"] = character
    img.data["position"] = position
    print(img, img.data)
    img.save()
    img.update_sort()
    img.update_find()
    return img
def replace_character(item, character, position=0, seed=None, extra=None):
    """Generate a frame with the foreground character replaced.

    Args:
        item: Item or public_id of the source video.
        character: character code (P1..P5), Document, or image URL.
        position: frame position (seconds) to edit.
        seed: optional generation seed.
        extra: optional text appended to the replacement prompt.

    Returns:
        The new AI image Document with character/position recorded.
    """
    prompt = REPLACE_CHARACTER_PROMPT
    if character == "P5":
        # P5 is a teenager, not a child.
        prompt = prompt.replace("child", "teenager")
    if extra:
        prompt = prompt + " " + extra
    character = get_character_document(character)
    is_document = isinstance(character, Document)
    character_url = public_document_url(character) if is_document else character
    frame = process_frame(item, prompt, character_url, position, seed=seed)
    frame.data["character"] = character.get_id() if is_document else character
    frame.data["position"] = position
    frame.save()
    return frame
def kling_lipsync(audio_item, video_item, keep=False):
    """Lip-sync a video to another item's audio track via Kling.

    Extracts the audio from audio_item (stream copy, no re-encode),
    submits it together with video_item's public video URL, and stores the
    result as an "ai:lipsync" variant on audio_item.

    Args:
        audio_item: Item providing the audio and receiving the variant.
        video_item: Item providing the video to lip-sync.
        keep: keep the cache directory instead of deleting it.
    """
    video_url = public_video_url(video_item)
    audio_path = audio_item.streams()[0].file.data.path
    prefix = "/srv/pandora/static/power/cache/%s" % (audio_item.public_id)
    os.makedirs(prefix, exist_ok=True)
    output = prefix + '/audio.m4a'
    # Extract the audio track once; reuse it on subsequent runs.
    if not os.path.exists(output):
        cmd = ['ffmpeg', '-hide_banner', '-nostats', '-i', audio_path, '-vn', '-c:a', 'copy', output]
        subprocess.call(cmd)
    if not os.path.exists(output):
        raise Exception
    audio_url = public_url(output)
    model = "fal-ai/kling-video/lipsync/audio-to-video"
    data = {
        "video_url": video_url,
        "audio_url": audio_url
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    output = prefix + "/lipsync.mp4"
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(audio_item, output, "ai:lipsync")
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(prefix)
    return ai
def kling_v2v_reference(item, character, keep=False):
    """Replace the main character in a video via Kling O1 video-to-video.

    https://fal.ai/models/fal-ai/kling-video/o1/video-to-video/reference/api

    Args:
        item: Item or public_id of the source video.
        character: character code (P1..P5), Document, or image URL.
        keep: keep the cache directory instead of deleting it.

    Returns:
        The new "ai:v2v-replace" variant.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    if character in ("P1", "P2", "P3", "P4", "P5"):
        # P-codes use the full "Character X" reference sheet here, not the
        # "Single Character X" portrait.
        character = public_document_url(
            Document.objects.get(data__title="Character " + character)
        )
    character = get_character_document(character)
    if isinstance(character, Document):
        character_url = public_document_url(character)
    else:
        character_url = character
    video_url = public_video_url(item)
    prompt = "Replace the main character in @Video1 with the character from the reference images, adjust the style of the character to match the style of the video"
    model = "fal-ai/kling-video/o1/video-to-video/reference"
    prompt_hash = hashlib.sha1((prompt + character_url).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    # Pick the longest supported duration that fits the item. Bug fix: the
    # original left `duration` unbound for items shorter than 3 seconds.
    duration = 3
    for d in [3, 4, 5, 6, 7, 8, 9, 10]:
        if d > item.sort.duration:
            break
        duration = d
    # Bug fix: the original built a first request dict that referenced an
    # undefined `image_url` (NameError on every call); only this dict is
    # actually used by the model.
    data = {
        "prompt": prompt,
        "keep_audio": False,
        "aspect_ratio": "16:9",
        "video_url": video_url,
        "image_urls": [character_url],
        "duration": str(duration)
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:v2v-replace")
    ai.data["prompt"] = ox.escape_html(prompt)
    # Bug fix: the original recorded an undefined `image_url` here and then
    # called `img.add(ai)` on an undefined `img`.
    ai.data["firstframe"] = character_url.split("?")[0]
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai
def kling_v2v_edit(item, background, keep=False):
    # https://fal.ai/models/fal-ai/kling-video/o1/video-to-video/edit
    """Disabled stub for Kling O1 video-to-video edit.

    NOTE(review): the bare `return` below disables this function; all code
    after it is unreachable and references undefined names (prompt, model,
    output, image_url, img) — it is an unfinished sketch, not live code.
    """
    return
    video_url = public_video_url(item)
    images = []
    elements = {
        "reference_image_urls": [],
        "frontal_image": ""
    }
    data = {
        "prompt": prompt,
        "video_url": video_url,
        "image_urls": images,
        "elements": elements,
        "keep_audio": False,
        "character_orientation": "video",
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:v2v-replace")
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["firstframe"] = image_url.split("?")[0]
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    img.add(ai)
    return ai
def wan_reference_to_video(foreground, background, keep=False):
    """Combine a foreground character video with a background video (WAN 2.6).

    Args:
        foreground: Item providing the character (@Video1).
        background: Item providing the background (@Video2); the resulting
            "ai:foreground-background" variant is attached to this item.
        keep: keep the cache directory instead of deleting it.
    """
    foreground_url = public_video_url(foreground)
    background_url = public_video_url(background)
    prompt = "Use the character from @Video1 and use @Video2 as background"
    model = "wan/v2.6/reference-to-video"
    prompt_hash = hashlib.sha1((prompt + foreground_url + background_url).encode()).hexdigest()
    item = background
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    # NOTE(review): `duration` is computed below but never sent in the
    # request — confirm whether a "duration" field should be added to data.
    for d in [5, 10]:
        if d > item.sort.duration:
            break
        duration = d
    data = {
        "prompt": prompt,
        "video_urls": [
            foreground_url,
            background_url,
        ],
        "aspect_ratio": "16:9",
        "resolution": "720p",
        "enable_prompt_expansion": False,
        "multi_shots": True,
        "enable_safety_checker": False
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:foreground-background")
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai
def wan_animate_replace(item, character, keep=False):
    """Replace the character in a video via WAN 2.2 animate/replace.

    Args:
        item: Item providing the source video and receiving the variant.
        character: character code (P1..P5), Document, or image URL.
        keep: keep the cache directory instead of deleting it.

    Bug fix: the original referenced undefined names (`foreground_url`,
    `background_url`, `character_url`) and raised NameError on every call;
    the character URL is now resolved like in the sibling replace helpers
    and the cache key is derived from the item and character URLs.
    """
    item_url = public_video_url(item)
    prompt = ""
    model = "fal-ai/wan/v2.2-14b/animate/replace"
    character = get_character_document(character)
    if isinstance(character, Document):
        character_url = public_document_url(character)
    else:
        character_url = character
    prompt_hash = hashlib.sha1((prompt + item_url + character_url).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    data = {
        "video_url": item_url,
        "image_url": character_url,
        "resolution": "720p",
        "video_quality": "high",
        "guidance_scale": 1,
        "enable_safety_checker": False,
        "enable_output_safety_checker": False,
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:foreground-background")
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai
def ltx_a2v(item, character, prompt=None, first_frame=None, keep=False):
    """Generate an audio-driven video for an item via LTX audio-to-video.

    The item's audio track is extracted (stream copy) and combined with a
    first frame in which the character has been replaced; LTX animates the
    frame to match the audio.

    Args:
        item: Item providing the audio and receiving the variant.
        character: character code (P1..P5) or Document, used when no
            first_frame is given.
        prompt: scene description; derived from the item when None.
        first_frame: optional Document (or URL) to use as first frame.
        keep: keep the cache directory instead of deleting it.
    """
    audio_path = item.streams()[0].file.data.path
    if first_frame is None:
        character = get_character_document(character)
        position = 0
        # The character was already resolved above; the original resolved
        # it a second time here for no reason.
        cid = character.get_id()
        # Reuse the most recent replaced frame for this character/position.
        first_frame = item.documents.filter(
            data__character=cid,
            data__position=position
        ).order_by('-created').first()
        if not first_frame:
            first_frame = replace_character(item, character, position)
    # Accept either a Document or a ready-made URL as first frame.
    if isinstance(first_frame, Document):
        image_url = public_document_url(first_frame)
    else:
        image_url = first_frame
    prefix = "/srv/pandora/static/power/cache/%s_a2v" % (item.public_id)
    os.makedirs(prefix, exist_ok=True)
    output = prefix + '/audio.m4a'
    # Extract the audio track once; reuse it on subsequent runs.
    if not os.path.exists(output):
        cmd = ['ffmpeg', '-hide_banner', '-nostats', '-i', audio_path, '-vn', '-c:a', 'copy', output]
        subprocess.call(cmd)
    if not os.path.exists(output):
        raise Exception("audio extraction failed: %s" % audio_path)
    audio_url = public_url(output)
    model = "fal-ai/ltx-2-19b/audio-to-video"
    neutral = True
    if prompt is None:
        prompt = describe_item(item, neutral)
    data = {
        "audio_url": audio_url,
        "image_url": image_url,
        "video_size": {"width": 1280, "height": 720},
        "match_audio_length": True,
        "fps": 24,
        "prompt": prompt,
        "enable_safety_checker": False,
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    output = prefix + "/ai.mp4"
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:audio-to-video")
    ai.data["model"] = model
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["seed"] = result["seed"]
    # Bug fix: record firstframe BEFORE ai.save(); the original assigned it
    # after saving, so it was never persisted (reshoot_item sets it before).
    if isinstance(first_frame, Document):
        ai.data["firstframe"] = first_frame.get_id()
    else:
        ai.data["firstframe"] = first_frame.split("?")[0]
    ai.save()
    if isinstance(first_frame, Document):
        first_frame.add(ai)
    if not keep:
        shutil.rmtree(prefix)
    return ai
def ltx_v2v(item, character, prompt=None, keep=False):
    """Re-render an item via LTX video-to-video with a replaced first frame.

    Args:
        item: Item providing the source video and receiving the variant.
        character: character code (P1..P5) or Document.
        prompt: scene description; derived from the item when None.
        keep: keep the cache directory instead of deleting it.
    """
    video_url = public_video_url(item)
    character = get_character_document(character)
    position = 0
    # The character was already resolved above; the original resolved it a
    # second time here for no reason.
    cid = character.get_id()
    # Reuse the most recent replaced frame for this character/position.
    first_frame = item.documents.filter(
        data__character=cid,
        data__position=position
    ).order_by('-created').first()
    if not first_frame:
        first_frame = replace_character(item, character, position)
    image_url = public_document_url(first_frame)
    prefix = "/srv/pandora/static/power/cache/%s_ltx_v2v" % (item.public_id)
    # Ensure the cache directory exists before writing into it (the sibling
    # helpers create it; ox.net.save_url may do so as well — harmless).
    os.makedirs(prefix, exist_ok=True)
    model = "fal-ai/ltx-2-19b/video-to-video"
    neutral = True
    if prompt is None:
        prompt = describe_item(item, neutral)
    data = {
        "prompt": prompt,
        "video_url": video_url,
        "image_url": image_url,
        "video_size": {"width": 1280, "height": 720},
        "match_video_length": True,
        "fps": 24,
        "generate_audio": False,
        "enable_safety_checker": False,
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    output = prefix + "/ai.mp4"
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:video-to-video")
    ai.data["model"] = model
    ai.data["seed"] = result["seed"]
    # Bug fix: record firstframe BEFORE ai.save(); the original assigned it
    # after saving, so it was never persisted (reshoot_item sets it before).
    ai.data["firstframe"] = first_frame.get_id()
    ai.save()
    first_frame.add(ai)
    if not keep:
        shutil.rmtree(prefix)
    return ai
def replace_character_motion_control(item, character, keep=False):
    """Re-render an item with a replaced character via Kling motion control.

    A replaced first frame supplies the character's look; the source video
    drives the motion.

    Args:
        item: Item or public_id of the source video.
        character: character code (passed to replace_character) or an
            already-generated frame Document.
        keep: keep the cache directory instead of deleting it.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    # FIXME get character from documents
    if isinstance(character, str):
        img = replace_character(item, character, 0)
    else:
        img = character
    image_url = public_document_url(img)
    video_url = public_video_url(item)
    prompt = ""
    model = "fal-ai/kling-video/v2.6/pro/motion-control"
    prompt_hash = hashlib.sha1((prompt + image_url).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    data = {
        "prompt": prompt,
        "image_url": image_url,
        "video_url": video_url,
        "keep_original_sound": False,
        "character_orientation": "video",
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:replace:p1:motion-control")
    ai.data["prompt"] = ox.escape_html(prompt)
    # Record the first-frame URL without its access token.
    ai.data["firstframe"] = image_url.split("?")[0]
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    img.add(ai)
    return ai
def describe_video(url, neutral=False, face=False):
    """Describe a video scene-by-scene using the Ark seed vision model.

    Args:
        url: public URL of the video (sampled at 1 fps).
        neutral: describe people in a gender-neutral way, omitting
            ethnicity and hair color.
        face: additionally describe facial expressions for re-enactment.

    Returns:
        The model's text description; scenes are separated by
        "CAMERA CUT TO".
    """
    if face:
        extra = 'describe the facial expression to allow an actor to recreate the scene, '
    else:
        extra = ''
    # NOTE(review): the prompt strings below contain typos ("incudling",
    # "detalied apreance", "backgroud", "incude", "lense"); they are kept
    # byte-for-byte since changing them would change model behavior and
    # invalidate cached prompts.
    if neutral:
        prompt = (
            "Detect cuts or scene changes and describe each scene, use as much details as you can. "
            f"Describe each person incudling detalied apreance, haircut in a gender neutral way, {extra}"
            "describe each objects, animal or plant, describe foreground and backgroud, "
            "describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
            "Use the format: <description of scene 1>. CAMERA CUT TO <description of scene 2>. CAMERA CUT TO <description of scene 3>. "
            "Don't mention it if you don't find a cut."
        )
    else:
        prompt = (
            "Detect cuts or scene changes and describe each scene, use as much details as you can. "
            "Describe each person incudling detalied apreance, ethnicity, haircolor, haircut, "
            "describe each objects, animal or plant, describe foreground and backgroud, "
            "describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
            "Use the format: <description of scene 1>. CAMERA CUT TO <description of scene 2>. CAMERA CUT TO <description of scene 3>. "
            "Don't mention it if you don't find a cut."
        )
    data = {
        "input": [
            {
                "role": "user",
                "content": [
                    {"type": "input_video", "video_url": url, "fps": 1},
                    {"type": "input_text", "text": prompt},
                ],
            }
        ],
    }
    response = bytedance_response(data)
    # NOTE(review): assumes the answer text lives at output[1] of the
    # responses payload — confirm against the Ark responses API.
    return response["output"][1]["content"][0]["text"]
def describe_item(item, neutral=False):
    """Return a textual description of an item's video.

    Neutral descriptions are cached on the item as data["prompt"]
    (HTML-escaped); a cached value is returned unescaped when present.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    cached = item.get("prompt")
    if cached and neutral:
        return ox.decode_html(cached)
    prompt = describe_video(public_video_url(item), neutral)
    if neutral:
        # Only neutral descriptions are cached on the item.
        item.refresh_from_db()
        item.data["prompt"] = ox.escape_html(prompt)
        item.save()
    return prompt
def reshoot_item(item, extra_prompt=None, first_frame=None, keep=False, prompt=None):
    """Re-generate ("reshoot") an item's video from a text description.

    The item is described (gender-neutrally when a first frame is given),
    optionally extended with extra_prompt, generated via Seedance (i2v when
    a first frame is supplied, t2v otherwise), trimmed back to the original
    duration, and attached as an AI variant.

    Args:
        item: Item or public_id.
        extra_prompt: text appended to the derived description.
        first_frame: optional Document or image URL for the first frame.
        keep: keep the cache directory instead of deleting it.
        prompt: use this description instead of deriving one.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    if isinstance(first_frame, Document):
        first_frame_url = public_document_url(first_frame)
    else:
        first_frame_url = first_frame
    duration = item.sort.duration
    frames = int(duration * 24)
    # When the character comes in via the first frame, describe people in
    # the source neutrally.
    neutral = first_frame is not None
    if prompt is None:
        prompt = describe_item(item, neutral)
    if extra_prompt:
        prompt += " " + extra_prompt
    prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    if first_frame:
        status = i2v_bytedance(first_frame_url, prompt, duration, output)
    else:
        status = t2v_bytedance(prompt, duration, output)
    trimmed = "/srv/pandora/static/power/cache/%s_%s/trimmed.mp4" % (
        item.public_id,
        prompt_hash,
    )
    # Generated clips are padded to whole seconds; trim back to the exact
    # original frame count.
    trim_video(output, trimmed, frames)
    variant = "ai:0:reshoot"
    if first_frame:
        variant = "ai:0:reshoot-firstframe"
    ai = add_ai_variant(item, trimmed, variant)
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["model"] = status["model"]
    ai.data["seed"] = status["seed"]
    if first_frame:
        if isinstance(first_frame, Document):
            first_frame.add(ai)
            ai.data["firstframe"] = first_frame.get_id()
        else:
            # Record the first-frame URL without its access token.
            ai.data["firstframe"] = first_frame.split("?")[0]
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai
def reshoot_item_segments(item, character, keep=False):
    """Reshoot an item segment by segment via image-to-video.

    item: Item or public_id string.
    character: a character name used for every segment, or "annotation" to
        look up the character per segment from the item's 'prompts' layer.
    keep: keep the cache directory with intermediate files.
    Returns the joined AI variant Item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    max_duration = 12
    source = item.files.all()[0].data.path
    info = ox.avinfo(source)
    duration = info["duration"]
    # short items are one segment — unless characters come from annotations,
    # in which case per-segment boundaries are still needed
    if duration < max_duration and character != "annotation":
        segments = [duration]
    else:
        segments = get_item_segments(item, max_duration=max_duration)
    print(segments)
    prompt_hash = hashlib.sha1("reshoot_segment".encode()).hexdigest()
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    video_segments = fragment_video(source, prefix, segments)
    prompts = []
    first_frames = []
    seed = None
    n = 0
    position = 0
    for segment in segments:
        # segments are plain end positions or [type, end]; "c" = continuation
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        output = "%s/%06d.mp4" % (prefix, n)
        output_ai = "%s/%06d_ai.mp4" % (prefix, n)
        segment_duration = segment - position
        if os.path.exists(output):
            segment_video_url = public_url(output)
            prompt = describe_video(segment_video_url, neutral=True)
            prompts.append("Segment %s: " % (n + 1) + prompt)
            if character == "annotation":
                # first 'prompts' annotation starting inside this segment
                a = item.annotations.filter(
                    layer='prompts', start__gte=position, end__gt=position
                ).exclude(start__gte=segment).order_by('start').first()
                if a:
                    segment_character = a.value
                    print('use character from annotation', a, segment_character, 'for %s-%s' % (position, segment))
                else:
                    segment_character = None
                    print('use no character for %s-%s' % (position, segment))
            else:
                segment_character = character
            if segment_character:
                segment_first_frame = replace_character(
                    item, segment_character, position, seed=seed
                )
                segment_first_frame_url = public_document_url(segment_first_frame)
            else:
                # no character: reuse the segment's original first frame
                segment_first_frame_url = public_url(output.replace(".mp4", "_first.jpg"))
            status = i2v_bytedance(
                segment_first_frame_url, prompt, segment_duration, output_ai, seed=seed
            )
            # propagate the first segment's seed to all following segments
            # (fixed: was "seeed = ..." typo, which left seed as None)
            seed = status["seed"]
            trimmed = "%s/%06d_ai_trimmed.mp4" % (prefix, n)
            frames = int(segment_duration * 24)
            trim_video(output_ai, trimmed, frames, stype == "c")
            processed.append(trimmed)
            if segment_character:
                first_frames.append(segment_first_frame)
        n += 1
        position = segment
    joined_output = "%s/joined.mp4" % (prefix)
    join_segments(processed, joined_output)
    ai = add_ai_variant(item, joined_output, "ai:0:reshoot-firstframe")
    prompt = "\n\n".join(prompts)
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["firstframe"] = " ".join([ff.get_id() for ff in first_frames])
    ai.data["model"] = status["model"]
    ai.data["seed"] = seed
    ai.save()
    for first_frame in first_frames:
        first_frame.add(ai)
    if not keep:
        shutil.rmtree(prefix)
    return ai
def describe_image(url):
    """Ask the Bytedance responses API for an analysis of the image at *url*.

    Returns the generated description text.
    """
    # (removed a dead `system_prompt = ""` that was immediately overwritten)
    system_prompt = "You are an image analyst describing different aspects of an image. You are focused on the form, composition, and task shown in the image."
    prompt = "Please analyze this image according to the specified structure."
    data = {
        "input": [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "input_image", "image_url": url},
                    {"type": "input_text", "text": prompt},
                ],
            },
        ],
    }
    response = bytedance_response(data)
    # the last output entry carries the generated text
    return response["output"][-1]["content"][0]["text"]
def transform_remake_video(item_id, image_prompt, video_prompt):
    """Remake an item segment by segment: restyle each segment's first and
    last frame with *image_prompt*, then interpolate between them with a
    first/last-frame video model driven by *video_prompt*.

    All intermediate files are cached under a prompt-keyed directory so an
    interrupted run can resume. Returns the path of the joined video.
    """
    item = Item.objects.get(public_id=item_id)
    segments = get_item_segments(item)
    print(segments)
    # cache key covers both prompts, so changing either starts fresh
    prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
    position = n = 0
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    for segment in segments:
        # segments are plain end positions or [type, end]; "c" = continuation
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        duration = segment - position
        if stype == "c":
            # continuation: start from the previous segment's generated last frame
            # NOTE(review): reads "%06d.mp4.last_frame.png" while this loop writes
            # "%06d.last_frame.png" below — presumably first_last() saves an extra
            # "<output>.last_frame.png" next to its output; confirm.
            first_frame_path = "%s/%06d.mp4.last_frame.png" % (prefix, n - 1)
            first_frame = public_url(first_frame_path)
        else:
            # grab the original frame at the segment start from the server
            first_frame = "%s%s/source%s.png?token=%s" % (
                settings.PUBLIC_URL,
                item.public_id,
                position,
                settings.PUBLIC_TOKEN,
            )
            first_frame_path = "%s/%06d.first_frame.png" % (prefix, n)
            if not os.path.exists(first_frame_path):
                prepare_image(first_frame, image_prompt, first_frame_path)
            first_frame = public_url(first_frame_path)
        # last frame is taken 2 frames (at 24 fps) before the segment end
        last_frame_position = segment - 2 / 24
        last_frame = "%s%s/source%0.3f.png?token=%s" % (
            settings.PUBLIC_URL,
            item.public_id,
            last_frame_position,
            settings.PUBLIC_TOKEN,
        )
        last_frame_path = "%s/%06d.last_frame.png" % (prefix, n)
        if not os.path.exists(last_frame_path):
            prepare_image(last_frame, image_prompt, last_frame_path)
        last_frame = public_url(last_frame_path)
        output = "%s/%06d.mp4" % (prefix, n)
        if not os.path.exists(output):
            first_last(first_frame, last_frame, video_prompt, duration, output)
        trimmed = "%s/%06d_trimmed.mp4" % (prefix, n)
        frames = int(duration * 24)
        if not os.path.exists(trimmed):
            # continuation segments are trimmed from frame 0 to stay aligned
            trim_video(output, trimmed, frames, stype == "c")
        processed.append(trimmed)
        position = segment
        n += 1
    joined_output = "%s/joined.mp4" % (prefix,)
    join_segments(processed, joined_output)
    return joined_output
def restyle_video(item_id, prompt):
    """Restyle the full item video with decart/lucy-restyle on fal.ai.

    Downloads the result into the power cache and returns its path.
    """
    item = Item.objects.get(public_id=item_id)
    video_url = public_video_url(item)
    model = "decart/lucy-restyle"
    handler = fal_client.submit(
        model,
        arguments={
            "prompt": prompt,
            "video_url": video_url,
            "resolution": "720p",
            "enhance_prompt": True,
        },
    )
    request_id = handler.request_id
    print(request_id)
    # poll via the shared helper instead of duplicating the wait loop
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    prompt_hash = hashlib.sha1(prompt.encode()).hexdigest()
    output_path = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
        item.public_id,
        prompt_hash,
    )
    ox.net.save_url(output_url, output_path, overwrite=True)
    return output_path
2026-01-24 13:26:30 +01:00
2026-01-15 16:39:54 +00:00
def fal_wait_for(model, request_id):
    """Block until the fal.ai request finishes, then return its result.

    Polls the request status every 10 seconds while it is in progress.
    """
    while True:
        state = fal_client.status(model, request_id, with_logs=True)
        if not isinstance(state, fal_client.InProgress):
            break
        time.sleep(10)
    return fal_client.result(model, request_id)
2026-01-24 13:26:30 +01:00
def motion_control_preprocess_image(item_id, image_prompt, video_prompt):
    """Drive Kling motion-control with a restyled first frame.

    The item's first source frame is restyled with *image_prompt*, then the
    item's video provides the motion for a generation guided by
    *video_prompt*. Returns the path of the downloaded result.
    """
    item = Item.objects.get(public_id=item_id)
    video_url = public_video_url(item)
    model = "fal-ai/kling-video/v2.6/pro/motion-control"
    # cache key covers both prompts
    prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (item.public_id, prompt_hash)
    # original frame at position 0, served with the access token
    first_frame = "%s%s/source%s.png?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        0,
        settings.PUBLIC_TOKEN,
    )
    first_frame_path = "/srv/pandora/static/power/cache/%s_%s/%06d.first_frame.png" % (
        item.public_id,
        prompt_hash,
        0,
    )
    if not os.path.exists(first_frame_path):
        os.makedirs(os.path.dirname(first_frame_path), exist_ok=True)
        prepare_image(first_frame, image_prompt, first_frame_path)
    image_url = public_url(first_frame_path)
    data = {
        "prompt": video_prompt,
        "image_url": image_url,
        "video_url": video_url,
        "keep_original_sound": False,
        "character_orientation": "video",
    }
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    return output
def luma_wait_for(id):
    """Poll a Luma generation until it leaves the queued/dreaming states.

    Returns the final status payload as a dict.
    """
    url = "https://api.lumalabs.ai/dream-machine/v1/generations/%s" % id
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": "Bearer " + settings.LUMA_TOKEN,
    }
    while True:
        status = requests.get(url, headers=headers).json()
        if status["state"] not in ("queued", "dreaming"):
            return status
        time.sleep(10)
2026-01-24 13:26:30 +01:00
def luma_modify_segment(video_url, prompt, first_frame=None, mode="flex_2"):
    """Modify a video segment with Luma ray-2 and return the result URL.

    Optionally pins the first frame of the generation.
    Also available at fal-ai/luma-dream-machine/ray-2/modify.
    """
    endpoint = "https://api.lumalabs.ai/dream-machine/v1/generations/video/modify"
    request = {
        "generation_type": "modify_video",
        "model": "ray-2",
        "mode": mode,
        "prompt": prompt,
        "media": {"url": video_url},
    }
    if first_frame:
        request["first_frame"] = {"url": first_frame}
    response = requests.post(
        endpoint,
        json=request,
        headers={
            "accept": "application/json",
            "content-type": "application/json",
            "authorization": "Bearer " + settings.LUMA_TOKEN,
        },
    ).json()
    print(response)
    return luma_wait_for(response["id"])["assets"]["video"]
def fragment_video(filename, segmentdir, segments):
    """Split *filename* into numbered segment files under *segmentdir*.

    segments: list of end positions in seconds; entries may also be
    [type, end] pairs, whose type tag is stripped here. For each segment
    "%06d.mp4" is written, plus "_first.jpg"/"_last.jpg" snapshots of its
    first and last frame.
    """
    filename = str(filename)
    input_info = ox.avinfo(filename)
    # strip optional [type, end] wrappers — only end positions matter here
    segments_ = []
    for segment in segments:
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        segments_.append(segment)
    segments = segments_
    position = 0
    segment = 0
    cap = cv2.VideoCapture(filename)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    frame_count = 0
    # next cut point expressed in frames
    next_cut = int(segments.pop(0) * fps)
    last = None
    os.makedirs(segmentdir, exist_ok=True)
    while cap.isOpened():
        if frame_count == 0:
            output_path = segmentdir + "/%06d.mp4" % segment
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        elif next_cut and frame_count >= next_cut and segments:
            # close the current segment: `frame` still holds its last frame
            print(frame_count, output_path)
            cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), frame)
            out.release()
            segment += 1
            output_path = segmentdir + "/%06d.mp4" % segment
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if segments:
                next_cut = int(segments.pop(0) * fps)
            else:
                next_cut = None
            # reset so the next read frame is saved as the new "_first.jpg"
            last = None
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)
        if last is None:
            cv2.imwrite(output_path.replace(".mp4", "_first.jpg"), frame)
        last = frame
        frame_count += 1
    out.release()
    cap.release()
    if last is None:
        # final writer never received a frame — drop the empty file
        os.unlink(output_path)
    else:
        print(frame_count, output_path)
        cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), last)
def flux_edit_image(image, prompt):
    """Edit one image (URL) or several with FLUX.2 pro on fal.ai.

    Returns the URL of the first result image.
    """
    images = [image] if isinstance(image, str) else image
    arguments = {
        "prompt": prompt,
        "safety_tolerance": "5",
        "enable_safety_checker": False,
        "output_format": "jpeg",
        "image_urls": images,
    }
    print(arguments)
    response = fal_client.subscribe("fal-ai/flux-2-pro/edit", arguments=arguments)
    print(response)
    return response["images"][0]["url"]
def in_the_style_of_fal(image, style):
    """Restyle *image* with *style* as reference via FLUX edit, returning the URL."""
    prompt = (
        "apply style from @image 2 to @image 1 keep the position of the person"
        " in @image 1 but take light, colors, clothing from @image 2"
    )
    return flux_edit_image([image, style], prompt)
2026-01-24 13:26:30 +01:00
def in_the_style_of_byte(image, style):
    """Restyle *image* with *style* as reference via Seedream, returning the URL."""
    prompt = "apply style from image 2 to image 1 keep the position of the person in image 1 but take light, colors, clothing from image 2"
    client = Ark(
        base_url="https://ark.ap-southeast.bytepluses.com/api/v3",
        api_key=settings.BYTEPLUSE_TOKEN,
    )
    result = client.images.generate(
        model="seedream-4-5-251128",
        image=[image, style],
        prompt=prompt,
        sequential_image_generation="disabled",
        response_format="url",
        size="2560x1440",
        stream=False,
        watermark=False,
    )
    print(result)
    return result.data[0].url
2026-01-15 16:39:54 +00:00
def luma_modify_item(item, prompt="", image_prompt=None, first_frame=None, keep=False):
    """Modify an item with Luma ray-2, segment by segment, and attach the
    joined result as an AI variant Item.

    item may be an Item or a public_id string; first_frame may be a
    Document or a URL.
    NOTE(review): image_prompt only contributes to the cache key here —
    presumably reserved for a preprocessing step; confirm.
    """
    mode = "flex_2"
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    source = item.files.all()[0].data.path
    info = ox.avinfo(source)
    duration = info["duration"]
    max_duration = 10
    if duration < max_duration:
        segments = [duration]
    else:
        segments = get_item_segments(item, max_duration=max_duration)
    print(segments)
    prompt_hash = hashlib.sha1((prompt + (image_prompt or "")).encode()).hexdigest()
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    video_segments = fragment_video(source, prefix, segments)
    n = 0
    if isinstance(first_frame, Document):
        first_frame_url = public_document_url(first_frame)
    else:
        first_frame_url = first_frame
    for segment in segments:
        # segments are plain end positions or [type, end] pairs
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        output = "%s/%06d.mp4" % (prefix, n)
        output_ai = "%s/%06d_ai.mp4" % (prefix, n)
        if os.path.exists(output):
            video_url = luma_modify_segment(
                public_url(output), prompt, first_frame=first_frame_url, mode=mode
            )
            ox.net.save_url(video_url, output_ai, overwrite=True)
            processed.append(output_ai)
        n += 1
    joined_output = "%s/joined.mp4" % (prefix,)
    join_segments(processed, joined_output)
    ai = add_ai_variant(item, joined_output, "ai:replace:p1:luma")
    ai.data["prompt"] = ox.escape_html(prompt)
    if first_frame:
        # store the first-frame URL without the access token query string
        ai.data["firstframe"] = first_frame_url.split("?")[0]
    ai.data["model"] = "ray-2:%s" % mode
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(joined_output))
    if isinstance(first_frame, Document):
        first_frame.add(ai)
    return ai
def add_ai_variant(item, video_path, type):
    """Create a new Item of the given ai *type* from *video_path*.

    Copies the video into the archive as the new item's selected File,
    inherits title/chapter/user from the source *item* (Item or public_id
    string) and kicks off stream extraction. Returns the new Item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    ai = Item()
    ai.user = item.user
    ai.data["type"] = [type]
    ai.data["title"] = item.data["title"]
    ai.data["chapter"] = item.data.get("chapter", "")
    ai.save()
    file = File()
    file.oshash = ox.oshash(video_path)
    file.item = ai
    file.path = "%s.mp4" % type
    file.extension = "mp4"
    file.info = ox.avinfo(video_path)
    # the local cache path must not leak into the stored file info
    del file.info["path"]
    file.parse_info()
    file.data.name = file.get_path("data." + video_path.split(".")[-1])
    os.makedirs(os.path.dirname(file.data.path), exist_ok=True)
    shutil.copy(video_path, file.data.path)
    file.available = True
    file.selected = True
    file.queued = True
    file.wanted = False
    file.save()
    file.extract_stream()
    return ai
2026-01-15 16:39:54 +00:00
2026-01-24 13:26:30 +01:00
2026-01-15 16:39:54 +00:00
def add_ai_image(item, position, url, extension=None):
    """Download the image at *url* into a new Document linked to *item*.

    position: frame/time position the image belongs to (stored in data).
    extension: overrides the extension guessed from the URL.
    Returns the new Document.
    """
    if extension is None:
        # guess from the URL path, ignoring any query string
        extension = url.split(".")[-1].split("?")[0]
        if extension == "jpeg":
            extension = "jpg"
    file = Document(user=item.user)
    file.rightslevel = 2
    file.data["title"] = "%s at %s" % (item.get("title"), position)
    file.data["position"] = position
    file.extension = extension
    # placeholder dimensions until get_info() fills them in
    file.width = -1
    file.pages = -1
    file.uploading = True
    file.save()
    # save() may reset the flag; keep it set while the file is fetched
    file.uploading = True
    name = "data.%s" % file.extension
    file.file.name = file.path(name)
    ox.net.save_url(url, file.file.path, overwrite=True)
    file.get_info()
    file.get_ratio()
    file.oshash = ox.oshash(file.file.path)
    file.save()
    file.update_sort()
    file.add(item)
    return file
2026-01-24 13:26:30 +01:00
def add_tag(item, tag):
    """Add *tag* to the item's tag list and save.

    Idempotent: a tag already present is not appended again (the failure
    handlers call this repeatedly for the same item).
    """
    tags = item.data.setdefault('tags', [])
    if tag not in tags:
        tags.append(tag)
    item.save()
def extract_firstframe(character='P1'):
    """Generate a character first frame for every source item without documents.

    Items that fail are tagged 'ai-failed' and skipped on later runs.
    """
    for item in Item.objects.filter(data__type__icontains="source"):
        if 'ai-failed' in item.data.get('tags', []):
            continue
        if not item.documents.all().exists():
            print(item)
            try:
                replace_character(item, character, 0)
            # narrow from bare `except:` so Ctrl-C / SystemExit still work
            except Exception:
                # reload: replace_character may have left stale in-memory data
                item.refresh_from_db()
                add_tag(item, 'ai-failed')
2026-01-26 09:23:47 +01:00
def process_reshoot_firstframe(character='P1'):
    """Reshoot every item on the 'Reshoot-Firstframe' list from a character
    first frame, skipping items already failed or already reshot.
    """
    position = 0
    l = itemlist.models.List.objects.get(name='Reshoot-Firstframe')
    for item in l.items.all():
        if 'ai-failed' in item.data.get('tags', []):
            print('>> skip', item)
            continue
        if item.sort.duration > 30:
            # long items intentionally left out for now
            pass
            #reshoot_item_segments(item, character)
        else:
            cid = get_character_document(character).get_id()
            # reuse the newest existing first frame for this character/position
            first_frame = item.documents.filter(
                data__character=cid, data__position=position
            ).order_by('-created').first()
            if not first_frame:
                try:
                    first_frame = replace_character(item, character, position)
                except:
                    item.refresh_from_db()
                    add_tag(item, 'ai-failed')
                    print('>> failed', item)
                    continue
            if not first_frame:
                continue
            # skip if this first frame was already used for an ai variant
            if first_frame.items.filter(data__type__icontains='ai:').exists():
                continue
            print(item, first_frame)
            try:
                reshoot_item(item, first_frame=first_frame)
            except:
                item.refresh_from_db()
                add_tag(item, 'ai-failed')
                print('>> failed', item)
2026-01-26 09:23:47 +01:00
def process_motion_firstframe(character="P1", keep=False):
    """Run motion-control character replacement for each item on the
    'Motion-Firstframe' list, skipping failed items and items that already
    have an ai variant with the same title.
    """
    l = itemlist.models.List.objects.get(name='Motion-Firstframe')
    for item in l.items.all():
        # an existing ai item with the same title counts as already processed
        ai = Item.objects.filter(data__type__icontains='ai').filter(data__title=item.data['title'])
        if ai.exists() or 'ai-failed' in item.data.get('tags', []):
            print('>> skip', item)
            continue
        print(item)
        try:
            replace_character_motion_control(item, character, keep=keep)
        except:
            item.refresh_from_db()
            add_tag(item, 'ai-failed')
            print('>> failed', item)
2026-01-24 13:26:30 +01:00
def process_list(list, process, character='P1', extra=None):
    """Run *process(item, first_frame=...)* for each item on an item list.

    A character first frame is reused or generated per item; the list's
    description serves as the default extra prompt. Failed items are
    tagged 'ai-failed' and skipped on later runs.
    """
    position = 0
    if extra is None and list.description:
        extra = list.description
    for item in list.items.all():
        if 'ai-failed' in item.data.get('tags', []):
            print('>> skip', item)
            continue
        if item.sort.duration > 30:
            # long items intentionally left out for now
            pass
            #reshoot_item_segments(item, character)
        else:
            cid = get_character_document(character).get_id()
            # reuse the newest existing first frame for this character/position
            first_frame = item.documents.filter(
                data__character=cid, data__position=position
            ).order_by('-created').first()
            if not first_frame:
                try:
                    first_frame = replace_character(item, character, position, extra=extra)
                except Exception:
                    item.refresh_from_db()
                    add_tag(item, 'ai-failed')
                    print('>> failed', item)
                    # fixed: without this continue the loop fell through with
                    # first_frame = None and crashed on first_frame.items below
                    continue
            if not first_frame:
                continue
            # skip if this first frame was already used for an ai variant
            if first_frame.items.filter(data__type__icontains='ai:').exists():
                continue
            print(item, first_frame)
            try:
                process(item, first_frame=first_frame)
            except Exception:
                item.refresh_from_db()
                add_tag(item, 'ai-failed')
                print('>> failed', item)
def reshoot_mustache(character="P1"):
    """Reshoot every item on the 'mustache' list with a fake-mustache prompt."""
    mustache_list = itemlist.models.List.objects.get(name='mustache')
    extra = "The person is wearing a fake mustache in the style of the mustache from image 1"
    return process_list(mustache_list, reshoot_item, character, extra=extra)
2026-01-29 11:37:15 +01:00
def faceswap_item(item, character, keep=False):
    """Swap the face from *character* (Document or image URL) into the item's
    video and attach the result as an AI variant Item.

    item may be an Item or a public_id string (consistent with the other
    *_item helpers). Returns the new Item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    if isinstance(character, Document):
        character_url = public_document_url(character)
    else:
        character_url = character
    video_url = public_video_url(item)
    model = "half-moon-ai/ai-face-swap/faceswapvideo"
    data = {
        "source_face_url": character_url,
        "target_video_url": video_url,
    }
    prompt_hash = hashlib.sha1(("faceswap:" + character_url).encode()).hexdigest()
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    output_ai = "%s/ai.mp4" % (prefix)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output_ai, overwrite=True)
    ai = add_ai_variant(item, output_ai, "ai:0:faceswap")
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(prefix)
    return ai
def faceswap_item_segments(item, keep=False):
    """Face-swap an item segment by segment and join the results into an
    AI variant Item.

    The source face for each segment comes from a 'prompts' annotation
    (character name resolved to a Document), or falls back to the
    segment's own first frame. Returns the new Item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    max_duration = 30
    source = item.files.all()[0].data.path
    info = ox.avinfo(source)
    duration = info["duration"]
    segments = get_item_segments(item, max_duration=max_duration)
    prompt_hash = hashlib.sha1("faceswap".encode()).hexdigest()
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    video_segments = fragment_video(source, prefix, segments)
    n = 0
    position = 0
    model = "half-moon-ai/ai-face-swap/faceswapvideo"
    for segment in segments:
        # segments are plain end positions or [type, end] pairs
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        output = "%s/%06d.mp4" % (prefix, n)
        output_ai = "%s/%06d_ai.mp4" % (prefix, n)
        # first 'prompts' annotation still active at the segment start
        # NOTE(review): unlike reshoot_item_segments this does not exclude
        # annotations starting after the segment end — confirm intended.
        character = item.annotations.filter(
            layer='prompts', start__gte=position, end__gt=position
        ).order_by('start').first()
        if character:
            character = character.value
            character = get_character_document(character)
            chracter_url = public_document_url(character)
        else:
            # fall back to the segment's own first frame as the source face
            character = output.replace('.mp4', '_first.jpg')
            chracter_url = public_url(character)
        segment_duration = segment - position
        if os.path.exists(output):
            segment_video_url = public_url(output)
            data = {
                "source_face_url": chracter_url,
                "target_video_url": segment_video_url,
            }
            handler = fal_client.submit(model, arguments=data)
            request_id = handler.request_id
            print(request_id)
            result = fal_wait_for(model, request_id)
            print(result)
            output_url = result["video"]["url"]
            ox.net.save_url(output_url, output_ai, overwrite=True)
            processed.append(output_ai)
        n += 1
        position = segment
    joined_output = "%s/joined.mp4" % (prefix)
    join_segments(processed, joined_output)
    ai = add_ai_variant(item, joined_output, "ai:0:faceswap")
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(prefix)
    return ai