diff --git a/README b/README index 620fb5e..2f99ec1 100644 --- a/README +++ b/README @@ -1,4 +1,9 @@ -oxgt-tools +oxtools + + depends on gstreamer 0.10.30 or newer + + on ubuntu 10.04 you need + sudo add-apt-repository ppa:gstreamer-developers/ppa Tools oxframe diff --git a/oxgst/imagesink.py b/oxgst/imagesink.py index bf5545a..5040725 100644 --- a/oxgst/imagesink.py +++ b/oxgst/imagesink.py @@ -1,67 +1,64 @@ # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 -# GPL 2008 - -import gobject - -import pygst -pygst.require("0.10") -import gst -import Image - -from singledecodebin import SingleDecodeBin - - -class ImageSink(gst.BaseSink): - - __gsignals__ = { - "frame" : (gobject.SIGNAL_RUN_LAST, - gobject.TYPE_NONE, - ( gobject.TYPE_PYOBJECT, gobject.TYPE_UINT64 )) - } - - __gsttemplates__ = ( - gst.PadTemplate ("sink", - gst.PAD_SINK, - gst.PAD_ALWAYS, - gst.Caps("video/x-raw-rgb," - "bpp = (int) 24, depth = (int) 24," - "endianness = (int) BIG_ENDIAN," - "red_mask = (int) 0x00FF0000, " - "green_mask = (int) 0x0000FF00, " - "blue_mask = (int) 0x000000FF, " - "width = (int) [ 1, max ], " - "height = (int) [ 1, max ], " - "framerate = (fraction) [ 0, max ]")) - ) - - def __init__(self): - gst.BaseSink.__init__(self) - self.width = 1 - self.height = 1 - self.set_sync(False) - - def do_set_caps(self, caps): - self.log("caps %s" % caps.to_string()) - self.log("padcaps %s" % self.get_pad("sink").get_caps().to_string()) - self.width = caps[0]["width"] - self.height = caps[0]["height"] - self.framerate = caps[0]["framerate"] - - if not caps[0].get_name() == "video/x-raw-rgb": - return False - return True - - def do_render(self, buf): - self.log("buffer %s %d" % (gst.TIME_ARGS(buf.timestamp), - len(buf.data))) - - frame = Image.fromstring('RGB', (self.width, self.height), buf.data) - self.emit('frame', frame, buf.timestamp) - return gst.FLOW_OK - - def do_preroll(self, buf): - return self.do_render(buf) - -gobject.type_register(ImageSink) - +# GPL 2008 + +import gobject + +import pygst +pygst.require("0.10") +import gst +import Image + +class ImageSink(gst.BaseSink): + + __gsignals__ = { + "frame" : (gobject.SIGNAL_RUN_LAST, + gobject.TYPE_NONE, + ( gobject.TYPE_PYOBJECT, gobject.TYPE_UINT64 )) + } + + __gsttemplates__ = ( + gst.PadTemplate ("sink", + gst.PAD_SINK, + gst.PAD_ALWAYS, + gst.Caps("video/x-raw-rgb," + "bpp = (int) 24, depth = (int) 24," + "endianness = (int) BIG_ENDIAN," + "red_mask = (int) 0x00FF0000, " + "green_mask = (int) 0x0000FF00, " + "blue_mask = (int) 0x000000FF, " + "width = (int) [ 1, max ], " + "height = (int) [ 1, max ], " + "framerate = (fraction) [ 0, max ]")) + ) + + def __init__(self): + gst.BaseSink.__init__(self) + self.width = 1 + self.height = 1 + self.set_sync(False) + + def do_set_caps(self, caps): + self.log("caps %s" % caps.to_string()) + self.log("padcaps %s" % self.get_pad("sink").get_caps().to_string()) + self.width = caps[0]["width"] + self.height = caps[0]["height"] + self.framerate = caps[0]["framerate"] + + if not caps[0].get_name() == "video/x-raw-rgb": + return False + return True + + def do_render(self, buf): + self.log("buffer %s %d" % (gst.TIME_ARGS(buf.timestamp), + len(buf.data))) + + frame = Image.fromstring('RGB', (self.width, self.height), buf.data) + self.emit('frame', frame, buf.timestamp) + return gst.FLOW_OK + + def do_preroll(self, buf): + return self.do_render(buf) + +gobject.type_register(ImageSink) + diff --git a/oxgst/singledecodebin.py b/oxgst/singledecodebin.py index 839275d..bda5a96 100644 --- a/oxgst/singledecodebin.py +++ b/oxgst/singledecodebin.py @@ -28,6 +28,38 @@ Single-stream queue-less decodebin import gobject import gst + +class CachedFactoryList(object): + def __init__(self, factoryFilter=None): + self._factoryFilter = factoryFilter + self._factories = None + self._registry = gst.registry_get_default() + self._registry.connect("feature-added", self._registryFeatureAddedCb) + + def get(self): + if self._factories is None: + self._buildFactories() + + return self._factories + + def _buildFactories(self): + # build the cache + #log.debug("utils", "Getting factories list") + factories = self._registry.get_feature_list(gst.ElementFactory) + if self._factoryFilter is not None: + #log.debug("utils", "filtering") + factories = filter(self._factoryFilter, factories) + + #log.debug("utils", "Sorting by rank") + factories.sort(key=lambda factory: factory.get_rank(), reverse=True) + self._factories = factories + #log.debug("utils", "Cached factories is now %r", self._factories) + + def _registryFeatureAddedCb(self, registry, feature): + # invalidate the cache + #log.warning("utils", "New feature added, invalidating cached factories") + self._factories = None + def is_raw(caps): """ returns True if the caps are RAW """ rep = caps.to_string() @@ -37,7 +69,28 @@ def is_raw(caps): return True return False +def factoryFilter(factory): + if factory.get_rank() < 64 : + return False + + klass = factory.get_klass() + for cat in ("Demuxer", "Decoder", "Parse"): + if cat in klass: + return True + + return False + +_factoryCache = CachedFactoryList(factoryFilter) + class SingleDecodeBin(gst.Bin): + """ + A variant of decodebin. + + * Only outputs one stream + * Doesn't contain any internal queue + """ + + QUEUE_SIZE = 1 * gst.SECOND __gsttemplates__ = ( gst.PadTemplate ("sinkpadtemplate", @@ -49,11 +102,13 @@ class SingleDecodeBin(gst.Bin): gst.PAD_SOMETIMES, gst.caps_new_any()) ) - def __init__(self, caps=None, uri=None, *args, **kwargs): + + def __init__(self, caps=None, uri=None, stream=None, *args, **kwargs): gst.Bin.__init__(self, *args, **kwargs) if not caps: caps = gst.caps_new_any() self.caps = caps + self.stream = stream self.typefind = gst.element_factory_make("typefind", "internal-typefind") self.add(self.typefind) @@ -63,7 +118,11 @@ class SingleDecodeBin(gst.Bin): self.log("created urisrc %s / %r" % (self.urisrc.get_name(), self.urisrc)) self.add(self.urisrc) - self.urisrc.link(self.typefind) + # Set the blocksize to 512kbytes, this will only matter for push-based sources + if hasattr(self.urisrc.props, "blocksize"): + self.urisrc.props.blocksize = 524288 + self.urisrc.link_pads_full("src", self.typefind, "sink", + gst.PAD_LINK_CHECK_NOTHING) else: self._sinkpad = gst.GhostPad("sink", self.typefind.get_pad("sink")) self._sinkpad.set_active(True) @@ -77,8 +136,10 @@ class SingleDecodeBin(gst.Bin): self._validelements = [] #added elements - self._factories = self._getSortedFactoryList() + self.debug("stream:%r" % self.stream) + self.pending_newsegment = False + self.eventProbeId = None ## internal methods @@ -88,23 +149,6 @@ class SingleDecodeBin(gst.Bin): element.connect("pad-added", self._dynamicPadAddedCb) element.connect("no-more-pads", self._dynamicNoMorePadsCb) - def _getSortedFactoryList(self): - """ - Returns the list of demuxers, decoders and parsers available, sorted - by rank - """ - def myfilter(fact): - if fact.get_rank() < 64 : - return False - klass = fact.get_klass() - if not ("Demuxer" in klass or "Decoder" in klass or "Parse" in klass): - return False - return True - reg = gst.registry_get_default() - res = [x for x in reg.get_feature_list(gst.ElementFactory) if myfilter(x)] - res.sort(lambda a, b: int(b.get_rank() - a.get_rank())) - return res - def _findCompatibleFactory(self, caps): """ Returns a list of factories (sorted by rank) which can take caps as @@ -112,7 +156,7 @@ class SingleDecodeBin(gst.Bin): """ self.debug("caps:%s" % caps.to_string()) res = [] - for factory in self._factories: + for factory in _factoryCache.get(): for template in factory.get_static_pad_templates(): if template.direction == gst.PAD_SINK: intersect = caps.intersect(template.static_caps.get()) @@ -153,6 +197,35 @@ class SingleDecodeBin(gst.Bin): for pad in to_connect: self._closePadLink(element, pad, pad.get_caps()) + def _isDemuxer(self, element): + if not 'Demux' in element.get_factory().get_klass(): + return False + + potential_src_pads = 0 + for template in element.get_pad_template_list(): + if template.direction != gst.PAD_SRC: + continue + + if template.presence == gst.PAD_REQUEST or \ + "%" in template.name_template: + potential_src_pads += 2 + break + else: + potential_src_pads += 1 + + return potential_src_pads > 1 + + def _plugDecodingQueue(self, pad): + queue = gst.element_factory_make("queue") + queue.props.max_size_time = self.QUEUE_SIZE + queue.props.max_size_buffers = 3 + self.add(queue) + queue.sync_state_with_parent() + pad.link_full(queue.get_pad("sink"), gst.PAD_LINK_CHECK_NOTHING) + pad = queue.get_pad("src") + + return pad + def _tryToLink1(self, source, pad, factories): """ Tries to link one of the factories' element to the given pad. @@ -162,6 +235,10 @@ class SingleDecodeBin(gst.Bin): self.debug("source:%s, pad:%s , factories:%r" % (source.get_name(), pad.get_name(), factories)) + + if self._isDemuxer(source): + pad = self._plugDecodingQueue(pad) + result = None for factory in factories: element = factory.create() @@ -174,6 +251,7 @@ class SingleDecodeBin(gst.Bin): continue self.add(element) + element.set_state(gst.STATE_READY) try: pad.link(sinkpad) except: @@ -204,10 +282,17 @@ class SingleDecodeBin(gst.Bin): if caps.is_any(): self.log("type is not know yet, waiting") return - if caps.intersect(self.caps): + + self.debug("stream %r" % (self.stream)) + if caps.intersect(self.caps) and (self.stream is None or + (self.stream.pad_name == get_pad_id(pad))): # This is the desired caps if not self._srcpad: self._wrapUp(element, pad) + elif is_raw(caps) and pad_compatible_stream(pad, self.stream): + self.log ("not the target stream, but compatible") + if not self._srcpad: + self._wrapUp(element, pad) elif is_raw(caps): self.log("We hit a raw caps which isn't the wanted one") # FIXME : recursively remove everything until demux/typefind @@ -232,13 +317,64 @@ class SingleDecodeBin(gst.Bin): if self._srcpad: return self._markValidElements(element) - self._removeUnusedElements(self.typefind) - self.log("ghosting pad %s" % pad.get_name) + gobject.idle_add(self._removeUnusedElements, self.typefind) + if pad.props.caps is not None: + caps = pad.props.caps + else: + caps = pad.get_caps() + self._srcpad = gst.GhostPad("src", pad) self._srcpad.set_active(True) + + if caps.is_fixed(): + self._exposePad(target=pad) + else: + self._blockPad(target=pad) + + def _exposePad(self, target): + self.log("ghosting pad %s" % target.get_name()) self.add_pad(self._srcpad) self.post_message(gst.message_new_state_dirty(self)) + def _blockPad(self, target): + # don't pass target as an argument to set_blocked_async. Avoids + # triggering a bug in gst-python where pad_block_destroy_data calls + # CPython without acquiring the GIL + self._target = target + self._eventProbeId = target.add_event_probe(self._padEventCb) + self._srcpad.set_blocked_async(True, self._padBlockedCb) + + def _unblockPad(self, target): + target.remove_event_probe(self._eventProbeId) + self._eventProbeId = None + self._srcpad.set_blocked_async(False, self._padBlockedCb) + + def _padBlockedCb(self, ghost, blocked): + if not blocked: + if self.pending_newsegment is not None: + self._srcpad.push_event(self.pending_newsegment) + self.pending_newsegment = None + return + + self._exposePad(target=self._target) + self._unblockPad(target=self._target) + + def _padEventCb(self, pad, event): + if event.type == gst.EVENT_TAG: + self.debug("dropping TAG event") + return False + + if event.type != gst.EVENT_NEWSEGMENT: + self.warning("first event: %s is not a NEWSEGMENT, bailing out" % + event) + self._exposePad(target=pad) + self._unblockPad(target=pad) + return True + + self.debug("stored pending newsegment") + self.pending_newsegment = event + return False + def _markValidElements(self, element): """ Mark this element and upstreams as valid @@ -256,22 +392,24 @@ class SingleDecodeBin(gst.Bin): """ Remove unused elements connected to srcpad(s) of element """ - self.log("element:%s" % element) - for pad in element.src_pads(): + self.log("element:%r" % element) + for pad in list(element.src_pads()): if pad.is_linked(): peer = pad.get_peer().get_parent() - self._removeUnusedElements(peer) - if not peer in self._validelements: - self.log("removing %s" % peer.get_name()) - pad.unlink(pad.get_peer()) - peer.set_state(gst.STATE_NULL) - self.remove(peer) + if isinstance(peer, gst.Element): + self._removeUnusedElements(peer) + if not peer in self._validelements: + self.log("removing %s" % peer.get_name()) + pad.unlink(pad.get_peer()) + peer.set_state(gst.STATE_NULL) + self.remove(peer) def _cleanUp(self): self.log("") if self._srcpad: self.remove_pad(self._srcpad) self._srcpad = None + self._target = None for element in self._validelements: element.set_state(gst.STATE_NULL) self.remove(element) @@ -282,7 +420,7 @@ class SingleDecodeBin(gst.Bin): def do_change_state(self, transition): self.debug("transition:%r" % transition) res = gst.Bin.do_change_state(self, transition) - if transition in [gst.STATE_CHANGE_PAUSED_TO_READY, gst.STATE_CHANGE_READY_TO_NULL]: + if transition == gst.STATE_CHANGE_PAUSED_TO_READY: self._cleanUp() return res diff --git a/oxgst/video.py b/oxgst/video.py index 0d20f87..1550076 100644 --- a/oxgst/video.py +++ b/oxgst/video.py @@ -1,6 +1,6 @@ #!/usr/bin/python -# -*- coding: utf-8 -*- -# vi:si:et:sw=4:sts=4:ts=4 +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 # GPL 2008 import gobject gobject.threads_init()