"""Helpers that create AI variants of items via BytePlus, fal.ai and Luma."""

from pathlib import Path
import hashlib
import math
import os
import time
import shutil

import cv2
import ox
import requests
import fal_client
from byteplussdkarkruntime import Ark

from django.conf import settings

from item.models import Item
from document.models import Document
from archive.models import File, Stream

os.environ["FAL_KEY"] = settings.FAL_KEY

MAX_DURATION = 12

# Titles of the character reference documents are "Character <key>".
CHARACTER_KEYS = ("P1", "P2", "P3", "P4", "P5")

headers = {
    "Authorization": "Bearer " + settings.BYTEPLUSE_TOKEN,
    "Content-Type": "application/json",
}


def public_url(path):
    """Turn a path below /srv/pandora/static/ into its public URL."""
    return path.replace("/srv/pandora/static/", settings.PUBLIC_URL + "static/")


def public_document_url(document):
    """Return the tokenized public source URL of a Document."""
    url = "%sdocuments/%s/source.%s?token=%s" % (
        settings.PUBLIC_URL,
        ox.toAZ(document.id),
        document.extension,
        settings.PUBLIC_TOKEN,
    )
    return url


def public_video_url(item):
    """Return the tokenized public download URL of an item's source video."""
    url = "%s%s/download/source/?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        settings.PUBLIC_TOKEN,
    )
    return url


def public_frame_url(item, position):
    """Return the tokenized public URL of the item's frame at ``position``.

    ``position`` is interpolated with ``%s``; pass a pre-formatted string to
    control the number of decimals.
    """
    image = "%s%s/source%s.png?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        position,
        settings.PUBLIC_TOKEN,
    )
    return image


def _segment_type_and_end(segment):
    """Split a segment entry into ``(type, end_position)``.

    Entries are either a plain end position (type ``"n"``) or a
    ``[type, end_position]`` list as produced by ``get_item_segments``.
    """
    if isinstance(segment, list):
        stype, end = segment
        return stype, end
    return "n", segment


def _character_url(character):
    """Resolve a character key or Document to a public document URL.

    Anything that is neither a known key nor a Document is returned as is.
    """
    if character in CHARACTER_KEYS:
        character = Document.objects.get(data__title="Character " + character)
    if isinstance(character, Document):
        character = public_document_url(character)
    return character


def trim_video(src, dst, frames, start0=False):
    """Write ``frames`` frames of ``src`` to ``dst``.

    The kept window is centred in the source unless ``start0`` is true, in
    which case it starts at the first frame. If the source has fewer frames
    than requested, everything is kept.
    """
    cap = cv2.VideoCapture(src)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames_src = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    offset = 0 if start0 else max(0, int((frames_src - frames) / 2))
    print(frames_src, frames, offset)
    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    out = cv2.VideoWriter(dst, fourcc, fps, (width, height))
    end = offset + frames
    # 0-based index of the frame about to be read, so exactly
    # [offset, offset + frames) is written.
    index = 0
    try:
        # Stop decoding once the window is complete.
        while cap.isOpened() and index < end:
            ret, frame = cap.read()
            if not ret:
                break
            if index >= offset:
                out.write(frame)
            index += 1
    finally:
        out.release()
        cap.release()


def bytedance_task(data):
    """Create a BytePlus content generation task and poll until it settles.

    Missing keys in ``data`` are filled in place with defaults and known
    model names are mapped to their endpoint ids via ``EP``.

    Returns the last status document; callers must check it, since a failed
    or cancelled task is returned as well.
    """
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/contents/generations/tasks"
    model = "seedance-1-5-pro-251215"
    resolution = "720p"
    defaults = {
        "model": model,
        "generate_audio": False,
        "ratio": "16:9",
        "watermark": False,
        "resolution": resolution,
        "camera_fixed": True,
        "return_last_frame": True,
    }
    for key, value in defaults.items():
        data.setdefault(key, value)
    data["model"] = EP.get(data["model"], data["model"])
    print(data)
    r = requests.post(url, headers=headers, json=data).json()
    print(r)
    task_id = r["id"]
    status = requests.get(url + "/" + task_id, headers=headers).json()
    # Only poll while the task can still make progress; "cancelled" is a
    # final state and polling on it would never return.
    while status["status"] in ("queued", "running"):
        time.sleep(10)
        status = requests.get(url + "/" + task_id, headers=headers).json()
        print(status)
    return status


def bytedance_response(data):
    """POST ``data`` to the BytePlus responses endpoint and return the JSON.

    ``data`` is modified in place: a default model is added when missing and
    known model names are mapped to endpoint ids via ``EP``.
    """
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/responses"
    defaults = {"model": "seed-1-8-251228"}
    for key, value in defaults.items():
        data.setdefault(key, value)
    print(data)
    data["model"] = EP.get(data["model"], data["model"])
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    return response


def _save_task_output(status, output):
    """Save the video (and last frame, if present) of a finished task.

    The last frame is stored next to the video as ``<output>.last_frame.png``.
    """
    content = status["content"]
    ox.net.save_url(content["video_url"], output, overwrite=True)
    if "last_frame_url" in content:
        ox.net.save_url(
            content["last_frame_url"],
            output + ".last_frame.png",
            overwrite=True,
        )


def t2v_bytedance(prompt, duration, output):
    """Generate a video from ``prompt`` and save it to ``output``.

    The requested duration is rounded up to whole seconds, at least 4.
    Returns the task status document.
    """
    nduration = max(4, int(math.ceil(duration)))
    data = {
        "duration": nduration,
        "content": [{"type": "text", "text": prompt}],
    }
    status = bytedance_task(data)
    _save_task_output(status, output)
    return status


def i2v_bytedance(first_frame, prompt, duration, output, last_frame=None, seed=None):
    """Generate a video starting at ``first_frame`` and save it to ``output``.

    ``first_frame`` / ``last_frame`` are image URLs; ``last_frame`` and
    ``seed`` are only sent when truthy. The requested duration is rounded up
    to whole seconds, at least 4. Returns the task status document.
    """
    nduration = max(4, int(math.ceil(duration)))
    data = {
        "duration": nduration,
        "content": [
            {
                "type": "text",
                "text": prompt,
            },
            {
                "type": "image_url",
                "role": "first_frame",
                "image_url": {"url": first_frame},
            },
        ],
    }
    if last_frame:
        data["content"].append(
            {
                "type": "image_url",
                "role": "last_frame",
                "image_url": {"url": last_frame},
            }
        )
    if seed:
        data["seed"] = seed
    status = bytedance_task(data)
    _save_task_output(status, output)
    return status


def first_last(first_frame, last_frame, prompt, duration, output):
    """Generate a video between two frame URLs and save it to ``output``."""
    return i2v_bytedance(first_frame, prompt, duration, output, last_frame=last_frame)


def get_item_segments(item, max_duration=MAX_DURATION):
    """Split an item into segments no longer than ``max_duration`` seconds.

    Cuts closer than 0.5s to the previous kept cut are ignored. The result is
    a list of end positions; a position that continues a scene split only
    because of ``max_duration`` is wrapped as ``["c", position]`` (except
    when it is the very first entry).
    """
    cuts = item.get("cuts") or []
    filename = item.files.all()[0].data.path
    input_info = ox.avinfo(filename)
    previous = 0
    ends = []
    for cut in cuts:
        if cut - previous < 0.5:
            continue
        previous = cut
        ends.append(cut)
    ends = ends + [input_info["duration"]]
    # Drop a last cut that sits less than 0.5s before the end of the video.
    if len(ends) > 3 and ends[-1] - ends[-2] < 0.5:
        ends = ends[:-2] + ends[-1:]
    segments = []
    position = 0
    for out in ends:
        duration = out - position
        while duration > max_duration:
            position += max_duration
            if len(segments):
                segments.append(["c", position])
            else:
                segments.append(position)
            duration = out - position
        segments.append(out)
        position = out
    return segments


def join_segments(processed, joined_output):
    """Concatenate the videos in ``processed`` into ``joined_output``.

    Frame rate and frame size are taken from the first video. Nothing is
    written when ``processed`` is empty.
    """
    out = None
    for filename in processed:
        cap = cv2.VideoCapture(filename)
        if out is None:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*"avc1")
            out = cv2.VideoWriter(joined_output, fourcc, fps, (width, height))
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
        cap.release()
    if out is not None:
        out.release()


def remake_video(item_id, prompt):
    """Regenerate an item segment by segment between source frames.

    Each segment is generated between its first and last source frame;
    continuation segments (type ``"c"``) start from the last frame of the
    previously generated segment. Existing cache files are reused.

    Returns the path of the joined video.
    """
    item = Item.objects.get(public_id=item_id)
    segments = get_item_segments(item)
    print(segments)
    prompt_hash = hashlib.sha1(prompt.encode()).hexdigest()
    position = n = 0
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    for entry in segments:
        stype, segment = _segment_type_and_end(entry)
        duration = segment - position
        if stype == "c":
            first_frame_path = "%s/%06d.mp4.last_frame.png" % (prefix, n - 1)
            first_frame = public_url(first_frame_path)
        else:
            first_frame = public_frame_url(item, position)
        last_frame_position = segment - 2 / 24
        last_frame = public_frame_url(item, "%0.3f" % last_frame_position)
        output = "%s/%06d.mp4" % (prefix, n)
        if not os.path.exists(output):
            first_last(first_frame, last_frame, prompt, duration, output)
        trimmed = "%s/%06d_trimmed.mp4" % (prefix, n)
        frames = int(duration * 24)
        if not os.path.exists(trimmed):
            trim_video(output, trimmed, frames, stype == "c")
        processed.append(trimmed)
        position = segment
        n += 1
    joined_output = "%s/joined.mp4" % (prefix,)
    join_segments(processed, joined_output)
    return joined_output


# Model name -> endpoint id used in BytePlus requests.
EP = {
    "seedream-4-5-251128": "ep-20260122071519-pbf7l",
    "seed-1-8-251228": "ep-20260122071243-8qfrk",
    "seedance-1-5-pro-251215": "ep-20260122071613-blmsd",
}


def bytedance_image_generation(data):
    """Request an image from BytePlus and return the URL of the first result.

    Missing keys in ``data`` are filled in place with defaults and known
    model names are mapped to endpoint ids via ``EP``.
    """
    model = "seedream-4-5-251128"
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/images/generations"
    defaults = {
        "model": model,
        "size": "2560x1440",
        "watermark": False,
    }
    for key, value in defaults.items():
        data.setdefault(key, value)
    data["model"] = EP.get(data["model"], data["model"])
    print("prepare_image", data)
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    return response["data"][0]["url"]


def prepare_image(image, prompt, out=None):
    """Transform ``image`` with ``prompt`` and save the result.

    ``image`` is a URL or a local path below the static root. When ``out`` is
    not given the result is stored as ``<image>.ai.png``.

    Returns the path the generated image was saved to.
    """
    source = image
    if not image.startswith(("http:", "https:")):
        image = public_url(image)
    data = {
        "prompt": prompt,
        "image": image,
        "size": "2560x1440",
    }
    output_url = bytedance_image_generation(data)
    if out is None:
        # Derive the default from the argument as given, so a local path
        # stays a local path instead of becoming a URL-shaped file name.
        out = source + ".ai.png"
    ox.net.save_url(output_url, out, overwrite=True)
    return out


def process_frame(item, prompt, character=None, position=0, seed=None):
    """Generate an edited frame of ``item`` and store it as a Document.

    The frame at ``position`` is sent together with the optional
    ``character`` reference (Document or URL). Model, prompt and source are
    recorded in the new document's data. Returns the Document.
    """
    model = "seedream-4-5-251128"
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    if isinstance(character, Document):
        character = public_document_url(character)
    image = public_frame_url(item, position)
    if character is not None:
        image = [image, character]
    data = {
        "model": model,
        "prompt": prompt,
        "image": image,
        "size": "2560x1440",
    }
    if seed:
        data["seed"] = seed
    url = bytedance_image_generation(data)
    img = add_ai_image(item, position, url)
    img.refresh_from_db()
    img.data["model"] = model
    img.data["prompt"] = prompt
    img.data["source"] = item.public_id
    if character:
        img.data["source"] += " " + character.split("?")[0]
    print(img, img.data)
    img.save()
    img.update_sort()
    img.update_find()
    return img


# An earlier revision of this prompt ended with
# "... if image 1 is a comic use a comic character and so on".
REPLACE_CHARACTER_PROMPT = (
    "Replace the foreground character in image 1 with the character in image 2, "
    "keep the posture, clothing, background, light, atmosphere from image 1, "
    "but take the facial features and personality from image 2. "
    "Make sure the size of the character is adjusted since the new character is a child "
    "and make sure the size of the head matches the body. "
    "The quality of the image should be the same between foreground and background, "
    "adjust the quality of the character to match the background. "
    "Use the style of image 1 for the character: "
    "if image 1 is a photo make the character a real person, "
    "if image 1 is a drawing make the character a drawn character, "
    "if image 1 is a comic use a comic character"
)


def fal_replace_character(item, character, position=0):
    """Replace the character in a frame of ``item`` using flux-2-pro.

    ``character`` is a character key, a Document or an image URL. The result
    is stored as a Document, which is returned.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    prompt = REPLACE_CHARACTER_PROMPT.replace("image 1", "@image 1").replace(
        "image 2", "@image 2"
    )
    if character == "P5":
        prompt = prompt.replace("child", "teenager")
    character = _character_url(character)
    image = public_frame_url(item, position)
    image = [image, character]
    url = flux_edit_image(image, prompt)
    img = add_ai_image(item, position, url)
    img.refresh_from_db()
    img.data["model"] = "flux-2-pro"
    img.data["prompt"] = prompt
    img.data["source"] = item.public_id
    img.data["source"] += " " + character.split("?")[0]
    print(img, img.data)
    img.save()
    img.update_sort()
    img.update_find()
    return img


def replace_character(item, character, position=0, seed=None):
    """Replace the character in a frame of ``item`` via ``process_frame``.

    ``character`` is a character key, a Document or an image URL. Returns
    the generated Document.
    """
    prompt = REPLACE_CHARACTER_PROMPT
    if character == "P5":
        prompt = prompt.replace("child", "teenager")
    character = _character_url(character)
    return process_frame(item, prompt, character, position, seed=seed)


def _fal_run(model, data):
    """Submit ``data`` to a fal model, wait for it and return the result."""
    print(data)
    handler = fal_client.submit(model, arguments=data)
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    return result


def kling_lipsync(audio_item, video_item, keep=False):
    """Lip-sync ``video_item`` to the audio of ``audio_item``.

    Both may be items or public ids. The result is added as an
    ``ai:lipsync`` variant of ``video_item``; the cache directory is removed
    afterwards unless ``keep`` is true. Returns the new item.
    """
    if isinstance(audio_item, str):
        audio_item = Item.objects.get(public_id=audio_item)
    if isinstance(video_item, str):
        video_item = Item.objects.get(public_id=video_item)
    video_url = public_video_url(video_item)
    audio_url = public_video_url(audio_item)
    model = "fal-ai/kling-video/lipsync/audio-to-video"
    # Cache key from both ids so different audio/video pairs do not collide.
    pair_hash = hashlib.sha1(
        (video_item.public_id + audio_item.public_id).encode()
    ).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        video_item.public_id,
        pair_hash,
    )
    data = {
        "video_url": video_url,
        "audio_url": audio_url
    }
    result = _fal_run(model, data)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(video_item, output, "ai:lipsync")
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai


def kling_v2v_reference(item, character, keep=False):
    """Replace the main character of ``item`` using a reference image.

    ``character`` is a character key, a Document or an image URL. The result
    is added as an ``ai:v2v-replace`` variant; the cache directory is removed
    afterwards unless ``keep`` is true. Returns the new item.
    """
    # https://fal.ai/models/fal-ai/kling-video/o1/video-to-video/reference/api
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    image_url = _character_url(character)
    video_url = public_video_url(item)
    prompt = "Replace the main character in @Video1 with the character from the reference images, adjust the style of the character to match the style of the video"
    model = "fal-ai/kling-video/o1/video-to-video/reference"
    prompt_hash = hashlib.sha1((prompt + image_url).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    # Largest whole number of seconds in 3..10 that fits into the item;
    # items shorter than 3s request 3.
    duration = max(3, min(10, int(item.sort.duration)))
    data = {
        "prompt": prompt,
        "keep_audio": False,
        "aspect_ratio": "16:9",
        "video_url": video_url,
        "image_urls": [image_url],
        "duration": str(duration)
    }
    # Alternative way to pass the reference, currently unused:
    # data["elements"] = [
    #     {
    #         "reference_image_urls": [image_url, image_url],
    #         "frontal_image_url": image_url,
    #     }
    # ]
    result = _fal_run(model, data)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:v2v-replace")
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["firstframe"] = image_url.split("?")[0]
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai


def replace_character_motion_control(item, character, keep=False):
    """Drive a replaced-character image with the motion of ``item``.

    When ``character`` is a string the first frame is generated with
    ``replace_character``; otherwise it is used as the first frame Document.
    The result is added as an ``ai:replace:p1:motion-control`` variant and
    linked to the first frame document. Returns the new item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    # FIXME get character from documents
    if isinstance(character, str):
        img = replace_character(item, character, 0)
    else:
        img = character
    image_url = public_document_url(img)
    video_url = public_video_url(item)
    prompt = ""
    model = "fal-ai/kling-video/v2.6/pro/motion-control"
    prompt_hash = hashlib.sha1((prompt + image_url).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    data = {
        "prompt": prompt,
        "image_url": image_url,
        "video_url": video_url,
        "keep_original_sound": False,
        "character_orientation": "video",
    }
    result = _fal_run(model, data)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    ai = add_ai_variant(item, output, "ai:replace:p1:motion-control")
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["firstframe"] = image_url.split("?")[0]
    ai.data["model"] = model
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    img.add(ai)
    return ai


def describe_video(url, neutral=False, face=False):
    """Ask the BytePlus model for a scene-by-scene description of a video.

    ``neutral`` selects the gender-neutral prompt; ``face`` additionally asks
    for facial expressions (only used by the neutral prompt).
    Returns the description text.
    """
    if face:
        extra = 'describe the facial expression to allow an actor to recreate the scene, '
    else:
        extra = ''
    if neutral:
        prompt = (
            "Detect cuts or scene changes and describe each scene, use as much details as you can. "
            f"Describe each person incudling detalied apreance, haircut in a gender neutral way, {extra}"
            "describe each objects, animal or plant, describe foreground and backgroud, "
            "describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
            "Use the format: . CAMERA CUT TO . CAMERA CUT TO . "
            "Don't mention it if you don't find a cut."
        )
    else:
        prompt = (
            "Detect cuts or scene changes and describe each scene, use as much details as you can. "
            "Describe each person incudling detalied apreance, ethnicity, haircolor, haircut, "
            "describe each objects, animal or plant, describe foreground and backgroud, "
            "describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
            "Use the format: . CAMERA CUT TO . CAMERA CUT TO . "
            "Don't mention it if you don't find a cut."
        )
    data = {
        "input": [
            {
                "role": "user",
                "content": [
                    {"type": "input_video", "video_url": url, "fps": 1},
                    {"type": "input_text", "text": prompt},
                ],
            }
        ],
    }
    response = bytedance_response(data)
    return response["output"][1]["content"][0]["text"]


def describe_item(item, neutral=False):
    """Describe the source video of ``item`` (an item or public id)."""
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    video_url = public_video_url(item)
    return describe_video(video_url, neutral)


def reshoot_item(item, extra_prompt=None, first_frame=None, keep=False, prompt=None):
    """Regenerate ``item`` from a text description of its video.

    Without ``prompt`` the item is described first; ``extra_prompt`` is
    appended. With ``first_frame`` (Document or URL) the video starts from
    that image. The trimmed result is added as an AI variant; a first frame
    Document is linked to it. The cache directory is removed afterwards
    unless ``keep`` is true. Returns the new item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    # Keep the Document around: it is needed for linking after the URL has
    # replaced it as the first frame reference.
    first_frame_document = None
    if isinstance(first_frame, Document):
        first_frame_document = first_frame
        first_frame = public_document_url(first_frame)
    duration = item.sort.duration
    frames = int(duration * 24)
    if prompt is None:
        prompt = describe_item(item, first_frame is not None)
    if extra_prompt:
        prompt += " " + extra_prompt
    prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    if first_frame:
        status = i2v_bytedance(first_frame, prompt, duration, output)
    else:
        status = t2v_bytedance(prompt, duration, output)
    trimmed = "/srv/pandora/static/power/cache/%s_%s/trimmed.mp4" % (
        item.public_id,
        prompt_hash,
    )
    trim_video(output, trimmed, frames)
    variant = "ai:0:reshoot"
    if first_frame:
        variant = "ai:0:reshoot-firstframe"
    ai = add_ai_variant(item, trimmed, variant)
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["model"] = status["model"]
    ai.data["seed"] = status["seed"]
    if first_frame:
        ai.data["firstframe"] = first_frame.split("?")[0]
    if first_frame_document is not None:
        first_frame_document.add(ai)
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai


def reshoot_item_segments(item, character, keep=False):
    """Regenerate ``item`` segment by segment with a replaced character.

    The source is split into segments; each one is described, gets a first
    frame with the replaced character and is regenerated from it. Segments
    after the first use the previous segment's first frame as character
    reference, and the seed reported for a segment is reused for the next.
    The joined result is added as an AI variant and linked to all first
    frame documents.

    Raises RuntimeError if no segment could be processed.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    max_duration = MAX_DURATION
    source = item.files.all()[0].data.path
    info = ox.avinfo(source)
    duration = info["duration"]
    if duration < max_duration:
        segments = [duration]
    else:
        segments = get_item_segments(item, max_duration=max_duration)
    print(segments)
    prompt_hash = hashlib.sha1("reshoot_segment".encode()).hexdigest()
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    fragment_video(source, prefix, segments)
    prompts = []
    first_frames = []
    seed = None
    status = None
    segment_first_frame_url = None
    n = 0
    position = 0
    for entry in segments:
        stype, segment = _segment_type_and_end(entry)
        output = "%s/%06d.mp4" % (prefix, n)
        output_ai = "%s/%06d_ai.mp4" % (prefix, n)
        segment_duration = segment - position
        if os.path.exists(output):
            segment_video_url = public_url(output)
            prompt = describe_video(segment_video_url, neutral=True)
            prompts.append("Segment %s: " % (n + 1) + prompt)
            segment_character = character
            if position and segment_first_frame_url:
                segment_character = segment_first_frame_url
            segment_first_frame = replace_character(
                item, segment_character, position, seed=seed
            )
            segment_first_frame_url = public_document_url(segment_first_frame)
            status = i2v_bytedance(
                segment_first_frame_url, prompt, segment_duration, output_ai, seed=seed
            )
            seed = status["seed"]
            trimmed = "%s/%06d_ai_trimmed.mp4" % (prefix, n)
            frames = int(segment_duration * 24)
            trim_video(output_ai, trimmed, frames, stype == "c")
            processed.append(trimmed)
            first_frames.append(segment_first_frame)
        n += 1
        position = segment
    if not processed:
        raise RuntimeError("no segments could be processed for %s" % item.public_id)
    joined_output = "%s/joined.mp4" % (prefix)
    join_segments(processed, joined_output)
    ai = add_ai_variant(item, joined_output, "ai:0:reshoot-firstframe")
    prompt = "\n\n".join(prompts)
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["firstframe"] = " ".join([ox.toAZ(ff.id) for ff in first_frames])
    ai.data["model"] = status["model"]
    ai.data["seed"] = seed
    ai.save()
    for first_frame in first_frames:
        first_frame.add(ai)
    if not keep:
        shutil.rmtree(prefix)
    return ai


def describe_image(url):
    """Ask the BytePlus model to analyze the image at ``url``; return text."""
    system_prompt = "You are an image analyst describing different aspects of an image. You are focused on the form, composition, and task shown in the image."
    prompt = "Please analyze this image according to the specified structure."
    data = {
        "input": [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "input_image", "image_url": url},
                    {"type": "input_text", "text": prompt},
                ],
            },
        ],
    }
    response = bytedance_response(data)
    return response["output"][-1]["content"][0]["text"]


def transform_remake_video(item_id, image_prompt, video_prompt):
    """Like ``remake_video``, but with frames transformed by ``image_prompt``.

    Source first and last frames are run through ``prepare_image`` before the
    segment is generated with ``video_prompt``; continuation segments start
    from the last frame of the previously generated segment. Existing cache
    files are reused. Returns the path of the joined video.
    """
    item = Item.objects.get(public_id=item_id)
    segments = get_item_segments(item)
    print(segments)
    prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
    position = n = 0
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    for entry in segments:
        stype, segment = _segment_type_and_end(entry)
        duration = segment - position
        if stype == "c":
            first_frame_path = "%s/%06d.mp4.last_frame.png" % (prefix, n - 1)
            first_frame = public_url(first_frame_path)
        else:
            first_frame = public_frame_url(item, position)
            first_frame_path = "%s/%06d.first_frame.png" % (prefix, n)
            if not os.path.exists(first_frame_path):
                prepare_image(first_frame, image_prompt, first_frame_path)
            first_frame = public_url(first_frame_path)
        last_frame_position = segment - 2 / 24
        last_frame = public_frame_url(item, "%0.3f" % last_frame_position)
        last_frame_path = "%s/%06d.last_frame.png" % (prefix, n)
        if not os.path.exists(last_frame_path):
            prepare_image(last_frame, image_prompt, last_frame_path)
        last_frame = public_url(last_frame_path)
        output = "%s/%06d.mp4" % (prefix, n)
        if not os.path.exists(output):
            first_last(first_frame, last_frame, video_prompt, duration, output)
        trimmed = "%s/%06d_trimmed.mp4" % (prefix, n)
        frames = int(duration * 24)
        if not os.path.exists(trimmed):
            trim_video(output, trimmed, frames, stype == "c")
        processed.append(trimmed)
        position = segment
        n += 1
    joined_output = "%s/joined.mp4" % (prefix,)
    join_segments(processed, joined_output)
    return joined_output


def restyle_video(item_id, prompt):
    """Restyle an item's video with decart/lucy-restyle; return the path."""
    item = Item.objects.get(public_id=item_id)
    video_url = public_video_url(item)
    model = "decart/lucy-restyle"
    handler = fal_client.submit(
        model,
        arguments={
            "prompt": prompt,
            "video_url": video_url,
            "resolution": "720p",
            "enhance_prompt": True,
        },
    )
    request_id = handler.request_id
    print(request_id)
    result = fal_wait_for(model, request_id)
    print(result)
    output_url = result["video"]["url"]
    prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
    output_path = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
        item.public_id,
        prompt_hash,
    )
    ox.net.save_url(output_url, output_path, overwrite=True)
    return output_path


def fal_wait_for(model, request_id):
    """Poll a fal request every 10s while in progress; return its result."""
    status = fal_client.status(model, request_id, with_logs=True)
    while isinstance(status, fal_client.InProgress):
        time.sleep(10)
        status = fal_client.status(model, request_id, with_logs=True)
    result = fal_client.result(model, request_id)
    return result


def motion_control_preprocess_image(item_id, image_prompt, video_prompt):
    """Run motion control on a first frame transformed by ``image_prompt``.

    The transformed first frame is cached and reused. Returns the path of
    the generated video.
    """
    item = Item.objects.get(public_id=item_id)
    video_url = public_video_url(item)
    model = "fal-ai/kling-video/v2.6/pro/motion-control"
    prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (item.public_id, prompt_hash)
    first_frame = public_frame_url(item, 0)
    first_frame_path = "/srv/pandora/static/power/cache/%s_%s/%06d.first_frame.png" % (
        item.public_id,
        prompt_hash,
        0,
    )
    if not os.path.exists(first_frame_path):
        os.makedirs(os.path.dirname(first_frame_path), exist_ok=True)
        prepare_image(first_frame, image_prompt, first_frame_path)
    image_url = public_url(first_frame_path)
    data = {
        "prompt": video_prompt,
        "image_url": image_url,
        "video_url": video_url,
        "keep_original_sound": False,
        "character_orientation": "video",
    }
    result = _fal_run(model, data)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    return output


def _luma_headers():
    """Return the request headers used for the Luma API."""
    return {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": "Bearer " + settings.LUMA_TOKEN,
    }


def luma_wait_for(id):
    """Poll a Luma generation every 10s while queued/dreaming; return it."""
    url = "https://api.lumalabs.ai/dream-machine/v1/generations/%s" % id
    request_headers = _luma_headers()
    status = requests.get(url, headers=request_headers).json()
    while status["state"] in ("queued", "dreaming"):
        time.sleep(10)
        status = requests.get(url, headers=request_headers).json()
    return status


def luma_modify_segment(video_url, prompt, first_frame=None, mode="flex_2"):
    """Modify a video with Luma ray-2 and return the result's video URL."""
    # also got that at fal-ai/luma-dream-machine/ray-2/modify
    url = "https://api.lumalabs.ai/dream-machine/v1/generations/video/modify"
    payload = {
        "generation_type": "modify_video",
        "model": "ray-2",
        "mode": mode,
        "prompt": prompt,
        "media": {"url": video_url},
    }
    if first_frame:
        payload["first_frame"] = {"url": first_frame}
    response = requests.post(url, json=payload, headers=_luma_headers()).json()
    print(response)
    status = luma_wait_for(response["id"])
    return status["assets"]["video"]


def fragment_video(filename, segmentdir, segments):
    """Split a video into one file per segment below ``segmentdir``.

    ``segments`` holds end positions in seconds (plain or ``[type, end]``).
    Files are named ``%06d.mp4``; the first and last frame of each segment
    are stored next to it as ``_first.jpg`` / ``_last.jpg``. The last end
    position does not start a new file, and a trailing file that received no
    frames is removed.

    Returns the list of written video paths.
    Raises ValueError if the source cannot be opened.
    """
    filename = str(filename)
    cuts = [_segment_type_and_end(segment)[1] for segment in segments]
    if not cuts:
        return []
    cap = cv2.VideoCapture(filename)
    if not cap.isOpened():
        cap.release()
        raise ValueError("unable to open video %s" % filename)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    os.makedirs(segmentdir, exist_ok=True)

    def open_writer(index):
        path = segmentdir + "/%06d.mp4" % index
        return path, cv2.VideoWriter(path, fourcc, fps, (width, height))

    written = []
    segment = 0
    frame_count = 0
    next_cut = int(cuts.pop(0) * fps)
    output_path, out = open_writer(segment)
    # Last frame written to the current segment, None while it is empty.
    last = None
    while cap.isOpened():
        # Start a new file at a cut, but only while further end positions
        # remain: the final one marks the end of the video.
        if frame_count and next_cut and frame_count >= next_cut and cuts:
            print(frame_count, output_path)
            cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), last)
            out.release()
            written.append(output_path)
            segment += 1
            output_path, out = open_writer(segment)
            next_cut = int(cuts.pop(0) * fps)
            last = None
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)
        if last is None:
            cv2.imwrite(output_path.replace(".mp4", "_first.jpg"), frame)
        last = frame
        frame_count += 1
    out.release()
    cap.release()
    if last is None:
        if os.path.exists(output_path):
            os.unlink(output_path)
    else:
        cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), last)
        written.append(output_path)
    return written


def flux_edit_image(image, prompt):
    """Edit one or more image URLs with flux-2-pro; return the result URL."""
    if isinstance(image, str):
        image = [image]
    data = {
        "prompt": prompt,
        "safety_tolerance": "5",
        "enable_safety_checker": False,
        "output_format": "jpeg",
        "image_urls": image,
    }
    print(data)
    result = fal_client.subscribe("fal-ai/flux-2-pro/edit", arguments=data)
    print(result)
    return result["images"][0]["url"]


def in_the_style_of_fal(image, style):
    """Apply the style of ``style`` to ``image`` via flux; return the URL."""
    prompt = "apply style from @image 2 to @image 1 keep the position of the person in @image 1 but take light, colors, clothing from @image 2"
    return flux_edit_image([image, style], prompt)


def in_the_style_of_byte(image, style):
    """Apply the style of ``style`` to ``image`` via the Ark client."""
    prompt = "apply style from image 2 to image 1 keep the position of the person in image 1 but take light, colors, clothing from image 2"
    image_model_name = "seedream-4-5-251128"
    ark_client = Ark(
        base_url="https://ark.ap-southeast.bytepluses.com/api/v3",
        api_key=settings.BYTEPLUSE_TOKEN,
    )
    create_result = ark_client.images.generate(
        model=image_model_name,
        image=[image, style],
        prompt=prompt,
        sequential_image_generation="disabled",
        response_format="url",
        size="2560x1440",
        stream=False,
        watermark=False,
    )
    print(create_result)
    return create_result.data[0].url


def luma_modify_item(item, prompt="", image_prompt=None, first_frame=None, keep=False):
    """Modify ``item`` segment by segment with Luma and add it as a variant.

    ``image_prompt`` only contributes to the cache key. ``first_frame`` may
    be a Document or a URL; a Document is linked to the new item. The cache
    directory is removed afterwards unless ``keep`` is true.
    Returns the new item.
    """
    mode = "flex_2"
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    source = item.files.all()[0].data.path
    info = ox.avinfo(source)
    duration = info["duration"]
    max_duration = 10
    if duration < max_duration:
        segments = [duration]
    else:
        segments = get_item_segments(item, max_duration=max_duration)
    print(segments)
    prompt_hash = hashlib.sha1((prompt + (image_prompt or "")).encode()).hexdigest()
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    fragment_video(source, prefix, segments)
    if isinstance(first_frame, Document):
        first_frame_url = public_document_url(first_frame)
    else:
        first_frame_url = first_frame
    for n, _entry in enumerate(segments):
        output = "%s/%06d.mp4" % (prefix, n)
        output_ai = "%s/%06d_ai.mp4" % (prefix, n)
        if os.path.exists(output):
            video_url = luma_modify_segment(
                public_url(output), prompt, first_frame=first_frame_url, mode=mode
            )
            ox.net.save_url(video_url, output_ai, overwrite=True)
            processed.append(output_ai)
    joined_output = "%s/joined.mp4" % (prefix,)
    join_segments(processed, joined_output)
    ai = add_ai_variant(item, joined_output, "ai:replace:p1:luma")
    ai.data["prompt"] = ox.escape_html(prompt)
    if first_frame:
        ai.data["firstframe"] = first_frame_url.split("?")[0]
    ai.data["model"] = "ray-2:%s" % mode
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(joined_output))
    if isinstance(first_frame, Document):
        first_frame.add(ai)
    return ai


def add_ai_variant(item, video_path, type):
    """Create a new item of type ``type`` from ``video_path``.

    Title, chapter and user are copied from ``item``; the video is copied
    into the new item's file storage and stream extraction is started.
    Returns the new item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    ai = Item()
    ai.user = item.user
    ai.data["type"] = [type]
    ai.data["title"] = item.data["title"]
    ai.data["chapter"] = item.data.get("chapter", "")
    ai.save()
    file = File()
    file.oshash = ox.oshash(video_path)
    file.item = ai
    file.path = "%s.mp4" % type
    file.info = ox.avinfo(video_path)
    del file.info["path"]
    file.parse_info()
    file.data.name = file.get_path("data." + video_path.split(".")[-1])
    os.makedirs(os.path.dirname(file.data.path), exist_ok=True)
    shutil.copy(video_path, file.data.path)
    file.available = True
    file.selected = True
    file.queued = True
    file.wanted = False
    file.save()
    file.extract_stream()
    return ai


def add_ai_image(item, position, url, extension=None):
    """Store the image at ``url`` as a Document linked to ``item``.

    Without ``extension`` it is taken from the URL ("jpeg" becomes "jpg").
    Returns the new Document.
    """
    if extension is None:
        extension = url.split(".")[-1].split("?")[0]
        if extension == "jpeg":
            extension = "jpg"
    file = Document(user=item.user)
    file.data["title"] = "%s at %s" % (item.get("title"), position)
    file.data["position"] = position
    file.extension = extension
    file.width = -1
    file.pages = -1
    file.uploading = True
    file.save()
    file.uploading = True
    name = "data.%s" % file.extension
    file.file.name = file.path(name)
    ox.net.save_url(url, file.file.path, overwrite=True)
    file.get_info()
    file.get_ratio()
    file.oshash = ox.oshash(file.file.path)
    file.save()
    file.update_sort()
    file.add(item)
    return file


def add_tag(item, tag):
    """Append ``tag`` to the item's tags and save the item."""
    if 'tags' not in item.data:
        item.data['tags'] = []
    item.data['tags'].append(tag)
    item.save()


def extract_firstframe(character='P1'):
    """Create a replaced-character first frame for source items without one.

    Items tagged 'ai-failed' are skipped; items whose generation fails get
    that tag.
    """
    for item in Item.objects.filter(data__type__icontains="source"):
        if 'ai-failed' in item.data.get('tags', []):
            continue
        if item.documents.all().exists():
            continue
        print(item)
        try:
            replace_character(item, character, 0)
        except Exception:
            item.refresh_from_db()
            add_tag(item, 'ai-failed')


def _process_firstframe_list(list_name, process, character="P1"):
    """Run ``process(item, first_frame)`` for the items of a named list.

    Items longer than 30s, item 'HZ', items without documents, items that
    already have an AI variant with the same title and items tagged
    'ai-failed' are skipped. The most recently created document is used as
    first frame. Items whose processing fails are tagged 'ai-failed'.
    """
    # Imported here because the module is only needed by the batch helpers.
    from itemlist.models import List

    item_list = List.objects.get(name=list_name)
    for i in item_list.items.all():
        if i.sort.duration > 30:
            continue
        if i.public_id == 'HZ':
            continue
        if not i.documents.all().count():
            continue
        ai = Item.objects.filter(data__type__icontains='ai').filter(data__title=i.data['title'])
        if ai.exists() or 'ai-failed' in i.data.get('tags', []):
            print('>> skip', i)
            continue
        first_frame = i.documents.all().order_by('-created').first()
        if not first_frame:
            # Defensive fallback in case the document vanished in between.
            first_frame = replace_character(i, character, 0)
        print(i, first_frame, i.documents.all().count())
        try:
            process(i, first_frame)
        except Exception:
            add_tag(i, 'ai-failed')
            print('>> failed', i)


def process_reshoot_firstframe():
    """Reshoot all eligible items of the 'Reshoot-Firstframe' list."""

    def process(item, first_frame):
        reshoot_item(item, first_frame=first_frame)

    _process_firstframe_list('Reshoot-Firstframe', process)


def process_motion_firstframe(character="P1", keep=False):
    """Run motion control for all eligible items of 'Motion-Firstframe'.

    ``character`` is only used for the first frame fallback; ``keep`` is
    passed on to ``replace_character_motion_control``.
    """

    def process(item, first_frame):
        replace_character_motion_control(item, first_frame, keep=keep)

    _process_firstframe_list('Motion-Firstframe', process, character=character)