# -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 # GPL 2010 from __future__ import division, with_statement import fractions from glob import glob import json import os import re import Queue import sqlite3 import subprocess import sys import shutil import tempfile import time import thread from threading import Thread from twisted.cred.portal import IRealm, Portal from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse from twisted.internet import task, reactor from twisted.web import server from twisted.web.guard import HTTPAuthSessionWrapper, DigestCredentialFactory from twisted.web.resource import Resource, IResource from twisted.web.static import File from zope.interface import implements STATUS_NEW=0 STATUS_EXTRACTING=1 STATUS_AVAILABLE=2 STATUS_FAILED=3 VIDEO_PROFILES = [ '720p', '480p', '360p', '96p', ] class AspectRatio(fractions.Fraction): def __new__(cls, numerator, denominator=None): if not denominator: ratio = map(int, numerator.split(':')) if len(ratio) == 1: ratio.append(1) numerator = ratio[0] denominator = ratio[1] #if its close enough to the common aspect ratios rather use that if abs(numerator/denominator - 4/3) < 0.03: numerator = 4 denominator = 3 elif abs(numerator/denominator - 16/9) < 0.02: numerator = 16 denominator = 9 return super(AspectRatio, cls).__new__(cls, numerator, denominator) @property def ratio(self): return "%d:%d" % (self.numerator, self.denominator) def avinfo(filename): if os.path.getsize(filename): p = subprocess.Popen(['ffmpeg2theora', '--info', filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE) info, error = p.communicate() #remove metadata, can be broken reg = re.compile('"metadata": {.*?},', re.DOTALL) info = re.sub(reg, '', info) try: info = json.loads(info.decode('utf-8', 'replace')) except: print info, error if 'video' in info and info['video']: if not 'display_aspect_ratio' in info['video'][0]: dar = AspectRatio(info['video'][0]['width'], info['video'][0]['height']) info['video'][0]['display_aspect_ratio'] = dar.ratio del info['path'] if os.path.splitext(filename)[-1] in ('.srt', '.sub', '.idx', '.rar') and 'error' in info: del info['error'] if 'code' in info and info['code'] == 'badfile': del info['code'] return info return {'path': filename, 'size': 0} def hash_prefix(h): return [h[:2], h[2:4], h[4:6], h[6:]] def extract_all_stills(): db = Database('dev.sqlite') conn = db.conn() c = conn.cursor() sql = 'SELECT path, oshash, info FROM file' c.execute(sql) for row in c: video = row[0] oshash = row[1] info = json.loads(row[2]) if not 'Extras/' in video and 'video' in info and info['video']: prefix = os.path.join('media', os.path.join(*hash_prefix(oshash))) print video extract_stills(video, prefix, info) def run_command(cmd, timeout=25): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) while timeout > 0: time.sleep(0.2) timeout -= 0.2 if p.poll() != None: return p.returncode if p.poll() == None: os.kill(p.pid, 9) killedpid, stat = os.waitpid(p.pid, os.WNOHANG) return p.returncode def extract_still(video, target, position): fdir = os.path.dirname(target) if fdir and not os.path.exists(fdir): os.makedirs(fdir) ''' #oxframe #this only works with theora and webm files!!! cmd = ['oxframe', '-p', str(position), '-i', video, '-o', frame] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) info, error = p.communicate() ''' ''' #ffmpeg #decodes all the wat to position, that takes to long cmd = ['ffmpeg', '-i', video, '-vframes', '1','-ss', str(position), '-f','image2', target] print cmd p = subprocess.Popen(cmd) p.wait() ''' ''' #VLC #hangs on mpg and some avi files with old divx3 video out = position + 0.2 framedir = tempfile.mkdtemp() vlc_path = 'vlc' for i in ("/Applications/VLC.app/Contents/MacOS/VLC", ): if os.path.exists(i): vlc_path = i cmd = [ vlc_path, '--vout=dummy', video, '--start-time=%s'%position, '--stop-time=%s'%out, '-I', 'dummy', '--video-filter=scene', '--scene-path=%s'%framedir, '--scene-format=png', '--scene-ratio=25', '--scene-prefix=still', '--swscale-mode=2', '--sout-transcode-vcodec=avcodec', '--noaudio', 'vlc://quit', ] #print cmd run_command(cmd) images = glob('%s/still*.png' % framedir) if images: shutil.move(images[0], target) shutil.rmtree(framedir) ''' #mplayer cwd = os.getcwd() target = os.path.abspath(target) framedir = tempfile.mkdtemp() os.chdir(framedir) cmd = ['mplayer', '-noautosub', video, '-ss', str(position), '-frames', '2', '-vo', 'png:z=9', '-ao', 'null'] r = run_command(cmd) images = glob('%s/*.png' % framedir) if images: shutil.move(images[-1], target) os.chdir(cwd) shutil.rmtree(framedir) return r == 0 def extract_video(video, target, profile, info): if not os.path.exists(target): fdir = os.path.dirname(target) if not os.path.exists(fdir): os.makedirs(fdir) dar = AspectRatio(info['video'][0]['display_aspect_ratio']) profile_cmd = [] ''' look into lag mb_static_threshold qmax/qmin rc_buf_aggressivity=0.95 token_partitions=4 level / speedlevel bt? ''' if profile == '720p': height = 720 width = int(dar * height) profile_cmd = ['-vb', '2M', '-g', '250'] if info['audio']: profile_cmd += ['-ar', '48000', '-aq', '5'] if profile == '480p': height = 480 width = int(dar * height) profile_cmd = ['-vb', '1400k', '-g', '250'] if info['audio']: profile_cmd += ['-ar', '44100', '-aq', '2'] if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 2: profile_cmd += ['-ac', '2'] elif profile == '360p': height = 360 width = int(dar * height) profile_cmd = ['-vb', '768k'] if info['audio']: profile_cmd += ['-ar', '44100', '-aq', '1'] if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 2: profile_cmd += ['-ac', '2'] else: height = 96 width = int(dar * height) profile_cmd = ['-vb', '96k', '-g', '50'] if info['audio']: profile_cmd += ['-ar', '22050', '-ac', '1', '-aq', '-1'] if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 1: profile_cmd += ['-ac', '1'] if info['audio']: profile_cmd +=['-acodec', 'libvorbis'] aspect = dar.ratio if abs(width/height - dar) < 0.02: aspect = '%s:%s' % (width, height) cmd = ['./ffmpeg', '-y', '-threads', '2', '-i', video ] + profile_cmd + [ '-s', '%dx%d'%(width, height), '-aspect', aspect, '-f','webm', target] print cmd #r = run_command(cmd, -1) p = subprocess.Popen(cmd, stdin=subprocess.PIPE) p.wait() r = p.returncode print "done" return r == 0 def video_frame_positions(duration): pos = duration / 2 #return [pos/4, pos/2, pos/2+pos/4, pos, pos+pos/2, pos+pos/2+pos/4] return map(int, [pos/2, pos, pos+pos/2]) class ExtractThread(Thread): def __init__(self, db): Thread.__init__(self) self.db = db def run(self): while True: oshash, name = self.db.extract.get() self.db.extract_derivative(oshash, name) self.db.extract.task_done() class Database(object): def __init__(self, conn): self.extract = Queue.Queue() for i in range(2): t = ExtractThread(self) t.setDaemon(True) t.start() self.db_conn = conn conn = self.conn() c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''') if int(self.get('version', 0)) < 1: self.set('version', 1) db = [ '''CREATE TABLE IF NOT EXISTS file ( archive varchar(1024), path varchar(1024) unique, folder varchar(1024), filename varchar(1024), oshash varchar(16), atime FLOAT, ctime FLOAT, mtime FLOAT, size INT, info TEXT, created INT, modified INT, deleted INT)''', '''CREATE INDEX IF NOT EXISTS archive_idx ON file (archive)''', '''CREATE INDEX IF NOT EXISTS path_idx ON file (path)''', '''CREATE INDEX IF NOT EXISTS oshash_idx ON file (oshash)''', '''CREATE TABLE IF NOT EXISTS archive ( site varchar(1024), name varchar(1024) unique, path varchar(1024) unique, updated INT, created INT, updating INT)''', '''CREATE TABLE IF NOT EXISTS derivative ( oshash varchar(16), name varchar(1024), status INT, UNIQUE(oshash, name))''', ] for i in db: c.execute(i) c.execute('UPDATE archive set updating=0 WHERE 1=1') conn.commit() def conn(self): conn = sqlite3.connect(self.db_conn, timeout=10) conn.text_factory = str return conn def get(self, key, default=None): conn = self.conn() c = conn.cursor() c.execute('SELECT value FROM setting WHERE key = ?', (key, )) for row in c: return row[0] return default def set(self, key, value): conn = self.conn() c = conn.cursor() c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value))) conn.commit() def remove(self, path): sql = 'DELETE FROM file WHERE path=?' conn = self.conn() c = conn.cursor() c.execute(sql, (path, )) #files def get_file(self, oshash): conn = self.conn() c = conn.cursor() f = {} sql = 'SELECT path, archive, folder, filename, info FROM file WHERE oshash=?' c.execute(sql, (oshash, )) for row in c: f['path'] = row[0] f['archive'] = row[1] f['folder'] = row[2] f['filename'] = row[3] f['info'] = json.loads(row[4]) break return f def files(self, since=None): conn = self.conn() c = conn.cursor() def get_files(files, key, sql, t=()): c.execute(sql, t) for row in c: archive = row[0] folder = row[1] filename = row[2] info = json.loads(row[3]) if not archive in files: files[archive]={} if key: if not key in files[archive]: files[archive][key]={} if not folder in files[archive][key]: files[archive][key][folder]={} files[archive][key][folder][filename] = info else: if not folder in files[archive]: files[archive][folder]={} files[archive][folder][filename] = info files = {} sql_prefix = 'SELECT archive, folder, filename, info FROM file WHERE ' sql_postfix = ' deleted < 0 ORDER BY path' if since: get_files(files, 'deleted', sql_prefix + 'deleted >= ? ORDER BY path' , (since, )) get_files(files, 'modified', sql_prefix + 'created < ? AND modified >= ? AND'+sql_postfix, (since, since)) get_files(files, 'new', sql_prefix + 'created >= ? AND'+sql_postfix, (since, )) else: get_files(files, None, sql_prefix + sql_postfix) return files #derivative def derivative(self, oshash, name, status=None): conn = self.conn() c = conn.cursor() d = {} d['oshash'] = oshash d['name'] = name d['status'] = status if status == None: sql = 'SELECT status FROM derivative WHERE oshash=? AND name=?' c.execute(sql, (oshash, name)) for row in c: d['status'] = row[0] if d['status'] == None: #this is a new derivative, add to db and add to enc queue return self.derivative(oshash, name, STATUS_NEW) else: print "insert or update derivative", oshash, name, status c.execute(u'INSERT OR REPLACE INTO derivative values (?, ?, ?)', (oshash, name, status)) conn.commit() prefix = hash_prefix(oshash) path_prefix = os.path.join(self.get('media_cache', 'media'), *prefix) d['path'] = os.path.join(path_prefix, name) d['location'] = '/'.join(['/media', ] + prefix + [name, ]) return d def derivatives(self, oshash, status=STATUS_AVAILABLE): conn = self.conn() c = conn.cursor() derivatives = [] sql = 'SELECT name FROM derivative WHERE status=? AND oshash=?' c.execute(sql, (status, oshash)) for row in c: derivatives.append(self.derivative(oshash, row[0])) return derivatives def extract_derivative(self, oshash, name): f = self.get_file(oshash) derivative = self.derivative(oshash, name) if derivative['status'] == STATUS_NEW: if name.endswith('.png'): for pos in video_frame_positions(f['info']['duration']): still_name = '%s.png' % pos still_d = self.derivative(oshash, still_name) if still_d['status'] == STATUS_NEW: self.derivative(oshash, still_name, STATUS_EXTRACTING) if extract_still(f['path'], still_d['path'], pos): self.derivative(oshash, still_name, STATUS_AVAILABLE) else: self.derivative(oshash, still_name, STATUS_FAILED) elif name.endswith('.webm'): profile = name[:-5] print 'now lets go, are we having fun?' self.derivative(oshash, name, STATUS_EXTRACTING) if extract_video(f['path'], derivative['path'], profile, f['info']): self.derivative(oshash, name, STATUS_AVAILABLE) else: self.derivative(oshash, name, STATUS_FAILED) #archive def update(self, archive, path, folder, filename): update = True modified = time.mktime(time.localtime()) created = modified sql = 'SELECT atime, ctime, mtime, size, created FROM file WHERE path=?' conn = self.conn() c = conn.cursor() c.execute(sql, (path, )) stat = os.stat(path) for row in c: if stat.st_atime == row[0] and stat.st_ctime == row[1] and stat.st_mtime == row[2] and stat.st_size == row[3]: created = row[4] update = False break if update: info = avinfo(path) for key in ('atime', 'ctime', 'mtime'): info[key] = getattr(stat, 'st_'+key) oshash = info['oshash'] deleted = -1 t = (archive, path, folder, filename, oshash, stat.st_atime, stat.st_ctime, stat.st_mtime, stat.st_size, json.dumps(info), created, modified, deleted) c.execute(u'INSERT OR REPLACE INTO file values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', t) conn.commit() def spider(self, archive): path = self.archives()[archive] path = os.path.normpath(path) for dirpath, dirnames, filenames in os.walk(path): if filenames: prefix = dirpath[len(path)+1:] for filename in filenames: if not filename.startswith('._') and not filename in ('.DS_Store', ): print dirpath, filename self.update(archive, os.path.join(dirpath, filename), prefix, filename) def add_archive(self, site, name, path): path = os.path.normpath(path) conn = self.conn() c = conn.cursor() created = time.mktime(time.localtime()) t = (site, name, path, created, created) c.execute(u'INSERT INTO archive values (?, ?, ?, ?, ?, 0)', t) conn.commit() def archives(self): conn = self.conn() c = conn.cursor() sql = 'SELECT name, path FROM archive ORDER BY name'; c.execute(sql) archives = {} for row in c: archives[row[0]] = row[1] return archives def update_archives(self): conn = self.conn() c = conn.cursor() c.execute('SELECT name FROM archive WHERE updating = 0 ORDER BY name'); for row in c: name = row[0] c.execute(u'UPDATE archive set updating=1 where name=?', (name, )) conn.commit() self.spider(name) updated = time.mktime(time.localtime()) c.execute(u'UPDATE archive set updated=?, updating=0 where name=?', (updated, name)) conn.commit() def remove_archive(self, name): conn = self.conn() c = conn.cursor() c.execute('DELETE FROM archive WHERE path=?', (path, )) c.execute('DELETE FROM file WHERE path LIKE(?%)', (path, )) conn.commit() #web def json_response(request, data): request.headers['Content-Type'] = 'text/javascript' return json.dumps(data, indent=2) class OxControl(Resource): _children = [] #isLeaf = True def __init__(self, db_path): self.db_path = db_path Resource.__init__(self) self.db = Database(self.db_path) self.putChild("media", File(self.db.get('media_cache', 'media'))) #FIXME: this is just for debugging if not 'Test' in self.db.archives(): self.db.add_archive('0xdb.org', 'Test', '/media/2010/Movies') def putChild(self, name, child): self._children.append(name) return Resource.putChild(self, name, child) def getChild(self, name, request): if name in self._children: return Resource.getChild(self, name, request) return self def render_GET(self, request): if request.path == '/files': """ /files optional ?since=unixtimestamp new/modified files by archive """ since = request.args.get("since", None) if since: since = float(since[0]) files = self.db.files(since) return json_response(request, files) if request.path == '/update': """ checks for new files in all known archives """ #update in another thread, this otherwise blocks web server thread.start_new_thread(self.db.update_archives,()) response = {'status': 'ok'} return json_response(request, response) if request.path == '/extract': """ extract derivatives from videos """ oshash = request.args.get("oshash", [None])[0] media = request.args.get("media", [None, ])[0] retry = request.args.get("retry", [None, ])[0] response = {'status': 'not enough data provided'} f = self.db.get_file(oshash) if not f: response = {'status': 'unkown oshash'} elif not 'duration' in f['info']: response = {'status': 'unkown format, can not extract data'} else: if media == 'stills': name = '%s.png'%video_frame_positions(f['info']['duration'])[0] elif media.endswith('.webm'): profile = media[:-5] if profile in VIDEO_PROFILES: name = media else: response = {'status': 'unsupported video profile requested'} if name: #get or create derivative derivative = self.db.derivative(oshash, name) if derivative['status'] == STATUS_FAILED and retry: derivative = self.db.derivative(oshash, name, STATUS_NEW) response['status'] = { STATUS_NEW: 'extracting', STATUS_EXTRACTING: 'extracting', STATUS_AVAILABLE: 'available', STATUS_FAILED: 'failed', }.get(derivative['status'], 'extracting') if derivative['status'] == STATUS_NEW: self.db.extract.put((oshash, name)) files = [f['location'] for f in self.db.derivatives(oshash)] if media == 'stills': response['stills'] = filter(lambda f: f.endswith('.png'), files) else: response['video'] = filter(lambda f: f.endswith('.webm'), files) return json_response(request, response) if request.path == '/get': """ get information about a file, including derivatives """ oshash = request.args.get("oshash", [None, ])[0] response = {'status': 'no oshash provided'} if oshash: f = self.db.get_file(oshash) response['status'] = 'available' response['info'] = f['info'] files = [f['location'] for f in self.db.derivatives(oshash)] response['video'] = filter(lambda f: f.endswith('.webm'), files) response['stills'] = filter(lambda f: f.endswith('.png'), files) return json_response(request, response) return "this is not for humans" if __name__ == '__main__': db = 'dev.sqlite' port = 2620 username = 'fix' password = 'me' interface = '127.0.0.1' interface = '10.26.20.10' interface = '0.0.0.0' print 'http://%s:%d/' % (interface, port) root = OxControl(db) checker = InMemoryUsernamePasswordDatabaseDontUse() checker.addUser(username, password) class PublicHTMLRealm(object): implements(IRealm) def requestAvatar(self, avatarId, mind, *interfaces): if IResource in interfaces: return (IResource, root, lambda: None) raise NotImplementedError() portal = Portal(PublicHTMLRealm(), [checker]) credentialFactory = DigestCredentialFactory("md5", "oxbackend") resource = HTTPAuthSessionWrapper(portal, [credentialFactory]) site = server.Site(resource) reactor.listenTCP(port, site, interface=interface) reactor.run()