srt: Introduce SRT source and sink
SRT[0] is an open source transport technology[1] that optimizes
streaming performance across unpredictable networks.
Although SRT is based on UDP, it works like a connection-oriented
protocol. However, that does not mean an SRT server is necessarily
the sender or an SRT client the receiver, so source and sink
elements are introduced in pairs covering both roles:
- srtserversink: SRT server to feed SRT stream
- srtclientsrc: SRT client to get SRT stream from srtserversink
- srtclientsink: SRT client to send SRT stream
- srtserversrc: SRT server to receive SRT stream from srtclientsink
[0] https://github.com/Haivision/srt
[1] http://www.srtalliance.org/
https://bugzilla.gnome.org/show_bug.cgi?id=785730
diff --git a/ext/srt/Makefile.am b/ext/srt/Makefile.am
new file mode 100644
index 0000000..02e0d11
--- /dev/null
+++ b/ext/srt/Makefile.am
@@ -0,0 +1,40 @@
+plugin_LTLIBRARIES = libgstsrt.la
+
+libgstsrt_la_SOURCES = \
+ gstsrt.c \
+ gstsrtbasesrc.c \
+ gstsrtclientsrc.c \
+ gstsrtserversrc.c \
+ gstsrtbasesink.c \
+ gstsrtclientsink.c \
+ gstsrtserversink.c \
+ $(NULL)
+
+libgstsrt_la_CFLAGS = \
+ $(GST_PLUGINS_BASE_CFLAGS) \
+ $(GST_CFLAGS) \
+ $(GIO_CFLAGS) \
+ $(SRT_CFLAGS) \
+ $(NULL)
+
+libgstsrt_la_LIBADD = \
+ $(GST_PLUGINS_BASE_LIBS) \
+ $(GST_BASE_LIBS) \
+ $(GST_LIBS) \
+ $(GIO_LIBS) \
+ $(SRT_LIBS) \
+ $(NULL)
+
+libgstsrt_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
+
+CLEANFILES = $(BUILT_SOURCES)
+
+noinst_HEADERS = \
+ gstsrtbasesink.h \
+ gstsrtclientsink.h \
+ gstsrtserversrc.h \
+ gstsrtbasesrc.h \
+ gstsrtclientsrc.h \
+ gstsrtserversink.h
+
+include $(top_srcdir)/common/gst-glib-gen.mak
diff --git a/ext/srt/gstsrt.c b/ext/srt/gstsrt.c
new file mode 100644
index 0000000..4afe169
--- /dev/null
+++ b/ext/srt/gstsrt.c
@@ -0,0 +1,184 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrt.h"
+
+#include "gstsrtclientsrc.h"
+#include "gstsrtserversrc.h"
+#include "gstsrtclientsink.h"
+#include "gstsrtserversink.h"
+
+#define GST_CAT_DEFAULT gst_debug_srt
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+/*
+ * Creates an SRT socket configured from the arguments, optionally binds it,
+ * registers it with a new SRT epoll id (stored in *@poll_id) and connects it
+ * to @host:@port. On failure an element error is posted on @elem, whatever
+ * was acquired here is released, *@socket_address and *@poll_id are reset and
+ * SRT_INVALID_SOCK is returned.
+ */
+SRTSOCKET
+gst_srt_client_connect (GstElement * elem, int sender,
+    const gchar * host, guint16 port, int rendez_vous,
+    const gchar * bind_address, guint16 bind_port, int latency,
+    GSocketAddress ** socket_address, gint * poll_id)
+{
+  SRTSOCKET sock = SRT_INVALID_SOCK;
+  GError *error = NULL;
+  gpointer sa;
+  size_t sa_len;
+
+  *poll_id = SRT_ERROR;         /* read by the failed: path below */
+  *socket_address = g_inet_socket_address_new_from_string (host, port);
+  if (*socket_address == NULL) {
+    GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid host"),
+        ("Failed to parse host"));
+    goto failed;
+  }
+
+  sa_len = g_socket_address_get_native_size (*socket_address);
+  sa = g_alloca (sa_len);
+  if (!g_socket_address_to_native (*socket_address, sa, sa_len, &error)) {
+    GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid address"),
+        ("cannot resolve address (reason: %s)", error->message));
+    goto failed;
+  }
+
+  sock = srt_socket (g_socket_address_get_family (*socket_address), SOCK_DGRAM,
+      0);
+  if (sock == SRT_ERROR) {
+    GST_ELEMENT_ERROR (elem, LIBRARY, INIT, (NULL),
+        ("failed to create SRT socket (reason: %s)", srt_getlasterror_str ()));
+    goto failed;
+  }
+
+  /* Make sure TSBPD mode is enabled (SRT mode) */
+  srt_setsockopt (sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof (int));
+  /* Direction (sender or receiver) is chosen by the caller via @sender */
+  srt_setsockopt (sock, 0, SRTO_SENDER, &sender, sizeof (int));
+  srt_setsockopt (sock, 0, SRTO_TSBPDDELAY, &latency, sizeof (int));
+  srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &rendez_vous, sizeof (int));
+
+  if (bind_address || bind_port || rendez_vous) {
+    gpointer bsa;
+    size_t bsa_len;
+    GSocketAddress *b_socket_address = NULL;
+
+    if (bind_address == NULL)
+      bind_address = "0.0.0.0";
+
+    if (rendez_vous)
+      bind_port = port;
+
+    b_socket_address = g_inet_socket_address_new_from_string (bind_address,
+        bind_port);
+
+    if (b_socket_address == NULL) {
+      GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid bind address"),
+          ("Failed to parse bind address: %s:%d", bind_address, bind_port));
+      goto failed;
+    }
+
+    bsa_len = g_socket_address_get_native_size (b_socket_address);
+    bsa = g_alloca (bsa_len);
+    if (!g_socket_address_to_native (b_socket_address, bsa, bsa_len, &error)) {
+      GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid bind address"),
+          ("Can't parse bind address to sockaddr: %s", error->message));
+      g_clear_object (&b_socket_address);
+      goto failed;
+    }
+    g_clear_object (&b_socket_address);
+
+    if (srt_bind (sock, bsa, bsa_len) == SRT_ERROR) {
+      GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ,
+          ("Can't bind to address"),
+          ("Can't bind to %s:%d (reason: %s)", bind_address, bind_port,
+              srt_getlasterror_str ()));
+      goto failed;
+    }
+  }
+
+  *poll_id = srt_epoll_create ();
+  if (*poll_id == -1) {
+    GST_ELEMENT_ERROR (elem, LIBRARY, INIT, (NULL),
+        ("failed to create poll id for SRT socket (reason: %s)",
+            srt_getlasterror_str ()));
+    goto failed;
+  }
+
+  srt_epoll_add_usock (*poll_id, sock, &(int) { SRT_EPOLL_OUT });
+
+  if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
+    GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Connection error"),
+        ("failed to connect to host (reason: %s)", srt_getlasterror_str ()));
+    goto failed;
+  }
+
+  return sock;
+
+failed:
+  if (*poll_id != SRT_ERROR) {
+    srt_epoll_release (*poll_id);
+    *poll_id = SRT_ERROR;
+  }
+
+  if (sock != SRT_INVALID_SOCK)
+    srt_close (sock);
+
+  g_clear_error (&error);
+  g_clear_object (socket_address);
+
+  return SRT_INVALID_SOCK;
+}
+
+/*
+ * Plugin entry point: initialises the shared "srt" debug category, then
+ * registers the four SRT elements with GST_RANK_PRIMARY. The && chain stops at
+ * the first registration that fails, in which case FALSE is returned.
+ */
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+  gboolean registered;
+
+  GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srt", 0, "SRT Common code");
+
+  registered = gst_element_register (plugin, "srtclientsrc", GST_RANK_PRIMARY,
+      GST_TYPE_SRT_CLIENT_SRC)
+      && gst_element_register (plugin, "srtserversrc", GST_RANK_PRIMARY,
+      GST_TYPE_SRT_SERVER_SRC)
+      && gst_element_register (plugin, "srtclientsink", GST_RANK_PRIMARY,
+      GST_TYPE_SRT_CLIENT_SINK)
+      && gst_element_register (plugin, "srtserversink", GST_RANK_PRIMARY,
+      GST_TYPE_SRT_SERVER_SINK);
+
+  return registered;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ srt,                           /* plugin name */
+ "transfer data via SRT",       /* plugin description */
+ plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);
diff --git a/ext/srt/gstsrt.h b/ext/srt/gstsrt.h
new file mode 100644
index 0000000..7a7fec7
--- /dev/null
+++ b/ext/srt/gstsrt.h
@@ -0,0 +1,46 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author: Olivier Crete <olivier.crete@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_H__
+#define __GST_SRT_H__
+
+#include <gst/gst.h>
+#include <gio/gio.h>
+
+#include <srt/srt.h>
+
+#define SRT_URI_SCHEME "srt"    /* the only URI scheme the URI handlers accept */
+#define SRT_DEFAULT_PORT 7001
+#define SRT_DEFAULT_HOST "127.0.0.1"
+#define SRT_DEFAULT_URI SRT_URI_SCHEME"://"SRT_DEFAULT_HOST":"G_STRINGIFY(SRT_DEFAULT_PORT)      /* "srt://127.0.0.1:7001" */
+#define SRT_DEFAULT_LATENCY 125 /* default of the "latency" properties, in milliseconds */
+
+G_BEGIN_DECLS
+/* Connect helper implemented in gstsrt.c; returns SRT_INVALID_SOCK on failure. */
+SRTSOCKET
+gst_srt_client_connect (GstElement * elem, int sender,
+ const gchar * host, guint16 port, int rendez_vous,
+ const gchar * bind_address, guint16 bind_port, int latency,
+ GSocketAddress ** socket_address, gint * poll_id);
+
+G_END_DECLS
+
+
+#endif /* __GST_SRT_H__ */
diff --git a/ext/srt/gstsrtbasesink.c b/ext/srt/gstsrtbasesink.c
new file mode 100644
index 0000000..a9fdc96
--- /dev/null
+++ b/ext/srt/gstsrtbasesink.c
@@ -0,0 +1,302 @@
+/* GStreamer SRT plugin based on libsrt
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrtserversink.h"
+#include "gstsrt.h"
+#include <srt/srt.h>
+
+#include <netinet/in.h>
+
+#define SRT_DEFAULT_POLL_TIMEOUT -1
+
+#define GST_CAT_DEFAULT gst_debug_srt_base_sink
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+enum
+{
+ PROP_URI = 1,                  /* "uri" string property */
+ PROP_LATENCY,                  /* "latency" int property */
+ /*< private > */
+ PROP_LAST                      /* number of slots in properties[] */
+};
+
+static GParamSpec *properties[PROP_LAST];
+
+static void gst_srt_base_sink_uri_handler_init (gpointer g_iface,
+ gpointer iface_data);
+static gchar *gst_srt_base_sink_uri_get_uri (GstURIHandler * handler);
+static gboolean gst_srt_base_sink_uri_set_uri (GstURIHandler * handler,
+ const gchar * uri, GError ** error);
+
+#define gst_srt_base_sink_parent_class parent_class
+G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstSRTBaseSink, gst_srt_base_sink,
+ GST_TYPE_BASE_SINK,
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
+ gst_srt_base_sink_uri_handler_init)
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtbasesink", 0,
+ "SRT Base Sink"));
+
+/* GObject::get_property: copies the "uri" or "latency" setting into @value. */
+static void
+gst_srt_base_sink_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstSRTBaseSink *sink = GST_SRT_BASE_SINK (object);
+
+  switch (prop_id) {
+    case PROP_LATENCY:
+      g_value_set_int (value, sink->latency);
+      break;
+    case PROP_URI:
+      if (sink->uri == NULL)
+        break;
+      g_value_take_string (value,
+          gst_srt_base_sink_uri_get_uri (GST_URI_HANDLER (sink)));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+  }
+}
+
+/* GObject::set_property: applies a new "uri" or "latency" value. */
+static void
+gst_srt_base_sink_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstSRTBaseSink *sink = GST_SRT_BASE_SINK (object);
+
+  switch (prop_id) {
+    case PROP_LATENCY:
+      sink->latency = g_value_get_int (value);
+      break;
+    case PROP_URI:
+      gst_srt_base_sink_uri_set_uri (GST_URI_HANDLER (sink),
+          g_value_get_string (value), NULL);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+  }
+}
+
+/* GObject::finalize: releases the stored URI and chains up to the parent. */
+static void
+gst_srt_base_sink_finalize (GObject * object)
+{
+  GstSRTBaseSink *sink = GST_SRT_BASE_SINK (object);
+
+  g_clear_pointer (&sink->uri, gst_uri_unref);
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/* GstBaseSink::render: maps @buffer read-only and hands the mapping to the
+ * subclass' send_buffer(); a failed map or send yields GST_FLOW_ERROR. */
+static GstFlowReturn
+gst_srt_base_sink_render (GstBaseSink * sink, GstBuffer * buffer)
+{
+  GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink);
+  GstSRTBaseSinkClass *klass = GST_SRT_BASE_SINK_GET_CLASS (sink);
+  GstMapInfo map;
+  gboolean sent;
+
+  GST_TRACE_OBJECT (self, "sending buffer %p, offset %"
+      G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
+      ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
+      ", size %" G_GSIZE_FORMAT,
+      buffer, GST_BUFFER_OFFSET (buffer),
+      GST_BUFFER_OFFSET_END (buffer),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
+      gst_buffer_get_size (buffer));
+
+  if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
+    GST_ELEMENT_ERROR (self, RESOURCE, READ,
+        ("Could not map the input stream"), (NULL));
+    return GST_FLOW_ERROR;
+  }
+
+  sent = klass->send_buffer (self, &map);
+  gst_buffer_unmap (buffer, &map);
+
+  return sent ? GST_FLOW_OK : GST_FLOW_ERROR;
+}
+
+/* Class setup: GObject vfuncs, "uri"/"latency" properties, render(). */
+static void
+gst_srt_base_sink_class_init (GstSRTBaseSinkClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
+
+  gobject_class->set_property = gst_srt_base_sink_set_property;
+  gobject_class->get_property = gst_srt_base_sink_get_property;
+  gobject_class->finalize = gst_srt_base_sink_finalize;
+
+  /**
+   * GstSRTBaseSink:uri:
+   *
+   * The URI used by SRT Connection.
+   */
+  properties[PROP_URI] = g_param_spec_string ("uri", "URI",
+      "URI in the form of srt://address:port", SRT_DEFAULT_URI,
+      G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
+
+  /* Minimum latency in milliseconds: 0..G_MAXINT32, SRT_DEFAULT_LATENCY */
+  properties[PROP_LATENCY] = g_param_spec_int ("latency", "latency",
+      "Minimum latency (milliseconds)", 0, G_MAXINT32, SRT_DEFAULT_LATENCY,
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+  g_object_class_install_properties (gobject_class, PROP_LAST, properties);
+
+  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_srt_base_sink_render);
+}
+
+static void
+gst_srt_base_sink_init (GstSRTBaseSink * self)
+{
+ self->uri = gst_uri_from_string (SRT_DEFAULT_URI);     /* "srt://127.0.0.1:7001" */
+ self->queued_buffers = NULL;   /* starts as an empty list */
+ self->latency = SRT_DEFAULT_LATENCY;   /* 125; the property is in milliseconds */
+}
+
+static GstURIType
+gst_srt_base_sink_uri_get_type (GType type)
+{
+ return GST_URI_SINK;           /* this URI handler is a sink; @type is not consulted */
+}
+
+static const gchar *const *
+gst_srt_base_sink_uri_get_protocols (GType type)
+{
+ static const gchar *protocols[] = { SRT_URI_SCHEME, NULL };
+ /* NULL-terminated and static, so the returned pointer stays valid */
+ return protocols;
+}
+
+/* GstURIHandler::get_uri: string form of the stored URI, read under the lock. */
+static gchar *
+gst_srt_base_sink_uri_get_uri (GstURIHandler * handler)
+{
+  GstSRTBaseSink *sink = GST_SRT_BASE_SINK (handler);
+  gchar *text;
+
+  GST_OBJECT_LOCK (sink);
+  text = gst_uri_to_string (sink->uri);
+  GST_OBJECT_UNLOCK (sink);
+  return text;
+}
+
+/*
+ * GstURIHandler::set_uri: parses @uri and, if its scheme is "srt", stores it
+ * in place of the previous URI under the object lock. Any other scheme sets
+ * @error to GST_URI_ERROR_BAD_URI and returns FALSE.
+ */
+static gboolean
+gst_srt_base_sink_uri_set_uri (GstURIHandler * handler,
+    const gchar * uri, GError ** error)
+{
+  GstSRTBaseSink *self = GST_SRT_BASE_SINK (handler);
+  GstUri *candidate = gst_uri_from_string (uri);
+  gboolean accepted;
+
+  GST_TRACE_OBJECT (self, "Requested URI=%s", uri);
+
+  accepted = g_strcmp0 (gst_uri_get_scheme (candidate), SRT_URI_SCHEME) == 0;
+  if (accepted) {
+    GST_OBJECT_LOCK (self);
+    g_clear_pointer (&self->uri, gst_uri_unref);
+    self->uri = gst_uri_ref (candidate);
+    GST_OBJECT_UNLOCK (self);
+  } else {
+    g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+        "Invalid SRT URI scheme");
+  }
+  g_clear_pointer (&candidate, gst_uri_unref);
+  return accepted;
+}
+
+static void
+gst_srt_base_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
+{
+ GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+ /* Fill in the GstURIHandler vfuncs; @iface_data is not used */
+ iface->get_type = gst_srt_base_sink_uri_get_type;
+ iface->get_protocols = gst_srt_base_sink_uri_get_protocols;
+ iface->get_uri = gst_srt_base_sink_uri_get_uri;
+ iface->set_uri = gst_srt_base_sink_uri_set_uri;
+}
+
+/* Returns an "application/x-srt-statistics" structure holding @sockaddr, its
+ * string form and, if srt_bstats() succeeds, sender counters for @sock; the
+ * structure is empty when @sock is SRT_INVALID_SOCK or @sockaddr is NULL. */
+GstStructure *
+gst_srt_base_sink_get_stats (GSocketAddress * sockaddr, SRTSOCKET sock)
+{
+  SRT_TRACEBSTATS stats;
+  GValue v = G_VALUE_INIT;
+  GstStructure *s;
+
+  if (sock == SRT_INVALID_SOCK || sockaddr == NULL)
+    return gst_structure_new_empty ("application/x-srt-statistics");
+
+  s = gst_structure_new ("application/x-srt-statistics",
+      "sockaddr", G_TYPE_SOCKET_ADDRESS, sockaddr, NULL);
+
+  if (srt_bstats (sock, &stats, 0) >= 0) {
+    gst_structure_set (s,
+        /* number of sent data packets, including retransmissions */
+        "packets-sent", G_TYPE_INT64, stats.pktSent,
+        /* number of lost packets (sender side) */
+        "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
+        /* number of retransmitted packets */
+        "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
+        /* number of received ACK packets */
+        "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
+        /* number of received NAK packets */
+        "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
+        /* busy sending time in microseconds (idle time excluded) */
+        "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
+        /* number of sent data bytes, including retransmissions */
+        "bytes-sent", G_TYPE_UINT64, stats.byteSent,
+        /* number of retransmitted bytes */
+        "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
+        /* number of too-late-to-send dropped bytes */
+        "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
+        /* number of too-late-to-send dropped packets */
+        "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
+        /* sending rate in Mb/s */
+        "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
+        /* estimated bandwidth, in Mb/s */
+        "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
+        /* round-trip time, in milliseconds */
+        "rtt-ms", G_TYPE_DOUBLE, stats.msRTT,
+        "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
+  }
+
+  g_value_init (&v, G_TYPE_STRING);
+  g_value_take_string (&v,
+      g_socket_connectable_to_string (G_SOCKET_CONNECTABLE (sockaddr)));
+  gst_structure_take_value (s, "sockaddr-str", &v);
+
+  return s;
+}
diff --git a/ext/srt/gstsrtbasesink.h b/ext/srt/gstsrtbasesink.h
new file mode 100644
index 0000000..6e0b918
--- /dev/null
+++ b/ext/srt/gstsrtbasesink.h
@@ -0,0 +1,73 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_BASE_SINK_H__
+#define __GST_SRT_BASE_SINK_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+#include <gio/gio.h>
+
+#include <srt/srt.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SRT_BASE_SINK (gst_srt_base_sink_get_type ())
+#define GST_IS_SRT_BASE_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_BASE_SINK))
+#define GST_IS_SRT_BASE_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_BASE_SINK))
+#define GST_SRT_BASE_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSinkClass))
+#define GST_SRT_BASE_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSink))
+#define GST_SRT_BASE_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSinkClass))
+#define GST_SRT_BASE_SINK_CAST(obj) ((GstSRTBaseSink*)(obj)) /* plain C cast, no type check */
+#define GST_SRT_BASE_SINK_CLASS_CAST(klass) ((GstSRTBaseSinkClass*)(klass)) /* plain C cast, no type check */
+
+typedef struct _GstSRTBaseSink GstSRTBaseSink;
+typedef struct _GstSRTBaseSinkClass GstSRTBaseSinkClass;
+
+struct _GstSRTBaseSink {
+ GstBaseSink parent;
+
+ GstUri *uri;                   /* parsed "uri" property; replaced under the object lock */
+ GList *queued_buffers;         /* set to NULL at init; presumably for subclasses - TODO confirm */
+ gint latency;                  /* "latency" property, in milliseconds */
+
+ /*< private >*/
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstSRTBaseSinkClass {
+ GstBaseSinkClass parent_class;
+
+ /* ask the subclass to send a buffer; render() turns FALSE into GST_FLOW_ERROR */
+ gboolean (*send_buffer) (GstSRTBaseSink *self, const GstMapInfo *mapinfo);
+
+ gpointer _gst_reserved[GST_PADDING_LARGE];
+};
+
+GST_EXPORT
+GType gst_srt_base_sink_get_type (void);
+/* Statistics for @sock as an "application/x-srt-statistics" structure */
+GstStructure * gst_srt_base_sink_get_stats (GSocketAddress *sockaddr,
+ SRTSOCKET sock);
+
+
+G_END_DECLS
+
+#endif /* __GST_SRT_BASE_SINK_H__ */
diff --git a/ext/srt/gstsrtbasesrc.c b/ext/srt/gstsrtbasesrc.c
new file mode 100644
index 0000000..a34ae38
--- /dev/null
+++ b/ext/srt/gstsrtbasesrc.c
@@ -0,0 +1,262 @@
+/* GStreamer SRT plugin based on libsrt
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrtbasesrc.h"
+#include "gstsrt.h"
+#include <srt/srt.h>
+#include <gio/gio.h>
+
+#include <netinet/in.h>
+
+#define GST_CAT_DEFAULT gst_debug_srt_base_src
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+enum
+{
+ PROP_URI = 1,                  /* "uri" string property */
+ PROP_CAPS,                     /* "caps" boxed GstCaps property */
+ PROP_LATENCY,                  /* "latency" int property */
+
+ /*< private > */
+ PROP_LAST                      /* number of slots in properties[] */
+};
+
+static GParamSpec *properties[PROP_LAST];
+
+static void gst_srt_base_src_uri_handler_init (gpointer g_iface,
+ gpointer iface_data);
+static gchar *gst_srt_base_src_uri_get_uri (GstURIHandler * handler);
+static gboolean gst_srt_base_src_uri_set_uri (GstURIHandler * handler,
+ const gchar * uri, GError ** error);
+
+#define gst_srt_base_src_parent_class parent_class
+G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstSRTBaseSrc, gst_srt_base_src,
+ GST_TYPE_PUSH_SRC, G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
+ gst_srt_base_src_uri_handler_init)
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtbasesrc", 0,
+ "SRT Base Source"));
+
+/* GObject::get_property: reports the "uri", "caps" or "latency" setting. */
+static void
+gst_srt_base_src_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstSRTBaseSrc *src = GST_SRT_BASE_SRC (object);
+
+  switch (prop_id) {
+    case PROP_LATENCY:
+      g_value_set_int (value, src->latency);
+      break;
+    case PROP_CAPS:
+      GST_OBJECT_LOCK (src);
+      gst_value_set_caps (value, src->caps);
+      GST_OBJECT_UNLOCK (src);
+      break;
+    case PROP_URI:
+      if (src->uri == NULL)
+        break;
+      g_value_take_string (value,
+          gst_srt_base_src_uri_get_uri (GST_URI_HANDLER (src)));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+  }
+}
+
+static void
+gst_srt_base_src_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object);
+
+  switch (prop_id) {
+    case PROP_URI:             /* no GError is passed, so a bad URI is ignored */
+      gst_srt_base_src_uri_set_uri (GST_URI_HANDLER (self),
+          g_value_get_string (value), NULL);
+      break;
+    case PROP_CAPS:            /* a NULL value just clears the stored caps */
+      GST_OBJECT_LOCK (self);
+      g_clear_pointer (&self->caps, gst_caps_unref);
+      if (gst_value_get_caps (value) != NULL)
+        self->caps = gst_caps_copy (gst_value_get_caps (value));
+      GST_OBJECT_UNLOCK (self);
+      break;
+    case PROP_LATENCY:
+      self->latency = g_value_get_int (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+  }
+}
+
+/* GObject::finalize: drops the stored URI and caps, then chains up. */
+static void
+gst_srt_base_src_finalize (GObject * object)
+{
+  GstSRTBaseSrc *src = GST_SRT_BASE_SRC (object);
+
+  g_clear_pointer (&src->uri, gst_uri_unref);
+  g_clear_pointer (&src->caps, gst_caps_unref);
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/*
+ * GstBaseSrc::get_caps: answers with the "caps" property when it is set and
+ * with ANY caps otherwise, restricted by @filter whenever one is given.
+ */
+static GstCaps *
+gst_srt_base_src_get_caps (GstBaseSrc * src, GstCaps * filter)
+{
+  GstSRTBaseSrc *self = GST_SRT_BASE_SRC (src);
+  GstCaps *configured, *filtered;
+
+  GST_OBJECT_LOCK (self);
+  configured = (self->caps != NULL) ? gst_caps_ref (self->caps) : NULL;
+  GST_OBJECT_UNLOCK (self);
+
+  if (configured == NULL)
+    return (filter != NULL) ? gst_caps_ref (filter) : gst_caps_new_any ();
+
+  if (filter == NULL)
+    return configured;
+
+  filtered =
+      gst_caps_intersect_full (filter, configured, GST_CAPS_INTERSECT_FIRST);
+  gst_caps_unref (configured);
+  return filtered;
+}
+
+
+/* Class setup: installs the GObject vfuncs, the get_caps() implementation and
+ * the "uri", "caps" and "latency" properties. */
+static void
+gst_srt_base_src_class_init (GstSRTBaseSrcClass * klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+  GstBaseSrcClass *basesrc_class = GST_BASE_SRC_CLASS (klass);
+
+  object_class->finalize = gst_srt_base_src_finalize;
+  object_class->get_property = gst_srt_base_src_get_property;
+  object_class->set_property = gst_srt_base_src_set_property;
+  basesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_srt_base_src_get_caps);
+
+  /**
+   * GstSRTBaseSrc:uri:
+   *
+   * The SRT URI to use, in the form srt://address:port.
+   */
+  properties[PROP_URI] = g_param_spec_string ("uri", "URI",
+      "URI in the form of srt://address:port", SRT_DEFAULT_URI,
+      G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
+
+  /**
+   * GstSRTBaseSrc:caps:
+   *
+   * The caps reported for the source pad.
+   */
+  properties[PROP_CAPS] =
+      g_param_spec_boxed ("caps", "Caps", "The caps of the source pad",
+      GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+  /* GstSRTBaseSrc:latency: minimum latency, in milliseconds. */
+  properties[PROP_LATENCY] = g_param_spec_int ("latency", "latency",
+      "Minimum latency (milliseconds)", 0, G_MAXINT32, SRT_DEFAULT_LATENCY,
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+  g_object_class_install_properties (object_class, PROP_LAST, properties);
+}
+
+static void
+gst_srt_base_src_init (GstSRTBaseSrc * self)
+{
+ gst_srt_base_src_uri_set_uri (GST_URI_HANDLER (self), SRT_DEFAULT_URI, NULL);  /* start from "srt://127.0.0.1:7001" */
+ gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);        /* base source works in time format */
+ gst_base_src_set_live (GST_BASE_SRC (self), TRUE);     /* marked as a live source */
+ self->latency = SRT_DEFAULT_LATENCY;   /* 125; the property is in milliseconds */
+}
+
+static GstURIType
+gst_srt_base_src_uri_get_type (GType type)
+{
+ return GST_URI_SRC;            /* this URI handler is a source; @type is not consulted */
+}
+
+static const gchar *const *
+gst_srt_base_src_uri_get_protocols (GType type)
+{
+ static const gchar *protocols[] = { SRT_URI_SCHEME, NULL };
+ /* NULL-terminated and static, so the returned pointer stays valid */
+ return protocols;
+}
+
+/* GstURIHandler::get_uri: string form of the stored URI, read under the lock. */
+static gchar *
+gst_srt_base_src_uri_get_uri (GstURIHandler * handler)
+{
+  GstSRTBaseSrc *src = GST_SRT_BASE_SRC (handler);
+  gchar *text;
+
+  GST_OBJECT_LOCK (src);
+  text = gst_uri_to_string (src->uri);
+  GST_OBJECT_UNLOCK (src);
+  return text;
+}
+
+/*
+ * GstURIHandler::set_uri: accepts @uri only if its scheme is "srt"; the parsed
+ * URI then replaces the stored one under the object lock. Otherwise @error is
+ * set to GST_URI_ERROR_BAD_URI and FALSE is returned.
+ */
+static gboolean
+gst_srt_base_src_uri_set_uri (GstURIHandler * handler,
+    const gchar * uri, GError ** error)
+{
+  GstSRTBaseSrc *self = GST_SRT_BASE_SRC (handler);
+  GstUri *candidate = gst_uri_from_string (uri);
+  gboolean accepted;
+
+  accepted = g_strcmp0 (gst_uri_get_scheme (candidate), SRT_URI_SCHEME) == 0;
+  if (accepted) {
+    GST_OBJECT_LOCK (self);
+    g_clear_pointer (&self->uri, gst_uri_unref);
+    self->uri = gst_uri_ref (candidate);
+    GST_OBJECT_UNLOCK (self);
+  } else {
+    g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+        "Invalid SRT URI scheme");
+  }
+  g_clear_pointer (&candidate, gst_uri_unref);
+  return accepted;
+}
+
+static void
+gst_srt_base_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
+{
+ GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+ /* Fill in the GstURIHandler vfuncs; @iface_data is not used */
+ iface->get_type = gst_srt_base_src_uri_get_type;
+ iface->get_protocols = gst_srt_base_src_uri_get_protocols;
+ iface->get_uri = gst_srt_base_src_uri_get_uri;
+ iface->set_uri = gst_srt_base_src_uri_set_uri;
+}
diff --git a/ext/srt/gstsrtbasesrc.h b/ext/srt/gstsrtbasesrc.h
new file mode 100644
index 0000000..bf7414e
--- /dev/null
+++ b/ext/srt/gstsrtbasesrc.h
@@ -0,0 +1,63 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_BASE_SRC_H__
+#define __GST_SRT_BASE_SRC_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SRT_BASE_SRC (gst_srt_base_src_get_type ())
+#define GST_IS_SRT_BASE_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_BASE_SRC))
+#define GST_IS_SRT_BASE_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_BASE_SRC))
+#define GST_SRT_BASE_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrcClass))
+#define GST_SRT_BASE_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrc))
+#define GST_SRT_BASE_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrcClass))
+#define GST_SRT_BASE_SRC_CAST(obj) ((GstSRTBaseSrc*)(obj)) /* plain C cast, no type check */
+#define GST_SRT_BASE_SRC_CLASS_CAST(klass) ((GstSRTBaseSrcClass*)(klass)) /* plain C cast, no type check */
+
+typedef struct _GstSRTBaseSrc GstSRTBaseSrc;
+typedef struct _GstSRTBaseSrcClass GstSRTBaseSrcClass;
+
+struct _GstSRTBaseSrc {
+ GstPushSrc parent;
+
+ GstUri *uri;                   /* parsed "uri" property; replaced under the object lock */
+ GstCaps *caps;                 /* "caps" property; NULL makes get_caps() answer with the filter or ANY */
+ gint latency;                  /* "latency" property, in milliseconds */
+
+ /*< private >*/
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstSRTBaseSrcClass {
+ GstPushSrcClass parent_class;
+ /* no virtual methods of its own; only padding follows the parent class */
+ gpointer _gst_reserved[GST_PADDING_LARGE];
+};
+
+GST_EXPORT
+GType gst_srt_base_src_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SRT_BASE_SRC_H__ */
diff --git a/ext/srt/gstsrtclientsink.c b/ext/srt/gstsrtclientsink.c
new file mode 100644
index 0000000..7371e08
--- /dev/null
+++ b/ext/srt/gstsrtclientsink.c
@@ -0,0 +1,268 @@
+/* GStreamer SRT plugin based on libsrt
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-srtclientsink
+ * @title: srtclientsink
+ *
+ * srtclientsink is a network sink that sends <ulink url="http://www.srtalliance.org/">SRT</ulink>
+ * packets to the network. Although SRT is a UDP-based protocol, srtclientsink works like
+ * a client socket of a connection-oriented protocol.
+ *
+ * <refsect2>
+ * <title>Examples</title>
+ * |[
+ * gst-launch-1.0 -v audiotestsrc ! srtclientsink
+ * ]| This pipeline shows how to send SRT packets through the default port.
+ *
+ * |[
+ * gst-launch-1.0 -v audiotestsrc ! srtclientsink uri=srt://192.168.1.10:8888/ rendez-vous=1
+ * ]| This pipeline shows how to send SRT packets to 192.168.1.10 port 8888 using the rendez-vous mode.
+ * </refsect2>
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrtclientsink.h"
+#include "gstsrt.h"
+#include <srt/srt.h>
+#include <gio/gio.h>
+
+/* Default of the "poll-timeout" property; the property description documents
+ * -1 as "infinite". */
+#define SRT_DEFAULT_POLL_TIMEOUT -1
+
+/* The element has a single always-present sink pad that accepts any caps. */
+static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+/* Debug category used by the GST_* logging macros in this file; it is
+ * initialized as part of the type definition further down. */
+#define GST_CAT_DEFAULT gst_debug_srt_client_sink
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+/* Per-instance private state of srtclientsink. */
+struct _GstSRTClientSinkPrivate
+{
+ /* SRT socket returned by gst_srt_client_connect() in start(); closed in stop() */
+ SRTSOCKET sock;
+ /* address filled in by gst_srt_client_connect(); passed to the stats helper */
+ GSocketAddress *sockaddr;
+ /* SRT epoll id filled in by gst_srt_client_connect(); released in stop() */
+ gint poll_id;
+ /* backing store of the "poll-timeout" property */
+ gint poll_timeout;
+
+ /* backing stores of the "rendez-vous", "bind-address" and "bind-port"
+ * properties; all three are forwarded to gst_srt_client_connect() */
+ gboolean rendez_vous;
+ gchar *bind_address;
+ guint16 bind_port;
+};
+
+/* Returns the GstSRTClientSinkPrivate attached to an instance. */
+#define GST_SRT_CLIENT_SINK_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkPrivate))
+
+/* Property ids; they start at 1 and index the properties[] table below. */
+enum
+{
+ PROP_POLL_TIMEOUT = 1,
+ PROP_BIND_ADDRESS,
+ PROP_BIND_PORT,
+ PROP_RENDEZ_VOUS,
+ PROP_STATS,
+ /*< private > */
+ PROP_LAST
+};
+
+/* Param specs, created and installed in class_init. */
+static GParamSpec *properties[PROP_LAST];
+
+/* Defines the GstSRTClientSink type as a GST_TYPE_SRT_BASE_SINK subclass with
+ * private data and initializes the "srtclientsink" debug category. */
+#define gst_srt_client_sink_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstSRTClientSink, gst_srt_client_sink,
+ GST_TYPE_SRT_BASE_SINK, G_ADD_PRIVATE (GstSRTClientSink)
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtclientsink", 0,
+ "SRT Client Sink"));
+
+/* GObject::get_property handler for srtclientsink.
+ *
+ * Copies the current value of the property identified by @prop_id into
+ * @value. "stats" is built on demand from the connection's socket address and
+ * SRT socket; unknown ids are reported through
+ * G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gst_srt_client_sink_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstSRTClientSink *self = GST_SRT_CLIENT_SINK (object);
+  GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+
+  switch (prop_id) {
+    case PROP_POLL_TIMEOUT:
+      g_value_set_int (value, priv->poll_timeout);
+      break;
+    case PROP_BIND_ADDRESS:
+      g_value_set_string (value, priv->bind_address);
+      break;
+    case PROP_BIND_PORT:
+      /* "bind-port" is the int property; it must report bind_port, the field
+       * set_property() writes for this id. */
+      g_value_set_int (value, priv->bind_port);
+      break;
+    case PROP_RENDEZ_VOUS:
+      /* "rendez-vous" is the boolean property backed by rendez_vous. */
+      g_value_set_boolean (value, priv->rendez_vous);
+      break;
+    case PROP_STATS:
+      /* the GValue takes ownership of the returned structure */
+      g_value_take_boxed (value, gst_srt_base_sink_get_stats (priv->sockaddr,
+              priv->sock));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* GObject::set_property handler for srtclientsink.
+ *
+ * Stores the new value of the property identified by @prop_id in the private
+ * instance data. The previous "bind-address" string is released and replaced
+ * by a copy of the new one. Unknown ids are reported through
+ * G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gst_srt_client_sink_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstSRTClientSink *sink = GST_SRT_CLIENT_SINK (object);
+  GstSRTClientSinkPrivate *state = GST_SRT_CLIENT_SINK_GET_PRIVATE (sink);
+
+  switch (prop_id) {
+    case PROP_RENDEZ_VOUS:
+      state->rendez_vous = g_value_get_boolean (value);
+      break;
+    case PROP_BIND_PORT:
+      state->bind_port = g_value_get_int (value);
+      break;
+    case PROP_BIND_ADDRESS:
+      /* drop the old string before storing a private copy of the new one */
+      g_free (state->bind_address);
+      state->bind_address = g_value_dup_string (value);
+      break;
+    case PROP_POLL_TIMEOUT:
+      state->poll_timeout = g_value_get_int (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* GstBaseSink::start implementation.
+ *
+ * Opens the SRT connection through gst_srt_client_connect(), using the host
+ * and port of the base class URI, the rendez-vous/bind settings of this
+ * element and the base class latency. The resulting socket, peer address and
+ * epoll id are kept in the private data.
+ *
+ * Returns: TRUE if a valid SRT socket was obtained, FALSE otherwise.
+ */
+static gboolean
+gst_srt_client_sink_start (GstBaseSink * sink)
+{
+  GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
+  GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+  GstSRTBaseSink *base = GST_SRT_BASE_SINK (sink);
+  /* hold our own reference on the URI while host and port are read from it */
+  GstUri *target = gst_uri_ref (GST_SRT_BASE_SINK (self)->uri);
+  const gchar *host = gst_uri_get_host (target);
+  guint port = gst_uri_get_port (target);
+
+  priv->sock = gst_srt_client_connect (GST_ELEMENT (sink), FALSE,
+      host, port, priv->rendez_vous,
+      priv->bind_address, priv->bind_port, base->latency,
+      &priv->sockaddr, &priv->poll_id);
+
+  if (target != NULL) {
+    gst_uri_unref (target);
+    target = NULL;
+  }
+
+  if (priv->sock == SRT_INVALID_SOCK)
+    return FALSE;
+
+  return TRUE;
+}
+
+/* GstSRTBaseSink::send_buffer implementation.
+ *
+ * Sends the mapped buffer contents over the SRT socket with srt_sendmsg2().
+ * On failure an element error carrying the SRT error string is posted.
+ *
+ * Returns: TRUE if the data was handed to SRT, FALSE on SRT_ERROR.
+ */
+static gboolean
+gst_srt_client_sink_send_buffer (GstSRTBaseSink * sink,
+    const GstMapInfo * mapinfo)
+{
+  GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
+  GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+  int sent;
+
+  sent = srt_sendmsg2 (priv->sock, (char *) mapinfo->data, mapinfo->size, 0);
+  if (sent != SRT_ERROR)
+    return TRUE;
+
+  GST_ELEMENT_ERROR (self, RESOURCE, WRITE, NULL,
+      ("%s", srt_getlasterror_str ()));
+  return FALSE;
+}
+
+/* GstBaseSink::stop implementation.
+ *
+ * Tears down what start() set up: removes the socket from the epoll set and
+ * releases the set, closes the SRT socket, and drops the stored peer address.
+ * Each handle is reset to its "invalid" sentinel afterwards.
+ *
+ * Returns: always TRUE.
+ */
+static gboolean
+gst_srt_client_sink_stop (GstBaseSink * sink)
+{
+  GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
+  GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+  gboolean have_poll, have_sock;
+
+  GST_DEBUG_OBJECT (self, "closing SRT connection");
+
+  have_poll = (priv->poll_id != SRT_ERROR);
+  if (have_poll) {
+    srt_epoll_remove_usock (priv->poll_id, priv->sock);
+    srt_epoll_release (priv->poll_id);
+    priv->poll_id = SRT_ERROR;
+  }
+
+  have_sock = (priv->sock != SRT_INVALID_SOCK);
+  if (have_sock) {
+    srt_close (priv->sock);
+    priv->sock = SRT_INVALID_SOCK;
+  }
+
+  g_clear_object (&priv->sockaddr);
+
+  return TRUE;
+}
+
+/* GObject::finalize implementation.
+ *
+ * Releases the "bind-address" string, which set_property() stores as a
+ * private copy, and then chains up to the parent class.
+ */
+static void
+gst_srt_client_sink_finalize (GObject * object)
+{
+  GstSRTClientSink *self = GST_SRT_CLIENT_SINK (object);
+  GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+
+  g_clear_pointer (&priv->bind_address, g_free);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/* Class initializer: wires up the GObject property handlers and finalizer,
+ * installs the element properties, registers the sink pad template and the
+ * element metadata, and sets the GstBaseSink / GstSRTBaseSink virtual
+ * methods implemented in this file.
+ */
+static void
+gst_srt_client_sink_class_init (GstSRTClientSinkClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+  GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
+  GstSRTBaseSinkClass *gstsrtbasesink_class = GST_SRT_BASE_SINK_CLASS (klass);
+
+  gobject_class->set_property = gst_srt_client_sink_set_property;
+  gobject_class->get_property = gst_srt_client_sink_get_property;
+  /* without a finalizer the duplicated bind-address string is never freed */
+  gobject_class->finalize = gst_srt_client_sink_finalize;
+
+  properties[PROP_POLL_TIMEOUT] =
+      g_param_spec_int ("poll-timeout", "Poll Timeout",
+      "Return poll wait after timeout miliseconds (-1 = infinite)", -1,
+      G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT,
+      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+  properties[PROP_BIND_ADDRESS] =
+      g_param_spec_string ("bind-address", "Bind Address",
+      "Address to bind socket to (required for rendez-vous mode) ", NULL,
+      G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
+
+  properties[PROP_BIND_PORT] =
+      g_param_spec_int ("bind-port", "Bind Port",
+      "Port to bind socket to (Ignored in rendez-vous mode)", 0,
+      G_MAXUINT16, 0,
+      G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
+
+  properties[PROP_RENDEZ_VOUS] =
+      g_param_spec_boolean ("rendez-vous", "Rendez Vous",
+      "Work in Rendez-Vous mode instead of client/caller mode", FALSE,
+      G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
+
+  /* read-only: computed on demand in get_property() */
+  properties[PROP_STATS] = g_param_spec_boxed ("stats", "Statistics",
+      "SRT Statistics", GST_TYPE_STRUCTURE,
+      G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
+  g_object_class_install_properties (gobject_class, PROP_LAST, properties);
+
+  gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
+  gst_element_class_set_metadata (gstelement_class,
+      "SRT client sink", "Sink/Network",
+      "Send data over the network via SRT",
+      "Justin Kim <justin.kim@collabora.com>");
+
+  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_srt_client_sink_start);
+  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_client_sink_stop);
+
+  gstsrtbasesink_class->send_buffer =
+      GST_DEBUG_FUNCPTR (gst_srt_client_sink_send_buffer);
+}
+
+/* Instance initializer: puts the private state into its "not connected"
+ * defaults, mirroring what srtclientsrc does for its own private data.
+ */
+static void
+gst_srt_client_sink_init (GstSRTClientSink * self)
+{
+  GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
+
+  /* stop() and the "stats" getter compare these against SRT_INVALID_SOCK and
+   * SRT_ERROR, so they must not be left at the zero a fresh instance has. */
+  priv->sock = SRT_INVALID_SOCK;
+  priv->poll_id = SRT_ERROR;
+  priv->sockaddr = NULL;
+
+  priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
+  priv->rendez_vous = FALSE;
+  priv->bind_address = NULL;
+  priv->bind_port = 0;
+}
diff --git a/ext/srt/gstsrtclientsink.h b/ext/srt/gstsrtclientsink.h
new file mode 100644
index 0000000..e910050
--- /dev/null
+++ b/ext/srt/gstsrtclientsink.h
@@ -0,0 +1,59 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_CLIENT_SINK_H__
+#define __GST_SRT_CLIENT_SINK_H__
+
+#include "gstsrtbasesink.h"
+
+G_BEGIN_DECLS
+
+/* GObject type macros for GstSRTClientSink: the type id, run-time
+ * instance/class type checks, checked casts, and plain C casts (*_CAST). */
+#define GST_TYPE_SRT_CLIENT_SINK (gst_srt_client_sink_get_type ())
+#define GST_IS_SRT_CLIENT_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_CLIENT_SINK))
+#define GST_IS_SRT_CLIENT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_CLIENT_SINK))
+#define GST_SRT_CLIENT_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkClass))
+#define GST_SRT_CLIENT_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSink))
+#define GST_SRT_CLIENT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkClass))
+#define GST_SRT_CLIENT_SINK_CAST(obj) ((GstSRTClientSink*)(obj))
+#define GST_SRT_CLIENT_SINK_CLASS_CAST(klass) ((GstSRTClientSinkClass*)(klass))
+
+/* Short names for the instance, class and private structures; the private
+ * structure is only declared here, not defined in this header. */
+typedef struct _GstSRTClientSink GstSRTClientSink;
+typedef struct _GstSRTClientSinkClass GstSRTClientSinkClass;
+typedef struct _GstSRTClientSinkPrivate GstSRTClientSinkPrivate;
+
+/* Instance structure of srtclientsink; derives from GstSRTBaseSink and has no
+ * public fields of its own. */
+struct _GstSRTClientSink {
+ GstSRTBaseSink parent;
+
+ /*< private >*/
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+/* Class structure of srtclientsink; adds only reserved padding on top of
+ * GstSRTBaseSinkClass. */
+struct _GstSRTClientSinkClass {
+ GstSRTBaseSinkClass parent_class;
+
+ gpointer _gst_reserved[GST_PADDING_LARGE];
+};
+
+/* Returns the GType of GstSRTClientSink (used by GST_TYPE_SRT_CLIENT_SINK). */
+GST_EXPORT
+GType gst_srt_client_sink_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SRT_CLIENT_SINK_H__ */
diff --git a/ext/srt/gstsrtclientsrc.c b/ext/srt/gstsrtclientsrc.c
new file mode 100644
index 0000000..2ad35b9
--- /dev/null
+++ b/ext/srt/gstsrtclientsrc.c
@@ -0,0 +1,339 @@
+/* GStreamer SRT plugin based on libsrt
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-srtclientsrc
+ * @title: srtclientsrc
+ *
+ * srtclientsrc is a network source that reads <ulink url="http://www.srtalliance.org/">SRT</ulink>
+ * packets from the network. Although SRT is a protocol based on UDP, srtclientsrc works like
+ * a client socket of a connection-oriented protocol.
+ *
+ * <refsect2>
+ * <title>Examples</title>
+ * |[
+ * gst-launch-1.0 -v srtclientsrc uri="srt://127.0.0.1:7001" ! fakesink
+ * ]| This pipeline shows how to connect to an SRT server by setting the #GstSRTClientSrc:uri property.
+ *
+ * |[
+ * gst-launch-1.0 -v srtclientsrc uri="srt://192.168.1.10:7001" rendez-vous=1 ! fakesink
+ * ]| This pipeline shows how to connect to an SRT server by setting the #GstSRTClientSrc:uri property and using the rendez-vous mode.
+ * </refsect2>
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrtclientsrc.h"
+#include <srt/srt.h>
+#include <gio/gio.h>
+
+#include "gstsrt.h"
+
+#include <netinet/in.h>
+
+/* The element has a single always-present source pad that produces any caps. */
+static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+/* Debug category used by the GST_* logging macros in this file; it is
+ * initialized as part of the type definition further down. */
+#define GST_CAT_DEFAULT gst_debug_srt_client_src
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+/* Per-instance private state of srtclientsrc. */
+struct _GstSRTClientSrcPrivate
+{
+ /* SRT socket returned by gst_srt_client_connect() in start(); data is read
+ * from it in fill() and it is closed in stop()/finalize() */
+ SRTSOCKET sock;
+ /* SRT epoll id filled in by gst_srt_client_connect(); waited on in fill() */
+ gint poll_id;
+ /* backing store of the "poll-timeout" property; passed to srt_epoll_wait() */
+ gint poll_timeout;
+
+ /* backing stores of the "rendez-vous", "bind-address" and "bind-port"
+ * properties; all three are forwarded to gst_srt_client_connect() */
+ gboolean rendez_vous;
+ gchar *bind_address;
+ guint16 bind_port;
+};
+
+/* Returns the GstSRTClientSrcPrivate attached to an instance. */
+#define GST_SRT_CLIENT_SRC_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcPrivate))
+
+/* Default of the "poll-timeout" property; the property description documents
+ * -1 as "infinite". */
+#define SRT_DEFAULT_POLL_TIMEOUT -1
+/* Property ids; they start at 1 and index the properties[] table below. */
+enum
+{
+ PROP_POLL_TIMEOUT = 1,
+ PROP_BIND_ADDRESS,
+ PROP_BIND_PORT,
+ PROP_RENDEZ_VOUS,
+
+ /*< private > */
+ PROP_LAST
+};
+
+/* Param specs, created and installed in class_init. Only PROP_LAST entries
+ * are passed to g_object_class_install_properties(); the extra slot is unused
+ * in the code visible here. */
+static GParamSpec *properties[PROP_LAST + 1];
+
+/* Defines the GstSRTClientSrc type as a GST_TYPE_SRT_BASE_SRC subclass with
+ * private data and initializes the "srtclientsrc" debug category. */
+#define gst_srt_client_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstSRTClientSrc, gst_srt_client_src,
+ GST_TYPE_SRT_BASE_SRC, G_ADD_PRIVATE (GstSRTClientSrc)
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtclientsrc", 0,
+ "SRT Client Source"));
+
+/* GObject::get_property handler for srtclientsrc.
+ *
+ * Copies the current value of the property identified by @prop_id into
+ * @value; unknown ids are reported through
+ * G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gst_srt_client_src_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (object);
+  GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
+
+  switch (prop_id) {
+    case PROP_POLL_TIMEOUT:
+      g_value_set_int (value, priv->poll_timeout);
+      break;
+    case PROP_BIND_ADDRESS:
+      g_value_set_string (value, priv->bind_address);
+      break;
+    case PROP_BIND_PORT:
+      /* "bind-port" is the int property; it must report bind_port, the field
+       * set_property() writes for this id. */
+      g_value_set_int (value, priv->bind_port);
+      break;
+    case PROP_RENDEZ_VOUS:
+      /* "rendez-vous" is the boolean property backed by rendez_vous. */
+      g_value_set_boolean (value, priv->rendez_vous);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* GObject::set_property handler for srtclientsrc.
+ *
+ * Stores the new value of the property identified by @prop_id in the private
+ * instance data. The previous "bind-address" string is released and replaced
+ * by a copy of the new one. Unknown ids are reported through
+ * G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gst_srt_client_src_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstSRTBaseSrc *element = GST_SRT_BASE_SRC (object);
+  GstSRTClientSrcPrivate *state = GST_SRT_CLIENT_SRC_GET_PRIVATE (element);
+
+  switch (prop_id) {
+    case PROP_RENDEZ_VOUS:
+      state->rendez_vous = g_value_get_boolean (value);
+      break;
+    case PROP_BIND_PORT:
+      state->bind_port = g_value_get_int (value);
+      break;
+    case PROP_BIND_ADDRESS:
+      /* drop the old string before storing a private copy of the new one */
+      g_free (state->bind_address);
+      state->bind_address = g_value_dup_string (value);
+      break;
+    case PROP_POLL_TIMEOUT:
+      state->poll_timeout = g_value_get_int (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* GObject::finalize implementation.
+ *
+ * Releases whatever is still held by the private data — the epoll set, the
+ * SRT socket and the bind-address string — and then chains up to the parent
+ * class.
+ */
+static void
+gst_srt_client_src_finalize (GObject * object)
+{
+  GstSRTClientSrc *src = GST_SRT_CLIENT_SRC (object);
+  GstSRTClientSrcPrivate *state = GST_SRT_CLIENT_SRC_GET_PRIVATE (src);
+  gboolean poll_open = (state->poll_id != SRT_ERROR);
+  gboolean sock_open = (state->sock != SRT_INVALID_SOCK);
+
+  if (poll_open) {
+    srt_epoll_release (state->poll_id);
+    state->poll_id = SRT_ERROR;
+  }
+
+  if (sock_open) {
+    srt_close (state->sock);
+    state->sock = SRT_INVALID_SOCK;
+  }
+
+  g_free (state->bind_address);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/* GstPushSrc::fill implementation.
+ *
+ * Waits on the element's SRT epoll set for at most poll_timeout, then reads
+ * one SRT message into @outbuf, timestamps it with the element's running time
+ * and shrinks the buffer to the number of bytes received.
+ *
+ * Returns: GST_FLOW_OK on success (and, as written, also when the wait timed
+ * out), GST_FLOW_EOS when srt_recvmsg() returns 0, GST_FLOW_ERROR when
+ * waiting, mapping or receiving fails.
+ */
+static GstFlowReturn
+gst_srt_client_src_fill (GstPushSrc * src, GstBuffer * outbuf)
+{
+ GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src);
+ GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstMapInfo info;
+ SRTSOCKET ready[2];
+ gint recv_len;
+
+ /* NOTE(review): `ready` is passed in the 4th/5th argument slots of
+ * srt_epoll_wait() while the 2nd/3rd are 0 — confirm against the SRT API
+ * and against the events gst_srt_client_connect() registers that this is
+ * the intended readiness set for a receiving element. */
+ if (srt_epoll_wait (priv->poll_id, 0, 0, ready, &(int) {
+ 2}, priv->poll_timeout, 0, 0, 0, 0) == -1) {
+
+ /* Assuming that timeout error is normal */
+ if (srt_getlasterror (NULL) != SRT_ETIMEOUT) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ,
+ (NULL), ("srt_epoll_wait error: %s", srt_getlasterror_str ()));
+ ret = GST_FLOW_ERROR;
+ }
+ srt_clearlasterror ();
+ /* NOTE(review): on a timeout ret is still GST_FLOW_OK here and outbuf has
+ * not been written or resized — verify this is what the caller expects. */
+ goto out;
+ }
+
+ if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ,
+ ("Could not map the buffer for writing "), (NULL));
+ ret = GST_FLOW_ERROR;
+ goto out;
+ }
+
+ /* receive at most as many bytes as the buffer currently holds */
+ recv_len = srt_recvmsg (priv->sock, (char *) info.data,
+ gst_buffer_get_size (outbuf));
+
+ /* unmap before inspecting the result so every exit path below is unmapped */
+ gst_buffer_unmap (outbuf, &info);
+
+ if (recv_len == SRT_ERROR) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ,
+ (NULL), ("srt_recvmsg error: %s", srt_getlasterror_str ()));
+ ret = GST_FLOW_ERROR;
+ goto out;
+ } else if (recv_len == 0) {
+ /* a zero-length receive is reported downstream as end of stream */
+ ret = GST_FLOW_EOS;
+ goto out;
+ }
+
+ /* PTS = current clock time minus the element's base time.
+ * NOTE(review): GST_ELEMENT_CLOCK (src) is used without a NULL check —
+ * confirm a clock is always set when fill() runs. */
+ GST_BUFFER_PTS (outbuf) =
+ gst_clock_get_time (GST_ELEMENT_CLOCK (src)) -
+ GST_ELEMENT_CAST (src)->base_time;
+
+ /* trim the buffer to the bytes actually received */
+ gst_buffer_resize (outbuf, 0, recv_len);
+
+ GST_LOG_OBJECT (src,
+ "filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
+ GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+ ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+ gst_buffer_get_size (outbuf),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
+ GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
+
+out:
+ return ret;
+}
+
+/* GstBaseSrc::start implementation.
+ *
+ * Opens the SRT connection through gst_srt_client_connect(), using the host
+ * and port of the base class URI, the rendez-vous/bind settings of this
+ * element and the base class latency. The socket and epoll id are kept in
+ * the private data; the socket address handed back by the helper is not
+ * needed and is dropped again.
+ *
+ * Returns: TRUE if a valid SRT socket was obtained, FALSE otherwise.
+ */
+static gboolean
+gst_srt_client_src_start (GstBaseSrc * src)
+{
+  GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src);
+  GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
+  GstSRTBaseSrc *base = GST_SRT_BASE_SRC (src);
+  /* hold our own reference on the URI while host and port are read from it */
+  GstUri *target = gst_uri_ref (base->uri);
+  GSocketAddress *peer = NULL;
+  const gchar *host = gst_uri_get_host (target);
+  guint port = gst_uri_get_port (target);
+
+  priv->sock = gst_srt_client_connect (GST_ELEMENT (src), FALSE,
+      host, port, priv->rendez_vous,
+      priv->bind_address, priv->bind_port, base->latency,
+      &peer, &priv->poll_id);
+
+  g_clear_object (&peer);
+  if (target != NULL) {
+    gst_uri_unref (target);
+    target = NULL;
+  }
+
+  if (priv->sock == SRT_INVALID_SOCK)
+    return FALSE;
+
+  return TRUE;
+}
+
+/* GstBaseSrc::stop implementation.
+ *
+ * Removes the socket from the epoll set (when both exist), releases the set
+ * and closes the SRT socket. Both handles are reset to their "invalid"
+ * sentinels whether or not they were open.
+ *
+ * Returns: always TRUE.
+ */
+static gboolean
+gst_srt_client_src_stop (GstBaseSrc * src)
+{
+  GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src);
+  GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
+  const gboolean have_sock = (priv->sock != SRT_INVALID_SOCK);
+
+  if (priv->poll_id != SRT_ERROR) {
+    if (have_sock) {
+      srt_epoll_remove_usock (priv->poll_id, priv->sock);
+    }
+    srt_epoll_release (priv->poll_id);
+  }
+  priv->poll_id = SRT_ERROR;
+
+  GST_DEBUG_OBJECT (self, "closing SRT connection");
+  if (have_sock) {
+    srt_close (priv->sock);
+  }
+  priv->sock = SRT_INVALID_SOCK;
+
+  return TRUE;
+}
+
+/* Class initializer: sets the GObject, GstBaseSrc and GstPushSrc virtual
+ * methods implemented in this file, registers the source pad template and
+ * element metadata, and installs the element properties.
+ */
+static void
+gst_srt_client_src_class_init (GstSRTClientSrcClass * klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+  GstBaseSrcClass *basesrc_class = GST_BASE_SRC_CLASS (klass);
+  GstPushSrcClass *pushsrc_class = GST_PUSH_SRC_CLASS (klass);
+  /* every property of this element is read/write and changeable up to READY */
+  const GParamFlags rw_ready_flags =
+      G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS;
+
+  /* virtual methods */
+  object_class->finalize = gst_srt_client_src_finalize;
+  object_class->get_property = gst_srt_client_src_get_property;
+  object_class->set_property = gst_srt_client_src_set_property;
+
+  basesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_client_src_start);
+  basesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_client_src_stop);
+
+  pushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_client_src_fill);
+
+  /* element description and pad */
+  gst_element_class_add_static_pad_template (element_class, &src_template);
+  gst_element_class_set_metadata (element_class,
+      "SRT client source", "Source/Network",
+      "Receive data over the network via SRT",
+      "Justin Kim <justin.kim@collabora.com>");
+
+  /**
+   * GstSRTClientSrc:poll-timeout:
+   *
+   * The timeout(ms) value when polling SRT socket.
+   */
+  properties[PROP_POLL_TIMEOUT] =
+      g_param_spec_int ("poll-timeout", "Poll timeout",
+      "Return poll wait after timeout miliseconds (-1 = infinite)", -1,
+      G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT, rw_ready_flags);
+
+  properties[PROP_BIND_ADDRESS] =
+      g_param_spec_string ("bind-address", "Bind Address",
+      "Address to bind socket to (required for rendez-vous mode) ", NULL,
+      rw_ready_flags);
+
+  properties[PROP_BIND_PORT] =
+      g_param_spec_int ("bind-port", "Bind Port",
+      "Port to bind socket to (Ignored in rendez-vous mode)", 0,
+      G_MAXUINT16, 0, rw_ready_flags);
+
+  properties[PROP_RENDEZ_VOUS] =
+      g_param_spec_boolean ("rendez-vous", "Rendez Vous",
+      "Work in Rendez-Vous mode instead of client/caller mode", FALSE,
+      rw_ready_flags);
+
+  g_object_class_install_properties (object_class, PROP_LAST, properties);
+}
+
+/* Instance initializer: puts the private state into its "not connected"
+ * defaults (invalid socket and epoll id, default poll timeout, no bind
+ * address or port, rendez-vous mode off).
+ */
+static void
+gst_srt_client_src_init (GstSRTClientSrc * self)
+{
+  GstSRTClientSrcPrivate *state = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
+
+  /* connection handles */
+  state->poll_id = SRT_ERROR;
+  state->sock = SRT_INVALID_SOCK;
+
+  /* property defaults */
+  state->bind_port = 0;
+  state->bind_address = NULL;
+  state->rendez_vous = FALSE;
+  state->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
+}
diff --git a/ext/srt/gstsrtclientsrc.h b/ext/srt/gstsrtclientsrc.h
new file mode 100644
index 0000000..b2003b6
--- /dev/null
+++ b/ext/srt/gstsrtclientsrc.h
@@ -0,0 +1,59 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_CLIENT_SRC_H__
+#define __GST_SRT_CLIENT_SRC_H__
+
+#include "gstsrtbasesrc.h"
+
+G_BEGIN_DECLS
+
+/* GObject type macros for GstSRTClientSrc: the type id, run-time
+ * instance/class type checks, checked casts, and plain C casts (*_CAST). */
+#define GST_TYPE_SRT_CLIENT_SRC (gst_srt_client_src_get_type ())
+#define GST_IS_SRT_CLIENT_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_CLIENT_SRC))
+#define GST_IS_SRT_CLIENT_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_CLIENT_SRC))
+#define GST_SRT_CLIENT_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcClass))
+#define GST_SRT_CLIENT_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrc))
+#define GST_SRT_CLIENT_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcClass))
+#define GST_SRT_CLIENT_SRC_CAST(obj) ((GstSRTClientSrc*)(obj))
+#define GST_SRT_CLIENT_SRC_CLASS_CAST(klass) ((GstSRTClientSrcClass*)(klass))
+
+/* Short names for the instance, class and private structures; the private
+ * structure is only declared here, not defined in this header. */
+typedef struct _GstSRTClientSrc GstSRTClientSrc;
+typedef struct _GstSRTClientSrcClass GstSRTClientSrcClass;
+typedef struct _GstSRTClientSrcPrivate GstSRTClientSrcPrivate;
+
+/* Instance structure of srtclientsrc; derives from GstSRTBaseSrc and has no
+ * public fields of its own. */
+struct _GstSRTClientSrc {
+ GstSRTBaseSrc parent;
+
+ /*< private >*/
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+/* Class structure of srtclientsrc; adds only reserved padding on top of
+ * GstSRTBaseSrcClass. */
+struct _GstSRTClientSrcClass {
+ GstSRTBaseSrcClass parent_class;
+
+ gpointer _gst_reserved[GST_PADDING_LARGE];
+};
+
+/* Returns the GType of GstSRTClientSrc (used by GST_TYPE_SRT_CLIENT_SRC). */
+GST_EXPORT
+GType gst_srt_client_src_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SRT_CLIENT_SRC_H__ */
diff --git a/ext/srt/gstsrtserversink.c b/ext/srt/gstsrtserversink.c
new file mode 100644
index 0000000..a4095a0
--- /dev/null
+++ b/ext/srt/gstsrtserversink.c
@@ -0,0 +1,555 @@
+/* GStreamer SRT plugin based on libsrt
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-srtserversink
+ * @title: srtserversink
+ *
+ * srtserversink is a network sink that sends <ulink url="http://www.srtalliance.org/">SRT</ulink>
+ * packets to the network. Although SRT is a UDP-based protocol, srtserversink works like
+ * a server socket of a connection-oriented protocol.
+ *
+ * <refsect2>
+ * <title>Examples</title>
+ * |[
+ * gst-launch-1.0 -v audiotestsrc ! srtserversink
+ * ]| This pipeline shows how to serve SRT packets through the default port.
+ * </refsect2>
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrtserversink.h"
+#include "gstsrt.h"
+#include <srt/srt.h>
+#include <gio/gio.h>
+
+/* Default poll timeout; NOTE(review): presumably -1 means "wait forever" as
+ * documented for the client elements — the property definition for this
+ * element is outside the visible chunk. */
+#define SRT_DEFAULT_POLL_TIMEOUT -1
+
+/* The element has a single always-present sink pad that accepts any caps. */
+static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+/* Debug category used by the GST_* logging macros in this file; it is
+ * initialized as part of the type definition further down. */
+#define GST_CAT_DEFAULT gst_debug_srt_server_sink
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+/* Per-instance private state of srtserversink. */
+struct _GstSRTServerSinkPrivate
+{
+ /* checked by the listen callback after a poll timeout to stop waiting */
+ gboolean cancelled;
+
+ /* listening SRT socket created in start(); clients are accepted from it */
+ SRTSOCKET sock;
+ /* SRT epoll id the listening socket is registered with */
+ gint poll_id;
+ /* backing store of the "poll-timeout" property; passed to srt_epoll_wait() */
+ gint poll_timeout;
+
+ /* main loop, its context and the idle source that runs the listen callback;
+ * the loop is run by `thread` */
+ GMainLoop *loop;
+ GMainContext *context;
+ GSource *server_source;
+ GThread *thread;
+
+ /* list of SRTClient*; accessed under the object lock */
+ GList *clients;
+};
+
+/* Returns the GstSRTServerSinkPrivate attached to an instance. */
+#define GST_SRT_SERVER_SINK_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkPrivate))
+
+/* Property ids; they start at 1 and index the properties[] table below. */
+enum
+{
+ PROP_POLL_TIMEOUT = 1,
+ PROP_STATS,
+ /*< private > */
+ PROP_LAST
+};
+
+/* Param specs, one slot per property id. */
+static GParamSpec *properties[PROP_LAST];
+
+/* Signal indices into signals[]; the listen callback emits SIG_CLIENT_ADDED
+ * and srt_emit_client_removed() emits SIG_CLIENT_REMOVED. */
+enum
+{
+ SIG_CLIENT_ADDED,
+ SIG_CLIENT_REMOVED,
+
+ LAST_SIGNAL
+};
+
+/* Signal ids, indexed by the enum above. */
+static guint signals[LAST_SIGNAL] = { 0 };
+
+/* Defines the GstSRTServerSink type as a GST_TYPE_SRT_BASE_SINK subclass with
+ * private data and initializes the "srtserversink" debug category. */
+#define gst_srt_server_sink_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstSRTServerSink, gst_srt_server_sink,
+ GST_TYPE_SRT_BASE_SINK, G_ADD_PRIVATE (GstSRTServerSink)
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtserversink", 0,
+ "SRT Server Sink"));
+
+/* One accepted client connection: its SRT socket and peer address. */
+typedef struct
+{
+ /* socket returned by srt_accept(); SRT_INVALID_SOCK until accepted */
+ int sock;
+ /* peer address built from the native address srt_accept() fills in */
+ GSocketAddress *sockaddr;
+} SRTClient;
+
+/* Allocates a zero-filled SRTClient whose socket is marked invalid.
+ *
+ * Returns: a new SRTClient; release it with srt_client_free().
+ */
+static SRTClient *
+srt_client_new (void)
+{
+  SRTClient *fresh;
+
+  fresh = g_new0 (SRTClient, 1);
+  fresh->sock = SRT_INVALID_SOCK;
+
+  return fresh;
+}
+
+/* Releases an SRTClient: drops the peer address, closes the SRT socket if
+ * one is open, and frees the structure itself. @client must not be NULL.
+ */
+static void
+srt_client_free (SRTClient * client)
+{
+  gboolean sock_open;
+
+  g_return_if_fail (client != NULL);
+
+  g_clear_object (&client->sockaddr);
+
+  sock_open = (client->sock != SRT_INVALID_SOCK);
+  if (sock_open)
+    srt_close (client->sock);
+
+  g_free (client);
+}
+
+/* Emits the "client removed" signal on the sink passed as @user_data, with
+ * the client's socket and peer address as signal arguments. The argument
+ * order matches a list-foreach style callback (item first, user data second).
+ */
+static void
+srt_emit_client_removed (SRTClient * client, gpointer user_data)
+{
+  GstSRTServerSink *sink = GST_SRT_SERVER_SINK (user_data);
+  const guint removed_id = SIG_CLIENT_REMOVED;
+
+  g_return_if_fail (client != NULL && GST_IS_SRT_SERVER_SINK (sink));
+
+  g_signal_emit (sink, signals[removed_id], 0, client->sock,
+      client->sockaddr);
+}
+
+/* GObject::get_property handler for srtserversink.
+ *
+ * "poll-timeout" returns the stored timeout. "stats" appends one statistics
+ * structure per connected client to @value; the client list is walked under
+ * the object lock. Unknown ids are reported through
+ * G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gst_srt_server_sink_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstSRTServerSink *self = GST_SRT_SERVER_SINK (object);
+  GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
+
+  switch (prop_id) {
+    case PROP_STATS:
+    {
+      GList *walk;
+
+      GST_OBJECT_LOCK (self);
+      walk = priv->clients;
+      while (walk != NULL) {
+        SRTClient *peer = walk->data;
+        GValue entry = G_VALUE_INIT;
+
+        /* each entry takes ownership of the structure, and the array takes
+         * ownership of the entry */
+        g_value_init (&entry, GST_TYPE_STRUCTURE);
+        g_value_take_boxed (&entry,
+            gst_srt_base_sink_get_stats (peer->sockaddr, peer->sock));
+        gst_value_array_append_and_take_value (value, &entry);
+
+        walk = walk->next;
+      }
+      GST_OBJECT_UNLOCK (self);
+      break;
+    }
+    case PROP_POLL_TIMEOUT:
+      g_value_set_int (value, priv->poll_timeout);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* GObject::set_property handler for srtserversink.
+ *
+ * Only "poll-timeout" is writable here; any other id is reported through
+ * G_OBJECT_WARN_INVALID_PROPERTY_ID().
+ */
+static void
+gst_srt_server_sink_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstSRTServerSink *sink = GST_SRT_SERVER_SINK (object);
+  GstSRTServerSinkPrivate *state = GST_SRT_SERVER_SINK_GET_PRIVATE (sink);
+
+  if (prop_id == PROP_POLL_TIMEOUT) {
+    state->poll_timeout = g_value_get_int (value);
+  } else {
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+  }
+}
+
+/* Idle-source callback that accepts one incoming SRT client.
+ *
+ * Waits on the epoll set for the listening socket, accepts the pending
+ * connection, records the client (socket + peer address) in the client list
+ * under the object lock and emits the "client added" signal.
+ *
+ * @data: the GstSRTServerSink the source was created for.
+ *
+ * Returns: TRUE to keep the idle source installed so the next client can be
+ * accepted; FALSE to remove it (poll error, cancellation, or a failed
+ * accept).
+ */
+static gboolean
+idle_listen_callback (gpointer data)
+{
+  GstSRTServerSink *self = GST_SRT_SERVER_SINK (data);
+  GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
+  gboolean ret = TRUE;
+
+  SRTClient *client;
+  SRTSOCKET ready[2];
+  /* sockaddr_storage is large enough for any address family; a plain
+   * struct sockaddr cannot hold an IPv6 peer address. */
+  struct sockaddr_storage sa;
+  /* srt_accept() reads the buffer size from here and writes back the actual
+   * address length, so it must be initialized. */
+  int sa_len = sizeof (sa);
+
+  if (srt_epoll_wait (priv->poll_id, ready, &(int) {
+          2}, 0, 0, priv->poll_timeout, 0, 0, 0, 0) == -1) {
+    int srt_errno = srt_getlasterror (NULL);
+
+    if (srt_errno != SRT_ETIMEOUT) {
+      GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
+          ("SRT error: %s", srt_getlasterror_str ()), (NULL));
+      ret = FALSE;
+      goto out;
+    }
+
+    /* Mimicking cancellable */
+    if (srt_errno == SRT_ETIMEOUT && priv->cancelled) {
+      GST_DEBUG_OBJECT (self, "Cancelled waiting for client");
+      ret = FALSE;
+      goto out;
+    }
+
+    /* NOTE(review): a timeout without cancellation falls through to
+     * srt_accept() below; behaviour kept as in the original. */
+  }
+
+  client = srt_client_new ();
+  client->sock = srt_accept (priv->sock, (struct sockaddr *) &sa, &sa_len);
+
+  if (client->sock == SRT_INVALID_SOCK) {
+    GST_WARNING_OBJECT (self, "detected invalid SRT client socket (reason: %s)",
+        srt_getlasterror_str ());
+    srt_clearlasterror ();
+    srt_client_free (client);
+    ret = FALSE;
+    goto out;
+  }
+
+  client->sockaddr = g_socket_address_new_from_native (&sa, sa_len);
+
+  /* the client list is shared with the property getter */
+  GST_OBJECT_LOCK (self);
+  priv->clients = g_list_append (priv->clients, client);
+  GST_OBJECT_UNLOCK (self);
+
+  g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0, client->sock,
+      client->sockaddr);
+  GST_DEBUG_OBJECT (self, "client added");
+
+out:
+  return ret;
+}
+
+/* Thread entry point: runs the sink's private main loop until it is quit.
+ *
+ * @data: the GstSRTServerSink that owns the loop.
+ *
+ * Returns: always NULL.
+ */
+static gpointer
+thread_func (gpointer data)
+{
+  GstSRTServerSink *sink = GST_SRT_SERVER_SINK (data);
+  GstSRTServerSinkPrivate *state = GST_SRT_SERVER_SINK_GET_PRIVATE (sink);
+  GMainLoop *loop = state->loop;
+
+  g_main_loop_run (loop);
+
+  return NULL;
+}
+
+/* GstBaseSink::start: binds and listens, then spawns the listener thread. */
+static gboolean
+gst_srt_server_sink_start (GstBaseSink * sink)
+{
+  GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
+  GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
+  GstSRTBaseSink *base = GST_SRT_BASE_SINK (sink);
+  GstUri *uri = gst_uri_ref (GST_SRT_BASE_SINK (self)->uri);
+  GSocketAddress *socket_address = NULL;
+  GError *error = NULL;
+  struct sockaddr_storage sa;   /* fits IPv6, unlike "struct sockaddr" */
+  size_t sa_len;
+  const gchar *host;
+  int lat = base->latency;
+
+  if (gst_uri_get_port (uri) == GST_URI_NO_PORT) {
+    GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, NULL, (("Invalid port")));
+    gst_uri_unref (uri);
+    return FALSE;
+  }
+  /* Known-invalid handles so "failed" only releases what this call created */
+  priv->sock = SRT_INVALID_SOCK;
+  priv->poll_id = SRT_ERROR;
+
+  host = gst_uri_get_host (uri);
+  if (host == NULL) {
+    GInetAddress *any = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+
+    socket_address = g_inet_socket_address_new (any, gst_uri_get_port (uri));
+    g_object_unref (any);
+  } else {
+    socket_address =
+        g_inet_socket_address_new_from_string (host, gst_uri_get_port (uri));
+  }
+
+  if (socket_address == NULL) {
+    GST_WARNING_OBJECT (self,
+        "failed to extract host or port from the given URI");
+    goto failed;
+  }
+
+  sa_len = g_socket_address_get_native_size (socket_address);
+  if (!g_socket_address_to_native (socket_address, &sa, sizeof (sa), &error)) {
+    GST_WARNING_OBJECT (self, "cannot resolve address (reason: %s)",
+        error->message);
+    goto failed;
+  }
+
+  priv->sock = srt_socket (sa.ss_family, SOCK_DGRAM, 0);
+  if (priv->sock == SRT_INVALID_SOCK) {
+    GST_WARNING_OBJECT (self, "failed to create SRT socket (reason: %s)",
+        srt_getlasterror_str ());
+    goto failed;
+  }
+
+  /* Make SRT non-blocking */
+  srt_setsockopt (priv->sock, 0, SRTO_SNDSYN, &(int) {
+      0}, sizeof (int));
+  /* Make sure TSBPD mode is enabled (SRT mode) */
+  srt_setsockopt (priv->sock, 0, SRTO_TSBPDMODE, &(int) {
+      1}, sizeof (int));
+  /* This is a sink, we're always a sender */
+  srt_setsockopt (priv->sock, 0, SRTO_SENDER, &(int) {
+      1}, sizeof (int));
+  srt_setsockopt (priv->sock, 0, SRTO_TSBPDDELAY, &lat, sizeof (int));
+
+  priv->poll_id = srt_epoll_create ();
+  if (priv->poll_id == -1) {
+    GST_WARNING_OBJECT (self,
+        "failed to create poll id for SRT socket (reason: %s)",
+        srt_getlasterror_str ());
+    goto failed;
+  }
+  srt_epoll_add_usock (priv->poll_id, priv->sock, &(int) {
+      SRT_EPOLL_IN});
+
+  if (srt_bind (priv->sock, (struct sockaddr *) &sa, sa_len) == SRT_ERROR) {
+    GST_WARNING_OBJECT (self, "failed to bind SRT server socket (reason: %s)",
+        srt_getlasterror_str ());
+    goto failed;
+  }
+
+  if (srt_listen (priv->sock, 1) == SRT_ERROR) {
+    GST_WARNING_OBJECT (self, "failed to listen SRT socket (reason: %s)",
+        srt_getlasterror_str ());
+    goto failed;
+  }
+
+  priv->context = g_main_context_new ();
+  priv->server_source = g_idle_source_new ();
+  g_source_set_callback (priv->server_source,
+      (GSourceFunc) idle_listen_callback, gst_object_ref (self),
+      (GDestroyNotify) gst_object_unref);
+  g_source_attach (priv->server_source, priv->context);
+  priv->loop = g_main_loop_new (priv->context, TRUE);
+  priv->thread = g_thread_try_new ("srtserversink", thread_func, self, &error);
+  if (priv->thread == NULL) {
+    GST_WARNING_OBJECT (self, "failed to create thread (reason: %s)",
+        error->message);
+    goto failed;
+  }
+
+  g_clear_pointer (&uri, gst_uri_unref);
+  g_clear_object (&socket_address);
+  return TRUE;
+failed:
+  /* Main-loop objects exist only when thread creation was the failing step */
+  g_clear_pointer (&priv->loop, g_main_loop_unref);
+  if (priv->server_source) {
+    g_source_destroy (priv->server_source);
+    g_clear_pointer (&priv->server_source, g_source_unref);
+  }
+  g_clear_pointer (&priv->context, g_main_context_unref);
+  if (priv->poll_id != SRT_ERROR) {
+    srt_epoll_release (priv->poll_id);
+    priv->poll_id = SRT_ERROR;
+  }
+  if (priv->sock != SRT_INVALID_SOCK) {
+    srt_close (priv->sock);
+    priv->sock = SRT_INVALID_SOCK;
+  }
+  g_clear_error (&error);
+  g_clear_pointer (&uri, gst_uri_unref);
+  g_clear_object (&socket_address);
+  return FALSE;
+}
+
+/* Sends the mapped data to every client; clients whose send fails are dropped. */
+static gboolean
+gst_srt_server_sink_send_buffer (GstSRTBaseSink * sink,
+    const GstMapInfo * mapinfo)
+{
+  GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
+  GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
+  GList *clients;
+
+  GST_OBJECT_LOCK (sink);
+  /* priv->clients is modified under the object lock: read it under it too */
+  clients = priv->clients;
+  while (clients != NULL) {
+    SRTClient *client = clients->data;
+    clients = clients->next;
+    if (srt_sendmsg2 (client->sock, (char *) mapinfo->data, mapinfo->size,
+            0) == SRT_ERROR) {
+      GST_WARNING_OBJECT (self, "%s", srt_getlasterror_str ());
+      priv->clients = g_list_remove (priv->clients, client);
+      GST_OBJECT_UNLOCK (sink);
+      g_signal_emit (self, signals[SIG_CLIENT_REMOVED], 0, client->sock,
+          client->sockaddr);
+      srt_client_free (client);
+      GST_OBJECT_LOCK (sink);
+    }
+  }
+  GST_OBJECT_UNLOCK (sink);
+  return TRUE;
+}
+
+/* GstBaseSink::stop: drops all clients, closes SRT handles, joins the thread. */
+static gboolean
+gst_srt_server_sink_stop (GstBaseSink * sink)
+{
+  GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
+  GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
+  GList *clients;
+
+  GST_DEBUG_OBJECT (self, "closing client sockets");
+
+  GST_OBJECT_LOCK (sink);
+  clients = priv->clients;
+  priv->clients = NULL;
+  GST_OBJECT_UNLOCK (sink);
+
+  g_list_foreach (clients, (GFunc) srt_emit_client_removed, self);
+  g_list_free_full (clients, (GDestroyNotify) srt_client_free);
+
+  GST_DEBUG_OBJECT (self, "closing SRT connection");
+  srt_epoll_remove_usock (priv->poll_id, priv->sock);
+  srt_epoll_release (priv->poll_id);
+  srt_close (priv->sock);
+
+  if (priv->loop) {
+    g_main_loop_quit (priv->loop);
+    /* g_thread_join() consumes the thread reference: no unref afterwards */
+    g_thread_join (priv->thread);
+    priv->thread = NULL;
+    g_clear_pointer (&priv->loop, g_main_loop_unref);
+  }
+
+  if (priv->server_source) {
+    g_source_destroy (priv->server_source);
+    g_clear_pointer (&priv->server_source, g_source_unref);
+  }
+
+  g_clear_pointer (&priv->context, g_main_context_unref);
+  return TRUE;
+}
+
+static gboolean
+gst_srt_server_sink_unlock (GstBaseSink * sink)
+{
+  GstSRTServerSinkPrivate *sink_priv =
+      GST_SRT_SERVER_SINK_GET_PRIVATE (GST_SRT_SERVER_SINK (sink));
+
+  /* Raise the cancellation flag; unlock_stop() lowers it again. */
+  sink_priv->cancelled = TRUE;
+  return TRUE;
+}
+
+static gboolean
+gst_srt_server_sink_unlock_stop (GstBaseSink * sink)
+{
+  GstSRTServerSinkPrivate *sink_priv =
+      GST_SRT_SERVER_SINK_GET_PRIVATE (GST_SRT_SERVER_SINK (sink));
+
+  /* Lower the cancellation flag that unlock() raised. */
+  sink_priv->cancelled = FALSE;
+  return TRUE;
+}
+
+static void
+gst_srt_server_sink_class_init (GstSRTServerSinkClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+ GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
+ GstSRTBaseSinkClass *gstsrtbasesink_class = GST_SRT_BASE_SINK_CLASS (klass);
+ /* Installs property accessors, properties, signals, metadata and vfuncs. */
+ gobject_class->set_property = gst_srt_server_sink_set_property;
+ gobject_class->get_property = gst_srt_server_sink_get_property;
+
+ properties[PROP_POLL_TIMEOUT] =
+ g_param_spec_int ("poll-timeout", "Poll Timeout",
+ "Return poll wait after timeout miliseconds (-1 = infinite)", -1,
+ G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_STATS] = gst_param_spec_array ("stats", "Statistics",
+ "Array of GstStructures containing SRT statistics",
+ g_param_spec_boxed ("stats", "Statistics",
+ "Statistics for one client", GST_TYPE_STRUCTURE,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS),
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
+
+ g_object_class_install_properties (gobject_class, PROP_LAST, properties);
+
+ /**
+ * GstSRTServerSink::client-added:
+ * @gstsrtserversink: the srtserversink element that emitted this signal
+ * @sock: the client socket descriptor that was added to srtserversink
+ * @addr: the #GSocketAddress (not a raw "struct sockaddr") that describes
+ *   the @sock
+ *
+ * The given socket descriptor was added to srtserversink.
+ */
+ signals[SIG_CLIENT_ADDED] =
+ g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSinkClass, client_added),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
+ 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
+
+ /**
+ * GstSRTServerSink::client-removed:
+ * @gstsrtserversink: the srtserversink element that emitted this signal
+ * @sock: the client socket descriptor that was removed from srtserversink
+ * @addr: the #GSocketAddress (not a raw "struct sockaddr") that describes
+ *   the @sock
+ *
+ * The given socket descriptor was removed from srtserversink.
+ */
+ signals[SIG_CLIENT_REMOVED] =
+ g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSinkClass,
+ client_removed), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
+ 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
+
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
+ gst_element_class_set_metadata (gstelement_class,
+ "SRT server sink", "Sink/Network",
+ "Send data over the network via SRT",
+ "Justin Kim <justin.kim@collabora.com>");
+
+ gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_srt_server_sink_start);
+ gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_server_sink_stop);
+ gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_server_sink_unlock);
+ gstbasesink_class->unlock_stop =
+ GST_DEBUG_FUNCPTR (gst_srt_server_sink_unlock_stop);
+
+ gstsrtbasesink_class->send_buffer =
+ GST_DEBUG_FUNCPTR (gst_srt_server_sink_send_buffer);
+}
+
+static void
+gst_srt_server_sink_init (GstSRTServerSink * self)
+{
+ GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
+ priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT; /* "poll-timeout" default */
+}
diff --git a/ext/srt/gstsrtserversink.h b/ext/srt/gstsrtserversink.h
new file mode 100644
index 0000000..10eb52b
--- /dev/null
+++ b/ext/srt/gstsrtserversink.h
@@ -0,0 +1,63 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_SERVER_SINK_H__
+#define __GST_SRT_SERVER_SINK_H__
+
+#include "gstsrtbasesink.h"
+#include <sys/socket.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SRT_SERVER_SINK (gst_srt_server_sink_get_type ())
+#define GST_IS_SRT_SERVER_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_SERVER_SINK))
+#define GST_IS_SRT_SERVER_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_SERVER_SINK))
+#define GST_SRT_SERVER_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkClass))
+#define GST_SRT_SERVER_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSink))
+#define GST_SRT_SERVER_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkClass))
+#define GST_SRT_SERVER_SINK_CAST(obj) ((GstSRTServerSink*)(obj))
+#define GST_SRT_SERVER_SINK_CLASS_CAST(klass) ((GstSRTServerSinkClass*)(klass))
+
+typedef struct _GstSRTServerSink GstSRTServerSink;
+typedef struct _GstSRTServerSinkClass GstSRTServerSinkClass;
+typedef struct _GstSRTServerSinkPrivate GstSRTServerSinkPrivate;
+
+struct _GstSRTServerSink {
+ GstSRTBaseSink parent;
+
+ /*< private >*/
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstSRTServerSinkClass {
+ GstSRTBaseSinkClass parent_class;
+ /* Class handlers of the signals; NOTE(review): the signals pass (int, GSocketAddress *) - confirm */
+ void (*client_added) (GstSRTServerSink *self, int sock, struct sockaddr *addr, int addr_len);
+ void (*client_removed) (GstSRTServerSink *self, int sock, struct sockaddr *addr, int addr_len);
+
+ gpointer _gst_reserved[GST_PADDING_LARGE];
+};
+
+GST_EXPORT
+GType gst_srt_server_sink_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SRT_SERVER_SINK_H__ */
diff --git a/ext/srt/gstsrtserversrc.c b/ext/srt/gstsrtserversrc.c
new file mode 100644
index 0000000..9b0f586
--- /dev/null
+++ b/ext/srt/gstsrtserversrc.c
@@ -0,0 +1,504 @@
+/* GStreamer SRT plugin based on libsrt
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-srtserversrc
+ * @title: srtserversrc
+ *
+ * srtserversrc is a network source that reads <ulink url="http://www.srtalliance.org/">SRT</ulink>
+ * packets from the network. Although SRT is a protocol based on UDP, srtserversrc works like
+ * a server socket of a connection-oriented protocol, but it accepts only one client connection.
+ *
+ * <refsect2>
+ * <title>Examples</title>
+ * |[
+ * gst-launch-1.0 -v srtserversrc uri="srt://:7001" ! fakesink
+ * ]| This pipeline shows how to bind SRT server by setting #GstSRTServerSrc:uri property.
+ * </refsect2>
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsrtserversrc.h"
+#include "gstsrt.h"
+#include <srt/srt.h>
+#include <gio/gio.h>
+
+#define SRT_DEFAULT_POLL_TIMEOUT 100
+
+static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+#define GST_CAT_DEFAULT gst_debug_srt_server_src
+GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
+
+struct _GstSRTServerSrcPrivate
+{
+ SRTSOCKET sock; /* listening socket created, bound and listened on in start() */
+ SRTSOCKET client_sock; /* socket returned by srt_accept() in fill() */
+ GSocketAddress *client_sockaddr; /* peer address of client_sock */
+
+ gint poll_id; /* id from srt_epoll_create() */
+ gint poll_timeout; /* timeout handed to srt_epoll_wait(); "poll-timeout" property */
+
+ gboolean has_client; /* TRUE once a client has been accepted */
+ gboolean cancelled; /* set by unlock(), cleared by unlock_stop() and stop() */
+};
+
+#define GST_SRT_SERVER_SRC_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcPrivate))
+
+enum
+{
+ PROP_POLL_TIMEOUT = 1, /* "poll-timeout" */
+
+ /*< private > */
+ PROP_LAST
+};
+
+static GParamSpec *properties[PROP_LAST]; /* filled in by class_init */
+
+enum
+{
+ SIG_CLIENT_ADDED, /* "client-added" */
+ SIG_CLIENT_CLOSED, /* "client-closed" */
+
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 }; /* signal ids, set in class_init */
+
+#define gst_srt_server_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstSRTServerSrc, gst_srt_server_src,
+ GST_TYPE_SRT_BASE_SRC, G_ADD_PRIVATE (GstSRTServerSrc)
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtserversrc", 0,
+ "SRT Server Source"));
+
+/* GObject::get_property handler of srtserversrc. */
+static void
+gst_srt_server_src_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstSRTServerSrcPrivate *priv =
+      GST_SRT_SERVER_SRC_GET_PRIVATE (GST_SRT_SERVER_SRC (object));
+
+  /* Anything but "poll-timeout" is unknown to this class. */
+  if (prop_id != PROP_POLL_TIMEOUT) {
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+    return;
+  }
+
+  g_value_set_int (value, priv->poll_timeout);
+}
+
+/* GObject::set_property handler of srtserversrc. */
+static void
+gst_srt_server_src_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstSRTServerSrcPrivate *priv =
+      GST_SRT_SERVER_SRC_GET_PRIVATE (GST_SRT_SERVER_SRC (object));
+
+  /* Anything but "poll-timeout" is unknown to this class. */
+  if (prop_id != PROP_POLL_TIMEOUT) {
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+    return;
+  }
+
+  priv->poll_timeout = g_value_get_int (value);
+}
+
+/* GObject::finalize: releases SRT handles that are still open, then chains up. */
+static void
+gst_srt_server_src_finalize (GObject * object)
+{
+  GstSRTServerSrcPrivate *priv =
+      GST_SRT_SERVER_SRC_GET_PRIVATE (GST_SRT_SERVER_SRC (object));
+
+  if (priv->poll_id != SRT_ERROR)
+    srt_epoll_release (priv->poll_id);
+  priv->poll_id = SRT_ERROR;
+
+  if (priv->sock != SRT_ERROR)
+    srt_close (priv->sock);
+  priv->sock = SRT_ERROR;
+
+  /* Hand over to the parent class last. */
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/* GstPushSrc::fill: waits for a client if none is connected, then receives
+ * one SRT message from it into @outbuf. */
+static GstFlowReturn
+gst_srt_server_src_fill (GstPushSrc * src, GstBuffer * outbuf)
+{
+  GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
+  GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstMapInfo info;
+  SRTSOCKET ready[2];
+  gint recv_len;
+  struct sockaddr_storage client_sa;
+  int client_sa_len;
+
+  while (!priv->has_client) {
+    GST_DEBUG_OBJECT (self, "poll wait (timeout: %d)", priv->poll_timeout);
+
+    /* Make SRT server socket non-blocking */
+    srt_setsockopt (priv->sock, 0, SRTO_SNDSYN, &(int) {
+        0}, sizeof (int));
+
+    if (srt_epoll_wait (priv->poll_id, ready, &(int) {
+            2}, 0, 0, priv->poll_timeout, 0, 0, 0, 0) == -1) {
+      int srt_errno = srt_getlasterror (NULL);
+
+      /* Assuming that timeout error is normal */
+      if (srt_errno != SRT_ETIMEOUT) {
+        GST_ELEMENT_ERROR (src, RESOURCE, FAILED,
+            ("SRT error: %s", srt_getlasterror_str ()), (NULL));
+        return GST_FLOW_ERROR;
+      }
+
+      /* Mimicking cancellable */
+      if (srt_errno == SRT_ETIMEOUT && priv->cancelled) {
+        GST_DEBUG_OBJECT (self, "Cancelled waiting for client");
+        return GST_FLOW_FLUSHING;
+      }
+
+      continue;
+    }
+
+    /* srt_accept() takes the buffer capacity in and writes the length out */
+    client_sa_len = sizeof (client_sa);
+    priv->client_sock =
+        srt_accept (priv->sock, (struct sockaddr *) &client_sa, &client_sa_len);
+
+    GST_DEBUG_OBJECT (self, "checking client sock");
+    if (priv->client_sock == SRT_INVALID_SOCK) {
+      GST_WARNING_OBJECT (self,
+          "detected invalid SRT client socket (reason: %s)",
+          srt_getlasterror_str ());
+      srt_clearlasterror ();
+    } else {
+      priv->has_client = TRUE;
+      g_clear_object (&priv->client_sockaddr);
+      priv->client_sockaddr = g_socket_address_new_from_native (&client_sa,
+          client_sa_len);
+      g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0,
+          priv->client_sock, priv->client_sockaddr);
+    }
+  }
+
+  GST_DEBUG_OBJECT (self, "filling buffer");
+
+  if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) {
+    GST_ELEMENT_ERROR (src, RESOURCE, WRITE,
+        ("Could not map the output stream"), (NULL));
+    ret = GST_FLOW_ERROR;
+    goto out;
+  }
+
+  recv_len = srt_recvmsg (priv->client_sock, (char *) info.data,
+      gst_buffer_get_size (outbuf));
+  gst_buffer_unmap (outbuf, &info);
+
+  if (recv_len == SRT_ERROR) {
+    GST_WARNING_OBJECT (self, "%s", srt_getlasterror_str ());
+    g_signal_emit (self, signals[SIG_CLIENT_CLOSED], 0,
+        priv->client_sock, priv->client_sockaddr);
+    srt_close (priv->client_sock);
+    priv->client_sock = SRT_INVALID_SOCK;
+    g_clear_object (&priv->client_sockaddr);
+    priv->has_client = FALSE;
+    gst_buffer_resize (outbuf, 0, 0);
+    ret = GST_FLOW_OK;
+    goto out;
+  } else if (recv_len == 0) {
+    ret = GST_FLOW_EOS;
+    goto out;
+  }
+
+  GST_BUFFER_PTS (outbuf) =
+      gst_clock_get_time (GST_ELEMENT_CLOCK (src)) -
+      GST_ELEMENT_CAST (src)->base_time;
+
+  gst_buffer_resize (outbuf, 0, recv_len);
+
+  GST_LOG_OBJECT (src,
+      "filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
+      GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+      ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+      gst_buffer_get_size (outbuf),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
+      GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
+
+out:
+  return ret;
+}
+
+/* GstBaseSrc::start: creates the SRT socket, binds it to the URI address and
+ * starts listening; the client itself is accepted later, in fill(). */
+static gboolean
+gst_srt_server_src_start (GstBaseSrc * src)
+{
+  GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
+  GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
+  GstSRTBaseSrc *base = GST_SRT_BASE_SRC (src);
+  GstUri *uri = gst_uri_ref (base->uri);
+  GError *error = NULL;
+  struct sockaddr_storage sa;   /* fits IPv6, unlike "struct sockaddr" */
+  size_t sa_len;
+  GSocketAddress *socket_address;
+  const gchar *host;
+  int lat = base->latency;
+
+  if (gst_uri_get_port (uri) == GST_URI_NO_PORT) {
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, NULL, (("Invalid port")));
+    gst_uri_unref (uri);
+    return FALSE;
+  }
+
+  host = gst_uri_get_host (uri);
+  if (host == NULL) {
+    GInetAddress *any = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+
+    socket_address = g_inet_socket_address_new (any, gst_uri_get_port (uri));
+    g_object_unref (any);
+  } else {
+    socket_address =
+        g_inet_socket_address_new_from_string (host, gst_uri_get_port (uri));
+  }
+
+  if (socket_address == NULL) {
+    GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, ("Invalid URI"),
+        ("failed to extract host or port from the given URI"));
+    goto failed;
+  }
+
+  sa_len = g_socket_address_get_native_size (socket_address);
+  if (!g_socket_address_to_native (socket_address, &sa, sizeof (sa), &error)) {
+    GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, ("Invalid URI"),
+        ("cannot resolve address (reason: %s)", error->message));
+    goto failed;
+  }
+
+  priv->sock = srt_socket (sa.ss_family, SOCK_DGRAM, 0);
+  if (priv->sock == SRT_ERROR) {
+    GST_ELEMENT_ERROR (self, LIBRARY, INIT, (NULL),
+        ("failed to create SRT socket (reason: %s)",
+            srt_getlasterror_str ()));
+    goto failed;
+  }
+
+  /* Make sure TSBPD mode is enabled (SRT mode) */
+  srt_setsockopt (priv->sock, 0, SRTO_TSBPDMODE, &(int) {
+      1}, sizeof (int));
+  /* This is a source, we're always a receiver */
+  srt_setsockopt (priv->sock, 0, SRTO_SENDER, &(int) {
+      0}, sizeof (int));
+  srt_setsockopt (priv->sock, 0, SRTO_TSBPDDELAY, &lat, sizeof (int));
+
+  priv->poll_id = srt_epoll_create ();
+  if (priv->poll_id == -1) {
+    GST_ELEMENT_ERROR (self, LIBRARY, INIT, (NULL),
+        ("failed to create poll id for SRT socket (reason: %s)",
+            srt_getlasterror_str ()));
+    goto failed;
+  }
+  srt_epoll_add_usock (priv->poll_id, priv->sock, &(int) {
+      SRT_EPOLL_IN});
+
+  if (srt_bind (priv->sock, (struct sockaddr *) &sa, sa_len) == SRT_ERROR) {
+    GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL),
+        ("failed to bind SRT server socket (reason: %s)",
+            srt_getlasterror_str ()));
+    goto failed;
+  }
+
+  if (srt_listen (priv->sock, 1) == SRT_ERROR) {
+    GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL),
+        ("failed to listen SRT socket (reason: %s)", srt_getlasterror_str ()));
+    goto failed;
+  }
+
+  g_clear_pointer (&uri, gst_uri_unref);
+  g_clear_object (&socket_address);
+
+  return TRUE;
+
+failed:
+  if (priv->poll_id != SRT_ERROR) {
+    srt_epoll_release (priv->poll_id);
+    priv->poll_id = SRT_ERROR;
+  }
+
+  if (priv->sock != SRT_ERROR) {
+    srt_close (priv->sock);
+    priv->sock = SRT_ERROR;
+  }
+
+  g_clear_error (&error);
+  g_clear_pointer (&uri, gst_uri_unref);
+  g_clear_object (&socket_address);
+
+  return FALSE;
+}
+
+/* GstBaseSrc::stop: closes the client (if any), the epoll id and the socket. */
+static gboolean
+gst_srt_server_src_stop (GstBaseSrc * src)
+{
+  GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
+  GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
+
+  if (priv->client_sock != SRT_INVALID_SOCK) {
+    /* The client is going away here, so announce "client-closed" */
+    g_signal_emit (self, signals[SIG_CLIENT_CLOSED], 0,
+        priv->client_sock, priv->client_sockaddr);
+    srt_close (priv->client_sock);
+    g_clear_object (&priv->client_sockaddr);
+    priv->client_sock = SRT_INVALID_SOCK;
+    priv->has_client = FALSE;
+  }
+
+  if (priv->poll_id != SRT_ERROR) {
+    srt_epoll_remove_usock (priv->poll_id, priv->sock);
+    srt_epoll_release (priv->poll_id);
+    priv->poll_id = SRT_ERROR;
+  }
+
+  if (priv->sock != SRT_INVALID_SOCK) {
+    GST_DEBUG_OBJECT (self, "closing SRT connection");
+    srt_close (priv->sock);
+    priv->sock = SRT_INVALID_SOCK;
+  }
+  priv->cancelled = FALSE;
+  return TRUE;
+}
+
+static gboolean
+gst_srt_server_src_unlock (GstBaseSrc * src)
+{
+  GstSRTServerSrcPrivate *src_priv =
+      GST_SRT_SERVER_SRC_GET_PRIVATE (GST_SRT_SERVER_SRC (src));
+
+  /* fill() checks this flag whenever its client poll times out. */
+  src_priv->cancelled = TRUE;
+  return TRUE;
+}
+
+static gboolean
+gst_srt_server_src_unlock_stop (GstBaseSrc * src)
+{
+  GstSRTServerSrcPrivate *src_priv =
+      GST_SRT_SERVER_SRC_GET_PRIVATE (GST_SRT_SERVER_SRC (src));
+
+  /* Lower the cancellation flag that unlock() raised. */
+  src_priv->cancelled = FALSE;
+  return TRUE;
+}
+
+static void
+gst_srt_server_src_class_init (GstSRTServerSrcClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+ GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
+ GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
+ /* Installs GObject vfuncs, the property, signals, metadata and src vfuncs. */
+ gobject_class->set_property = gst_srt_server_src_set_property;
+ gobject_class->get_property = gst_srt_server_src_get_property;
+ gobject_class->finalize = gst_srt_server_src_finalize;
+
+ /**
+ * GstSRTServerSrc:poll-timeout:
+ *
+ * The timeout(ms) value when polling SRT socket. For #GstSRTServerSrc,
+ * this value shouldn't be set as -1 (infinite) because "srt_epoll_wait"
+ * isn't cancellable unless closing the socket.
+ */
+ properties[PROP_POLL_TIMEOUT] =
+ g_param_spec_int ("poll-timeout", "Poll timeout",
+ "Return poll wait after timeout miliseconds", 0, G_MAXINT32,
+ SRT_DEFAULT_POLL_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ g_object_class_install_properties (gobject_class, PROP_LAST, properties);
+
+ /**
+ * GstSRTServerSrc::client-added:
+ * @gstsrtserversrc: the srtserversrc element that emitted this signal
+ * @sock: the client socket descriptor that was added to srtserversrc
+ * @addr: the #GSocketAddress (not a raw "struct sockaddr") that describes
+ *   the @sock
+ *
+ * The given socket descriptor was added to srtserversrc.
+ */
+ signals[SIG_CLIENT_ADDED] =
+ g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSrcClass, client_added),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
+ 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
+
+ /**
+ * GstSRTServerSrc::client-closed:
+ * @gstsrtserversrc: the srtserversrc element that emitted this signal
+ * @sock: the client socket descriptor that was closed by srtserversrc
+ * @addr: the #GSocketAddress (not a raw "struct sockaddr") that describes
+ *   the @sock
+ *
+ * The given socket descriptor was closed.
+ */
+ signals[SIG_CLIENT_CLOSED] =
+ g_signal_new ("client-closed", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSrcClass, client_closed),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
+ 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
+
+ gst_element_class_add_static_pad_template (gstelement_class, &src_template);
+ gst_element_class_set_metadata (gstelement_class,
+ "SRT Server source", "Source/Network",
+ "Receive data over the network via SRT",
+ "Justin Kim <justin.kim@collabora.com>");
+
+ gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_server_src_start);
+ gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_server_src_stop);
+ gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_server_src_unlock);
+ gstbasesrc_class->unlock_stop =
+ GST_DEBUG_FUNCPTR (gst_srt_server_src_unlock_stop);
+
+ gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_server_src_fill);
+}
+
+static void
+gst_srt_server_src_init (GstSRTServerSrc * self)
+{
+  GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
+
+  /* Mark every SRT handle as "not open" and apply the default timeout. */
+  priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
+  priv->poll_id = SRT_ERROR;
+  priv->sock = priv->client_sock = SRT_INVALID_SOCK;
+}
diff --git a/ext/srt/gstsrtserversrc.h b/ext/srt/gstsrtserversrc.h
new file mode 100644
index 0000000..098b5cd
--- /dev/null
+++ b/ext/srt/gstsrtserversrc.h
@@ -0,0 +1,63 @@
+/* GStreamer
+ * Copyright (C) 2017, Collabora Ltd.
+ * Author:Justin Kim <justin.kim@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_SRT_SERVER_SRC_H__
+#define __GST_SRT_SERVER_SRC_H__
+
+#include "gstsrtbasesrc.h"
+#include <sys/socket.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SRT_SERVER_SRC (gst_srt_server_src_get_type ())
+#define GST_IS_SRT_SERVER_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_SERVER_SRC))
+#define GST_IS_SRT_SERVER_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_SERVER_SRC))
+#define GST_SRT_SERVER_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcClass))
+#define GST_SRT_SERVER_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrc))
+#define GST_SRT_SERVER_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcClass))
+#define GST_SRT_SERVER_SRC_CAST(obj) ((GstSRTServerSrc*)(obj))
+#define GST_SRT_SERVER_SRC_CLASS_CAST(klass) ((GstSRTServerSrcClass*)(klass))
+
+typedef struct _GstSRTServerSrc GstSRTServerSrc;
+typedef struct _GstSRTServerSrcClass GstSRTServerSrcClass;
+typedef struct _GstSRTServerSrcPrivate GstSRTServerSrcPrivate;
+
+struct _GstSRTServerSrc {
+ GstSRTBaseSrc parent;
+
+ /*< private >*/
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstSRTServerSrcClass {
+ GstSRTBaseSrcClass parent_class;
+ /* Class handlers of the signals; NOTE(review): the signals pass (int, GSocketAddress *) - confirm */
+ void (*client_added) (GstSRTServerSrc *self, int sock, struct sockaddr *addr, int addr_len);
+ void (*client_closed) (GstSRTServerSrc *self, int sock, struct sockaddr *addr, int addr_len);
+
+ gpointer _gst_reserved[GST_PADDING_LARGE];
+};
+
+GST_EXPORT
+GType gst_srt_server_src_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SRT_SERVER_SRC_H__ */
diff --git a/ext/srt/meson.build b/ext/srt/meson.build
new file mode 100644
index 0000000..0d2835f
--- /dev/null
+++ b/ext/srt/meson.build
@@ -0,0 +1,27 @@
+srt_sources = [
+ 'gstsrt.c',
+ 'gstsrtbasesrc.c',
+ 'gstsrtclientsrc.c',
+ 'gstsrtserversrc.c',
+ 'gstsrtbasesink.c',
+ 'gstsrtclientsink.c',
+ 'gstsrtserversink.c',
+]
+# libsrt is optional (required : false): first try the 'libsrt' dependency
+srt_dep = dependency('libsrt', required : false)
+# Fallback: plain library lookup when srt/srt.h declares srt_startup
+if not srt_dep.found() and cc.has_header_symbol('srt/srt.h', 'srt_startup')
+ srt_dep = cc.find_library('srt', required : false)
+endif
+# The plugin is only built when one of the lookups above found libsrt
+if srt_dep.found()
+ gstsrt = library('gstsrt',
+ srt_sources,
+ c_args : gst_plugins_bad_args,
+ link_args : noseh_link_args,
+ include_directories : [configinc, libsinc],
+ dependencies : [gstbase_dep, gio_dep, srt_dep],
+ install : true,
+ install_dir : plugins_install_dir,
+ )
+endif