commit ea1acf26077fd45f27baac12542a8c49cfd0f774 Author: j <0x006A@0x2620.org> Date: Fri Jul 30 15:24:50 2010 +0200 lets use bzr diff --git a/oxd.py b/oxd.py new file mode 100644 index 0000000..7dc6b67 --- /dev/null +++ b/oxd.py @@ -0,0 +1,743 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +# GPL 2010 +from __future__ import division, with_statement +""" + on ubuntu/debian: + apt-get install python-twisted + else + easy_install twisted + twisted available on os x since 10.5 + +FRAME extraction: + - vlc hangs on some files, mpg but also some avis + - ffmpeg decodes full video, so it takes to long extracting frames at the end + - oxframe only support ogv and webm (adding av* as option might work) + - mplayer seams to work. might be an issue installing/bundling it + +FIREFOX integration: + possible ways: + - launch oxbackend on localhost and connect to it + - way to add/configure backends + launch one localy + adding folders in remote backends is a bit complicated + beeing able to manage a backend remotely would be nice(security questions though) + also makes it a bit more complicated, than again ideal for situations with + media servers hosting the actuall videos and clients to access them + - rewrite large parts in javascript + sqlite bundled with firefox (requires js subprocess to work) + +TODO: + security, add auth framework, DIGEST might be good enough + add fields: + make archive / file link via id? + + is extracted field enough or requires frames/video thingy + + cache location, should cache be inside of archive, home folder or whats a good default. + must be a config option in +""" + +""" +/files?since=timestamp +{ + archive: { + new + updated + deleted + } +} + +/extract?oshash=abc&media=stills +/extract?oshash=abc&media=profile.webm +{ + status: 'extracting|available|failed', +} + +/get?oshash=abc +{ + info: {} + stills: [], + video: [], +} +/get?oshash=abc +{ + stills: [ + "/media/../../12.png", + "/media/../../123.png", + "/media/../../321.png", + ], + video: [ + "/media/../../96p.webm" + ], + info: {} +} +""" +import fractions +from glob import glob +import json +import os +import re +import Queue +import sqlite3 +import subprocess +import sys +import shutil +import tempfile +import time +import thread +from threading import Thread + +from twisted.cred.portal import IRealm, Portal +from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse +from twisted.internet import task, reactor +from twisted.web import server +from twisted.web.guard import HTTPAuthSessionWrapper, DigestCredentialFactory +from twisted.web.resource import Resource, IResource +from twisted.web.static import File +from zope.interface import implements + + +STATUS_NEW=0 +STATUS_EXTRACTING=1 +STATUS_AVAILABLE=2 +STATUS_FAILED=3 + +VIDEO_PROFILES = [ + '720p', + '480p', + '360p', + '96p', +] + +class AspectRatio(fractions.Fraction): + def __new__(cls, numerator, denominator=None): + if not denominator: + ratio = map(int, numerator.split(':')) + if len(ratio) == 1: ratio.append(1) + numerator = ratio[0] + denominator = ratio[1] + #if its close enough to the common aspect ratios rather use that + if abs(numerator/denominator - 4/3) < 0.03: + numerator = 4 + denominator = 3 + elif abs(numerator/denominator - 16/9) < 0.02: + numerator = 16 + denominator = 9 + return super(AspectRatio, cls).__new__(cls, numerator, denominator) + + @property + def ratio(self): + return "%d:%d" % (self.numerator, self.denominator) + +def avinfo(filename): + if os.path.getsize(filename): + p = subprocess.Popen(['ffmpeg2theora', '--info', filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + info, error = p.communicate() + #remove metadata, can be broken + reg = re.compile('"metadata": {.*?},', re.DOTALL) + info = re.sub(reg, '', info) + try: + info = json.loads(info.decode('utf-8', 'replace')) + except: + print info, error + if 'video' in info and info['video']: + if not 'display_aspect_ratio' in info['video'][0]: + dar = AspectRatio(info['video'][0]['width'], info['video'][0]['height']) + info['video'][0]['display_aspect_ratio'] = dar.ratio + del info['path'] + if os.path.splitext(filename)[-1] in ('.srt', '.sub', '.idx', '.rar') and 'error' in info: + del info['error'] + if 'code' in info and info['code'] == 'badfile': + del info['code'] + return info + return {'path': filename, 'size': 0} + + +def hash_prefix(h): + return [h[:2], h[2:4], h[4:6], h[6:]] + +def extract_all_stills(): + db = Database('dev.sqlite') + conn = db.conn() + c = conn.cursor() + sql = 'SELECT path, oshash, info FROM file' + c.execute(sql) + for row in c: + video = row[0] + oshash = row[1] + info = json.loads(row[2]) + if not 'Extras/' in video and 'video' in info and info['video']: + prefix = os.path.join('media', os.path.join(*hash_prefix(oshash))) + print video + extract_stills(video, prefix, info) + +def run_command(cmd, timeout=25): + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) + while timeout > 0: + time.sleep(0.2) + timeout -= 0.2 + if p.poll() != None: + return p.returncode + if p.poll() == None: + os.kill(p.pid, 9) + killedpid, stat = os.waitpid(p.pid, os.WNOHANG) + return p.returncode + +def extract_still(video, target, position): + fdir = os.path.dirname(target) + if fdir and not os.path.exists(fdir): + os.makedirs(fdir) + + ''' + #oxframe + #this only works with theora and webm files!!! + cmd = ['oxframe', '-p', str(position), '-i', video, '-o', frame] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + info, error = p.communicate() + ''' + ''' + #ffmpeg + #decodes all the wat to position, that takes to long + cmd = ['ffmpeg', '-i', video, '-vframes', '1','-ss', str(position), '-f','image2', target] + print cmd + p = subprocess.Popen(cmd) + p.wait() + ''' + + ''' + #VLC + #hangs on mpg and some avi files with old divx3 video + out = position + 0.2 + framedir = tempfile.mkdtemp() + vlc_path = 'vlc' + for i in ("/Applications/VLC.app/Contents/MacOS/VLC", ): + if os.path.exists(i): + vlc_path = i + cmd = [ + vlc_path, '--vout=dummy', video, '--start-time=%s'%position, '--stop-time=%s'%out, + '-I', 'dummy', '--video-filter=scene', '--scene-path=%s'%framedir, + '--scene-format=png', '--scene-ratio=25', '--scene-prefix=still', '--swscale-mode=2', + '--sout-transcode-vcodec=avcodec', '--noaudio', 'vlc://quit', + ] + #print cmd + run_command(cmd) + + images = glob('%s/still*.png' % framedir) + if images: + shutil.move(images[0], target) + shutil.rmtree(framedir) + ''' + + #mplayer + cwd = os.getcwd() + target = os.path.abspath(target) + framedir = tempfile.mkdtemp() + os.chdir(framedir) + cmd = ['mplayer', '-noautosub', video, '-ss', str(position), '-frames', '2', '-vo', 'png:z=9', '-ao', 'null'] + r = run_command(cmd) + images = glob('%s/*.png' % framedir) + if images: + shutil.move(images[-1], target) + os.chdir(cwd) + shutil.rmtree(framedir) + return r == 0 + + +def extract_video(video, target, profile, info): + if not os.path.exists(target): + fdir = os.path.dirname(target) + if not os.path.exists(fdir): + os.makedirs(fdir) + + dar = AspectRatio(info['video'][0]['display_aspect_ratio']) + profile_cmd = [] + ''' + look into + lag + mb_static_threshold + qmax/qmin + rc_buf_aggressivity=0.95 + token_partitions=4 + level / speedlevel + bt? + + ''' + if profile == '720p': + height = 720 + width = int(dar * height) + profile_cmd = ['-vb', '2M', '-g', '250'] + if info['audio']: + profile_cmd += ['-ar', '48000', '-aq', '5'] + if profile == '480p': + height = 480 + width = int(dar * height) + profile_cmd = ['-vb', '1400k', '-g', '250'] + if info['audio']: + profile_cmd += ['-ar', '44100', '-aq', '2'] + if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 2: + profile_cmd += ['-ac', '2'] + elif profile == '360p': + height = 360 + width = int(dar * height) + profile_cmd = ['-vb', '768k'] + if info['audio']: + profile_cmd += ['-ar', '44100', '-aq', '1'] + if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 2: + profile_cmd += ['-ac', '2'] + else: + height = 96 + width = int(dar * height) + profile_cmd = ['-vb', '96k', '-g', '50'] + if info['audio']: + profile_cmd += ['-ar', '22050', '-ac', '1', '-aq', '-1'] + if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 1: + profile_cmd += ['-ac', '1'] + + if info['audio']: + profile_cmd +=['-acodec', 'libvorbis'] + + aspect = dar.ratio + if abs(width/height - dar) < 0.02: + aspect = '%s:%s' % (width, height) + cmd = ['./ffmpeg', '-y', '-threads', '2', + '-i', video + ] + profile_cmd + [ + '-s', '%dx%d'%(width, height), + '-aspect', aspect, + '-f','webm', + target] + print cmd + #r = run_command(cmd, -1) + p = subprocess.Popen(cmd, stdin=subprocess.PIPE) + p.wait() + r = p.returncode + print "done" + return r == 0 + +def video_frame_positions(duration): + pos = duration / 2 + #return [pos/4, pos/2, pos/2+pos/4, pos, pos+pos/2, pos+pos/2+pos/4] + return map(int, [pos/2, pos, pos+pos/2]) + +class ExtractThread(Thread): + def __init__(self, db): + Thread.__init__(self) + self.db = db + + def run(self): + while True: + oshash, name = self.db.extract.get() + self.db.extract_derivative(oshash, name) + self.db.extract.task_done() + +class Database(object): + def __init__(self, conn): + + self.extract = Queue.Queue() + for i in range(2): + t = ExtractThread(self) + t.setDaemon(True) + t.start() + + self.db_conn = conn + conn = self.conn() + c = conn.cursor() + c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''') + + if int(self.get('version', 0)) < 1: + self.set('version', 1) + db = [ + '''CREATE TABLE IF NOT EXISTS file ( + archive varchar(1024), + path varchar(1024) unique, + folder varchar(1024), + filename varchar(1024), + oshash varchar(16), + atime FLOAT, + ctime FLOAT, + mtime FLOAT, + size INT, + info TEXT, + created INT, + modified INT, + deleted INT)''', + '''CREATE INDEX IF NOT EXISTS archive_idx ON file (archive)''', + '''CREATE INDEX IF NOT EXISTS path_idx ON file (path)''', + '''CREATE INDEX IF NOT EXISTS oshash_idx ON file (oshash)''', + '''CREATE TABLE IF NOT EXISTS archive ( + site varchar(1024), + name varchar(1024) unique, + path varchar(1024) unique, + updated INT, + created INT, + updating INT)''', + '''CREATE TABLE IF NOT EXISTS derivative ( + oshash varchar(16), + name varchar(1024), + status INT, + UNIQUE(oshash, name))''', + ] + for i in db: + c.execute(i) + + c.execute('UPDATE archive set updating=0 WHERE 1=1') + conn.commit() + + def conn(self): + conn = sqlite3.connect(self.db_conn, timeout=10) + conn.text_factory = str + return conn + + def get(self, key, default=None): + conn = self.conn() + c = conn.cursor() + c.execute('SELECT value FROM setting WHERE key = ?', (key, )) + for row in c: + return row[0] + return default + + def set(self, key, value): + conn = self.conn() + c = conn.cursor() + c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value))) + conn.commit() + + def remove(self, path): + sql = 'DELETE FROM file WHERE path=?' + conn = self.conn() + c = conn.cursor() + c.execute(sql, (path, )) + + #files + def get_file(self, oshash): + conn = self.conn() + c = conn.cursor() + f = {} + sql = 'SELECT path, archive, folder, filename, info FROM file WHERE oshash=?' + c.execute(sql, (oshash, )) + for row in c: + f['path'] = row[0] + f['archive'] = row[1] + f['folder'] = row[2] + f['filename'] = row[3] + f['info'] = json.loads(row[4]) + break + return f + + def files(self, since=None): + conn = self.conn() + c = conn.cursor() + + def get_files(files, key, sql, t=()): + c.execute(sql, t) + for row in c: + archive = row[0] + folder = row[1] + filename = row[2] + info = json.loads(row[3]) + if not archive in files: files[archive]={} + if key: + if not key in files[archive]: files[archive][key]={} + if not folder in files[archive][key]: files[archive][key][folder]={} + files[archive][key][folder][filename] = info + else: + if not folder in files[archive]: files[archive][folder]={} + files[archive][folder][filename] = info + files = {} + sql_prefix = 'SELECT archive, folder, filename, info FROM file WHERE ' + sql_postfix = ' deleted < 0 ORDER BY path' + if since: + get_files(files, 'deleted', sql_prefix + 'deleted >= ? ORDER BY path' , (since, )) + get_files(files, 'modified', + sql_prefix + 'created < ? AND modified >= ? AND'+sql_postfix, + (since, since)) + get_files(files, 'new', sql_prefix + 'created >= ? AND'+sql_postfix, (since, )) + else: + get_files(files, None, sql_prefix + sql_postfix) + return files + + #derivative + def derivative(self, oshash, name, status=None): + conn = self.conn() + c = conn.cursor() + + d = {} + d['oshash'] = oshash + d['name'] = name + d['status'] = status + + if status == None: + sql = 'SELECT status FROM derivative WHERE oshash=? AND name=?' + c.execute(sql, (oshash, name)) + for row in c: + d['status'] = row[0] + if d['status'] == None: + #this is a new derivative, add to db and add to enc queue + return self.derivative(oshash, name, STATUS_NEW) + else: + print "insert or update derivative", oshash, name, status + c.execute(u'INSERT OR REPLACE INTO derivative values (?, ?, ?)', (oshash, name, status)) + conn.commit() + + prefix = hash_prefix(oshash) + path_prefix = os.path.join(self.get('media_cache', 'media'), *prefix) + d['path'] = os.path.join(path_prefix, name) + d['location'] = '/'.join(['/media', ] + prefix + [name, ]) + return d + + def derivatives(self, oshash, status=STATUS_AVAILABLE): + conn = self.conn() + c = conn.cursor() + derivatives = [] + sql = 'SELECT name FROM derivative WHERE status=? AND oshash=?' + c.execute(sql, (status, oshash)) + for row in c: + derivatives.append(self.derivative(oshash, row[0])) + return derivatives + + def extract_derivative(self, oshash, name): + f = self.get_file(oshash) + derivative = self.derivative(oshash, name) + if derivative['status'] == STATUS_NEW: + if name.endswith('.png'): + for pos in video_frame_positions(f['info']['duration']): + still_name = '%s.png' % pos + still_d = self.derivative(oshash, still_name) + if still_d['status'] == STATUS_NEW: + self.derivative(oshash, still_name, STATUS_EXTRACTING) + if extract_still(f['path'], still_d['path'], pos): + self.derivative(oshash, still_name, STATUS_AVAILABLE) + else: + self.derivative(oshash, still_name, STATUS_FAILED) + elif name.endswith('.webm'): + profile = name[:-5] + print 'now lets go, are we having fun?' + self.derivative(oshash, name, STATUS_EXTRACTING) + if extract_video(f['path'], derivative['path'], profile, f['info']): + self.derivative(oshash, name, STATUS_AVAILABLE) + else: + self.derivative(oshash, name, STATUS_FAILED) + + #archive + def update(self, archive, path, folder, filename): + update = True + + modified = time.mktime(time.localtime()) + created = modified + + sql = 'SELECT atime, ctime, mtime, size, created FROM file WHERE path=?' + conn = self.conn() + c = conn.cursor() + c.execute(sql, (path, )) + stat = os.stat(path) + for row in c: + if stat.st_atime == row[0] and stat.st_ctime == row[1] and stat.st_mtime == row[2] and stat.st_size == row[3]: + created = row[4] + update = False + break + if update: + info = avinfo(path) + for key in ('atime', 'ctime', 'mtime'): + info[key] = getattr(stat, 'st_'+key) + oshash = info['oshash'] + deleted = -1 + t = (archive, path, folder, filename, oshash, stat.st_atime, stat.st_ctime, stat.st_mtime, + stat.st_size, json.dumps(info), created, modified, deleted) + c.execute(u'INSERT OR REPLACE INTO file values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', t) + conn.commit() + + def spider(self, archive): + path = self.archives()[archive] + path = os.path.normpath(path) + for dirpath, dirnames, filenames in os.walk(path): + if filenames: + prefix = dirpath[len(path)+1:] + for filename in filenames: + if not filename.startswith('._') and not filename in ('.DS_Store', ): + print dirpath, filename + self.update(archive, os.path.join(dirpath, filename), prefix, filename) + + def add_archive(self, site, name, path): + path = os.path.normpath(path) + conn = self.conn() + c = conn.cursor() + created = time.mktime(time.localtime()) + t = (site, name, path, created, created) + c.execute(u'INSERT INTO archive values (?, ?, ?, ?, ?, 0)', t) + conn.commit() + + def archives(self): + conn = self.conn() + c = conn.cursor() + sql = 'SELECT name, path FROM archive ORDER BY name'; + c.execute(sql) + archives = {} + for row in c: + archives[row[0]] = row[1] + return archives + + def update_archives(self): + conn = self.conn() + c = conn.cursor() + c.execute('SELECT name FROM archive WHERE updating = 0 ORDER BY name'); + for row in c: + name = row[0] + c.execute(u'UPDATE archive set updating=1 where name=?', (name, )) + conn.commit() + self.spider(name) + updated = time.mktime(time.localtime()) + c.execute(u'UPDATE archive set updated=?, updating=0 where name=?', (updated, name)) + conn.commit() + + def remove_archive(self, name): + conn = self.conn() + c = conn.cursor() + c.execute('DELETE FROM archive WHERE path=?', (path, )) + c.execute('DELETE FROM file WHERE path LIKE(?%)', (path, )) + conn.commit() + +#web +def json_response(request, data): + request.headers['Content-Type'] = 'text/javascript' + return json.dumps(data, indent=2) + +class OxControl(Resource): + _children = [] + #isLeaf = True + + def __init__(self, db_path): + self.db_path = db_path + Resource.__init__(self) + + self.db = Database(self.db_path) + self.putChild("media", File(self.db.get('media_cache', 'media'))) + + #FIXME: this is just for debugging + if not 'Test' in self.db.archives(): + self.db.add_archive('0xdb.org', 'Test', '/media/2010/Movies') + + def putChild(self, name, child): + self._children.append(name) + return Resource.putChild(self, name, child) + + def getChild(self, name, request): + if name in self._children: + return Resource.getChild(self, name, request) + return self + + def render_GET(self, request): + if request.path == '/files': + """ + /files + optional ?since=unixtimestamp + new/modified + files by archive + """ + since = request.args.get("since", None) + if since: since = float(since[0]) + files = self.db.files(since) + return json_response(request, files) + + if request.path == '/update': + """ + checks for new files in all known archives + """ + #update in another thread, this otherwise blocks web server + thread.start_new_thread(self.db.update_archives,()) + response = {'status': 'ok'} + return json_response(request, response) + + if request.path == '/extract': + """ + extract derivatives from videos + """ + oshash = request.args.get("oshash", [None])[0] + media = request.args.get("media", [None, ])[0] + retry = request.args.get("retry", [None, ])[0] + + response = {'status': 'not enough data provided'} + + f = self.db.get_file(oshash) + if not f: + response = {'status': 'unkown oshash'} + elif not 'duration' in f['info']: + response = {'status': 'unkown format, can not extract data'} + else: + if media == 'stills': + name = '%s.png'%video_frame_positions(f['info']['duration'])[0] + elif media.endswith('.webm'): + profile = media[:-5] + if profile in VIDEO_PROFILES: + name = media + else: + response = {'status': 'unsupported video profile requested'} + if name: + #get or create derivative + derivative = self.db.derivative(oshash, name) + if derivative['status'] == STATUS_FAILED and retry: + derivative = self.db.derivative(oshash, name, STATUS_NEW) + response['status'] = { + STATUS_NEW: 'extracting', + STATUS_EXTRACTING: 'extracting', + STATUS_AVAILABLE: 'available', + STATUS_FAILED: 'failed', + }.get(derivative['status'], 'extracting') + if derivative['status'] == STATUS_NEW: + self.db.extract.put((oshash, name)) + files = [f['location'] for f in self.db.derivatives(oshash)] + if media == 'stills': + response['stills'] = filter(lambda f: f.endswith('.png'), files) + else: + response['video'] = filter(lambda f: f.endswith('.webm'), files) + return json_response(request, response) + + if request.path == '/get': + """ + get information about a file, including derivatives + """ + oshash = request.args.get("oshash", [None, ])[0] + response = {'status': 'no oshash provided'} + if oshash: + f = self.db.get_file(oshash) + response['status'] = 'available' + response['info'] = f['info'] + files = [f['location'] for f in self.db.derivatives(oshash)] + response['video'] = filter(lambda f: f.endswith('.webm'), files) + response['stills'] = filter(lambda f: f.endswith('.png'), files) + + return json_response(request, response) + return "this is not for humans" + +if __name__ == '__main__': + db = 'dev.sqlite' + port = 2620 + username = 'fix' + password = 'me' + + interface = '127.0.0.1' + interface = '10.26.20.10' + interface = '0.0.0.0' + + print 'http://%s:%d/' % (interface, port) + + root = OxControl(db) + + checker = InMemoryUsernamePasswordDatabaseDontUse() + checker.addUser(username, password) + + class PublicHTMLRealm(object): + implements(IRealm) + + def requestAvatar(self, avatarId, mind, *interfaces): + if IResource in interfaces: + return (IResource, root, lambda: None) + raise NotImplementedError() + + portal = Portal(PublicHTMLRealm(), [checker]) + + credentialFactory = DigestCredentialFactory("md5", "oxbackend") + resource = HTTPAuthSessionWrapper(portal, [credentialFactory]) + + site = server.Site(resource) + reactor.listenTCP(port, site, interface=interface) + reactor.run() +