more video generation helpers

This commit is contained in:
j 2026-01-15 16:39:54 +00:00
commit 8257406e68

View file

@ -27,6 +27,15 @@ headers = {
def public_url(path):
    """Translate a filesystem path under the static root into its public URL."""
    static_root = "/srv/pandora/static/"
    return path.replace(static_root, settings.PUBLIC_URL + "static/")
def public_document_url(document):
    """Return the tokenized public source URL for a Document."""
    return "%sdocuments/%s/source.%s?token=%s" % (
        settings.PUBLIC_URL,
        ox.toAZ(document.id),
        document.extension,
        settings.PUBLIC_TOKEN,
    )
def trim_video(src, dst, frames, start0=False):
cap = cv2.VideoCapture(src)
@ -115,10 +124,40 @@ def t2v_bytedance(prompt, duration, output):
)
return status
def i2v_bytedance(first_frame, prompt, duration, output, last_frame=None):
    """Run an image-to-video generation task on the ByteDance backend.

    first_frame / last_frame: image URLs anchoring the clip's endpoints.
    duration: requested length in seconds (rounded up, minimum 4).
    output: local path the resulting video is saved to; if the API reports a
        last-frame image it is saved next to it as "<output>.last_frame.png".
    Returns the task status dict from bytedance_task.
    """
    clip_seconds = max(4, int(math.ceil(duration)))
    content = [
        {
            "type": "text",
            "text": prompt,
        },
        {
            "type": "image_url",
            "role": "first_frame",
            "image_url": {"url": first_frame},
        },
    ]
    if last_frame:
        content.append({
            "type": "image_url",
            "role": "last_frame",
            "image_url": {"url": last_frame},
        })
    status = bytedance_task({"duration": clip_seconds, "content": content})
    ox.net.save_url(status["content"]["video_url"], output, overwrite=True)
    if "last_frame_url" in status["content"]:
        ox.net.save_url(
            status["content"]["last_frame_url"],
            output + ".last_frame.png",
            overwrite=True,
        )
    return status
def first_last(first_frame, last_frame, prompt, duration, output):
model = "seedance-1-5-pro-251215"
resolution = "720p"
nduration = max(4, int(math.ceil(duration)))
data = {
"duration": nduration,
@ -285,6 +324,93 @@ def prepare_image(image, prompt, out=None):
ox.net.save_url(output_url, out, overwrite=True)
return r
def process_frame(item, prompt, character=None, position=0):
    """Generate an AI still for one frame of an item via the BytePlus image API.

    item: Item instance or public_id string.
    prompt: edit instruction for the image model.
    character: optional reference-image URL; when given the request sends
        [frame, character] so the model can use both images.
    position: frame position within the item, used to build the source URL
        and recorded in the resulting document's metadata.
    Returns the Document created by add_ai_image, with model/prompt/source
    metadata filled in.
    """
    model = "seedream-4-5-251128"
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    # Tokenized public URL of the source frame at `position`.
    image = "%s%s/source%s.png?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        position,
        settings.PUBLIC_TOKEN,
    )
    if character is not None:
        image = [image, character]
    data = {
        "model": model,
        "prompt": prompt,
        "image": image,
        "size": "2560x1440",
        "watermark": False,
    }
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/images/generations"
    # Fixed debug label: it previously said "prepare_image" (copy/paste
    # from the sibling helper), which made logs misleading.
    print("process_frame", data)
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    url = response["data"][0]["url"]
    img = add_ai_image(item, position, url)
    img.refresh_from_db()
    img.data['model'] = model
    img.data['prompt'] = prompt
    img.data['source'] = item.public_id
    if character:
        # Record the character reference without its access token.
        img.data['source'] += ' ' + character.split('?')[0]
    print(img, img.data)
    img.save()
    img.update_sort()
    img.update_find()
    return img
def replace_character(item, character, position=0):
    """Swap the foreground character in an item frame with a reference character.

    character: a reference-image URL, or one of "P1"/"P2"/"P3" to look up the
        corresponding "Character P*" document.
    position: frame position within the item, forwarded to process_frame.
    Returns the Document produced by process_frame.
    """
    # Prompt typo fixed: "admosthere" -> "atmosphere".
    prompt = "Replace the foreground character in image 1 with the character in image 2, keep the posture, clothing, background, light, atmosphere from image 1, but take the face and personality from image 2. Make sure the size of the character is adjusted since the new character is a child. The quality of the image should be the same between foreground and background, adjust the quality of the character to match the background"
    if character in ("P1", "P2", "P3"):
        character = public_document_url(Document.objects.get(data__title="Character " + character))
    return process_frame(item, prompt, character, position)
def replace_character_motion_control(item, character, keep=False):
    """Re-render an item's video with a replaced character via Kling motion control.

    item: Item instance or public_id string.
    character: character spec string (forwarded to replace_character to make a
        replacement still for frame 0), or an already-generated image Document.
    keep: when False, the cache directory holding the downloaded output is
        removed after the AI variant has been created.
    Returns the AI variant created by add_ai_variant.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    # FIXME get character from documents
    if isinstance(character, str):
        # Generate the replacement still for frame 0 first.
        img = replace_character(item, character, 0)
    else:
        img = character
    image_url = public_document_url(img)
    # Tokenized public URL of the item's source video.
    video_url = "%s%s/download/source/?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        settings.PUBLIC_TOKEN,
    )
    prompt = ""
    model = "fal-ai/kling-video/v2.6/pro/motion-control"
    # Cache key is derived from prompt + image URL so repeated runs reuse a path.
    prompt_hash = hashlib.sha1((prompt + image_url).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (item.public_id, prompt_hash)
    data = {
        "prompt": prompt,
        "image_url": image_url,
        "video_url": video_url,
        "keep_original_sound": False,
        "character_orientation": "video",
    }
    print(data)
    # Submit asynchronously, then poll until fal reports completion.
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:replace:p1:motion-control")
    ai.data["prompt"] = ox.escape_html(prompt)
    # Store the first-frame reference without its access token.
    ai.data['firstframe'] = image_url.split('?')[0]
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    # Link the source image document to the new AI variant.
    img.add(ai)
    return ai
def describe_video(url):
prompt = (
@ -321,7 +447,7 @@ def describe_item(item):
return describe_video(video_url)
def reshoot_item(item, extra_prompt=None, keep=False):
def reshoot_item(item, extra_prompt=None, first_frame=None, keep=False):
if isinstance(item, str):
item = Item.objects.get(public_id=item)
duration = item.sort.duration
@ -334,16 +460,26 @@ def reshoot_item(item, extra_prompt=None, keep=False):
item.public_id,
prompt_hash,
)
status = t2v_bytedance(prompt, duration, output)
if first_frame:
status = i2v_bytedance(first_frame, prompt, duration, output)
else:
status = t2v_bytedance(prompt, duration, output)
trimmed = "/srv/pandora/static/power/cache/%s_%s/trimmed.mp4" % (
item.public_id,
prompt_hash,
)
trim_video(output, trimmed, frames)
ai = add_ai_variant(item, trimmed, "ai:0:reshoot")
variant = "ai:0:reshoot"
if first_frame:
variant = "ai:0:reshoot-firstframe"
ai = add_ai_variant(item, trimmed, variant)
ai.data["prompt"] = ox.escape_html(prompt)
ai.data["model"] = status["model"]
ai.data["seed"] = status["seed"]
if first_frame:
ai.data["firstframe"] = first_frame.split('?')[0]
ai.save()
if not keep:
shutil.rmtree(os.path.dirname(output))
@ -479,6 +615,13 @@ def restyle_video(item_id, prompt):
ox.net.save_url(output_url, output_path, overwrite=True)
return output_path
def fal_wait_for(model, request_id):
    """Poll a fal.ai request every 10 seconds until it finishes, then return its result."""
    while True:
        status = fal_client.status(model, request_id, with_logs=True)
        if not isinstance(status, fal_client.InProgress):
            break
        time.sleep(10)
    return fal_client.result(model, request_id)
def motion_control_preprocess_image(item_id, image_prompt, video_prompt):
item = Item.objects.get(public_id=item_id)
@ -516,11 +659,7 @@ def motion_control_preprocess_image(item_id, image_prompt, video_prompt):
handler = fal_client.submit(model, arguments=data)
request_id = handler.request_id
print(request_id)
status = fal_client.status(model, request_id, with_logs=True)
while isinstance(status, fal_client.InProgress):
time.sleep(10)
status = fal_client.status(model, request_id, with_logs=True)
result = fal_client.result(model, request_id)
result = fal_wait_for(model, request_id)
print(result)
output_url = result["video"]["url"]
ox.net.save_url(output_url, output, overwrite=True)
@ -541,13 +680,13 @@ def luma_wait_for(id):
return status
def luma_modify_segment(video_url, prompt, first_frame=None):
def luma_modify_segment(video_url, prompt, first_frame=None, mode='flex_2'):
# also got that at fal-ai/luma-dream-machine/ray-2/modify
url = "https://api.lumalabs.ai/dream-machine/v1/generations/video/modify"
payload = {
"generation_type": "modify_video",
"model": "ray-2",
"mode": "adhere_1",
"mode": mode,
"prompt": prompt,
"media": {"url": video_url},
}
@ -627,12 +766,14 @@ def fragment_video(filename, segmentdir, segments):
def flux_edit_image(image, prompt):
if isinstance(image, str):
image = [image]
data = {
"prompt": prompt,
"safety_tolerance": "5",
"enable_safety_checker": False,
"output_format": "jpeg",
"image_urls": [image],
"image_urls": image,
}
print(data)
result = fal_client.subscribe("fal-ai/flux-2-pro/edit", arguments=data)
@ -642,19 +783,7 @@ def flux_edit_image(image, prompt):
def in_the_style_of_fal(image, style):
    """Apply the look of `style` to `image` using flux_edit_image.

    Keeps the person's position from `image` while taking light, colors and
    clothing from `style`. Returns the resulting image URL.
    """
    # The old inline request body (duplicating flux_edit_image) was removed;
    # this helper now just delegates with both images.
    prompt = "apply style from @image 2 to @image 1 keep the position of the person in @image 1 but take light, colors, clothing from @image 2"
    return flux_edit_image([image, style], prompt)
def in_the_style_of_byte(image, style):
prompt = "apply style from image 2 to image 1 keep the position of the person in image 1 but take light, colors, clothing from image 2"
@ -677,7 +806,8 @@ def in_the_style_of_byte(image, style):
return create_result.data[0].url
def luma_modify_item(item, prompt, image_prompt=None, first_frame=None):
def luma_modify_item(item, prompt="", image_prompt=None, first_frame=None, keep=False):
mode = 'flex_2'
if isinstance(item, str):
item = Item.objects.get(public_id=item)
source = item.files.all()[0].data.path
@ -694,6 +824,10 @@ def luma_modify_item(item, prompt, image_prompt=None, first_frame=None):
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
video_segments = fragment_video(source, prefix, segments)
n = 0
if isinstance(first_frame, Document):
first_frame_url = public_document_url(first_frame)
else:
first_frame_url = first_frame
for segment in segments:
if isinstance(segment, list):
stype, segment = segment
@ -711,17 +845,30 @@ def luma_modify_item(item, prompt, image_prompt=None, first_frame=None):
)
if os.path.exists(output):
video_url = luma_modify_segment(
public_url(output), prompt, first_frame=first_frame
public_url(output),
prompt,
first_frame=first_frame_url,
mode=mode
)
ox.net.save_url(video_url, output_ai, overwrite=True)
processed.append(output_ai)
n += 1
joined_output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
joined_output = "/srv/pandora/static/power/cache/%s_%s/joined.mp4" % (
item.public_id,
prompt_hash,
)
join_segments(processed, joined_output)
return joined_output
ai = add_ai_variant(item, joined_output, "ai:replace:p1:luma")
ai.data["prompt"] = ox.escape_html(prompt)
if first_frame:
ai.data['firstframe'] = first_frame_url.split('?')[0]
ai.data["model"] = 'ray-2:%s' % mode
ai.save()
if not keep:
shutil.rmtree(os.path.dirname(joined_output))
if isinstance(first_frame, Document):
first_frame.add(ai)
return ai
def add_ai_variant(item, video_path, type):
@ -751,3 +898,28 @@ def add_ai_variant(item, video_path, type):
file.save()
file.extract_stream()
return ai
def add_ai_image(item, position, url, extension=None):
    """Download a generated image and attach it to `item` as a Document.

    item: Item the document is created for (owner and title are taken from it).
    position: frame position, stored in the document metadata and title.
    url: remote image URL to download.
    extension: file extension; inferred from the URL when None. "jpeg" is
        normalized to "jpg" either way.
    Returns the saved Document.
    """
    from document.models import Document
    if extension is None:
        # Infer the extension from the URL path, dropping any query string.
        extension = url.split('.')[-1].split('?')[0]
    if extension == 'jpeg':
        extension = 'jpg'
    file = Document(user=item.user)
    file.data['title'] = '%s at %s' % (item.get('title'), position)
    file.data['position'] = position
    file.extension = extension
    # Placeholder dimensions until get_info() fills in real values.
    file.width = -1
    file.pages = -1
    # Mark as uploading while the data is fetched.
    # NOTE(review): uploading is never reset to False here — presumably
    # get_info()/save() handles that; confirm.
    file.uploading = True
    file.save()
    # (removed a redundant second `file.uploading = True` that followed save())
    name = 'data.%s' % file.extension
    file.file.name = file.path(name)
    ox.net.save_url(url, file.file.path, overwrite=True)
    file.get_info()
    file.get_ratio()
    file.oshash = ox.oshash(file.file.path)
    file.save()
    file.update_sort()
    file.add(item)
    return file