| /* GStreamer Split Demuxer bin that recombines files created by |
| * the splitmuxsink element. |
| * |
| * Copyright (C) <2014> Jan Schmidt <jan@centricular.com> |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| |
| #include <string.h> |
| #include "gstsplitmuxsrc.h" |
| |
/* Debug category used by every log statement in this file. */
GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
#define GST_CAT_DEFAULT splitmux_part_debug

/* Reader lock, together with the condition variable that is waited on and
 * broadcast while that lock is held. */
#define SPLITMUX_PART_LOCK(p)        g_mutex_lock (&(p)->lock)
#define SPLITMUX_PART_UNLOCK(p)      g_mutex_unlock (&(p)->lock)
#define SPLITMUX_PART_WAIT(p)        g_cond_wait (&(p)->inactive_cond, &(p)->lock)
#define SPLITMUX_PART_BROADCAST(p)   g_cond_broadcast (&(p)->inactive_cond)

/* Separate "type" lock, taken in the READY->PAUSED state change. */
#define SPLITMUX_PART_TYPE_LOCK(p)   g_mutex_lock (&(p)->type_lock)
#define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock (&(p)->type_lock)
| |
| enum |
| { |
| SIGNAL_PREPARED, |
| LAST_SIGNAL |
| }; |
| |
| static guint part_reader_signals[LAST_SIGNAL] = { 0 }; |
| |
| typedef struct _GstSplitMuxPartPad |
| { |
| GstPad parent; |
| |
| /* Reader we belong to */ |
| GstSplitMuxPartReader *reader; |
| /* Output splitmuxsrc source pad */ |
| GstPad *target; |
| |
| GstDataQueue *queue; |
| |
| gboolean is_eos; |
| gboolean flushing; |
| gboolean seen_buffer; |
| |
| GstClockTime max_ts; |
| GstSegment segment; |
| |
| GstSegment orig_segment; |
| GstClockTime initial_ts_offset; |
| } GstSplitMuxPartPad; |
| |
| typedef struct _GstSplitMuxPartPadClass |
| { |
| GstPadClass parent; |
| } GstSplitMuxPartPadClass; |
| |
| static GType gst_splitmux_part_pad_get_type (void); |
| #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type() |
| #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p)) |
| |
| static void splitmux_part_pad_constructed (GObject * pad); |
| static void splitmux_part_pad_finalize (GObject * pad); |
| static void handle_buffer_measuring (GstSplitMuxPartReader * reader, |
| GstSplitMuxPartPad * part_pad, GstBuffer * buf); |
| |
| static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue, |
| guint visible, guint bytes, guint64 time, gpointer checkdata); |
| static void type_found (GstElement * typefind, guint probability, |
| GstCaps * caps, GstSplitMuxPartReader * reader); |
| static void check_if_pads_collected (GstSplitMuxPartReader * reader); |
| |
| /* Called with reader lock held */ |
| static gboolean |
| have_empty_queue (GstSplitMuxPartReader * reader) |
| { |
| GList *cur; |
| |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (part_pad->is_eos) { |
| GST_LOG_OBJECT (part_pad, "Pad is EOS"); |
| return TRUE; |
| } |
| if (gst_data_queue_is_empty (part_pad->queue)) { |
| GST_LOG_OBJECT (part_pad, "Queue is empty"); |
| return TRUE; |
| } |
| } |
| |
| return FALSE; |
| } |
| |
| /* Called with reader lock held */ |
| static gboolean |
| block_until_can_push (GstSplitMuxPartReader * reader) |
| { |
| while (reader->running) { |
| if (reader->flushing) |
| goto out; |
| if (reader->active && have_empty_queue (reader)) |
| goto out; |
| |
| GST_LOG_OBJECT (reader, |
| "Waiting for activation or empty queue on reader %s", reader->path); |
| SPLITMUX_PART_WAIT (reader); |
| } |
| |
| GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d", |
| reader->path, reader->active, reader->flushing); |
| out: |
| return reader->active && !reader->flushing; |
| } |
| |
| static void |
| handle_buffer_measuring (GstSplitMuxPartReader * reader, |
| GstSplitMuxPartPad * part_pad, GstBuffer * buf) |
| { |
| GstClockTime ts = GST_CLOCK_TIME_NONE; |
| GstClockTimeDiff offset; |
| |
| if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS && |
| !part_pad->seen_buffer) { |
| /* If this is the first buffer on the pad in the collect_streams state, |
| * then calculate inital offset based on running time of this segment */ |
| part_pad->initial_ts_offset = |
| part_pad->orig_segment.start + part_pad->orig_segment.base - |
| part_pad->orig_segment.time; |
| GST_DEBUG_OBJECT (reader, |
| "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT, |
| part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset)); |
| } |
| part_pad->seen_buffer = TRUE; |
| |
| /* Adjust buffer timestamps */ |
| offset = reader->start_offset + part_pad->segment.base; |
| offset -= part_pad->initial_ts_offset; |
| |
| /* Update the stored max duration on the pad, |
| * always preferring making DTS contiguous |
| * where possible */ |
| if (GST_BUFFER_DTS_IS_VALID (buf)) |
| ts = GST_BUFFER_DTS (buf) + offset; |
| else if (GST_BUFFER_PTS_IS_VALID (buf)) |
| ts = GST_BUFFER_PTS (buf) + offset; |
| |
| GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT |
| " incoming PTS %" GST_TIME_FORMAT |
| " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT |
| " to %" GST_TIME_FORMAT, part_pad, |
| GST_TIME_ARGS (GST_BUFFER_DTS (buf)), |
| GST_TIME_ARGS (GST_BUFFER_PTS (buf)), |
| GST_STIME_ARGS (offset), GST_TIME_ARGS (ts)); |
| |
| if (GST_CLOCK_TIME_IS_VALID (ts)) { |
| if (GST_BUFFER_DURATION_IS_VALID (buf)) |
| ts += GST_BUFFER_DURATION (buf); |
| |
| if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) { |
| part_pad->max_ts = ts; |
| GST_LOG_OBJECT (reader, |
| "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad, |
| GST_TIME_ARGS (part_pad->max_ts)); |
| } |
| } |
| /* Is it time to move to measuring state yet? */ |
| check_if_pads_collected (reader); |
| } |
| |
| static gboolean |
| splitmux_data_queue_is_full_cb (GstDataQueue * queue, |
| guint visible, guint bytes, guint64 time, gpointer checkdata) |
| { |
| /* Arbitrary safety limit. If we hit it, playback is likely to stall */ |
| if (time > 20 * GST_SECOND) |
| return TRUE; |
| return FALSE; |
| } |
| |
| static void |
| splitmux_part_free_queue_item (GstDataQueueItem * item) |
| { |
| gst_mini_object_unref (item->object); |
| g_slice_free (GstDataQueueItem, item); |
| } |
| |
| static GstFlowReturn |
| splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) |
| { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad); |
| GstSplitMuxPartReader *reader = part_pad->reader; |
| GstDataQueueItem *item; |
| GstClockTimeDiff offset; |
| |
| GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf); |
| SPLITMUX_PART_LOCK (reader); |
| |
| if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS || |
| reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) { |
| handle_buffer_measuring (reader, part_pad, buf); |
| gst_buffer_unref (buf); |
| SPLITMUX_PART_UNLOCK (reader); |
| return GST_FLOW_OK; |
| } |
| |
| if (!block_until_can_push (reader)) { |
| /* Flushing */ |
| SPLITMUX_PART_UNLOCK (reader); |
| gst_buffer_unref (buf); |
| return GST_FLOW_FLUSHING; |
| } |
| |
| /* Adjust buffer timestamps */ |
| offset = reader->start_offset + part_pad->segment.base; |
| offset -= part_pad->initial_ts_offset; |
| |
| if (GST_BUFFER_PTS_IS_VALID (buf)) |
| GST_BUFFER_PTS (buf) += offset; |
| if (GST_BUFFER_DTS_IS_VALID (buf)) |
| GST_BUFFER_DTS (buf) += offset; |
| |
| /* We are active, and one queue is empty, place this buffer in |
| * the dataqueue */ |
| GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf); |
| item = g_slice_new (GstDataQueueItem); |
| item->destroy = (GDestroyNotify) splitmux_part_free_queue_item; |
| item->object = GST_MINI_OBJECT (buf); |
| item->size = gst_buffer_get_size (buf); |
| item->duration = GST_BUFFER_DURATION (buf); |
| if (item->duration == GST_CLOCK_TIME_NONE) |
| item->duration = 0; |
| item->visible = TRUE; |
| |
| gst_object_ref (part_pad); |
| |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| if (!gst_data_queue_push (part_pad->queue, item)) { |
| splitmux_part_free_queue_item (item); |
| gst_object_unref (part_pad); |
| return GST_FLOW_FLUSHING; |
| } |
| |
| gst_object_unref (part_pad); |
| return GST_FLOW_OK; |
| } |
| |
| /* Called with splitmux part lock held */ |
| static gboolean |
| splitmux_part_is_eos_locked (GstSplitMuxPartReader * part) |
| { |
| GList *cur; |
| for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (!part_pad->is_eos) |
| return FALSE; |
| } |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part) |
| { |
| GList *cur; |
| GST_LOG_OBJECT (part, "Checking for preroll"); |
| for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (!part_pad->seen_buffer) { |
| GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled", |
| part_pad); |
| return FALSE; |
| } |
| } |
| GST_LOG_OBJECT (part, "Part is prerolled"); |
| return TRUE; |
| } |
| |
| |
| gboolean |
| gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader) |
| { |
| gboolean res; |
| |
| SPLITMUX_PART_LOCK (reader); |
| res = splitmux_part_is_eos_locked (reader); |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| return res; |
| } |
| |
| /* Called with splitmux part lock held */ |
| static gboolean |
| splitmux_is_flushing (GstSplitMuxPartReader * reader) |
| { |
| GList *cur; |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (part_pad->flushing) |
| return TRUE; |
| } |
| |
| return FALSE; |
| } |
| |
| static gboolean |
| splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) |
| { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad); |
| GstSplitMuxPartReader *reader = part_pad->reader; |
| gboolean ret = TRUE; |
| SplitMuxSrcPad *target; |
| GstDataQueueItem *item; |
| |
| SPLITMUX_PART_LOCK (reader); |
| |
| target = gst_object_ref (part_pad->target); |
| |
| GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad, |
| event); |
| |
| if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) |
| goto drop_event; |
| |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_SEGMENT:{ |
| GstSegment *seg = &part_pad->segment; |
| |
| GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event); |
| |
| gst_event_copy_segment (event, seg); |
| gst_event_copy_segment (event, &part_pad->orig_segment); |
| |
| if (seg->format != GST_FORMAT_TIME) |
| goto wrong_segment; |
| |
| /* Adjust segment */ |
| /* Adjust start/stop so the overall file is 0 + start_offset based */ |
| if (seg->stop != -1) { |
| seg->stop -= seg->start; |
| seg->stop += seg->time + reader->start_offset; |
| } |
| seg->start = seg->time + reader->start_offset; |
| seg->time += reader->start_offset; |
| seg->position += reader->start_offset; |
| |
| GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event); |
| |
| /* Replace event */ |
| gst_event_unref (event); |
| event = gst_event_new_segment (seg); |
| |
| if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS |
| && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS) |
| break; /* Only do further stuff with segments during initial measuring */ |
| |
| /* Take the first segment from the first part */ |
| if (target->segment.format == GST_FORMAT_UNDEFINED) { |
| gst_segment_copy_into (seg, &target->segment); |
| GST_DEBUG_OBJECT (reader, |
| "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment); |
| } |
| |
| if (seg->stop != -1 && target->segment.stop != -1) { |
| GstClockTime stop = seg->base + seg->stop; |
| if (stop > target->segment.stop) { |
| target->segment.stop = stop; |
| GST_DEBUG_OBJECT (reader, |
| "Adjusting segment stop by %" GST_TIME_FORMAT |
| " output now %" GST_SEGMENT_FORMAT, |
| GST_TIME_ARGS (reader->start_offset), &target->segment); |
| } |
| } |
| GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event); |
| break; |
| } |
| case GST_EVENT_EOS:{ |
| |
| GST_DEBUG_OBJECT (part_pad, |
| "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT, |
| reader->prep_state, GST_TIME_ARGS (part_pad->max_ts)); |
| |
| if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS || |
| reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) { |
| /* Mark this pad as EOS */ |
| part_pad->is_eos = TRUE; |
| if (splitmux_part_is_eos_locked (reader)) { |
| /* Finished measuring things, set state and tell the state change func |
| * so it can seek back to the start */ |
| GST_LOG_OBJECT (reader, |
| "EOS while measuring streams. Resetting for ready"); |
| reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY; |
| SPLITMUX_PART_BROADCAST (reader); |
| } |
| goto drop_event; |
| } |
| break; |
| } |
| case GST_EVENT_FLUSH_START: |
| reader->flushing = TRUE; |
| part_pad->flushing = TRUE; |
| GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue", |
| part_pad); |
| gst_data_queue_set_flushing (part_pad->queue, TRUE); |
| SPLITMUX_PART_BROADCAST (reader); |
| break; |
| case GST_EVENT_FLUSH_STOP:{ |
| gst_data_queue_set_flushing (part_pad->queue, FALSE); |
| gst_data_queue_flush (part_pad->queue); |
| part_pad->seen_buffer = FALSE; |
| part_pad->flushing = FALSE; |
| part_pad->is_eos = FALSE; |
| |
| reader->flushing = splitmux_is_flushing (reader); |
| GST_LOG_OBJECT (reader, |
| "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d", |
| reader->path, pad, reader->flushing); |
| SPLITMUX_PART_BROADCAST (reader); |
| break; |
| } |
| default: |
| break; |
| } |
| |
| /* Don't send events downstream while preparing */ |
| if (reader->prep_state != PART_STATE_READY) |
| goto drop_event; |
| |
| /* Don't pass flush events - those are done by the parent */ |
| if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START || |
| GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) |
| goto drop_event; |
| |
| if (!block_until_can_push (reader)) { |
| SPLITMUX_PART_UNLOCK (reader); |
| gst_object_unref (target); |
| gst_event_unref (event); |
| return FALSE; |
| } |
| |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_GAP:{ |
| /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */ |
| goto drop_event; |
| } |
| default: |
| break; |
| } |
| |
| /* We are active, and one queue is empty, place this buffer in |
| * the dataqueue */ |
| gst_object_ref (part_pad->queue); |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event); |
| item = g_slice_new (GstDataQueueItem); |
| item->destroy = (GDestroyNotify) splitmux_part_free_queue_item; |
| item->object = GST_MINI_OBJECT (event); |
| item->size = 0; |
| item->duration = 0; |
| if (item->duration == GST_CLOCK_TIME_NONE) |
| item->duration = 0; |
| item->visible = FALSE; |
| |
| if (!gst_data_queue_push (part_pad->queue, item)) { |
| splitmux_part_free_queue_item (item); |
| ret = FALSE; |
| } |
| |
| gst_object_unref (part_pad->queue); |
| gst_object_unref (target); |
| |
| return ret; |
| wrong_segment: |
| gst_event_unref (event); |
| gst_object_unref (target); |
| SPLITMUX_PART_UNLOCK (reader); |
| GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL), |
| ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT, |
| reader->path, pad)); |
| return FALSE; |
| drop_event: |
| GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT |
| " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target); |
| gst_event_unref (event); |
| gst_object_unref (target); |
| SPLITMUX_PART_UNLOCK (reader); |
| return TRUE; |
| } |
| |
| static gboolean |
| splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query) |
| { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad); |
| GstSplitMuxPartReader *reader = part_pad->reader; |
| GstPad *target; |
| gboolean ret = FALSE; |
| gboolean active; |
| |
| SPLITMUX_PART_LOCK (reader); |
| target = gst_object_ref (part_pad->target); |
| active = reader->active; |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| if (active) { |
| GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT |
| " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target); |
| |
| ret = gst_pad_query (target, query); |
| } |
| |
| gst_object_unref (target); |
| |
| return ret; |
| } |
| |
| G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD); |
| |
| static void |
| splitmux_part_pad_constructed (GObject * pad) |
| { |
| gst_pad_set_chain_function (GST_PAD (pad), |
| GST_DEBUG_FUNCPTR (splitmux_part_pad_chain)); |
| gst_pad_set_event_function (GST_PAD (pad), |
| GST_DEBUG_FUNCPTR (splitmux_part_pad_event)); |
| gst_pad_set_query_function (GST_PAD (pad), |
| GST_DEBUG_FUNCPTR (splitmux_part_pad_query)); |
| |
| G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad); |
| } |
| |
| static void |
| gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass) |
| { |
| GObjectClass *gobject_klass = (GObjectClass *) (klass); |
| |
| gobject_klass->constructed = splitmux_part_pad_constructed; |
| gobject_klass->finalize = splitmux_part_pad_finalize; |
| } |
| |
| static void |
| gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad) |
| { |
| pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb, |
| NULL, NULL, pad); |
| gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED); |
| gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED); |
| } |
| |
| static void |
| splitmux_part_pad_finalize (GObject * obj) |
| { |
| GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj); |
| |
| GST_DEBUG_OBJECT (obj, "finalize"); |
| gst_data_queue_set_flushing (pad->queue, TRUE); |
| gst_data_queue_flush (pad->queue); |
| gst_object_unref (GST_OBJECT_CAST (pad->queue)); |
| pad->queue = NULL; |
| |
| G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj); |
| } |
| |
| static void |
| new_decoded_pad_added_cb (GstElement * element, GstPad * pad, |
| GstSplitMuxPartReader * part); |
| static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader); |
| static GstStateChangeReturn |
| gst_splitmux_part_reader_change_state (GstElement * element, |
| GstStateChange transition); |
| static gboolean gst_splitmux_part_reader_send_event (GstElement * element, |
| GstEvent * event); |
| static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader |
| * part, gboolean flushing); |
| static void bus_handler (GstBin * bin, GstMessage * msg); |
| static void splitmux_part_reader_dispose (GObject * object); |
| static void splitmux_part_reader_finalize (GObject * object); |
| static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader); |
| |
| #define gst_splitmux_part_reader_parent_class parent_class |
| G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader, |
| GST_TYPE_PIPELINE); |
| |
| static void |
| gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass) |
| { |
| GObjectClass *gobject_klass = (GObjectClass *) (klass); |
| GstElementClass *gstelement_class = (GstElementClass *) klass; |
| GstBinClass *gstbin_class = (GstBinClass *) klass; |
| |
| GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0, |
| "Split File Demuxing Source helper"); |
| |
| gobject_klass->dispose = splitmux_part_reader_dispose; |
| gobject_klass->finalize = splitmux_part_reader_finalize; |
| |
| part_reader_signals[SIGNAL_PREPARED] = |
| g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), |
| G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass, |
| prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
| gstelement_class->change_state = gst_splitmux_part_reader_change_state; |
| gstelement_class->send_event = gst_splitmux_part_reader_send_event; |
| |
| gstbin_class->handle_message = bus_handler; |
| } |
| |
| static void |
| gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader) |
| { |
| GstElement *typefind; |
| |
| reader->active = FALSE; |
| reader->duration = GST_CLOCK_TIME_NONE; |
| |
| g_cond_init (&reader->inactive_cond); |
| g_mutex_init (&reader->lock); |
| g_mutex_init (&reader->type_lock); |
| |
| /* FIXME: Create elements on a state change */ |
| reader->src = gst_element_factory_make ("filesrc", NULL); |
| if (reader->src == NULL) { |
| GST_ERROR_OBJECT (reader, "Failed to create filesrc element"); |
| return; |
| } |
| gst_bin_add (GST_BIN_CAST (reader), reader->src); |
| |
| typefind = gst_element_factory_make ("typefind", NULL); |
| if (!typefind) { |
| GST_ERROR_OBJECT (reader, |
| "Failed to create typefind element - check your installation"); |
| return; |
| } |
| |
| gst_bin_add (GST_BIN_CAST (reader), typefind); |
| reader->typefind = typefind; |
| |
| if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) { |
| GST_ERROR_OBJECT (reader, |
| "Failed to link typefind element - check your installation"); |
| return; |
| } |
| |
| g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found), |
| reader); |
| } |
| |
| static void |
| splitmux_part_reader_dispose (GObject * object) |
| { |
| GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object; |
| |
| splitmux_part_reader_reset (reader); |
| |
| G_OBJECT_CLASS (parent_class)->dispose (object); |
| } |
| |
| static void |
| splitmux_part_reader_finalize (GObject * object) |
| { |
| GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object; |
| |
| g_cond_clear (&reader->inactive_cond); |
| g_mutex_clear (&reader->lock); |
| g_mutex_clear (&reader->type_lock); |
| |
| g_free (reader->path); |
| |
| G_OBJECT_CLASS (parent_class)->finalize (object); |
| } |
| |
| static void |
| splitmux_part_reader_reset (GstSplitMuxPartReader * reader) |
| { |
| GList *cur; |
| |
| SPLITMUX_PART_LOCK (reader); |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstPad *pad = GST_PAD_CAST (cur->data); |
| gst_pad_set_active (GST_PAD_CAST (pad), FALSE); |
| gst_object_unref (GST_OBJECT_CAST (pad)); |
| } |
| |
| g_list_free (reader->pads); |
| reader->pads = NULL; |
| SPLITMUX_PART_UNLOCK (reader); |
| } |
| |
| static GstSplitMuxPartPad * |
| gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader, |
| GstPad * target) |
| { |
| GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD, |
| "name", GST_PAD_NAME (target), |
| "direction", GST_PAD_SINK, |
| NULL); |
| pad->target = target; |
| pad->reader = reader; |
| |
| gst_pad_set_active (GST_PAD_CAST (pad), TRUE); |
| |
| return pad; |
| } |
| |
| static void |
| new_decoded_pad_added_cb (GstElement * element, GstPad * pad, |
| GstSplitMuxPartReader * reader) |
| { |
| GstPad *out_pad = NULL; |
| GstSplitMuxPartPad *proxy_pad; |
| GstCaps *caps; |
| GstPadLinkReturn link_ret; |
| |
| caps = gst_pad_get_current_caps (pad); |
| |
| GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT |
| " caps %" GST_PTR_FORMAT, reader->path, pad, caps); |
| |
| gst_caps_unref (caps); |
| |
| /* Look up or create the output pad */ |
| if (reader->get_pad_cb) |
| out_pad = reader->get_pad_cb (reader, pad, reader->cb_data); |
| if (out_pad == NULL) |
| return; |
| |
| /* Create our proxy pad to interact with this new pad */ |
| proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad); |
| GST_DEBUG_OBJECT (reader, |
| "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT, |
| proxy_pad, out_pad); |
| |
| link_ret = gst_pad_link (pad, GST_PAD (proxy_pad)); |
| if (link_ret != GST_PAD_LINK_OK) { |
| gst_object_unref (proxy_pad); |
| GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL), |
| ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT |
| " ret %d", reader->path, pad, link_ret)); |
| return; |
| } |
| GST_DEBUG_OBJECT (reader, |
| "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT, |
| pad, proxy_pad); |
| |
| SPLITMUX_PART_LOCK (reader); |
| reader->pads = g_list_prepend (reader->pads, proxy_pad); |
| SPLITMUX_PART_UNLOCK (reader); |
| } |
| |
| static gboolean |
| gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event) |
| { |
| GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element; |
| gboolean ret = FALSE; |
| GstPad *pad = NULL; |
| |
| /* Send event to the first source pad we found */ |
| SPLITMUX_PART_LOCK (reader); |
| if (reader->pads) { |
| GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data); |
| pad = gst_pad_get_peer (proxy_pad); |
| } |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| if (pad) { |
| ret = gst_pad_send_event (pad, event); |
| gst_object_unref (pad); |
| } else { |
| gst_event_unref (event); |
| } |
| |
| return ret; |
| } |
| |
| /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */ |
| static void |
| gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader, |
| GstClockTime time) |
| { |
| SPLITMUX_PART_UNLOCK (reader); |
| GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (time)); |
| gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME, |
| GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time, |
| GST_SEEK_TYPE_END, 0); |
| |
| SPLITMUX_PART_LOCK (reader); |
| |
| /* Wait for flush to finish, so old data is gone */ |
| while (reader->flushing) { |
| GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path); |
| SPLITMUX_PART_WAIT (reader); |
| } |
| } |
| |
| /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */ |
| static gboolean |
| gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader, |
| GstSegment * target_seg) |
| { |
| GstSeekFlags flags; |
| GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE; |
| |
| flags = target_seg->flags | GST_SEEK_FLAG_FLUSH; |
| |
| SPLITMUX_PART_LOCK (reader); |
| if (target_seg->start >= reader->start_offset) |
| start = target_seg->start - reader->start_offset; |
| /* If the segment stop is within this part, don't play to the end */ |
| if (target_seg->stop != -1 && |
| target_seg->stop < reader->start_offset + reader->duration) |
| stop = target_seg->stop - reader->start_offset; |
| |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| GST_DEBUG_OBJECT (reader, |
| "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %" |
| GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags, |
| GST_TIME_ARGS (start), GST_TIME_ARGS (stop)); |
| |
| return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate, |
| target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, |
| stop); |
| } |
| |
| /* Called with lock held */ |
| static void |
| gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader) |
| { |
| /* Trigger a flushing seek to near the end of the file and run each stream |
| * to EOS in order to find the smallest end timestamp to start the next |
| * file from |
| */ |
| if (GST_CLOCK_TIME_IS_VALID (reader->duration) |
| && reader->duration > GST_SECOND) { |
| GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND); |
| gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts); |
| } |
| |
| /* Wait for things to happen */ |
| while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) |
| SPLITMUX_PART_WAIT (reader); |
| |
| if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) { |
| /* Fire the prepared signal and go to READY state */ |
| GST_DEBUG_OBJECT (reader, |
| "Stream measuring complete. File %s is now ready. Firing prepared signal", |
| reader->path); |
| reader->prep_state = PART_STATE_READY; |
| g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL); |
| } |
| } |
| |
| static GstElement * |
| find_demuxer (GstCaps * caps) |
| { |
| GList *factories = |
| gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER, |
| GST_RANK_MARGINAL); |
| GList *compat_elements; |
| GstElement *e = NULL; |
| |
| if (factories == NULL) |
| return NULL; |
| |
| compat_elements = |
| gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE); |
| |
| if (compat_elements) { |
| /* Just take the first (highest ranked) option */ |
| GstElementFactory *factory = |
| GST_ELEMENT_FACTORY_CAST (compat_elements->data); |
| e = gst_element_factory_create (factory, NULL); |
| gst_plugin_feature_list_free (compat_elements); |
| } |
| |
| if (factories) |
| gst_plugin_feature_list_free (factories); |
| |
| return e; |
| } |
| |
| static void |
| type_found (GstElement * typefind, guint probability, |
| GstCaps * caps, GstSplitMuxPartReader * reader) |
| { |
| GstElement *demux; |
| |
| GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps); |
| |
| /* typefind found a type. Look for the demuxer to handle it */ |
| demux = reader->demux = find_demuxer (caps); |
| if (reader->demux == NULL) { |
| GST_ERROR_OBJECT (reader, "Failed to create demuxer element"); |
| return; |
| } |
| |
| gst_element_set_locked_state (demux, TRUE); |
| gst_bin_add (GST_BIN_CAST (reader), demux); |
| gst_element_link_pads (reader->typefind, "src", demux, NULL); |
| gst_element_sync_state_with_parent (reader->demux); |
| gst_element_set_locked_state (demux, FALSE); |
| |
| /* Connect to demux signals */ |
| g_signal_connect (demux, |
| "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader); |
| g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader); |
| } |
| |
| static void |
| check_if_pads_collected (GstSplitMuxPartReader * reader) |
| { |
| if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) { |
| /* Check we have all pads and each pad has seen a buffer */ |
| if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) { |
| GST_DEBUG_OBJECT (reader, |
| "no more pads - file %s. Measuring stream length", reader->path); |
| reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS; |
| SPLITMUX_PART_BROADCAST (reader); |
| } |
| } |
| } |
| |
| static void |
| no_more_pads (GstElement * element, GstSplitMuxPartReader * reader) |
| { |
| GstClockTime duration = GST_CLOCK_TIME_NONE; |
| GList *cur; |
| /* Query the minimum duration of any pad in this piece and store it. |
| * FIXME: Only consider audio and video */ |
| SPLITMUX_PART_LOCK (reader); |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstPad *target = GST_PAD_CAST (cur->data); |
| if (target) { |
| gint64 cur_duration; |
| if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) { |
| GST_INFO_OBJECT (reader, |
| "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT, |
| reader->path, target, GST_TIME_ARGS (cur_duration)); |
| if (cur_duration < duration) |
| duration = cur_duration; |
| } |
| } |
| } |
| GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT, |
| reader->path, GST_TIME_ARGS (duration)); |
| reader->duration = (GstClockTime) duration; |
| |
| reader->no_more_pads = TRUE; |
| |
| check_if_pads_collected (reader); |
| SPLITMUX_PART_UNLOCK (reader); |
| } |
| |
| gboolean |
| gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part, |
| GstPad * src_pad, GstQuery * query) |
| { |
| GstPad *target = NULL; |
| gboolean ret; |
| GList *cur; |
| |
| SPLITMUX_PART_LOCK (part); |
| /* Find the pad corresponding to the visible output target pad */ |
| for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (part_pad->target == src_pad) { |
| target = gst_object_ref (GST_OBJECT_CAST (part_pad)); |
| break; |
| } |
| } |
| SPLITMUX_PART_UNLOCK (part); |
| |
| if (target == NULL) |
| return FALSE; |
| |
| ret = gst_pad_peer_query (target, query); |
| gst_object_unref (GST_OBJECT_CAST (target)); |
| |
| if (ret == FALSE) |
| goto out; |
| |
| /* Post-massaging of queries */ |
| switch (GST_QUERY_TYPE (query)) { |
| case GST_QUERY_POSITION:{ |
| GstFormat fmt; |
| gint64 position; |
| |
| gst_query_parse_position (query, &fmt, &position); |
| if (fmt != GST_FORMAT_TIME) |
| return FALSE; |
| SPLITMUX_PART_LOCK (part); |
| position += part->start_offset; |
| GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (position)); |
| SPLITMUX_PART_UNLOCK (part); |
| |
| gst_query_set_position (query, fmt, position); |
| break; |
| } |
| default: |
| break; |
| } |
| |
| out: |
| gst_object_unref (target); |
| return ret; |
| } |
| |
| static GstStateChangeReturn |
| gst_splitmux_part_reader_change_state (GstElement * element, |
| GstStateChange transition) |
| { |
| GstStateChangeReturn ret; |
| GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element; |
| |
| switch (transition) { |
| case GST_STATE_CHANGE_NULL_TO_READY:{ |
| break; |
| } |
| case GST_STATE_CHANGE_READY_TO_PAUSED:{ |
| /* Hold the splitmux type lock until after the |
| * parent state change function has finished |
| * changing the states of things, and type finding can continue */ |
| SPLITMUX_PART_LOCK (reader); |
| g_object_set (reader->src, "location", reader->path, NULL); |
| SPLITMUX_PART_UNLOCK (reader); |
| SPLITMUX_PART_TYPE_LOCK (reader); |
| break; |
| } |
| case GST_STATE_CHANGE_READY_TO_NULL: |
| case GST_STATE_CHANGE_PAUSED_TO_READY: |
| SPLITMUX_PART_LOCK (reader); |
| gst_splitmux_part_reader_set_flushing_locked (reader, TRUE); |
| reader->running = FALSE; |
| SPLITMUX_PART_BROADCAST (reader); |
| SPLITMUX_PART_UNLOCK (reader); |
| break; |
| case GST_STATE_CHANGE_PLAYING_TO_PAUSED: |
| SPLITMUX_PART_LOCK (reader); |
| reader->active = FALSE; |
| gst_splitmux_part_reader_set_flushing_locked (reader, TRUE); |
| SPLITMUX_PART_BROADCAST (reader); |
| SPLITMUX_PART_UNLOCK (reader); |
| break; |
| default: |
| break; |
| } |
| |
| ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); |
| if (ret == GST_STATE_CHANGE_FAILURE) { |
| if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) { |
| /* Make sure to release the lock we took above */ |
| SPLITMUX_PART_TYPE_UNLOCK (reader); |
| } |
| goto beach; |
| } |
| |
| switch (transition) { |
| case GST_STATE_CHANGE_READY_TO_PAUSED: |
| /* Sleep and wait until all streams have been collected, then do the seeks |
| * to measure the stream lengths. This took the type lock above, |
| * but it's OK to release it now and let typefinding happen... */ |
| SPLITMUX_PART_TYPE_UNLOCK (reader); |
| |
| SPLITMUX_PART_LOCK (reader); |
| reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS; |
| gst_splitmux_part_reader_set_flushing_locked (reader, FALSE); |
| reader->running = TRUE; |
| |
| while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) { |
| GST_LOG_OBJECT (reader, "Waiting to collect all output streams"); |
| SPLITMUX_PART_WAIT (reader); |
| } |
| |
| if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS || |
| reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) { |
| gst_splitmux_part_reader_measure_streams (reader); |
| } else if (reader->prep_state == PART_STATE_FAILED) |
| ret = GST_STATE_CHANGE_FAILURE; |
| SPLITMUX_PART_UNLOCK (reader); |
| break; |
| case GST_STATE_CHANGE_PAUSED_TO_PLAYING: |
| SPLITMUX_PART_LOCK (reader); |
| gst_splitmux_part_reader_set_flushing_locked (reader, FALSE); |
| reader->active = TRUE; |
| SPLITMUX_PART_BROADCAST (reader); |
| SPLITMUX_PART_UNLOCK (reader); |
| break; |
| case GST_STATE_CHANGE_READY_TO_NULL: |
| reader->prep_state = PART_STATE_NULL; |
| splitmux_part_reader_reset (reader); |
| break; |
| default: |
| break; |
| } |
| |
| beach: |
| return ret; |
| } |
| |
| static gboolean |
| check_bus_messages (GstSplitMuxPartReader * part) |
| { |
| gboolean ret = FALSE; |
| GstBus *bus; |
| GstMessage *m; |
| |
| bus = gst_element_get_bus (GST_ELEMENT_CAST (part)); |
| while ((m = gst_bus_pop (bus)) != NULL) { |
| if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) { |
| GST_LOG_OBJECT (part, "Got error message while preparing. Failing."); |
| gst_message_unref (m); |
| goto done; |
| } |
| gst_message_unref (m); |
| } |
| ret = TRUE; |
| done: |
| gst_object_unref (bus); |
| return ret; |
| } |
| |
| gboolean |
| gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part) |
| { |
| GstStateChangeReturn ret; |
| |
| ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED); |
| |
| if (ret != GST_STATE_CHANGE_SUCCESS) |
| return FALSE; |
| |
| return check_bus_messages (part); |
| } |
| |
| void |
| gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part) |
| { |
| gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL); |
| } |
| |
| void |
| gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader, |
| const gchar * path) |
| { |
| reader->path = g_strdup (path); |
| } |
| |
| gboolean |
| gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader, |
| GstSegment * seg) |
| { |
| GST_DEBUG_OBJECT (reader, "Activating part reader"); |
| |
| if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) { |
| GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT, |
| seg); |
| return FALSE; |
| } |
| if (gst_element_set_state (GST_ELEMENT_CAST (reader), |
| GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { |
| GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING"); |
| return FALSE; |
| } |
| return TRUE; |
| } |
| |
| void |
| gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader) |
| { |
| GST_DEBUG_OBJECT (reader, "Deactivating reader"); |
| gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED); |
| } |
| |
| void |
| gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader, |
| gboolean flushing) |
| { |
| GList *cur; |
| |
| GST_LOG_OBJECT (reader, "%s dataqueues", |
| flushing ? "Flushing" : "Done flushing"); |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| gst_data_queue_set_flushing (part_pad->queue, flushing); |
| if (flushing) |
| gst_data_queue_flush (part_pad->queue); |
| } |
| }; |
| |
| void |
| gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader, |
| gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb) |
| { |
| reader->cb_data = cb_data; |
| reader->get_pad_cb = get_pad_cb; |
| } |
| |
| GstClockTime |
| gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader) |
| { |
| GList *cur; |
| GstClockTime ret = GST_CLOCK_TIME_NONE; |
| |
| SPLITMUX_PART_LOCK (reader); |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (part_pad->max_ts < ret) |
| ret = part_pad->max_ts; |
| } |
| |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| return ret; |
| } |
| |
| void |
| gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader, |
| GstClockTime offset) |
| { |
| SPLITMUX_PART_LOCK (reader); |
| reader->start_offset = offset; |
| GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT, |
| GST_TIME_ARGS (offset)); |
| SPLITMUX_PART_UNLOCK (reader); |
| } |
| |
| GstClockTime |
| gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader) |
| { |
| GstClockTime ret = GST_CLOCK_TIME_NONE; |
| |
| SPLITMUX_PART_LOCK (reader); |
| ret = reader->start_offset; |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| return ret; |
| } |
| |
| GstClockTime |
| gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader) |
| { |
| GstClockTime dur; |
| |
| SPLITMUX_PART_LOCK (reader); |
| dur = reader->duration; |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| return dur; |
| } |
| |
| GstPad * |
| gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader, |
| GstPad * target) |
| { |
| GstPad *result = NULL; |
| GList *cur; |
| |
| SPLITMUX_PART_LOCK (reader); |
| for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { |
| GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); |
| if (part_pad->target == target) { |
| result = (GstPad *) gst_object_ref (part_pad); |
| break; |
| } |
| } |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| return result; |
| } |
| |
| GstFlowReturn |
| gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad, |
| GstDataQueueItem ** item) |
| { |
| GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad); |
| GstDataQueue *q; |
| GstFlowReturn ret; |
| |
| /* Get one item from the appropriate dataqueue */ |
| SPLITMUX_PART_LOCK (reader); |
| if (reader->prep_state == PART_STATE_FAILED) { |
| SPLITMUX_PART_UNLOCK (reader); |
| return GST_FLOW_ERROR; |
| } |
| |
| q = gst_object_ref (part_pad->queue); |
| |
| /* Have to drop the lock around pop, so we can be woken up for flush */ |
| SPLITMUX_PART_UNLOCK (reader); |
| if (!gst_data_queue_pop (q, item) || (*item == NULL)) { |
| ret = GST_FLOW_FLUSHING; |
| goto out; |
| } |
| |
| SPLITMUX_PART_LOCK (reader); |
| |
| SPLITMUX_PART_BROADCAST (reader); |
| if (GST_IS_EVENT ((*item)->object)) { |
| GstEvent *e = (GstEvent *) ((*item)->object); |
| /* Mark this pad as EOS */ |
| if (GST_EVENT_TYPE (e) == GST_EVENT_EOS) |
| part_pad->is_eos = TRUE; |
| } |
| |
| SPLITMUX_PART_UNLOCK (reader); |
| |
| ret = GST_FLOW_OK; |
| out: |
| gst_object_unref (q); |
| return ret; |
| } |
| |
| static void |
| bus_handler (GstBin * bin, GstMessage * message) |
| { |
| GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin; |
| |
| switch (GST_MESSAGE_TYPE (message)) { |
| case GST_MESSAGE_ERROR: |
| /* Make sure to set the state to failed and wake up the listener |
| * on error */ |
| SPLITMUX_PART_LOCK (reader); |
| reader->prep_state = PART_STATE_FAILED; |
| SPLITMUX_PART_BROADCAST (reader); |
| SPLITMUX_PART_UNLOCK (reader); |
| break; |
| default: |
| break; |
| } |
| |
| GST_BIN_CLASS (parent_class)->handle_message (bin, message); |
| } |