concurrent encoding

This commit is contained in:
j 2019-10-16 15:12:14 +01:00
parent 34ff1076be
commit 4b463b6f46
2 changed files with 58 additions and 10 deletions

View file

@ -638,7 +638,7 @@ class Client(object):
post['files'] = files['files'] post['files'] = files['files']
post['volume'] = name post['volume'] = name
print('sending list of files in %s (%s total)' % (name, len(post['files']))) print('sending list of files in %s (%s total)' % (name, len(post['files'])))
r = self.api.async('update', post) r = self.api.later('update', post)
# send empty list to get updated list of requested info/files/data # send empty list to get updated list of requested info/files/data
post = {'info': {}} post = {'info': {}}
r = self.api.update(post) r = self.api.update(post)
@ -770,7 +770,7 @@ class Client(object):
print('sending info for %d files' % len(info)) print('sending info for %d files' % len(info))
post = {'info': {}, 'upload': True} post = {'info': {}, 'upload': True}
post['info'] = self.get_info_for_ids(info, prefix) post['info'] = self.get_info_for_ids(info, prefix)
r = self.api.async('update', post) r = self.api.later('update', post)
# send empty list to get updated list of requested info/files/data # send empty list to get updated list of requested info/files/data
post = {'info': {}} post = {'info': {}}
r = self.api.update(post) r = self.api.update(post)
@ -989,6 +989,12 @@ class Client(object):
server.run(self, args) server.run(self, args)
def client(self, args): def client(self, args):
threads = [t.split('=')[-1] for t in args if t.startswith('c=')]
if threads:
threads = int(threads[0])
else:
threads = 1
args = [a for a in args if not a.startswith('c=')]
urls = [u for u in args if u.startswith('http:')] urls = [u for u in args if u.startswith('http:')]
name = [u for u in args if u not in urls] name = [u for u in args if u not in urls]
if not name: if not name:
@ -1016,7 +1022,7 @@ class Client(object):
else: else:
url = urls[0] url = urls[0]
from . import client from . import client
c = client.DistributedClient(url, name) c = client.DistributedClient(url, name, threads)
c.run() c.run()
class API(ox.API): class API(ox.API):
@ -1036,7 +1042,7 @@ class API(ox.API):
if hasattr(self, 'taskStatus') and not hasattr(self, 'getTaskStatus'): if hasattr(self, 'taskStatus') and not hasattr(self, 'getTaskStatus'):
self.getTaskStatus = self.taskStatus self.getTaskStatus = self.taskStatus
def async(self, action, data, interval=5): def later(self, action, data, interval=5):
t = r = getattr(self, action)(data) t = r = getattr(self, action)(data)
if r['status']['code'] == 200: if r['status']['code'] == 200:
# wait for async task to finish # wait for async task to finish

View file

@ -2,22 +2,46 @@
# vi:si:et:sw=4:sts=4:ts=4 # vi:si:et:sw=4:sts=4:ts=4
from __future__ import division, with_statement, print_function, absolute_import from __future__ import division, with_statement, print_function, absolute_import
import os
import json import json
import subprocess import os
import time
import socket import socket
import subprocess
import sys import sys
import threading
import time
import requests import requests
from . import extract from . import extract
class Encoding:
def worker(self):
while True:
if not self.client.next():
return
def __init__(self, client, threads=1):
self.client = client
self.threads = []
for i in range(threads):
t = threading.Thread(target=self.worker)
t.start()
self.threads.append(t)
def join(self):
for t in self.threads:
t.join()
class DistributedClient: class DistributedClient:
def __init__(self, url, name): interrupted = False
def __init__(self, url, name, threads):
self.url = url self.url = url
self.name = name self.name = name
self.threads = threads
self.supported_formats = extract.supported_formats() self.supported_formats = extract.supported_formats()
def ping(self, oshash): def ping(self, oshash):
@ -40,8 +64,7 @@ class DistributedClient:
url = '%s/next' % self.url url = '%s/next' % self.url
data = requests.get(url).json() data = requests.get(url).json()
if 'oshash' in data: if 'oshash' in data:
self.encode(data['oshash'], data['cmd'], data['output']) return self.encode(data['oshash'], data['cmd'], data['output'])
return True
return False return False
def encode(self, oshash, cmds, output): def encode(self, oshash, cmds, output):
@ -49,6 +72,8 @@ class DistributedClient:
cmds = [cmds] cmds = [cmds]
for cmd in cmds: for cmd in cmds:
cmd[0] = extract.command('ffmpeg') cmd[0] = extract.command('ffmpeg')
if self.threads > 1:
cmd = cmd[:1] + ['-nostats', '-loglevel', 'error'] + cmd[1:]
if 'webm' in cmd and not self.supported_formats['webm']: if 'webm' in cmd and not self.supported_formats['webm']:
print("ffmpeg is compiled without WebM support") print("ffmpeg is compiled without WebM support")
return return
@ -56,6 +81,8 @@ class DistributedClient:
print("ffmpeg is compiled without H.264 support") print("ffmpeg is compiled without H.264 support")
return return
try: try:
if self.threads > 1:
print('encode', oshash)
p = subprocess.Popen(cmd) p = subprocess.Popen(cmd)
r = None r = None
n = 0 n = 0
@ -75,17 +102,32 @@ class DistributedClient:
self.status(oshash, '') self.status(oshash, '')
if os.path.exists(output): if os.path.exists(output):
os.unlink(output) os.unlink(output)
if self.threads > 1:
self.interrupted = True
return False
sys.exit(1) sys.exit(1)
if r != 0: if r != 0:
break break
if r == 0: if r == 0:
if self.threads > 1:
print('ok', oshash)
self.upload(oshash, output) self.upload(oshash, output)
else: else:
if self.threads > 1:
print('error', oshash)
self.status(oshash, 'failed') self.status(oshash, 'failed')
if os.path.exists(output): if os.path.exists(output):
os.unlink(output) os.unlink(output)
return True
def run(self): def run(self):
if self.threads > 1:
enc = Encoding(self, self.threads)
enc.join()
else:
self.run_single()
def run_single(self):
new = True new = True
while True: while True:
if not self.next(): if not self.next():