oxtimeline/oxgst/info.py

128 lines
4.8 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
# GPL 2008
import gobject
gobject.threads_init()
import os
import pygst
pygst.require("0.10")
import gst
codec_list = {
'MPEG-1 layer 3': 'MPEG-1 layer 3',
'MPEG-1 layer 3 audio': 'MPEG-1 layer 3',
'VP6 Flash video': 'VP6',
'AC-3 audio': 'AC-3',
'Uncompressed 16-bit PCM audio': 'Uncompressed 16-bit PCM',
'Generic DV': 'DV Video',
}
class Info:
video_done = True
audio_done = True
video = None
audio = None
metadata = {}
tags = {}
def __init__(self, videofile):
self.mainloop = gobject.MainLoop()
self.pipeline = gst.parse_launch('filesrc name=input ! decodebin name=dbin')
self.input = self.pipeline.get_by_name('input')
self.input.props.location = videofile
self.dbin = self.pipeline.get_by_name('dbin')
self.metadata['size'] = os.stat(videofile).st_size
if self.metadata['size'] != 0:
self.bus = self.pipeline.get_bus()
self.dbin.connect('new-decoded-pad', self.demux_pad_added)
self.bus.add_signal_watch()
self.watch_id = self.bus.connect("message", self.onBusMessage)
self.pipeline.set_state(gst.STATE_PAUSED)
self.pipeline.get_state()
#duration
pads = None
if self.video:
pads = self.video.sink_pads()
elif self.audio:
pads = self.audio.sink_pads()
if pads:
q = gst.query_new_duration(gst.FORMAT_TIME)
for pad in pads:
if pad.get_peer() and pad.get_peer().query(q):
format, self.duration = q.parse_duration()
self.metadata["duration"] = self.duration/gst.MSECOND
self.mainloop.run()
if 'video-codec' in self.tags:
self.metadata['video-codec'] = codec_list.get(self.tags['video-codec'], self.tags['video-codec'])
if 'audio-codec' in self.tags:
self.metadata['audio-codec'] = codec_list.get(self.tags['audio-codec'], self.tags['audio-codec'])
def get_audio_info_cb(self, sink, buffer, pad):
caps = sink.sink_pads().next().get_negotiated_caps()
for s in caps:
self.metadata["channels"] = s['channels']
self.metadata["samplerate"] = s['rate']
self.audio.disconnect(self.audio_cb)
self.audio_done = True
if self.audio_done and self.video_done:
self.bus.post(gst.message_new_eos(self.pipeline))
def get_frame_info_cb(self, sink, buffer, pad):
caps = sink.sink_pads().next().get_negotiated_caps()
for s in caps:
self.metadata["width"] = s['width']
self.metadata["height"] = s['height']
self.metadata["framerate"] = float(s['framerate'])
if 'pixel-aspect-ratio' in s.keys():
self.metadata["pixel-aspect-ratio"] = "%d:%d" % (s['pixel-aspect-ratio'].num, s['pixel-aspect-ratio'].denom)
self.video.disconnect(self.video_cb)
self.video_done = True
if self.audio_done and self.video_done:
self.bus.post(gst.message_new_eos(self.pipeline))
def demux_pad_added(self, element, pad, bool):
caps = pad.get_caps()
structure = caps[0]
stream_type = structure.get_name()
if stream_type.startswith('video'):
colorspace = gst.element_factory_make("ffmpegcolorspace");
self.pipeline.add (colorspace);
colorspace.set_state (gst.STATE_PLAYING);
pad.link (colorspace.get_pad("sink"));
self.video_done = False
self.video = gst.element_factory_make("fakesink")
self.video.props.signal_handoffs = True
self.pipeline.add(self.video)
self.video.set_state (gst.STATE_PLAYING);
colorspace.link (self.video);
self.video_cb = self.video.connect("handoff", self.get_frame_info_cb)
elif stream_type.startswith('audio'):
self.audio_done = False
self.audio = gst.element_factory_make("fakesink")
self.audio.props.signal_handoffs = True
self.pipeline.add(self.audio)
self.audio.set_state (gst.STATE_PLAYING);
pad.link(self.audio.get_pad('sink'))
self.audio_cb = self.audio.connect("handoff", self.get_audio_info_cb)
def quit(self):
self.pipeline.set_state(gst.STATE_NULL)
self.pipeline.get_state()
self.mainloop.quit()
def onBusMessage(self, bus, message):
if message.type == gst.MESSAGE_TAG:
for key in message.parse_tag().keys():
self.tags[key] = message.structure[key]
if message.src == self.pipeline and message.type == gst.MESSAGE_EOS:
self.quit()