128 lines
4.7 KiB
Python
128 lines
4.7 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
# GPL 2008
|
|
import gobject
|
|
gobject.threads_init()
|
|
|
|
import os
|
|
import pygst
|
|
pygst.require("0.10")
|
|
import gst
|
|
|
|
# Maps the codec description strings GStreamer reports in its
# 'video-codec' / 'audio-codec' tags to the shorter names stored in
# Info.metadata.  Descriptions without an entry are used unchanged.
codec_list = {
    'MPEG-1 layer 3': 'MPEG-1 layer 3',
    'MPEG-1 layer 3 audio': 'MPEG-1 layer 3',
    'VP6 Flash video': 'VP6',
    'AC-3 audio': 'AC-3',
    'Uncompressed 16-bit PCM audio': 'Uncompressed 16-bit PCM',
    'Generic DV': 'DV Video',
}
|
|
|
|
class Info:
    """Probe a media file with GStreamer 0.10 and collect its metadata.

    Constructing an instance builds a ``filesrc ! decodebin`` pipeline for
    *videofile*, attaches a ``fakesink`` to the video and audio streams that
    decodebin exposes, and runs a GLib main loop until the stream
    information has been read (or the pipeline reports an error).  The
    constructor therefore blocks until probing is finished.

    Results are left on the instance:

    ``metadata``
        dict that may contain ``size``, ``duration``, ``width``,
        ``height``, ``framerate``, ``pixel-aspect-ratio``, ``channels``,
        ``samplerate``, ``video-codec`` and ``audio-codec``; keys are only
        present when the corresponding information was found.
    ``tags``
        dict of every tag received from the pipeline's bus.
    ``duration``
        raw value returned by the duration query, or ``None`` when the
        query was not answered.
    """

    # A flag is cleared when the matching stream is discovered and set again
    # once its caps have been read; both True means "nothing left to wait for".
    video_done = True
    audio_done = True

    # fakesink elements attached to the decoded video / audio pads.
    video = None
    audio = None

    # Result of the duration query (FORMAT_TIME); None until it succeeds.
    duration = None

    def _init_state(self):
        """Create the per-instance result dictionaries.

        These must not be class attributes: a dict defined on the class is
        shared by every instance, so probing a second file would overwrite
        (and mix with) the results of the first.
        """
        self.metadata = {}
        self.tags = {}

    def __init__(self, videofile):
        """Probe *videofile* and fill ``metadata`` / ``tags``.

        videofile -- path of the media file to inspect.

        Raises OSError if *videofile* cannot be stat'ed.
        """
        self._init_state()

        self.mainloop = gobject.MainLoop()
        self.pipeline = gst.parse_launch('filesrc name=input ! decodebin name=dbin')
        self.input = self.pipeline.get_by_name('input')
        self.input.props.location = videofile
        self.dbin = self.pipeline.get_by_name('dbin')

        self.metadata['size'] = os.stat(videofile).st_size

        self.bus = self.pipeline.get_bus()
        # decodebin announces each decoded stream through this signal.
        self.dbin.connect('new-decoded-pad', self.demux_pad_added)

        self.bus.add_signal_watch()
        self.watch_id = self.bus.connect("message", self.onBusMessage)

        self.pipeline.set_state(gst.STATE_PAUSED)
        # Wait for the state change to finish before querying the pipeline.
        self.pipeline.get_state()

        # Duration: ask the peer of the sink pads, preferring video.
        pads = None
        if self.video:
            pads = self.video.sink_pads()
        elif self.audio:
            pads = self.audio.sink_pads()
        if pads:
            query = gst.query_new_duration(gst.FORMAT_TIME)
            for pad in pads:
                peer = pad.get_peer()
                if peer and peer.query(query):
                    _fmt, self.duration = query.parse_duration()
                    # Stored in units of gst.MSECOND.
                    self.metadata["duration"] = self.duration/gst.MSECOND

        # Blocks until quit() is called from onBusMessage.
        self.mainloop.run()

        # Normalise codec descriptions; unknown ones are kept verbatim.
        if 'video-codec' in self.tags:
            self.metadata['video-codec'] = codec_list.get(self.tags['video-codec'], self.tags['video-codec'])
        if 'audio-codec' in self.tags:
            self.metadata['audio-codec'] = codec_list.get(self.tags['audio-codec'], self.tags['audio-codec'])

    def get_audio_info_cb(self, sink, buffer, pad):
        """Handoff callback of the audio fakesink.

        Records ``channels`` and ``samplerate`` from the sink's negotiated
        caps, disconnects itself so it runs only once, and posts EOS on the
        bus when no stream is left to inspect.
        """
        caps = sink.sink_pads().next().get_negotiated_caps()
        for s in caps:
            self.metadata["channels"] = s['channels']
            self.metadata["samplerate"] = s['rate']
        self.audio.disconnect(self.audio_cb)
        self.audio_done = True
        if self.audio_done and self.video_done:
            # EOS addressed to the pipeline is what onBusMessage waits for.
            self.bus.post(gst.message_new_eos(self.pipeline))

    def get_frame_info_cb(self, sink, buffer, pad):
        """Handoff callback of the video fakesink.

        Records ``width``, ``height``, ``framerate`` and, when present,
        ``pixel-aspect-ratio`` (as ``"num:denom"``) from the sink's
        negotiated caps, disconnects itself so it runs only once, and posts
        EOS on the bus when no stream is left to inspect.
        """
        caps = sink.sink_pads().next().get_negotiated_caps()
        for s in caps:
            self.metadata["width"] = s['width']
            self.metadata["height"] = s['height']
            self.metadata["framerate"] = float(s['framerate'])
            if 'pixel-aspect-ratio' in s.keys():
                par = s['pixel-aspect-ratio']
                self.metadata["pixel-aspect-ratio"] = "%d:%d" % (par.num, par.denom)
        self.video.disconnect(self.video_cb)
        self.video_done = True
        if self.audio_done and self.video_done:
            # EOS addressed to the pipeline is what onBusMessage waits for.
            self.bus.post(gst.message_new_eos(self.pipeline))

    def demux_pad_added(self, element, pad, bool):
        """Handle decodebin's 'new-decoded-pad' signal.

        Video pads are linked through an ``ffmpegcolorspace`` element to a
        fakesink, audio pads directly to a fakesink.  Each fakesink emits
        'handoff', which is connected to the matching info callback.  Pads
        of any other type are left unlinked.

        The third parameter (named ``bool`` in the original signature and
        kept for compatibility) is the extra argument the signal passes
        along; it is not used here.
        """
        caps = pad.get_caps()
        structure = caps[0]
        stream_type = structure.get_name()
        if stream_type.startswith('video'):
            colorspace = gst.element_factory_make("ffmpegcolorspace")
            self.pipeline.add(colorspace)
            colorspace.set_state(gst.STATE_PLAYING)
            pad.link(colorspace.get_pad("sink"))

            self.video_done = False
            self.video = gst.element_factory_make("fakesink")
            self.video.props.signal_handoffs = True
            self.pipeline.add(self.video)
            self.video.set_state(gst.STATE_PLAYING)
            colorspace.link(self.video)
            self.video_cb = self.video.connect("handoff", self.get_frame_info_cb)
        elif stream_type.startswith('audio'):
            self.audio_done = False
            self.audio = gst.element_factory_make("fakesink")
            self.audio.props.signal_handoffs = True
            self.pipeline.add(self.audio)
            self.audio.set_state(gst.STATE_PLAYING)
            pad.link(self.audio.get_pad('sink'))
            self.audio_cb = self.audio.connect("handoff", self.get_audio_info_cb)

    def quit(self):
        """Shut the pipeline down and stop the main loop."""
        self.pipeline.set_state(gst.STATE_NULL)
        # Wait until the pipeline has actually reached NULL.
        self.pipeline.get_state()
        self.mainloop.quit()

    def onBusMessage(self, bus, message):
        """Bus watch: collect tags and end the probe on EOS or error."""
        if message.type == gst.MESSAGE_TAG:
            for key in message.parse_tag().keys():
                self.tags[key] = message.structure[key]
        elif message.type == gst.MESSAGE_EOS:
            # Only the EOS addressed to the whole pipeline (the one the
            # info callbacks post) ends the probe.
            if message.src == self.pipeline:
                self.quit()
        elif message.type == gst.MESSAGE_ERROR:
            # Without this an undecodable file would leave mainloop.run()
            # blocked forever, because no handoff ever posts EOS.
            self.quit()
|
|
|
|
|