| /* GStreamer Muxer bin that splits output stream by size/time |
| * Copyright (C) <2014> Jan Schmidt <jan@centricular.com> |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| |
| /** |
| * SECTION:element-splitmuxsink |
| * @short_description: Muxer wrapper for splitting output stream by size or time |
| * |
| * This element wraps a muxer and a sink, and starts a new file when the mux |
| * contents are about to cross a threshold of maximum size or maximum time, |
| * splitting at video keyframe boundaries. Exactly one input video stream |
| * can be muxed, with as many accompanying audio and subtitle streams as |
| * desired. |
| * |
| * By default, it uses mp4mux and filesink, but they can be changed via |
| * the 'muxer' and 'sink' properties. |
| * |
| * The minimum file size is 1 GOP, however - so limits may be overrun if the |
| * distance between any 2 keyframes is larger than the limits. |
| * |
| * If a video stream is available, the splitting process is driven by the video |
| * stream contents, and the video stream must contain closed GOPs for the output |
| * file parts to be played individually correctly. In the absence of a video |
| * stream, the first available stream is used as reference for synchronization. |
| * |
| * <refsect2> |
| * <title>Example pipelines</title> |
| * |[ |
| * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000 |
| * ]| |
| * Records a video stream captured from a v4l2 device and muxes it into |
| * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds |
| * and 1MB maximum size. |
| * </refsect2> |
| */ |
| |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| |
| #include <string.h> |
| #include "gstsplitmuxsink.h" |
| |
/* Debug category for this element; GST_CAT_DEFAULT makes it the category
 * used by the GST_*_OBJECT logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug

/* Helpers around the per-element mutex (lock) and condition variable
 * (data_cond) stored in the GstSplitMuxSink instance.
 * GST_SPLITMUX_WAIT hands that same mutex to g_cond_wait(), so it must be
 * invoked with the lock held. */
#define GST_SPLITMUX_LOCK(mux) g_mutex_lock (&(mux)->lock)
#define GST_SPLITMUX_UNLOCK(mux) g_mutex_unlock (&(mux)->lock)
#define GST_SPLITMUX_WAIT(mux) g_cond_wait (&(mux)->data_cond, &(mux)->lock)
#define GST_SPLITMUX_BROADCAST(mux) g_cond_broadcast (&(mux)->data_cond)
| |
/* GObject property identifiers.  The numeric values are only meaningful
 * inside this file: they pair each g_object_class_install_property() call
 * with the matching case in the set/get property handlers. */
enum
{
  PROP_0 = 0,                   /* placeholder, never installed */
  PROP_LOCATION,                /* "location" */
  PROP_MAX_SIZE_TIME,           /* "max-size-time" */
  PROP_MAX_SIZE_BYTES,          /* "max-size-bytes" */
  PROP_MUXER_OVERHEAD,          /* "mux-overhead" */
  PROP_MUXER,                   /* "muxer" */
  PROP_SINK                     /* "sink" */
};

/* Property defaults.  A limit of 0 disables the corresponding threshold. */
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
/* Fraction added to the queued byte count to account for container
 * overhead (0.02 = 2%) */
#define DEFAULT_MUXER_OVERHEAD 0.02
/* Factory names of the elements used when none are supplied */
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
| |
| enum |
| { |
| SIGNAL_FORMAT_LOCATION, |
| SIGNAL_LAST |
| }; |
| |
| static guint signals[SIGNAL_LAST]; |
| |
| static GstStaticPadTemplate video_sink_template = |
| GST_STATIC_PAD_TEMPLATE ("video", |
| GST_PAD_SINK, |
| GST_PAD_REQUEST, |
| GST_STATIC_CAPS_ANY); |
| static GstStaticPadTemplate audio_sink_template = |
| GST_STATIC_PAD_TEMPLATE ("audio_%u", |
| GST_PAD_SINK, |
| GST_PAD_REQUEST, |
| GST_STATIC_CAPS_ANY); |
| static GstStaticPadTemplate subtitle_sink_template = |
| GST_STATIC_PAD_TEMPLATE ("subtitle_%u", |
| GST_PAD_SINK, |
| GST_PAD_REQUEST, |
| GST_STATIC_CAPS_ANY); |
| |
| static GQuark PAD_CONTEXT; |
| |
| static void |
| _do_init (void) |
| { |
| PAD_CONTEXT = g_quark_from_static_string ("pad-context"); |
| } |
| |
| #define gst_splitmux_sink_parent_class parent_class |
| G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0, |
| _do_init ()); |
| |
| static gboolean create_elements (GstSplitMuxSink * splitmux); |
| static gboolean create_sink (GstSplitMuxSink * splitmux); |
| static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec); |
| static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec); |
| static void gst_splitmux_sink_dispose (GObject * object); |
| static void gst_splitmux_sink_finalize (GObject * object); |
| |
| static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element, |
| GstPadTemplate * templ, const gchar * name, const GstCaps * caps); |
| static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad); |
| |
| static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * |
| element, GstStateChange transition); |
| |
| static void bus_handler (GstBin * bin, GstMessage * msg); |
| static void set_next_filename (GstSplitMuxSink * splitmux); |
| static void start_next_fragment (GstSplitMuxSink * splitmux); |
| static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); |
| static void mq_stream_ctx_unref (MqStreamCtx * ctx); |
| |
| static MqStreamBuf * |
| mq_stream_buf_new (void) |
| { |
| return g_slice_new0 (MqStreamBuf); |
| } |
| |
| static void |
| mq_stream_buf_free (MqStreamBuf * data) |
| { |
| g_slice_free (MqStreamBuf, data); |
| } |
| |
| static void |
| gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) |
| { |
| GObjectClass *gobject_class = (GObjectClass *) klass; |
| GstElementClass *gstelement_class = (GstElementClass *) klass; |
| GstBinClass *gstbin_class = (GstBinClass *) klass; |
| |
| gobject_class->set_property = gst_splitmux_sink_set_property; |
| gobject_class->get_property = gst_splitmux_sink_get_property; |
| gobject_class->dispose = gst_splitmux_sink_dispose; |
| gobject_class->finalize = gst_splitmux_sink_finalize; |
| |
| gst_element_class_set_static_metadata (gstelement_class, |
| "Split Muxing Bin", "Generic/Bin/Muxer", |
| "Convenience bin that muxes incoming streams into multiple time/size limited files", |
| "Jan Schmidt <jan@centricular.com>"); |
| |
| gst_element_class_add_pad_template (gstelement_class, |
| gst_static_pad_template_get (&video_sink_template)); |
| gst_element_class_add_pad_template (gstelement_class, |
| gst_static_pad_template_get (&audio_sink_template)); |
| gst_element_class_add_pad_template (gstelement_class, |
| gst_static_pad_template_get (&subtitle_sink_template)); |
| |
| gstelement_class->change_state = |
| GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state); |
| gstelement_class->request_new_pad = |
| GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad); |
| gstelement_class->release_pad = |
| GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad); |
| |
| gstbin_class->handle_message = bus_handler; |
| |
| g_object_class_install_property (gobject_class, PROP_LOCATION, |
| g_param_spec_string ("location", "File Output Pattern", |
| "Format string pattern for the location of the files to write (e.g. video%05d.mp4)", |
| NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD, |
| g_param_spec_double ("mux-overhead", "Muxing Overhead", |
| "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0, |
| DEFAULT_MUXER_OVERHEAD, |
| G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
| g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
| "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64, |
| DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
| g_param_spec_uint64 ("max-size-bytes", "Max. size bytes", |
| "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64, |
| DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_MUXER, |
| g_param_spec_object ("muxer", "Muxer", |
| "The muxer element to use (NULL = default mp4mux)", |
| GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_SINK, |
| g_param_spec_object ("sink", "Sink", |
| "The sink element (or element chain) to use (NULL = default filesink)", |
| GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| /** |
| * GstSplitMuxSink::format-location: |
| * @splitmux: the #GstSplitMuxSink |
| * @fragment_id: the sequence number of the file to be created |
| * |
| * Returns: the location to be used for the next output file |
| */ |
| signals[SIGNAL_FORMAT_LOCATION] = |
| g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass), |
| G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT); |
| } |
| |
| static void |
| gst_splitmux_sink_init (GstSplitMuxSink * splitmux) |
| { |
| g_mutex_init (&splitmux->lock); |
| g_cond_init (&splitmux->data_cond); |
| |
| splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; |
| splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; |
| splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES; |
| |
| GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); |
| } |
| |
| static void |
| gst_splitmux_reset (GstSplitMuxSink * splitmux) |
| { |
| if (splitmux->mq) |
| gst_bin_remove (GST_BIN (splitmux), splitmux->mq); |
| if (splitmux->muxer) |
| gst_bin_remove (GST_BIN (splitmux), splitmux->muxer); |
| if (splitmux->active_sink) |
| gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink); |
| |
| splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = |
| NULL; |
| } |
| |
| static void |
| gst_splitmux_sink_dispose (GObject * object) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| |
| G_OBJECT_CLASS (parent_class)->dispose (object); |
| |
| /* Calling parent dispose invalidates all child pointers */ |
| splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = |
| NULL; |
| } |
| |
| static void |
| gst_splitmux_sink_finalize (GObject * object) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| g_cond_clear (&splitmux->data_cond); |
| g_mutex_clear (&splitmux->lock); |
| if (splitmux->provided_sink) |
| gst_object_unref (splitmux->provided_sink); |
| if (splitmux->provided_muxer) |
| gst_object_unref (splitmux->provided_muxer); |
| |
| g_free (splitmux->location); |
| |
| /* Make sure to free any un-released contexts */ |
| g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL); |
| g_list_free (splitmux->contexts); |
| |
| G_OBJECT_CLASS (parent_class)->finalize (object); |
| } |
| |
| static void |
| gst_splitmux_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| |
| switch (prop_id) { |
| case PROP_LOCATION:{ |
| GST_OBJECT_LOCK (splitmux); |
| g_free (splitmux->location); |
| splitmux->location = g_value_dup_string (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| } |
| case PROP_MAX_SIZE_BYTES: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->threshold_bytes = g_value_get_uint64 (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_TIME: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->threshold_time = g_value_get_uint64 (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER_OVERHEAD: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->mux_overhead = g_value_get_double (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_SINK: |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_sink) |
| gst_object_unref (splitmux->provided_sink); |
| splitmux->provided_sink = g_value_dup_object (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER: |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_muxer) |
| gst_object_unref (splitmux->provided_muxer); |
| splitmux->provided_muxer = g_value_dup_object (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| } |
| |
| static void |
| gst_splitmux_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| |
| switch (prop_id) { |
| case PROP_LOCATION: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_string (value, splitmux->location); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_BYTES: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_uint64 (value, splitmux->threshold_bytes); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_TIME: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_uint64 (value, splitmux->threshold_time); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER_OVERHEAD: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_double (value, splitmux->mux_overhead); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_SINK: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_object (value, splitmux->provided_sink); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_object (value, splitmux->provided_muxer); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| } |
| |
| static GstPad * |
| mq_sink_to_src (GstElement * mq, GstPad * sink_pad) |
| { |
| gchar *tmp, *sinkname, *srcname; |
| GstPad *mq_src; |
| |
| sinkname = gst_pad_get_name (sink_pad); |
| tmp = sinkname + 5; |
| srcname = g_strdup_printf ("src_%s", tmp); |
| |
| mq_src = gst_element_get_static_pad (mq, srcname); |
| |
| g_free (sinkname); |
| g_free (srcname); |
| |
| return mq_src; |
| } |
| |
| static gboolean |
| get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad, |
| GstPad ** src_pad) |
| { |
| GstPad *mq_sink; |
| GstPad *mq_src; |
| |
| /* Request a pad from multiqueue, then connect this one, then |
| * discover the corresponding output pad and return both */ |
| mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u"); |
| if (mq_sink == NULL) |
| return FALSE; |
| |
| mq_src = mq_sink_to_src (splitmux->mq, mq_sink); |
| if (mq_src == NULL) |
| goto fail; |
| |
| *sink_pad = mq_sink; |
| *src_pad = mq_src; |
| |
| return TRUE; |
| |
| fail: |
| gst_element_release_request_pad (splitmux->mq, mq_sink); |
| return FALSE; |
| } |
| |
| static MqStreamCtx * |
| mq_stream_ctx_new (GstSplitMuxSink * splitmux) |
| { |
| MqStreamCtx *ctx; |
| |
| ctx = g_new0 (MqStreamCtx, 1); |
| g_atomic_int_set (&ctx->refcount, 1); |
| ctx->splitmux = splitmux; |
| gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); |
| gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); |
| ctx->in_running_time = ctx->out_running_time = 0; |
| g_queue_init (&ctx->queued_bufs); |
| return ctx; |
| } |
| |
| static void |
| mq_stream_ctx_free (MqStreamCtx * ctx) |
| { |
| g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); |
| g_queue_clear (&ctx->queued_bufs); |
| g_free (ctx); |
| } |
| |
| static void |
| mq_stream_ctx_unref (MqStreamCtx * ctx) |
| { |
| if (g_atomic_int_dec_and_test (&ctx->refcount)) |
| mq_stream_ctx_free (ctx); |
| } |
| |
| static void |
| mq_stream_ctx_ref (MqStreamCtx * ctx) |
| { |
| g_atomic_int_inc (&ctx->refcount); |
| } |
| |
| static void |
| _pad_block_destroy_sink_notify (MqStreamCtx * ctx) |
| { |
| ctx->sink_pad_block_id = 0; |
| mq_stream_ctx_unref (ctx); |
| } |
| |
| static void |
| _pad_block_destroy_src_notify (MqStreamCtx * ctx) |
| { |
| ctx->src_pad_block_id = 0; |
| mq_stream_ctx_unref (ctx); |
| } |
| |
| static void |
| send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened) |
| { |
| gchar *location = NULL; |
| GstMessage *msg; |
| const gchar *msg_name = opened ? |
| "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; |
| |
| g_object_get (splitmux->sink, "location", &location, NULL); |
| |
| msg = gst_message_new_element (GST_OBJECT (splitmux), |
| gst_structure_new (msg_name, |
| "location", G_TYPE_STRING, location, |
| "running-time", GST_TYPE_CLOCK_TIME, |
| splitmux->reference_ctx->out_running_time, NULL)); |
| gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); |
| |
| g_free (location); |
| } |
| |
| /* Called with lock held, drops the lock to send EOS to the |
| * pad |
| */ |
| static void |
| send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| GstEvent *eos; |
| GstPad *pad; |
| |
| eos = gst_event_new_eos (); |
| pad = gst_pad_get_peer (ctx->srcpad); |
| |
| ctx->out_eos = TRUE; |
| |
| GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| gst_pad_send_event (pad, eos); |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| gst_object_unref (pad); |
| } |
| |
| /* Called with splitmux lock held to check if this output |
| * context needs to sleep to wait for the release of the |
| * next GOP, or to send EOS to close out the current file |
| */ |
| static void |
| complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| do { |
| |
| GST_LOG_OBJECT (ctx->srcpad, |
| "Checking running time %" GST_TIME_FORMAT " against max %" |
| GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time), |
| GST_TIME_ARGS (splitmux->max_out_running_time)); |
| |
| if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE || |
| ctx->out_running_time < splitmux->max_out_running_time) { |
| splitmux->have_muxed_something = TRUE; |
| return; |
| } |
| |
| if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED) |
| return; |
| |
| if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) { |
| if (ctx->out_eos == FALSE) { |
| send_eos (splitmux, ctx); |
| continue; |
| } |
| } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) { |
| start_next_fragment (splitmux); |
| continue; |
| } |
| |
| GST_INFO_OBJECT (ctx->srcpad, |
| "Sleeping for running time %" |
| GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")", |
| GST_TIME_ARGS (ctx->out_running_time), |
| GST_TIME_ARGS (splitmux->max_out_running_time)); |
| ctx->out_blocked = TRUE; |
| /* Expand the mq if needed before sleeping */ |
| check_queue_length (splitmux, ctx); |
| GST_SPLITMUX_WAIT (splitmux); |
| ctx->out_blocked = FALSE; |
| GST_INFO_OBJECT (ctx->srcpad, |
| "Woken for new max running time %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (splitmux->max_out_running_time)); |
| } while (1); |
| } |
| |
| static GstPadProbeReturn |
| handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) |
| { |
| GstSplitMuxSink *splitmux = ctx->splitmux; |
| MqStreamBuf *buf_info = NULL; |
| |
| GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type); |
| |
| /* FIXME: Handle buffer lists, until then make it clear they won't work */ |
| if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { |
| g_warning ("Buffer list handling not implemented"); |
| return GST_PAD_PROBE_DROP; |
| } |
| if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { |
| GstEvent *event = gst_pad_probe_info_get_event (info); |
| |
| GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); |
| |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_SEGMENT: |
| gst_event_copy_segment (event, &ctx->out_segment); |
| break; |
| case GST_EVENT_FLUSH_STOP: |
| GST_SPLITMUX_LOCK (splitmux); |
| gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); |
| g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); |
| g_queue_clear (&ctx->queued_bufs); |
| ctx->flushing = FALSE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_EVENT_FLUSH_START: |
| GST_SPLITMUX_LOCK (splitmux); |
| GST_LOG_OBJECT (pad, "Flush start"); |
| ctx->flushing = TRUE; |
| GST_SPLITMUX_BROADCAST (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_EVENT_EOS: |
| GST_SPLITMUX_LOCK (splitmux); |
| if (splitmux->state == SPLITMUX_STATE_STOPPED) |
| goto beach; |
| ctx->out_eos = TRUE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_EVENT_GAP:{ |
| GstClockTime gap_ts; |
| |
| gst_event_parse_gap (event, &gap_ts, NULL); |
| if (gap_ts == GST_CLOCK_TIME_NONE) |
| break; |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| gap_ts = gst_segment_to_running_time (&ctx->out_segment, |
| GST_FORMAT_TIME, gap_ts); |
| |
| GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (gap_ts)); |
| |
| if (splitmux->state == SPLITMUX_STATE_STOPPED) |
| goto beach; |
| ctx->out_running_time = gap_ts; |
| complete_or_wait_on_out (splitmux, ctx); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| } |
| default: |
| break; |
| } |
| return GST_PAD_PROBE_PASS; |
| } |
| |
| /* Allow everything through until the configured next stopping point */ |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| buf_info = g_queue_pop_tail (&ctx->queued_bufs); |
| if (buf_info == NULL) |
| /* Can only happen due to a poorly timed flush */ |
| goto beach; |
| |
| /* If we have popped a keyframe, decrement the queued_gop count */ |
| if (buf_info->keyframe && splitmux->queued_gops > 0) |
| splitmux->queued_gops--; |
| |
| ctx->out_running_time = buf_info->run_ts; |
| |
| GST_LOG_OBJECT (splitmux, |
| "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT |
| " size %" G_GSIZE_FORMAT, |
| pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size); |
| |
| if (splitmux->opening_first_fragment) { |
| send_fragment_opened_closed_msg (splitmux, TRUE); |
| splitmux->opening_first_fragment = FALSE; |
| } |
| |
| complete_or_wait_on_out (splitmux, ctx); |
| |
| if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE || |
| splitmux->muxed_out_time < buf_info->run_ts) |
| splitmux->muxed_out_time = buf_info->run_ts; |
| |
| splitmux->muxed_out_bytes += buf_info->buf_size; |
| |
| #ifndef GST_DISABLE_GST_DEBUG |
| { |
| GstBuffer *buf = gst_pad_probe_info_get_buffer (info); |
| GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT |
| " run ts %" GST_TIME_FORMAT, buf, |
| GST_TIME_ARGS (ctx->out_running_time)); |
| } |
| #endif |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| mq_stream_buf_free (buf_info); |
| |
| return GST_PAD_PROBE_PASS; |
| |
| beach: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return GST_PAD_PROBE_DROP; |
| } |
| |
| static gboolean |
| resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer) |
| { |
| return gst_pad_send_event (peer, gst_event_ref (*event)); |
| } |
| |
| static void |
| restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) |
| { |
| GstPad *peer = gst_pad_get_peer (ctx->srcpad); |
| |
| gst_pad_sticky_events_foreach (ctx->srcpad, |
| (GstPadStickyEventsForeachFunction) (resend_sticky), peer); |
| |
| /* Clear EOS flag */ |
| ctx->out_eos = FALSE; |
| |
| gst_object_unref (peer); |
| } |
| |
| /* Called with lock held when a fragment |
| * reaches EOS and it is time to restart |
| * a new fragment |
| */ |
| static void |
| start_next_fragment (GstSplitMuxSink * splitmux) |
| { |
| /* 1 change to new file */ |
| gst_element_set_state (splitmux->muxer, GST_STATE_NULL); |
| gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); |
| |
| set_next_filename (splitmux); |
| |
| gst_element_sync_state_with_parent (splitmux->active_sink); |
| gst_element_sync_state_with_parent (splitmux->muxer); |
| |
| g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); |
| |
| /* Switch state and go back to processing */ |
| splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; |
| |
| if (!splitmux->reference_ctx->in_eos) { |
| splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; |
| } else { |
| splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; |
| splitmux->have_muxed_something = FALSE; |
| } |
| splitmux->have_muxed_something = |
| (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time); |
| |
| /* Store the overflow parameters as the basis for the next fragment */ |
| splitmux->mux_start_time = splitmux->muxed_out_time; |
| splitmux->mux_start_bytes = splitmux->muxed_out_bytes; |
| |
| GST_DEBUG_OBJECT (splitmux, |
| "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (splitmux->max_out_running_time)); |
| |
| send_fragment_opened_closed_msg (splitmux, TRUE); |
| |
| GST_SPLITMUX_BROADCAST (splitmux); |
| } |
| |
| static void |
| bus_handler (GstBin * bin, GstMessage * message) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin); |
| |
| switch (GST_MESSAGE_TYPE (message)) { |
| case GST_MESSAGE_EOS: |
| /* If the state is draining out the current file, drop this EOS */ |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| send_fragment_opened_closed_msg (splitmux, FALSE); |
| |
| if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && |
| splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) { |
| GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); |
| splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; |
| GST_SPLITMUX_BROADCAST (splitmux); |
| |
| gst_message_unref (message); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return; |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| default: |
| break; |
| } |
| |
| GST_BIN_CLASS (parent_class)->handle_message (bin, message); |
| } |
| |
| /* Called with splitmux lock held */ |
| /* Called when entering ProcessingCompleteGop state |
| * Assess if mq contents overflowed the current file |
| * -> If yes, need to switch to new file |
| * -> if no, set max_out_running_time to let this GOP in and |
| * go to COLLECTING_GOP_START state |
| */ |
| static void |
| handle_gathered_gop (GstSplitMuxSink * splitmux) |
| { |
| GList *cur; |
| gsize queued_bytes = 0; |
| GstClockTime queued_time = 0; |
| |
| /* Assess if the multiqueue contents overflowed the current file */ |
| for (cur = g_list_first (splitmux->contexts); |
| cur != NULL; cur = g_list_next (cur)) { |
| MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); |
| if (tmpctx->in_running_time > queued_time) |
| queued_time = tmpctx->in_running_time; |
| queued_bytes += tmpctx->in_bytes; |
| } |
| |
| g_assert (queued_bytes >= splitmux->mux_start_bytes); |
| g_assert (queued_time >= splitmux->mux_start_time); |
| |
| queued_bytes -= splitmux->mux_start_bytes; |
| queued_time -= splitmux->mux_start_time; |
| |
| /* Expand queued bytes estimate by muxer overhead */ |
| queued_bytes += (queued_bytes * splitmux->mux_overhead); |
| |
| GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT |
| " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes); |
| |
| /* Check for overrun - have we output at least one byte and overrun |
| * either threshold? */ |
| if (splitmux->have_muxed_something && |
| ((splitmux->threshold_bytes > 0 && |
| queued_bytes >= splitmux->threshold_bytes) || |
| (splitmux->threshold_time > 0 && |
| queued_time >= splitmux->threshold_time))) { |
| |
| splitmux->state = SPLITMUX_STATE_ENDING_FILE; |
| |
| GST_INFO_OBJECT (splitmux, |
| "mq overflowed since last, draining out. max out TS is %" |
| GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); |
| GST_SPLITMUX_BROADCAST (splitmux); |
| |
| } else { |
| /* No overflow */ |
| GST_LOG_OBJECT (splitmux, |
| "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT |
| " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.", |
| splitmux->muxed_out_bytes - splitmux->mux_start_bytes, |
| queued_bytes, GST_TIME_ARGS (queued_time)); |
| |
| /* Wake everyone up to push this one GOP, then sleep */ |
| splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; |
| splitmux->have_muxed_something = TRUE; |
| |
| if (!splitmux->reference_ctx->in_eos) |
| splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; |
| else |
| splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; |
| |
| GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" |
| GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); |
| GST_SPLITMUX_BROADCAST (splitmux); |
| } |
| |
| } |
| |
| /* Called with splitmux lock held */ |
| /* Called from each input pad when it is has all the pieces |
| * for a GOP or EOS, starting with the reference pad which has set the |
| * splitmux->max_in_running_time |
| */ |
| static void |
| check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| GList *cur; |
| gboolean ready = TRUE; |
| GstClockTime current_max_in_running_time; |
| |
| if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { |
| /* Iterate each pad, and check that the input running time is at least |
| * up to the reference running time, and if so handle the collected GOP */ |
| GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx); |
| for (cur = g_list_first (splitmux->contexts); |
| cur != NULL; cur = g_list_next (cur)) { |
| MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); |
| |
| GST_LOG_OBJECT (splitmux, |
| "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT |
| " EOS %d", tmpctx, tmpctx->srcpad, |
| GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); |
| |
| if (tmpctx->in_running_time < splitmux->max_in_running_time && |
| !tmpctx->in_eos) { |
| GST_LOG_OBJECT (splitmux, |
| "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep", |
| tmpctx, tmpctx->srcpad); |
| ready = FALSE; |
| break; |
| } |
| } |
| if (ready) { |
| GST_DEBUG_OBJECT (splitmux, |
| "Collected GOP is complete. Processing (ctx %p)", ctx); |
| /* All pads have a complete GOP, release it into the multiqueue */ |
| handle_gathered_gop (splitmux); |
| } |
| } |
| |
| /* Some pad is not yet ready, or GOP is being pushed |
| * either way, sleep and wait to get woken */ |
| current_max_in_running_time = splitmux->max_in_running_time; |
| while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE || |
| splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) && |
| !ctx->flushing && |
| (current_max_in_running_time == splitmux->max_in_running_time)) { |
| |
| GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)", |
| splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ? |
| "GOP complete" : "EOF draining", ctx); |
| GST_SPLITMUX_WAIT (splitmux); |
| |
| GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx); |
| } |
| } |
| |
| static void |
| check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| GList *cur; |
| guint cur_len = g_queue_get_length (&ctx->queued_bufs); |
| |
| GST_DEBUG_OBJECT (ctx->sinkpad, |
| "Checking queue length len %u cur_max %u queued gops %u", |
| cur_len, splitmux->mq_max_buffers, splitmux->queued_gops); |
| |
| if (cur_len >= splitmux->mq_max_buffers) { |
| gboolean allow_grow = FALSE; |
| |
| /* If collecting a GOP and this pad might block, |
| * and there isn't already a pending GOP in the queue |
| * then grow |
| */ |
| if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE && |
| ctx->in_running_time < splitmux->max_in_running_time && |
| splitmux->queued_gops <= 1) { |
| allow_grow = TRUE; |
| } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START && |
| ctx->is_reference && splitmux->queued_gops <= 1) { |
| allow_grow = TRUE; |
| } |
| |
| if (!allow_grow) { |
| for (cur = g_list_first (splitmux->contexts); |
| cur != NULL; cur = g_list_next (cur)) { |
| MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); |
| GST_DEBUG_OBJECT (tmpctx->sinkpad, |
| " len %u out_blocked %d", |
| g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked); |
| /* If another stream is starving, grow */ |
| if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { |
| allow_grow = TRUE; |
| } |
| } |
| } |
| |
| if (allow_grow) { |
| splitmux->mq_max_buffers = cur_len + 1; |
| |
| GST_INFO_OBJECT (splitmux, |
| "Multiqueue overrun - enlarging to %u buffers ctx %p", |
| splitmux->mq_max_buffers, ctx); |
| |
| g_object_set (splitmux->mq, "max-size-buffers", |
| splitmux->mq_max_buffers, NULL); |
| } |
| } |
| } |
| |
| static GstPadProbeReturn |
| handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) |
| { |
| GstSplitMuxSink *splitmux = ctx->splitmux; |
| GstBuffer *buf; |
| MqStreamBuf *buf_info = NULL; |
| GstClockTime ts; |
| gboolean loop_again; |
| gboolean keyframe = FALSE; |
| |
| GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); |
| |
| /* FIXME: Handle buffer lists, until then make it clear they won't work */ |
| if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { |
| g_warning ("Buffer list handling not implemented"); |
| return GST_PAD_PROBE_DROP; |
| } |
| if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { |
| GstEvent *event = gst_pad_probe_info_get_event (info); |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_SEGMENT: |
| gst_event_copy_segment (event, &ctx->in_segment); |
| break; |
| case GST_EVENT_FLUSH_STOP: |
| GST_SPLITMUX_LOCK (splitmux); |
| gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); |
| ctx->in_eos = FALSE; |
| ctx->in_bytes = 0; |
| ctx->in_running_time = 0; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_EVENT_EOS: |
| GST_SPLITMUX_LOCK (splitmux); |
| ctx->in_eos = TRUE; |
| |
| if (splitmux->state == SPLITMUX_STATE_STOPPED) |
| goto beach; |
| |
| if (ctx->is_reference) { |
| GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); |
| /* Act as if this is a new keyframe with infinite timestamp */ |
| splitmux->max_in_running_time = GST_CLOCK_TIME_NONE; |
| splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; |
| /* Wake up other input pads to collect this GOP */ |
| GST_SPLITMUX_BROADCAST (splitmux); |
| check_completed_gop (splitmux, ctx); |
| } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { |
| /* If we are waiting for a GOP to be completed (ie, for aux |
| * pads to catch up), then this pad is complete, so check |
| * if the whole GOP is. |
| */ |
| check_completed_gop (splitmux, ctx); |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| default: |
| break; |
| } |
| return GST_PAD_PROBE_PASS; |
| } |
| |
| buf = gst_pad_probe_info_get_buffer (info); |
| buf_info = mq_stream_buf_new (); |
| |
| if (GST_BUFFER_PTS_IS_VALID (buf)) |
| ts = GST_BUFFER_PTS (buf); |
| else |
| ts = GST_BUFFER_DTS (buf); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| if (splitmux->state == SPLITMUX_STATE_STOPPED) |
| goto beach; |
| |
| /* If this buffer has a timestamp, advance the input timestamp of the |
| * stream */ |
| if (GST_CLOCK_TIME_IS_VALID (ts)) { |
| GstClockTime running_time = |
| gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, |
| GST_BUFFER_TIMESTAMP (buf)); |
| |
| if (GST_CLOCK_TIME_IS_VALID (running_time) && |
| (ctx->in_running_time == GST_CLOCK_TIME_NONE |
| || running_time > ctx->in_running_time)) |
| ctx->in_running_time = running_time; |
| } |
| |
| /* Try to make sure we have a valid running time */ |
| if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) { |
| ctx->in_running_time = |
| gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, |
| ctx->in_segment.start); |
| } |
| |
| buf_info->run_ts = ctx->in_running_time; |
| buf_info->buf_size = gst_buffer_get_size (buf); |
| |
| /* Update total input byte counter for overflow detect */ |
| ctx->in_bytes += buf_info->buf_size; |
| |
| /* initialize mux_start_time */ |
| if (ctx->is_reference && splitmux->mux_start_time == 0) |
| splitmux->mux_start_time = buf_info->run_ts; |
| |
| GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT |
| " total in_bytes %" G_GSIZE_FORMAT, |
| GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes); |
| |
| loop_again = TRUE; |
| do { |
| if (ctx->flushing) |
| break; |
| |
| switch (splitmux->state) { |
| case SPLITMUX_STATE_COLLECTING_GOP_START: |
| if (ctx->is_reference) { |
| /* If a keyframe, we have a complete GOP */ |
| if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || |
| !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) || |
| splitmux->max_in_running_time >= ctx->in_running_time) { |
| /* Pass this buffer through */ |
| loop_again = FALSE; |
| break; |
| } |
| GST_INFO_OBJECT (pad, |
| "Have keyframe with running time %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (ctx->in_running_time)); |
| keyframe = TRUE; |
| splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; |
| splitmux->max_in_running_time = ctx->in_running_time; |
| /* Wake up other input pads to collect this GOP */ |
| GST_SPLITMUX_BROADCAST (splitmux); |
| check_completed_gop (splitmux, ctx); |
| } else { |
| /* We're still waiting for a keyframe on the reference pad, sleep */ |
| GST_LOG_OBJECT (pad, "Sleeping for GOP start"); |
| GST_SPLITMUX_WAIT (splitmux); |
| GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d", |
| splitmux->state); |
| } |
| break; |
| case SPLITMUX_STATE_WAITING_GOP_COMPLETE: |
| /* After a GOP start is found, this buffer might complete the GOP */ |
| /* If we overran the target timestamp, it might be time to process |
| * the GOP, otherwise bail out for more data |
| */ |
| GST_LOG_OBJECT (pad, |
| "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (ctx->in_running_time), |
| GST_TIME_ARGS (splitmux->max_in_running_time)); |
| |
| if (ctx->in_running_time < splitmux->max_in_running_time) { |
| loop_again = FALSE; |
| break; |
| } |
| |
| GST_LOG_OBJECT (pad, |
| "Collected last packet of GOP. Checking other pads"); |
| check_completed_gop (splitmux, ctx); |
| break; |
| case SPLITMUX_STATE_ENDING_FILE: |
| case SPLITMUX_STATE_START_NEXT_FRAGMENT: |
| /* A fragment is ending, wait until that's done before continuing */ |
| GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart"); |
| GST_SPLITMUX_WAIT (splitmux); |
| GST_DEBUG_OBJECT (pad, |
| "Done sleeping for fragment restart state now %d", splitmux->state); |
| break; |
| default: |
| loop_again = FALSE; |
| break; |
| } |
| } while (loop_again); |
| |
| if (keyframe) { |
| splitmux->queued_gops++; |
| buf_info->keyframe = TRUE; |
| } |
| |
| /* Now add this buffer to the queue just before returning */ |
| g_queue_push_head (&ctx->queued_bufs, buf_info); |
| |
| /* Check the buffer will fit in the mq */ |
| check_queue_length (splitmux, ctx); |
| |
| GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT |
| " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time)); |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return GST_PAD_PROBE_PASS; |
| |
| beach: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| if (buf_info) |
| mq_stream_buf_free (buf_info); |
| return GST_PAD_PROBE_PASS; |
| } |
| |
| static GstPad * |
| gst_splitmux_sink_request_new_pad (GstElement * element, |
| GstPadTemplate * templ, const gchar * name, const GstCaps * caps) |
| { |
| GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; |
| GstPadTemplate *mux_template = NULL; |
| GstPad *res = NULL; |
| GstPad *mq_sink, *mq_src; |
| gchar *gname; |
| gboolean is_video = FALSE; |
| MqStreamCtx *ctx; |
| |
| GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| if (!create_elements (splitmux)) |
| goto fail; |
| |
| if (templ->name_template) { |
| if (g_str_equal (templ->name_template, "video")) { |
| /* FIXME: Look for a pad template with matching caps, rather than by name */ |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), "video_%u"); |
| is_video = TRUE; |
| name = NULL; |
| } else { |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), templ->name_template); |
| } |
| if (mux_template == NULL) { |
| /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */ |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), "sink_%d"); |
| } |
| } |
| |
| res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps); |
| if (res == NULL) |
| goto fail; |
| |
| if (is_video) |
| gname = g_strdup ("video"); |
| else if (name == NULL) |
| gname = gst_pad_get_name (res); |
| else |
| gname = g_strdup (name); |
| |
| if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) { |
| gst_element_release_request_pad (splitmux->muxer, res); |
| gst_object_unref (GST_OBJECT (res)); |
| goto fail; |
| } |
| |
| if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) { |
| gst_element_release_request_pad (splitmux->muxer, res); |
| gst_object_unref (GST_OBJECT (res)); |
| gst_element_release_request_pad (splitmux->mq, mq_sink); |
| gst_object_unref (GST_OBJECT (mq_sink)); |
| goto fail; |
| } |
| |
| gst_object_unref (GST_OBJECT (res)); |
| |
| ctx = mq_stream_ctx_new (splitmux); |
| ctx->srcpad = mq_src; |
| ctx->sinkpad = mq_sink; |
| |
| mq_stream_ctx_ref (ctx); |
| ctx->src_pad_block_id = |
| gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, |
| (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify) |
| _pad_block_destroy_src_notify); |
| if (is_video && splitmux->reference_ctx != NULL) { |
| splitmux->reference_ctx->is_reference = FALSE; |
| splitmux->reference_ctx = NULL; |
| } |
| if (splitmux->reference_ctx == NULL) { |
| splitmux->reference_ctx = ctx; |
| ctx->is_reference = TRUE; |
| } |
| |
| res = gst_ghost_pad_new (gname, mq_sink); |
| g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); |
| |
| mq_stream_ctx_ref (ctx); |
| ctx->sink_pad_block_id = |
| gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, |
| (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify) |
| _pad_block_destroy_sink_notify); |
| |
| GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT |
| " is mq pad %" GST_PTR_FORMAT, res, mq_sink); |
| |
| splitmux->contexts = g_list_prepend (splitmux->contexts, ctx); |
| |
| g_free (gname); |
| |
| gst_object_unref (mq_sink); |
| gst_object_unref (mq_src); |
| |
| gst_pad_set_active (res, TRUE); |
| gst_element_add_pad (element, res); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| return res; |
| fail: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return NULL; |
| } |
| |
| static void |
| gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) |
| { |
| GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; |
| GstPad *mqsink, *mqsrc, *muxpad; |
| MqStreamCtx *ctx = |
| (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT)); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| if (splitmux->muxer == NULL || splitmux->mq == NULL) |
| goto fail; /* Elements don't exist yet - nothing to release */ |
| |
| GST_INFO_OBJECT (pad, "releasing request pad"); |
| |
| mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad)); |
| mqsrc = mq_sink_to_src (splitmux->mq, mqsink); |
| muxpad = gst_pad_get_peer (mqsrc); |
| |
| /* Remove the context from our consideration */ |
| splitmux->contexts = g_list_remove (splitmux->contexts, ctx); |
| |
| if (ctx->sink_pad_block_id) |
| gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id); |
| |
| if (ctx->src_pad_block_id) |
| gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id); |
| |
| /* Can release the context now */ |
| mq_stream_ctx_unref (ctx); |
| |
| /* Release and free the mq input */ |
| gst_element_release_request_pad (splitmux->mq, mqsink); |
| |
| /* Release and free the muxer input */ |
| gst_element_release_request_pad (splitmux->muxer, muxpad); |
| |
| gst_object_unref (mqsink); |
| gst_object_unref (mqsrc); |
| gst_object_unref (muxpad); |
| |
| gst_element_remove_pad (element, pad); |
| |
| /* Reset the internal elements only after all request pads are released */ |
| if (splitmux->contexts == NULL) |
| gst_splitmux_reset (splitmux); |
| |
| fail: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| } |
| |
| static GstElement * |
| create_element (GstSplitMuxSink * splitmux, |
| const gchar * factory, const gchar * name) |
| { |
| GstElement *ret = gst_element_factory_make (factory, name); |
| if (ret == NULL) { |
| g_warning ("Failed to create %s - splitmuxsink will not work", name); |
| return NULL; |
| } |
| |
| if (!gst_bin_add (GST_BIN (splitmux), ret)) { |
| g_warning ("Could not add %s element - splitmuxsink will not work", name); |
| gst_object_unref (ret); |
| return NULL; |
| } |
| |
| return ret; |
| } |
| |
| static gboolean |
| create_elements (GstSplitMuxSink * splitmux) |
| { |
| /* Create internal elements */ |
| if (splitmux->mq == NULL) { |
| if ((splitmux->mq = |
| create_element (splitmux, "multiqueue", "multiqueue")) == NULL) |
| goto fail; |
| |
| splitmux->mq_max_buffers = 5; |
| /* No bytes or time limit, we limit buffers manually */ |
| g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time", |
| (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL); |
| } |
| |
| if (splitmux->muxer == NULL) { |
| GstElement *provided_muxer = NULL; |
| |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_muxer != NULL) |
| provided_muxer = gst_object_ref (splitmux->provided_muxer); |
| GST_OBJECT_UNLOCK (splitmux); |
| |
| if (provided_muxer == NULL) { |
| if ((splitmux->muxer = |
| create_element (splitmux, "mp4mux", "muxer")) == NULL) |
| goto fail; |
| } else { |
| if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) { |
| g_warning ("Could not add muxer element - splitmuxsink will not work"); |
| gst_object_unref (provided_muxer); |
| goto fail; |
| } |
| |
| splitmux->muxer = provided_muxer; |
| gst_object_unref (provided_muxer); |
| } |
| } |
| |
| return TRUE; |
| fail: |
| return FALSE; |
| } |
| |
| static GstElement * |
| find_sink (GstElement * e) |
| { |
| GstElement *res = NULL; |
| GstIterator *iter; |
| gboolean done = FALSE; |
| GValue data = { 0, }; |
| |
| if (!GST_IS_BIN (e)) |
| return e; |
| |
| iter = gst_bin_iterate_sinks (GST_BIN (e)); |
| while (!done) { |
| switch (gst_iterator_next (iter, &data)) { |
| case GST_ITERATOR_OK: |
| { |
| GstElement *child = g_value_get_object (&data); |
| if (g_object_class_find_property (G_OBJECT_GET_CLASS (child), |
| "location") != NULL) { |
| res = child; |
| done = TRUE; |
| } |
| g_value_reset (&data); |
| break; |
| } |
| case GST_ITERATOR_RESYNC: |
| gst_iterator_resync (iter); |
| break; |
| case GST_ITERATOR_DONE: |
| done = TRUE; |
| break; |
| case GST_ITERATOR_ERROR: |
| g_assert_not_reached (); |
| break; |
| } |
| } |
| g_value_unset (&data); |
| gst_iterator_free (iter); |
| |
| return res; |
| } |
| |
| static gboolean |
| create_sink (GstSplitMuxSink * splitmux) |
| { |
| GstElement *provided_sink = NULL; |
| |
| g_return_val_if_fail (splitmux->active_sink == NULL, TRUE); |
| |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_sink != NULL) |
| provided_sink = gst_object_ref (splitmux->provided_sink); |
| GST_OBJECT_UNLOCK (splitmux); |
| |
| if (provided_sink == NULL) { |
| if ((splitmux->sink = |
| create_element (splitmux, DEFAULT_SINK, "sink")) == NULL) |
| goto fail; |
| splitmux->active_sink = splitmux->sink; |
| } else { |
| if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) { |
| g_warning ("Could not add sink elements - splitmuxsink will not work"); |
| gst_object_unref (provided_sink); |
| goto fail; |
| } |
| |
| splitmux->active_sink = provided_sink; |
| |
| /* The bin holds a ref now, we can drop our tmp ref */ |
| gst_object_unref (provided_sink); |
| |
| /* Find the sink element */ |
| splitmux->sink = find_sink (splitmux->active_sink); |
| if (splitmux->sink == NULL) { |
| g_warning |
| ("Could not locate sink element in provided sink - splitmuxsink will not work"); |
| goto fail; |
| } |
| } |
| |
| if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { |
| g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); |
| goto fail; |
| } |
| |
| return TRUE; |
| fail: |
| return FALSE; |
| } |
| |
| #ifdef __GNUC__ |
| #pragma GCC diagnostic ignored "-Wformat-nonliteral" |
| #endif |
| static void |
| set_next_filename (GstSplitMuxSink * splitmux) |
| { |
| gchar *fname = NULL; |
| |
| g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, |
| splitmux->fragment_id, &fname); |
| |
| if (!fname) |
| fname = splitmux->location ? |
| g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL; |
| |
| if (fname) { |
| GST_INFO_OBJECT (splitmux, "Setting file to %s", fname); |
| g_object_set (splitmux->sink, "location", fname, NULL); |
| g_free (fname); |
| |
| splitmux->fragment_id++; |
| } |
| } |
| |
| static GstStateChangeReturn |
| gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) |
| { |
| GstStateChangeReturn ret; |
| GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; |
| |
| switch (transition) { |
| case GST_STATE_CHANGE_NULL_TO_READY:{ |
| GST_SPLITMUX_LOCK (splitmux); |
| if (!create_elements (splitmux) || !create_sink (splitmux)) { |
| ret = GST_STATE_CHANGE_FAILURE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| goto beach; |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| splitmux->fragment_id = 0; |
| set_next_filename (splitmux); |
| break; |
| } |
| case GST_STATE_CHANGE_READY_TO_PAUSED:{ |
| GST_SPLITMUX_LOCK (splitmux); |
| /* Start by collecting one input on each pad */ |
| splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; |
| splitmux->max_in_running_time = 0; |
| splitmux->muxed_out_time = splitmux->mux_start_time = 0; |
| splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; |
| splitmux->opening_first_fragment = TRUE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| } |
| case GST_STATE_CHANGE_PAUSED_TO_READY: |
| case GST_STATE_CHANGE_READY_TO_NULL: |
| GST_SPLITMUX_LOCK (splitmux); |
| splitmux->state = SPLITMUX_STATE_STOPPED; |
| /* Wake up any blocked threads */ |
| GST_LOG_OBJECT (splitmux, |
| "State change -> NULL or READY. Waking threads"); |
| GST_SPLITMUX_BROADCAST (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| default: |
| break; |
| } |
| |
| ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); |
| if (ret == GST_STATE_CHANGE_FAILURE) |
| goto beach; |
| |
| switch (transition) { |
| case GST_STATE_CHANGE_READY_TO_NULL: |
| GST_SPLITMUX_LOCK (splitmux); |
| splitmux->fragment_id = 0; |
| /* Reset internal elements only if no pad contexts are using them */ |
| if (splitmux->contexts == NULL) |
| gst_splitmux_reset (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| default: |
| break; |
| } |
| |
| beach: |
| |
| if (transition == GST_STATE_CHANGE_NULL_TO_READY && |
| ret == GST_STATE_CHANGE_FAILURE) { |
| /* Cleanup elements on failed transition out of NULL */ |
| gst_splitmux_reset (splitmux); |
| } |
| return ret; |
| } |
| |
| gboolean |
| register_splitmuxsink (GstPlugin * plugin) |
| { |
| GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0, |
| "Split File Muxing Sink"); |
| |
| return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE, |
| GST_TYPE_SPLITMUX_SINK); |
| } |