From 888599e7e239fa20f4d9f767ddd3966251a5331d Mon Sep 17 00:00:00 2001 From: j <0x006A@0x2620.org> Date: Sun, 28 Apr 2013 18:34:37 +0200 Subject: [PATCH] add client/server mode for distributed encoding --- README | 14 +++ bin/pandora_client | 12 ++- pandora_client/__init__.py | 75 ++++++++++++---- pandora_client/client.py | 88 +++++++++++++++++++ pandora_client/server.py | 172 +++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 6 files changed, 342 insertions(+), 21 deletions(-) create mode 100644 pandora_client/client.py create mode 100644 pandora_client/server.py diff --git a/README b/README index 85a1f4b..86e93b5 100644 --- a/README +++ b/README @@ -38,3 +38,17 @@ api documentation is available as python docstrings. i.e. in ipython: api.find? (alternatively you can open the api url in a browser to read further documentation) + + +== Distributed encoding == +pandora_client can distribute the encoding to multiple nodes +on a local network or multiple encodings on the same host. + +to do this you need to install additional dependencies: + apt-get install python-twisted python-requests + +now run one node in server mode: + pandora_client server + +and start the other nodes with: + pandora_client client http://SERVER_IP:8789 diff --git a/bin/pandora_client b/bin/pandora_client index fd3bbbb..39e9bf3 100755 --- a/bin/pandora_client +++ b/bin/pandora_client @@ -31,13 +31,17 @@ if __name__ == '__main__': actions = ('scan', 'sync', 'upload', 'extract', 'clean', 'cmd', 'import_srt') config = ('config', 'add_volume') - if not args or args[0] not in actions + config: - parser.error('you must specify a valid action. \n\t\tknown actions are: %s\n\t\tconfiguration: config, add_volume' % ', '.join(actions)) + server = ('server', 'client') + if not args or args[0] not in actions + config + server: + parser.error('''you must specify a valid action. +\t\tknown actions are: %s +\t\tconfiguration: config, add_volume +\t\tdistributed encoding: server, client +for more information visit https://wiki.0x2620.org/wiki/pandora_client''' % ', '.join(actions)) action = args[0] - offline = False - offline = action in config + offline = action in config or action == 'client' if action == 'config': if not os.path.exists(opts.config): with open(opts.config, 'w') as f: diff --git a/pandora_client/__init__.py b/pandora_client/__init__.py index 2da800d..02d5337 100644 --- a/pandora_client/__init__.py +++ b/pandora_client/__init__.py @@ -169,9 +169,19 @@ class Client(object): for i in db: c.execute(i) conn.commit() + if int(self.get('version', 0)) < 4: + self.set('version', 4) + db = [ + '''ALTER TABLE encode add status varchar(255)''', + '''CREATE INDEX IF NOT EXISTS encode_status_idx ON encode (status)''', + '''ALTER TABLE encode ADD modified INT DEFAULT 0''', + ] + for i in db: + c.execute(i) + conn.commit() def load_plugins(self, base='~/.ox/client.d'): - global parse_path, example_path, ignore_file, encode, encode_cmd + global parse_path, example_path, ignore_file, encode base = os.path.expanduser(base) for path in sorted(glob('%s/*.py' % base)): with open(path) as fp: @@ -184,8 +194,6 @@ class Client(object): ignore_file = module.ignore_file if hasattr(module, 'encode'): encode = module.encode - if hasattr(module, 'encode_cmd'): - encode_cmd = module.encode_cmd def _conn(self): db_conn = os.path.expanduser(self._config['cache']) @@ -254,17 +262,37 @@ class Client(object): def set_encodes(self, site, files): conn, c = self._conn() c.execute('DELETE FROM encode WHERE site = ?' , (site, )) + conn.commit() + self.add_encodes(site, files) + + def get_encodes(self, site, status=''): + conn, c = self._conn() + sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ?' + args = [site, status] + c.execute(sql, tuple(args)) + return [row[0] for row in c] + + def add_encodes(self, site, files): + conn, c = self._conn() for oshash in files: - c.execute(u'INSERT INTO encode VALUES (?, ?)', (oshash, site)) + c.execute(u'INSERT INTO encode VALUES (?, ?, ?, 0)', (oshash, site, '')) conn.commit() - def get_encodes(self, site): - conn, c = self._conn() - c.execute('SELECT oshash FROM encodes WHERE site = ?', (site, )) - files = [] - for row in c: - files.append(row[0]) - return files + def update_encodes(self, add=False): + #send empty list to get updated list of requested info/files/data + site = self._config['url'] + post = {'info': {}} + r = self.api.update(post) + files = r['data']['data'] + if add: + conn, c = self._conn() + sql = 'SELECT oshash FROM encode WHERE site = ?' + c.execute(sql, (site, )) + known = [row[0] for row in c] + files = list(set(files) - set(known)) + self.add_encodes(site, files) + else: + self.set_encodes(site, files) def scan_file(self, path): conn, c = self._conn() @@ -427,6 +455,7 @@ class Client(object): print "scanned volume %s: %s files, %s new, %s deleted, %s ignored" % ( name, len(files), len(new_files), len(deleted_files), len(ignored)) + def extract(self, args): conn, c = self._conn() if args: @@ -446,11 +475,7 @@ class Client(object): if not self.user: print "you need to login or run pandora_client extract offline" return - #send empty list to get updated list of requested info/files/data - post = {'info': {}} - r = self.api.update(post) - files = r['data']['data'] - self.set_encodes(self._config['url'], files) + self.update_encodes() for oshash in files: for path in self.path(oshash): @@ -680,6 +705,24 @@ class Client(object): print 'item not found' sys.exit(1) + def server(self, args): + import server + server.run(self) + + def client(self, args): + if not args: + print 'you must pass url to server(i.e. http://192.168.1.1:8789)' + sys.exit(1) + import client + url = args[0] + url = 'http://127.0.0.1:8789' + if len(args) == 1: + name = socket.gethostname() + else: + name = args[1] + c = client.DistributedClient(url, name) + c.run() + class API(ox.API): __name__ = 'pandora_client' __version__ = __version__ diff --git a/pandora_client/client.py b/pandora_client/client.py new file mode 100644 index 0000000..cf0106c --- /dev/null +++ b/pandora_client/client.py @@ -0,0 +1,88 @@ +# encoding: utf-8 +# vi:si:et:sw=4:sts=4:ts=4 +import os +import json +import subprocess +import time +import socket +import sys + +import requests + + +class DistributedClient: + + def __init__(self, url, name): + self.url = url + self.name = name + + def ping(self, oshash): + url = '%s/ping/%s/%s' % (self.url, oshash, self.name) + requests.get(url) + + def status(self, oshash, status): + url = '%s/status/%s' % (self.url, oshash) + requests.post(url, {'error': status}) + + def upload(self, oshash, path): + url = '%s/upload/%s' % (self.url, oshash) + with open(path) as f: + requests.put(url, f) + + def next(self): + url = '%s/next' % self.url + r = requests.get(url) + data = json.loads(r.content) + if 'oshash' in data: + self.encode(**data) + return True + return False + + def encode(self, oshash, cmd, output): + try: + p = subprocess.Popen(cmd) + r = None + n = 0 + while True: + r = p.poll() + if r == None: + if n % 60 == 0: + self.ping(oshash) + n = 0 + time.sleep(2) + n += 2 + else: + break + except KeyboardInterrupt: + p.kill() + #encoding was stopped, put back in queue + self.status(oshash, '') + if os.path.exists(output): + os.unlink(output) + sys.exit(1) + if r == 0: + self.upload(oshash, output) + else: + self.status(oshash, 'failed') + if os.path.exists(output): + os.unlink(output) + + def run(self): + new = True + while True: + if not self.next(): + if new: + new = False + print "currently no more files to encode" + time.sleep(60) + else: + new = True + +if __name__ == '__main__': + url = 'http://127.0.0.1:8789' + if len(sys.args) == 0: + name = socket.gethostname() + else: + name = sys.argv[1] + c = DistributedClient(url, name) + c.run() diff --git a/pandora_client/server.py b/pandora_client/server.py new file mode 100644 index 0000000..e26b239 --- /dev/null +++ b/pandora_client/server.py @@ -0,0 +1,172 @@ +# encoding: utf-8 +# vi:si:et:sw=4:sts=4:ts=4 +import os +import json +import shutil +import time +import thread +from Queue import Queue +from threading import Thread + +import ox +from twisted.web.resource import Resource +from twisted.web.static import File +from twisted.web.server import Site +from twisted.internet import reactor + +import extract +from utils import hash_prefix + +class UploadThread(Thread): + def __init__(self, server): + Thread.__init__(self) + self.server = server + + def run(self): + while True: + oshash = self.server.upload.get() + print oshash + self.server.client.upload([oshash]) + self.server.upload.task_done() + +class Server(Resource): + + def __init__(self, client): + self.upload = Queue() + self.client = client + Resource.__init__(self) + t = UploadThread(self) + t.setDaemon(True) + t.start() + + def active_encodes(self): + conn, c = self.client._conn() + site = self.client._config['url'] + active = int(time.mktime(time.localtime())) - 120 + status = 'active' + sql = 'SELECT oshash FROM encode WHERE site = ? AND status = ? AND modified > ?' + args = [site, status, active] + c.execute(sql, tuple(args)) + files = [row[0] for row in c] + #reset inactive encodes + sql = 'UPDATE encode SET status = ? WHERE site = ? AND status = ? AND modified < ?' + c.execute(sql, ('', site, 'active', active)) + conn.commit() + return files + + def queued_encodes(self): + site = self.client._config['url'] + files = self.client.get_encodes(site) + return files + + def update_status(self, oshash, status): + conn, c = self.client._conn() + site = self.client._config['url'] + modified = int(time.mktime(time.localtime())) + c.execute(u'UPDATE encode SET status = ?, modified = ? WHERE site = ? AND oshash = ?', (status, modified, site, oshash)) + conn.commit() + + def media_path(self, oshash): + return os.path.join( + self.client.media_cache(), + os.path.join(*hash_prefix(oshash)), + self.client.profile + ) + + def render_json(self, request, response): + request.headers['Content-Type'] = 'application/json' + return json.dumps(response, indent=2) + + def getChild(self, name, request): + #make source media available via oshash + if request.path.startswith('/get/'): + oshash = request.path.split('/')[-1] + for path in self.client.path(oshash): + if os.path.exists(path): + f = File(path, 'application/octet-stream') + f.isLeaf = True + return f + return self + + def render_PUT(self, request): + if request.path.startswith('/upload'): + parts = request.path.split('/') + oshash = parts[-1] + if len(oshash) == 16: + path = self.media_path(oshash) + ox.makedirs(os.path.dirname(path)) + with open(path, 'wb') as f: + shutil.copyfileobj(request.content, f) + self.update_status(oshash, 'done') + self.upload.put(oshash) + return self.render_json(request, { + 'path': path + }) + request.setResponseCode(404) + return '404 unkown location' + + def render_POST(self, request): + if request.path.startswith('/status'): + oshash = request.path.split('/')[-1] + error = request.args['error'] + self.update_status(oshash, 'failed') + return self.render_json(request, {}) + request.setResponseCode(404) + return '404 unkown location' + + def render_GET(self, request): + if request.path.startswith('/next'): + response = {} + files = self.queued_encodes() + for oshash in files: + path = self.media_path(oshash) + if os.path.exists(path): + self.update_status(oshash, 'done') + self.upload.put(oshash) + continue + for f in self.client.path(oshash): + if os.path.exists(f): + response['oshash'] = oshash + info = self.client.info(oshash) + url = 'http://%s:%s/get/%s' % (request.host.host, request.host.port, oshash) + output = '/tmp/%s.%s' % (oshash, self.client.profile) + response['cmd'] = extract.video_cmd(url, output, self.client.profile, info) + response['cmd'][0] = 'avconv' + response['output'] = output + self.update_status(oshash, 'active') + print oshash, f + return self.render_json(request, response) + return self.render_json(request, response) + elif request.path.startswith('/ping/'): + parts = request.path.split('/') + #FIXME: store client id somewhere + client = parts[-1] + oshash = parts[-2] + self.update_status(oshash, 'active') + return self.render_json(request, {}) + elif request.path.startswith('/update'): + thread.start_new_thread(self.update, ()) + return self.render_json(request, {'status': True}) + elif request.path.startswith('/status'): + return self.render_json(request, { + 'active': self.active_encodes(), + 'queue': self.queued_encodes() + }) + request.headers['Content-Type'] = 'text/html' + data = 'pandora_client distributed encoding server' + return data + + def update(self): + self.client.scan([]) + self.client.update_encodes(True) + self.client.sync([]) + +def run(client): + root = Server(client) + site = Site(root) + port = 8789 + interface = '0.0.0.0' + reactor.listenTCP(port, site, interface=interface) + print 'listening on http://%s:%s' % (interface, port) + client.update_encodes() + reactor.run() diff --git a/setup.py b/setup.py index 49f05a7..ba60267 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ It is currently known to work on Linux and Mac OS X. 'pandora_client' ], install_requires=[ - 'ox >= 2.1.1' + 'ox >= 2.1.541' ], keywords = [ ],