"""Helpers that generate AI variants of item videos.

The functions in this module call the BytePlus Ark, fal and Luma HTTP APIs,
cache intermediate results below ``/srv/pandora/static/power/cache/`` and use
OpenCV to trim, split and join the resulting video files.
"""
from pathlib import Path
import hashlib
import math
import os
import shutil
import time

import cv2
import ox
import requests
import fal_client
from byteplussdkarkruntime import Ark

from django.conf import settings

from item.models import Item

os.environ["FAL_KEY"] = settings.FAL_KEY

# Default upper bound (seconds) for a single generated segment.
MAX_DURATION = 12

# Request headers used for all direct BytePlus Ark HTTP calls.
headers = {
    "Authorization": "Bearer " + settings.BYTEPLUSE_TOKEN,
    "Content-Type": "application/json",
}


def public_url(path):
    """Map a path below ``/srv/pandora/static/`` to its public static URL."""
    return path.replace("/srv/pandora/static/", settings.PUBLIC_URL + "static/")


def trim_video(src, dst, frames, start0=False):
    """Write ``frames`` frames of ``src`` to ``dst``.

    The frames are taken from the middle of ``src`` unless ``start0`` is
    true, in which case they are taken from the beginning. If ``src`` has
    fewer than ``frames`` frames, everything that can be read is written.
    """
    cap = cv2.VideoCapture(src)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames_src = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Clamp to 0 so a source shorter than the request is copied in full.
    offset = 0 if start0 else max(0, (frames_src - frames) // 2)
    print(frames_src, frames, offset)

    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    out = cv2.VideoWriter(dst, fourcc, fps, (width, height))
    end = offset + frames
    index = 0  # 0-based index of the frame about to be read
    try:
        # Stop decoding as soon as the requested window has been written.
        while cap.isOpened() and index < end:
            ret, frame = cap.read()
            if not ret:
                break
            if index >= offset:
                out.write(frame)
            index += 1
    finally:
        out.release()
        cap.release()


def bytedance_task(data):
    """Create a BytePlus content generation task and wait until it stops.

    Missing keys in ``data`` are filled in place with the module defaults.
    The task is polled every 10 seconds while it is ``queued`` or
    ``running``; the last status response is returned, whatever its state.
    """
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/contents/generations/tasks"
    defaults = {
        "model": "seedance-1-5-pro-251215",
        "generate_audio": False,
        "ratio": "16:9",
        "watermark": False,
        "resolution": "720p",
        "camera_fixed": True,
        "return_last_frame": True,
    }
    for key, value in defaults.items():
        data.setdefault(key, value)
    print(data)
    r = requests.post(url, headers=headers, json=data).json()
    print(r)
    task_url = url + "/" + r["id"]
    status = requests.get(task_url, headers=headers).json()
    # "cancelled" is deliberately not polled: waiting on it would never end.
    while status["status"] in ("queued", "running"):
        time.sleep(10)
        status = requests.get(task_url, headers=headers).json()
    print(status)
    return status


def bytedance_response(data):
    """POST ``data`` to the BytePlus responses endpoint and return the JSON.

    A default ``model`` is added to ``data`` in place when none is given.
    """
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/responses"
    defaults = {"model": "seed-1-8-251228"}
    for key, value in defaults.items():
        data.setdefault(key, value)
    print(data)
    response = requests.post(url, headers=headers, json=data).json()
    print(response)
    return response


def _save_task_output(status, output):
    """Save the video (and last frame, if present) of a finished task."""
    content = status["content"]
    ox.net.save_url(content["video_url"], output, overwrite=True)
    if "last_frame_url" in content:
        ox.net.save_url(
            content["last_frame_url"],
            output + ".last_frame.png",
            overwrite=True,
        )


def t2v_bytedance(prompt, duration, output):
    """Generate a video from ``prompt`` and save it to ``output``.

    The requested duration is ``duration`` rounded up to whole seconds,
    with a minimum of 4. Returns the final task status.
    """
    nduration = max(4, int(math.ceil(duration)))
    data = {
        "duration": nduration,
        "content": [{"type": "text", "text": prompt}],
    }
    status = bytedance_task(data)
    _save_task_output(status, output)
    return status


def first_last(first_frame, last_frame, prompt, duration, output):
    """Generate a video between two frame URLs and save it to ``output``.

    ``first_frame`` and ``last_frame`` are image URLs sent with the roles
    ``first_frame`` and ``last_frame``. The requested duration is
    ``duration`` rounded up to whole seconds, with a minimum of 4.
    Returns the final task status.
    """
    nduration = max(4, int(math.ceil(duration)))
    data = {
        "duration": nduration,
        "content": [
            {
                "type": "text",
                "text": prompt,
            },
            {
                "type": "image_url",
                "role": "first_frame",
                "image_url": {"url": first_frame},
            },
            {
                "type": "image_url",
                "role": "last_frame",
                "image_url": {"url": last_frame},
            },
        ],
    }
    status = bytedance_task(data)
    _save_task_output(status, output)
    return status


def get_item_segments(item, max_duration=MAX_DURATION):
    """Return the end positions of the segments an item is split into.

    Cuts closer than 0.5 to the previously kept cut are dropped and the file
    duration is appended as the final position. Stretches longer than
    ``max_duration`` are subdivided; the extra split points are emitted as
    ``["c", position]`` (or as a plain position when they are the very first
    entry), all other entries are plain numbers.
    """
    cuts = item.get("cuts")
    filename = item.files.all()[0].data.path
    input_info = ox.avinfo(filename)

    previous = 0
    nc = []
    for c in cuts:
        if c - previous < 0.5:
            continue
        previous = c
        nc.append(c)
    nc = nc + [input_info["duration"]]
    # Drop a last cut that sits right before the end of the file.
    if len(nc) > 3 and nc[-1] - nc[-2] < 0.5:
        nc = nc[:-2] + nc[-1:]

    segments = []
    position = 0
    for out in nc:
        duration = out - position
        while duration > max_duration:
            position += max_duration
            if segments:
                segments.append(["c", position])
            else:
                segments.append(position)
            duration = out - position
        segments.append(out)
        position = out
    return segments


def join_segments(processed, joined_output):
    """Concatenate the videos in ``processed`` into ``joined_output``.

    Frame rate and frame size of the output are taken from the first file.
    Nothing is written when ``processed`` is empty.
    """
    out = None
    for filename in processed:
        cap = cv2.VideoCapture(filename)
        if out is None:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*"avc1")
            out = cv2.VideoWriter(joined_output, fourcc, fps, (width, height))
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
        cap.release()
    if out is not None:
        out.release()


def remake_video(item_id, prompt):
    """Regenerate an item segment by segment and join the result.

    Each segment is generated with ``first_last`` between frames of the
    source item (continuation segments start from the last frame of the
    previously generated segment), trimmed to the segment length at 24
    frames per second and cached on disk. Returns the joined output path.
    """
    item = Item.objects.get(public_id=item_id)
    segments = get_item_segments(item)
    print(segments)
    prompt_hash = hashlib.sha1(prompt.encode()).hexdigest()
    position = n = 0
    processed = []
    for segment in segments:
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        duration = segment - position
        if stype == "c":
            # Continue from the last frame of the previous generated segment.
            first_frame_path = (
                "/srv/pandora/static/power/cache/%s_%s/%06d.mp4.last_frame.png"
                % (item.public_id, prompt_hash, n - 1)
            )
            first_frame = public_url(first_frame_path)
        else:
            first_frame = "%s%s/source%s.png?token=%s" % (
                settings.PUBLIC_URL,
                item.public_id,
                position,
                settings.PUBLIC_TOKEN,
            )
        last_frame_position = segment - 2 / 24
        last_frame = "%s%s/source%0.3f.png?token=%s" % (
            settings.PUBLIC_URL,
            item.public_id,
            last_frame_position,
            settings.PUBLIC_TOKEN,
        )
        output = "/srv/pandora/static/power/cache/%s_%s/%06d.mp4" % (
            item.public_id,
            prompt_hash,
            n,
        )
        if not os.path.exists(output):
            first_last(first_frame, last_frame, prompt, duration, output)
        trimmed = "/srv/pandora/static/power/cache/%s_%s/%06d_trimmed.mp4" % (
            item.public_id,
            prompt_hash,
            n,
        )
        frames = int(duration * 24)
        if not os.path.exists(trimmed):
            trim_video(output, trimmed, frames, stype == "c")
        processed.append(trimmed)
        position = segment
        n += 1

    joined_output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
        item.public_id,
        prompt_hash,
    )
    join_segments(processed, joined_output)
    return joined_output


def prepare_image(image, prompt, out=None):
    """Transform ``image`` with ``prompt`` and save the result to ``out``.

    ``image`` may be a URL or a path below ``/srv/pandora/static/``; paths
    are converted with ``public_url`` before being sent. When ``out`` is not
    given the result is saved next to ``image`` with ``.ai.png`` appended.
    Returns the JSON response of the image generation request.
    """
    model = "seedream-4-5-251128"
    if out is None:
        # Derive the default from the argument as given, before it is
        # rewritten into a public URL below.
        out = image + ".ai.png"
    if not image.startswith(("http:", "https:")):
        image = public_url(image)
    data = {
        "model": model,
        "prompt": prompt,
        "image": image,
        "size": "2560x1440",
        "watermark": False,
    }
    url = "https://ark.ap-southeast.bytepluses.com/api/v3/images/generations"
    print("prepare_image", data)
    r = requests.post(url, headers=headers, json=data).json()
    print(r)
    output_url = r["data"][0]["url"]
    ox.net.save_url(output_url, out, overwrite=True)
    return r


def describe_video(url):
    """Return a textual scene description for the video at ``url``."""
    prompt = (
        "Detect cuts or scene changes and describe each scene, use as much details as you can. "
        "Describe each person incudling detalied apreance, ethnicity, haircolor, haircut, "
        "describe each objects, animal or plant, describe foreground and backgroud, "
        "describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
        "Use the format: . CAMERA CUT TO . CAMERA CUT TO . "
        "Don't mention it if you don't find a cut."
    )
    data = {
        "input": [
            {
                "role": "user",
                "content": [
                    {"type": "input_video", "video_url": url, "fps": 1},
                    {"type": "input_text", "text": prompt},
                ],
            }
        ],
    }
    response = bytedance_response(data)
    # NOTE(review): reads the second output entry, while describe_image reads
    # the last one -- confirm against the response format.
    return response["output"][1]["content"][0]["text"]


def describe_item(item):
    """Describe the source video of ``item`` (an Item or its public id)."""
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    video_url = "%s%s/download/source/?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        settings.PUBLIC_TOKEN,
    )
    return describe_video(video_url)


def reshoot_item(item, extra_prompt=None, keep=False):
    """Create an ``ai:0:reshoot`` variant of ``item`` from its description.

    The item is described with ``describe_item``, optionally extended by
    ``extra_prompt``, a new video is generated from that text and trimmed to
    the item duration at 24 frames per second. The cache directory of the
    generated files is removed afterwards unless ``keep`` is true.
    Returns the new item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    duration = item.sort.duration
    frames = int(duration * 24)
    prompt = describe_item(item)
    if extra_prompt:
        prompt += " " + extra_prompt
    prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
        item.public_id,
        prompt_hash,
    )
    status = t2v_bytedance(prompt, duration, output)
    trimmed = "/srv/pandora/static/power/cache/%s_%s/trimmed.mp4" % (
        item.public_id,
        prompt_hash,
    )
    trim_video(output, trimmed, frames)
    ai = add_ai_variant(item, trimmed, "ai:0:reshoot")
    ai.data["prompt"] = ox.escape_html(prompt)
    ai.data["model"] = status["model"]
    ai.data["seed"] = status["seed"]
    ai.save()
    if not keep:
        shutil.rmtree(os.path.dirname(output))
    return ai


def describe_image(url):
    """Return a textual analysis of the image at ``url``."""
    system_prompt = "You are an image analyst describing different aspects of an image. You are focused on the form, composition, and task shown in the image."
    prompt = "Please analyze this image according to the specified structure."
    data = {
        "input": [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "input_image", "image_url": url},
                    {"type": "input_text", "text": prompt},
                ],
            },
        ],
    }
    response = bytedance_response(data)
    return response["output"][-1]["content"][0]["text"]


def transform_remake_video(item_id, image_prompt, video_prompt):
    """Regenerate an item from AI-transformed first and last frames.

    Works like ``remake_video`` but the source frames of each segment are
    first transformed with ``prepare_image`` and ``image_prompt``; the video
    between them is generated with ``video_prompt``. Returns the joined
    output path.
    """
    item = Item.objects.get(public_id=item_id)
    segments = get_item_segments(item)
    print(segments)
    prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
    position = n = 0
    processed = []
    for segment in segments:
        if isinstance(segment, list):
            stype, segment = segment
        else:
            stype = "n"
        duration = segment - position
        if stype == "c":
            # Continue from the last frame of the previous generated segment.
            first_frame_path = (
                "/srv/pandora/static/power/cache/%s_%s/%06d.mp4.last_frame.png"
                % (item.public_id, prompt_hash, n - 1)
            )
            first_frame = public_url(first_frame_path)
        else:
            first_frame = "%s%s/source%s.png?token=%s" % (
                settings.PUBLIC_URL,
                item.public_id,
                position,
                settings.PUBLIC_TOKEN,
            )
            first_frame_path = (
                "/srv/pandora/static/power/cache/%s_%s/%06d.first_frame.png"
                % (item.public_id, prompt_hash, n)
            )
            if not os.path.exists(first_frame_path):
                prepare_image(first_frame, image_prompt, first_frame_path)
            first_frame = public_url(first_frame_path)
        last_frame_position = segment - 2 / 24
        last_frame = "%s%s/source%0.3f.png?token=%s" % (
            settings.PUBLIC_URL,
            item.public_id,
            last_frame_position,
            settings.PUBLIC_TOKEN,
        )
        last_frame_path = (
            "/srv/pandora/static/power/cache/%s_%s/%06d.last_frame.png"
            % (item.public_id, prompt_hash, n)
        )
        if not os.path.exists(last_frame_path):
            prepare_image(last_frame, image_prompt, last_frame_path)
        last_frame = public_url(last_frame_path)
        output = "/srv/pandora/static/power/cache/%s_%s/%06d.mp4" % (
            item.public_id,
            prompt_hash,
            n,
        )
        if not os.path.exists(output):
            first_last(first_frame, last_frame, video_prompt, duration, output)
        trimmed = "/srv/pandora/static/power/cache/%s_%s/%06d_trimmed.mp4" % (
            item.public_id,
            prompt_hash,
            n,
        )
        frames = int(duration * 24)
        if not os.path.exists(trimmed):
            trim_video(output, trimmed, frames, stype == "c")
        processed.append(trimmed)
        position = segment
        n += 1

    joined_output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
        item.public_id,
        prompt_hash,
    )
    join_segments(processed, joined_output)
    return joined_output


def _fal_run(model, arguments):
    """Submit a fal request, poll while it is in progress, return the result."""
    handler = fal_client.submit(model, arguments=arguments)
    request_id = handler.request_id
    print(request_id)
    status = fal_client.status(model, request_id, with_logs=True)
    while isinstance(status, fal_client.InProgress):
        time.sleep(10)
        status = fal_client.status(model, request_id, with_logs=True)
    result = fal_client.result(model, request_id)
    print(result)
    return result


def restyle_video(item_id, prompt):
    """Restyle the source video of an item with ``decart/lucy-restyle``.

    Returns the path the resulting video was saved to.
    """
    item = Item.objects.get(public_id=item_id)
    video_url = "%s%s/download/source/?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        settings.PUBLIC_TOKEN,
    )
    model = "decart/lucy-restyle"
    result = _fal_run(
        model,
        {
            "prompt": prompt,
            "video_url": video_url,
            "resolution": "720p",
            "enhance_prompt": True,
        },
    )
    output_url = result["video"]["url"]
    prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
    output_path = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
        item.public_id,
        prompt_hash,
    )
    ox.net.save_url(output_url, output_path, overwrite=True)
    return output_path


def motion_control_preprocess_image(item_id, image_prompt, video_prompt):
    """Generate a motion-controlled video from a transformed first frame.

    The first frame of the item is transformed with ``prepare_image`` and
    ``image_prompt`` (cached on disk), then sent together with the source
    video and ``video_prompt`` to the kling motion-control model.
    Returns the path the resulting video was saved to.
    """
    item = Item.objects.get(public_id=item_id)
    video_url = "%s%s/download/source/?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        settings.PUBLIC_TOKEN,
    )
    model = "fal-ai/kling-video/v2.6/pro/motion-control"
    prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
    output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (item.public_id, prompt_hash)
    first_frame = "%s%s/source%s.png?token=%s" % (
        settings.PUBLIC_URL,
        item.public_id,
        0,
        settings.PUBLIC_TOKEN,
    )
    first_frame_path = "/srv/pandora/static/power/cache/%s_%s/%06d.first_frame.png" % (
        item.public_id,
        prompt_hash,
        0,
    )
    if not os.path.exists(first_frame_path):
        os.makedirs(os.path.dirname(first_frame_path), exist_ok=True)
        prepare_image(first_frame, image_prompt, first_frame_path)
    image_url = public_url(first_frame_path)
    data = {
        "prompt": video_prompt,
        "image_url": image_url,
        "video_url": video_url,
        "keep_original_sound": False,
        "character_orientation": "video",
    }
    print(data)
    result = _fal_run(model, data)
    output_url = result["video"]["url"]
    ox.net.save_url(output_url, output, overwrite=True)
    return output


def _luma_headers():
    """Return the request headers used for Luma API calls."""
    return {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": "Bearer " + settings.LUMA_TOKEN,
    }


def luma_wait_for(id):
    """Poll a Luma generation every 10 seconds until it leaves the queue.

    Returns the first status whose ``state`` is neither ``queued`` nor
    ``dreaming``.
    """
    url = "https://api.lumalabs.ai/dream-machine/v1/generations/%s" % id
    luma_headers = _luma_headers()
    status = requests.get(url, headers=luma_headers).json()
    while status["state"] in ("queued", "dreaming"):
        time.sleep(10)
        status = requests.get(url, headers=luma_headers).json()
    return status


def luma_modify_segment(video_url, prompt, first_frame=None):
    """Modify the video at ``video_url`` with Luma and return the result URL.

    ``first_frame`` is an optional image URL passed along with the request.
    """
    # also got that at fal-ai/luma-dream-machine/ray-2/modify
    url = "https://api.lumalabs.ai/dream-machine/v1/generations/video/modify"
    payload = {
        "generation_type": "modify_video",
        "model": "ray-2",
        "mode": "adhere_1",
        "prompt": prompt,
        "media": {"url": video_url},
    }
    if first_frame:
        payload["first_frame"] = {"url": first_frame}
    response = requests.post(url, json=payload, headers=_luma_headers()).json()
    print(response)
    status = luma_wait_for(response["id"])
    return status["assets"]["video"]


def fragment_video(filename, segmentdir, segments):
    """Split ``filename`` into numbered files inside ``segmentdir``.

    ``segments`` holds the segment end positions in seconds, either as plain
    numbers or as ``[type, position]`` pairs (the type is ignored here).
    Files are written as ``%06d.mp4`` together with ``_first.jpg`` and
    ``_last.jpg`` stills of each segment. The final entry of ``segments``
    never triggers a cut: the last file runs to the end of the input.
    """
    filename = str(filename)
    cut_positions = [
        segment[1] if isinstance(segment, list) else segment
        for segment in segments
    ]

    segment = 0
    cap = cv2.VideoCapture(filename)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    frame_count = 0
    # Guard against an empty list instead of failing on pop().
    next_cut = int(cut_positions.pop(0) * fps) if cut_positions else None
    last = None
    os.makedirs(segmentdir, exist_ok=True)

    while cap.isOpened():
        if frame_count == 0:
            output_path = segmentdir + "/%06d.mp4" % segment
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        elif next_cut and frame_count >= next_cut and cut_positions:
            print(frame_count, output_path)
            # ``frame`` still holds the last frame written to this segment.
            cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), frame)
            out.release()
            segment += 1
            output_path = segmentdir + "/%06d.mp4" % segment
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if cut_positions:
                next_cut = int(cut_positions.pop(0) * fps)
            else:
                next_cut = None
            # Reset so the next frame is stored as this segment's first still.
            last = None
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)
        if last is None:
            cv2.imwrite(output_path.replace(".mp4", "_first.jpg"), frame)
        last = frame
        frame_count += 1

    out.release()
    cap.release()
    if last is None:
        # The current segment file received no frames.
        os.unlink(output_path)
    else:
        cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), last)


def flux_edit_image(image, prompt):
    """Edit the image at URL ``image`` with ``prompt``; return the result URL."""
    data = {
        "prompt": prompt,
        "safety_tolerance": "5",
        "enable_safety_checker": False,
        "output_format": "jpeg",
        "image_urls": [image],
    }
    print(data)
    result = fal_client.subscribe("fal-ai/flux-2-pro/edit", arguments=data)
    print(result)
    return result["images"][0]["url"]


def in_the_style_of_fal(image, style):
    """Apply the style of image URL ``style`` to ``image`` via fal.

    Returns the URL of the resulting image.
    """
    prompt = "apply style from @image 2 to @image 1 keep the position of the person in @image 1 but take light, colors, clothing from @image 2"
    data = {
        "prompt": prompt,
        "safety_tolerance": "5",
        "enable_safety_checker": False,
        "output_format": "jpeg",
        "image_urls": [image, style],
    }
    print(data)
    result = fal_client.subscribe("fal-ai/flux-2-pro/edit", arguments=data)
    print(result)
    return result["images"][0]["url"]


def in_the_style_of_byte(image, style):
    """Apply the style of image ``style`` to ``image`` via the Ark client.

    Returns the URL of the resulting image.
    """
    prompt = "apply style from image 2 to image 1 keep the position of the person in image 1 but take light, colors, clothing from image 2"
    image_model_name = "seedream-4-5-251128"
    ark_client = Ark(
        base_url="https://ark.ap-southeast.bytepluses.com/api/v3",
        api_key=settings.BYTEPLUSE_TOKEN,
    )
    create_result = ark_client.images.generate(
        model=image_model_name,
        image=[image, style],
        prompt=prompt,
        sequential_image_generation="disabled",
        response_format="url",
        size="2560x1440",
        stream=False,
        watermark=False,
    )
    print(create_result)
    return create_result.data[0].url


def luma_modify_item(item, prompt, image_prompt=None, first_frame=None):
    """Modify the source video of ``item`` with Luma, segment by segment.

    The source file is split with ``fragment_video`` (segments of at most 10
    seconds), every existing segment file is sent through
    ``luma_modify_segment`` and the results are joined. ``image_prompt`` only
    contributes to the cache key. Returns the joined output path.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    source = item.files.all()[0].data.path
    info = ox.avinfo(source)
    duration = info["duration"]
    max_duration = 10
    if duration < max_duration:
        segments = [duration]
    else:
        segments = get_item_segments(item, max_duration=max_duration)
    print(segments)
    prompt_hash = hashlib.sha1((prompt + (image_prompt or "")).encode()).hexdigest()
    processed = []
    prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
    fragment_video(source, prefix, segments)
    for n in range(len(segments)):
        output = "/srv/pandora/static/power/cache/%s_%s/%06d.mp4" % (
            item.public_id,
            prompt_hash,
            n,
        )
        output_ai = "/srv/pandora/static/power/cache/%s_%s/%06d_ai.mp4" % (
            item.public_id,
            prompt_hash,
            n,
        )
        if os.path.exists(output):
            video_url = luma_modify_segment(
                public_url(output), prompt, first_frame=first_frame
            )
            ox.net.save_url(video_url, output_ai, overwrite=True)
            processed.append(output_ai)
    joined_output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
        item.public_id,
        prompt_hash,
    )
    join_segments(processed, joined_output)
    return joined_output


def add_ai_variant(item, video_path, type):
    """Create a new item holding ``video_path`` as a variant of ``item``.

    The new item copies user and title from ``item`` and gets ``type`` as its
    type. The video is copied into the new file's data path and
    ``extract_stream`` is called on it. Returns the new item.
    """
    if isinstance(item, str):
        item = Item.objects.get(public_id=item)
    from archive.models import File, Stream

    ai = Item()
    ai.user = item.user
    ai.data["type"] = [type]
    ai.data["title"] = item.data["title"]
    ai.save()

    file = File()
    file.oshash = ox.oshash(video_path)
    file.item = ai
    file.path = "%s.mp4" % type
    file.info = ox.avinfo(video_path)
    del file.info["path"]
    file.parse_info()
    file.data.name = file.get_path("data." + video_path.split(".")[-1])
    os.makedirs(os.path.dirname(file.data.path), exist_ok=True)
    shutil.copy(video_path, file.data.path)
    file.available = True
    file.selected = True
    file.queued = True
    file.wanted = False
    file.save()
    file.extract_stream()
    return ai