359 lines
11 KiB
Python
Executable file
359 lines
11 KiB
Python
Executable file
#!/usr/bin/python3
|
|
import os
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
from collections import defaultdict
|
|
import string
|
|
from glob import glob
|
|
|
|
from pi import random
|
|
from keywords import KEYWORDS
|
|
import ox
|
|
import ox.web.auth
|
|
|
|
|
|
# Root URL of the remote service; get_api() appends '/api/' to it.
base_url = 'https://cdosea.0x2620.org'

# Grid that get_clips() snaps clip in/out points to: one sixtieth of the unit
# used by those timestamps (presumably seconds, i.e. 60 frames per second).
FRAME_DURATION = 1 / 60

# Shared API client; stays None until get_api() creates it.
api = None
|
|
|
|
def get_api():
    """Create and sign in the shared module-level ``api`` client on first use.

    Does nothing when ``api`` is already set. Returns None; callers read the
    ``api`` global after calling this.
    """
    global api
    if api:
        return
    api = ox.API(base_url + '/api/')
    credentials = ox.web.auth.get('cdosea')
    api.signin(**credentials)
|
|
|
|
# Persistent cache mapping a media id to its local path (filled by get_path()).
# The file is read through a context manager so the handle is closed again.
if os.path.exists('PATHS.json'):
    with open('PATHS.json') as fd:
        PATHS = json.load(fd)
else:
    PATHS = {}
|
|
|
|
# Persistent cache mapping a keyword tag to its list of clips (filled by
# get_clips()). The file is read through a context manager so the handle is
# closed again.
if os.path.exists('CLIPS.json'):
    with open('CLIPS.json') as fd:
        CLIPS = json.load(fd)
else:
    CLIPS = {}
|
|
|
|
# Index of the music files: {letter: [{'path': ..., 'duration': ...}, ...]}.
# Loaded from MUSIC.json when present; otherwise built by probing
# music/<letter>/0.mp3 .. 9.mp3 with ox.avinfo() and then written to MUSIC.json.
if os.path.exists('MUSIC.json'):
    with open('MUSIC.json') as fd:
        MUSIC = json.load(fd)
else:
    MUSIC = defaultdict(list)
    for letter in os.listdir('music'):
        for d in range(10):
            path = os.path.join('music', letter, '%d.mp3' % d)
            MUSIC[letter].append({
                'path': path,
                'duration': ox.avinfo(path)['duration']
            })
    with open('MUSIC.json', 'w') as fd:
        json.dump(MUSIC, fd, indent=2, sort_keys=True)
    # Same plain-dict type as on later runs that load the JSON file, so a
    # lookup of a missing letter never silently creates an empty entry.
    MUSIC = dict(MUSIC)
|
|
|
|
# Index of the vocal files: {letter: [{'path': ..., 'duration': ...}, ...]}.
# Loaded from VOCALS.json when present; otherwise built from the sorted content
# of vocals/<letter>/ (durations via ox.avinfo()) and written to VOCALS.json.
if os.path.exists('VOCALS.json'):
    with open('VOCALS.json') as fd:
        VOCALS = json.load(fd)
else:
    VOCALS = defaultdict(list)
    for letter in os.listdir('vocals'):
        for fn in sorted(os.listdir(os.path.join('vocals', letter))):
            path = os.path.join('vocals', letter, fn)
            VOCALS[letter].append({
                'path': path,
                'duration': ox.avinfo(path)['duration']
            })
    with open('VOCALS.json', 'w') as fd:
        json.dump(VOCALS, fd, indent=2, sort_keys=True)
    # Same plain-dict type as on later runs that load the JSON file, so a
    # lookup of a missing letter never silently creates an empty entry.
    VOCALS = dict(VOCALS)
|
|
|
|
def get_path(id):
    """Return the local cache path of the media file ``id``.

    Known ids are answered from the ``PATHS`` cache. For an unknown id the
    media's id and extension are requested through ``api.findMedia``, a
    symlink ``cache/<id>.<extension>`` pointing at the matching
    ``/srv/pandora/data/media/.../data.*`` file is created if it does not
    exist yet, and the updated cache is written to PATHS.json.

    The process exits with status 1 (after printing a ``WTF`` line) when no
    source file can be found for a link that still has to be created.
    """
    global PATHS
    if id not in PATHS:
        get_api()
        # NOTE(review): raises IndexError when the API returns no item for id.
        info = api.findMedia({
            'query': {
                'conditions': [
                    {'key': 'id', 'operator': '==', 'value': id}
                ]
            },
            'keys': ['id', 'extension'],
            'range': [0, 1]
        })['data']['items'][0]
        path = os.path.join('cache', '%s.%s' % (info['id'], info['extension']))
        if not os.path.exists(path):
            # Only look for the source when the link is actually missing.
            h = info['id']
            pattern = '/srv/pandora/data/media/%s/%s/%s/%s/data.*' % (
                h[:2], h[2:4], h[4:6], h[6:]
            )
            matches = glob(pattern)
            if not matches:
                # Report the missing source instead of failing with a bare
                # IndexError on matches[0].
                print('WTF', pattern)
                sys.exit(1)
            source = matches[0]
            if not os.path.exists(source):
                # glob() can return a dangling symlink.
                print('WTF', source)
                sys.exit(1)
            os.makedirs(os.path.dirname(path), exist_ok=True)
            os.symlink(source, path)
        PATHS[id] = path
        with open('PATHS.json', 'w') as fd:
            json.dump(PATHS, fd, indent=4, sort_keys=True)
    return PATHS[id]
|
|
|
|
def get_clips(tag):
    """Return a shallow copy of the list of clips annotated with ``tag``.

    Cached tags are served from ``CLIPS``. Otherwise up to 10000 annotations
    of the 'keywords' layer whose value equals ``tag`` are fetched through
    ``api.findAnnotations``; each one gets a local ``path``, ``in``/``out``
    truncated to multiples of ``FRAME_DURATION``, a ``duration`` and the
    ``tag``. The list is stored sorted by id and CLIPS.json is rewritten.
    """
    global CLIPS
    if tag in CLIPS:
        return CLIPS[tag].copy()

    get_api()
    request = {
        'query': {
            'conditions': [
                {'key': 'layer', 'operator': '==', 'value': 'keywords'},
                {'key': 'value', 'operator': '==', 'value': tag}
            ],
            'operator': '&'
        },
        'keys': ['id', 'in', 'out'],
        'range': [0, 10000]
    }
    found = api.findAnnotations(request)['data']['items']
    for entry in found:
        # The part of the annotation id before the first '/' is the media id.
        media_id = entry['id'].split('/')[0]
        entry['path'] = get_path(media_id)
        # or use round?
        for edge in ('in', 'out'):
            entry[edge] = int(entry[edge] / FRAME_DURATION) * FRAME_DURATION
        entry['duration'] = entry['out'] - entry['in']
        entry['tag'] = tag
    CLIPS[tag] = sorted(found, key=lambda c: c['id'])
    with open('CLIPS.json', 'w') as fd:
        json.dump(CLIPS, fd, indent=4, sort_keys=True)
    return CLIPS[tag].copy()
|
|
|
|
def random_choice(seq, items):
    """Pick one element of ``items`` using numbers drawn from ``seq``.

    ``seq`` is a zero-argument callable; each call is assumed to return the
    next integer digit 0-9 of the sequence (TODO confirm against ``pi.random``).
    A single-element ``items`` is returned without consuming a digit.
    Otherwise just enough digits are read to form a decimal number ``r`` below
    ``base`` (the smallest power of ten >= len(items)), and ``r`` is scaled
    onto the index range with integer arithmetic.

    The earlier mapping scaled by ``len(items) - 1`` and truncated, so the
    last item could never be chosen (with two items the first always won),
    and it summed additional digits instead of appending them, which left
    most indices of long lists unreachable. Choices therefore differ from
    those made by that mapping.

    Raises IndexError when ``items`` is empty.
    """
    count = len(items)
    if count == 0:
        raise IndexError('cannot choose from an empty sequence')
    if count == 1:
        return items[0]
    r = seq()
    base = 10
    while base < count:
        # Append another digit until every index can be reached.
        r = r * 10 + seq()
        base *= 10
    # 0 <= r < base, hence 0 <= index < count.
    return items[count * r // base]
|
|
|
|
def splitint(number, by):
    """Split the integer ``number`` into ``by`` integer parts summing to it.

    All parts equal ``number // by`` except the last ``number % by`` ones,
    which are one larger, e.g. ``splitint(23, 10)`` gives seven 2s followed
    by three 3s. Uses ``divmod`` so the quotient is exact integer arithmetic
    rather than a truncated float division.

    Raises ZeroDivisionError when ``by`` is 0.
    """
    div, mod = divmod(number, by)
    return [div] * (by - mod) + [div + 1] * mod
|
|
|
|
def filter_clips(clips, duration, max_duration=0):
    """Return the clips of one duration bucket, grouped by tag.

    All clips of ``clips`` (a dict ``{tag: [clip, ...]}``) are sorted by their
    ``'duration'`` and split with ``splitint`` into ten consecutive buckets
    numbered 1 (shortest clips) to 10 (longest clips). ``duration`` selects
    the bucket; it is a bucket number, not a length of time.

    ``max_duration`` is accepted for compatibility but not used.

    Side effect: every clip dict in ``clips`` gets its ``'tag'`` key set to
    the tag it is listed under.

    Returns ``{tag: [clip, ...]}`` containing only tags that have at least
    one clip in the selected bucket. Raises KeyError when ``duration`` is not
    one of 1..10.
    """
    ordered = []
    for tag in clips:
        for clip in clips[tag]:
            clip['tag'] = tag
            ordered.append(clip)
    ordered.sort(key=lambda c: c['duration'])

    buckets = {}
    start = 0
    for number, size in enumerate(splitint(len(ordered), 10), 1):
        buckets[number] = ordered[start:start + size]
        start += size

    selected = {}
    for clip in buckets[duration]:
        selected.setdefault(clip['tag'], []).append(clip)
    return selected
|
|
|
|
def _add_blank(track, d):
    """Add a blank of length ``d`` to ``track`` and return ``d``.

    Extends the last entry when it already is a blank, otherwise appends a
    new ``{'blank': True, 'duration': d}`` entry.
    """
    if track and track[-1].get('blank'):
        track[-1]['duration'] += d
    else:
        track.append({'blank': True, 'duration': d})
    return d


def _build_text_track(seq, tags, duration):
    """Return the text overlay entries covering exactly ``duration``.

    A drawn 0 places a text (preceded by a blank covering the time since the
    previous text); any other number just advances the position by that much.
    Tags are used without repetition until all of them have been shown.
    """
    track = []
    position = last_text = 0
    remaining = []
    while position < duration:
        n = seq()
        if n != 0:
            position += n
            continue
        track.append({'blank': True, 'duration': position - last_text})
        n = seq()
        if n == 0:
            n = 10
        # Never let a text run past the end of the sequence.
        n = min(n, duration - position)
        if not remaining:
            remaining = sorted(set(tags))
        ttag = random_choice(seq, remaining)
        remaining.remove(ttag)
        track.append({'text': ttag, 'duration': n})
        position += n
        last_text = position
    if last_text < duration:
        track.append({'blank': True, 'duration': duration - last_text})
    return track


def _fill_audio_track(seq, track, library, duration):
    """Fill ``track`` in place with blanks and ``library`` entries up to ``duration``.

    A drawn number below 5 inserts a blank whose length is the next drawn
    number; otherwise the next drawn number indexes ``library``. Filling
    stops at the first entry that does not fit; the rest is padded with a
    blank.
    """
    position = 0
    while position < duration:
        if seq() < 5:
            position += _add_blank(track, min(seq(), duration - position))
            continue
        # NOTE(review): assumes library has an entry for every value seq()
        # can return (presumably 0-9) - verify for the vocals folders.
        clip = library[seq()]
        position += clip['duration']
        if track and position > duration \
                and track[-1].get('blank') \
                and track[-1]['duration'] > clip['duration']:
            # Shorten the preceding blank by the overshoot so the entry ends
            # exactly at the end of the sequence.
            track[-1]['duration'] -= (position - duration)
            position = duration
        if position <= duration:
            track.append(clip)
        else:
            position -= clip['duration']
            break
    if position < duration:
        position += _add_blank(track, duration - position)


def sequence(seq, letter):
    """Build the edit for ``letter`` from the number source ``seq``.

    ``seq`` is a zero-argument callable returning the next number of the
    sequence (used as a digit: 0 is treated as 10 for bucket selection and
    values index the ten-entry music lists). Numbers are consumed in a fixed
    order: video clips, text overlay, music, vocals.

    Clips tagged with the keywords of ``letter`` are added, without reuse,
    until the total reaches at least 116 seconds (MIN_DURATION); the text,
    music and vocals tracks are then filled to that same total.

    Returns a dict with the lists ``'clips'``, ``'text'``, ``'vocals'`` and
    ``'music'``. Exits the process with status 1 when no clip is available
    for a drawn bucket. Raises Exception when a non-empty track does not add
    up to the clip duration.
    """
    tags = KEYWORDS[letter]
    clips = {tag: get_clips(tag) for tag in tags}
    # Shallow copy: keeps the complete per-tag lists while the lists in
    # ``clips`` are replaced by pruned ones below.
    all_clips = clips.copy()
    result = {
        'clips': [],
        'text': [],
        'vocals': [],
        'music': [],
    }
    duration = 0
    MAX_DURATION = 60 * 2 + 5
    MIN_DURATION = 60 * 2 - 4
    # MIN_DURATION < MAX_DURATION, so this is the whole loop condition.
    while duration < MIN_DURATION:
        # clip duration: 1-10
        n = seq()
        if n == 0:
            n = 10

        max_duration = MAX_DURATION - duration
        clips_n = filter_clips(clips, n, max_duration)
        tags_n = [tag for tag in tags if tag in clips_n]
        if not tags_n:
            print('MISSING CLIPS, fall back to all clips!', letter, n)
            clips_n = filter_clips(all_clips, n, max_duration)
            tags_n = [tag for tag in tags if tag in clips_n]
        if not tags_n:
            print('NO tags for', letter, n)
            sys.exit(1)
        tag = random_choice(seq, tags_n)
        clip = random_choice(seq, clips_n[tag])
        duration += clip['duration']
        result['clips'].append(clip.copy())

        # no reuse
        for t in clips:
            clips[t] = [c for c in clips[t] if not c['id'] == clip['id']]

    # text overlay
    result['text'] = _build_text_track(seq, tags, duration)

    # music
    if letter in MUSIC:
        _fill_audio_track(seq, result['music'], MUSIC[letter], duration)

    # vocals
    if letter in VOCALS:
        _fill_audio_track(seq, result['vocals'], VOCALS[letter], duration)

    # Every non-empty track has to be exactly as long as the clips.
    for track in result:
        if result[track]:
            tduration = sum([c['duration'] for c in result[track]])
            if not abs(tduration - duration) < 0.000001:
                raise Exception('invalid duration %s vs %s %s' % (tduration, duration, result[track]))
    return result
|
|
|
|
|
|
if __name__ == '__main__':
    # Ten editions; each one uses a single number source, started at
    # n * 1000, for all 26 letters in alphabetical order.
    for n in range(10):
        seq = random(n * 1000)
        for letter in string.ascii_uppercase:
            timeline = sequence(seq, letter)
            tjson = 'output/%02d/%s.json' % (n, letter)
            folder = os.path.dirname(tjson)
            if not os.path.exists(folder):
                ox.makedirs(folder)

            previous = None
            if os.path.exists(tjson):
                with open(tjson, 'r') as fd:
                    previous = fd.read()

            current = json.dumps(timeline, indent=4, sort_keys=True)
            if current == previous:
                # Unchanged edit: keep the file and skip rendering.
                continue
            with open(tjson, 'w') as fd:
                fd.write(current)
            subprocess.call(['./render_mlt.py', tjson])
|