1859 lines
64 KiB
Python
1859 lines
64 KiB
Python
from pathlib import Path
|
|
import hashlib
|
|
import math
|
|
import os
|
|
import time
|
|
import shutil
|
|
import subprocess
|
|
|
|
import cv2
|
|
import ox
|
|
import requests
|
|
import fal_client
|
|
from byteplussdkarkruntime import Ark
|
|
|
|
from django.conf import settings
|
|
|
|
from item.models import Item
|
|
from document.models import Document
|
|
from archive.models import File, Stream
|
|
import itemlist.models
|
|
|
|
|
|
os.environ["FAL_KEY"] = settings.FAL_KEY
|
|
|
|
MAX_DURATION = 12
|
|
headers = {
|
|
"Authorization": "Bearer " + settings.BYTEPLUSE_TOKEN,
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
|
|
def public_url(path):
|
|
return path.replace("/srv/pandora/static/", settings.PUBLIC_URL + "static/")
|
|
|
|
|
|
def public_document_url(document):
|
|
url = "%sdocuments/%s/source.%s?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
document.get_id(),
|
|
document.extension,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
return url
|
|
|
|
|
|
def public_video_url(item):
|
|
url = "%s%s/download/source/?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
return url
|
|
|
|
|
|
def public_frame_url(item, position):
|
|
image = "%s%s/source%s.png?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
position,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
return image
|
|
|
|
|
|
def trim_video(src, dst, frames, start0=False):
|
|
cap = cv2.VideoCapture(src)
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|
frames_src = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
frame_count = 0
|
|
|
|
offset = int((frames_src - frames) / 2)
|
|
if start0:
|
|
offset = 0
|
|
print(frames_src, frames, offset)
|
|
fourcc = cv2.VideoWriter_fourcc(*"avc1")
|
|
out = cv2.VideoWriter(dst, fourcc, fps, (width, height))
|
|
written = 0
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
frame_count += 1
|
|
if frame_count < offset:
|
|
continue
|
|
if frame_count >= (frames + offset):
|
|
continue
|
|
out.write(frame)
|
|
written += 1
|
|
out.release()
|
|
cap.release()
|
|
|
|
def make_single_character_image(character):
|
|
character = get_character_document(character, type='Character')
|
|
character_url = public_document_url(character)
|
|
data = {
|
|
"model": "seedream-4-5-251128",
|
|
"size": "2K",
|
|
"watermark": False,
|
|
'image': character_url,
|
|
"prompt": "character from image 1 is standing straight up, full body portrait from head to toe. face, clothing, skin are photo realistic, camera facing straight at character. background is white"
|
|
}
|
|
url = bytedance_image_generation(data)
|
|
extension = url.split(".")[-1].split("?")[0]
|
|
if extension == "jpeg":
|
|
extension = "jpg"
|
|
file = Document(user=character.user)
|
|
file.rightslevel = 2
|
|
file.data["title"] = character.data['title'].replace('Character', 'Single Character')
|
|
file.extension = extension
|
|
file.width = -1
|
|
file.pages = -1
|
|
file.uploading = True
|
|
file.save()
|
|
file.uploading = True
|
|
name = "data.%s" % file.extension
|
|
file.file.name = file.path(name)
|
|
ox.net.save_url(url, file.file.path, overwrite=True)
|
|
file.get_info()
|
|
file.get_ratio()
|
|
file.oshash = ox.oshash(file.file.path)
|
|
file.save()
|
|
file.update_sort()
|
|
file.update_find()
|
|
return file
|
|
|
|
def age_character_image(character, age):
|
|
character = get_character_document(character)
|
|
character_url = public_document_url(character)
|
|
data = {
|
|
"model": "seedream-4-5-251128",
|
|
"size": "2K",
|
|
"watermark": False,
|
|
'image': character_url,
|
|
"prompt": "use character from image 1, but make older, change the body, face and appearance to match that of a %d year old person, replace clothing, shoes to match the age, adjust hair style to match the age, keep the full body including feet visible. photo realistic picture of a real person in high detail, studio light" % age
|
|
}
|
|
url = bytedance_image_generation(data)
|
|
extension = url.split(".")[-1].split("?")[0]
|
|
if extension == "jpeg":
|
|
extension = "jpg"
|
|
file = Document(user=character.user)
|
|
file.rightslevel = 2
|
|
file.data["title"] = character.data['title'] + ' (Age %s)' % age
|
|
file.extension = extension
|
|
file.width = -1
|
|
file.pages = -1
|
|
file.uploading = True
|
|
file.save()
|
|
file.uploading = True
|
|
name = "data.%s" % file.extension
|
|
file.file.name = file.path(name)
|
|
ox.net.save_url(url, file.file.path, overwrite=True)
|
|
file.get_info()
|
|
file.get_ratio()
|
|
file.oshash = ox.oshash(file.file.path)
|
|
file.save()
|
|
file.update_sort()
|
|
file.update_find()
|
|
return file
|
|
|
|
def make_single_character_image_flux(character):
|
|
character = get_character_document(character, type='Character')
|
|
character_url = public_document_url(character)
|
|
prompt = 'character from @image 1 is standing straight up, full body portrait from head to toe. face, clothing, skin are photo realistic, camera facing straight at character. background is white'
|
|
url = flux_edit_image([character_url], prompt)
|
|
extension = url.split(".")[-1].split("?")[0]
|
|
if extension == "jpeg":
|
|
extension = "jpg"
|
|
file = Document(user=character.user)
|
|
file.rightslevel = 2
|
|
file.data["title"] = character.data['title'].replace('Character', 'FLUX Single Character')
|
|
file.extension = extension
|
|
file.width = -1
|
|
file.pages = -1
|
|
file.uploading = True
|
|
file.save()
|
|
file.uploading = True
|
|
name = "data.%s" % file.extension
|
|
file.file.name = file.path(name)
|
|
ox.net.save_url(url, file.file.path, overwrite=True)
|
|
file.get_info()
|
|
file.get_ratio()
|
|
file.oshash = ox.oshash(file.file.path)
|
|
file.save()
|
|
file.update_sort()
|
|
file.update_find()
|
|
return file
|
|
|
|
|
|
def bytedance_task(data):
|
|
url = "https://ark.ap-southeast.bytepluses.com/api/v3/contents/generations/tasks"
|
|
model = "seedance-1-5-pro-251215"
|
|
resolution = "1080p"
|
|
defaults = {
|
|
"model": model,
|
|
"generate_audio": False,
|
|
"ratio": "16:9",
|
|
"watermark": False,
|
|
"resolution": resolution,
|
|
"camera_fixed": True,
|
|
"return_last_frame": True,
|
|
}
|
|
for key, value in defaults.items():
|
|
if key not in data:
|
|
data[key] = value
|
|
if data["model"] in EP:
|
|
data["model"] = EP[data["model"]]
|
|
print(data)
|
|
r = requests.post(url, headers=headers, json=data).json()
|
|
print(r)
|
|
task_id = r["id"]
|
|
status = requests.get(url + "/" + task_id, headers=headers).json()
|
|
while status["status"] in ("queued", "running", "cancelled"):
|
|
time.sleep(10)
|
|
status = requests.get(url + "/" + task_id, headers=headers).json()
|
|
print(status)
|
|
return status
|
|
|
|
|
|
def bytedance_response(data):
|
|
url = "https://ark.ap-southeast.bytepluses.com/api/v3/responses"
|
|
defaults = {"model": "seed-1-8-251228"}
|
|
for key, value in defaults.items():
|
|
if key not in data:
|
|
data[key] = value
|
|
print(data)
|
|
if data["model"] in EP:
|
|
data["model"] = EP[data["model"]]
|
|
response = requests.post(url, headers=headers, json=data).json()
|
|
print(response)
|
|
return response
|
|
|
|
|
|
def t2v_bytedance(prompt, duration, output):
|
|
nduration = max(4, int(math.ceil(duration)))
|
|
data = {
|
|
"duration": nduration,
|
|
"content": [{"type": "text", "text": prompt}],
|
|
}
|
|
status = bytedance_task(data)
|
|
output_url = status["content"]["video_url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
if "last_frame_url" in status["content"]:
|
|
ox.net.save_url(
|
|
status["content"]["last_frame_url"],
|
|
output + ".last_frame.png",
|
|
overwrite=True,
|
|
)
|
|
return status
|
|
|
|
|
|
def i2v_bytedance(first_frame, prompt, duration, output, last_frame=None, seed=None):
|
|
nduration = max(4, int(math.ceil(duration)))
|
|
data = {
|
|
"duration": nduration,
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": prompt,
|
|
},
|
|
{
|
|
"type": "image_url",
|
|
"role": "first_frame",
|
|
"image_url": {"url": first_frame},
|
|
},
|
|
],
|
|
}
|
|
if last_frame:
|
|
data["content"].append(
|
|
{
|
|
"type": "image_url",
|
|
"role": "last_frame",
|
|
"image_url": {"url": last_frame},
|
|
}
|
|
)
|
|
if seed:
|
|
data["seed"] = seed
|
|
status = bytedance_task(data)
|
|
output_url = status["content"]["video_url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
if "last_frame_url" in status["content"]:
|
|
ox.net.save_url(
|
|
status["content"]["last_frame_url"],
|
|
output + ".last_frame.png",
|
|
overwrite=True,
|
|
)
|
|
return status
|
|
|
|
|
|
def first_last(first_frame, last_frame, prompt, duration, output):
|
|
nduration = max(4, int(math.ceil(duration)))
|
|
data = {
|
|
"duration": nduration,
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": prompt,
|
|
},
|
|
{
|
|
"type": "image_url",
|
|
"role": "first_frame",
|
|
"image_url": {"url": first_frame},
|
|
},
|
|
{
|
|
"type": "image_url",
|
|
"role": "last_frame",
|
|
"image_url": {"url": last_frame},
|
|
},
|
|
],
|
|
}
|
|
status = bytedance_task(data)
|
|
output_url = status["content"]["video_url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
if "last_frame_url" in status["content"]:
|
|
ox.net.save_url(
|
|
status["content"]["last_frame_url"],
|
|
output + ".last_frame.png",
|
|
overwrite=True,
|
|
)
|
|
return status
|
|
|
|
|
|
def get_item_segments(item, max_duration=MAX_DURATION):
|
|
cuts = item.get("cuts")
|
|
filename = item.files.all()[0].data.path
|
|
input_info = ox.avinfo(filename)
|
|
p = 0
|
|
nc = []
|
|
for c in cuts:
|
|
d = c - p
|
|
if d < 0.5:
|
|
continue
|
|
p = c
|
|
nc.append(c)
|
|
nc = nc + [input_info["duration"]]
|
|
if len(nc) > 3:
|
|
if nc[-1] - nc[-2] < 0.5:
|
|
nc = nc[:-2] + nc[-1:]
|
|
segments = []
|
|
position = 0
|
|
for out in nc:
|
|
duration = out - position
|
|
while duration > max_duration:
|
|
position += max_duration
|
|
if len(segments):
|
|
segments.append(["c", position])
|
|
else:
|
|
segments.append(position)
|
|
duration = out - position
|
|
else:
|
|
segments.append(out)
|
|
position = out
|
|
return segments
|
|
|
|
|
|
def join_segments(processed, joined_output):
|
|
out = None
|
|
for filename in processed:
|
|
cap = cv2.VideoCapture(filename)
|
|
if out is None:
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
fourcc = cv2.VideoWriter_fourcc(*"avc1")
|
|
out = cv2.VideoWriter(joined_output, fourcc, fps, (width, height))
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
out.write(frame)
|
|
cap.release()
|
|
if out is not None:
|
|
out.release()
|
|
|
|
|
|
def remake_video(item_id, prompt):
|
|
item = Item.objects.get(public_id=item_id)
|
|
segments = get_item_segments(item)
|
|
print(segments)
|
|
prompt_hash = hashlib.sha1(prompt.encode()).hexdigest()
|
|
position = n = 0
|
|
processed = []
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
|
|
for segment in segments:
|
|
if isinstance(segment, list):
|
|
stype, segment = segment
|
|
else:
|
|
stype = "n"
|
|
duration = segment - position
|
|
|
|
if stype == "c":
|
|
first_frame_path = "%s/%06d.mp4.last_frame.png" % (prefix, n - 1)
|
|
first_frame = public_url(first_frame_path)
|
|
else:
|
|
first_frame = "%s%s/source%s.png?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
position,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
last_frame_position = segment - 2 / 24
|
|
last_frame = "%s%s/source%0.3f.png?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
last_frame_position,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
output = "%s/%06d.mp4" % (prefix, n)
|
|
if not os.path.exists(output):
|
|
first_last(first_frame, last_frame, prompt, duration, output)
|
|
trimmed = "%s/%06d_trimmed.mp4" % (prefix, n)
|
|
frames = int(duration * 24)
|
|
if not os.path.exists(trimmed):
|
|
trim_video(output, trimmed, frames, stype == "c")
|
|
processed.append(trimmed)
|
|
position = segment
|
|
n += 1
|
|
|
|
joined_output = "%s/joined.mp4" % (prefix,)
|
|
join_segments(processed, joined_output)
|
|
return joined_output
|
|
|
|
|
|
EP = {
|
|
"seedream-4-5-251128": "ep-20260122071519-pbf7l",
|
|
"seed-1-8-251228": "ep-20260122071243-8qfrk",
|
|
"seedance-1-5-pro-251215": "ep-20260122071613-blmsd",
|
|
}
|
|
|
|
def bytedance_image_generation(data):
|
|
model = "seedream-4-5-251128"
|
|
url = "https://ark.ap-southeast.bytepluses.com/api/v3/images/generations"
|
|
defaults = {
|
|
"model": model,
|
|
"size": "2560x1440",
|
|
"watermark": False,
|
|
}
|
|
for key in defaults:
|
|
if key not in data:
|
|
data[key] = defaults[key]
|
|
if data["model"] in EP:
|
|
data["model"] = EP[data["model"]]
|
|
print("prepare_image", data)
|
|
response = requests.post(url, headers=headers, json=data).json()
|
|
print(response)
|
|
return response["data"][0]["url"]
|
|
|
|
|
|
def prepare_image(image, prompt, out=None):
|
|
if not image.startswith("http:"):
|
|
image = public_url(image)
|
|
data = {
|
|
"prompt": prompt,
|
|
"image": image,
|
|
"size": "2560x1440",
|
|
}
|
|
output_url = bytedance_image_generation(data)
|
|
if out is None:
|
|
out = image + ".ai.png"
|
|
ox.net.save_url(output_url, out, overwrite=True)
|
|
return r
|
|
|
|
def process_frame(item, prompt, character=None, position=0, seed=None):
|
|
model = "seedream-4-5-251128"
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
if isinstance(character, Document):
|
|
character = public_document_url(character)
|
|
image = public_frame_url(item, position)
|
|
if character is not None:
|
|
image = [image, character]
|
|
data = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"image": image,
|
|
"size": "2560x1440",
|
|
}
|
|
if seed:
|
|
data["seed"] = seed
|
|
url = bytedance_image_generation(data)
|
|
img = add_ai_image(item, position, url)
|
|
img.refresh_from_db()
|
|
img.data["model"] = model
|
|
img.data["prompt"] = prompt
|
|
img.data["source"] = item.public_id
|
|
if character:
|
|
img.data["source"] += " " + character.split("?")[0]
|
|
print(img, img.data)
|
|
img.save()
|
|
img.update_sort()
|
|
img.update_find()
|
|
return img
|
|
|
|
def process_image(document, prompt, character=None, position=0, seed=None):
|
|
model = "seedream-4-5-251128"
|
|
if isinstance(character, Document):
|
|
character = public_document_url(character)
|
|
image = public_document_url(document)
|
|
if character is not None:
|
|
image = [image, character]
|
|
data = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"image": image,
|
|
"size": "2560x1440",
|
|
}
|
|
if seed:
|
|
data["seed"] = seed
|
|
url = bytedance_image_generation(data)
|
|
img = add_ai_image(document, position, url)
|
|
img.refresh_from_db()
|
|
img.data["model"] = model
|
|
img.data["prompt"] = prompt
|
|
img.data["source"] = document.get_id()
|
|
if character:
|
|
img.data["source"] += " " + character.split("?")[0]
|
|
print(img, img.data)
|
|
img.save()
|
|
img.update_sort()
|
|
img.update_find()
|
|
return img
|
|
|
|
def replace_background(image, background, prompt=None, seed=None):
|
|
model = "seedream-4-5-251128"
|
|
position = 0
|
|
if prompt is None:
|
|
prompt = "Place the character from image 2 into image 1"
|
|
if isinstance(background, Item):
|
|
background_url = public_frame_url(background, position)
|
|
else:
|
|
background_url = public_document_url(background)
|
|
images = [
|
|
background_url,
|
|
public_document_url(image),
|
|
]
|
|
data = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"image": images,
|
|
"size": "2560x1440",
|
|
}
|
|
if seed:
|
|
data["seed"] = seed
|
|
url = bytedance_image_generation(data)
|
|
img = add_ai_image(image, position, url)
|
|
img.refresh_from_db()
|
|
img.data["model"] = model
|
|
img.data["prompt"] = prompt
|
|
img.data["source"] = image.get_id()
|
|
img.save()
|
|
img.update_sort()
|
|
img.update_find()
|
|
return img
|
|
|
|
def get_character_document(character, type="Single Character", age=None):
|
|
if character in ("P1", "P2", "P3", "P4", "P5"):
|
|
title = type + " " + character
|
|
if age:
|
|
title += ' (Age %d)' % age
|
|
return Document.objects.get(data__title=title)
|
|
return character
|
|
|
|
"""
|
|
REPLACE_CHARACTER_PROMPT = "Replace the foreground character in image 1 with the character in image 2, keep the posture, clothing, background, light, atmosphere from image 1, but take the facial features and personality from image 2. Make sure the size of the character is adjusted since the new character is a child and make sure the size of the head matches the body. The quality of the image should be the same between foreground and background, adjust the quality of the character to match the background. Use the style of image 1 for the character: if image 1 is a photo make the character a real person, if image 1 is a drawing make the character a drawn character, if image 1 is a comic use a comic character and so on"
|
|
"""
|
|
|
|
REPLACE_CHARACTER_PROMPT = "Replace the foreground character in image 1 with the character in image 2, keep the posture, clothing, background, light, atmosphere from image 1, but take the facial features and personality from image 2. Make sure the size of the character is adjusted since the new character is a child and make sure the size of the head matches the body. The quality of the image should be the same between foreground and background, adjust the quality of the character to match the background. Use the style of image 1 for the character: if image 1 is a photo make the character a real person, if image 1 is a drawing make the character a drawn character, if image 1 is a comic use a comic character, restore any blurred out regions of the image"
|
|
|
|
REPLACE_CHARACTER_PROMPT = "Replace the main person in image 1 with the person in image 2, keep the posture, clothing, background, light, atmosphere from image 1, but take the facial features and personality from image 2. Make sure the size of the person is adjusted since the new person is a child and make sure the size of the head matches the body. The quality of the image should be the same between foreground and background, adjust the quality of the person to match the background. Use the style of image 1 for the person: if image 1 is a photo make the person a real person, if image 1 is a drawing make the person a drawn person, if image 1 is a comic use a comic character, restore any blurred out regions of the image"
|
|
|
|
|
|
def fal_replace_character(item, character, position=0):
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
prompt = REPLACE_CHARACTER_PROMPT.replace("image 1", "@image 1").replace(
|
|
"image 2", "@image 2"
|
|
)
|
|
if character == "P5":
|
|
prompt = prompt.replace("child", "teenager")
|
|
|
|
character = get_character_document(character)
|
|
if isinstance(character, Document):
|
|
character = public_document_url(character)
|
|
image = public_frame_url(item, position)
|
|
image = [image, character]
|
|
url = flux_edit_image(image, prompt)
|
|
img = add_ai_image(item, position, url)
|
|
img.refresh_from_db()
|
|
img.data["model"] = "flux-2-pro"
|
|
img.data["prompt"] = prompt
|
|
img.data["source"] = item.public_id
|
|
img.data["source"] += " " + character.split("?")[0]
|
|
if isinstance(character, Document):
|
|
img.data["character"] = character.get_id()
|
|
else:
|
|
img.data["character"] = character
|
|
img.data["position"] = position
|
|
print(img, img.data)
|
|
img.save()
|
|
img.update_sort()
|
|
img.update_find()
|
|
return img
|
|
|
|
|
|
def replace_character(item, character, position=0, seed=None, extra=None, age=None, prompt=None):
|
|
if prompt is None:
|
|
prompt = REPLACE_CHARACTER_PROMPT
|
|
if age:
|
|
prompt = prompt.replace("child", "person")
|
|
elif character == "P5":
|
|
prompt = prompt.replace("child", "teenager")
|
|
if extra:
|
|
prompt += " " + extra
|
|
character = get_character_document(character, age=age)
|
|
if isinstance(character, Document):
|
|
character_url = public_document_url(character)
|
|
else:
|
|
character_url = character
|
|
frame = process_frame(item, prompt, character_url, position, seed=seed)
|
|
if isinstance(character, Document):
|
|
frame.data["character"] = character.get_id()
|
|
else:
|
|
frame.data["character"] = character
|
|
frame.data["position"] = position
|
|
if age:
|
|
frame.data["title"] += " (Age %d)" % age
|
|
frame.save()
|
|
return frame
|
|
|
|
|
|
def kling_lipsync(audio_item, video_item, keep=False):
|
|
video_url = public_video_url(video_item)
|
|
audio_path = audio_item.streams()[0].file.data.path
|
|
prefix = "/srv/pandora/static/power/cache/%s" % (audio_item.public_id)
|
|
os.makedirs(prefix, exist_ok=True)
|
|
output = prefix + '/audio.m4a'
|
|
if not os.path.exists(output):
|
|
cmd = ['ffmpeg', '-hide_banner', '-nostats', '-i', audio_path, '-vn', '-c:a', 'copy', output]
|
|
subprocess.call(cmd)
|
|
if not os.path.exists(output):
|
|
raise Exception
|
|
audio_url = public_url(output)
|
|
model = "fal-ai/kling-video/lipsync/audio-to-video"
|
|
data = {
|
|
"video_url": video_url,
|
|
"audio_url": audio_url
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
output = prefix + "/lipsync.mp4"
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(audio_item, output, "ai:lipsync")
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(prefix)
|
|
return ai
|
|
|
|
def kling_v2v_reference(item, character, keep=False):
|
|
# https://fal.ai/models/fal-ai/kling-video/o1/video-to-video/reference/api
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
|
|
if character in ("P1", "P2", "P3", "P4", "P5"):
|
|
character = public_document_url(
|
|
Document.objects.get(data__title="Character " + character)
|
|
)
|
|
character = get_character_document(character)
|
|
if isinstance(character, Document):
|
|
character_url = public_document_url(character)
|
|
else:
|
|
character_url = character
|
|
video_url = public_video_url(item)
|
|
prompt = "Replace the main character in @Video1 with the character from the reference images, adjust the style of the character to match the style of the video"
|
|
model = "fal-ai/kling-video/o1/video-to-video/reference"
|
|
prompt_hash = hashlib.sha1((prompt + character_url).encode()).hexdigest()
|
|
output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
for d in [3, 4, 5, 6, 7, 8, 9, 10]:
|
|
if d > item.sort.duration:
|
|
break
|
|
duration = d
|
|
|
|
data = {
|
|
"prompt": prompt,
|
|
"image_url": image_url,
|
|
"video_url": video_url,
|
|
"keep_original_sound": False,
|
|
"character_orientation": "video",
|
|
}
|
|
data = {
|
|
"prompt": prompt,
|
|
"keep_audio": False,
|
|
"aspect_ratio": "16:9",
|
|
"video_url": video_url,
|
|
"image_urls": [character_url],
|
|
"duration": str(duration)
|
|
}
|
|
'''
|
|
data["elements"] = [
|
|
{
|
|
"reference_image_urls": [
|
|
image_url,
|
|
image_url,
|
|
],
|
|
"frontal_image_url": image_url
|
|
}
|
|
]
|
|
'''
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:v2v-replace")
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["firstframe"] = image_url.split("?")[0]
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(output))
|
|
img.add(ai)
|
|
return ai
|
|
|
|
def kling_v2v_edit(item, background, keep=False):
|
|
# https://fal.ai/models/fal-ai/kling-video/o1/video-to-video/edit
|
|
return
|
|
|
|
video_url = public_video_url(item)
|
|
|
|
images = []
|
|
elements = {
|
|
"reference_image_urls": [],
|
|
"frontal_image": ""
|
|
}
|
|
data = {
|
|
"prompt": prompt,
|
|
"video_url": video_url,
|
|
"image_urls": images,
|
|
"elements": elements,
|
|
"keep_audio": False,
|
|
"character_orientation": "video",
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:v2v-replace")
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["firstframe"] = image_url.split("?")[0]
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(output))
|
|
img.add(ai)
|
|
return ai
|
|
|
|
def wan_reference_to_video(foreground, background, keep=False):
|
|
prompt = "Use the character from @Video1 and use @Video2 as background"
|
|
prompt = "Character1 dances in the foreground, Character2 as background"
|
|
model = "wan/v2.6/reference-to-video"
|
|
|
|
foreground_url = public_video_url(foreground)
|
|
#background_url = public_video_url(background)
|
|
src = background.files.all()[0].data.path
|
|
item = background
|
|
prompt_hash = hashlib.sha1((prompt + foreground_url + src).encode()).hexdigest()
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
os.makedirs(prefix, exist_ok=True)
|
|
frames = int(foreground.sort.duration * 24)
|
|
dst = prefix + "/background.mp4"
|
|
trim_video(src, dst, frames)
|
|
if not os.path.exists(dst):
|
|
raise Exception
|
|
|
|
fg = prefix + "/foreground.mp4"
|
|
shutil.copy(foreground.files.all()[0].data.path, fg)
|
|
foreground_url = public_url(fg)
|
|
background_url = public_url(dst)
|
|
item = background
|
|
output = prefix + "/ai.mp4"
|
|
for d in [5, 10]:
|
|
if d > item.sort.duration:
|
|
break
|
|
duration = d
|
|
|
|
data = {
|
|
"prompt": prompt,
|
|
"video_urls": [
|
|
foreground_url,
|
|
background_url,
|
|
],
|
|
"aspect_ratio": "16:9",
|
|
"resolution": "720p",
|
|
"duration": str(duration),
|
|
"enable_prompt_expansion": True,
|
|
"multi_shots": True,
|
|
"enable_safety_checker": False
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:foreground-background")
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(output))
|
|
return ai
|
|
|
|
def wan_animate_replace(item, character, keep=False):
|
|
item_url = public_video_url(item)
|
|
prompt = ""
|
|
model = "fal-ai/wan/v2.2-14b/animate/replace"
|
|
prompt_hash = hashlib.sha1((prompt + foreground_url + background_url).encode()).hexdigest()
|
|
output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
data = {
|
|
"video_url": item_url,
|
|
"image_url": character_url,
|
|
"resolution": "720p",
|
|
"video_quality": "high",
|
|
"guidance_scale": 1,
|
|
"enable_safety_checker": False,
|
|
"enable_output_safety_checker": False,
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:foreground-background")
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(output))
|
|
return ai
|
|
|
|
def ltx_a2v(item, character, prompt=None, first_frame=None, keep=False, expand_prompt=False):
|
|
video_url = public_video_url(item)
|
|
audio_path = item.streams()[0].file.data.path
|
|
if first_frame is None:
|
|
character = get_character_document(character)
|
|
position = 0
|
|
cid = get_character_document(character).get_id()
|
|
first_frame = item.documents.filter(
|
|
data__character=cid,
|
|
data__position=position
|
|
).order_by('-created').first()
|
|
if not first_frame:
|
|
first_frame = replace_character(item, character, position)
|
|
image_url = public_document_url(first_frame)
|
|
prefix = "/srv/pandora/static/power/cache/%s_a2v" % (item.public_id)
|
|
os.makedirs(prefix, exist_ok=True)
|
|
if audio_path.endswith('.mp3'):
|
|
output = prefix + '/audio.mp3'
|
|
shutil.copy(audio_path, output)
|
|
else:
|
|
output = prefix + '/audio.m4a'
|
|
if not os.path.exists(output):
|
|
cmd = ['ffmpeg', '-hide_banner', '-nostats', '-i', audio_path, '-vn', '-c:a', 'copy', output]
|
|
subprocess.call(cmd)
|
|
if not os.path.exists(output):
|
|
raise Exception
|
|
audio_url = public_url(output)
|
|
model = "fal-ai/ltx-2-19b/audio-to-video"
|
|
neutral = True
|
|
if prompt is None:
|
|
prompt = describe_item(item, neutral)
|
|
data = {
|
|
"audio_url": audio_url,
|
|
"image_url": image_url,
|
|
"video_size": {"width": 1280, "height": 720},
|
|
"match_audio_length": True,
|
|
"fps": 24,
|
|
"prompt": prompt,
|
|
"enable_safety_checker": False,
|
|
"enable_prompt_expansion": expand_prompt,
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
output = prefix + "/ai.mp4"
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:audio-to-video")
|
|
ai.data["model"] = model
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["seed"] = result["seed"]
|
|
ai.save()
|
|
first_frame.add(ai)
|
|
ai.data["firstframe"] = first_frame.get_id()
|
|
if not keep:
|
|
shutil.rmtree(prefix)
|
|
return ai
|
|
|
|
|
|
def vo2video(vo, item, character, position=0, prompt=None, age=None, expand_prompt=False):
|
|
first_frame = replace_character(item, character, position, age=age)
|
|
if prompt is None:
|
|
# the painting becomes animated and the girl looks into the camera and speaks
|
|
prompt = "the scene and person become animated, the person looks into the camera and speaks"
|
|
return ltx_a2v(vo, character=character, prompt=prompt, first_frame=first_frame, expand_prompt=expand_prompt)
|
|
|
|
|
|
def ltx_v2v(item, character, prompt=None, keep=False):
|
|
video_url = public_video_url(item)
|
|
character = get_character_document(character)
|
|
|
|
position = 0
|
|
cid = get_character_document(character).get_id()
|
|
first_frame = item.documents.filter(
|
|
data__character=cid,
|
|
data__position=position
|
|
).order_by('-created').first()
|
|
if not first_frame:
|
|
first_frame = replace_character(item, character, position)
|
|
image_url = public_document_url(first_frame)
|
|
prefix = "/srv/pandora/static/power/cache/%s_ltx_v2v" % (item.public_id)
|
|
model = "fal-ai/ltx-2-19b/video-to-video"
|
|
neutral = True
|
|
if prompt is None:
|
|
prompt = describe_item(item, neutral)
|
|
data = {
|
|
"prompt": prompt,
|
|
"video_url": video_url,
|
|
"image_url": image_url,
|
|
"video_size": {"width": 1280, "height": 720},
|
|
"match_video_length": True,
|
|
"fps": 24,
|
|
"generate_audio": False,
|
|
"enable_safety_checker": False,
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
output = prefix + "/ai.mp4"
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:video-to-video")
|
|
ai.data["model"] = model
|
|
ai.data["seed"] = result["seed"]
|
|
ai.save()
|
|
first_frame.add(ai)
|
|
ai.data["firstframe"] = first_frame.get_id()
|
|
if not keep:
|
|
shutil.rmtree(prefix)
|
|
return ai
|
|
|
|
def replace_character_motion_control(item, character, first_frame=None, background=None, keep=False):
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
|
|
add = []
|
|
if first_frame:
|
|
img = first_frame
|
|
image_url = public_document_url(first_frame)
|
|
else:
|
|
# FIXME get character from documents
|
|
if isinstance(character, str):
|
|
img = replace_character(item, character, 0)
|
|
else:
|
|
img = character
|
|
if background:
|
|
add.append(img)
|
|
img = replace_background(img, background)
|
|
image_url = public_document_url(img)
|
|
|
|
video_url = public_video_url(item)
|
|
prompt = ""
|
|
model = "fal-ai/kling-video/v2.6/pro/motion-control"
|
|
prompt_hash = hashlib.sha1((prompt + image_url).encode()).hexdigest()
|
|
output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
data = {
|
|
"prompt": prompt,
|
|
"image_url": image_url,
|
|
"video_url": video_url,
|
|
"keep_original_sound": False,
|
|
"character_orientation": "video",
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
ai = add_ai_variant(item, output, "ai:replace:p1:motion-control")
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["firstframe"] = image_url.split("?")[0]
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(output))
|
|
img.add(ai)
|
|
for img_ in add:
|
|
img_.add(ai)
|
|
return ai
|
|
|
|
|
|
def describe_video(url, neutral=False, face=False):
|
|
if face:
|
|
extra = 'describe the facial expression to allow an actor to recreate the scene, '
|
|
else:
|
|
extra = ''
|
|
if neutral:
|
|
prompt = (
|
|
"Detect cuts or scene changes and describe each scene, use as much details as you can. "
|
|
f"Describe each person incudling detalied apreance, haircut in a gender neutral way, {extra}"
|
|
"describe each objects, animal or plant, describe foreground and backgroud, "
|
|
"describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
|
|
"Use the format: <description of scene 1>. CAMERA CUT TO <description of scene 2>. CAMERA CUT TO <description of scene 3>. "
|
|
"Don't mention it if you don't find a cut."
|
|
)
|
|
else:
|
|
prompt = (
|
|
"Detect cuts or scene changes and describe each scene, use as much details as you can. "
|
|
"Describe each person incudling detalied apreance, ethnicity, haircolor, haircut, "
|
|
"describe each objects, animal or plant, describe foreground and backgroud, "
|
|
"describe from what angle the scene is filmed, incude details about camera model, lense, depth of field used to film this scene. "
|
|
"Use the format: <description of scene 1>. CAMERA CUT TO <description of scene 2>. CAMERA CUT TO <description of scene 3>. "
|
|
"Don't mention it if you don't find a cut."
|
|
)
|
|
data = {
|
|
"input": [
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "input_video", "video_url": url, "fps": 1},
|
|
{"type": "input_text", "text": prompt},
|
|
],
|
|
}
|
|
],
|
|
}
|
|
response = bytedance_response(data)
|
|
return response["output"][1]["content"][0]["text"]
|
|
|
|
|
|
def describe_item(item, neutral=False):
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
if item.get("prompt") and neutral:
|
|
return ox.decode_html(item.get("prompt"))
|
|
video_url = public_video_url(item)
|
|
prompt = describe_video(video_url, neutral)
|
|
if neutral:
|
|
item.refresh_from_db()
|
|
item.data["prompt"] = ox.escape_html(prompt)
|
|
item.save()
|
|
return prompt
|
|
|
|
|
|
def reshoot_item(item, extra_prompt=None, first_frame=None, keep=False, prompt=None, last_frame=None, duration=None):
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
if isinstance(first_frame, Document):
|
|
first_frame_url = public_document_url(first_frame)
|
|
else:
|
|
first_frame_url = first_frame
|
|
if isinstance(last_frame, Document):
|
|
last_frame_url = public_document_url(last_frame)
|
|
else:
|
|
last_frame_url = last_frame
|
|
if duration is None:
|
|
duration = item.sort.duration
|
|
frames = int(duration * 24)
|
|
neutral = first_frame is not None
|
|
if prompt is None:
|
|
prompt = describe_item(item, neutral)
|
|
if extra_prompt:
|
|
prompt += " " + extra_prompt
|
|
prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
|
|
output = "/srv/pandora/static/power/cache/%s_%s/ai.mp4" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
if first_frame:
|
|
status = i2v_bytedance(first_frame_url, prompt, duration, output, last_frame=last_frame)
|
|
else:
|
|
status = t2v_bytedance(prompt, duration, output)
|
|
|
|
trimmed = "/srv/pandora/static/power/cache/%s_%s/trimmed.mp4" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
trim_video(output, trimmed, frames)
|
|
variant = "ai:0:reshoot"
|
|
if first_frame:
|
|
variant = "ai:0:reshoot-firstframe"
|
|
ai = add_ai_variant(item, trimmed, variant)
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["model"] = status["model"]
|
|
ai.data["seed"] = status["seed"]
|
|
if first_frame:
|
|
if isinstance(first_frame, Document):
|
|
first_frame.add(ai)
|
|
ai.data["firstframe"] = first_frame.get_id()
|
|
else:
|
|
ai.data["firstframe"] = first_frame.split("?")[0]
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(output))
|
|
return ai
|
|
|
|
|
|
def reshoot_item_segments(item, character, age=None, keep=False, offset=0):
|
|
if isinstance(item, str):
|
|
etem = Item.objects.get(public_id=item)
|
|
max_duration = 12
|
|
source = item.files.all()[0].data.path
|
|
info = ox.avinfo(source)
|
|
duration = info["duration"]
|
|
if duration < max_duration and character != "annotation":
|
|
segments = [duration]
|
|
else:
|
|
segments = get_item_segments(item, max_duration=max_duration)
|
|
print(segments)
|
|
prompt_hash = hashlib.sha1("reshoot_segment".encode()).hexdigest()
|
|
processed = []
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
|
|
video_segments = fragment_video(source, prefix, segments)
|
|
prompts = []
|
|
first_frames = []
|
|
seed = None
|
|
n = 0
|
|
position = 0
|
|
for segment in segments:
|
|
if isinstance(segment, list):
|
|
stype, segment = segment
|
|
else:
|
|
stype = "n"
|
|
output = "%s/%06d.mp4" % (prefix, n)
|
|
output_ai = "%s/%06d_ai.mp4" % (prefix, n)
|
|
segment_duration = segment - position
|
|
if os.path.exists(output):
|
|
segment_video_url = public_url(output)
|
|
prompt = describe_video(segment_video_url, neutral=True)
|
|
prompts.append("Segment %s: " % (n + 1) + prompt)
|
|
if character == "annotation":
|
|
a = item.annotations.filter(
|
|
layer='prompts', start__gte=position, end__gt=position
|
|
).exclude(start__gte=segment).order_by('start').first()
|
|
if a:
|
|
segment_character = a.value
|
|
print('use character from annotation', a, segment_character, 'for %s-%s' % (position, segment))
|
|
else:
|
|
segment_character = None
|
|
print('use no character for %s-%s' % (position, segment))
|
|
else:
|
|
segment_character = character
|
|
if segment_character:
|
|
segment_first_frame = replace_character(
|
|
item, segment_character, position+offset, seed=seed, age=age
|
|
)
|
|
segment_first_frame_url = public_document_url(segment_first_frame)
|
|
else:
|
|
segment_first_frame_url = public_url(output.replace(".mp4", "_first.jpg"))
|
|
status = i2v_bytedance(
|
|
segment_first_frame_url, prompt, segment_duration, output_ai, seed=seed
|
|
)
|
|
seeed = status["seed"]
|
|
trimmed = "%s/%06d_ai_trimmed.mp4" % (prefix, n)
|
|
frames = int(segment_duration * 24)
|
|
trim_video(output_ai, trimmed, frames, stype == "c")
|
|
processed.append(trimmed)
|
|
if segment_character:
|
|
first_frames.append(segment_first_frame)
|
|
n += 1
|
|
position = segment
|
|
joined_output = "%s/joined.mp4" % (prefix)
|
|
join_segments(processed, joined_output)
|
|
ai = add_ai_variant(item, joined_output, "ai:0:reshoot-firstframe")
|
|
prompt = "\n\n".join(prompts)
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
ai.data["firstframe"] = " ".join([ff.get_id() for ff in first_frames])
|
|
ai.data["model"] = status["model"]
|
|
ai.data["seed"] = seed
|
|
ai.save()
|
|
for first_frame in first_frames:
|
|
first_frame.add(ai)
|
|
if not keep:
|
|
shutil.rmtree(prefix)
|
|
return ai
|
|
|
|
|
|
def describe_image(url):
|
|
system_prompt = ""
|
|
system_prompt = "You are an image analyst describing different aspects of an image. You are focused on the form, composition, and task shown in the image."
|
|
prompt = "Please analyze this image according to the specified structure."
|
|
data = {
|
|
"input": [
|
|
{"role": "system", "content": system_prompt},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "input_image", "image_url": url},
|
|
{"type": "input_text", "text": prompt},
|
|
],
|
|
},
|
|
],
|
|
}
|
|
response = bytedance_response(data)
|
|
return response["output"][-1]["content"][0]["text"]
|
|
|
|
|
|
def transform_remake_video(item_id, image_prompt, video_prompt):
|
|
item = Item.objects.get(public_id=item_id)
|
|
segments = get_item_segments(item)
|
|
print(segments)
|
|
prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
|
|
position = n = 0
|
|
processed = []
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
|
|
for segment in segments:
|
|
if isinstance(segment, list):
|
|
stype, segment = segment
|
|
else:
|
|
stype = "n"
|
|
duration = segment - position
|
|
if stype == "c":
|
|
first_frame_path = "%s/%06d.mp4.last_frame.png" % (prefix, n - 1)
|
|
first_frame = public_url(first_frame_path)
|
|
else:
|
|
first_frame = "%s%s/source%s.png?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
position,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
first_frame_path = "%s/%06d.first_frame.png" % (prefix, n)
|
|
if not os.path.exists(first_frame_path):
|
|
prepare_image(first_frame, image_prompt, first_frame_path)
|
|
first_frame = public_url(first_frame_path)
|
|
last_frame_position = segment - 2 / 24
|
|
last_frame = "%s%s/source%0.3f.png?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
last_frame_position,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
last_frame_path = "%s/%06d.last_frame.png" % (prefix, n)
|
|
if not os.path.exists(last_frame_path):
|
|
prepare_image(last_frame, image_prompt, last_frame_path)
|
|
last_frame = public_url(last_frame_path)
|
|
|
|
output = "%s/%06d.mp4" % (prefix, n)
|
|
if not os.path.exists(output):
|
|
first_last(first_frame, last_frame, video_prompt, duration, output)
|
|
trimmed = "%s/%06d_trimmed.mp4" % (prefix, n)
|
|
frames = int(duration * 24)
|
|
if not os.path.exists(trimmed):
|
|
trim_video(output, trimmed, frames, stype == "c")
|
|
processed.append(trimmed)
|
|
position = segment
|
|
n += 1
|
|
|
|
joined_output = "%s/joined.mp4" % (prefix,)
|
|
join_segments(processed, joined_output)
|
|
return joined_output
|
|
|
|
|
|
def restyle_video(item_id, prompt):
|
|
item = Item.objects.get(public_id=item_id)
|
|
video_url = public_video_url(item)
|
|
model = "decart/lucy-restyle"
|
|
handler = fal_client.submit(
|
|
model,
|
|
arguments={
|
|
"prompt": prompt,
|
|
"video_url": video_url,
|
|
"resolution": "720p",
|
|
"enhance_prompt": True,
|
|
},
|
|
)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
status = fal_client.status(model, request_id, with_logs=True)
|
|
while isinstance(status, fal_client.InProgress):
|
|
time.sleep(10)
|
|
status = fal_client.status(model, request_id, with_logs=True)
|
|
result = fal_client.result(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
prompt_hash = hashlib.sha1((prompt).encode()).hexdigest()
|
|
output_path = "/srv/pandora/static/power/cache/%s_%s.mp4" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
)
|
|
ox.net.save_url(output_url, output_path, overwrite=True)
|
|
return output_path
|
|
|
|
|
|
def fal_wait_for(model, request_id):
|
|
status = fal_client.status(model, request_id, with_logs=True)
|
|
while isinstance(status, fal_client.InProgress):
|
|
time.sleep(10)
|
|
status = fal_client.status(model, request_id, with_logs=True)
|
|
result = fal_client.result(model, request_id)
|
|
return result
|
|
|
|
|
|
def motion_control_preprocess_image(item_id, image_prompt, video_prompt):
|
|
item = Item.objects.get(public_id=item_id)
|
|
video_url = public_video_url(item)
|
|
model = "fal-ai/kling-video/v2.6/pro/motion-control"
|
|
prompt_hash = hashlib.sha1((image_prompt + video_prompt).encode()).hexdigest()
|
|
output = "/srv/pandora/static/power/cache/%s_%s.mp4" % (item.public_id, prompt_hash)
|
|
first_frame = "%s%s/source%s.png?token=%s" % (
|
|
settings.PUBLIC_URL,
|
|
item.public_id,
|
|
0,
|
|
settings.PUBLIC_TOKEN,
|
|
)
|
|
first_frame_path = "/srv/pandora/static/power/cache/%s_%s/%06d.first_frame.png" % (
|
|
item.public_id,
|
|
prompt_hash,
|
|
0,
|
|
)
|
|
if not os.path.exists(first_frame_path):
|
|
os.makedirs(os.path.dirname(first_frame_path), exist_ok=True)
|
|
prepare_image(first_frame, image_prompt, first_frame_path)
|
|
image_url = public_url(first_frame_path)
|
|
data = {
|
|
"prompt": video_prompt,
|
|
"image_url": image_url,
|
|
"video_url": video_url,
|
|
"keep_original_sound": False,
|
|
"character_orientation": "video",
|
|
}
|
|
print(data)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output, overwrite=True)
|
|
return output
|
|
|
|
|
|
def luma_wait_for(id):
|
|
url = "https://api.lumalabs.ai/dream-machine/v1/generations/%s" % id
|
|
headers = {
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
"authorization": "Bearer " + settings.LUMA_TOKEN,
|
|
}
|
|
status = requests.get(url, headers=headers).json()
|
|
while status["state"] in ("queued", "dreaming"):
|
|
time.sleep(10)
|
|
status = requests.get(url, headers=headers).json()
|
|
return status
|
|
|
|
|
|
def luma_modify_segment(video_url, prompt, first_frame=None, mode="flex_2"):
|
|
# also got that at fal-ai/luma-dream-machine/ray-2/modify
|
|
url = "https://api.lumalabs.ai/dream-machine/v1/generations/video/modify"
|
|
payload = {
|
|
"generation_type": "modify_video",
|
|
"model": "ray-2",
|
|
"mode": mode,
|
|
"prompt": prompt,
|
|
"media": {"url": video_url},
|
|
}
|
|
if first_frame:
|
|
payload["first_frame"] = {"url": first_frame}
|
|
headers = {
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
"authorization": "Bearer " + settings.LUMA_TOKEN,
|
|
}
|
|
response = requests.post(url, json=payload, headers=headers).json()
|
|
print(response)
|
|
status = luma_wait_for(response["id"])
|
|
return status["assets"]["video"]
|
|
|
|
|
|
def fragment_video(filename, segmentdir, segments):
|
|
filename = str(filename)
|
|
input_info = ox.avinfo(filename)
|
|
|
|
segments_ = []
|
|
for segment in segments:
|
|
if isinstance(segment, list):
|
|
stype, segment = segment
|
|
else:
|
|
stype = "n"
|
|
segments_.append(segment)
|
|
segments = segments_
|
|
|
|
position = 0
|
|
segment = 0
|
|
|
|
cap = cv2.VideoCapture(filename)
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*"avc1")
|
|
frame_count = 0
|
|
next_cut = int(segments.pop(0) * fps)
|
|
|
|
last = None
|
|
os.makedirs(segmentdir, exist_ok=True)
|
|
|
|
while cap.isOpened():
|
|
if frame_count == 0:
|
|
output_path = segmentdir + "/%06d.mp4" % segment
|
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
|
|
elif next_cut and frame_count >= next_cut and segments:
|
|
print(frame_count, output_path)
|
|
cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), frame)
|
|
out.release()
|
|
segment += 1
|
|
output_path = segmentdir + "/%06d.mp4" % segment
|
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
|
|
if segments:
|
|
next_cut = int(segments.pop(0) * fps)
|
|
else:
|
|
next_cut = None
|
|
last = None
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
out.write(frame)
|
|
if last is None:
|
|
cv2.imwrite(output_path.replace(".mp4", "_first.jpg"), frame)
|
|
last = frame
|
|
frame_count += 1
|
|
|
|
out.release()
|
|
cap.release()
|
|
|
|
if last is None:
|
|
os.unlink(output_path)
|
|
else:
|
|
print(frame_count, output_path)
|
|
cv2.imwrite(output_path.replace(".mp4", "_last.jpg"), last)
|
|
|
|
def loop_video(filename, out, loops=2):
|
|
filename = str(filename)
|
|
input_info = ox.avinfo(filename)
|
|
|
|
position = 0
|
|
|
|
cap = cv2.VideoCapture(filename)
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*"avc1")
|
|
|
|
os.makedirs(os.path.dirname(out), exist_ok=True)
|
|
|
|
out = cv2.VideoWriter(out, fourcc, fps, (width, height))
|
|
while loops:
|
|
loops -= 1
|
|
cap = cv2.VideoCapture(filename)
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
out.write(frame)
|
|
cap.release()
|
|
out.release()
|
|
|
|
|
|
def flux_edit_image(image, prompt):
|
|
if isinstance(image, str):
|
|
image = [image]
|
|
data = {
|
|
"prompt": prompt,
|
|
"safety_tolerance": "5",
|
|
"enable_safety_checker": False,
|
|
"output_format": "jpeg",
|
|
"image_urls": image,
|
|
}
|
|
print(data)
|
|
result = fal_client.subscribe("fal-ai/flux-2-pro/edit", arguments=data)
|
|
print(result)
|
|
return result["images"][0]["url"]
|
|
|
|
|
|
def in_the_style_of_fal(image, style):
|
|
prompt = "apply style from @image 2 to @image 1 keep the position of the person in @image 1 but take light, colors, clothing from @image 2"
|
|
return flux_edit_image([image, style], prompt)
|
|
|
|
|
|
def in_the_style_of_byte(image, style):
|
|
prompt = "apply style from image 2 to image 1 keep the position of the person in image 1 but take light, colors, clothing from image 2"
|
|
image_model_name = "seedream-4-5-251128"
|
|
ark_client = Ark(
|
|
base_url="https://ark.ap-southeast.bytepluses.com/api/v3",
|
|
api_key=settings.BYTEPLUSE_TOKEN,
|
|
)
|
|
create_result = ark_client.images.generate(
|
|
model=image_model_name,
|
|
image=[image, style],
|
|
prompt=prompt,
|
|
sequential_image_generation="disabled",
|
|
response_format="url",
|
|
size="2560x1440",
|
|
stream=False,
|
|
watermark=False,
|
|
)
|
|
print(create_result)
|
|
return create_result.data[0].url
|
|
|
|
|
|
def luma_modify_item(item, prompt="", image_prompt=None, first_frame=None, keep=False):
|
|
mode = "flex_2"
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
source = item.files.all()[0].data.path
|
|
info = ox.avinfo(source)
|
|
duration = info["duration"]
|
|
max_duration = 10
|
|
if duration < max_duration:
|
|
segments = [duration]
|
|
else:
|
|
segments = get_item_segments(item, max_duration=max_duration)
|
|
print(segments)
|
|
prompt_hash = hashlib.sha1((prompt + (image_prompt or "")).encode()).hexdigest()
|
|
processed = []
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
|
|
video_segments = fragment_video(source, prefix, segments)
|
|
n = 0
|
|
if isinstance(first_frame, Document):
|
|
first_frame_url = public_document_url(first_frame)
|
|
else:
|
|
first_frame_url = first_frame
|
|
for segment in segments:
|
|
if isinstance(segment, list):
|
|
stype, segment = segment
|
|
else:
|
|
stype = "n"
|
|
output = "%s/%06d.mp4" % (prefix, n)
|
|
output_ai = "%s/%06d_ai.mp4" % (prefix, n)
|
|
if os.path.exists(output):
|
|
video_url = luma_modify_segment(
|
|
public_url(output), prompt, first_frame=first_frame_url, mode=mode
|
|
)
|
|
ox.net.save_url(video_url, output_ai, overwrite=True)
|
|
processed.append(output_ai)
|
|
n += 1
|
|
joined_output = "%s/joined.mp4" % (prefix,)
|
|
join_segments(processed, joined_output)
|
|
ai = add_ai_variant(item, joined_output, "ai:replace:p1:luma")
|
|
ai.data["prompt"] = ox.escape_html(prompt)
|
|
if first_frame:
|
|
ai.data["firstframe"] = first_frame_url.split("?")[0]
|
|
ai.data["model"] = "ray-2:%s" % mode
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(os.path.dirname(joined_output))
|
|
if isinstance(first_frame, Document):
|
|
first_frame.add(ai)
|
|
return ai
|
|
|
|
|
|
def add_ai_variant(item, video_path, type):
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
|
|
ai = Item()
|
|
ai.user = item.user
|
|
ai.data["type"] = [type]
|
|
ai.data["title"] = item.data["title"]
|
|
ai.data["chapter"] = item.data.get("chapter", "")
|
|
ai.save()
|
|
file = File()
|
|
file.oshash = ox.oshash(video_path)
|
|
file.item = ai
|
|
file.path = "%s.mp4" % type
|
|
file.info = ox.avinfo(video_path)
|
|
file.info['extension'] = "mp4"
|
|
del file.info["path"]
|
|
file.parse_info()
|
|
file.data.name = file.get_path("data." + video_path.split(".")[-1])
|
|
os.makedirs(os.path.dirname(file.data.path), exist_ok=True)
|
|
shutil.copy(video_path, file.data.path)
|
|
file.available = True
|
|
file.selected = True
|
|
file.queued = True
|
|
file.wanted = False
|
|
file.save()
|
|
file.extract_stream()
|
|
return ai
|
|
|
|
|
|
def add_ai_image(item, position, url, extension=None):
|
|
if extension is None:
|
|
extension = url.split(".")[-1].split("?")[0]
|
|
if extension == "jpeg":
|
|
extension = "jpg"
|
|
file = Document(user=item.user)
|
|
file.rightslevel = 2
|
|
file.data["title"] = "%s at %s" % (item.data["title"], position)
|
|
file.data["position"] = position
|
|
file.extension = extension
|
|
file.width = -1
|
|
file.pages = -1
|
|
file.uploading = True
|
|
file.save()
|
|
file.uploading = True
|
|
name = "data.%s" % file.extension
|
|
file.file.name = file.path(name)
|
|
ox.net.save_url(url, file.file.path, overwrite=True)
|
|
file.get_info()
|
|
file.get_ratio()
|
|
file.oshash = ox.oshash(file.file.path)
|
|
file.save()
|
|
file.update_sort()
|
|
file.update_find()
|
|
if isinstance(item, Item):
|
|
file.add(item)
|
|
return file
|
|
|
|
def add_tag(item, tag):
|
|
if 'tags' not in item.data:
|
|
item.data['tags'] = []
|
|
item.data['tags'].append(tag)
|
|
item.save()
|
|
|
|
def extract_firstframe(character='P1'):
|
|
for item in Item.objects.filter(data__type__icontains="source"):
|
|
if 'ai-failed' in item.data.get('tags', []):
|
|
continue
|
|
if not item.documents.all().exists():
|
|
print(item)
|
|
try:
|
|
first_frame = replace_character(item, character, 0)
|
|
except:
|
|
item.refresh_from_db()
|
|
add_tag(item, 'ai-failed')
|
|
|
|
def process_reshoot_firstframe(character='P1', age=None, l=None):
|
|
position = 0
|
|
if l is None:
|
|
l = itemlist.models.List.objects.get(name='Reshoot-Firstframe')
|
|
for item in l.items.all():
|
|
if 'ai-failed' in item.data.get('tags', []):
|
|
print('>> skip', item)
|
|
continue
|
|
if item.sort.duration > 12:
|
|
print('>> skip', item, "only up to 12 second for single shot")
|
|
pass
|
|
#reshoot_item_segments(item, character)
|
|
else:
|
|
cid = get_character_document(character, age=age).get_id()
|
|
qs = item.documents.filter(
|
|
data__character=cid, data__position=position
|
|
)
|
|
if age:
|
|
qs = qs.filter(data__title__contains='(Age %d)' % age)
|
|
first_frame = qs.order_by('-created').first()
|
|
if not first_frame:
|
|
try:
|
|
first_frame = replace_character(item, character, position, age=age)
|
|
except:
|
|
item.refresh_from_db()
|
|
add_tag(item, 'ai-failed')
|
|
print('>> failed', item)
|
|
continue
|
|
if not first_frame:
|
|
continue
|
|
if first_frame.items.filter(data__type__icontains='ai:').exists():
|
|
continue
|
|
print(item, first_frame)
|
|
try:
|
|
reshoot_item(item, first_frame=first_frame)
|
|
except:
|
|
item.refresh_from_db()
|
|
add_tag(item, 'ai-failed')
|
|
print('>> failed', item)
|
|
|
|
def process_motion_firstframe(character="P1", keep=False):
|
|
l = itemlist.models.List.objects.get(name='Motion-Firstframe')
|
|
for item in l.items.all():
|
|
ai = Item.objects.filter(data__type__icontains='ai').filter(data__title=item.data['title'])
|
|
if ai.exists() or 'ai-failed' in item.data.get('tags', []):
|
|
print('>> skip', item)
|
|
continue
|
|
print(item)
|
|
try:
|
|
replace_character_motion_control(item, character, keep=keep)
|
|
except:
|
|
item.refresh_from_db()
|
|
add_tag(item, 'ai-failed')
|
|
print('>> failed', item)
|
|
|
|
|
|
|
|
def process_list(list, process, character='P1', extra=None):
|
|
position = 0
|
|
if extra is None and list.description:
|
|
extra = list.description
|
|
for item in list.items.all():
|
|
if 'ai-failed' in item.data.get('tags', []):
|
|
print('>> skip', item)
|
|
continue
|
|
if item.sort.duration > 30:
|
|
pass
|
|
#reshoot_item_segments(item, character)
|
|
else:
|
|
cid = get_character_document(character).get_id()
|
|
first_frame = item.documents.filter(
|
|
data__character=cid, data__position=position
|
|
).order_by('-created').first()
|
|
if not first_frame:
|
|
try:
|
|
first_frame = replace_character(item, character, position, extra=extra)
|
|
except:
|
|
item.refresh_from_db()
|
|
add_tag(item, 'ai-failed')
|
|
print('>> failed', item)
|
|
if first_frame.items.filter(data__type__icontains='ai:').exists():
|
|
continue
|
|
print(item, first_frame)
|
|
try:
|
|
process(item, first_frame=first_frame)
|
|
except:
|
|
item.refresh_from_db()
|
|
add_tag(item, 'ai-failed')
|
|
print('>> failed', item)
|
|
|
|
def reshoot_mustache(character="P1"):
|
|
l = itemlist.models.List.objects.get(name='mustache')
|
|
extra = "The person is wearing a fake mustache in the style of the mustache from image 1"
|
|
return process_list(l, reshoot_item, character, extra=extra)
|
|
|
|
|
|
|
|
def faceswap_item(item, character, keep=False):
|
|
if isinstance(character, Document):
|
|
chracter_url = public_document_url(character)
|
|
else:
|
|
chracter_url = character
|
|
video_url = public_video_url(item)
|
|
model = "half-moon-ai/ai-face-swap/faceswapvideo"
|
|
data = {
|
|
"source_face_url": chracter_url,
|
|
"target_video_url": video_url,
|
|
}
|
|
prompt_hash = hashlib.sha1(("faceswap:"+chracter_url).encode()).hexdigest()
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
|
|
output_ai = "%s/ai.mp4" % (prefix)
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output_ai, overwrite=True)
|
|
ai = add_ai_variant(item, output_ai, "ai:0:faceswap")
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(prefix)
|
|
return ai
|
|
|
|
|
|
def faceswap_item_segments(item, keep=False):
|
|
if isinstance(item, str):
|
|
item = Item.objects.get(public_id=item)
|
|
max_duration = 30
|
|
source = item.files.all()[0].data.path
|
|
info = ox.avinfo(source)
|
|
duration = info["duration"]
|
|
segments = get_item_segments(item, max_duration=max_duration)
|
|
prompt_hash = hashlib.sha1("faceswap".encode()).hexdigest()
|
|
processed = []
|
|
prefix = "/srv/pandora/static/power/cache/%s_%s" % (item.public_id, prompt_hash)
|
|
video_segments = fragment_video(source, prefix, segments)
|
|
n = 0
|
|
position = 0
|
|
model = "half-moon-ai/ai-face-swap/faceswapvideo"
|
|
for segment in segments:
|
|
if isinstance(segment, list):
|
|
stype, segment = segment
|
|
else:
|
|
stype = "n"
|
|
output = "%s/%06d.mp4" % (prefix, n)
|
|
output_ai = "%s/%06d_ai.mp4" % (prefix, n)
|
|
character = item.annotations.filter(
|
|
layer='prompts', start__gte=position, end__gt=position
|
|
).order_by('start').first()
|
|
if character:
|
|
character = character.value
|
|
character = get_character_document(character)
|
|
chracter_url = public_document_url(character)
|
|
else:
|
|
character = output.replace('.mp4', '_first.jpg')
|
|
chracter_url = public_url(character)
|
|
segment_duration = segment - position
|
|
if os.path.exists(output):
|
|
segment_video_url = public_url(output)
|
|
data = {
|
|
"source_face_url": chracter_url,
|
|
"target_video_url": segment_video_url,
|
|
}
|
|
handler = fal_client.submit(model, arguments=data)
|
|
request_id = handler.request_id
|
|
print(request_id)
|
|
result = fal_wait_for(model, request_id)
|
|
print(result)
|
|
output_url = result["video"]["url"]
|
|
ox.net.save_url(output_url, output_ai, overwrite=True)
|
|
processed.append(output_ai)
|
|
n += 1
|
|
position = segment
|
|
joined_output = "%s/joined.mp4" % (prefix)
|
|
join_segments(processed, joined_output)
|
|
ai = add_ai_variant(item, joined_output, "ai:0:faceswap")
|
|
ai.data["model"] = model
|
|
ai.save()
|
|
if not keep:
|
|
shutil.rmtree(prefix)
|
|
return ai
|
|
|