# encoding: utf-8 # vi:si:et:sw=4:sts=4:ts=4 import os import json import shutil import time try: import _thread as thread from queue import Queue except: import thread from Queue import Queue from threading import Thread import ox from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from twisted.internet import reactor from . import extract from .utils import hash_prefix class UploadThread(Thread): def __init__(self, server): Thread.__init__(self) self.server = server def run(self): while True: oshash = self.server.upload.get() print(oshash) try: self.server.client.upload([oshash]) except: print('failed to upload', oshash) self.server.upload.task_done() class Server(Resource): def __init__(self, client): self.upload = Queue() self.client = client Resource.__init__(self) t = UploadThread(self) t.setDaemon(True) t.start() def active_encodes(self): conn, c = self.client._conn() site = self.client._config['url'] active = int(time.mktime(time.localtime())) - 120 status = 'active' sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ? AND modified > ?' args = [site, status, active] c.execute(sql, tuple(args)) files = [row[0] for row in c] # reset inactive encodes sql = 'UPDATE encode SET status = ? WHERE site = ? AND status = ? AND modified < ?' c.execute(sql, ('', site, 'active', active)) conn.commit() conn.close() return files def is_available(self, oshash): info = self.client.info(oshash) if info and 'error' not in info: for path in self.client.path(oshash): if os.path.exists(path): return True return False def queued(self): site = self.client._config['url'] files = self.client.get_encodes(site) available = list(filter(lambda oshash: self.is_available(oshash), files)) unavailable = list(set(files) - set(available)) return available, unavailable def queued_encodes(self): available, unavailable = self.queued() return available def queue_available_uploads(self): files = self.queued_encodes() for oshash in files: info = self.client.info(oshash) if not info or 'error' in info: continue path = self.media_path(oshash) if os.path.exists(path) and os.stat(path).st_size > 0: self.update_status(oshash, 'done') self.upload.put(oshash) def update_status(self, oshash, status): conn, c = self.client._conn() site = self.client._config['url'] modified = int(time.mktime(time.localtime())) c.execute(u'UPDATE encode SET status = ?, modified = ? WHERE site = ? AND oshash = ?', (status, modified, site, oshash)) conn.commit() conn.close() def media_path(self, oshash): return os.path.join( self.client.media_cache(), os.path.join(*hash_prefix(oshash)), self.client.profile(self.client.info(oshash)) ) def render_json(self, request, response): request.setHeader('Content-Type', 'application/json') return json.dumps(response, indent=2).encode() def getChild(self, name, request): # make source media available via oshash if request.path.startswith(b'/get/'): oshash = request.path.decode().split('/')[-1] for path in self.client.path(oshash): if os.path.exists(path): f = File(path, 'application/octet-stream') f.isLeaf = True return f return self def render_PUT(self, request): if request.path.startswith(b'/upload'): parts = request.path.decode().split('/') oshash = parts[-1] if len(oshash) == 16: path = self.media_path(oshash) tmp = path + '.tmp.webm' ox.makedirs(os.path.dirname(path)) with open(tmp, 'wb') as f: shutil.copyfileobj(request.content, f) shutil.move(tmp, path) self.update_status(oshash, 'done') self.upload.put(oshash) return self.render_json(request, { 'path': path }) request.setResponseCode(404) return '404 unkown location' def render_POST(self, request): if request.path.startswith(b'/status'): oshash = request.path.decode().split('/')[-1] self.update_status(oshash, 'failed') return self.render_json(request, {}) request.setResponseCode(404) return '404 unkown location' def render_GET(self, request): if request.path.startswith(b'/next'): response = {} files = self.queued_encodes() for oshash in files: info = self.client.info(oshash) if not info or 'error' in info: continue path = self.media_path(oshash) if os.path.exists(path) and os.stat(path).st_size > 0: self.update_status(oshash, 'done') self.upload.put(oshash) continue for f in self.client.path(oshash): if os.path.exists(f): response['oshash'] = oshash url = 'http://%s:%s/get/%s' % (request.host.host, request.host.port, oshash) output = '/tmp/%s.%s' % (oshash, self.client.profile(info)) response['cmd'] = extract.video_cmd(url, output, self.client.profile(info), info) if isinstance(response['cmd'][0], list): for part in response['cmd']: part[0] = 'ffmpeg' else: response['cmd'][0] = 'ffmpeg' response['output'] = output self.update_status(oshash, 'active') print(oshash, f) return self.render_json(request, response) return self.render_json(request, response) elif request.path.startswith(b'/ping/'): parts = request.path.decode().split('/') # FIXME: store client id somewhere client = parts[-1] oshash = parts[-2] self.update_status(oshash, 'active') return self.render_json(request, {}) elif request.path.startswith(b'/update'): thread.start_new_thread(self.update, ()) return self.render_json(request, {'status': True}) elif request.path.startswith(b'/status'): queued, offline = self.queued() return self.render_json(request, { 'active': self.active_encodes(), 'queue': queued, 'offline': offline }) request.setHeader('Content-Type', 'text/html') data = b'pandora_client distributed encoding server' return data def update(self): self.client.scan([]) self.client.sync([]) self.client.update_encodes(True) def run(client, args=None): if not args: args = [] root = Server(client) site = Site(root) port = 8789 interface = '0.0.0.0' if args: if ':' in args[0]: interface, port = args[0].split(':') port = int(port) else: port = int(args[0]) local = None else: try: from . import localnode local = localnode.Server(port) except: local = None reactor.listenTCP(port, site, interface=interface) print('listening on http://%s:%s' % (interface, port)) if local: print('broadcasting location via mdns (you can just use pandora_client client on the other side)') client.update_encodes() root.queue_available_uploads() reactor.run() if local: del local