Problem description
I am trying to stream an .mp4 file to an RTSP server using GStreamer in Python:
import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
gi.require_version('GstRtsp', '1.0')
from gi.repository import Gst, GstRtspServer, GObject, GLib, GstRtsp

loop = GLib.MainLoop()
Gst.init(None)
file_path = "test.mp4"

class TestRtspMediaFactory(GstRtspServer.RTSPMediaFactory):
    def __init__(self):
        GstRtspServer.RTSPMediaFactory.__init__(self)

    def do_create_element(self, url):
        src_demux = f"filesrc location={file_path} ! qtdemux name=demux"
        h264_transcode = "demux.video_0"
        pipeline = "{0} {1} ! queue ! rtph264pay name=pay0 config-interval=1 pt=96".format(src_demux, h264_transcode)
        print ("Element created: " + pipeline)
        self._pipeline = Gst.parse_launch(pipeline)

        def bus_handler(bus, message):
            print(message)

        self.bus = self._pipeline.get_bus()
        self.bus.connect('message', bus_handler)
        self.bus.add_signal_watch_full(1)
        return self._pipeline

class GstreamerRtspServer():
    def __init__(self):
        self.rtspServer = GstRtspServer.RTSPServer()
        factory = TestRtspMediaFactory()
        factory.set_shared(True)
        mountPoints = self.rtspServer.get_mount_points()
        self.address = '127.0.0.1' #my RPi's local IP
        self.port = '8553'
        self.rtspServer.set_address(self.address)
        self.rtspServer.set_service(self.port)
        urlstr = "/user=&password=.sdp"
        url = GstRtsp.RTSPUrl.parse(urlstr)
        mountPoints.add_factory(urlstr, factory)
        self.rtspServer.attach(None)

if __name__ == '__main__':
    s = GstreamerRtspServer()
    loop.run()
However, I am trying to understand how to use the GStreamer bus to log messages such as EOS, errors, and warnings, but I don't see any, even when I send EOS events and the streaming effectively stops:
s.rtspServer._pipeline._end_stream_event.set()
s.rtspServer._pipeline.send_event(Gst.Event.new_eos())
Am I using it properly? If not, what should I fix to log bus messages properly?
Recommended answer
The following solution is based on this accepted but somewhat incomplete answer.
I found an approach that does not require "manual" creation of pipeline elements. Instead, it keeps the (in this scenario) convenient Gst.parse_launch(pipelineCmd) method and extends Gst.Bin to enable message debugging.
Here is the full example source code (see the comment lines for some explanations):
#!/usr/bin/env python
import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GObject, GLib

Gst.init(None)
loop = GLib.MainLoop()

# extended Gst.Bin that overrides do_handle_message and adds debugging
class ExtendedBin(Gst.Bin):
    def do_handle_message(self,message):
        if message.type == Gst.MessageType.ERROR:
            error, debug = message.parse_error()
            print("ERROR:", message.src.get_name(), ":", error.message)
            if debug:
                print ("Debug info: " + debug)
        elif message.type == Gst.MessageType.EOS:
            print ("End of stream")
        elif message.type == Gst.MessageType.STATE_CHANGED:
            oldState, newState, pendingState = message.parse_state_changed()
            print ("State changed -> old:{}, new:{}, pending:{}".format(oldState,newState,pendingState))
        else :
            print("Some other message type: " + str(message.type))
        #call base handler to enable message propagation
        Gst.Bin.do_handle_message(self,message)

class TestRtspMediaFactory(GstRtspServer.RTSPMediaFactory):
    def __init__(self):
        GstRtspServer.RTSPMediaFactory.__init__(self)

    def do_create_element(self, url):
        #set mp4 file path to filesrc's location property
        src_demux = "filesrc location=/path/to/dir/test.mp4 ! qtdemux name=demux"
        h264_transcode = "demux.video_0"
        #uncomment following line if video transcoding is necessary
        #h264_transcode = "demux.video_0 ! decodebin ! queue ! x264enc"
        pipelineCmd = "{0} {1} ! queue ! rtph264pay name=pay0 config-interval=1 pt=96".format(src_demux, h264_transcode)
        self.pipeline = Gst.parse_launch(pipelineCmd)
        print ("Pipeline created: " + pipelineCmd)
        # creates extended Gst.Bin with message debugging enabled
        extendedBin = ExtendedBin("extendedBin")
        # Gst.pipeline inherits Gst.Bin and Gst.Element so following is possible
        extendedBin.add(self.pipeline)
        # creates new Pipeline and adds extended Bin to it
        self.extendedPipeline = Gst.Pipeline.new("extendedPipeline")
        self.extendedPipeline.add(extendedBin)
        return self.extendedPipeline

class GstreamerRtspServer(GstRtspServer.RTSPServer):
    def __init__(self):
        self.rtspServer = GstRtspServer.RTSPServer()
        self.factory = TestRtspMediaFactory()
        self.factory.set_shared(True)
        mountPoints = self.rtspServer.get_mount_points()
        mountPoints.add_factory("/stream", self.factory)
        self.rtspServer.attach(None)
        print ("RTSP server is ready")

if __name__ == '__main__':
    s = GstreamerRtspServer()
    loop.run()
Please note that Gst.Pipeline actually inherits from Gst.Bin (and Gst.Element), so it is possible (however strange it sounds) to add a pipeline to a bin.
This little "trick" saves time for us "lazy" programmers: we can keep creating pipeline elements by parsing command-line syntax.
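Since the pipeline is still described as a launch string, the string can be assembled by a small helper instead of being hard-coded. The following is only a sketch: the function name `build_launch_description` and its `transcode` flag are hypothetical, and wrapping the path in double quotes is an assumption added here so that paths containing spaces survive parsing.

```python
def build_launch_description(location, transcode=False):
    """Return a gst-launch style description like the one used in the answer.

    `location` is the path to the .mp4 file. `transcode` appends the optional
    decodebin/x264enc branch that the answer keeps commented out.
    """
    # Assumption of this sketch: refuse paths that would break the quoting
    # rather than trying to escape them.
    if '"' in location:
        raise ValueError("location must not contain double quotes")

    src_demux = 'filesrc location="{}" ! qtdemux name=demux'.format(location)
    video = "demux.video_0"
    if transcode:
        video += " ! decodebin ! queue ! x264enc"
    return "{0} {1} ! queue ! rtph264pay name=pay0 config-interval=1 pt=96".format(
        src_demux, video
    )
```

Inside do_create_element, the result would be passed to Gst.parse_launch in place of the hard-coded pipelineCmd string.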
In more complex scenarios, where parsing command-line syntax is not applicable, the solution would be the following:
- create an ExtendedBin,
- "manually" create elements with the Gst.ElementFactory.make method (and set the necessary properties),
- add the created elements to the ExtendedBin,
- link the elements,
- create a new pipeline and add the bin to it,
- use the pipeline where it is needed.
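The steps above can be sketched roughly as follows for the same filesrc / qtdemux / rtph264pay chain. This is a minimal sketch, not a tested server: the function name `build_manual_pipeline` is hypothetical, and the `Gst` module and the bin instance are passed in as arguments (a choice made here so the function has no import-time dependency on GStreamer). Because qtdemux only exposes its video pad once the file has been inspected, the sketch links that pad from a "pad-added" callback.

```python
def build_manual_pipeline(Gst, container, location):
    """Fill `container` (e.g. an ExtendedBin) with hand-made elements.

    `Gst` is the imported gi.repository.Gst module, `location` the .mp4 path.
    Returns a new pipeline that holds the container.
    """
    def make(factory, name):
        element = Gst.ElementFactory.make(factory, name)
        if element is None:
            raise RuntimeError("could not create element: " + factory)
        return element

    # create the elements and set the necessary properties
    src = make("filesrc", "src")
    src.set_property("location", location)
    demux = make("qtdemux", "demux")
    queue = make("queue", "queue")
    pay = make("rtph264pay", "pay0")
    pay.set_property("config-interval", 1)
    pay.set_property("pt", 96)

    # add the created elements to the bin
    for element in (src, demux, queue, pay):
        container.add(element)

    # link the elements that have static pads
    if not src.link(demux) or not queue.link(pay):
        raise RuntimeError("could not link elements")

    # qtdemux pads appear at runtime, so link the video pad when it shows up
    def on_pad_added(_demux, pad):
        if pad.get_name().startswith("video"):
            pad.link(queue.get_static_pad("sink"))

    demux.connect("pad-added", on_pad_added)

    # create a new pipeline and add the bin to it
    pipeline = Gst.Pipeline.new("extendedPipeline")
    pipeline.add(container)
    return pipeline
```

In the factory above, do_create_element could then return build_manual_pipeline(Gst, ExtendedBin("extendedBin"), "/path/to/dir/test.mp4").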