add client/server mode for distributed encoding

This commit is contained in:
j 2013-04-28 18:34:37 +02:00
parent bf55bc1fba
commit 888599e7e2
6 changed files with 342 additions and 21 deletions

14
README
View File

@ -38,3 +38,17 @@ api documentation is available as python docstrings.
i.e. in ipython:
api.find?
(alternatively you can open the api url in a browser to read further documentation)
== Distributed encoding ==
pandora_client can distribute the encoding to multiple nodes
on a local network or multiple encodings on the same host.
to do this you need to install additional dependencies:
apt-get install python-twisted python-requests
now run one node in server mode:
pandora_client server
and start the other nodes with:
pandora_client client http://SERVER_IP:8789

View File

@ -31,13 +31,17 @@ if __name__ == '__main__':
actions = ('scan', 'sync', 'upload', 'extract', 'clean', 'cmd', 'import_srt')
config = ('config', 'add_volume')
if not args or args[0] not in actions + config:
parser.error('you must specify a valid action. \n\t\tknown actions are: %s\n\t\tconfiguration: config, add_volume' % ', '.join(actions))
server = ('server', 'client')
if not args or args[0] not in actions + config + server:
parser.error('''you must specify a valid action.
\t\tknown actions are: %s
\t\tconfiguration: config, add_volume
\t\tdistributed encoding: server, client
for more information visit https://wiki.0x2620.org/wiki/pandora_client''' % ', '.join(actions))
action = args[0]
offline = False
offline = action in config
offline = action in config or action == 'client'
if action == 'config':
if not os.path.exists(opts.config):
with open(opts.config, 'w') as f:

View File

@ -169,9 +169,19 @@ class Client(object):
for i in db:
c.execute(i)
conn.commit()
if int(self.get('version', 0)) < 4:
self.set('version', 4)
db = [
'''ALTER TABLE encode add status varchar(255)''',
'''CREATE INDEX IF NOT EXISTS encode_status_idx ON encode (status)''',
'''ALTER TABLE encode ADD modified INT DEFAULT 0''',
]
for i in db:
c.execute(i)
conn.commit()
def load_plugins(self, base='~/.ox/client.d'):
global parse_path, example_path, ignore_file, encode, encode_cmd
global parse_path, example_path, ignore_file, encode
base = os.path.expanduser(base)
for path in sorted(glob('%s/*.py' % base)):
with open(path) as fp:
@ -184,8 +194,6 @@ class Client(object):
ignore_file = module.ignore_file
if hasattr(module, 'encode'):
encode = module.encode
if hasattr(module, 'encode_cmd'):
encode_cmd = module.encode_cmd
def _conn(self):
db_conn = os.path.expanduser(self._config['cache'])
@ -254,17 +262,37 @@ class Client(object):
def set_encodes(self, site, files):
conn, c = self._conn()
c.execute('DELETE FROM encode WHERE site = ?' , (site, ))
conn.commit()
self.add_encodes(site, files)
def get_encodes(self, site, status=''):
conn, c = self._conn()
sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ?'
args = [site, status]
c.execute(sql, tuple(args))
return [row[0] for row in c]
def add_encodes(self, site, files):
conn, c = self._conn()
for oshash in files:
c.execute(u'INSERT INTO encode VALUES (?, ?)', (oshash, site))
c.execute(u'INSERT INTO encode VALUES (?, ?, ?, 0)', (oshash, site, ''))
conn.commit()
def get_encodes(self, site):
conn, c = self._conn()
c.execute('SELECT oshash FROM encodes WHERE site = ?', (site, ))
files = []
for row in c:
files.append(row[0])
return files
def update_encodes(self, add=False):
#send empty list to get updated list of requested info/files/data
site = self._config['url']
post = {'info': {}}
r = self.api.update(post)
files = r['data']['data']
if add:
conn, c = self._conn()
sql = 'SELECT oshash FROM encode WHERE site = ?'
c.execute(sql, (site, ))
known = [row[0] for row in c]
files = list(set(files) - set(known))
self.add_encodes(site, files)
else:
self.set_encodes(site, files)
def scan_file(self, path):
conn, c = self._conn()
@ -427,6 +455,7 @@ class Client(object):
print "scanned volume %s: %s files, %s new, %s deleted, %s ignored" % (
name, len(files), len(new_files), len(deleted_files), len(ignored))
def extract(self, args):
conn, c = self._conn()
if args:
@ -446,11 +475,7 @@ class Client(object):
if not self.user:
print "you need to login or run pandora_client extract offline"
return
#send empty list to get updated list of requested info/files/data
post = {'info': {}}
r = self.api.update(post)
files = r['data']['data']
self.set_encodes(self._config['url'], files)
self.update_encodes()
for oshash in files:
for path in self.path(oshash):
@ -680,6 +705,24 @@ class Client(object):
print 'item not found'
sys.exit(1)
def server(self, args):
import server
server.run(self)
def client(self, args):
if not args:
print 'you must pass url to server(i.e. http://192.168.1.1:8789)'
sys.exit(1)
import client
url = args[0]
url = 'http://127.0.0.1:8789'
if len(args) == 1:
name = socket.gethostname()
else:
name = args[1]
c = client.DistributedClient(url, name)
c.run()
class API(ox.API):
__name__ = 'pandora_client'
__version__ = __version__

88
pandora_client/client.py Normal file
View File

@ -0,0 +1,88 @@
# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
import os
import json
import subprocess
import time
import socket
import sys
import requests
class DistributedClient:
def __init__(self, url, name):
self.url = url
self.name = name
def ping(self, oshash):
url = '%s/ping/%s/%s' % (self.url, oshash, self.name)
requests.get(url)
def status(self, oshash, status):
url = '%s/status/%s' % (self.url, oshash)
requests.post(url, {'error': status})
def upload(self, oshash, path):
url = '%s/upload/%s' % (self.url, oshash)
with open(path) as f:
requests.put(url, f)
def next(self):
url = '%s/next' % self.url
r = requests.get(url)
data = json.loads(r.content)
if 'oshash' in data:
self.encode(**data)
return True
return False
def encode(self, oshash, cmd, output):
try:
p = subprocess.Popen(cmd)
r = None
n = 0
while True:
r = p.poll()
if r == None:
if n % 60 == 0:
self.ping(oshash)
n = 0
time.sleep(2)
n += 2
else:
break
except KeyboardInterrupt:
p.kill()
#encoding was stopped, put back in queue
self.status(oshash, '')
if os.path.exists(output):
os.unlink(output)
sys.exit(1)
if r == 0:
self.upload(oshash, output)
else:
self.status(oshash, 'failed')
if os.path.exists(output):
os.unlink(output)
def run(self):
new = True
while True:
if not self.next():
if new:
new = False
print "currently no more files to encode"
time.sleep(60)
else:
new = True
if __name__ == '__main__':
url = 'http://127.0.0.1:8789'
if len(sys.args) == 0:
name = socket.gethostname()
else:
name = sys.argv[1]
c = DistributedClient(url, name)
c.run()

172
pandora_client/server.py Normal file
View File

@ -0,0 +1,172 @@
# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
import os
import json
import shutil
import time
import thread
from Queue import Queue
from threading import Thread
import ox
from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
from twisted.internet import reactor
import extract
from utils import hash_prefix
class UploadThread(Thread):
def __init__(self, server):
Thread.__init__(self)
self.server = server
def run(self):
while True:
oshash = self.server.upload.get()
print oshash
self.server.client.upload([oshash])
self.server.upload.task_done()
class Server(Resource):
def __init__(self, client):
self.upload = Queue()
self.client = client
Resource.__init__(self)
t = UploadThread(self)
t.setDaemon(True)
t.start()
def active_encodes(self):
conn, c = self.client._conn()
site = self.client._config['url']
active = int(time.mktime(time.localtime())) - 120
status = 'active'
sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ? AND modified > ?'
args = [site, status, active]
c.execute(sql, tuple(args))
files = [row[0] for row in c]
#reset inactive encodes
sql = 'UPDATE encode SET status = ? WHERE site = ? AND status = ? AND modified < ?'
c.execute(sql, ('', site, 'active', active))
conn.commit()
return files
def queued_encodes(self):
site = self.client._config['url']
files = self.client.get_encodes(site)
return files
def update_status(self, oshash, status):
conn, c = self.client._conn()
site = self.client._config['url']
modified = int(time.mktime(time.localtime()))
c.execute(u'UPDATE encode SET status = ?, modified = ? WHERE site = ? AND oshash = ?', (status, modified, site, oshash))
conn.commit()
def media_path(self, oshash):
return os.path.join(
self.client.media_cache(),
os.path.join(*hash_prefix(oshash)),
self.client.profile
)
def render_json(self, request, response):
request.headers['Content-Type'] = 'application/json'
return json.dumps(response, indent=2)
def getChild(self, name, request):
#make source media available via oshash
if request.path.startswith('/get/'):
oshash = request.path.split('/')[-1]
for path in self.client.path(oshash):
if os.path.exists(path):
f = File(path, 'application/octet-stream')
f.isLeaf = True
return f
return self
def render_PUT(self, request):
if request.path.startswith('/upload'):
parts = request.path.split('/')
oshash = parts[-1]
if len(oshash) == 16:
path = self.media_path(oshash)
ox.makedirs(os.path.dirname(path))
with open(path, 'wb') as f:
shutil.copyfileobj(request.content, f)
self.update_status(oshash, 'done')
self.upload.put(oshash)
return self.render_json(request, {
'path': path
})
request.setResponseCode(404)
return '404 unkown location'
def render_POST(self, request):
if request.path.startswith('/status'):
oshash = request.path.split('/')[-1]
error = request.args['error']
self.update_status(oshash, 'failed')
return self.render_json(request, {})
request.setResponseCode(404)
return '404 unkown location'
def render_GET(self, request):
if request.path.startswith('/next'):
response = {}
files = self.queued_encodes()
for oshash in files:
path = self.media_path(oshash)
if os.path.exists(path):
self.update_status(oshash, 'done')
self.upload.put(oshash)
continue
for f in self.client.path(oshash):
if os.path.exists(f):
response['oshash'] = oshash
info = self.client.info(oshash)
url = 'http://%s:%s/get/%s' % (request.host.host, request.host.port, oshash)
output = '/tmp/%s.%s' % (oshash, self.client.profile)
response['cmd'] = extract.video_cmd(url, output, self.client.profile, info)
response['cmd'][0] = 'avconv'
response['output'] = output
self.update_status(oshash, 'active')
print oshash, f
return self.render_json(request, response)
return self.render_json(request, response)
elif request.path.startswith('/ping/'):
parts = request.path.split('/')
#FIXME: store client id somewhere
client = parts[-1]
oshash = parts[-2]
self.update_status(oshash, 'active')
return self.render_json(request, {})
elif request.path.startswith('/update'):
thread.start_new_thread(self.update, ())
return self.render_json(request, {'status': True})
elif request.path.startswith('/status'):
return self.render_json(request, {
'active': self.active_encodes(),
'queue': self.queued_encodes()
})
request.headers['Content-Type'] = 'text/html'
data = 'pandora_client distributed encoding server'
return data
def update(self):
self.client.scan([])
self.client.update_encodes(True)
self.client.sync([])
def run(client):
root = Server(client)
site = Site(root)
port = 8789
interface = '0.0.0.0'
reactor.listenTCP(port, site, interface=interface)
print 'listening on http://%s:%s' % (interface, port)
client.update_encodes()
reactor.run()

View File

@ -36,7 +36,7 @@ It is currently known to work on Linux and Mac OS X.
'pandora_client'
],
install_requires=[
'ox >= 2.1.1'
'ox >= 2.1.541'
],
keywords = [
],