| /* GStreamer Muxer bin that splits output stream by size/time |
| * Copyright (C) <2014> Jan Schmidt <jan@centricular.com> |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| |
| /** |
| * SECTION:element-splitmuxsink |
| * @short_description: Muxer wrapper for splitting output stream by size or time |
| * |
| * This element wraps a muxer and a sink, and starts a new file when the mux |
| * contents are about to cross a threshold of maximum size of maximum time, |
| * splitting at video keyframe boundaries. Exactly one input video stream |
| * can be muxed, with as many accompanying audio and subtitle streams as |
| * desired. |
| * |
| * By default, it uses mp4mux and filesink, but they can be changed via |
| * the 'muxer' and 'sink' properties. |
| * |
| * The minimum file size is 1 GOP, however - so limits may be overrun if the |
| * distance between any 2 keyframes is larger than the limits. |
| * |
| * If a video stream is available, the splitting process is driven by the video |
| * stream contents, and the video stream must contain closed GOPs for the output |
| * file parts to be played individually correctly. In the absence of a video |
| * stream, the first available stream is used as reference for synchronization. |
| * |
| * <refsect2> |
| * <title>Example pipelines</title> |
| * |[ |
| * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000 |
| * ]| |
| * Records a video stream captured from a v4l2 device and muxes it into |
| * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds |
| * and 1MB maximum size. |
| * </refsect2> |
| */ |
| |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| |
| #include <string.h> |
| #include <glib/gstdio.h> |
| #include <gst/video/video.h> |
| #include "gstsplitmuxsink.h" |
| |
| GST_DEBUG_CATEGORY_STATIC (splitmux_debug); |
| #define GST_CAT_DEFAULT splitmux_debug |
| |
| #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock) |
| #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock) |
| #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock) |
| #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond) |
| |
| #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock) |
| #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond) |
| |
| static void split_now (GstSplitMuxSink * splitmux); |
| |
| enum |
| { |
| PROP_0, |
| PROP_LOCATION, |
| PROP_MAX_SIZE_TIME, |
| PROP_MAX_SIZE_BYTES, |
| PROP_MAX_SIZE_TIMECODE, |
| PROP_SEND_KEYFRAME_REQUESTS, |
| PROP_MAX_FILES, |
| PROP_MUXER_OVERHEAD, |
| PROP_USE_ROBUST_MUXING, |
| PROP_ALIGNMENT_THRESHOLD, |
| PROP_MUXER, |
| PROP_SINK |
| }; |
| |
| #define DEFAULT_MAX_SIZE_TIME 0 |
| #define DEFAULT_MAX_SIZE_BYTES 0 |
| #define DEFAULT_MAX_FILES 0 |
| #define DEFAULT_MUXER_OVERHEAD 0.02 |
| #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE |
| #define DEFAULT_ALIGNMENT_THRESHOLD 0 |
| #define DEFAULT_MUXER "mp4mux" |
| #define DEFAULT_SINK "filesink" |
| #define DEFAULT_USE_ROBUST_MUXING FALSE |
| |
| enum |
| { |
| SIGNAL_FORMAT_LOCATION, |
| SIGNAL_FORMAT_LOCATION_FULL, |
| SIGNAL_SPLIT_NOW, |
| SIGNAL_LAST |
| }; |
| |
| static guint signals[SIGNAL_LAST]; |
| |
| static GstStaticPadTemplate video_sink_template = |
| GST_STATIC_PAD_TEMPLATE ("video", |
| GST_PAD_SINK, |
| GST_PAD_REQUEST, |
| GST_STATIC_CAPS_ANY); |
| static GstStaticPadTemplate audio_sink_template = |
| GST_STATIC_PAD_TEMPLATE ("audio_%u", |
| GST_PAD_SINK, |
| GST_PAD_REQUEST, |
| GST_STATIC_CAPS_ANY); |
| static GstStaticPadTemplate subtitle_sink_template = |
| GST_STATIC_PAD_TEMPLATE ("subtitle_%u", |
| GST_PAD_SINK, |
| GST_PAD_REQUEST, |
| GST_STATIC_CAPS_ANY); |
| |
| static GQuark PAD_CONTEXT; |
| |
| static void |
| _do_init (void) |
| { |
| PAD_CONTEXT = g_quark_from_static_string ("pad-context"); |
| } |
| |
| #define gst_splitmux_sink_parent_class parent_class |
| G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0, |
| _do_init ()); |
| |
| static gboolean create_muxer (GstSplitMuxSink * splitmux); |
| static gboolean create_sink (GstSplitMuxSink * splitmux); |
| static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec); |
| static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec); |
| static void gst_splitmux_sink_dispose (GObject * object); |
| static void gst_splitmux_sink_finalize (GObject * object); |
| |
| static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element, |
| GstPadTemplate * templ, const gchar * name, const GstCaps * caps); |
| static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad); |
| |
| static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * |
| element, GstStateChange transition); |
| |
| static void bus_handler (GstBin * bin, GstMessage * msg); |
| static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); |
| static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); |
| static void mq_stream_ctx_unref (MqStreamCtx * ctx); |
| static void grow_blocked_queues (GstSplitMuxSink * splitmux); |
| |
| static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux); |
| static GstElement *create_element (GstSplitMuxSink * splitmux, |
| const gchar * factory, const gchar * name, gboolean locked); |
| |
| static void do_async_done (GstSplitMuxSink * splitmux); |
| |
| static MqStreamBuf * |
| mq_stream_buf_new (void) |
| { |
| return g_slice_new0 (MqStreamBuf); |
| } |
| |
| static void |
| mq_stream_buf_free (MqStreamBuf * data) |
| { |
| g_slice_free (MqStreamBuf, data); |
| } |
| |
| static SplitMuxOutputCommand * |
| out_cmd_buf_new (void) |
| { |
| return g_slice_new0 (SplitMuxOutputCommand); |
| } |
| |
| static void |
| out_cmd_buf_free (SplitMuxOutputCommand * data) |
| { |
| g_slice_free (SplitMuxOutputCommand, data); |
| } |
| |
| static void |
| gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) |
| { |
| GObjectClass *gobject_class = (GObjectClass *) klass; |
| GstElementClass *gstelement_class = (GstElementClass *) klass; |
| GstBinClass *gstbin_class = (GstBinClass *) klass; |
| |
| gobject_class->set_property = gst_splitmux_sink_set_property; |
| gobject_class->get_property = gst_splitmux_sink_get_property; |
| gobject_class->dispose = gst_splitmux_sink_dispose; |
| gobject_class->finalize = gst_splitmux_sink_finalize; |
| |
| gst_element_class_set_static_metadata (gstelement_class, |
| "Split Muxing Bin", "Generic/Bin/Muxer", |
| "Convenience bin that muxes incoming streams into multiple time/size limited files", |
| "Jan Schmidt <jan@centricular.com>"); |
| |
| gst_element_class_add_static_pad_template (gstelement_class, |
| &video_sink_template); |
| gst_element_class_add_static_pad_template (gstelement_class, |
| &audio_sink_template); |
| gst_element_class_add_static_pad_template (gstelement_class, |
| &subtitle_sink_template); |
| |
| gstelement_class->change_state = |
| GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state); |
| gstelement_class->request_new_pad = |
| GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad); |
| gstelement_class->release_pad = |
| GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad); |
| |
| gstbin_class->handle_message = bus_handler; |
| |
| g_object_class_install_property (gobject_class, PROP_LOCATION, |
| g_param_spec_string ("location", "File Output Pattern", |
| "Format string pattern for the location of the files to write (e.g. video%05d.mp4)", |
| NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD, |
| g_param_spec_double ("mux-overhead", "Muxing Overhead", |
| "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0, |
| DEFAULT_MUXER_OVERHEAD, |
| G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
| g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
| "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64, |
| DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
| g_param_spec_uint64 ("max-size-bytes", "Max. size bytes", |
| "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64, |
| DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE, |
| g_param_spec_string ("max-size-timecode", "Maximum timecode difference", |
| "Maximum difference in timecode between first and last frame. " |
| "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). " |
| "Will only be effective if a timecode track is present.", |
| NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS, |
| g_param_spec_boolean ("send-keyframe-requests", |
| "Request keyframes at max-size-time", |
| "Request a keyframe every max-size-time ns to try splitting at that point. " |
| "Needs max-size-bytes to be 0 in order to be effective.", |
| DEFAULT_SEND_KEYFRAME_REQUESTS, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_MAX_FILES, |
| g_param_spec_uint ("max-files", "Max files", |
| "Maximum number of files to keep on disk. Once the maximum is reached," |
| "old files start to be deleted to make room for new ones.", 0, |
| G_MAXUINT, DEFAULT_MAX_FILES, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD, |
| g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)", |
| "Allow non-reference streams to be that many ns before the reference" |
| " stream", |
| 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_MUXER, |
| g_param_spec_object ("muxer", "Muxer", |
| "The muxer element to use (NULL = default mp4mux)", |
| GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_SINK, |
| g_param_spec_object ("sink", "Sink", |
| "The sink element (or element chain) to use (NULL = default filesink)", |
| GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING, |
| g_param_spec_boolean ("use-robust-muxing", |
| "Support robust-muxing mode of some muxers", |
| "Check if muxers support robust muxing via the reserved-max-duration and " |
| "reserved-duration-remaining properties and use them if so. " |
| "(Only present on qtmux and mp4mux for now). splitmuxsink may then also " |
| " create new fragments if the reserved header space is about to overflow. " |
| "Note this does not set reserved-moov-update-period - apps should do that manually", |
| DEFAULT_USE_ROBUST_MUXING, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| /** |
| * GstSplitMuxSink::format-location: |
| * @splitmux: the #GstSplitMuxSink |
| * @fragment_id: the sequence number of the file to be created |
| * |
| * Returns: the location to be used for the next output file |
| */ |
| signals[SIGNAL_FORMAT_LOCATION] = |
| g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass), |
| G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT); |
| |
| /** |
| * GstSplitMuxSink::format-location-full: |
| * @splitmux: the #GstSplitMuxSink |
| * @fragment_id: the sequence number of the file to be created |
| * @first_sample: A #GstSample containing the first buffer |
| * from the reference stream in the new file |
| * |
| * Returns: the location to be used for the next output file |
| */ |
| signals[SIGNAL_FORMAT_LOCATION_FULL] = |
| g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass), |
| G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT, |
| GST_TYPE_SAMPLE); |
| |
| /** |
| * GstSplitMuxSink::split-now: |
| * @splitmux: the #GstSplitMuxSink |
| * |
| * When called by the user, this action signal splits the video file (and begins a new one) immediately. |
| * |
| * |
| * Since: 1.14 |
| */ |
| |
| signals[SIGNAL_SPLIT_NOW] = |
| g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass), |
| G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass, |
| split_now), NULL, NULL, NULL, G_TYPE_NONE, 0); |
| |
| klass->split_now = split_now; |
| } |
| |
| static void |
| gst_splitmux_sink_init (GstSplitMuxSink * splitmux) |
| { |
| g_mutex_init (&splitmux->lock); |
| g_cond_init (&splitmux->input_cond); |
| g_cond_init (&splitmux->output_cond); |
| g_queue_init (&splitmux->out_cmd_q); |
| |
| splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; |
| splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; |
| splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES; |
| splitmux->max_files = DEFAULT_MAX_FILES; |
| splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS; |
| splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE; |
| splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD; |
| splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING; |
| |
| splitmux->threshold_timecode_str = NULL; |
| |
| GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); |
| splitmux->split_now = FALSE; |
| } |
| |
| static void |
| gst_splitmux_reset (GstSplitMuxSink * splitmux) |
| { |
| if (splitmux->muxer) { |
| gst_element_set_locked_state (splitmux->muxer, TRUE); |
| gst_element_set_state (splitmux->muxer, GST_STATE_NULL); |
| gst_bin_remove (GST_BIN (splitmux), splitmux->muxer); |
| } |
| if (splitmux->active_sink) { |
| gst_element_set_locked_state (splitmux->active_sink, TRUE); |
| gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); |
| gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink); |
| } |
| |
| splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; |
| } |
| |
| static void |
| gst_splitmux_sink_dispose (GObject * object) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| |
| /* Calling parent dispose invalidates all child pointers */ |
| splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; |
| |
| G_OBJECT_CLASS (parent_class)->dispose (object); |
| } |
| |
| static void |
| gst_splitmux_sink_finalize (GObject * object) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| g_cond_clear (&splitmux->input_cond); |
| g_cond_clear (&splitmux->output_cond); |
| g_mutex_clear (&splitmux->lock); |
| g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL); |
| g_queue_clear (&splitmux->out_cmd_q); |
| |
| if (splitmux->provided_sink) |
| gst_object_unref (splitmux->provided_sink); |
| if (splitmux->provided_muxer) |
| gst_object_unref (splitmux->provided_muxer); |
| |
| if (splitmux->threshold_timecode_str) |
| g_free (splitmux->threshold_timecode_str); |
| |
| g_free (splitmux->location); |
| |
| /* Make sure to free any un-released contexts */ |
| g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL); |
| g_list_free (splitmux->contexts); |
| |
| G_OBJECT_CLASS (parent_class)->finalize (object); |
| } |
| |
| /* |
| * Set any time threshold to the muxer, if it has |
| * reserved-max-duration and reserved-duration-remaining |
| * properties. Called when creating/claiming the muxer |
| * in create_elements() */ |
| static void |
| update_muxer_properties (GstSplitMuxSink * sink) |
| { |
| GObjectClass *klass; |
| GstClockTime threshold_time; |
| |
| sink->muxer_has_reserved_props = FALSE; |
| if (sink->muxer == NULL) |
| return; |
| klass = G_OBJECT_GET_CLASS (sink->muxer); |
| if (g_object_class_find_property (klass, "reserved-max-duration") == NULL) |
| return; |
| if (g_object_class_find_property (klass, |
| "reserved-duration-remaining") == NULL) |
| return; |
| sink->muxer_has_reserved_props = TRUE; |
| |
| GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (sink->threshold_time)); |
| GST_OBJECT_LOCK (sink); |
| threshold_time = sink->threshold_time; |
| GST_OBJECT_UNLOCK (sink); |
| |
| if (threshold_time > 0) { |
| /* Tell the muxer how much space to reserve */ |
| GstClockTime muxer_threshold = threshold_time; |
| g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL); |
| } |
| } |
| |
| static void |
| gst_splitmux_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| |
| switch (prop_id) { |
| case PROP_LOCATION:{ |
| GST_OBJECT_LOCK (splitmux); |
| g_free (splitmux->location); |
| splitmux->location = g_value_dup_string (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| } |
| case PROP_MAX_SIZE_BYTES: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->threshold_bytes = g_value_get_uint64 (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_TIME: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->threshold_time = g_value_get_uint64 (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_TIMECODE: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->threshold_timecode_str = g_value_dup_string (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_SEND_KEYFRAME_REQUESTS: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->send_keyframe_requests = g_value_get_boolean (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_FILES: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->max_files = g_value_get_uint (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER_OVERHEAD: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->mux_overhead = g_value_get_double (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_USE_ROBUST_MUXING: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->use_robust_muxing = g_value_get_boolean (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| if (splitmux->use_robust_muxing) |
| update_muxer_properties (splitmux); |
| break; |
| case PROP_ALIGNMENT_THRESHOLD: |
| GST_OBJECT_LOCK (splitmux); |
| splitmux->alignment_threshold = g_value_get_uint64 (value); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_SINK: |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_sink) |
| gst_object_unref (splitmux->provided_sink); |
| splitmux->provided_sink = g_value_get_object (value); |
| gst_object_ref_sink (splitmux->provided_sink); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER: |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_muxer) |
| gst_object_unref (splitmux->provided_muxer); |
| splitmux->provided_muxer = g_value_get_object (value); |
| gst_object_ref_sink (splitmux->provided_muxer); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| } |
| |
| static void |
| gst_splitmux_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); |
| |
| switch (prop_id) { |
| case PROP_LOCATION: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_string (value, splitmux->location); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_BYTES: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_uint64 (value, splitmux->threshold_bytes); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_TIME: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_uint64 (value, splitmux->threshold_time); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_SIZE_TIMECODE: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_string (value, splitmux->threshold_timecode_str); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_SEND_KEYFRAME_REQUESTS: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_boolean (value, splitmux->send_keyframe_requests); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MAX_FILES: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_uint (value, splitmux->max_files); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER_OVERHEAD: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_double (value, splitmux->mux_overhead); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_USE_ROBUST_MUXING: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_boolean (value, splitmux->use_robust_muxing); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_ALIGNMENT_THRESHOLD: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_uint64 (value, splitmux->alignment_threshold); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_SINK: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_object (value, splitmux->provided_sink); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| case PROP_MUXER: |
| GST_OBJECT_LOCK (splitmux); |
| g_value_set_object (value, splitmux->provided_muxer); |
| GST_OBJECT_UNLOCK (splitmux); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| } |
| |
| /* Convenience function */ |
| static inline GstClockTimeDiff |
| my_segment_to_running_time (GstSegment * segment, GstClockTime val) |
| { |
| GstClockTimeDiff res = GST_CLOCK_STIME_NONE; |
| |
| if (GST_CLOCK_TIME_IS_VALID (val)) { |
| gboolean sign = |
| gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); |
| if (sign > 0) |
| res = val; |
| else if (sign < 0) |
| res = -val; |
| } |
| return res; |
| } |
| |
| static MqStreamCtx * |
| mq_stream_ctx_new (GstSplitMuxSink * splitmux) |
| { |
| MqStreamCtx *ctx; |
| |
| ctx = g_new0 (MqStreamCtx, 1); |
| g_atomic_int_set (&ctx->refcount, 1); |
| ctx->splitmux = splitmux; |
| gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); |
| gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); |
| ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; |
| g_queue_init (&ctx->queued_bufs); |
| return ctx; |
| } |
| |
| static void |
| mq_stream_ctx_free (MqStreamCtx * ctx) |
| { |
| if (ctx->q) { |
| g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id); |
| gst_element_set_locked_state (ctx->q, TRUE); |
| gst_element_set_state (ctx->q, GST_STATE_NULL); |
| gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q); |
| gst_object_unref (ctx->q); |
| } |
| gst_buffer_replace (&ctx->prev_in_keyframe, NULL); |
| gst_object_unref (ctx->sinkpad); |
| gst_object_unref (ctx->srcpad); |
| g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); |
| g_queue_clear (&ctx->queued_bufs); |
| g_free (ctx); |
| } |
| |
| static void |
| mq_stream_ctx_unref (MqStreamCtx * ctx) |
| { |
| if (g_atomic_int_dec_and_test (&ctx->refcount)) |
| mq_stream_ctx_free (ctx); |
| } |
| |
| static void |
| mq_stream_ctx_ref (MqStreamCtx * ctx) |
| { |
| g_atomic_int_inc (&ctx->refcount); |
| } |
| |
| static void |
| _pad_block_destroy_sink_notify (MqStreamCtx * ctx) |
| { |
| ctx->sink_pad_block_id = 0; |
| mq_stream_ctx_unref (ctx); |
| } |
| |
| static void |
| _pad_block_destroy_src_notify (MqStreamCtx * ctx) |
| { |
| ctx->src_pad_block_id = 0; |
| mq_stream_ctx_unref (ctx); |
| } |
| |
| static void |
| send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened) |
| { |
| gchar *location = NULL; |
| GstMessage *msg; |
| const gchar *msg_name = opened ? |
| "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; |
| |
| g_object_get (splitmux->sink, "location", &location, NULL); |
| |
| msg = gst_message_new_element (GST_OBJECT (splitmux), |
| gst_structure_new (msg_name, |
| "location", G_TYPE_STRING, location, |
| "running-time", GST_TYPE_CLOCK_TIME, |
| splitmux->reference_ctx->out_running_time, NULL)); |
| gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); |
| |
| g_free (location); |
| } |
| |
| /* Called with lock held, drops the lock to send EOS to the |
| * pad |
| */ |
| static void |
| send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| GstEvent *eos; |
| GstPad *pad; |
| |
| eos = gst_event_new_eos (); |
| pad = gst_pad_get_peer (ctx->srcpad); |
| |
| ctx->out_eos = TRUE; |
| |
| GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| gst_pad_send_event (pad, eos); |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| gst_object_unref (pad); |
| } |
| |
| /* Called with splitmux lock held to check if this output |
| * context needs to sleep to wait for the release of the |
| * next GOP, or to send EOS to close out the current file |
| */ |
| static void |
| complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| if (ctx->caps_change) |
| return; |
| |
| do { |
| /* When first starting up, the reference stream has to output |
| * the first buffer to prepare the muxer and sink */ |
| gboolean can_output = (ctx->is_reference || splitmux->ready_for_output); |
| GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time; |
| |
| if (!(splitmux->max_out_running_time == 0 || |
| splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || |
| splitmux->alignment_threshold == 0 || |
| splitmux->max_out_running_time < splitmux->alignment_threshold)) { |
| my_max_out_running_time -= splitmux->alignment_threshold; |
| GST_LOG_OBJECT (ctx->srcpad, |
| "Max out running time currently %" GST_STIME_FORMAT |
| ", with threshold applied it is %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (splitmux->max_out_running_time), |
| GST_STIME_ARGS (my_max_out_running_time)); |
| } |
| |
| if (ctx->flushing |
| || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) |
| return; |
| |
| GST_LOG_OBJECT (ctx->srcpad, |
| "Checking running time %" GST_STIME_FORMAT " against max %" |
| GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), |
| GST_STIME_ARGS (my_max_out_running_time)); |
| |
| if (can_output) { |
| if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || |
| ctx->out_running_time < my_max_out_running_time) { |
| return; |
| } |
| |
| switch (splitmux->output_state) { |
| case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP: |
| /* We only get here if we've finished outputting a GOP and need to know |
| * what to do next */ |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| continue; |
| |
| case SPLITMUX_OUTPUT_STATE_ENDING_FILE: |
| /* We've reached the max out running_time to get here, so end this file now */ |
| if (ctx->out_eos == FALSE) { |
| send_eos (splitmux, ctx); |
| continue; |
| } |
| break; |
| case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: |
| if (ctx->is_reference) { |
| /* Special handling on the reference ctx to start new fragments |
| * and collect commands from the command queue */ |
| /* drops the splitmux lock briefly: */ |
| start_next_fragment (splitmux, ctx); |
| continue; |
| } |
| break; |
| |
| case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{ |
| do { |
| SplitMuxOutputCommand *cmd = |
| g_queue_pop_tail (&splitmux->out_cmd_q); |
| if (cmd != NULL) { |
| /* If we pop the last command, we need to make our queues bigger */ |
| if (g_queue_get_length (&splitmux->out_cmd_q) == 0) |
| grow_blocked_queues (splitmux); |
| |
| if (cmd->start_new_fragment) { |
| GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment"); |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE; |
| } else { |
| GST_DEBUG_OBJECT (splitmux, |
| "Got new output cmd for time %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (cmd->max_output_ts)); |
| |
| /* Extend the output range immediately */ |
| splitmux->max_out_running_time = cmd->max_output_ts; |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP; |
| } |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| |
| out_cmd_buf_free (cmd); |
| break; |
| } else { |
| GST_SPLITMUX_WAIT_OUTPUT (splitmux); |
| } |
| } while (splitmux->output_state == |
| SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND); |
| /* loop and re-check the state */ |
| continue; |
| } |
| case SPLITMUX_OUTPUT_STATE_STOPPED: |
| return; |
| } |
| } |
| |
| GST_INFO_OBJECT (ctx->srcpad, |
| "Sleeping for running time %" |
| GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.", |
| GST_STIME_ARGS (ctx->out_running_time), |
| GST_STIME_ARGS (splitmux->max_out_running_time)); |
| GST_SPLITMUX_WAIT_OUTPUT (splitmux); |
| GST_INFO_OBJECT (ctx->srcpad, |
| "Woken for new max running time %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (splitmux->max_out_running_time)); |
| } |
| while (1); |
| } |
| |
| static GstClockTime |
| calculate_next_max_timecode (GstSplitMuxSink * splitmux, |
| const GstVideoTimeCode * cur_tc) |
| { |
| GstVideoTimeCode *target_tc; |
| GstVideoTimeCodeInterval *tc_inter; |
| GstClockTime cur_tc_time, target_tc_time, next_max_tc_time; |
| |
| if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL) |
| return GST_CLOCK_TIME_NONE; |
| |
| tc_inter = |
| gst_video_time_code_interval_new_from_string |
| (splitmux->threshold_timecode_str); |
| target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter); |
| gst_video_time_code_interval_free (tc_inter); |
| |
| /* Convert to ns */ |
| target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc); |
| cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc); |
| |
| /* Add fragment_start_time, accounting for wraparound */ |
| if (target_tc_time >= cur_tc_time) { |
| next_max_tc_time = |
| target_tc_time - cur_tc_time + splitmux->fragment_start_time; |
| } else { |
| GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND; |
| |
| next_max_tc_time = |
| day_in_ns - cur_tc_time + target_tc_time + |
| splitmux->fragment_start_time; |
| } |
| GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT |
| " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time), |
| GST_TIME_ARGS (cur_tc_time)); |
| gst_video_time_code_free (target_tc); |
| |
| return next_max_tc_time; |
| } |
| |
| static gboolean |
| request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer) |
| { |
| GstEvent *ev; |
| GstClockTime target_time; |
| gboolean timecode_based = FALSE; |
| |
| splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE; |
| if (splitmux->threshold_timecode_str) { |
| GstVideoTimeCodeMeta *tc_meta; |
| |
| if (buffer != NULL) { |
| tc_meta = gst_buffer_get_video_time_code_meta (buffer); |
| if (tc_meta) { |
| splitmux->next_max_tc_time = |
| calculate_next_max_timecode (splitmux, &tc_meta->tc); |
| timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE); |
| } |
| } else { |
| /* This can happen in the presence of GAP events that trigger |
| * a new fragment start */ |
| GST_WARNING_OBJECT (splitmux, |
| "No buffer available to calculate next timecode"); |
| } |
| } |
| |
| if (splitmux->send_keyframe_requests == FALSE |
| || (splitmux->threshold_time == 0 && !timecode_based) |
| || splitmux->threshold_bytes != 0) |
| return TRUE; |
| |
| if (timecode_based) { |
| /* We might have rounding errors: aim slightly earlier */ |
| target_time = splitmux->next_max_tc_time - 5 * GST_USECOND; |
| } else { |
| target_time = splitmux->fragment_start_time + splitmux->threshold_time; |
| } |
| ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0); |
| GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (target_time)); |
| return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev); |
| } |
| |
| static GstPadProbeReturn |
| handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) |
| { |
| GstSplitMuxSink *splitmux = ctx->splitmux; |
| MqStreamBuf *buf_info = NULL; |
| |
| GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); |
| |
| /* FIXME: Handle buffer lists, until then make it clear they won't work */ |
| if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { |
| g_warning ("Buffer list handling not implemented"); |
| return GST_PAD_PROBE_DROP; |
| } |
| if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM || |
| info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { |
| GstEvent *event = gst_pad_probe_info_get_event (info); |
| gboolean locked = FALSE; |
| |
| GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); |
| |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_SEGMENT: |
| gst_event_copy_segment (event, &ctx->out_segment); |
| break; |
| case GST_EVENT_FLUSH_STOP: |
| GST_SPLITMUX_LOCK (splitmux); |
| locked = TRUE; |
| gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); |
| g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); |
| g_queue_clear (&ctx->queued_bufs); |
| ctx->flushing = FALSE; |
| break; |
| case GST_EVENT_FLUSH_START: |
| GST_SPLITMUX_LOCK (splitmux); |
| locked = TRUE; |
| GST_LOG_OBJECT (pad, "Flush start"); |
| ctx->flushing = TRUE; |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| break; |
| case GST_EVENT_EOS: |
| GST_SPLITMUX_LOCK (splitmux); |
| locked = TRUE; |
| if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) |
| goto beach; |
| ctx->out_eos = TRUE; |
| break; |
| case GST_EVENT_GAP:{ |
| GstClockTime gap_ts; |
| GstClockTimeDiff rtime; |
| |
| gst_event_parse_gap (event, &gap_ts, NULL); |
| if (gap_ts == GST_CLOCK_TIME_NONE) |
| break; |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| locked = TRUE; |
| |
| if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) |
| goto beach; |
| |
| /* When we get a gap event on the |
| * reference stream and we're trying to open a |
| * new file, we need to store it until we get |
| * the buffer afterwards |
| */ |
| if (ctx->is_reference && |
| (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) { |
| GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives"); |
| gst_event_replace (&ctx->pending_gap, event); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return GST_PAD_PROBE_HANDLED; |
| } |
| |
| rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); |
| |
| GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (rtime)); |
| |
| if (rtime != GST_CLOCK_STIME_NONE) { |
| ctx->out_running_time = rtime; |
| complete_or_wait_on_out (splitmux, ctx); |
| } |
| break; |
| } |
| case GST_EVENT_CUSTOM_DOWNSTREAM:{ |
| const GstStructure *s; |
| GstClockTimeDiff ts = 0; |
| |
| s = gst_event_get_structure (event); |
| if (!gst_structure_has_name (s, "splitmuxsink-unblock")) |
| break; |
| |
| gst_structure_get_int64 (s, "timestamp", &ts); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| locked = TRUE; |
| |
| if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) |
| goto beach; |
| ctx->out_running_time = ts; |
| if (!ctx->is_reference) |
| complete_or_wait_on_out (splitmux, ctx); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return GST_PAD_PROBE_DROP; |
| } |
| case GST_EVENT_CAPS:{ |
| GstPad *peer; |
| |
| if (!ctx->is_reference) |
| break; |
| |
| peer = gst_pad_get_peer (pad); |
| if (peer) { |
| gboolean ok = gst_pad_send_event (peer, gst_event_ref (event)); |
| |
| gst_object_unref (peer); |
| |
| if (ok) |
| break; |
| |
| } else { |
| break; |
| } |
| /* This is in the case the muxer doesn't allow this change of caps */ |
| GST_SPLITMUX_LOCK (splitmux); |
| locked = TRUE; |
| ctx->caps_change = TRUE; |
| |
| if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) { |
| GST_DEBUG_OBJECT (splitmux, |
| "New caps were not accepted. Switching output file"); |
| if (ctx->out_eos == FALSE) { |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE; |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| } |
| } |
| |
| /* Lets it fall through, if it fails again, then the muxer just can't |
| * support this format, but at least we have a closed file. |
| */ |
| break; |
| } |
| default: |
| break; |
| } |
| |
| /* We need to make sure events aren't passed |
| * until the muxer / sink are ready for it */ |
| if (!locked) |
| GST_SPLITMUX_LOCK (splitmux); |
| if (!ctx->is_reference) |
| complete_or_wait_on_out (splitmux, ctx); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| /* Don't try to forward sticky events before the next buffer is there |
| * because it would cause a new file to be created without the first |
| * buffer being available. |
| */ |
| if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) { |
| gst_event_unref (event); |
| return GST_PAD_PROBE_HANDLED; |
| } else |
| return GST_PAD_PROBE_PASS; |
| } |
| |
| /* Allow everything through until the configured next stopping point */ |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| buf_info = g_queue_pop_tail (&ctx->queued_bufs); |
| if (buf_info == NULL) |
| /* Can only happen due to a poorly timed flush */ |
| goto beach; |
| |
| /* If we have popped a keyframe, decrement the queued_gop count */ |
| if (buf_info->keyframe && splitmux->queued_keyframes > 0) |
| splitmux->queued_keyframes--; |
| |
| ctx->out_running_time = buf_info->run_ts; |
| ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info); |
| |
| GST_LOG_OBJECT (splitmux, |
| "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT |
| " size %" G_GUINT64_FORMAT, |
| pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); |
| |
| ctx->caps_change = FALSE; |
| |
| complete_or_wait_on_out (splitmux, ctx); |
| |
| splitmux->muxed_out_bytes += buf_info->buf_size; |
| |
| #ifndef GST_DISABLE_GST_DEBUG |
| { |
| GstBuffer *buf = gst_pad_probe_info_get_buffer (info); |
| GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT |
| " run ts %" GST_STIME_FORMAT, buf, |
| GST_STIME_ARGS (ctx->out_running_time)); |
| } |
| #endif |
| |
| ctx->cur_out_buffer = NULL; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| /* pending_gap is protected by the STREAM lock */ |
| if (ctx->pending_gap) { |
| /* If we previously stored a gap event, send it now */ |
| GstPad *peer = gst_pad_get_peer (ctx->srcpad); |
| |
| GST_DEBUG_OBJECT (splitmux, |
| "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad); |
| |
| gst_pad_send_event (peer, ctx->pending_gap); |
| ctx->pending_gap = NULL; |
| |
| gst_object_unref (peer); |
| } |
| |
| mq_stream_buf_free (buf_info); |
| |
| return GST_PAD_PROBE_PASS; |
| |
| beach: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return GST_PAD_PROBE_DROP; |
| } |
| |
| static gboolean |
| resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer) |
| { |
| return gst_pad_send_event (peer, gst_event_ref (*event)); |
| } |
| |
| static void |
| restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) |
| { |
| GstPad *peer = gst_pad_get_peer (ctx->srcpad); |
| |
| gst_pad_sticky_events_foreach (ctx->srcpad, |
| (GstPadStickyEventsForeachFunction) (resend_sticky), peer); |
| |
| /* Clear EOS flag if not actually EOS */ |
| ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad); |
| |
| gst_object_unref (peer); |
| } |
| |
| /* Called with lock held when a fragment |
| * reaches EOS and it is time to restart |
| * a new fragment |
| */ |
| static void |
| start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| GstElement *muxer, *sink; |
| |
| /* 1 change to new file */ |
| splitmux->switching_fragment = TRUE; |
| |
| /* We need to drop the splitmux lock to acquire the state lock |
| * here and ensure there's no racy state change going on elsewhere */ |
| muxer = gst_object_ref (splitmux->muxer); |
| sink = gst_object_ref (splitmux->active_sink); |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| GST_STATE_LOCK (splitmux); |
| |
| gst_element_set_locked_state (muxer, TRUE); |
| gst_element_set_locked_state (sink, TRUE); |
| gst_element_set_state (muxer, GST_STATE_NULL); |
| gst_element_set_state (sink, GST_STATE_NULL); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0) |
| set_next_filename (splitmux, ctx); |
| splitmux->muxed_out_bytes = 0; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| gst_element_set_state (sink, GST_STATE_TARGET (splitmux)); |
| gst_element_set_state (muxer, GST_STATE_TARGET (splitmux)); |
| gst_element_set_locked_state (muxer, FALSE); |
| gst_element_set_locked_state (sink, FALSE); |
| |
| gst_object_unref (sink); |
| gst_object_unref (muxer); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| GST_STATE_UNLOCK (splitmux); |
| splitmux->switching_fragment = FALSE; |
| do_async_done (splitmux); |
| |
| splitmux->ready_for_output = TRUE; |
| |
| g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); |
| |
| send_fragment_opened_closed_msg (splitmux, TRUE); |
| |
| /* FIXME: Is this always the correct next state? */ |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND; |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| } |
| |
| static void |
| bus_handler (GstBin * bin, GstMessage * message) |
| { |
| GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin); |
| |
| switch (GST_MESSAGE_TYPE (message)) { |
| case GST_MESSAGE_EOS: |
| /* If the state is draining out the current file, drop this EOS */ |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| send_fragment_opened_closed_msg (splitmux, FALSE); |
| |
| if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) { |
| GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| |
| gst_message_unref (message); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return; |
| } else { |
| GST_DEBUG_OBJECT (splitmux, |
| "Passing EOS message. Output state %d max_out_running_time %" |
| GST_STIME_FORMAT, splitmux->output_state, |
| GST_STIME_ARGS (splitmux->max_out_running_time)); |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_MESSAGE_ASYNC_START: |
| case GST_MESSAGE_ASYNC_DONE: |
| /* Ignore state changes from our children while switching */ |
| if (splitmux->switching_fragment) { |
| if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink |
| || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { |
| GST_LOG_OBJECT (splitmux, |
| "Ignoring state change from child %" GST_PTR_FORMAT |
| " while switching", GST_MESSAGE_SRC (message)); |
| gst_message_unref (message); |
| return; |
| } |
| } |
| break; |
| case GST_MESSAGE_WARNING: |
| { |
| GError *gerror = NULL; |
| |
| gst_message_parse_warning (message, &gerror, NULL); |
| |
| if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) { |
| GList *item; |
| gboolean caps_change = FALSE; |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| for (item = splitmux->contexts; item; item = item->next) { |
| MqStreamCtx *ctx = item->data; |
| |
| if (ctx->caps_change) { |
| caps_change = TRUE; |
| break; |
| } |
| } |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| if (caps_change) { |
| GST_LOG_OBJECT (splitmux, |
| "Ignoring warning change from child %" GST_PTR_FORMAT |
| " while switching caps", GST_MESSAGE_SRC (message)); |
| gst_message_unref (message); |
| return; |
| } |
| } |
| break; |
| } |
| default: |
| break; |
| } |
| |
| GST_BIN_CLASS (parent_class)->handle_message (bin, message); |
| } |
| |
| static void |
| ctx_set_unblock (MqStreamCtx * ctx) |
| { |
| ctx->need_unblock = TRUE; |
| } |
| |
| static gboolean |
| need_new_fragment (GstSplitMuxSink * splitmux, |
| GstClockTime queued_time, GstClockTime queued_gop_time, |
| guint64 queued_bytes) |
| { |
| guint64 thresh_bytes; |
| GstClockTime thresh_time; |
| gboolean check_robust_muxing; |
| |
| GST_OBJECT_LOCK (splitmux); |
| thresh_bytes = splitmux->threshold_bytes; |
| thresh_time = splitmux->threshold_time; |
| check_robust_muxing = splitmux->use_robust_muxing |
| && splitmux->muxer_has_reserved_props; |
| GST_OBJECT_UNLOCK (splitmux); |
| |
| /* Have we muxed anything into the new file at all? */ |
| if (splitmux->fragment_total_bytes <= 0) |
| return FALSE; |
| |
| /* User told us to split now */ |
| if (g_atomic_int_get (&(splitmux->split_now)) == TRUE) |
| return TRUE; |
| |
| if (thresh_bytes > 0 && queued_bytes >= thresh_bytes) |
| return TRUE; /* Would overrun byte limit */ |
| |
| if (thresh_time > 0 && queued_time >= thresh_time) |
| return TRUE; /* Would overrun byte limit */ |
| |
| /* Timecode-based threshold accounts for possible rounding errors: |
| * 5us should be bigger than all possible rounding errors but nowhere near |
| * big enough to skip to another frame */ |
| if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE && |
| splitmux->reference_ctx->in_running_time > |
| splitmux->next_max_tc_time + 5 * GST_USECOND) |
| return TRUE; /* Timecode threshold */ |
| |
| if (check_robust_muxing) { |
| GstClockTime mux_reserved_remain; |
| |
| g_object_get (splitmux->muxer, |
| "reserved-duration-remaining", &mux_reserved_remain, NULL); |
| |
| GST_LOG_OBJECT (splitmux, |
| "Muxer robust muxing report - %" G_GUINT64_FORMAT |
| " remaining. New GOP would enqueue %" G_GUINT64_FORMAT, |
| mux_reserved_remain, queued_gop_time); |
| |
| if (queued_gop_time >= mux_reserved_remain) { |
| GST_INFO_OBJECT (splitmux, |
| "File is about to run out of header room - %" G_GUINT64_FORMAT |
| " remaining. New GOP would enqueue %" G_GUINT64_FORMAT |
| ". Switching to new file", mux_reserved_remain, queued_gop_time); |
| return TRUE; |
| } |
| } |
| |
| /* Continue and mux this GOP */ |
| return FALSE; |
| } |
| |
| /* Called with splitmux lock held */ |
| /* Called when entering ProcessingCompleteGop state |
| * Assess if mq contents overflowed the current file |
| * -> If yes, need to switch to new file |
| * -> if no, set max_out_running_time to let this GOP in and |
| * go to COLLECTING_GOP_START state |
| */ |
| static void |
| handle_gathered_gop (GstSplitMuxSink * splitmux) |
| { |
| guint64 queued_bytes; |
| GstClockTimeDiff queued_time = 0; |
| GstClockTimeDiff queued_gop_time = 0; |
| GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time; |
| SplitMuxOutputCommand *cmd; |
| |
| /* Assess if the multiqueue contents overflowed the current file */ |
| /* When considering if a newly gathered GOP overflows |
| * the time limit for the file, only consider the running time of the |
| * reference stream. Other streams might have run ahead a little bit, |
| * but extra pieces won't be released to the muxer beyond the reference |
| * stream cut-off anyway - so it forms the limit. */ |
| queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes; |
| queued_time = splitmux->reference_ctx->in_running_time; |
| /* queued_gop_time tracks how much unwritten data there is waiting to |
| * be written to this fragment including this GOP */ |
| if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE) |
| queued_gop_time = |
| splitmux->reference_ctx->in_running_time - |
| splitmux->reference_ctx->out_running_time; |
| else |
| queued_gop_time = |
| splitmux->reference_ctx->in_running_time - splitmux->gop_start_time; |
| |
| GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes); |
| |
| g_assert (queued_gop_time >= 0); |
| g_assert (queued_time >= splitmux->fragment_start_time); |
| |
| queued_time -= splitmux->fragment_start_time; |
| if (queued_time < queued_gop_time) |
| queued_gop_time = queued_time; |
| |
| /* Expand queued bytes estimate by muxer overhead */ |
| queued_bytes += (queued_bytes * splitmux->mux_overhead); |
| |
| GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT |
| " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); |
| if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) { |
| GST_LOG_OBJECT (splitmux, |
| "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (splitmux->reference_ctx->in_running_time), |
| GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND)); |
| } |
| |
| /* Check for overrun - have we output at least one byte and overrun |
| * either threshold? */ |
| if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) { |
| g_atomic_int_set (&(splitmux->split_now), FALSE); |
| /* Tell the output side to start a new fragment */ |
| GST_INFO_OBJECT (splitmux, |
| "This GOP (dur %" GST_STIME_FORMAT |
| ") would overflow the fragment, Sending start_new_fragment cmd", |
| GST_STIME_ARGS (splitmux->reference_ctx->in_running_time - |
| splitmux->gop_start_time)); |
| cmd = out_cmd_buf_new (); |
| cmd->start_new_fragment = TRUE; |
| g_queue_push_head (&splitmux->out_cmd_q, cmd); |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| |
| new_out_ts = splitmux->reference_ctx->in_running_time; |
| splitmux->fragment_start_time = splitmux->gop_start_time; |
| splitmux->fragment_total_bytes = 0; |
| |
| if (request_next_keyframe (splitmux, |
| splitmux->reference_ctx->prev_in_keyframe) == FALSE) { |
| GST_WARNING_OBJECT (splitmux, |
| "Could not request a keyframe. Files may not split at the exact location they should"); |
| } |
| gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL); |
| } |
| |
| /* And set up to collect the next GOP */ |
| if (!splitmux->reference_ctx->in_eos) { |
| splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; |
| splitmux->gop_start_time = new_out_ts; |
| } else { |
| /* This is probably already the current state, but just in case: */ |
| splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP; |
| new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */ |
| } |
| |
| /* And wake all input contexts to send a wake-up event */ |
| g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL); |
| GST_SPLITMUX_BROADCAST_INPUT (splitmux); |
| |
| /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */ |
| splitmux->fragment_total_bytes += splitmux->gop_total_bytes; |
| |
| if (splitmux->gop_total_bytes > 0) { |
| GST_LOG_OBJECT (splitmux, |
| "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT |
| " time %" GST_STIME_FORMAT, |
| splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time)); |
| |
| /* Send this GOP to the output command queue */ |
| cmd = out_cmd_buf_new (); |
| cmd->start_new_fragment = FALSE; |
| cmd->max_output_ts = new_out_ts; |
| GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %" |
| GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts)); |
| g_queue_push_head (&splitmux->out_cmd_q, cmd); |
| |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| } |
| |
| splitmux->gop_total_bytes = 0; |
| } |
| |
| /* Called with splitmux lock held */ |
| /* Called from each input pad when it is has all the pieces |
| * for a GOP or EOS, starting with the reference pad which has set the |
| * splitmux->max_in_running_time |
| */ |
| static void |
| check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| GList *cur; |
| GstEvent *event; |
| |
| /* On ENDING_FILE, the reference stream sends a command to start a new |
| * fragment, then releases the GOP for output in the new fragment. |
| * If somes streams received no buffer during the last GOP that overran, |
| * because its next buffer has a timestamp bigger than |
| * ctx->max_in_running_time, its queue is empty. In that case the only |
| * way to wakeup the output thread is by injecting an event in the |
| * queue. This usually happen with subtitle streams. |
| * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */ |
| if (ctx->need_unblock) { |
| GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event"); |
| event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | |
| GST_EVENT_TYPE_SERIALIZED, |
| gst_structure_new ("splitmuxsink-unblock", "timestamp", |
| G_TYPE_INT64, splitmux->max_in_running_time, NULL)); |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| gst_pad_send_event (ctx->sinkpad, event); |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| ctx->need_unblock = FALSE; |
| GST_SPLITMUX_BROADCAST_INPUT (splitmux); |
| /* state may have changed while we were unlocked. Loop again if so */ |
| if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) |
| return; |
| } |
| |
| if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { |
| gboolean ready = TRUE; |
| |
| /* Iterate each pad, and check that the input running time is at least |
| * up to the reference running time, and if so handle the collected GOP */ |
| GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" |
| GST_STIME_FORMAT " ctx %p", |
| GST_STIME_ARGS (splitmux->max_in_running_time), ctx); |
| for (cur = g_list_first (splitmux->contexts); cur != NULL; |
| cur = g_list_next (cur)) { |
| MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); |
| |
| GST_LOG_OBJECT (splitmux, |
| "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT |
| " EOS %d", tmpctx, tmpctx->srcpad, |
| GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); |
| |
| if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE && |
| tmpctx->in_running_time < splitmux->max_in_running_time && |
| !tmpctx->in_eos) { |
| GST_LOG_OBJECT (splitmux, |
| "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep", |
| tmpctx, tmpctx->srcpad); |
| ready = FALSE; |
| break; |
| } |
| } |
| if (ready) { |
| GST_DEBUG_OBJECT (splitmux, |
| "Collected GOP is complete. Processing (ctx %p)", ctx); |
| /* All pads have a complete GOP, release it into the multiqueue */ |
| handle_gathered_gop (splitmux); |
| } |
| } |
| |
| /* If upstream reached EOS we are not expecting more data, no need to wait |
| * here. */ |
| if (ctx->in_eos) |
| return; |
| |
| /* Some pad is not yet ready, or GOP is being pushed |
| * either way, sleep and wait to get woken */ |
| while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT && |
| !ctx->flushing && |
| (ctx->in_running_time >= splitmux->max_in_running_time) && |
| (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) { |
| |
| GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx); |
| GST_SPLITMUX_WAIT_INPUT (splitmux); |
| GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx); |
| } |
| } |
| |
| static GstPadProbeReturn |
| handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) |
| { |
| GstSplitMuxSink *splitmux = ctx->splitmux; |
| GstBuffer *buf; |
| MqStreamBuf *buf_info = NULL; |
| GstClockTime ts; |
| gboolean loop_again; |
| gboolean keyframe = FALSE; |
| |
| GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); |
| |
| /* FIXME: Handle buffer lists, until then make it clear they won't work */ |
| if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { |
| g_warning ("Buffer list handling not implemented"); |
| return GST_PAD_PROBE_DROP; |
| } |
| if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM || |
| info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { |
| GstEvent *event = gst_pad_probe_info_get_event (info); |
| |
| GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); |
| |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_SEGMENT: |
| gst_event_copy_segment (event, &ctx->in_segment); |
| break; |
| case GST_EVENT_FLUSH_STOP: |
| GST_SPLITMUX_LOCK (splitmux); |
| gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); |
| ctx->in_eos = FALSE; |
| ctx->in_running_time = GST_CLOCK_STIME_NONE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_EVENT_EOS: |
| GST_SPLITMUX_LOCK (splitmux); |
| ctx->in_eos = TRUE; |
| |
| if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) |
| goto beach; |
| |
| if (ctx->is_reference) { |
| GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); |
| /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */ |
| splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT; |
| /* Wake up other input pads to collect this GOP */ |
| GST_SPLITMUX_BROADCAST_INPUT (splitmux); |
| check_completed_gop (splitmux, ctx); |
| } else if (splitmux->input_state == |
| SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { |
| /* If we are waiting for a GOP to be completed (ie, for aux |
| * pads to catch up), then this pad is complete, so check |
| * if the whole GOP is. |
| */ |
| check_completed_gop (splitmux, ctx); |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| case GST_EVENT_GAP:{ |
| GstClockTime gap_ts; |
| GstClockTimeDiff rtime; |
| |
| gst_event_parse_gap (event, &gap_ts, NULL); |
| if (gap_ts == GST_CLOCK_TIME_NONE) |
| break; |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) |
| goto beach; |
| rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts); |
| |
| GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (rtime)); |
| |
| if (ctx->is_reference |
| && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) { |
| splitmux->gop_start_time = splitmux->fragment_start_time = rtime; |
| GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (splitmux->fragment_start_time)); |
| /* Also take this as the first start time when starting up, |
| * so that we start counting overflow from the first frame */ |
| if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) |
| splitmux->max_in_running_time = splitmux->fragment_start_time; |
| } |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| } |
| default: |
| break; |
| } |
| return GST_PAD_PROBE_PASS; |
| } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) { |
| switch (GST_QUERY_TYPE (GST_QUERY (info->data))) { |
| case GST_QUERY_ALLOCATION: |
| return GST_PAD_PROBE_DROP; |
| default: |
| return GST_PAD_PROBE_PASS; |
| } |
| } |
| |
| buf = gst_pad_probe_info_get_buffer (info); |
| buf_info = mq_stream_buf_new (); |
| |
| if (GST_BUFFER_PTS_IS_VALID (buf)) |
| ts = GST_BUFFER_PTS (buf); |
| else |
| ts = GST_BUFFER_DTS (buf); |
| |
| GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts)); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) |
| goto beach; |
| |
| /* If this buffer has a timestamp, advance the input timestamp of the |
| * stream */ |
| if (GST_CLOCK_TIME_IS_VALID (ts)) { |
| GstClockTimeDiff running_time = |
| my_segment_to_running_time (&ctx->in_segment, ts); |
| |
| GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (running_time)); |
| |
| if (GST_CLOCK_STIME_IS_VALID (running_time) |
| && running_time > ctx->in_running_time) |
| ctx->in_running_time = running_time; |
| } |
| |
| /* Try to make sure we have a valid running time */ |
| if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) { |
| ctx->in_running_time = |
| my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start); |
| } |
| |
| GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (ctx->in_running_time)); |
| |
| buf_info->run_ts = ctx->in_running_time; |
| buf_info->buf_size = gst_buffer_get_size (buf); |
| buf_info->duration = GST_BUFFER_DURATION (buf); |
| |
| /* initialize fragment_start_time */ |
| if (ctx->is_reference |
| && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) { |
| splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts; |
| GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (splitmux->fragment_start_time)); |
| gst_buffer_replace (&ctx->prev_in_keyframe, buf); |
| |
| /* Also take this as the first start time when starting up, |
| * so that we start counting overflow from the first frame */ |
| if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) |
| splitmux->max_in_running_time = splitmux->fragment_start_time; |
| if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) { |
| GST_WARNING_OBJECT (splitmux, |
| "Could not request a keyframe. Files may not split at the exact location they should"); |
| } |
| gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL); |
| } |
| |
| GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT |
| " total GOP bytes %" G_GUINT64_FORMAT, |
| GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes); |
| |
| loop_again = TRUE; |
| do { |
| if (ctx->flushing) |
| break; |
| |
| switch (splitmux->input_state) { |
| case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START: |
| if (ctx->is_reference) { |
| /* If a keyframe, we have a complete GOP */ |
| if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || |
| !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) || |
| splitmux->max_in_running_time >= ctx->in_running_time) { |
| /* Pass this buffer through */ |
| loop_again = FALSE; |
| /* Allow other input pads to catch up to here too */ |
| splitmux->max_in_running_time = ctx->in_running_time; |
| GST_SPLITMUX_BROADCAST_INPUT (splitmux); |
| break; |
| } |
| GST_INFO_OBJECT (pad, |
| "Have keyframe with running time %" GST_STIME_FORMAT, |
| GST_STIME_ARGS (ctx->in_running_time)); |
| keyframe = TRUE; |
| splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT; |
| splitmux->max_in_running_time = ctx->in_running_time; |
| /* Wake up other input pads to collect this GOP */ |
| GST_SPLITMUX_BROADCAST_INPUT (splitmux); |
| check_completed_gop (splitmux, ctx); |
| /* Store this new keyframe to remember the start of GOP */ |
| gst_buffer_replace (&ctx->prev_in_keyframe, buf); |
| } else { |
| /* Pass this buffer if the reference ctx is far enough ahead */ |
| if (ctx->in_running_time < splitmux->max_in_running_time) { |
| loop_again = FALSE; |
| break; |
| } |
| |
| /* We're still waiting for a keyframe on the reference pad, sleep */ |
| GST_LOG_OBJECT (pad, "Sleeping for GOP start"); |
| GST_SPLITMUX_WAIT_INPUT (splitmux); |
| GST_LOG_OBJECT (pad, |
| "Done sleeping for GOP start input state now %d", |
| splitmux->input_state); |
| } |
| break; |
| case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{ |
| /* We're collecting a GOP. If this is the reference context, |
| * we need to check if this is a keyframe that marks the start |
| * of the next GOP. If it is, it marks the end of the GOP we're |
| * collecting, so sleep and wait until all the other pads also |
| * reach that timestamp - at which point, we have an entire GOP |
| * and either go to ENDING_FILE or release this GOP to the muxer and |
| * go back to COLLECT_GOP_START. */ |
| |
| /* If we overran the target timestamp, it might be time to process |
| * the GOP, otherwise bail out for more data |
| */ |
| GST_LOG_OBJECT (pad, |
| "Checking TS %" GST_STIME_FORMAT " against max %" |
| GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time), |
| GST_STIME_ARGS (splitmux->max_in_running_time)); |
| |
| if (ctx->in_running_time < splitmux->max_in_running_time) { |
| loop_again = FALSE; |
| break; |
| } |
| |
| GST_LOG_OBJECT (pad, |
| "Collected last packet of GOP. Checking other pads"); |
| check_completed_gop (splitmux, ctx); |
| break; |
| } |
| case SPLITMUX_INPUT_STATE_FINISHING_UP: |
| loop_again = FALSE; |
| break; |
| default: |
| loop_again = FALSE; |
| break; |
| } |
| } |
| while (loop_again); |
| |
| if (keyframe) { |
| splitmux->queued_keyframes++; |
| buf_info->keyframe = TRUE; |
| } |
| |
| /* Update total input byte counter for overflow detect */ |
| splitmux->gop_total_bytes += buf_info->buf_size; |
| |
| /* Now add this buffer to the queue just before returning */ |
| g_queue_push_head (&ctx->queued_bufs, buf_info); |
| |
| GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT |
| " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return GST_PAD_PROBE_PASS; |
| |
| beach: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| if (buf_info) |
| mq_stream_buf_free (buf_info); |
| return GST_PAD_PROBE_PASS; |
| } |
| |
| static void |
| grow_blocked_queues (GstSplitMuxSink * splitmux) |
| { |
| GList *cur; |
| |
| /* Scan other queues for full-ness and grow them */ |
| for (cur = g_list_first (splitmux->contexts); |
| cur != NULL; cur = g_list_next (cur)) { |
| MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); |
| guint cur_limit; |
| guint cur_len = g_queue_get_length (&tmpctx->queued_bufs); |
| |
| g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL); |
| GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len); |
| |
| if (cur_len >= cur_limit) { |
| cur_limit = cur_len + 1; |
| GST_DEBUG_OBJECT (tmpctx->q, |
| "Queue overflowed and needs enlarging. Growing to %u buffers", |
| cur_limit); |
| g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL); |
| } |
| } |
| } |
| |
| static void |
| handle_q_underrun (GstElement * q, gpointer user_data) |
| { |
| MqStreamCtx *ctx = (MqStreamCtx *) (user_data); |
| GstSplitMuxSink *splitmux = ctx->splitmux; |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| GST_DEBUG_OBJECT (q, |
| "Queue reported underrun with %d keyframes and %d cmds enqueued", |
| splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q)); |
| grow_blocked_queues (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| } |
| |
| static void |
| handle_q_overrun (GstElement * q, gpointer user_data) |
| { |
| MqStreamCtx *ctx = (MqStreamCtx *) (user_data); |
| GstSplitMuxSink *splitmux = ctx->splitmux; |
| gboolean allow_grow = FALSE; |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| GST_DEBUG_OBJECT (q, |
| "Queue reported overrun with %d keyframes and %d cmds enqueued", |
| splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q)); |
| |
| if (splitmux->queued_keyframes < 2) { |
| /* Less than a full GOP queued, grow the queue */ |
| allow_grow = TRUE; |
| } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) { |
| allow_grow = TRUE; |
| } else { |
| /* If another queue is starved, grow */ |
| GList *cur; |
| for (cur = g_list_first (splitmux->contexts); |
| cur != NULL; cur = g_list_next (cur)) { |
| MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); |
| if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { |
| allow_grow = TRUE; |
| } |
| } |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| if (allow_grow) { |
| guint cur_limit; |
| |
| g_object_get (q, "max-size-buffers", &cur_limit, NULL); |
| cur_limit++; |
| |
| GST_DEBUG_OBJECT (q, |
| "Queue overflowed and needs enlarging. Growing to %u buffers", |
| cur_limit); |
| |
| g_object_set (q, "max-size-buffers", cur_limit, NULL); |
| } |
| } |
| |
| static GstPad * |
| gst_splitmux_sink_request_new_pad (GstElement * element, |
| GstPadTemplate * templ, const gchar * name, const GstCaps * caps) |
| { |
| GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; |
| GstPadTemplate *mux_template = NULL; |
| GstPad *res = NULL; |
| GstElement *q; |
| GstPad *q_sink = NULL, *q_src = NULL; |
| gchar *gname; |
| gboolean is_video = FALSE; |
| MqStreamCtx *ctx; |
| |
| GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| if (!create_muxer (splitmux)) |
| goto fail; |
| |
| if (templ->name_template) { |
| if (g_str_equal (templ->name_template, "video")) { |
| if (splitmux->have_video) |
| goto already_have_video; |
| |
| /* FIXME: Look for a pad template with matching caps, rather than by name */ |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), "video_%u"); |
| |
| /* Fallback to find sink pad templates named 'video' (flvmux) */ |
| if (!mux_template) { |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), "video"); |
| } |
| is_video = TRUE; |
| name = NULL; |
| } else { |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), templ->name_template); |
| |
| /* Fallback to find sink pad templates named 'audio' (flvmux) */ |
| if (!mux_template) { |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), "audio"); |
| name = NULL; |
| } |
| } |
| if (mux_template == NULL) { |
| /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */ |
| mux_template = |
| gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS |
| (splitmux->muxer), "sink_%d"); |
| name = NULL; |
| } |
| } |
| |
| res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps); |
| if (res == NULL) |
| goto fail; |
| |
| if (is_video) |
| gname = g_strdup ("video"); |
| else if (name == NULL) |
| gname = gst_pad_get_name (res); |
| else |
| gname = g_strdup (name); |
| |
| if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL) |
| goto fail; |
| |
| gst_element_set_state (q, GST_STATE_TARGET (splitmux)); |
| |
| g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0), |
| "max-size-buffers", 5, NULL); |
| |
| q_sink = gst_element_get_static_pad (q, "sink"); |
| q_src = gst_element_get_static_pad (q, "src"); |
| |
| if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) { |
| gst_element_release_request_pad (splitmux->muxer, res); |
| gst_object_unref (GST_OBJECT (res)); |
| goto fail; |
| } |
| |
| gst_object_unref (GST_OBJECT (res)); |
| |
| ctx = mq_stream_ctx_new (splitmux); |
| /* Context holds a ref: */ |
| ctx->q = gst_object_ref (q); |
| ctx->srcpad = q_src; |
| ctx->sinkpad = q_sink; |
| ctx->q_overrun_id = |
| g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx); |
| g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx); |
| |
| mq_stream_ctx_ref (ctx); |
| ctx->src_pad_block_id = |
| gst_pad_add_probe (q_src, |
| GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH, |
| (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify) |
| _pad_block_destroy_src_notify); |
| if (is_video && splitmux->reference_ctx != NULL) { |
| splitmux->reference_ctx->is_reference = FALSE; |
| splitmux->reference_ctx = NULL; |
| } |
| if (splitmux->reference_ctx == NULL) { |
| splitmux->reference_ctx = ctx; |
| ctx->is_reference = TRUE; |
| } |
| |
| res = gst_ghost_pad_new_from_template (gname, q_sink, templ); |
| g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); |
| |
| mq_stream_ctx_ref (ctx); |
| ctx->sink_pad_block_id = |
| gst_pad_add_probe (q_sink, |
| GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH | |
| GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, |
| (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify) |
| _pad_block_destroy_sink_notify); |
| |
| GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT |
| " feeds queue pad %" GST_PTR_FORMAT, res, q_sink); |
| |
| splitmux->contexts = g_list_prepend (splitmux->contexts, ctx); |
| |
| g_free (gname); |
| |
| if (is_video) |
| splitmux->have_video = TRUE; |
| |
| gst_pad_set_active (res, TRUE); |
| gst_element_add_pad (element, res); |
| |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| return res; |
| fail: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| |
| if (q_sink) |
| gst_object_unref (q_sink); |
| if (q_src) |
| gst_object_unref (q_src); |
| return NULL; |
| already_have_video: |
| GST_DEBUG_OBJECT (splitmux, "video sink pad already requested"); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| return NULL; |
| } |
| |
| static void |
| gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) |
| { |
| GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; |
| GstPad *muxpad = NULL; |
| MqStreamCtx *ctx = |
| (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT)); |
| |
| GST_SPLITMUX_LOCK (splitmux); |
| |
| if (splitmux->muxer == NULL) |
| goto fail; /* Elements don't exist yet - nothing to release */ |
| |
| GST_INFO_OBJECT (pad, "releasing request pad"); |
| |
| muxpad = gst_pad_get_peer (ctx->srcpad); |
| |
| /* Remove the context from our consideration */ |
| splitmux->contexts = g_list_remove (splitmux->contexts, ctx); |
| |
| if (ctx->sink_pad_block_id) |
| gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id); |
| |
| if (ctx->src_pad_block_id) |
| gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id); |
| |
| /* Can release the context now */ |
| mq_stream_ctx_unref (ctx); |
| if (ctx == splitmux->reference_ctx) |
| splitmux->reference_ctx = NULL; |
| |
| /* Release and free the muxer input */ |
| if (muxpad) { |
| gst_element_release_request_pad (splitmux->muxer, muxpad); |
| gst_object_unref (muxpad); |
| } |
| |
| if (GST_PAD_PAD_TEMPLATE (pad) && |
| g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE |
| (pad)), "video")) |
| splitmux->have_video = FALSE; |
| |
| gst_element_remove_pad (element, pad); |
| |
| /* Reset the internal elements only after all request pads are released */ |
| if (splitmux->contexts == NULL) |
| gst_splitmux_reset (splitmux); |
| |
| fail: |
| GST_SPLITMUX_UNLOCK (splitmux); |
| } |
| |
| static GstElement * |
| create_element (GstSplitMuxSink * splitmux, |
| const gchar * factory, const gchar * name, gboolean locked) |
| { |
| GstElement *ret = gst_element_factory_make (factory, name); |
| if (ret == NULL) { |
| g_warning ("Failed to create %s - splitmuxsink will not work", name); |
| return NULL; |
| } |
| |
| if (locked) { |
| /* Ensure the sink starts in locked state and NULL - it will be changed |
| * by the filename setting code */ |
| gst_element_set_locked_state (ret, TRUE); |
| gst_element_set_state (ret, GST_STATE_NULL); |
| } |
| |
| if (!gst_bin_add (GST_BIN (splitmux), ret)) { |
| g_warning ("Could not add %s element - splitmuxsink will not work", name); |
| gst_object_unref (ret); |
| return NULL; |
| } |
| |
| return ret; |
| } |
| |
| static gboolean |
| create_muxer (GstSplitMuxSink * splitmux) |
| { |
| /* Create internal elements */ |
| if (splitmux->muxer == NULL) { |
| GstElement *provided_muxer = NULL; |
| |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_muxer != NULL) |
| provided_muxer = gst_object_ref (splitmux->provided_muxer); |
| GST_OBJECT_UNLOCK (splitmux); |
| |
| if (provided_muxer == NULL) { |
| if ((splitmux->muxer = |
| create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL) |
| goto fail; |
| } else { |
| /* Ensure it's not in locked state (we might be reusing an old element) */ |
| gst_element_set_locked_state (provided_muxer, FALSE); |
| if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) { |
| g_warning ("Could not add muxer element - splitmuxsink will not work"); |
| gst_object_unref (provided_muxer); |
| goto fail; |
| } |
| |
| splitmux->muxer = provided_muxer; |
| gst_object_unref (provided_muxer); |
| } |
| |
| if (splitmux->use_robust_muxing) { |
| update_muxer_properties (splitmux); |
| } |
| } |
| |
| return TRUE; |
| fail: |
| return FALSE; |
| } |
| |
| static GstElement * |
| find_sink (GstElement * e) |
| { |
| GstElement *res = NULL; |
| GstIterator *iter; |
| gboolean done = FALSE; |
| GValue data = { 0, }; |
| |
| if (!GST_IS_BIN (e)) |
| return e; |
| |
| if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL) |
| return e; |
| |
| iter = gst_bin_iterate_sinks (GST_BIN (e)); |
| while (!done) { |
| switch (gst_iterator_next (iter, &data)) { |
| case GST_ITERATOR_OK: |
| { |
| GstElement *child = g_value_get_object (&data); |
| if (g_object_class_find_property (G_OBJECT_GET_CLASS (child), |
| "location") != NULL) { |
| res = child; |
| done = TRUE; |
| } |
| g_value_reset (&data); |
| break; |
| } |
| case GST_ITERATOR_RESYNC: |
| gst_iterator_resync (iter); |
| break; |
| case GST_ITERATOR_DONE: |
| done = TRUE; |
| break; |
| case GST_ITERATOR_ERROR: |
| g_assert_not_reached (); |
| break; |
| } |
| } |
| g_value_unset (&data); |
| gst_iterator_free (iter); |
| |
| return res; |
| } |
| |
| static gboolean |
| create_sink (GstSplitMuxSink * splitmux) |
| { |
| GstElement *provided_sink = NULL; |
| |
| if (splitmux->active_sink == NULL) { |
| |
| GST_OBJECT_LOCK (splitmux); |
| if (splitmux->provided_sink != NULL) |
| provided_sink = gst_object_ref (splitmux->provided_sink); |
| GST_OBJECT_UNLOCK (splitmux); |
| |
| if (provided_sink == NULL) { |
| if ((splitmux->sink = |
| create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL) |
| goto fail; |
| splitmux->active_sink = splitmux->sink; |
| } else { |
| /* Ensure the sink starts in locked state and NULL - it will be changed |
| * by the filename setting code */ |
| gst_element_set_locked_state (provided_sink, TRUE); |
| gst_element_set_state (provided_sink, GST_STATE_NULL); |
| if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) { |
| g_warning ("Could not add sink elements - splitmuxsink will not work"); |
| gst_object_unref (provided_sink); |
| goto fail; |
| } |
| |
| splitmux->active_sink = provided_sink; |
| |
| /* The bin holds a ref now, we can drop our tmp ref */ |
| gst_object_unref (provided_sink); |
| |
| /* Find the sink element */ |
| splitmux->sink = find_sink (splitmux->active_sink); |
| if (splitmux->sink == NULL) { |
| g_warning |
| ("Could not locate sink element in provided sink - splitmuxsink will not work"); |
| goto fail; |
| } |
| } |
| |
| #if 1 |
| if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink), |
| "async") != NULL) { |
| /* async child elements are causing state change races and weird |
| * failures, so let's try and turn that off */ |
| g_object_set (splitmux->sink, "async", FALSE, NULL); |
| } |
| #endif |
| |
| if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { |
| g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); |
| goto fail; |
| } |
| } |
| |
| return TRUE; |
| fail: |
| return FALSE; |
| } |
| |
| #ifdef __GNUC__ |
| #pragma GCC diagnostic ignored "-Wformat-nonliteral" |
| #endif |
| static void |
| set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) |
| { |
| gchar *fname = NULL; |
| GstSample *sample; |
| GstCaps *caps; |
| |
| gst_splitmux_sink_ensure_max_files (splitmux); |
| |
| if (ctx->cur_out_buffer == NULL) { |
| GST_WARNING_OBJECT (splitmux, "Starting next file without buffer"); |
| } |
| |
| caps = gst_pad_get_current_caps (ctx->srcpad); |
| sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL); |
| g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0, |
| splitmux->fragment_id, sample, &fname); |
| gst_sample_unref (sample); |
| if (caps) |
| gst_caps_unref (caps); |
| |
| if (fname == NULL) { |
| /* Fallback to the old signal if the new one returned nothing */ |
| g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, |
| splitmux->fragment_id, &fname); |
| } |
| |
| if (!fname) |
| fname = splitmux->location ? |
| g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL; |
| |
| if (fname) { |
| GST_INFO_OBJECT (splitmux, "Setting file to %s", fname); |
| g_object_set (splitmux->sink, "location", fname, NULL); |
| g_free (fname); |
| |
| splitmux->fragment_id++; |
| } |
| } |
| |
| static void |
| do_async_start (GstSplitMuxSink * splitmux) |
| { |
| GstMessage *message; |
| |
| if (!splitmux->need_async_start) { |
| GST_INFO_OBJECT (splitmux, "no async_start needed"); |
| return; |
| } |
| |
| splitmux->async_pending = TRUE; |
| |
| GST_INFO_OBJECT (splitmux, "Sending async_start message"); |
| message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux)); |
| GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST |
| (splitmux), message); |
| } |
| |
| static void |
| do_async_done (GstSplitMuxSink * splitmux) |
| { |
| GstMessage *message; |
| |
| if (splitmux->async_pending) { |
| GST_INFO_OBJECT (splitmux, "Sending async_done message"); |
| message = |
| gst_message_new_async_done (GST_OBJECT_CAST (splitmux), |
| GST_CLOCK_TIME_NONE); |
| GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST |
| (splitmux), message); |
| |
| splitmux->async_pending = FALSE; |
| } |
| |
| splitmux->need_async_start = FALSE; |
| } |
| |
| static GstStateChangeReturn |
| gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) |
| { |
| GstStateChangeReturn ret; |
| GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; |
| |
| switch (transition) { |
| case GST_STATE_CHANGE_NULL_TO_READY:{ |
| GST_SPLITMUX_LOCK (splitmux); |
| if (!create_muxer (splitmux) || !create_sink (splitmux)) { |
| ret = GST_STATE_CHANGE_FAILURE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| goto beach; |
| } |
| GST_SPLITMUX_UNLOCK (splitmux); |
| splitmux->fragment_id = 0; |
| break; |
| } |
| case GST_STATE_CHANGE_READY_TO_PAUSED:{ |
| GST_SPLITMUX_LOCK (splitmux); |
| /* Start by collecting one input on each pad */ |
| splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE; |
| splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; |
| splitmux->gop_start_time = splitmux->fragment_start_time = |
| GST_CLOCK_STIME_NONE; |
| splitmux->muxed_out_bytes = 0; |
| splitmux->ready_for_output = FALSE; |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| } |
| case GST_STATE_CHANGE_PAUSED_TO_READY: |
| g_atomic_int_set (&(splitmux->split_now), FALSE); |
| case GST_STATE_CHANGE_READY_TO_NULL: |
| GST_SPLITMUX_LOCK (splitmux); |
| splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED; |
| splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED; |
| /* Wake up any blocked threads */ |
| GST_LOG_OBJECT (splitmux, |
| "State change -> NULL or READY. Waking threads"); |
| GST_SPLITMUX_BROADCAST_INPUT (splitmux); |
| GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| default: |
| break; |
| } |
| |
| ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); |
| if (ret == GST_STATE_CHANGE_FAILURE) |
| goto beach; |
| |
| switch (transition) { |
| case GST_STATE_CHANGE_PLAYING_TO_PAUSED: |
| splitmux->need_async_start = TRUE; |
| break; |
| case GST_STATE_CHANGE_READY_TO_PAUSED:{ |
| /* Change state async, because our child sink might not |
| * be ready to do that for us yet if it's state is still locked */ |
| |
| splitmux->need_async_start = TRUE; |
| /* we want to go async to PAUSED until we managed to configure and add the |
| * sink */ |
| GST_SPLITMUX_LOCK (splitmux); |
| do_async_start (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| ret = GST_STATE_CHANGE_ASYNC; |
| break; |
| } |
| case GST_STATE_CHANGE_READY_TO_NULL: |
| GST_SPLITMUX_LOCK (splitmux); |
| splitmux->fragment_id = 0; |
| /* Reset internal elements only if no pad contexts are using them */ |
| if (splitmux->contexts == NULL) |
| gst_splitmux_reset (splitmux); |
| do_async_done (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| break; |
| default: |
| break; |
| } |
| |
| beach: |
| if (transition == GST_STATE_CHANGE_NULL_TO_READY && |
| ret == GST_STATE_CHANGE_FAILURE) { |
| /* Cleanup elements on failed transition out of NULL */ |
| gst_splitmux_reset (splitmux); |
| GST_SPLITMUX_LOCK (splitmux); |
| do_async_done (splitmux); |
| GST_SPLITMUX_UNLOCK (splitmux); |
| } |
| return ret; |
| } |
| |
| gboolean |
| register_splitmuxsink (GstPlugin * plugin) |
| { |
| GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0, |
| "Split File Muxing Sink"); |
| |
| return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE, |
| GST_TYPE_SPLITMUX_SINK); |
| } |
| |
| static void |
| gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux) |
| { |
| if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) { |
| splitmux->fragment_id = 0; |
| } |
| } |
| |
| static void |
| split_now (GstSplitMuxSink * splitmux) |
| { |
| g_atomic_int_set (&(splitmux->split_now), TRUE); |
| } |