#!/usr/bin/python # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 # GPL 2008 import gobject gobject.threads_init() import os import pygst pygst.require("0.10") import gst codec_list = { 'MPEG-1 layer 3': 'MPEG-1 layer 3', 'MPEG-1 layer 3 audio': 'MPEG-1 layer 3', 'VP6 Flash video': 'VP6', 'AC-3 audio': 'AC-3', 'Uncompressed 16-bit PCM audio': 'Uncompressed 16-bit PCM', 'Generic DV': 'DV Video', } class Info: video_done = True audio_done = True video = None audio = None metadata = {} tags = {} def __init__(self, videofile): self.mainloop = gobject.MainLoop() self.pipeline = gst.parse_launch('filesrc name=input ! decodebin name=dbin') self.input = self.pipeline.get_by_name('input') self.input.props.location = videofile self.dbin = self.pipeline.get_by_name('dbin') self.metadata['size'] = os.stat(videofile).st_size if self.metadata['size'] != 0: self.bus = self.pipeline.get_bus() self.dbin.connect('new-decoded-pad', self.demux_pad_added) self.bus.add_signal_watch() self.watch_id = self.bus.connect("message", self.onBusMessage) self.pipeline.set_state(gst.STATE_PAUSED) self.pipeline.get_state() #duration pads = None if self.video: pads = self.video.sink_pads() elif self.audio: pads = self.audio.sink_pads() if pads: q = gst.query_new_duration(gst.FORMAT_TIME) for pad in pads: if pad.get_peer() and pad.get_peer().query(q): format, self.duration = q.parse_duration() self.metadata["duration"] = self.duration/gst.MSECOND self.mainloop.run() if 'video-codec' in self.tags: self.metadata['video-codec'] = codec_list.get(self.tags['video-codec'], self.tags['video-codec']) if 'audio-codec' in self.tags: self.metadata['audio-codec'] = codec_list.get(self.tags['audio-codec'], self.tags['audio-codec']) def get_audio_info_cb(self, sink, buffer, pad): caps = sink.sink_pads().next().get_negotiated_caps() for s in caps: self.metadata["channels"] = s['channels'] self.metadata["samplerate"] = s['rate'] self.audio.disconnect(self.audio_cb) self.audio_done = True if self.audio_done and self.video_done: self.bus.post(gst.message_new_eos(self.pipeline)) def get_frame_info_cb(self, sink, buffer, pad): caps = sink.sink_pads().next().get_negotiated_caps() for s in caps: self.metadata["width"] = s['width'] self.metadata["height"] = s['height'] self.metadata["framerate"] = float(s['framerate']) if 'pixel-aspect-ratio' in s.keys(): self.metadata["pixel-aspect-ratio"] = "%d:%d" % (s['pixel-aspect-ratio'].num, s['pixel-aspect-ratio'].denom) self.video.disconnect(self.video_cb) self.video_done = True if self.audio_done and self.video_done: self.bus.post(gst.message_new_eos(self.pipeline)) def demux_pad_added(self, element, pad, bool): caps = pad.get_caps() structure = caps[0] stream_type = structure.get_name() if stream_type.startswith('video'): colorspace = gst.element_factory_make("ffmpegcolorspace"); self.pipeline.add (colorspace); colorspace.set_state (gst.STATE_PLAYING); pad.link (colorspace.get_pad("sink")); self.video_done = False self.video = gst.element_factory_make("fakesink") self.video.props.signal_handoffs = True self.pipeline.add(self.video) self.video.set_state (gst.STATE_PLAYING); colorspace.link (self.video); self.video_cb = self.video.connect("handoff", self.get_frame_info_cb) elif stream_type.startswith('audio'): self.audio_done = False self.audio = gst.element_factory_make("fakesink") self.audio.props.signal_handoffs = True self.pipeline.add(self.audio) self.audio.set_state (gst.STATE_PLAYING); pad.link(self.audio.get_pad('sink')) self.audio_cb = self.audio.connect("handoff", self.get_audio_info_cb) def quit(self): self.pipeline.set_state(gst.STATE_NULL) self.pipeline.get_state() self.mainloop.quit() def onBusMessage(self, bus, message): if message.type == gst.MESSAGE_TAG: for key in message.parse_tag().keys(): self.tags[key] = message.structure[key] if message.type == gst.MESSAGE_ERROR: self.quit() if message.src == self.pipeline and message.type == gst.MESSAGE_EOS: self.quit()