pandora_render/edit.py
2024-05-22 10:46:32 +02:00

316 lines
11 KiB
Python
Executable file

#!/usr/bin/env python3
from argparse import ArgumentParser
from glob import glob
import getpass
import hashlib
import json
import math
import os
import subprocess
import sys
import urllib.parse
import urllib.request
import ox
import ox.web.auth
# Module-level settings. The ``__main__`` section rebinds base_url, prefix,
# pandora_client_config and use_local from the command line / client config.
base_url = None
# Fallback directory under which volumes are looked up by name.
prefix = '/mnt'
# Directory where downloaded or symlinked clip files are cached.
render = './cache'
# Parsed pandora_client config; its optional 'volumes' mapping is consulted
# by get_volume_prefix().
pandora_client_config = {}
use_local = False

# files.json caches per-oshash media info between runs (filled by get_info()
# and written back at the end of the ``__main__`` section).
if os.path.exists('files.json'):
    # Use a context manager so the file handle is closed deterministically.
    with open('files.json') as fd:
        files = json.load(fd)
else:
    files = {}
def get_volume_prefix(name):
    """Return the local directory for the volume called *name*.

    A non-empty entry in the 'volumes' mapping of ``pandora_client_config``
    wins; otherwise the volume name is joined onto the module-level
    ``prefix``.
    """
    volumes = pandora_client_config.get('volumes', {})
    configured = volumes.get(name)
    return configured if configured else os.path.join(prefix, name)
def get_info(api, oshash, item, part):
    """Return cached media info for *oshash*, querying the API on a miss.

    The returned dict always has 'resolution' and, when the media has at
    least one instance, 'path' (volume prefix joined with the path of the
    first instance). Results are memoised in the module-level ``files``
    dict.

    *item* and *part* are only used for the diagnostic output printed
    before raising.

    Raises:
        Exception: if ``use_local`` is set and the media has no instance.
    """
    if oshash in files:
        return files[oshash]

    response = api.findMedia({
        'query': {
            'conditions': [{'key': 'oshash', 'value': oshash}]
        },
        'keys': ['id', 'instances', 'resolution']
    })['data']
    media = response['items'][0]
    instances = media['instances']

    # Without an instance there is no local file to point at, which is
    # only fatal when local files are required.
    if not instances and use_local:
        print(response, item, part)
        raise Exception('item without instance')

    entry = {
        'resolution': media['resolution']
    }
    files[oshash] = entry
    if instances:
        first = instances[0]
        volume_prefix = get_volume_prefix(first['volume'])
        entry['path'] = os.path.join(volume_prefix, first['path'])
    return files[oshash]
def normalize(name):
    """Return *name* with every ':' and '/' replaced by '_'."""
    table = str.maketrans({':': '_', '/': '_'})
    return name.translate(table)
def sort_clips(edit, sort):
    """Return the clips of *edit* as a list ordered according to *sort*.

    *sort* is a key name optionally prefixed with '+' (ascending, the
    default) or '-' (descending). Every clip in ``edit['clips']`` gets its
    original position stored under 'index' as a side effect.

    Three strategies are used:

    * 'manual' keeps the order of ``edit['clips']`` (reversed for '-').
    * Keys available on the clips themselves are sorted locally with
      ``ox.sort_string``, using title, in and out as tie breakers.
    * Any other key is delegated to ``api.sortClips``; this relies on the
      module-level ``api`` object created in the ``__main__`` section.

    A list is always returned (never a ``reversed`` iterator) so callers
    can index into the result or insert additional clips.
    """
    clips = edit['clips']
    for idx, clip in enumerate(clips):
        clip['index'] = idx

    reverse = sort.startswith('-')
    if sort.startswith(('+', '-')):
        sort = sort[1:]

    # Placeholder for clips lacking the sort key; apparently chosen so that
    # they end up towards the end in either direction.
    last = '' if reverse else 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'

    if sort == 'manual':
        # Unreversed, the very same list object is returned, as before.
        return list(reversed(clips)) if reverse else clips

    if sort in [
        'id', 'index', 'in', 'out', 'duration',
        'title', 'director', 'year', 'videoRatio',
    ]:
        s = sorted(clips, key=lambda c: (ox.sort_string(str(c.get(sort, last))), c['title'], c['in'], c['out']))
        if reverse:
            # Reverse the ascending result instead of sorting with
            # reverse=True so that ties are reversed as well.
            s.reverse()
        return s

    ids = api.sortClips({
        'edit': edit['id'],
        'sort': [{'key': sort, 'operator': '-' if reverse else '+'}]
    })['data']['clips']
    # Map each id to its first position once instead of calling
    # ids.index() for every clip; ids missing from the response sort first.
    position = {}
    for i, clip_id in enumerate(ids):
        position.setdefault(clip_id, i)
    return sorted(clips, key=lambda c: position.get(c['id'], -1))
def get_pandora_media_path(oshash):
    """Return the path of the media file stored for *oshash* on a pandora host.

    The media directory is /srv/pandora/data/media/ followed by the oshash
    split into 2/2/2/rest characters. A ``data.*`` file in that directory
    is preferred; otherwise the last ``*.mp4`` according to
    ``ox.sorted_strings`` is used (presumably the highest resolution
    stream -- verify against the naming of the stream files).

    Raises:
        FileNotFoundError: if the directory contains neither kind of file.
    """
    # Keep the directory in its own variable: the glob result must not
    # overwrite it, since the fallback pattern needs the directory too.
    media_dir = '/srv/pandora/data/media/' + '/'.join(
        [oshash[:2], oshash[2:4], oshash[4:6], oshash[6:]]
    )
    originals = glob('%s/data.*' % media_dir)
    if originals:
        return originals[0]
    streams = glob('%s/*.mp4' % media_dir)
    if not streams:
        raise FileNotFoundError('no media found for %s in %s' % (oshash, media_dir))
    return ox.sorted_strings(streams)[-1]
def cache_clips(api, videos, use_source=False, use_pandora=False):
    """Make sure every non-title clip in *videos* has a file in the cache.

    For each clip without a truthy 'title' the existing 'path' (if any) is
    moved to 'src' and 'path' is set to ``<render>/<oshash>.mp4``. If that
    file does not exist yet it is created:

    * *use_pandora*: as a symlink to the file returned by
      ``get_pandora_media_path``;
    * *use_source*: by downloading ``.../download/source/<part>``, with the
      part number taken from the stream file name in the clip's 'url';
    * otherwise: by downloading the clip's 'url' unchanged.

    Downloads go through ``api.save_url``. The clips are modified in place;
    nothing is returned.
    """
    for clip in videos:
        # Title cards are rendered locally and already carry their path.
        if clip.get("title"):
            continue
        out = '%s/%s.mp4' % (render, clip['oshash'])
        if 'path' in clip:
            clip['src'] = clip['path']
        clip['path'] = out
        if os.path.exists(out):
            continue

        os.makedirs(render, exist_ok=True)
        if os.path.islink(out):
            # exists() is False for a dangling symlink; remove it so that
            # it can be recreated instead of failing with FileExistsError.
            os.unlink(out)

        if use_pandora:
            os.symlink(get_pandora_media_path(clip['oshash']), out)
            continue

        url = clip['url']
        if use_source:
            # Stream names look like '<resolution>p<part>.mp4'.
            segments = url.split('/')
            name = segments[-1].split('.')[0]
            _resolution, part = name.split('p')
            part = int(part) if part and part.isdigit() else 1
            url = '/'.join(segments[:-1] + ['download', 'source', str(part)])
        print(url, out)
        api.save_url(url, out)
def make_title(title):
    """Render *title* as a 5 second title card and return its clip dict.

    The card is white-on-black text centred on an 852x480 frame, stored as
    ``cache/title_<sha1 of title>.mp4`` (with a PNG of the same name next
    to it). An existing mp4 is reused, in which case neither Pillow nor
    ffmpeg are needed.

    Returns:
        dict with 'title', 'in', 'out', 'duration', 'volume' and 'path'.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with an error.
    """
    title_mp4 = "cache/title_%s.mp4" % hashlib.sha1(title.encode()).hexdigest()
    title_png = title_mp4.replace('.mp4', '.png')
    if not os.path.exists(title_mp4):
        # Imported lazily so cached titles work without Pillow installed.
        from PIL import Image, ImageFont, ImageDraw

        os.makedirs(os.path.dirname(title_mp4), exist_ok=True)
        width = 852
        height = 480
        image = Image.new("RGB", (width, height), "black")
        font = ImageFont.truetype("/usr/share/fonts/truetype/roboto/unhinted/RobotoTTF/Roboto-Regular.ttf", size=30)
        draw = ImageDraw.Draw(image)
        # The right/bottom edge of the bounding box is used as text size.
        _, _, font_width, font_height = font.getbbox(title)
        new_width = (width - font_width) / 2
        new_height = (height - font_height) / 2
        draw.text((new_width, new_height), title, font=font)
        image.save(title_png)
        cmd = [
            'ffmpeg',
            # Without -loop 1 a single image yields a single frame, so the
            # video would not last the 5 seconds declared below.
            '-loop', '1',
            '-r', '25',
            '-i', title_png, '-t', '5',
            '-pix_fmt', 'yuv420p',
            title_mp4
        ]
        try:
            subprocess.check_call(cmd)
        except (OSError, subprocess.CalledProcessError):
            # Do not leave a partial file behind that a later run would
            # mistake for a finished title card.
            if os.path.exists(title_mp4):
                os.unlink(title_mp4)
            raise
    clip = {
        "title": title,
        "in": 0,
        "out": 5,
        "duration": 5,
        "volume": 1.0,
        "path": title_mp4
    }
    return clip
# Command line entry point: sign in to the pan.do/ra site of the given edit
# URL, turn the edit's clips into a flat list of video segments (one per
# stream part), optionally fetch the media into the cache, and write
# <name>.json, <name>.srt (if there are subtitles) and files.json.
if __name__ == '__main__':
    usage = "usage: %(prog)s [options] edit-url"
    parser = ArgumentParser(usage=usage)
    parser.add_argument('-p', '--prefix', dest='prefix', type=str,
                        help="prefix to use instead of pandora_client config", default='')
    parser.add_argument('-s', '--source', dest='source', type=str,
                        help="site (download streams), source (download source files), "
                             "local (use local volume paths) or pandora (symlink from "
                             "the pandora media folder), default site",
                        default='site')
    parser.add_argument('-r', '--resolution', dest='stream_resolution', type=int,
                        help="resolution of streams to download i.e. 480, 240, 96 default 480", default=480)
    parser.add_argument('-t', '--title', dest='title', type=str,
                        help="title", default="")
    parser.add_argument('-c', '--config', dest='config',
                        help='config.json containing config',
                        default='~/.ox/client.json')
    parser.add_argument('-o', '--output', dest='output',
                        help='json output',
                        default='')
    parser.add_argument('url', metavar='url', type=str,
                        help='edit url')
    opts = parser.parse_args()

    def snap_to_frame(seconds):
        # Round up to the next multiple of 1/25 s (same arithmetic as the
        # expression that was previously repeated inline).
        return math.ceil(seconds / (1/25)) * 1/25

    edit_url = opts.url
    # These rebind the module-level settings read by the helper functions.
    use_local = opts.source == 'local'
    use_source = opts.source == 'source'
    use_pandora = opts.source == 'pandora'
    prefix = opts.prefix

    # Expected shape: scheme://site/edits/<edit id>[/<view>[/<sort>]]
    parts = edit_url.split('/')
    if len(parts) < 5 or not parts[2] or not parts[4]:
        parser.error('invalid edit url: %s' % edit_url)
    site = parts[2]
    base_url = '/'.join(parts[:3])
    # In the URL '_' stands for a space and an encoded tab for a literal '_'.
    edit_id = urllib.parse.unquote(parts[4]).replace('_', ' ').replace('\t', '_')
    sort_by = parts[6] if len(parts) >= 7 and parts[6] else 'year'
    stream_resolution = opts.stream_resolution

    # Prefer credentials from the pandora_client config if it belongs to
    # this site.
    credentials = None
    config = os.path.expanduser(opts.config)
    if config and os.path.exists(config):
        with open(config) as fd:
            data = json.load(fd)
        if data['url'].startswith(base_url):
            pandora_client_config = data
            credentials = {
                'username': data['username'],
                'password': data['password']
            }

    update = False
    if not credentials:
        try:
            credentials = ox.web.auth.get(site)
        except Exception:
            # Nothing stored for this site: ask interactively. Catching
            # Exception (not everything) keeps Ctrl-C working.
            credentials = {}
            print('Please provide your username and password for %s:' % site)
            credentials['username'] = input('Username: ')
            credentials['password'] = getpass.getpass('Password: ')
            update = True

    api = ox.API(base_url + '/api/')
    r = api.signin(**credentials)
    if 'errors' in r.get('data', {}):
        for kv in r['data']['errors'].items():
            print('%s: %s' % kv)
        sys.exit(1)
    if update:
        # Only store credentials after they were accepted.
        ox.web.auth.update(site, credentials)

    print('Edit:', edit_id)
    print('Sort:', sort_by)
    r = api.getEdit(id=edit_id)
    if 'data' not in r:
        print(r)
        sys.exit(1)
    if r.get('status', {}).get('code') == 404:
        print(r.get('status', {}).get('text'))
        sys.exit(1)
    edit = r['data']

    videos = []
    subtitles = []
    position = 0
    # Copy into a list so that inserting the title card always works.
    clips = list(sort_clips(edit, sort_by))
    title_clip = None
    if opts.title:
        title_clip = make_title(opts.title)
        clips.insert(0, title_clip)

    for clip in clips:
        # Identify the title card by identity: sort_clips() reads
        # c['title'] on regular edit clips, so a truthy 'title' alone does
        # not distinguish the card from a regular clip.
        if clip is title_clip:
            videos.append(clip)
            position += clip['duration']
            position = snap_to_frame(position)
            continue

        clip_out = position + clip['duration']
        clip_subtitles = []
        for sub in clip['layers'].get('subtitles', []):
            value = sub['value'].replace('<br/>', '\n').replace('<br>', '\n').replace('\n\n', '\n')
            # Timeline-relative subtitle, clamped to the clip's extent.
            subtitles.append({
                'in': max(position, position + sub['in'] - clip['in']),
                'out': min(position + sub['out'] - clip['in'], clip_out),
                'value': value,
            })
            # Clip-relative copy stored with each video segment.
            clip_subtitles.append({
                'in': max(0, sub['in'] - clip['in']),
                'out': min(sub['out'] - clip['in'], clip['out'] - clip['in']),
                'value': value,
            })

        # Map the clip's in/out onto the item's parts; part_pos is the
        # offset of the current part within the item.
        part_pos = 0
        for i, duration in enumerate(clip['durations']):
            stream_out = stream_in = None
            if part_pos + duration < clip['in']:
                # Part ends before the clip starts.
                part_pos += duration
            elif part_pos <= clip['in']:
                # Clip starts inside this part.
                stream_in = clip['in'] - part_pos
                stream_out = min(clip['out'] - part_pos, duration)
            elif clip['out'] > part_pos:
                # Clip continues from a previous part.
                stream_in = 0
                stream_out = min(clip['out'] - part_pos, duration)
            if stream_in is not None and stream_out is not None:
                stream_in = snap_to_frame(stream_in)
                stream_out = snap_to_frame(stream_out)
                info = get_info(api, clip['streams'][i], clip['item'], i+1)
                clip_duration = stream_out - stream_in
                if clip_duration > 0:
                    videos.append({
                        'oshash': clip['streams'][i],
                        'url': '%s/%s/%sp%s.mp4' % (base_url, clip['item'], stream_resolution, i + 1),
                        'item': clip['item'],
                        'annotation': clip.get('annotation'),
                        'resolution': info['resolution'],
                        'in': stream_in,
                        'out': stream_out,
                        'volume': clip.get('volume', 1),
                        'subtitles': clip_subtitles,
                    })
                    if 'path' in info:
                        videos[-1]['path'] = os.path.join(prefix, info['path'])
                part_pos += duration
        position += clip['duration']
        position = snap_to_frame(position)

    if not use_local:
        cache_clips(api, videos, use_source, use_pandora)

    if opts.output:
        # Strip only a trailing '.json'; it is appended again below.
        name = opts.output
        if name.endswith('.json'):
            name = name[:-len('.json')]
    else:
        name = normalize(edit_id)
        if sort_by != 'year':
            name += '_' + sort_by
    if subtitles:
        with open('%s.srt' % name, 'wb') as fd:
            fd.write(ox.srt.encode(subtitles))
    with open('%s.json' % name, 'w') as fd:
        json.dump(videos, fd, indent=4, ensure_ascii=False)
    with open('files.json', 'w') as fd:
        json.dump(files, fd, indent=4, ensure_ascii=False)
    print('created: %s' % '%s.json' % name)