# oxd/oxd.py (731 lines, 26 KiB, Python)
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2010
from __future__ import division, with_statement
import fractions
from glob import glob
import json
import os
import re
import Queue
import sqlite3
import subprocess
import sys
import shutil
import tempfile
import time
import thread
from threading import Thread
from twisted.cred.portal import IRealm, Portal
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.internet import task, reactor
from twisted.web import server
from twisted.web.guard import HTTPAuthSessionWrapper, DigestCredentialFactory
from twisted.web.resource import Resource, IResource
from twisted.web.static import File
from zope.interface import implements
# Lifecycle states of a derivative (still image or webm transcode),
# persisted in the `derivative` table's status column.
STATUS_NEW=0
STATUS_EXTRACTING=1
STATUS_AVAILABLE=2
STATUS_FAILED=3
# Supported transcode heights; a derivative named '<profile>.webm'
# (e.g. '480p.webm') can be requested via the /extract endpoint.
VIDEO_PROFILES = [
    '720p',
    '480p',
    '360p',
    '96p',
]
class AspectRatio(fractions.Fraction):
    """Exact display aspect ratio, stored as a reduced Fraction.

    Accepts either two integers (width, height) or a single "W:H"
    string (a bare "W" implies a denominator of 1).  Ratios that are
    within a small tolerance of the common 4:3 / 16:9 ratios are
    snapped to those exact values, since encoder metadata is often
    slightly off.
    """
    def __new__(cls, numerator, denominator=None):
        # `is None` (not falsiness): an explicit denominator of 0 must
        # not be treated as the string-parsing case, and a concrete
        # list is built so len() works on Python 2 and 3 alike
        if denominator is None:
            ratio = [int(part) for part in numerator.split(':')]
            if len(ratio) == 1:
                ratio.append(1)
            numerator = ratio[0]
            denominator = ratio[1]
        #if its close enough to the common aspect ratios rather use that
        if abs(numerator / denominator - 4 / 3) < 0.03:
            numerator = 4
            denominator = 3
        elif abs(numerator / denominator - 16 / 9) < 0.02:
            numerator = 16
            denominator = 9
        return super(AspectRatio, cls).__new__(cls, numerator, denominator)

    @property
    def ratio(self):
        """The ratio formatted as "W:H", fully reduced (e.g. "4:3")."""
        return "%d:%d" % (self.numerator, self.denominator)
def avinfo(filename):
    # Probe a media file with `ffmpeg2theora --info` and return its
    # metadata as a dict; empty files get {'path': filename, 'size': 0}.
    if os.path.getsize(filename):
        p = subprocess.Popen(['ffmpeg2theora', '--info', filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        info, error = p.communicate()
        #remove metadata, can be broken
        reg = re.compile('"metadata": {.*?},', re.DOTALL)
        info = re.sub(reg, '', info)
        try:
            info = json.loads(info.decode('utf-8', 'replace'))
        except:
            # NOTE(review): bare except — if the JSON does not parse,
            # `info` remains a string and the dict operations below will
            # misbehave (substring test) or raise (`del info['path']`);
            # confirm ffmpeg2theora always emits JSON here.
            print info, error
        if 'video' in info and info['video']:
            # derive the display aspect ratio from the raw frame size
            # when the container does not declare one
            if not 'display_aspect_ratio' in info['video'][0]:
                dar = AspectRatio(info['video'][0]['width'], info['video'][0]['height'])
                info['video'][0]['display_aspect_ratio'] = dar.ratio
        # the caller already knows the path; drop it from the probe output
        del info['path']
        # subtitle/companion files are not audio/video: suppress the
        # "badfile" error ffmpeg2theora reports for them
        if os.path.splitext(filename)[-1] in ('.srt', '.sub', '.idx', '.rar') and 'error' in info:
            del info['error']
            if 'code' in info and info['code'] == 'badfile':
                del info['code']
        return info
    return {'path': filename, 'size': 0}
def hash_prefix(h):
    """Split a hex hash into path segments (2 + 2 + 2 + rest) used to
    shard derivative files across directories on disk."""
    segments = [h[i:i + 2] for i in (0, 2, 4)]
    segments.append(h[6:])
    return segments
def extract_all_stills():
db = Database('dev.sqlite')
conn = db.conn()
c = conn.cursor()
sql = 'SELECT path, oshash, info FROM file'
c.execute(sql)
for row in c:
video = row[0]
oshash = row[1]
info = json.loads(row[2])
if not 'Extras/' in video and 'video' in info and info['video']:
prefix = os.path.join('media', os.path.join(*hash_prefix(oshash)))
print video
extract_stills(video, prefix, info)
def run_command(cmd, timeout=25):
    """Run `cmd`, polling until it exits or `timeout` seconds elapse.

    A process still alive at the deadline is killed with SIGKILL and
    reaped (non-blocking) so no zombie is left behind.  Returns the
    process return code (None if it had to be killed before a status
    could be collected).
    """
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    while timeout > 0:
        time.sleep(0.2)
        timeout -= 0.2
        # `is not None` — PEP 8 identity comparison (was `!= None`)
        if p.poll() is not None:
            return p.returncode
    if p.poll() is None:
        os.kill(p.pid, 9)
        # reap without blocking; the child may not be gone yet
        killedpid, stat = os.waitpid(p.pid, os.WNOHANG)
    return p.returncode
def extract_still(video, target, position):
    # Extract one PNG frame from `video` at `position` (seconds) into
    # `target`, creating the target directory if needed.  Returns True
    # if the extraction command exited 0.
    fdir = os.path.dirname(target)
    if fdir and not os.path.exists(fdir):
        os.makedirs(fdir)
    # The two string blocks below are dead code: previously tried
    # extraction backends, kept for reference.
    '''
    #oxframe
    #this only works with theora and webm files!!!
    cmd = ['oxframe', '-p', str(position), '-i', video, '-o', frame]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    '''
    '''
    #ffmpeg
    #decodes all the wat to position, that takes to long
    cmd = ['ffmpeg', '-i', video, '-vframes', '1','-ss', str(position), '-f','image2', target]
    print cmd
    p = subprocess.Popen(cmd)
    p.wait()
    '''
    '''
    #VLC
    #hangs on mpg and some avi files with old divx3 video
    out = position + 0.2
    framedir = tempfile.mkdtemp()
    vlc_path = 'vlc'
    for i in ("/Applications/VLC.app/Contents/MacOS/VLC", ):
        if os.path.exists(i):
            vlc_path = i
    cmd = [
        vlc_path, '--vout=dummy', video, '--start-time=%s'%position, '--stop-time=%s'%out,
        '-I', 'dummy', '--video-filter=scene', '--scene-path=%s'%framedir,
        '--scene-format=png', '--scene-ratio=25', '--scene-prefix=still', '--swscale-mode=2',
        '--sout-transcode-vcodec=avcodec', '--noaudio', 'vlc://quit',
    ]
    #print cmd
    run_command(cmd)
    images = glob('%s/still*.png' % framedir)
    if images:
        shutil.move(images[0], target)
    shutil.rmtree(framedir)
    '''
    #mplayer
    # mplayer writes numbered PNGs into the current directory, so run
    # it from a throw-away temp dir and restore the cwd afterwards
    cwd = os.getcwd()
    target = os.path.abspath(target)
    framedir = tempfile.mkdtemp()
    os.chdir(framedir)
    cmd = ['mplayer', '-noautosub', video, '-ss', str(position), '-frames', '2', '-vo', 'png:z=9', '-ao', 'null']
    r = run_command(cmd)
    images = glob('%s/*.png' % framedir)
    if images:
        # two frames are decoded; the last one is the seeked-to frame
        shutil.move(images[-1], target)
    os.chdir(cwd)
    shutil.rmtree(framedir)
    return r == 0
def extract_video(video, target, profile, info):
if not os.path.exists(target):
fdir = os.path.dirname(target)
if not os.path.exists(fdir):
os.makedirs(fdir)
dar = AspectRatio(info['video'][0]['display_aspect_ratio'])
profile_cmd = []
'''
look into
lag
mb_static_threshold
qmax/qmin
rc_buf_aggressivity=0.95
token_partitions=4
level / speedlevel
bt?
'''
if profile == '720p':
height = 720
width = int(dar * height)
profile_cmd = ['-vb', '2M', '-g', '250']
if info['audio']:
profile_cmd += ['-ar', '48000', '-aq', '5']
if profile == '480p':
height = 480
width = int(dar * height)
profile_cmd = ['-vb', '1400k', '-g', '250']
if info['audio']:
profile_cmd += ['-ar', '44100', '-aq', '2']
if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 2:
profile_cmd += ['-ac', '2']
elif profile == '360p':
height = 360
width = int(dar * height)
profile_cmd = ['-vb', '768k']
if info['audio']:
profile_cmd += ['-ar', '44100', '-aq', '1']
if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 2:
profile_cmd += ['-ac', '2']
else:
height = 96
width = int(dar * height)
profile_cmd = ['-vb', '96k', '-g', '50']
if info['audio']:
profile_cmd += ['-ar', '22050', '-ac', '1', '-aq', '-1']
if 'channels' in info['audio'][0] and info['audio'][0]['channels'] > 1:
profile_cmd += ['-ac', '1']
if info['audio']:
profile_cmd +=['-acodec', 'libvorbis']
aspect = dar.ratio
#use 1:1 pixel aspect ratio if dar is close to that
if abs(width/height - dar) < 0.02:
aspect = '%s:%s' % (width, height)
cmd = ['./ffmpeg', '-y', '-threads', '2',
'-i', video
] + profile_cmd + [
'-s', '%dx%d'%(width, height),
'-aspect', aspect,
'-f','webm',
target]
print cmd
#r = run_command(cmd, -1)
p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
p.wait()
r = p.returncode
print "done"
return r == 0
def video_frame_positions(duration):
    """Return the three still positions for a video of `duration`
    seconds: 1/4, 1/2 and 3/4 of the way through, as whole seconds."""
    pos = duration / 2
    # build a concrete list (not a lazy map): callers index the result
    return [int(p) for p in (pos / 2, pos, pos + pos / 2)]
class ExtractThread(Thread):
    """Worker thread: pulls (oshash, name) jobs from db.extract and
    runs the corresponding derivative extraction, forever."""

    def __init__(self, db):
        Thread.__init__(self)
        self.db = db

    def run(self):
        while True:
            job = self.db.extract.get()
            self.db.extract_derivative(job[0], job[1])
            self.db.extract.task_done()
class Database(object):
def __init__(self, db_conn):
self.extract = Queue.Queue()
for i in range(2):
t = ExtractThread(self)
t.setDaemon(True)
t.start()
self.db_conn = db_conn
conn, c = self.conn()
c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
if int(self.get('version', 0)) < 1:
self.set('version', 1)
db = [
'''CREATE TABLE IF NOT EXISTS file (
path varchar(1024) unique,
folder varchar(1024),
filename varchar(1024),
oshash varchar(16),
atime FLOAT,
ctime FLOAT,
mtime FLOAT,
size INT,
info TEXT,
created INT,
modified INT,
deleted INT)''',
'''CREATE INDEX IF NOT EXISTS path_idx ON file (path)''',
'''CREATE INDEX IF NOT EXISTS oshash_idx ON file (oshash)''',
'''CREATE TABLE IF NOT EXISTS archive (
site varchar(1024),
name varchar(1024),
path varchar(1024),
updated INT,
created INT,
updating INT,
UNIQUE(site, name)))''',
'''CREATE TABLE IF NOT EXISTS derivative (
oshash varchar(16),
name varchar(1024),
status INT,
UNIQUE(oshash, name))''',
]
for i in db:
c.execute(i)
c.execute('UPDATE archive set updating=0 WHERE updating!=0')
conn.commit()
def conn(self):
conn = sqlite3.connect(self.db_conn, timeout=10)
conn.text_factory = sqlite3.OptimizedUnicode
return conn, conn.cursor()
def get(self, key, default=None):
conn, c = self.conn()
c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
for row in c:
return row[0]
return default
def set(self, key, value):
conn, c = self.conn()
c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
conn.commit()
def remove_file(self, path):
conn, c = self.conn()
sql = 'DELETE FROM file WHERE path=?'
c.execute(sql, (path, ))
conn.commit()
#files
def file(self, oshash):
conn, c = self.conn()
f = {}
sql = 'SELECT path, folder, filename, info FROM file WHERE oshash=?'
c.execute(sql, (oshash, ))
for row in c:
f['path'] = row[0]
f['folder'] = row[1]
f['filename'] = row[2]
f['info'] = json.loads(row[3])
break
return f
def files(self, site, archive, since=None):
conn, c = self.conn()
c.execute('SELECT path from archive where name=? AND site=?', (archive, site))
prefix = None
for row in c:
prefix = row[0]
if not prefix:
return {}
def get_files(files, key, sql, t=()):
t = list(t) + [u"%s%%"%prefix]
c.execute(sql, t)
for row in c:
folder = row[0]
filename = row[1]
info = json.loads(row[2])
if key:
if not key in files: files[key]={}
if not folder in files[key]: files[key][folder]={}
files[key][folder][filename] = info
else:
if not folder in files: files[folder]={}
files[folder][filename] = info
files = {}
sql_prefix = 'SELECT folder, filename, info FROM file WHERE '
sql_postfix = ' deleted < 0 AND path LIKE ? ORDER BY path'
if since:
get_files(files, 'deleted', sql_prefix + 'deleted >= ? ORDER BY path' , (since, ))
get_files(files, 'modified',
sql_prefix + 'created < ? AND modified >= ? AND'+sql_postfix,
(since, since))
get_files(files, 'new', sql_prefix + 'created >= ? AND'+sql_postfix, (since, ))
else:
get_files(files, None, sql_prefix + sql_postfix)
return files
#derivative
def derivative(self, oshash, name, status=None):
conn, c = self.conn()
d = {}
d['oshash'] = oshash
d['name'] = name
d['status'] = status
if status == None:
sql = 'SELECT status FROM derivative WHERE oshash=? AND name=?'
c.execute(sql, (oshash, name))
for row in c:
d['status'] = row[0]
if d['status'] == None:
#this is a new derivative, add to db and add to enc queue
return self.derivative(oshash, name, STATUS_NEW)
else:
print "insert or update derivative", oshash, name, status
c.execute(u'INSERT OR REPLACE INTO derivative values (?, ?, ?)', (oshash, name, status))
conn.commit()
prefix = hash_prefix(oshash)
path_prefix = os.path.join(self.get('media_cache', 'media'), *prefix)
d['path'] = os.path.join(path_prefix, name)
d['location'] = '/'.join(['/media', ] + prefix + [name, ])
return d
def derivatives(self, oshash, status=STATUS_AVAILABLE):
conn, c = self.conn()
derivatives = []
sql = 'SELECT name FROM derivative WHERE status=? AND oshash=?'
c.execute(sql, (status, oshash))
for row in c:
derivatives.append(self.derivative(oshash, row[0]))
return derivatives
def extract_derivative(self, oshash, name):
f = self.file(oshash)
derivative = self.derivative(oshash, name)
if derivative['status'] == STATUS_NEW:
if name.endswith('.png'):
for pos in video_frame_positions(f['info']['duration']):
still_name = '%s.png' % pos
still_d = self.derivative(oshash, still_name)
if still_d['status'] == STATUS_NEW:
self.derivative(oshash, still_name, STATUS_EXTRACTING)
if extract_still(f['path'], still_d['path'], pos):
self.derivative(oshash, still_name, STATUS_AVAILABLE)
else:
self.derivative(oshash, still_name, STATUS_FAILED)
elif name.endswith('.webm'):
profile = name[:-5]
print 'now lets go, are we having fun?'
self.derivative(oshash, name, STATUS_EXTRACTING)
if extract_video(f['path'], derivative['path'], profile, f['info']):
self.derivative(oshash, name, STATUS_AVAILABLE)
else:
self.derivative(oshash, name, STATUS_FAILED)
#archive
def update(self, path, folder, filename):
conn, c = self.conn()
update = True
modified = time.mktime(time.localtime())
created = modified
sql = 'SELECT atime, ctime, mtime, size, created FROM file WHERE deleted < 0 AND path=?'
c.execute(sql, [path])
stat = os.stat(path)
for row in c:
if stat.st_atime == row[0] and stat.st_ctime == row[1] and stat.st_mtime == row[2] and stat.st_size == row[3]:
created = row[4]
update = False
break
if update:
info = avinfo(path)
for key in ('atime', 'ctime', 'mtime'):
info[key] = getattr(stat, 'st_'+key)
oshash = info['oshash']
deleted = -1
t = (path, folder, filename, oshash, stat.st_atime, stat.st_ctime, stat.st_mtime,
stat.st_size, json.dumps(info), created, modified, deleted)
c.execute(u'INSERT OR REPLACE INTO file values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', t)
conn.commit()
def spider(self, path):
path = os.path.normpath(path)
files = []
for dirpath, dirnames, filenames in os.walk(path):
if isinstance(dirpath, str):
dirpath = dirpath.decode('utf-8')
if filenames:
prefix = dirpath[len(path)+1:]
for filename in filenames:
if isinstance(filename, str):
filename = filename.decode('utf-8')
if not filename.startswith('._') and not filename in ('.DS_Store', ):
file_path = os.path.join(dirpath, filename)
files.append(file_path)
self.update(file_path, prefix, filename)
conn, c = self.conn()
c.execute('SELECT path FROM file WHERE path LIKE ? AND deleted < 0', ["%s%%"%path])
known_files = [r[0] for r in c.fetchall()]
deleted_files = filter(lambda f: f not in files, known_files)
'''
print 'known'
print json.dumps(known_files, indent=2)
print 'spidered'
print json.dumps(files, indent=2)
'''
print 'now delete'
print json.dumps(deleted_files, indent=2)
if deleted_files:
deleted = time.mktime(time.localtime())
for f in deleted_files:
c.execute('UPDATE file SET deleted=? WHERE path=?', (deleted, f))
conn.commit()
def add_archive(self, site, name, path):
conn, c = self.conn()
path = os.path.normpath(path)
created = time.mktime(time.localtime())
t = (site, name, path, created, created)
#FIXME: check if site/name exists or deal with error here
c.execute(u'INSERT INTO archive values (?, ?, ?, ?, ?, 0)', t)
conn.commit()
def archives(self, site):
conn, c = self.conn()
sql = 'SELECT name, path FROM archive WHERE site=? ORDER BY name';
c.execute(sql, [site])
archives = {}
for row in c:
archives[row[0]] = row[1]
return archives
def update_archives(self):
conn, c = self.conn()
c.execute('SELECT path FROM archive WHERE updating = 0 GROUP BY path ORDER BY path')
paths = [r[0] for r in c.fetchall()]
def not_subpath(path):
for p in paths:
if p != path and path.startswith(p):
return False
return True
paths = filter(not_subpath, paths)
for path in paths:
c.execute(u'UPDATE archive SET updating=1 WHERE path LIKE ?', ['%s%%'%path])
conn.commit()
self.spider(path)
updated = time.mktime(time.localtime())
c.execute(u'UPDATE archive SET updated=?, updating=0 WHERE path LIKE ?', (updated, '%s%%'%path))
conn.commit()
def remove_archive(self, site, name):
conn, c = self.conn()
c.execute('DELETE FROM archive WHERE site=? AND name=?', [site, name])
#fixme, files could be still used by subarchive
#c.execute('DELETE FROM file WHERE path LIKE ?', ["%s%%"%path])
conn.commit()
#web
def json_response(request, data):
    """Serialize `data` as indented JSON and mark the response as
    javascript; returns the body for render_GET to hand to twisted."""
    # FIX: use twisted's setHeader() — assigning into request.headers
    # mutates the dict of *received* request headers, not the response
    request.setHeader('Content-Type', 'text/javascript')
    return json.dumps(data, indent=2)
class OxControl(Resource):
    """Twisted web resource exposing the Database over a small JSON
    HTTP api: /add_archive, /remove_archive, /archives, /files,
    /update, /extract and /get, plus static /media files."""
    _children = []
    #isLeaf = True

    def __init__(self, db_path):
        # FIX: per-instance child list; the class-level _children would
        # be shared (and appended to) across all OxControl instances
        self._children = []
        self.db_path = db_path
        Resource.__init__(self)
        self.db = Database(self.db_path)
        self.putChild("media", File(self.db.get('media_cache', 'media')))
        #FIXME: this is just for debugging
        if not 'Test' in self.db.archives('0xdb.org'):
            self.db.add_archive('0xdb.org', 'Test', '/media/2010/Movies')

    def putChild(self, name, child):
        self._children.append(name)
        return Resource.putChild(self, name, child)

    def getChild(self, name, request):
        # known static children are dispatched normally; every other
        # path is handled by this resource (render_GET routes on path)
        if name in self._children:
            return Resource.getChild(self, name, request)
        return self

    def render_GET(self, request):
        if request.path == '/add_archive':
            args = {}
            for arg in ('site', 'name', 'path'):
                args[arg] = request.args.get(arg)[0]
            # FIX: was add_archive(**arg) — unpacking the loop-variable
            # string raised TypeError
            self.db.add_archive(**args)
            response = {'status': 'ok'}
            return json_response(request, response)
        if request.path == '/remove_archive':
            args = {}
            for arg in ('site', 'name'):
                args[arg] = request.args.get(arg)[0]
            # FIX: was remove_archive(**arg), same loop-variable bug
            self.db.remove_archive(**args)
            response = {'status': 'ok'}
            return json_response(request, response)
        if request.path == '/archives':
            args = {}
            for arg in ['site']:
                args[arg] = request.args.get(arg)[0]
            response = {}
            response['archives'] = self.db.archives(**args)
            return json_response(request, response)
        if request.path == '/files':
            """
            /files
                archive   archive name
                site      site name
                since     (optional) timestamp, return changes since
            files in archive
            """
            args = {}
            for arg in ['site', 'archive']:
                args[arg] = request.args[arg][0]
            since = request.args.get("since", [None])[0]
            if since:
                args['since'] = float(since)
            files = self.db.files(**args)
            return json_response(request, files)
        if request.path == '/update':
            """
            checks for new files in all known archives
            """
            #update in another thread, this otherwise blocks web server
            thread.start_new_thread(self.db.update_archives,())
            response = {'status': 'ok'}
            return json_response(request, response)
        if request.path == '/extract':
            """
            extract derivatives from videos
            """
            oshash = request.args.get("oshash", [None])[0]
            media = request.args.get("media", [None])[0]
            retry = request.args.get("retry", [None])[0]
            response = {'status': 'not enough data provided'}
            # FIX: name could be referenced unbound below (missing or
            # unsupported `media`) — initialize it explicitly
            name = None
            f = self.db.file(oshash)
            if not f:
                response = {'status': 'unkown oshash'}
            elif not 'duration' in f['info']:
                response = {'status': 'unkown format, can not extract data'}
            else:
                if media == 'stills':
                    # the first still's name stands in for the whole set
                    name = '%s.png'%video_frame_positions(f['info']['duration'])[0]
                # FIX: guard against media=None before endswith()
                elif media and media.endswith('.webm'):
                    profile = media[:-5]
                    if profile in VIDEO_PROFILES:
                        name = media
                    else:
                        response = {'status': 'unsupported video profile requested'}
                if name:
                    #get or create derivative
                    derivative = self.db.derivative(oshash, name)
                    if derivative['status'] == STATUS_FAILED and retry:
                        derivative = self.db.derivative(oshash, name, STATUS_NEW)
                    response['status'] = {
                        STATUS_NEW: 'extracting',
                        STATUS_EXTRACTING: 'extracting',
                        STATUS_AVAILABLE: 'available',
                        STATUS_FAILED: 'failed',
                    }.get(derivative['status'], 'extracting')
                    if derivative['status'] == STATUS_NEW:
                        self.db.extract.put((oshash, name))
                    files = [f['location'] for f in self.db.derivatives(oshash)]
                    if media == 'stills':
                        response['stills'] = filter(lambda f: f.endswith('.png'), files)
                    else:
                        response['video'] = filter(lambda f: f.endswith('.webm'), files)
            return json_response(request, response)
        if request.path == '/get':
            """
            get information about a file, including derivatives
                oshash - oshash of file
            """
            oshash = request.args.get("oshash", [None, ])[0]
            response = {'status': 'no oshash provided'}
            if oshash:
                f = self.db.file(oshash)
                response['status'] = 'available'
                response['info'] = f['info']
                files = [f['location'] for f in self.db.derivatives(oshash)]
                response['video'] = filter(lambda f: f.endswith('.webm'), files)
                response['stills'] = filter(lambda f: f.endswith('.png'), files)
            return json_response(request, response)
        return "<!DOCTYPE html><html>this is not for humans</html>"
if __name__ == '__main__':
db = 'dev.sqlite'
port = 2620
username = 'fix'
password = 'me'
interface = '127.0.0.1'
interface = '10.26.20.10'
interface = '0.0.0.0'
print 'http://%s:%d/' % (interface, port)
root = OxControl(db)
checker = InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser(username, password)
class PublicHTMLRealm(object):
implements(IRealm)
def requestAvatar(self, avatarId, mind, *interfaces):
if IResource in interfaces:
return (IResource, root, lambda: None)
raise NotImplementedError()
portal = Portal(PublicHTMLRealm(), [checker])
credentialFactory = DigestCredentialFactory("md5", "oxbackend")
resource = HTTPAuthSessionWrapper(portal, [credentialFactory])
site = server.Site(resource)
reactor.listenTCP(port, site, interface=interface)
reactor.run()