207 lines
7.2 KiB
Python
207 lines
7.2 KiB
Python
# encoding: utf-8
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
from __future__ import print_function
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
import time
|
|
import thread
|
|
from Queue import Queue
|
|
from threading import Thread
|
|
|
|
import ox
|
|
from twisted.web.resource import Resource
|
|
from twisted.web.static import File
|
|
from twisted.web.server import Site
|
|
from twisted.internet import reactor
|
|
|
|
import extract
|
|
from utils import hash_prefix
|
|
|
|
class UploadThread(Thread):
|
|
def __init__(self, server):
|
|
Thread.__init__(self)
|
|
self.server = server
|
|
|
|
def run(self):
|
|
while True:
|
|
oshash = self.server.upload.get()
|
|
print(oshash)
|
|
try:
|
|
self.server.client.upload([oshash])
|
|
except:
|
|
print('failed to upload', oshash)
|
|
self.server.upload.task_done()
|
|
|
|
class Server(Resource):
|
|
|
|
def __init__(self, client):
|
|
self.upload = Queue()
|
|
self.client = client
|
|
Resource.__init__(self)
|
|
t = UploadThread(self)
|
|
t.setDaemon(True)
|
|
t.start()
|
|
|
|
def active_encodes(self):
|
|
conn, c = self.client._conn()
|
|
site = self.client._config['url']
|
|
active = int(time.mktime(time.localtime())) - 120
|
|
status = 'active'
|
|
sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ? AND modified > ?'
|
|
args = [site, status, active]
|
|
c.execute(sql, tuple(args))
|
|
files = [row[0] for row in c]
|
|
# reset inactive encodes
|
|
sql = 'UPDATE encode SET status = ? WHERE site = ? AND status = ? AND modified < ?'
|
|
c.execute(sql, ('', site, 'active', active))
|
|
conn.commit()
|
|
conn.close()
|
|
return files
|
|
|
|
def is_available(self, oshash):
|
|
info = self.client.info(oshash)
|
|
if info and 'error' not in info:
|
|
for path in self.client.path(oshash):
|
|
if os.path.exists(path):
|
|
return True
|
|
return False
|
|
|
|
def queued(self):
|
|
site = self.client._config['url']
|
|
files = self.client.get_encodes(site)
|
|
available = filter(lambda oshash: self.is_available(oshash), files)
|
|
unavailable = list(set(files) - set(available))
|
|
return available, unavailable
|
|
|
|
def queued_encodes(self):
|
|
available, unavailable = self.queued()
|
|
return available
|
|
|
|
def update_status(self, oshash, status):
|
|
conn, c = self.client._conn()
|
|
site = self.client._config['url']
|
|
modified = int(time.mktime(time.localtime()))
|
|
c.execute(u'UPDATE encode SET status = ?, modified = ? WHERE site = ? AND oshash = ?', (status, modified, site, oshash))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def media_path(self, oshash):
|
|
return os.path.join(
|
|
self.client.media_cache(),
|
|
os.path.join(*hash_prefix(oshash)),
|
|
self.client.profile(self.client.info(oshash))
|
|
)
|
|
|
|
def render_json(self, request, response):
|
|
request.setHeader('Content-Type', 'application/json')
|
|
return json.dumps(response, indent=2)
|
|
|
|
def getChild(self, name, request):
|
|
# make source media available via oshash
|
|
if request.path.startswith('/get/'):
|
|
oshash = request.path.split('/')[-1]
|
|
for path in self.client.path(oshash):
|
|
if os.path.exists(path):
|
|
f = File(path, 'application/octet-stream')
|
|
f.isLeaf = True
|
|
return f
|
|
return self
|
|
|
|
def render_PUT(self, request):
|
|
if request.path.startswith('/upload'):
|
|
parts = request.path.split('/')
|
|
oshash = parts[-1]
|
|
if len(oshash) == 16:
|
|
path = self.media_path(oshash)
|
|
tmp = path + '.tmp.webm'
|
|
ox.makedirs(os.path.dirname(path))
|
|
with open(tmp, 'wb') as f:
|
|
shutil.copyfileobj(request.content, f)
|
|
shutil.move(tmp, path)
|
|
self.update_status(oshash, 'done')
|
|
self.upload.put(oshash)
|
|
return self.render_json(request, {
|
|
'path': path
|
|
})
|
|
request.setResponseCode(404)
|
|
return '404 unkown location'
|
|
|
|
def render_POST(self, request):
|
|
if request.path.startswith('/status'):
|
|
oshash = request.path.split('/')[-1]
|
|
error = request.args['error']
|
|
self.update_status(oshash, 'failed')
|
|
return self.render_json(request, {})
|
|
request.setResponseCode(404)
|
|
return '404 unkown location'
|
|
|
|
def render_GET(self, request):
|
|
if request.path.startswith('/next'):
|
|
response = {}
|
|
files = self.queued_encodes()
|
|
for oshash in files:
|
|
info = self.client.info(oshash)
|
|
if not info or 'error' in info:
|
|
continue
|
|
path = self.media_path(oshash)
|
|
if os.path.exists(path) and os.stat(path).st_size > 0:
|
|
self.update_status(oshash, 'done')
|
|
self.upload.put(oshash)
|
|
continue
|
|
for f in self.client.path(oshash):
|
|
if os.path.exists(f):
|
|
response['oshash'] = oshash
|
|
url = 'http://%s:%s/get/%s' % (request.host.host, request.host.port, oshash)
|
|
output = '/tmp/%s.%s' % (oshash, self.client.profile(info))
|
|
response['cmd'] = extract.video_cmd(url, output, self.client.profile(info), info)
|
|
response['cmd'][0] = 'ffmpeg'
|
|
response['output'] = output
|
|
self.update_status(oshash, 'active')
|
|
print(oshash, f)
|
|
return self.render_json(request, response)
|
|
return self.render_json(request, response)
|
|
elif request.path.startswith('/ping/'):
|
|
parts = request.path.split('/')
|
|
# FIXME: store client id somewhere
|
|
client = parts[-1]
|
|
oshash = parts[-2]
|
|
self.update_status(oshash, 'active')
|
|
return self.render_json(request, {})
|
|
elif request.path.startswith('/update'):
|
|
thread.start_new_thread(self.update, ())
|
|
return self.render_json(request, {'status': True})
|
|
elif request.path.startswith('/status'):
|
|
queued, offline = self.queued()
|
|
return self.render_json(request, {
|
|
'active': self.active_encodes(),
|
|
'queue': queued,
|
|
'offline': offline
|
|
})
|
|
request.setHeader('Content-Type', 'text/html')
|
|
data = 'pandora_client distributed encoding server'
|
|
return data
|
|
|
|
def update(self):
|
|
self.client.scan([])
|
|
self.client.sync([])
|
|
self.client.update_encodes(True)
|
|
|
|
def run(client, args=None):
|
|
if not args:
|
|
args = []
|
|
root = Server(client)
|
|
site = Site(root)
|
|
port = 8789
|
|
interface = '0.0.0.0'
|
|
if args:
|
|
if ':' in args[0]:
|
|
interface, port = args[0].split(':')
|
|
port = int(port)
|
|
else:
|
|
port = int(args[0])
|
|
reactor.listenTCP(port, site, interface=interface)
|
|
print('listening on http://%s:%s' % (interface, port))
|
|
client.update_encodes()
|
|
reactor.run()
|