# encoding: utf-8 # vi:si:et:sw=4:sts=4:ts=4 from __future__ import print_function import os import json import shutil import time import thread from Queue import Queue from threading import Thread import ox from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from twisted.internet import reactor import extract from utils import hash_prefix class UploadThread(Thread): def __init__(self, server): Thread.__init__(self) self.server = server def run(self): while True: oshash = self.server.upload.get() print(oshash) try: self.server.client.upload([oshash]) except: print('failed to upload', oshash) self.server.upload.task_done() class Server(Resource): def __init__(self, client): self.upload = Queue() self.client = client Resource.__init__(self) t = UploadThread(self) t.setDaemon(True) t.start() def active_encodes(self): conn, c = self.client._conn() site = self.client._config['url'] active = int(time.mktime(time.localtime())) - 120 status = 'active' sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ? AND modified > ?' args = [site, status, active] c.execute(sql, tuple(args)) files = [row[0] for row in c] # reset inactive encodes sql = 'UPDATE encode SET status = ? WHERE site = ? AND status = ? AND modified < ?' c.execute(sql, ('', site, 'active', active)) conn.commit() return files def is_available(self, oshash): info = self.client.info(oshash) if info and 'error' not in info: for path in self.client.path(oshash): if os.path.exists(path): return True return False def queued(self): site = self.client._config['url'] files = self.client.get_encodes(site) available = filter(lambda oshash: self.is_available(oshash), files) unavailable = list(set(files) - set(available)) return available, unavailable def queued_encodes(self): available, unavailable = self.queued() return available def update_status(self, oshash, status): conn, c = self.client._conn() site = self.client._config['url'] modified = int(time.mktime(time.localtime())) c.execute(u'UPDATE encode SET status = ?, modified = ? WHERE site = ? AND oshash = ?', (status, modified, site, oshash)) conn.commit() def media_path(self, oshash): return os.path.join( self.client.media_cache(), os.path.join(*hash_prefix(oshash)), self.client.profile(self.client.info(oshash)) ) def render_json(self, request, response): request.setHeader('Content-Type', 'application/json') return json.dumps(response, indent=2) def getChild(self, name, request): # make source media available via oshash if request.path.startswith('/get/'): oshash = request.path.split('/')[-1] for path in self.client.path(oshash): if os.path.exists(path): f = File(path, 'application/octet-stream') f.isLeaf = True return f return self def render_PUT(self, request): if request.path.startswith('/upload'): parts = request.path.split('/') oshash = parts[-1] if len(oshash) == 16: path = self.media_path(oshash) tmp = path + '.tmp.webm' ox.makedirs(os.path.dirname(path)) with open(tmp, 'wb') as f: shutil.copyfileobj(request.content, f) shutil.move(tmp, path) self.update_status(oshash, 'done') self.upload.put(oshash) return self.render_json(request, { 'path': path }) request.setResponseCode(404) return '404 unkown location' def render_POST(self, request): if request.path.startswith('/status'): oshash = request.path.split('/')[-1] error = request.args['error'] self.update_status(oshash, 'failed') return self.render_json(request, {}) request.setResponseCode(404) return '404 unkown location' def render_GET(self, request): if request.path.startswith('/next'): response = {} files = self.queued_encodes() for oshash in files: info = self.client.info(oshash) if not info or 'error' in info: continue path = self.media_path(oshash) if os.path.exists(path) and os.stat(path).st_size > 0: self.update_status(oshash, 'done') self.upload.put(oshash) continue for f in self.client.path(oshash): if os.path.exists(f): response['oshash'] = oshash url = 'http://%s:%s/get/%s' % (request.host.host, request.host.port, oshash) output = '/tmp/%s.%s' % (oshash, self.client.profile(info)) response['cmd'] = extract.video_cmd(url, output, self.client.profile(info), info) response['cmd'][0] = 'ffmpeg' response['output'] = output self.update_status(oshash, 'active') print(oshash, f) return self.render_json(request, response) return self.render_json(request, response) elif request.path.startswith('/ping/'): parts = request.path.split('/') # FIXME: store client id somewhere client = parts[-1] oshash = parts[-2] self.update_status(oshash, 'active') return self.render_json(request, {}) elif request.path.startswith('/update'): thread.start_new_thread(self.update, ()) return self.render_json(request, {'status': True}) elif request.path.startswith('/status'): queued, offline = self.queued() return self.render_json(request, { 'active': self.active_encodes(), 'queue': queued, 'offline': offline }) request.setHeader('Content-Type', 'text/html') data = 'pandora_client distributed encoding server' return data def update(self): self.client.scan([]) self.client.sync([]) self.client.update_encodes(True) def run(client, args=None): if not args: args = [] root = Server(client) site = Site(root) port = 8789 interface = '0.0.0.0' if args: if ':' in args[0]: interface, port = args[0].split(':') port = int(port) else: port = int(args[0]) reactor.listenTCP(port, site, interface=interface) print('listening on http://%s:%s' % (interface, port)) client.update_encodes() reactor.run()