pandora_client/pandora_client/server.py

237 lines
8.2 KiB
Python

# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
import os
import json
import shutil
import time
try:
import _thread as thread
from queue import Queue
except:
import thread
from Queue import Queue
from threading import Thread
import ox
from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
from twisted.internet import reactor
from . import extract
from .utils import hash_prefix
class UploadThread(Thread):
    """Worker thread that uploads queued files one at a time.

    Takes oshashes from ``server.upload`` (a queue) and passes each one to
    ``server.client.upload``. A failed upload is reported and the worker
    moves on to the next queued item.
    """

    def __init__(self, server):
        """Remember the server whose ``upload`` queue and ``client`` are used."""
        Thread.__init__(self)
        self.server = server

    def run(self):
        """Process the upload queue forever (blocks while the queue is empty)."""
        # Local import so this block does not depend on a new file-level import.
        import traceback

        while True:
            oshash = self.server.upload.get()
            print(oshash)
            try:
                self.server.client.upload([oshash])
            except Exception:
                # Best effort: report the failure (with its cause) and keep
                # serving the queue. SystemExit/KeyboardInterrupt are no
                # longer swallowed.
                print('failed to upload', oshash)
                traceback.print_exc()
            finally:
                # Always balance the get() above, whatever happened.
                self.server.upload.task_done()
class Server(Resource):
    """HTTP resource of the distributed encoding server.

    Routes handled (all matched by path prefix):

    * ``GET /get/<oshash>``  - serve the source media file for ``oshash``
    * ``GET /next``          - hand out the next encoding job as JSON
    * ``GET /ping/<oshash>/<client>`` - mark an encode as still active
    * ``GET /update``        - start a client scan/sync in a new thread
    * ``GET /status``        - JSON with active, queued and offline encodes
    * ``PUT /upload/<oshash>`` - receive an encoded file and queue its upload
    * ``POST /status/<oshash>`` - mark an encode as failed
    """

    # Encodes marked 'active' that were not modified within this many
    # seconds are reset by active_encodes().
    ACTIVE_TIMEOUT = 120

    def __init__(self, client):
        """Store ``client`` and start the background upload worker."""
        self.upload = Queue()
        self.client = client
        Resource.__init__(self)
        worker = UploadThread(self)
        # Thread.setDaemon() is deprecated; the attribute works everywhere.
        worker.daemon = True
        worker.start()

    def active_encodes(self):
        """Return oshashes of recently active encodes and reset stale ones.

        Encodes for this site with status 'active' and a modification time
        newer than ``ACTIVE_TIMEOUT`` seconds ago are returned; 'active'
        encodes older than that get their status reset to ''.
        """
        conn, c = self.client._conn()
        try:
            site = self.client._config['url']
            active = int(time.mktime(time.localtime())) - self.ACTIVE_TIMEOUT
            sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ? AND modified > ?'
            c.execute(sql, (site, 'active', active))
            files = [row[0] for row in c]
            # reset inactive encodes
            sql = 'UPDATE encode SET status = ? WHERE site = ? AND status = ? AND modified < ?'
            c.execute(sql, ('', site, 'active', active))
            conn.commit()
        finally:
            # Do not leak the connection if a query fails.
            conn.close()
        return files

    def is_available(self, oshash):
        """Return True if ``oshash`` has valid info and a source file on disk."""
        info = self.client.info(oshash)
        if not info or 'error' in info:
            return False
        return any(os.path.exists(path) for path in self.client.path(oshash))

    def queued(self):
        """Return ``(available, unavailable)`` lists of queued oshashes.

        ``available`` keeps the order returned by ``client.get_encodes``;
        ``unavailable`` is the (unordered) remainder.
        """
        site = self.client._config['url']
        files = self.client.get_encodes(site)
        available = [oshash for oshash in files if self.is_available(oshash)]
        unavailable = list(set(files) - set(available))
        return available, unavailable

    def queued_encodes(self):
        """Return only the queued oshashes whose source is available."""
        available, unavailable = self.queued()
        return available

    def _queue_if_encoded(self, oshash):
        """Queue ``oshash`` for upload if its encoded file already exists.

        Returns True when a non-empty file was found at ``media_path``; in
        that case the encode is marked 'done' and put on the upload queue.
        """
        path = self.media_path(oshash)
        if os.path.exists(path) and os.stat(path).st_size > 0:
            self.update_status(oshash, 'done')
            self.upload.put(oshash)
            return True
        return False

    def queue_available_uploads(self):
        """Queue every queued encode whose output file is already present."""
        for oshash in self.queued_encodes():
            info = self.client.info(oshash)
            if not info or 'error' in info:
                continue
            self._queue_if_encoded(oshash)

    def update_status(self, oshash, status):
        """Set ``status`` and the current time on the encode row of ``oshash``."""
        conn, c = self.client._conn()
        try:
            site = self.client._config['url']
            modified = int(time.mktime(time.localtime()))
            c.execute(u'UPDATE encode SET status = ?, modified = ? WHERE site = ? AND oshash = ?', (status, modified, site, oshash))
            conn.commit()
        finally:
            # Do not leak the connection if the update fails.
            conn.close()

    def media_path(self, oshash):
        """Return the media-cache path of the encoded file for ``oshash``."""
        return os.path.join(
            self.client.media_cache(),
            os.path.join(*hash_prefix(oshash)),
            self.client.profile(self.client.info(oshash))
        )

    def render_json(self, request, response):
        """Set the JSON content type and return ``response`` as encoded JSON."""
        request.setHeader('Content-Type', 'application/json')
        return json.dumps(response, indent=2).encode()

    @staticmethod
    def _is_oshash(value):
        """Return True if ``value`` is exactly 16 hexadecimal digits.

        NOTE(review): assumes oshashes are always 16 hex digits - confirm.
        """
        return len(value) == 16 and all(
            ch in '0123456789abcdefABCDEF' for ch in value
        )

    def getChild(self, name, request):
        """Return a static file for ``/get/<oshash>``, otherwise this resource."""
        # make source media available via oshash
        if request.path.startswith(b'/get/'):
            oshash = request.path.decode().split('/')[-1]
            for path in self.client.path(oshash):
                if os.path.exists(path):
                    f = File(path, 'application/octet-stream')
                    f.isLeaf = True
                    return f
        return self

    def render_PUT(self, request):
        """Store an uploaded encode (``/upload/<oshash>``) and queue its upload.

        Responds with JSON containing the stored path, or with a 404 for
        any other location or a malformed oshash.
        """
        if request.path.startswith(b'/upload'):
            parts = request.path.decode().split('/')
            oshash = parts[-1]
            # oshash comes straight from the URL and is used to build a
            # filesystem path, so only accept well-formed values.
            if self._is_oshash(oshash):
                path = self.media_path(oshash)
                tmp = path + '.tmp.webm'
                ox.makedirs(os.path.dirname(path))
                # Write to a temporary name first so a partial upload never
                # appears at the final path.
                with open(tmp, 'wb') as f:
                    shutil.copyfileobj(request.content, f)
                shutil.move(tmp, path)
                self.update_status(oshash, 'done')
                self.upload.put(oshash)
                return self.render_json(request, {
                    'path': path
                })
        request.setResponseCode(404)
        # Twisted requires bytes from render methods; a str here turned the
        # 404 into an internal error on Python 3.
        return b'404 unkown location'

    def render_POST(self, request):
        """Mark an encode as failed (``/status/<oshash>``); 404 otherwise."""
        if request.path.startswith(b'/status'):
            oshash = request.path.decode().split('/')[-1]
            self.update_status(oshash, 'failed')
            return self.render_json(request, {})
        request.setResponseCode(404)
        # Must be bytes, see render_PUT.
        return b'404 unkown location'

    def _next_encode(self, request):
        """Return the job description for the next encode, or ``{}``.

        Encodes whose output already exists are marked done and queued for
        upload instead. The first remaining encode with an existing source
        file is marked 'active' and returned as a dict with the keys
        'oshash', 'cmd' and 'output'.
        """
        for oshash in self.queued_encodes():
            info = self.client.info(oshash)
            if not info or 'error' in info:
                continue
            if self._queue_if_encoded(oshash):
                continue
            for source in self.client.path(oshash):
                if not os.path.exists(source):
                    continue
                profile = self.client.profile(info)
                url = 'http://%s:%s/get/%s' % (request.host.host, request.host.port, oshash)
                output = '/tmp/%s.%s' % (oshash, profile)
                cmd = extract.video_cmd(url, output, profile, info)
                # Replace the executable with a plain 'ffmpeg' for the
                # remote side; cmd is either one command or a list of them.
                if isinstance(cmd[0], list):
                    for part in cmd:
                        part[0] = 'ffmpeg'
                else:
                    cmd[0] = 'ffmpeg'
                self.update_status(oshash, 'active')
                print(oshash, source)
                return {'oshash': oshash, 'cmd': cmd, 'output': output}
        return {}

    def render_GET(self, request):
        """Dispatch GET requests by path prefix (see the class docstring)."""
        location = request.path
        if location.startswith(b'/next'):
            return self.render_json(request, self._next_encode(request))
        elif location.startswith(b'/ping/'):
            parts = location.decode().split('/')
            # FIXME: store client id (parts[-1]) somewhere
            oshash = parts[-2]
            self.update_status(oshash, 'active')
            return self.render_json(request, {})
        elif location.startswith(b'/update'):
            thread.start_new_thread(self.update, ())
            return self.render_json(request, {'status': True})
        elif location.startswith(b'/status'):
            queued, offline = self.queued()
            return self.render_json(request, {
                'active': self.active_encodes(),
                'queue': queued,
                'offline': offline
            })
        request.setHeader('Content-Type', 'text/html')
        data = b'pandora_client distributed encoding server'
        return data

    def update(self):
        """Run the client's scan, sync and update_encodes steps in order."""
        self.client.scan([])
        self.client.sync([])
        self.client.update_encodes(True)
def _listen_address(args, interface='0.0.0.0', port=8789):
    """Return ``(interface, port)`` parsed from ``args[0]``, or the defaults.

    ``args[0]`` is either ``port`` or ``interface:port``. The split happens
    at the last colon so interfaces that contain colons still parse.
    """
    if not args:
        return interface, port
    target = args[0]
    if ':' in target:
        interface, port = target.rsplit(':', 1)
    else:
        port = target
    return interface, int(port)


def run(client, args=None):
    """Start the encoding server and block in the Twisted reactor.

    Args:
        client: client object handed to ``Server``.
        args: optional list; ``args[0]`` may be ``port`` or
            ``interface:port``. Without it the server listens on
            0.0.0.0:8789 and tries to start ``localnode.Server``.
    """
    if not args:
        args = []
    root = Server(client)
    site = Site(root)
    interface, port = _listen_address(args)
    local = None
    if not args:
        # The local node is optional: carry on without it if the module
        # or its dependencies are unavailable.
        try:
            from . import localnode
            local = localnode.Server(port)
        except Exception:
            local = None
    reactor.listenTCP(port, site, interface=interface)
    print('listening on http://%s:%s' % (interface, port))
    if local:
        print('broadcasting location via mdns (you can just use pandora_client client on the other side)')
    client.update_encodes()
    root.queue_available_uploads()
    reactor.run()
    if local:
        del local