| /* GStreamer plugin for forward error correction |
| * Copyright (C) 2017 Pexip |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public |
| * License as published by the Free Software Foundation; either |
| * version 2.1 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with this library; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| * |
| * Author: Mikhail Fludkov <misha@pexip.com> |
| */ |
| |
| #include <gst/rtp/gstrtpbuffer.h> |
| |
| #include "rtpstorage.h" |
| #include "rtpstoragestream.h" |
| |
| enum |
| { |
| SIGNAL_PACKET_RECOVERED, |
| LAST_SIGNAL, |
| }; |
| |
| static guint rtp_storage_signals[LAST_SIGNAL] = { 0 }; |
| |
| G_DEFINE_TYPE (RtpStorage, rtp_storage, G_TYPE_OBJECT); |
| |
| #define STORAGE_LOCK(s) g_mutex_lock (&(s)->streams_lock) |
| #define STORAGE_UNLOCK(s) g_mutex_unlock (&(s)->streams_lock) |
| #define DEFAULT_SIZE_TIME (0) |
| |
| static void |
| rtp_storage_init (RtpStorage * self) |
| { |
| self->size_time = DEFAULT_SIZE_TIME; |
| self->streams = g_hash_table_new_full (NULL, NULL, NULL, |
| (GDestroyNotify) rtp_storage_stream_free); |
| g_mutex_init (&self->streams_lock); |
| } |
| |
| static void |
| rtp_storage_dispose (GObject * obj) |
| { |
| RtpStorage *self = RTP_STORAGE (obj); |
| STORAGE_LOCK (self); |
| g_hash_table_unref (self->streams); |
| self->streams = NULL; |
| STORAGE_UNLOCK (self); |
| g_mutex_clear (&self->streams_lock); |
| G_OBJECT_CLASS (rtp_storage_parent_class)->dispose (obj); |
| } |
| |
| static void |
| rtp_storage_class_init (RtpStorageClass * klass) |
| { |
| GObjectClass *gobject_class = G_OBJECT_CLASS (klass); |
| |
| rtp_storage_signals[SIGNAL_PACKET_RECOVERED] = |
| g_signal_new ("packet-recovered", G_TYPE_FROM_CLASS (klass), |
| G_SIGNAL_RUN_LAST, 0, NULL, NULL, |
| g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_BUFFER); |
| |
| gobject_class->dispose = rtp_storage_dispose; |
| } |
| |
| GstBufferList * |
| rtp_storage_get_packets_for_recovery (RtpStorage * self, gint fec_pt, |
| guint32 ssrc, guint16 lost_seq) |
| { |
| GstBufferList *ret = NULL; |
| RtpStorageStream *stream; |
| |
| STORAGE_LOCK (self); |
| stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc)); |
| STORAGE_UNLOCK (self); |
| |
| if (NULL == stream) { |
| GST_ERROR_OBJECT (self, "Cant find ssrc = 0x%x", ssrc); |
| } else { |
| STREAM_LOCK (stream); |
| if (stream->queue.length > 0) { |
| ret = |
| rtp_storage_stream_get_packets_for_recovery (stream, fec_pt, |
| lost_seq); |
| } |
| STREAM_UNLOCK (stream); |
| } |
| |
| return ret; |
| } |
| |
| GstBuffer * |
| rtp_storage_get_redundant_packet (RtpStorage * self, guint32 ssrc, |
| guint16 lost_seq) |
| { |
| GstBuffer *ret = NULL; |
| RtpStorageStream *stream; |
| |
| STORAGE_LOCK (self); |
| stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc)); |
| STORAGE_UNLOCK (self); |
| |
| if (NULL == stream) { |
| GST_ERROR_OBJECT (self, "Cant find ssrc = 0x%x", ssrc); |
| } else { |
| STREAM_LOCK (stream); |
| if (stream->queue.length > 0) { |
| ret = rtp_storage_stream_get_redundant_packet (stream, lost_seq); |
| } |
| STREAM_UNLOCK (stream); |
| } |
| |
| return ret; |
| } |
| |
| static void |
| rtp_storage_do_put_recovered_packet (RtpStorage * self, |
| GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq) |
| { |
| RtpStorageStream *stream; |
| |
| STORAGE_LOCK (self); |
| stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc)); |
| STORAGE_UNLOCK (self); |
| |
| g_assert (stream); |
| |
| STREAM_LOCK (stream); |
| rtp_storage_stream_add_item (stream, buffer, pt, seq); |
| STREAM_UNLOCK (stream); |
| } |
| |
| void |
| rtp_storage_put_recovered_packet (RtpStorage * self, |
| GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq) |
| { |
| rtp_storage_do_put_recovered_packet (self, buffer, pt, ssrc, seq); |
| g_signal_emit (self, rtp_storage_signals[SIGNAL_PACKET_RECOVERED], 0, buffer); |
| } |
| |
| gboolean |
| rtp_storage_append_buffer (RtpStorage * self, GstBuffer * buf) |
| { |
| GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT; |
| RtpStorageStream *stream; |
| guint32 ssrc; |
| guint8 pt; |
| guint16 seq; |
| |
| if (0 == self->size_time) |
| return TRUE; |
| |
| /* We are about to save it in the queue, it so it is better take a ref before |
| * mapping the buffer */ |
| gst_buffer_ref (buf); |
| |
| if (!gst_rtp_buffer_map (buf, GST_MAP_READ | |
| GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtpbuf)) { |
| gst_buffer_unref (buf); |
| return TRUE; |
| } |
| |
| ssrc = gst_rtp_buffer_get_ssrc (&rtpbuf); |
| pt = gst_rtp_buffer_get_payload_type (&rtpbuf); |
| seq = gst_rtp_buffer_get_seq (&rtpbuf); |
| |
| STORAGE_LOCK (self); |
| |
| stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc)); |
| if (NULL == stream) { |
| GST_DEBUG_OBJECT (self, |
| "New media stream (ssrc=0x%08x, pt=%u) detected", ssrc, pt); |
| stream = rtp_storage_stream_new (ssrc); |
| g_hash_table_insert (self->streams, GUINT_TO_POINTER (ssrc), stream); |
| } |
| |
| STORAGE_UNLOCK (self); |
| |
| STREAM_LOCK (stream); |
| |
| /* Saving the buffer, now the storage owns it */ |
| rtp_storage_stream_resize_and_add_item (stream, self->size_time, buf, pt, |
| seq); |
| |
| STREAM_UNLOCK (stream); |
| |
| gst_rtp_buffer_unmap (&rtpbuf); |
| |
| if (GST_BUFFER_FLAG_IS_SET (buf, GST_RTP_BUFFER_FLAG_REDUNDANT)) { |
| gst_buffer_unref (buf); |
| return FALSE; |
| } |
| |
| return TRUE; |
| } |
| |
| void |
| rtp_storage_clear (RtpStorage * self) |
| { |
| STORAGE_LOCK (self); |
| g_hash_table_remove_all (self->streams); |
| STORAGE_UNLOCK (self); |
| } |
| |
| void |
| rtp_storage_set_size (RtpStorage * self, GstClockTime size) |
| { |
| self->size_time = size; |
| if (0 == self->size_time) |
| rtp_storage_clear (self); |
| } |
| |
| GstClockTime |
| rtp_storage_get_size (RtpStorage * self) |
| { |
| return self->size_time; |
| } |
| |
| RtpStorage * |
| rtp_storage_new (void) |
| { |
| return g_object_new (RTP_TYPE_STORAGE, NULL); |
| } |