AP_Camera: add Python examples for GStreamer UDP and RTSP video streams

Signed-off-by: Rhys Mainwaring <rhys.mainwaring@me.com>
This commit is contained in:
Rhys Mainwaring 2024-09-21 09:51:32 +01:00 committed by Randy Mackay
parent 1dfb0f8042
commit 00163ce988
5 changed files with 823 additions and 0 deletions

View File

@ -0,0 +1,106 @@
# AP_Camera examples
## GStreamer examples
The following examples demonstrate how to manipulate GStreamer pipelines to
capture and display video streams from a UDP or RTSP source. They are useful
for providing test images to camera algorithms, or forwarding streams from
simulators such as Gazebo to ground control stations.
For further details see:
- [GStreamer tutorials](https://gstreamer.freedesktop.org/documentation/tutorials/index.html?gi-language=c)
### Install dependencies
#### Ubuntu
```bash
sudo apt install libopencv-dev libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-bad gstreamer1.0-libav gstreamer1.0-gl gstreamer1.0-tools gstreamer1.0-plugins-ugly libgstrtspserver-1.0-dev
```
#### macOS
```bash
brew install gstreamer opencv
```
### `gst_rtsp_server.py`
The example provides a number of test images as an RTSP stream that may be accessed at:
- rtsp://127.0.0.1:8554/camera
- rtsp://127.0.0.1:8554/ball
- rtsp://127.0.0.1:8554/snow
Start the RTSP server:
```bash
python ./gst_rtsp_server.py
```
Display the RTSP stream:
```bash
gst-launch-1.0 rtspsrc location=rtsp://localhost:8554/camera latency=50 ! decodebin ! autovideosink
```
### `gst_rtsp_to_wx.py`
The example displays an RTSP stream in a wxPython application.
Start the RTSP server:
```bash
python ./gst_rtsp_server.py
```
Display the RTSP stream in wxPython
```bash
python ./gst_rtsp_to_wx.py
```
### `gst_udp_to_rtsp.py`
Convert a UDP stream to RTSP. This example is configured to use the same UDP
pipeline provided by the ArduPilot Gazebo [`GstCameraPlugin`](https://github.com/ArduPilot/ardupilot_gazebo?tab=readme-ov-file#3-streaming-camera-video). By converting the
UDP stream to RTSP is may be consumed by multiple applications
(e.g. a camera tracking algorithm and a GCS).
Create a UDP video stream
```bash
gst-launch-1.0 -v videotestsrc ! 'video/x-raw,format=I420,width=640,height=480,framerate=50/1' ! queue ! videoconvert ! x264enc bitrate=800 speed-preset=6 tune=4 key-int-max=10 ! rtph264pay ! udpsink host=127.0.0.1 port=5600
```
Convert to RTSP:
```bash
python ./gst_udp_to_rtsp.py
```
Display the RTSP stream:
```bash
gst-launch-1.0 rtspsrc location=rtsp://localhost:8554/camera latency=50 ! decodebin ! autovideosink
```
### `gst_udp_to_wx.py`
The example displays a UDP video stream in a wxPython application.
Create a UDP video stream:
```bash
gst-launch-1.0 -v videotestsrc ! 'video/x-raw,format=I420,width=640,height=480,framerate=50/1' ! queue ! videoconvert ! x264enc bitrate=800 speed-preset=6 tune=4 key-int-max=10 ! rtph264pay ! udpsink host=127.0.0.1 port=5600
```
Display the UDP video stream in wxPython:
```bash
python ./gst_udp_to_wx.py
```

View File

@ -0,0 +1,115 @@
"""
GST RTSP Server Example
Usage
-----
1. Start the server:
python ./gst_rtsp_server.py
2. Display the RTSP stream
gst-launch-1.0 rtspsrc location=rtsp://localhost:8554/camera latency=50 ! decodebin ! autovideosink
Acknowledgments
---------------
GStreamer RTSP server example adapted from code by Jerome Carretero (Tamaggo)
https://github.com/tamaggo/gstreamer-examples
https://github.com/tamaggo/gstreamer-examples/blob/master/test_gst_rtsp_server.py
"""
# Copyright (c) 2015 Tamaggo Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import sys
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst
from gi.repository import GstRtspServer
from gi.repository import GLib
class VideoTestMediaFactory(GstRtspServer.RTSPMediaFactory):
"""
Create a GStreamer pipeline for a videotestsrc source.
The default videotestsrc uses the smpte test pattern. Other test patterns
may be specified when initialising the class.
"""
def __init__(self, pattern="smpte"):
GstRtspServer.RTSPMediaFactory.__init__(self)
self.pattern = pattern
def do_create_element(self, url):
s_src = f"videotestsrc pattern={self.pattern} ! video/x-raw,rate=30,width=640,height=480,format=I420"
s_h264 = "x264enc tune=zerolatency"
pipeline_str = f"( {s_src} ! queue max-size-buffers=1 name=q_enc ! {s_h264} ! rtph264pay name=pay0 pt=96 )"
if len(sys.argv) > 1:
pipeline_str = " ".join(sys.argv[1:])
print(pipeline_str)
return Gst.parse_launch(pipeline_str)
class GstServer:
"""
A GStreamer RTSP server streaming three different test patterns:
rtsp://127.0.0.1:8554/camera
rtsp://127.0.0.1:8554/ball
rtsp://127.0.0.1:8554/snow
"""
def __init__(self):
self.server = GstRtspServer.RTSPServer()
self.server.set_address("127.0.0.1")
self.server.set_service("8554")
media_factory1 = VideoTestMediaFactory()
media_factory1.set_shared(True)
media_factory2 = VideoTestMediaFactory("ball")
media_factory2.set_shared(True)
media_factory3 = VideoTestMediaFactory("snow")
media_factory3.set_shared(True)
mount_points = self.server.get_mount_points()
mount_points.add_factory("/camera", media_factory1)
mount_points.add_factory("/ball", media_factory2)
mount_points.add_factory("/snow", media_factory3)
self.server.attach(None)
def main():
Gst.init(None)
server = GstServer()
loop = GLib.MainLoop()
loop.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,233 @@
""""
Capture a RTSP video stream and display in wxpython
Usage
-----
1. rtsp server
python ./gst_rtsp_server.py
3. display in wxpython
python ./gst_rtsp_to_wx.py
Acknowledgments
---------------
Video class to capture GStreamer frames
https://www.ardusub.com/developers/opencv.html
ImagePanel class to display openCV images in wxWidgets
https://stackoverflow.com/questions/14804741/opencv-integration-with-wxpython
"""
import copy
import cv2
import gi
import numpy as np
import threading
import wx
gi.require_version("Gst", "1.0")
from gi.repository import Gst
class VideoStream:
"""BlueRov video capture class constructor - adapted to capture rtspsrc
Attributes:
address (string): RTSP address
port (int): RTSP port
mount_point (string): video stream mount point
video_decode (string): Transform YUV (12bits) to BGR (24bits)
video_pipe (object): GStreamer top-level pipeline
video_sink (object): Gstreamer sink element
video_sink_conf (string): Sink configuration
video_source (string): Udp source ip and port
latest_frame (np.ndarray): Latest retrieved video frame
"""
def __init__(
self, address="127.0.0.1", port=8554, mount_point="/camera", latency=50
):
Gst.init(None)
self.address = address
self.port = port
self.mount_point = mount_point
self.latency = latency
self.latest_frame = self._new_frame = None
self.video_source = (
f"rtspsrc location=rtsp://{address}:{port}{mount_point} latency={latency}"
)
# Python does not have nibble, convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)
self.video_decode = (
"! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert"
)
# Create a sink to get data
self.video_sink_conf = (
"! appsink emit-signals=true sync=false max-buffers=2 drop=true"
)
self.video_pipe = None
self.video_sink = None
self.run()
def start_gst(self, config=None):
""" Start gstreamer pipeline and sink
Pipeline description list e.g:
[
'videotestsrc ! decodebin', \
'! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
'! appsink'
]
Args:
config (list, optional): Gstreamer pileline description list
"""
if not config:
config = [
"videotestsrc ! decodebin",
"! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert",
"! appsink",
]
command = " ".join(config)
self.video_pipe = Gst.parse_launch(command)
self.video_pipe.set_state(Gst.State.PLAYING)
self.video_sink = self.video_pipe.get_by_name("appsink0")
@staticmethod
def gst_to_opencv(sample):
"""Transform byte array into np array
Args:
sample (TYPE): Description
Returns:
TYPE: Description
"""
buf = sample.get_buffer()
caps_structure = sample.get_caps().get_structure(0)
array = np.ndarray(
(caps_structure.get_value("height"), caps_structure.get_value("width"), 3),
buffer=buf.extract_dup(0, buf.get_size()),
dtype=np.uint8,
)
return array
def frame(self):
"""Get Frame
Returns:
np.ndarray: latest retrieved image frame
"""
if self.frame_available:
self.latest_frame = self._new_frame
# reset to indicate latest frame has been 'consumed'
self._new_frame = None
return self.latest_frame
def frame_available(self):
"""Check if a new frame is available
Returns:
bool: true if a new frame is available
"""
return self._new_frame is not None
def run(self):
"""Get frame to update _new_frame"""
self.start_gst(
[
self.video_source,
self.video_decode,
self.video_sink_conf,
]
)
self.video_sink.connect("new-sample", self.callback)
def callback(self, sink):
sample = sink.emit("pull-sample")
self._new_frame = self.gst_to_opencv(sample)
return Gst.FlowReturn.OK
class ImagePanel(wx.Panel):
def __init__(self, parent, video_stream, fps=30):
wx.Panel.__init__(self, parent)
self._video_stream = video_stream
# Shared between threads
self._frame_lock = threading.Lock()
self._latest_frame = None
print("Waiting for video stream...")
waited = 0
while not self._video_stream.frame_available():
waited += 1
print("\r Frame not available (x{})".format(waited), end="")
cv2.waitKey(30)
print("\nSuccess! Video stream available")
if self._video_stream.frame_available():
# Only retrieve and display a frame if it's new
frame = copy.deepcopy(self._video_stream.frame())
# Frame size
height, width, _ = frame.shape
parent.SetSize((width, height))
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.bmp = wx.Bitmap.FromBuffer(width, height, frame)
self.timer = wx.Timer(self)
self.timer.Start(int(1000 / fps))
self.Bind(wx.EVT_PAINT, self.OnPaint)
self.Bind(wx.EVT_TIMER, self.NextFrame)
def OnPaint(self, evt):
dc = wx.BufferedPaintDC(self)
dc.DrawBitmap(self.bmp, 0, 0)
def NextFrame(self, event):
if self._video_stream.frame_available():
frame = copy.deepcopy(self._video_stream.frame())
# Convert frame to bitmap for wxFrame
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.bmp.CopyFromBuffer(frame)
self.Refresh()
def main():
# create the video stream
video_stream = VideoStream(mount_point="/camera")
# app must run on the main thread
app = wx.App()
wx_frame = wx.Frame(None)
# create the image panel
image_panel = ImagePanel(wx_frame, video_stream, fps=30)
wx_frame.Show()
app.MainLoop()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,132 @@
"""
Capture a UDP video stream and pipe to an GStreamer RTSP server
The Gazebo GStreamerPlugin with <use_basic_pipeline>1</use_basic_pipeline>
has elements:
source: appsrc
caps='video/x-raw, format=I420, width=640, height=480, framerate=50'
is-live=1
do-timestamp=1
stream-type=0
queue: queue
converter: videoconvert
encoder: x264enc bitrate=800 speed-preset=6 tune=4 key-int-max=10
payloader: rtph264pay
sink: udpsink host=127.0.0.1 port=5600
A udpsink example streaming a test pattern using the same settings as the basic
pipeline in the Gazebo GstCameraPlugin.
1. udpsink
gst-launch-1.0 -v videotestsrc ! 'video/x-raw,format=I420,width=640,height=480,framerate=50/1' ! queue ! videoconvert ! x264enc bitrate=800 speed-preset=6 tune=4 key-int-max=10 ! rtph264pay ! udpsink host=127.0.0.1 port=5600
2a. Display the udpsink directly
udpsrc
gst-launch-1.0 -v udpsrc address=127.0.0.1 port=5600 caps='application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264' ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink sync=false
2b. Or run the udpsink, this script to convert to RTSP, and view the RTSP stream:
python ./gst_udp_to_rtsp.py
3. rtspsrc
gst-launch-1.0 rtspsrc location=rtsp://localhost:8554/camera latency=50 ! decodebin ! autovideosink
Acknowledgments
---------------
GStreamer RTSP server example adapted from code by Jerome Carretero (Tamaggo)
https://github.com/tamaggo/gstreamer-examples
https://github.com/tamaggo/gstreamer-examples/blob/master/test_gst_rtsp_server.py
"""
# Copyright (c) 2015 Tamaggo Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst
from gi.repository import GstRtspServer
from gi.repository import GLib
class GstUdpMediaFactory(GstRtspServer.RTSPMediaFactory):
"""
Create a GStreamer pipeline for a udpsrc source.
"""
def __init__(self, address="127.0.0.1", port="5600"):
GstRtspServer.RTSPMediaFactory.__init__(self)
# udpsrc options
self.address = address
self.port = port
def do_create_element(self, url):
source = f"udpsrc address={self.address} port={self.port}"
codec = "application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264"
s_h264 = "x264enc tune=zerolatency"
pipeline_str = f"( {source} ! {codec} ! queue max-size-buffers=1 name=q_enc ! {s_h264} ! rtph264pay name=pay0 pt=96 )"
print(pipeline_str)
return Gst.parse_launch(pipeline_str)
class GstServer:
"""
A GStreamer RTSP server streaming a udpsrc to:
rtsp://127.0.0.1:8554/camera
"""
def __init__(self):
self.server = GstRtspServer.RTSPServer()
self.server.set_address("127.0.0.1")
self.server.set_service("8554")
media_factory = GstUdpMediaFactory(address="127.0.0.1", port="5600")
media_factory.set_shared(True)
mount_points = self.server.get_mount_points()
mount_points.add_factory("/camera", media_factory)
self.server.attach(None)
def main():
Gst.init(None)
server = GstServer()
loop = GLib.MainLoop()
loop.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,237 @@
""""
Capture a UDP video stream and display in wxpython
Usage
-----
1. udpsink
gst-launch-1.0 -v videotestsrc ! 'video/x-raw,format=I420,width=640,height=480,framerate=50/1' ! queue ! videoconvert ! x264enc bitrate=800 speed-preset=6 tune=4 key-int-max=10 ! rtph264pay ! udpsink host=127.0.0.1 port=5600
2. display in wxpython
python ./gst_udp_to_wx.py
Acknowledgments
---------------
Video class to capture GStreamer frames
https://www.ardusub.com/developers/opencv.html
ImagePanel class to display openCV images in wxWidgets
https://stackoverflow.com/questions/14804741/opencv-integration-with-wxpython
"""
import copy
import cv2
import gi
import numpy as np
import threading
import wx
gi.require_version("Gst", "1.0")
from gi.repository import Gst
class VideoStream:
"""BlueRov video capture class constructor
Attributes:
port (int): Video UDP port
video_codec (string): Source h264 parser
video_decode (string): Transform YUV (12bits) to BGR (24bits)
video_pipe (object): GStreamer top-level pipeline
video_sink (object): Gstreamer sink element
video_sink_conf (string): Sink configuration
video_source (string): Udp source ip and port
latest_frame (np.ndarray): Latest retrieved video frame
"""
def __init__(self, port=5600):
"""Summary
Args:
port (int, optional): UDP port
"""
Gst.init(None)
self.port = port
self.latest_frame = self._new_frame = None
# [Software component diagram](https://www.ardusub.com/software/components.html)
# UDP video stream (:5600)
self.video_source = "udpsrc port={}".format(self.port)
# [Rasp raw image](http://picamera.readthedocs.io/en/release-0.7/recipes2.html#raw-image-capture-yuv-format)
# Cam -> CSI-2 -> H264 Raw (YUV 4-4-4 (12bits) I420)
self.video_codec = (
"! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264"
)
# Python don't have nibble, convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)
self.video_decode = (
"! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert"
)
# Create a sink to get data
self.video_sink_conf = (
"! appsink emit-signals=true sync=false max-buffers=2 drop=true"
)
self.video_pipe = None
self.video_sink = None
self.run()
def start_gst(self, config=None):
""" Start gstreamer pipeline and sink
Pipeline description list e.g:
[
'videotestsrc ! decodebin', \
'! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
'! appsink'
]
Args:
config (list, optional): Gstreamer pileline description list
"""
if not config:
config = [
"videotestsrc ! decodebin",
"! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert",
"! appsink",
]
command = " ".join(config)
self.video_pipe = Gst.parse_launch(command)
self.video_pipe.set_state(Gst.State.PLAYING)
self.video_sink = self.video_pipe.get_by_name("appsink0")
@staticmethod
def gst_to_opencv(sample):
"""Transform byte array into np array
Args:
sample (TYPE): Description
Returns:
TYPE: Description
"""
buf = sample.get_buffer()
caps_structure = sample.get_caps().get_structure(0)
array = np.ndarray(
(caps_structure.get_value("height"), caps_structure.get_value("width"), 3),
buffer=buf.extract_dup(0, buf.get_size()),
dtype=np.uint8,
)
return array
def frame(self):
"""Get Frame
Returns:
np.ndarray: latest retrieved image frame
"""
if self.frame_available:
self.latest_frame = self._new_frame
# reset to indicate latest frame has been 'consumed'
self._new_frame = None
return self.latest_frame
def frame_available(self):
"""Check if a new frame is available
Returns:
bool: true if a new frame is available
"""
return self._new_frame is not None
def run(self):
"""Get frame to update _new_frame"""
self.start_gst(
[
self.video_source,
self.video_codec,
self.video_decode,
self.video_sink_conf,
]
)
self.video_sink.connect("new-sample", self.callback)
def callback(self, sink):
sample = sink.emit("pull-sample")
self._new_frame = self.gst_to_opencv(sample)
return Gst.FlowReturn.OK
class ImagePanel(wx.Panel):
def __init__(self, parent, video_stream, fps=30):
wx.Panel.__init__(self, parent)
self._video_stream = video_stream
# Shared between threads
self._frame_lock = threading.Lock()
self._latest_frame = None
print("Waiting for video stream...")
waited = 0
while not self._video_stream.frame_available():
waited += 1
print("\r Frame not available (x{})".format(waited), end="")
cv2.waitKey(30)
print("\nSuccess! Video stream available")
if self._video_stream.frame_available():
# Only retrieve and display a frame if it's new
frame = copy.deepcopy(self._video_stream.frame())
# Frame size
height, width, _ = frame.shape
parent.SetSize((width, height))
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.bmp = wx.Bitmap.FromBuffer(width, height, frame)
self.timer = wx.Timer(self)
self.timer.Start(int(1000 / fps))
self.Bind(wx.EVT_PAINT, self.OnPaint)
self.Bind(wx.EVT_TIMER, self.NextFrame)
def OnPaint(self, evt):
dc = wx.BufferedPaintDC(self)
dc.DrawBitmap(self.bmp, 0, 0)
def NextFrame(self, event):
if self._video_stream.frame_available():
frame = copy.deepcopy(self._video_stream.frame())
# Convert frame to bitmap for wxFrame
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.bmp.CopyFromBuffer(frame)
self.Refresh()
def main():
# create the video stream
video_stream = VideoStream(port=5600)
# app must run on the main thread
app = wx.App()
wx_frame = wx.Frame(None)
# create the image panel
image_panel = ImagePanel(wx_frame, video_stream, fps=30)
wx_frame.Show()
app.MainLoop()
if __name__ == "__main__":
main()