# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2010
"""Index media archives in SQLite and serve file info and derivatives.

Derivatives (PNG stills and WebM encodes) are extracted by external tools
(ffmpeg2theora, mplayer, ffmpeg) in background threads and exposed through
a small Twisted web resource protected by HTTP digest authentication.
"""
from __future__ import division, print_function, with_statement

import fractions
from glob import glob
import json
import os
import re
import Queue
import sqlite3
import subprocess
import sys
import shutil
import tempfile
import time
import thread
import traceback
from threading import Thread

from twisted.cred.portal import IRealm, Portal
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.internet import task, reactor
from twisted.web import server
from twisted.web.guard import HTTPAuthSessionWrapper, DigestCredentialFactory
from twisted.web.resource import Resource, IResource
from twisted.web.static import File
from zope.interface import implements

STATUS_NEW = 0
STATUS_EXTRACTING = 1
STATUS_AVAILABLE = 2
STATUS_FAILED = 3

VIDEO_PROFILES = [
    '720p',
    '480p',
    '360p',
    '96p',
]

# File suffixes for which an 'error' reported by ffmpeg2theora is dropped.
NON_MEDIA_EXTENSIONS = ('.srt', '.sub', '.idx', '.rar')

# The "metadata" object in ffmpeg2theora output can be broken; strip it.
METADATA_RE = re.compile('"metadata": {.*?},', re.DOTALL)


class AspectRatio(fractions.Fraction):
    """Fraction that snaps to 4:3 or 16:9 when the value is close enough.

    Accepts either ``AspectRatio(width, height)`` or a single string such
    as ``'16:9'`` (a string without ``:`` is read as ``n:1``).
    """

    def __new__(cls, numerator, denominator=None):
        if not denominator:
            parts = [int(part) for part in numerator.split(':')]
            if len(parts) == 1:
                parts.append(1)
            numerator, denominator = parts[0], parts[1]
        # if it is close enough to a common aspect ratio rather use that
        value = numerator / denominator
        if abs(value - 4/3) < 0.03:
            numerator, denominator = 4, 3
        elif abs(value - 16/9) < 0.02:
            numerator, denominator = 16, 9
        return super(AspectRatio, cls).__new__(cls, numerator, denominator)

    @property
    def ratio(self):
        """Return the ratio formatted as ``'numerator:denominator'``."""
        return "%d:%d" % (self.numerator, self.denominator)


def _ensure_dir(path):
    """Create directory *path* if needed; tolerate concurrent creation."""
    if path and not os.path.exists(path):
        try:
            os.makedirs(path)
        except OSError:
            # another thread may have created it in the meantime
            if not os.path.isdir(path):
                raise


def avinfo(filename):
    """Return the info dict ``ffmpeg2theora --info`` reports for *filename*.

    Empty files, and files whose tool output can not be parsed as a JSON
    object, yield the minimal dict ``{'path': ..., 'size': ...}``.
    """
    size = os.path.getsize(filename)
    if not size:
        return {'path': filename, 'size': 0}

    p = subprocess.Popen(['ffmpeg2theora', '--info', filename],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = p.communicate()
    text = METADATA_RE.sub('', output.decode('utf-8', 'replace'))
    try:
        info = json.loads(text)
    except ValueError:
        print(output, error)
        info = None
    if not isinstance(info, dict):
        # previously execution continued with a string and crashed below
        return {'path': filename, 'size': size}

    if info.get('video'):
        stream = info['video'][0]
        if 'display_aspect_ratio' not in stream:
            dar = AspectRatio(stream['width'], stream['height'])
            stream['display_aspect_ratio'] = dar.ratio
    info.pop('path', None)
    if os.path.splitext(filename)[-1] in NON_MEDIA_EXTENSIONS \
            and 'error' in info:
        del info['error']
        if info.get('code') == 'badfile':
            del info['code']
    return info


def hash_prefix(h):
    """Split hash *h* into the path components used below the media cache."""
    return [h[:2], h[2:4], h[4:6], h[6:]]


def extract_all_stills():
    """Extract stills for every video in ``dev.sqlite`` outside 'Extras/'."""
    db = Database('dev.sqlite')
    conn, c = db.conn()
    c.execute('SELECT path, oshash, info FROM file')
    for video, oshash, raw_info in c.fetchall():
        info = json.loads(raw_info)
        if 'Extras/' in video or not info.get('video'):
            continue
        if 'duration' not in info:
            continue
        prefix = os.path.join('media', os.path.join(*hash_prefix(oshash)))
        print(video)
        for pos in video_frame_positions(info['duration']):
            extract_still(video, os.path.join(prefix, '%s.png' % pos), pos)


def run_command(cmd, timeout=25, cwd=None):
    """Run *cmd*, killing it if it has not exited after *timeout* seconds.

    Output of the child is discarded; piping it without reading it could
    fill the pipe buffer and stall the child until the timeout.

    cmd     -- argument list passed to subprocess.Popen
    timeout -- seconds to wait before the child is killed
    cwd     -- optional working directory for the child process

    Returns the exit code of the process (non-zero if it was killed).
    """
    devnull = open(os.devnull, 'wb')
    try:
        p = subprocess.Popen(cmd, stdout=devnull, stderr=devnull,
                             stdin=subprocess.PIPE, cwd=cwd)
        try:
            while timeout > 0:
                time.sleep(0.2)
                timeout -= 0.2
                if p.poll() is not None:
                    return p.returncode
            if p.poll() is None:
                try:
                    p.kill()
                except OSError:
                    # process exited between poll() and kill()
                    pass
                p.wait()
            return p.returncode
        finally:
            p.stdin.close()
    finally:
        devnull.close()


def extract_still(video, target, position):
    """Save the frame of *video* at *position* seconds as PNG to *target*.

    Returns True if mplayer exited cleanly and produced a frame.

    Alternatives that were tried and rejected:
      oxframe -- only works with theora and webm files
      ffmpeg  -- decodes all the way to position, that takes too long
      VLC     -- hangs on mpg and some avi files with old divx3 video
    """
    _ensure_dir(os.path.dirname(target))

    # mplayer writes its frames into the working directory. Run the child
    # in a temporary directory instead of os.chdir(), which would change
    # the working directory of every thread in this process.
    target = os.path.abspath(target)
    video = os.path.abspath(video)
    framedir = tempfile.mkdtemp()
    try:
        cmd = ['mplayer', '-noautosub', video, '-ss', str(position),
               '-frames', '2', '-vo', 'png:z=9', '-ao', 'null']
        r = run_command(cmd, cwd=framedir)
        # glob order is arbitrary; sort so the last frame is really last
        images = sorted(glob('%s/*.png' % framedir))
        if images:
            shutil.move(images[-1], target)
    finally:
        shutil.rmtree(framedir)
    return r == 0 and bool(images)


def _profile_options(profile, audio):
    """Return (height, ffmpeg options) for *profile*.

    audio -- list of audio stream dicts of the source, may be empty/None.
    Unknown profiles fall back to the 96p settings.
    """
    channels = audio[0].get('channels', 0) if audio else 0
    # look into
    #   lag, mb_static_threshold, qmax/qmin, rc_buf_aggressivity=0.95,
    #   token_partitions=4, level / speedlevel, bt?
    if profile == '720p':
        height = 720
        options = ['-vb', '2M', '-g', '250']
        if audio:
            options += ['-ar', '48000', '-aq', '5']
    elif profile == '480p':
        height = 480
        options = ['-vb', '1400k', '-g', '250']
        if audio:
            options += ['-ar', '44100', '-aq', '2']
            if channels > 2:
                options += ['-ac', '2']
    elif profile == '360p':
        height = 360
        options = ['-vb', '768k']
        if audio:
            options += ['-ar', '44100', '-aq', '1']
            if channels > 2:
                options += ['-ac', '2']
    else:
        height = 96
        options = ['-vb', '96k', '-g', '50']
        if audio:
            # always mono, so no extra downmix option is needed
            options += ['-ar', '22050', '-ac', '1', '-aq', '-1']
    if audio:
        options += ['-acodec', 'libvorbis']
    return height, options


def extract_video(video, target, profile, info):
    """Encode *video* as WebM to *target* using the settings of *profile*.

    video   -- path of the source file
    target  -- path of the WebM file to write (overwritten if it exists)
    profile -- one of VIDEO_PROFILES
    info    -- info dict of the source as returned by avinfo()

    Returns True if ffmpeg exited with status 0.
    """
    _ensure_dir(os.path.dirname(target))

    dar = AspectRatio(info['video'][0]['display_aspect_ratio'])
    height, profile_cmd = _profile_options(profile, info.get('audio'))
    width = int(dar * height)

    aspect = dar.ratio
    # use 1:1 pixel aspect ratio if dar is close to that
    if abs(width/height - dar) < 0.02:
        aspect = '%s:%s' % (width, height)

    cmd = ['./ffmpeg', '-y', '-threads', '2', '-i', video] \
        + profile_cmd \
        + ['-s', '%dx%d' % (width, height), '-aspect', aspect,
           '-f', 'webm', target]
    print(cmd)
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
    try:
        p.wait()
    finally:
        p.stdin.close()
    print("done")
    return p.returncode == 0


def video_frame_positions(duration):
    """Return positions (whole seconds) at 1/4, 1/2 and 3/4 of *duration*."""
    pos = duration / 2
    return [int(p) for p in (pos/2, pos, pos + pos/2)]


class ExtractThread(Thread):
    """Worker that processes (oshash, name) jobs from ``db.extract``."""

    def __init__(self, db):
        Thread.__init__(self)
        self.db = db

    def run(self):
        while True:
            oshash, name = self.db.extract.get()
            try:
                self.db.extract_derivative(oshash, name)
            except Exception:
                # keep the worker alive, one bad job must not stop the queue
                traceback.print_exc()
            finally:
                self.db.extract.task_done()


class Database(object):
    """SQLite backed index of files, archives and derivatives.

    Every method opens its own connection, so instances can be used from
    the web thread and the extract threads alike.
    """

    SCHEMA = [
        '''CREATE TABLE IF NOT EXISTS file (
                path varchar(1024) unique,
                folder varchar(1024),
                filename varchar(1024),
                oshash varchar(16),
                atime FLOAT,
                ctime FLOAT,
                mtime FLOAT,
                size INT,
                info TEXT,
                created INT,
                modified INT,
                deleted INT)''',
        '''CREATE INDEX IF NOT EXISTS path_idx ON file (path)''',
        '''CREATE INDEX IF NOT EXISTS oshash_idx ON file (oshash)''',
        '''CREATE TABLE IF NOT EXISTS archive (
                site varchar(1024),
                name varchar(1024),
                path varchar(1024),
                updated INT,
                created INT,
                updating INT,
                UNIQUE(site, name))''',
        '''CREATE TABLE IF NOT EXISTS derivative (
                oshash varchar(16),
                name varchar(1024),
                status INT,
                UNIQUE(oshash, name))''',
    ]

    def __init__(self, db_conn):
        """Open (and if needed create) the database at path *db_conn*."""
        self.db_conn = db_conn
        self.extract = Queue.Queue()

        conn, c = self.conn()
        c.execute('''CREATE TABLE IF NOT EXISTS setting (
                         key varchar(1024) unique, value text)''')
        # all statements are idempotent, so run them on every start
        for statement in self.SCHEMA:
            c.execute(statement)
        # no update can be running at startup, clear stale flags
        c.execute('UPDATE archive set updating=0 WHERE updating!=0')
        conn.commit()
        if int(self.get('version', 0)) < 1:
            self.set('version', 1)

        # start the workers only once the database is usable
        for i in range(2):
            t = ExtractThread(self)
            t.setDaemon(True)
            t.start()

    def conn(self):
        """Return a new ``(connection, cursor)`` pair."""
        conn = sqlite3.connect(self.db_conn, timeout=10)
        conn.text_factory = sqlite3.OptimizedUnicode
        return conn, conn.cursor()

    def get(self, key, default=None):
        """Return the setting stored under *key*, or *default*."""
        conn, c = self.conn()
        c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
        row = c.fetchone()
        if row is None:
            return default
        return row[0]

    def set(self, key, value):
        """Store ``str(value)`` as setting *key*."""
        conn, c = self.conn()
        c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)',
                  (key, str(value)))
        conn.commit()

    def remove_file(self, path):
        """Delete the file row with the given *path*."""
        conn, c = self.conn()
        c.execute('DELETE FROM file WHERE path=?', (path, ))
        conn.commit()

    #files
    def file(self, oshash):
        """Return path, folder, filename and info of the first file with
        *oshash*, or an empty dict if there is none."""
        conn, c = self.conn()
        sql = 'SELECT path, folder, filename, info FROM file WHERE oshash=?'
        c.execute(sql, (oshash, ))
        row = c.fetchone()
        if row is None:
            return {}
        return {
            'path': row[0],
            'folder': row[1],
            'filename': row[2],
            'info': json.loads(row[3]),
        }

    def files(self, site, archive, since=None):
        """Return the files of an archive as ``{folder: {filename: info}}``.

        With *since* (timestamp) the result is instead grouped under the
        keys 'deleted', 'modified' and 'new', containing only changes at
        or after that time. Unknown archives yield an empty dict.
        """
        conn, c = self.conn()
        c.execute('SELECT path from archive where name=? AND site=?',
                  (archive, site))
        prefix = None
        for row in c:
            prefix = row[0]
        if not prefix:
            return {}

        like = u"%s%%" % prefix
        files = {}

        def collect(key, where, params=()):
            # every query is limited to files below the archive path
            sql = ('SELECT folder, filename, info FROM file WHERE %s'
                   ' AND path LIKE ? ORDER BY path') % where
            c.execute(sql, list(params) + [like])
            for folder, filename, info in c.fetchall():
                bucket = files.setdefault(key, {}) if key else files
                bucket.setdefault(folder, {})[filename] = json.loads(info)

        if since:
            collect('deleted', 'deleted >= ?', (since, ))
            collect('modified',
                    'created < ? AND modified >= ? AND deleted < 0',
                    (since, since))
            collect('new', 'created >= ? AND deleted < 0', (since, ))
        else:
            collect(None, 'deleted < 0')
        return files

    #derivative
    def derivative(self, oshash, name, status=None):
        """Get, create or update the derivative *name* of file *oshash*.

        Without *status* the stored status is looked up; an unknown
        derivative is added with STATUS_NEW. With *status* the row is
        inserted or updated.

        Returns a dict with oshash, name, status, path and location.
        """
        conn, c = self.conn()
        d = {'oshash': oshash, 'name': name, 'status': status}
        if status is None:
            sql = 'SELECT status FROM derivative WHERE oshash=? AND name=?'
            c.execute(sql, (oshash, name))
            for row in c:
                d['status'] = row[0]
            if d['status'] is None:
                #this is a new derivative, add to db and add to enc queue
                return self.derivative(oshash, name, STATUS_NEW)
        else:
            print("insert or update derivative", oshash, name, status)
            c.execute(u'INSERT OR REPLACE INTO derivative values (?, ?, ?)',
                      (oshash, name, status))
            conn.commit()

        prefix = hash_prefix(oshash)
        path_prefix = os.path.join(self.get('media_cache', 'media'), *prefix)
        d['path'] = os.path.join(path_prefix, name)
        d['location'] = '/'.join(['/media', ] + prefix + [name, ])
        return d

    def derivatives(self, oshash, status=STATUS_AVAILABLE):
        """Return all derivatives of *oshash* that have *status*."""
        conn, c = self.conn()
        sql = 'SELECT name FROM derivative WHERE status=? AND oshash=?'
        c.execute(sql, (status, oshash))
        return [self.derivative(oshash, row[0]) for row in c.fetchall()]

    def _run_extraction(self, oshash, name, extractor, *args):
        """Run ``extractor(*args)`` and record the outcome as status."""
        self.derivative(oshash, name, STATUS_EXTRACTING)
        try:
            ok = extractor(*args)
        except Exception:
            # do not leave the derivative stuck in STATUS_EXTRACTING
            traceback.print_exc()
            ok = False
        if ok:
            self.derivative(oshash, name, STATUS_AVAILABLE)
        else:
            self.derivative(oshash, name, STATUS_FAILED)

    def extract_derivative(self, oshash, name):
        """Extract derivative *name* of *oshash* if its status is NEW.

        A '.png' name extracts all stills of the file, a '.webm' name
        encodes the video with the profile given by the name.
        """
        f = self.file(oshash)
        derivative = self.derivative(oshash, name)
        if derivative['status'] != STATUS_NEW:
            return
        if not f:
            # file is gone from the index, nothing to extract from
            self.derivative(oshash, name, STATUS_FAILED)
            return

        if name.endswith('.png'):
            for pos in video_frame_positions(f['info']['duration']):
                still_name = '%s.png' % pos
                still_d = self.derivative(oshash, still_name)
                if still_d['status'] == STATUS_NEW:
                    self._run_extraction(oshash, still_name, extract_still,
                                         f['path'], still_d['path'], pos)
        elif name.endswith('.webm'):
            profile = name[:-5]
            print('now lets go, are we having fun?')
            self._run_extraction(oshash, name, extract_video,
                                 f['path'], derivative['path'], profile,
                                 f['info'])

    #archive
    def update(self, path, folder, filename):
        """Add or refresh the row for *path* if its stat data changed."""
        conn, c = self.conn()
        update = True
        modified = time.mktime(time.localtime())
        created = modified

        sql = ('SELECT atime, ctime, mtime, size, created FROM file '
               'WHERE deleted < 0 AND path=?')
        c.execute(sql, [path])
        stat = os.stat(path)
        row = c.fetchone()
        if row is not None:
            # keep the original creation time so that a changed file is
            # reported as modified and not as new
            created = row[4]
            current = (stat.st_atime, stat.st_ctime, stat.st_mtime,
                       stat.st_size)
            if current == tuple(row[:4]):
                update = False

        if update:
            info = avinfo(path)
            for key in ('atime', 'ctime', 'mtime'):
                info[key] = getattr(stat, 'st_' + key)
            # empty or unreadable files come without an oshash
            oshash = info.get('oshash', '')
            deleted = -1
            t = (path, folder, filename, oshash, stat.st_atime,
                 stat.st_ctime, stat.st_mtime, stat.st_size,
                 json.dumps(info), created, modified, deleted)
            c.execute(u'INSERT OR REPLACE INTO file values '
                      u'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', t)
            conn.commit()

    def spider(self, path):
        """Walk *path*, update all files found and flag missing ones as
        deleted."""
        path = os.path.normpath(path)
        seen = set()
        for dirpath, dirnames, filenames in os.walk(path):
            if isinstance(dirpath, str):
                dirpath = dirpath.decode('utf-8')
            if not filenames:
                continue
            prefix = dirpath[len(path)+1:]
            for filename in filenames:
                if isinstance(filename, str):
                    filename = filename.decode('utf-8')
                if filename.startswith('._') or filename in ('.DS_Store', ):
                    continue
                file_path = os.path.join(dirpath, filename)
                seen.add(file_path)
                self.update(file_path, prefix, filename)

        conn, c = self.conn()
        c.execute('SELECT path FROM file WHERE path LIKE ? AND deleted < 0',
                  ["%s%%" % path])
        known_files = [r[0] for r in c.fetchall()]
        deleted_files = [f for f in known_files if f not in seen]
        print('now delete')
        print(json.dumps(deleted_files, indent=2))
        if deleted_files:
            deleted = time.mktime(time.localtime())
            for f in deleted_files:
                c.execute('UPDATE file SET deleted=? WHERE path=?',
                          (deleted, f))
            conn.commit()

    def add_archive(self, site, name, path):
        """Register archive *name* of *site* rooted at *path*.

        Raises sqlite3.IntegrityError if that site/name pair exists.
        """
        conn, c = self.conn()
        path = os.path.normpath(path)
        created = time.mktime(time.localtime())
        t = (site, name, path, created, created)
        c.execute(u'INSERT INTO archive values (?, ?, ?, ?, ?, 0)', t)
        conn.commit()

    def archives(self, site):
        """Return ``{name: path}`` for all archives of *site*."""
        conn, c = self.conn()
        sql = 'SELECT name, path FROM archive WHERE site=? ORDER BY name'
        c.execute(sql, [site])
        return dict((row[0], row[1]) for row in c)

    def update_archives(self):
        """Spider all archive paths that are not currently updating."""
        conn, c = self.conn()
        c.execute('SELECT path FROM archive WHERE updating = 0 '
                  'GROUP BY path ORDER BY path')
        paths = [r[0] for r in c.fetchall()]

        def not_subpath(path):
            # compare with a trailing separator so that /a/foo2 is not
            # mistaken for something below /a/foo
            for p in paths:
                if p != path and path.startswith(p.rstrip(os.sep) + os.sep):
                    return False
            return True

        for path in [p for p in paths if not_subpath(p)]:
            c.execute(u'UPDATE archive SET updating=1 WHERE path LIKE ?',
                      ['%s%%' % path])
            conn.commit()
            self.spider(path)
            updated = time.mktime(time.localtime())
            c.execute(u'UPDATE archive SET updated=?, updating=0 '
                      u'WHERE path LIKE ?', (updated, '%s%%' % path))
            conn.commit()

    def remove_archive(self, site, name):
        """Remove archive *name* of *site*; its file rows are kept."""
        conn, c = self.conn()
        c.execute('DELETE FROM archive WHERE site=? AND name=?',
                  [site, name])
        #fixme, files could be still used by subarchive
        #c.execute('DELETE FROM file WHERE path LIKE ?', ["%s%%"%path])
        conn.commit()


#web
def json_response(request, data):
    """Set the response content type and return *data* encoded as JSON."""
    request.setHeader('Content-Type', 'text/javascript')
    return json.dumps(data, indent=2)


def _first_arg(request, name, default=None):
    """Return the first value of query argument *name*, or *default*."""
    return request.args.get(name, [default])[0]


def _required_args(request, names):
    """Return ``{name: value}`` for *names*, or None if one is missing."""
    args = {}
    for name in names:
        value = _first_arg(request, name)
        if value is None:
            return None
        args[name] = value
    return args


class OxControl(Resource):
    """Web resource that answers the JSON API for one Database."""

    NOT_ENOUGH_DATA = 'not enough data provided'

    # request path -> name of the method building the response dict
    HANDLERS = {
        '/add_archive': '_add_archive',
        '/remove_archive': '_remove_archive',
        '/archives': '_archives',
        '/files': '_files',
        '/update': '_update',
        '/extract': '_extract',
        '/get': '_get',
    }

    def __init__(self, db_path):
        # per instance; a class level list would be shared by all instances
        self._children = []
        self.db_path = db_path
        Resource.__init__(self)

        self.db = Database(self.db_path)
        self.putChild("media", File(self.db.get('media_cache', 'media')))

        #FIXME: this is just for debugging
        if 'Test' not in self.db.archives('0xdb.org'):
            self.db.add_archive('0xdb.org', 'Test', '/media/2010/Movies')

    def putChild(self, name, child):
        self._children.append(name)
        return Resource.putChild(self, name, child)

    def getChild(self, name, request):
        if name in self._children:
            return Resource.getChild(self, name, request)
        # every other path is rendered by this resource itself
        return self

    def render_GET(self, request):
        handler = self.HANDLERS.get(request.path)
        if handler is None:
            return "this is not for humans"
        return json_response(request, getattr(self, handler)(request))

    def _add_archive(self, request):
        """/add_archive -- site, name, path"""
        args = _required_args(request, ('site', 'name', 'path'))
        if args is None:
            return {'status': self.NOT_ENOUGH_DATA}
        try:
            self.db.add_archive(**args)
        except sqlite3.IntegrityError:
            return {'status': 'archive exists'}
        return {'status': 'ok'}

    def _remove_archive(self, request):
        """/remove_archive -- site, name"""
        args = _required_args(request, ('site', 'name'))
        if args is None:
            return {'status': self.NOT_ENOUGH_DATA}
        self.db.remove_archive(**args)
        return {'status': 'ok'}

    def _archives(self, request):
        """/archives -- site"""
        args = _required_args(request, ('site', ))
        if args is None:
            return {'status': self.NOT_ENOUGH_DATA}
        return {'archives': self.db.archives(**args)}

    def _files(self, request):
        """/files -- files in archive

            archive  archive name
            site     site name
            since    (optional) timestamp, return changes since
        """
        args = _required_args(request, ('site', 'archive'))
        if args is None:
            return {'status': self.NOT_ENOUGH_DATA}
        since = _first_arg(request, 'since')
        if since:
            try:
                args['since'] = float(since)
            except ValueError:
                return {'status': 'invalid since'}
        return self.db.files(**args)

    def _update(self, request):
        """/update -- checks for new files in all known archives"""
        #update in another thread, this otherwise blocks web server
        thread.start_new_thread(self.db.update_archives, ())
        return {'status': 'ok'}

    def _derivative_locations(self, oshash, suffix):
        """Return locations of available derivatives ending in *suffix*."""
        return [d['location'] for d in self.db.derivatives(oshash)
                if d['location'].endswith(suffix)]

    def _extract(self, request):
        """/extract -- extract derivatives from videos

            oshash  oshash of file
            media   'stills' or '<profile>.webm'
            retry   (optional) queue a failed derivative again
        """
        oshash = _first_arg(request, 'oshash')
        media = _first_arg(request, 'media')
        retry = _first_arg(request, 'retry')

        response = {'status': self.NOT_ENOUGH_DATA}
        f = self.db.file(oshash)
        if not f:
            return {'status': 'unkown oshash'}
        if 'duration' not in f['info']:
            return {'status': 'unkown format, can not extract data'}

        name = None
        if media == 'stills':
            positions = video_frame_positions(f['info']['duration'])
            name = '%s.png' % positions[0]
        elif media and media.endswith('.webm'):
            if media[:-5] in VIDEO_PROFILES:
                name = media
            else:
                response = {'status': 'unsupported video profile requested'}
        if not name:
            return response

        #get or create derivative
        derivative = self.db.derivative(oshash, name)
        if derivative['status'] == STATUS_FAILED and retry:
            derivative = self.db.derivative(oshash, name, STATUS_NEW)
        response['status'] = {
            STATUS_NEW: 'extracting',
            STATUS_EXTRACTING: 'extracting',
            STATUS_AVAILABLE: 'available',
            STATUS_FAILED: 'failed',
        }.get(derivative['status'], 'extracting')
        if derivative['status'] == STATUS_NEW:
            self.db.extract.put((oshash, name))

        if media == 'stills':
            response['stills'] = self._derivative_locations(oshash, '.png')
        else:
            response['video'] = self._derivative_locations(oshash, '.webm')
        return response

    def _get(self, request):
        """/get -- information about a file, including derivatives

            oshash  oshash of file
        """
        oshash = _first_arg(request, 'oshash')
        if not oshash:
            return {'status': 'no oshash provided'}
        f = self.db.file(oshash)
        if not f:
            return {'status': 'unkown oshash'}
        return {
            'status': 'available',
            'info': f['info'],
            'video': self._derivative_locations(oshash, '.webm'),
            'stills': self._derivative_locations(oshash, '.png'),
        }


if __name__ == '__main__':
    db = 'dev.sqlite'
    port = 2620
    username = 'fix'
    password = 'me'

    # listens on all interfaces
    interface = '0.0.0.0'

    print('http://%s:%d/' % (interface, port))

    root = OxControl(db)

    checker = InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser(username, password)

    class PublicHTMLRealm(object):
        implements(IRealm)

        def requestAvatar(self, avatarId, mind, *interfaces):
            if IResource in interfaces:
                return (IResource, root, lambda: None)
            raise NotImplementedError()

    portal = Portal(PublicHTMLRealm(), [checker])
    credentialFactory = DigestCredentialFactory("md5", "oxbackend")
    resource = HTTPAuthSessionWrapper(portal, [credentialFactory])

    site = server.Site(resource)
    reactor.listenTCP(port, site, interface=interface)
    reactor.run()