collect scripts in oxtools

This commit is contained in:
j 2009-01-18 19:39:14 +11:00
commit 4f9c3da160
22 changed files with 1297 additions and 0 deletions

1
.bzrignore Normal file
View File

@ -0,0 +1 @@
build

27
README Normal file
View File

@ -0,0 +1,27 @@
oxgt-tools
Tools
oxframe
get frame from movie, image can be resized by adding --width or --height
oxtimeline
create timeline from video
oxinfo
output information about video, output can in xml, json or cfg format
oxposter
render 0xdb poster
oxicon
extract icon from frame
Python API
import oxgst
video = oxgst.Video(videoFile)
video.frame(pos_in_nanoseconds)
>>> <PIL Image>
timeline = oxgst.Timeline(videoFile)
timeline.extract(timeline_prefix, width, height)
info = oxgst.Info(videoFile)
info.metadata
{videoCodec:..}

56
bin/oxframe Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import os
import sys
from optparse import OptionParser
import pygst
pygst.require("0.10")
import gst
import Image
root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
if os.path.exists(os.path.join(root, 'oxgst')):
sys.path.insert(0, root)
from oxgst import Video
if __name__ == '__main__':
width = None
height = None
size = None
parser = OptionParser()
parser.add_option('-x', '--width', dest='width', help='scale image to given width')
parser.add_option('-y', '--height', dest='height', help='scale image to given height')
parser.add_option('-p', '--pos', dest='pos', help='frame position in milliseconds')
parser.add_option('-i', '--input', dest='input', help='video input')
parser.add_option('-o', '--output', dest='output', help='path to save frame to, jpg, png supported')
(opts, args) = parser.parse_args()
if None in (opts.input, opts.output, opts.pos):
parser.print_help()
sys.exit()
if opts.width:
width = int(opts.width)
if opts.height:
height = int(opts.height)
video = Video(opts.input)
timestamp = int(float(opts.pos) * gst.SECOND)
frame = video.frame(timestamp)
if frame:
if width:
height = int(float(frame.size[1])/frame.size[0] * width)
height = height + height%2
size = (width, height)
elif height:
width = int(float(frame.size[0])/frame.size[1] * height)
width = width + width%2
size = (width, height)
if size:
frame = frame.resize(size, Image.ANTIALIAS)
frame.save(opts.output)

15
bin/oxicon Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import sys
root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
if os.path.exists(os.path.join(root, 'oxgst')):
sys.path.insert(0, root)
from oxposter import icon
if __name__ == "__main__":
icon.main()

48
bin/oxinfo Executable file
View File

@ -0,0 +1,48 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2009
import os
import sys
import Image
from optparse import OptionParser
import xml.etree.ElementTree as ET
root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
if os.path.exists(os.path.join(root, 'oxgst')):
sys.path.insert(0, root)
from oxgst import Info
if __name__ == "__main__":
parser = OptionParser()
parser.add_option('-f', '--format', dest='format', help='output format: cfg, json, xml default: cfg')
(opts, args) = parser.parse_args()
if not args:
parser.print_help()
sys.exit()
inputFile = args[0]
i = Info(inputFile)
info = i.metadata
if opts.format == 'xml':
xml = ET.Element("gstinfo")
el = ET.SubElement(xml, "path")
el.text = inputFile
for key in sorted(info):
el = ET.SubElement(xml, key)
el.text = unicode(info[key])
print u'<?xml version="1.0" encoding="utf-8"?>\n' + ET.tostring(xml).replace('><', '>\n <').replace(' </', '</')
elif opts.format == 'json':
import simplejson
info['path'] = inputFile
print simplejson.dumps(info).replace(', ', ',\n ')
else:
print "[%s]" % inputFile
for key in sorted(info):
print "%s = %s" % (key, info[key])

15
bin/oxposter Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import sys
root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
if os.path.exists(os.path.join(root, 'oxgst')):
sys.path.insert(0, root)
from oxposter import poster
if __name__ == "__main__":
poster.main()

38
bin/oxtimeline Executable file
View File

@ -0,0 +1,38 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import os
import sys
from optparse import OptionParser
import Image
root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
if os.path.exists(os.path.join(root, 'oxgst')):
sys.path.insert(0, root)
import oxgst
if __name__ == '__main__':
parser = OptionParser()
parser.add_option('-x', '--width', dest='width', help='pixels per tile, defaults to 1500px', default=1500)
parser.add_option('-y', '--height', dest='height', help='timeline height, defaults to 64px', default=64)
parser.add_option('-o', '--prefix', dest='prefix', help='prefix for timeline tiles')
parser.add_option('-i', '--input', dest='input', help='video input')
(opts, args) = parser.parse_args()
if None in (opts.prefix, opts.input):
parser.print_help()
sys.exit()
timeline = oxgst.Timeline(opts.input)
timeline.extract(opts.prefix, opts.width, opts.height)
oxgst.timeline.createTimelineMultiline(opts.prefix)
oxgst.timeline.makeTiles(opts.prefix, 16)
oxgst.timeline.makeTimelineOverview(opts.prefix, 300)
oxgst.timeline.makeTimelineOverview(opts.prefix, 600)

7
oxgst/__init__.py Normal file
View File

@ -0,0 +1,7 @@
import timeline
import video
import info
from video import Video
from timeline import Timeline
from info import Info

67
oxgst/imagesink.py Normal file
View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import gobject
import pygst
pygst.require("0.10")
import gst
import Image
from singledecodebin import SingleDecodeBin
class ImageSink(gst.BaseSink):
__gsignals__ = {
"frame" : (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
( gobject.TYPE_PYOBJECT, gobject.TYPE_UINT64 ))
}
__gsttemplates__ = (
gst.PadTemplate ("sink",
gst.PAD_SINK,
gst.PAD_ALWAYS,
gst.Caps("video/x-raw-rgb,"
"bpp = (int) 24, depth = (int) 24,"
"endianness = (int) BIG_ENDIAN,"
"red_mask = (int) 0x00FF0000, "
"green_mask = (int) 0x0000FF00, "
"blue_mask = (int) 0x000000FF, "
"width = (int) [ 1, max ], "
"height = (int) [ 1, max ], "
"framerate = (fraction) [ 0, max ]"))
)
def __init__(self):
gst.BaseSink.__init__(self)
self.width = 1
self.height = 1
self.set_sync(False)
def do_set_caps(self, caps):
self.log("caps %s" % caps.to_string())
self.log("padcaps %s" % self.get_pad("sink").get_caps().to_string())
self.width = caps[0]["width"]
self.height = caps[0]["height"]
self.framerate = caps[0]["framerate"]
if not caps[0].get_name() == "video/x-raw-rgb":
return False
return True
def do_render(self, buf):
self.log("buffer %s %d" % (gst.TIME_ARGS(buf.timestamp),
len(buf.data)))
frame = Image.fromstring('RGB', (self.width, self.height), buf.data)
self.emit('frame', frame, buf.timestamp)
return gst.FLOW_OK
def do_preroll(self, buf):
return self.do_render(buf)
gobject.type_register(ImageSink)

128
oxgst/info.py Normal file
View File

@ -0,0 +1,128 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import gobject
gobject.threads_init()
import os
import pygst
pygst.require("0.10")
import gst
codec_list = {
'MPEG-1 layer 3': 'MPEG-1 layer 3',
'MPEG-1 layer 3 audio': 'MPEG-1 layer 3',
'VP6 Flash video': 'VP6',
'AC-3 audio': 'AC-3',
'Uncompressed 16-bit PCM audio': 'Uncompressed 16-bit PCM',
'Generic DV': 'DV Video',
}
class Info:
video_done = True
audio_done = True
video = None
audio = None
metadata = {}
tags = {}
def __init__(self, videofile):
self.mainloop = gobject.MainLoop()
self.pipeline = gst.parse_launch('filesrc name=input ! decodebin name=dbin')
self.input = self.pipeline.get_by_name('input')
self.input.props.location = videofile
self.dbin = self.pipeline.get_by_name('dbin')
self.metadata['size'] = os.stat(videofile).st_size
self.bus = self.pipeline.get_bus()
self.dbin.connect('new-decoded-pad', self.demux_pad_added)
self.bus.add_signal_watch()
self.watch_id = self.bus.connect("message", self.onBusMessage)
self.pipeline.set_state(gst.STATE_PAUSED)
self.pipeline.get_state()
#duration
pads = None
if self.video:
pads = self.video.sink_pads()
elif self.audio:
pads = self.audio.sink_pads()
if pads:
q = gst.query_new_duration(gst.FORMAT_TIME)
for pad in pads:
if pad.get_peer() and pad.get_peer().query(q):
format, self.duration = q.parse_duration()
self.metadata["duration"] = self.duration/gst.MSECOND
self.mainloop.run()
if 'video-codec' in self.tags:
self.metadata['video-codec'] = codec_list.get(self.tags['video-codec'], self.tags['video-codec'])
if 'audio-codec' in self.tags:
self.metadata['audio-codec'] = codec_list.get(self.tags['audio-codec'], self.tags['audio-codec'])
def get_audio_info_cb(self, sink, buffer, pad):
caps = sink.sink_pads().next().get_negotiated_caps()
for s in caps:
self.metadata["channels"] = s['channels']
self.metadata["samplerate"] = s['rate']
self.audio.disconnect(self.audio_cb)
self.audio_done = True
if self.audio_done and self.video_done:
self.bus.post(gst.message_new_eos(self.pipeline))
def get_frame_info_cb(self, sink, buffer, pad):
caps = sink.sink_pads().next().get_negotiated_caps()
for s in caps:
self.metadata["width"] = s['width']
self.metadata["height"] = s['height']
self.metadata["framerate"] = float(s['framerate'])
if 'pixel-aspect-ratio' in s.keys():
self.metadata["pixel-aspect-ratio"] = "%d:%d" % (s['pixel-aspect-ratio'].num, s['pixel-aspect-ratio'].denom)
self.video.disconnect(self.video_cb)
self.video_done = True
if self.audio_done and self.video_done:
self.bus.post(gst.message_new_eos(self.pipeline))
def demux_pad_added(self, element, pad, bool):
caps = pad.get_caps()
structure = caps[0]
stream_type = structure.get_name()
if stream_type.startswith('video'):
colorspace = gst.element_factory_make("ffmpegcolorspace");
self.pipeline.add (colorspace);
colorspace.set_state (gst.STATE_PLAYING);
pad.link (colorspace.get_pad("sink"));
self.video_done = False
self.video = gst.element_factory_make("fakesink")
self.video.props.signal_handoffs = True
self.pipeline.add(self.video)
self.video.set_state (gst.STATE_PLAYING);
colorspace.link (self.video);
self.video_cb = self.video.connect("handoff", self.get_frame_info_cb)
elif stream_type.startswith('audio'):
self.audio_done = False
self.audio = gst.element_factory_make("fakesink")
self.audio.props.signal_handoffs = True
self.pipeline.add(self.audio)
self.audio.set_state (gst.STATE_PLAYING);
pad.link(self.audio.get_pad('sink'))
self.audio_cb = self.audio.connect("handoff", self.get_audio_info_cb)
def quit(self):
self.pipeline.set_state(gst.STATE_NULL)
self.pipeline.get_state()
self.mainloop.quit()
def onBusMessage(self, bus, message):
if message.type == gst.MESSAGE_TAG:
for key in message.parse_tag().keys():
self.tags[key] = message.structure[key]
if message.src == self.pipeline and message.type == gst.MESSAGE_EOS:
self.quit()

305
oxgst/singledecodebin.py Normal file
View File

@ -0,0 +1,305 @@
# -*- coding: utf-8 -*-
# -*- Mode: Python; -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# pitivi/elements/singledecodebin.py
#
# Copyright (c) 2005, Edward Hervey <bilboed@bilboed.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
"""
Single-stream queue-less decodebin
"""
import gobject
import gst
def is_raw(caps):
""" returns True if the caps are RAW """
rep = caps.to_string()
valid = ["video/x-raw", "audio/x-raw", "text/plain", "text/x-pango-markup"]
for val in valid:
if rep.startswith(val):
return True
return False
class SingleDecodeBin(gst.Bin):
__gsttemplates__ = (
gst.PadTemplate ("sinkpadtemplate",
gst.PAD_SINK,
gst.PAD_ALWAYS,
gst.caps_new_any()),
gst.PadTemplate ("srcpadtemplate",
gst.PAD_SRC,
gst.PAD_SOMETIMES,
gst.caps_new_any())
)
def __init__(self, caps=None, uri=None, *args, **kwargs):
gst.Bin.__init__(self, *args, **kwargs)
if not caps:
caps = gst.caps_new_any()
self.caps = caps
self.typefind = gst.element_factory_make("typefind", "internal-typefind")
self.add(self.typefind)
self.uri = uri
if self.uri and gst.uri_is_valid(self.uri):
self.urisrc = gst.element_make_from_uri(gst.URI_SRC, uri, "urisrc")
self.log("created urisrc %s / %r" % (self.urisrc.get_name(),
self.urisrc))
self.add(self.urisrc)
self.urisrc.link(self.typefind)
else:
self._sinkpad = gst.GhostPad("sink", self.typefind.get_pad("sink"))
self._sinkpad.set_active(True)
self.add_pad(self._sinkpad)
self.typefind.connect("have_type", self._typefindHaveTypeCb)
self._srcpad = None
self._dynamics = []
self._validelements = [] #added elements
self._factories = self._getSortedFactoryList()
## internal methods
def _controlDynamicElement(self, element):
self.log("element:%s" % element.get_name())
self._dynamics.append(element)
element.connect("pad-added", self._dynamicPadAddedCb)
element.connect("no-more-pads", self._dynamicNoMorePadsCb)
def _getSortedFactoryList(self):
"""
Returns the list of demuxers, decoders and parsers available, sorted
by rank
"""
def myfilter(fact):
if fact.get_rank() < 64 :
return False
klass = fact.get_klass()
if not ("Demuxer" in klass or "Decoder" in klass or "Parse" in klass):
return False
return True
reg = gst.registry_get_default()
res = [x for x in reg.get_feature_list(gst.ElementFactory) if myfilter(x)]
res.sort(lambda a, b: int(b.get_rank() - a.get_rank()))
return res
def _findCompatibleFactory(self, caps):
"""
Returns a list of factories (sorted by rank) which can take caps as
input. Returns empty list if none are compatible
"""
self.debug("caps:%s" % caps.to_string())
res = []
for factory in self._factories:
for template in factory.get_static_pad_templates():
if template.direction == gst.PAD_SINK:
intersect = caps.intersect(template.static_caps.get())
if not intersect.is_empty():
res.append(factory)
break
self.debug("returning %r" % res)
return res
def _closeLink(self, element):
"""
Inspects element and tries to connect something on the srcpads.
If there are dynamic pads, it sets up a signal handler to
continue autoplugging when they become available.
"""
to_connect = []
dynamic = False
templates = element.get_pad_template_list()
for template in templates:
if not template.direction == gst.PAD_SRC:
continue
if template.presence == gst.PAD_ALWAYS:
pad = element.get_pad(template.name_template)
to_connect.append(pad)
elif template.presence == gst.PAD_SOMETIMES:
pad = element.get_pad(template.name_template)
if pad:
to_connect.append(pad)
else:
dynamic = True
else:
self.log("Template %s is a request pad, ignoring" % pad.name_template)
if dynamic:
self.debug("%s is a dynamic element" % element.get_name())
self._controlDynamicElement(element)
for pad in to_connect:
self._closePadLink(element, pad, pad.get_caps())
def _tryToLink1(self, source, pad, factories):
"""
Tries to link one of the factories' element to the given pad.
Returns the element that was successfully linked to the pad.
"""
self.debug("source:%s, pad:%s , factories:%r" % (source.get_name(),
pad.get_name(),
factories))
result = None
for factory in factories:
element = factory.create()
if not element:
self.warning("weren't able to create element from %r" % factory)
continue
sinkpad = element.get_pad("sink")
if not sinkpad:
continue
self.add(element)
try:
pad.link(sinkpad)
except:
element.set_state(gst.STATE_NULL)
self.remove(element)
continue
self._closeLink(element)
element.set_state(gst.STATE_PAUSED)
result = element
break
return result
def _closePadLink(self, element, pad, caps):
"""
Finds the list of elements that could connect to the pad.
If the pad has the desired caps, it will create a ghostpad.
If no compatible elements could be found, the search will stop.
"""
self.debug("element:%s, pad:%s, caps:%s" % (element.get_name(),
pad.get_name(),
caps.to_string()))
if caps.is_empty():
self.log("unknown type")
return
if caps.is_any():
self.log("type is not know yet, waiting")
return
if caps.intersect(self.caps):
# This is the desired caps
if not self._srcpad:
self._wrapUp(element, pad)
elif is_raw(caps):
self.log("We hit a raw caps which isn't the wanted one")
# FIXME : recursively remove everything until demux/typefind
else:
# Find something
if len(caps) > 1:
self.log("many possible types, delaying")
return
facts = self._findCompatibleFactory(caps)
if not facts:
self.log("unknown type")
return
self._tryToLink1(element, pad, facts)
def _wrapUp(self, element, pad):
"""
Ghost the given pad of element.
Remove non-used elements.
"""
if self._srcpad:
return
self._markValidElements(element)
self._removeUnusedElements(self.typefind)
self.log("ghosting pad %s" % pad.get_name)
self._srcpad = gst.GhostPad("src", pad)
self._srcpad.set_active(True)
self.add_pad(self._srcpad)
self.post_message(gst.message_new_state_dirty(self))
def _markValidElements(self, element):
"""
Mark this element and upstreams as valid
"""
self.log("element:%s" % element.get_name())
if element == self.typefind:
return
self._validelements.append(element)
# find upstream element
pad = list(element.sink_pads())[0]
parent = pad.get_peer().get_parent()
self._markValidElements(parent)
def _removeUnusedElements(self, element):
"""
Remove unused elements connected to srcpad(s) of element
"""
self.log("element:%s" % element)
for pad in element.src_pads():
if pad.is_linked():
peer = pad.get_peer().get_parent()
self._removeUnusedElements(peer)
if not peer in self._validelements:
self.log("removing %s" % peer.get_name())
pad.unlink(pad.get_peer())
peer.set_state(gst.STATE_NULL)
self.remove(peer)
def _cleanUp(self):
self.log("")
if self._srcpad:
self.remove_pad(self._srcpad)
self._srcpad = None
for element in self._validelements:
element.set_state(gst.STATE_NULL)
self.remove(element)
self._validelements = []
## Overrides
def do_change_state(self, transition):
self.debug("transition:%r" % transition)
res = gst.Bin.do_change_state(self, transition)
if transition in [gst.STATE_CHANGE_PAUSED_TO_READY, gst.STATE_CHANGE_READY_TO_NULL]:
self._cleanUp()
return res
## Signal callbacks
def _typefindHaveTypeCb(self, typefind, probability, caps):
self.debug("probability:%d, caps:%s" % (probability, caps.to_string()))
self._closePadLink(typefind, typefind.get_pad("src"), caps)
## Dynamic element Callbacks
def _dynamicPadAddedCb(self, element, pad):
self.log("element:%s, pad:%s" % (element.get_name(), pad.get_name()))
if not self._srcpad:
self._closePadLink(element, pad, pad.get_caps())
def _dynamicNoMorePadsCb(self, element):
self.log("element:%s" % element.get_name())
gobject.type_register(SingleDecodeBin)

216
oxgst/timeline.py Normal file
View File

@ -0,0 +1,216 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import gobject
gobject.threads_init()
from glob import glob
import math
import os
import time
import pygst
pygst.require("0.10")
import gst
import Image
from singledecodebin import SingleDecodeBin
from imagesink import ImageSink
from video import Video
class Timeline(Video):
def __init__(self, uri):
Video.__init__(self, uri)
bus = self.get_bus()
bus.add_signal_watch()
self.watch_id = bus.connect("message", self.onBusMessage)
self.mainloop = gobject.MainLoop()
def extract(self, prefix, width, height):
self.tile_width = width
self.tile_height = height
self.prefix = prefix
self.timeline_fps = 25
self.input_tile_width = int(math.ceil((float(self.framerate)/self.timeline_fps) * width))
ntiles = int(math.ceil(float(self.frames)/self.input_tile_width))
self.tiles = []
for i in range(ntiles):
tile = Image.new("RGB", (self.input_tile_width, height))
self.tiles.append(tile)
self.set_state(gst.STATE_PLAYING)
self.mainloop.run()
for i in range(ntiles):
tile = self.tiles[i]
if tile.size[0] != self.tile_width:
tile = tile.resize((self.tile_width, self.tile_height), Image.ANTIALIAS)
if i < (ntiles-1):
frames = self.input_tile_width
else:
frames = self.frames-((ntiles-1)*self.input_tile_width)
tile_width = int(math.ceil(self.timeline_fps*frames)/float(self.framerate))
if -2 < self.tile_width - tile_width < 2:
tile_width = self.tile_width
tile = tile.crop((0, 0, tile_width, self.tile_height))
filename = "%s.%s.%04d.png" % (self.prefix, self.tile_height, i)
tile.save(filename)
def done(self):
self.mainloop.quit()
def _sbinPadAddedCb(self, unused_sbin, pad):
self.log("pad : %s" % pad)
pad.link(self.csp.get_pad("sink"))
def _frameCb(self, unused_thsink, frame, timestamp):
self.log("image:%s, timestamp:%s" % (frame, gst.TIME_ARGS(timestamp)))
if not self._ready:
# we know we're prerolled when we get the initial thumbnail
self._ready = True
else:
framePos = int(math.ceil((float(timestamp) / (gst.SECOND) * float(self.framerate))))
tile = int(math.floor(float(framePos) / self.input_tile_width))
tilePos = framePos - (tile * self.input_tile_width)
frame = frame.resize((1, self.tile_height), Image.ANTIALIAS)
for i in range(self.tile_height):
self.tiles[tile].putpixel((tilePos, i), frame.getpixel((0, i)))
if self.mainloop and timestamp >= self.duration:
self.done()
def onBusMessage(self, bus, message):
if message.src == self and message.type == gst.MESSAGE_EOS:
self.done()
def loadTimeline(timeline_prefix, height=64):
files = sorted(glob('%s.%s.*.png' % (timeline_prefix, height)))
f = Image.open(files[0])
width = f.size[0]
f = Image.open(files[-1])
duration = f.size[0] + (len(files)-1)*width
timeline = Image.new("RGB", (duration, height))
pos = 0
for f in files:
part = Image.open(f)
timeline.paste(part, (pos, 0, pos + part.size[0], height))
pos += part.size[0]
return timeline
def createTimelineMultiline(timeline_prefix, width=600, height=16):
lineWidth = width
timlelineHeight = height
timeline = loadTimeline(timeline_prefix)
duration = timeline.size[0]
width = duration/25 #one pixel per second
timeline = timeline.resize((width, timlelineHeight), Image.ANTIALIAS).convert('RGBA')
lineHeight = timlelineHeight + 2 * 4
lines = int(math.ceil(width / lineWidth) + 1)
size = (lineWidth, lineHeight * lines)
timelineColor = (64, 64, 64)
i = Image.new("RGBA", size)
#padd end with nothing to fit to grid
t = Image.new("RGBA", (lineWidth * lines, timlelineHeight))
t.paste(timeline, (0, 0))
for currentLine in range(0, lines):
offset = currentLine * lineHeight + 4
toffset = currentLine * lineWidth
try:
tbox = t.crop((toffset, 0, toffset + lineWidth, timlelineHeight))
box = ((0, offset , tbox.size[0], offset + tbox.size[1]))
i.paste(tbox, box)
except:
broken = True
width = lineWidth
if currentLine == lines -1:
width = duration - (lines - 1) * lineWidth
box = ((0, offset , width, offset + timlelineHeight))
i.paste(timelineColor, box)
timeline_file = '%s.timeline.overview.png' % (timeline_prefix)
i.save(timeline_file, 'PNG')
def makeTimelineByFramesPerPixel(timeline_prefix, frames_per_pixel, inpoint=0, outpoint=0, height=16):
pos = 0
input_scale = 25
timeline_file = '%s.timeline.%s.png' % (timeline_prefix, width)
if outpoint > 0:
timeline_file = '%s.timeline.%s.%d-%d.png' % (timeline_prefix, width, inpoint, outpoint)
timeline = loadTimeline(timeline_prefix)
duration = timeline.size[0]
width = duration / frames_per_pixel
if inpoint<=0:
inpoint = 0
else:
inpoint = inpoint * input_scale
if outpoint<=0:
outpoint = pos
else:
outpoint = outpoint * input_scale
timeline = timeline.crop((inpoint, 0, outpoint, timeline.size[1])).resize((width, height), Image.ANTIALIAS)
timeline.save(timeline_file)
def makeTimelineOverview(timeline_prefix, width, inpoint=0, outpoint=0, duration=-1, height=16):
input_scale = 25
timeline_file = '%s.timeline.%s.png' % (timeline_prefix, width)
if outpoint > 0:
timeline_file = '%s.timeline.%s.%d-%d.png' % (timeline_prefix, width, inpoint, outpoint)
timeline = loadTimeline(timeline_prefix)
duration = timeline.size[0]
if inpoint<=0:
inpoint = 0
else:
inpoint = inpoint * input_scale
if outpoint<=0:
outpoint = duration
else:
outpoint = outpoint * input_scale
timeline = timeline.crop((inpoint, 0, outpoint, timeline.size[1])).resize((width, height), Image.ANTIALIAS)
timeline.save(timeline_file)
def makeTiles(timeline_prefix, height=16):
files = glob('%s.64.*.png' % timeline_prefix)
part_step = 60
output_width = 300
width = len(files) * part_step
timeline = Image.new("RGB", (width, height))
pos = 0
for f in sorted(files):
part = Image.open(f)
part_width = int(part.size[0] / 25)
part = part.resize((part_width, height), Image.ANTIALIAS)
timeline.paste(part, (pos, 0, pos+part_width, height))
pos += part_width
timeline = timeline.crop((0, 0, pos, height))
pos = 0
i = 0
while pos < timeline.size[0]:
end = min(pos+output_width, timeline.size[0])
timeline.crop((pos, 0, end, timeline.size[1])).save('%s.%s.%04d.png' % (timeline_prefix, timeline.size[1], i))
pos += output_width
i += 5

131
oxgst/video.py Normal file
View File

@ -0,0 +1,131 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import gobject
gobject.threads_init()
import pygst
pygst.require("0.10")
import gst
import Image
import time
from singledecodebin import SingleDecodeBin
from imagesink import ImageSink
class Video(gst.Pipeline):
def __init__(self, uri):
gst.Pipeline.__init__(self)
self.duration = -1
# queue of timestamps
self.queue = []
# queue callbacks
self.callback = {}
# extracted frames
self.frames = {}
# true only if we are prerolled
self._ready = False
uri = 'file://' + uri
self.log("uri : %s" % uri)
self.uri = uri
self.sbin = SingleDecodeBin(caps=gst.Caps("video/x-raw-rgb;video/x-raw-yuv"),
uri=self.uri)
self.csp = gst.element_factory_make("ffmpegcolorspace")
self.sink = ImageSink()
self.sink.connect('frame', self._frameCb)
self.add(self.sbin, self.csp, self.sink)
self.csp.link(self.sink)
self.sbin.connect('pad-added', self._sbinPadAddedCb)
self.set_state(gst.STATE_PAUSED)
self.get_state()
self.width = self.sink.width
self.height = self.sink.height
self.framerate = self.sink.framerate
self.getDuration()
self.frames = int((float(self.duration) / gst.SECOND) * float(self.framerate))
def _sbinPadAddedCb(self, unused_sbin, pad):
self.log("pad : %s" % pad)
pad.link(self.csp.get_pad("sink"))
def _frameCb(self, unused_thsink, frame, timestamp):
self.log("image:%s, timestamp:%s" % (frame, gst.TIME_ARGS(timestamp)))
if not self._ready:
# we know we're prerolled when we get the initial thumbnail
self._ready = True
elif timestamp in self.callback and self.callback[timestamp]:
self.callback[timestamp](frame, timestamp)
del self.callback[timestamp]
if timestamp in self.queue:
self.queue.remove(timestamp)
if self.queue:
# still some more thumbnails to process
gobject.idle_add(self._getFrame, self.queue.pop(0))
def getDuration(self):
if self.duration < 0:
pads = self.sink.sink_pads()
q = gst.query_new_duration(gst.FORMAT_TIME)
for pad in pads:
if pad.get_peer() and pad.get_peer().query(q):
format, self.duration = q.parse_duration()
return self.duration
def getFrame(self, timestamp, callback):
"""
Queue a frame request for the given timestamp,
callback is called once frame is extracted.
returns False if timestamp > duration
"""
self.log("timestamp %s" % gst.TIME_ARGS(timestamp))
if self.duration < 0:
self.getDuration()
if timestamp > self.duration:
self.log("timestamp %s > duration %s" % (timestamp, self.duration))
return False
self.callback[timestamp] = callback
if self.queue or not self._ready:
self.log('ready')
self.queue.append(timestamp)
else:
self.queue.append(timestamp)
self._getFrame(timestamp)
return True
def _getFrame(self, timestamp):
if not self._ready:
return
self.log("timestamp : %s" % gst.TIME_ARGS(timestamp))
self.seek(1.0, gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
gst.SEEK_TYPE_SET, timestamp,
gst.SEEK_TYPE_NONE, -1)
return False
def frame(self, timestamp):
mainloop = gobject.MainLoop()
self.frames[timestamp] = None
def callback(frame, timestamp):
self.frames[timestamp] = frame
mainloop.quit()
if self.getFrame(timestamp, callback):
mainloop.run()
frame = self.frames[timestamp]
del self.frames[timestamp]
return frame

0
oxposter/__init__.py Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 110 KiB

BIN
oxposter/data/icon.mask.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 KiB

54
oxposter/icon.py Executable file
View File

@ -0,0 +1,54 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from optparse import OptionParser
import os
import sys
import Image
import ImageDraw
data_root = os.path.join(os.path.dirname(__file__), 'data')
def main():
parser = OptionParser()
parser.add_option('-f', '--frame', dest='frame', help='Poster frame (image file to be read)')
parser.add_option('-i', '--icon', dest='icon', help='Icon (image file to be written)')
(opts, args) = parser.parse_args()
if not opts.icon:
parser.print_help()
sys.exit()
iconSize = 1024
iconImage = Image.new('RGBA', (iconSize, iconSize))
if opts.frame:
frameImage = Image.open(opts.frame)
if frameImage.size[0] >= frameImage.size[1]:
frameWidth = int(frameImage.size[0] * iconSize / frameImage.size[1])
frameImage = frameImage.resize((frameWidth, iconSize), Image.ANTIALIAS)
crop = int((frameWidth - iconSize) / 2)
frameImage = frameImage.crop((crop, 0, crop + iconSize, iconSize))
else:
frameHeight = int(frameImage.size[1] * iconSize / frameImage.size[0])
frameImage = frameImage.resize((iconSize, frameHeight), Image.ANTIALIAS)
crop = int((frameHeight - iconSize) / 2)
frameImage = frameImage.crop((0, crop, iconSize, crop + iconSize))
iconImage.paste(frameImage, (0, 0, iconSize, iconSize))
else:
draw = ImageDraw.Draw(iconImage)
draw.polygon([(0, 0), (iconSize, 0), (iconSize, iconSize), (0, iconSize)], fill=(0, 0, 0))
for y in range(iconSize):
for x in range(iconSize):
if int((x + y + 192) / 128) % 2:
iconImage.putpixel((x, y), (32, 32, 32))
maskImage = Image.open(os.path.join(data_root, 'icon.mask.png'))
iconImage.putalpha(maskImage)
iconImage.save(opts.icon)

155
oxposter/poster.py Normal file
View File

@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from optparse import OptionParser
import os
import sys
import Image
import ImageDraw
from oxlib import truncateString, wrapString
data_root = os.path.join(os.path.dirname(__file__), 'data')
def main():
parser = OptionParser()
parser.add_option('-o', '--oxdb', dest='oxdb', help='0xdb Id')
parser.add_option('-i', '--imdb', dest='imdb', help='IMDb Id')
parser.add_option('-t', '--title', dest='title', help='Movie title')
parser.add_option('-d', '--director', dest='director', help='Director(s)')
parser.add_option('-f', '--frame', dest='frame', help='Poster frame (image file to be read)')
parser.add_option('-p', '--poster', dest='poster', help='Poster (image file to be written)')
parser.add_option('-r', '--restricted', action='store_true', dest='restricted', help='If set, poster frame will have overlay')
(opts, args) = parser.parse_args()
if None in (opts.oxdb, opts.title, opts.director, opts.poster):
parser.print_help()
sys.exit()
opts.oxdb = '%s%s' % (opts.oxdb[:2], opts.oxdb[2:8].upper())
opts.title = opts.title.decode('utf-8')
opts.director = opts.director.decode('utf-8')
if opts.imdb == None:
id = opts.oxdb
else:
id = opts.imdb
posterWidth = 640
posterHeight = 1024
posterImage = Image.new('RGB', (posterWidth, posterHeight))
fontImage = Image.open(os.path.join(data_root, 'font.monaco.bold.png'))
oxdbImage = Image.open(os.path.join(data_root, 'logo.0xdb.large.png'))
# frame section
frameHeight = int(posterWidth * 10 / 16)
if opts.frame:
frameImage = Image.open(opts.frame)
if frameImage.size[0] / frameImage.size[1] > posterWidth / frameHeight:
# poster frame is too wide
frameImage = frameImage.resize((int(frameImage.size[0] * frameHeight / frameImage.size[1]), frameHeight), Image.ANTIALIAS)
crop = int((frameImage.size[0] - posterWidth) / 2)
frameImage = frameImage.crop((crop, 0, crop + posterWidth, frameHeight))
else:
# poster frame is not wide enough
frameImage = frameImage.resize((posterWidth, int(frameImage.size[1] * posterWidth / frameImage.size[0])), Image.ANTIALIAS)
crop = int((frameImage.size[1] - frameHeight) / 2)
frameImage = frameImage.crop((0, crop, posterWidth, crop + frameHeight))
posterImage.paste(frameImage, (0, 0, posterWidth, frameHeight))
else:
draw = ImageDraw.Draw(posterImage)
draw.polygon([(0, 0), (posterWidth, 0), (posterWidth, frameHeight), (0, frameHeight)], fill=(0, 0, 0))
for y in range(frameHeight):
for x in range(posterWidth):
if int((x + y + 54) / 128) % 2:
posterImage.putpixel((x, y), (32, 32, 32))
# restricted
if opts.restricted:
for y in range(frameHeight):
for x in range(posterWidth):
if int((x + y + 54) / 128) % 2:
rgb = posterImage.getpixel((x, y))
rgb = (int(rgb[0] / 2) + 128, int(rgb[1] / 2), int(rgb[2] / 2))
posterImage.putpixel((x, y), rgb)
# director section
colorHeight = int(posterHeight / 2);
draw = ImageDraw.Draw(posterImage)
draw.polygon([(0, frameHeight), (posterWidth, frameHeight), (posterWidth, colorHeight), (0, colorHeight)], fill=(0, 0, 0))
director = wrapString(opts.director, 36, '\n', True)
while len(director.split('\n')) > 3:
director = opts.director.split(', ')
director.pop()
opts.director = ', '.join(director)
director = wrapString(opts.director, 36, '\n', True)
posterMargin = 16
imagewrite(posterImage, director, posterMargin, colorHeight - 8 - len(director.split('\n')) * 32, fontImage, 32)
# title section
backgroundColor = getRGB(opts.oxdb)
draw.polygon([(0, colorHeight), (posterWidth, colorHeight), (posterWidth, posterHeight), (0, posterHeight)], fill=backgroundColor)
title = wrapString(opts.title, 24, '\n', True)
lines = title.split('\n')
if lines > 8:
# following line commented out since the only known case
# (http://0xdb.org/0071458) looks better without '...'
# lines[7] = truncateString(lines[7] + '...', 24)
title = '\n'.join(lines[:8])
offset = -6
posterimage = imagewrite(posterImage, title, posterMargin, colorHeight + posterMargin + offset, fontImage, 48)
offset = 12
posterimage = imagewrite(posterImage, id, posterMargin, posterHeight - posterMargin - 96 + offset, fontImage, 96)
# 0xdb logo
x = posterWidth - oxdbImage.size[0] - posterMargin
y = posterHeight - oxdbImage.size[1] - posterMargin
for dy in range(oxdbImage.size[1]):
for dx in range(oxdbImage.size[0]):
rgb = posterImage.getpixel((x + dx, y + dy))
bw = oxdbImage.getpixel((dx, dy))[0]
rgb = tuple(map(lambda x : x + bw, rgb))
posterImage.putpixel((x + dx, y + dy), rgb)
posterImage.save(opts.poster)
def getRGB(oxid):
i = int(int(oxid[2:8], 16) * 762 / 16777216)
if i < 127:
return (127, i, 0)
elif i < 254:
return (254 - i, 127, 0)
elif i < 381:
return (0, 127, i - 254)
elif i < 508:
return (0, 508 - i, 127)
elif i < 635:
return (i - 508, 0, 127)
else:
return (127, 0, 762 - i)
def imagewrite(posterImage, string, left, top, fontImage, charHeight):
x = left
y = top
fontWidth = int(fontImage.size[0] / 16)
fontHeight = int(fontImage.size[1] / 16)
charWidth = int(fontWidth * charHeight / fontHeight)
for i in range(len(string)):
char = string[i:i+1]
if char == '\n':
x = left
y += charHeight
else:
ascii = ord(char)
fontLeft = (ascii % 16) * fontWidth
fontTop = int(ascii / 16) * fontHeight
letterImage = fontImage.crop((fontLeft, fontTop, fontLeft + fontWidth, fontTop + fontHeight))
letterImage = letterImage.resize((charWidth, charHeight), Image.ANTIALIAS)
for dy in range(charHeight):
for dx in range(charWidth):
rgb = posterImage.getpixel((x + dx, y + dy))
bw = int(letterImage.getpixel((dx, dy))[0] / 2)
rgb = tuple(map(lambda x : x + bw, rgb))
posterImage.putpixel((x + dx, y + dy), rgb)
x += charWidth
return posterImage

34
setup.py Normal file
View File

@ -0,0 +1,34 @@
# setup.py
# -*- coding: UTF-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import sys
from glob import glob
from distutils.core import setup
setup(name="oxtools",
scripts = [
'bin/oxicon',
'bin/oxinfo',
'bin/oxframe',
'bin/oxposter',
'bin/oxtimeline',
],
packages = [
'oxgst',
'oxposter',
],
package_data = {
'oxposter': ['data/*.png'],
},
version="0.1",
author="j",
author_email="code@0xdb.org",
description="commandline tools and python api to extract information from movies",
classifiers = [
'Development Status :: 4 - Beta',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)