| /* A set of utility functions that are common between elements |
| * based upon GstAdaptiveDemux |
| * |
| * Copyright (c) <2015> YouView TV Ltd |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| |
| #include <gst/check/gstcheck.h> |
| #include "adaptive_demux_engine.h" |
| #include "adaptive_demux_common.h" |
| |
| #define GST_TEST_HTTP_SRC_NAME "testhttpsrc" |
| |
| struct _GstAdaptiveDemuxTestCaseClass |
| { |
| GObjectClass parent_class; |
| }; |
| |
| #define gst_adaptive_demux_test_case_parent_class parent_class |
| |
| static void gst_adaptive_demux_test_case_dispose (GObject * object); |
| static void gst_adaptive_demux_test_case_finalize (GObject * object); |
| static void gst_adaptive_demux_test_case_clear (GstAdaptiveDemuxTestCase * |
| testData); |
| |
| G_DEFINE_TYPE (GstAdaptiveDemuxTestCase, gst_adaptive_demux_test_case, |
| G_TYPE_OBJECT); |
| |
| static void |
| gst_adaptive_demux_test_case_class_init (GstAdaptiveDemuxTestCaseClass * klass) |
| { |
| GObjectClass *object = G_OBJECT_CLASS (klass); |
| |
| object->dispose = gst_adaptive_demux_test_case_dispose; |
| object->finalize = gst_adaptive_demux_test_case_finalize; |
| } |
| |
| static void |
| gst_adaptive_demux_test_case_init (GstAdaptiveDemuxTestCase * testData) |
| { |
| testData->output_streams = NULL; |
| testData->test_task = NULL; |
| g_rec_mutex_init (&testData->test_task_lock); |
| g_mutex_init (&testData->test_task_state_lock); |
| g_cond_init (&testData->test_task_state_cond); |
| gst_adaptive_demux_test_case_clear (testData); |
| } |
| |
| static void |
| gst_adaptive_demux_test_case_clear (GstAdaptiveDemuxTestCase * testData) |
| { |
| if (testData->output_streams) { |
| g_list_free (testData->output_streams); |
| testData->output_streams = NULL; |
| } |
| testData->count_of_finished_streams = 0; |
| if (testData->test_task) { |
| gst_task_stop (testData->test_task); |
| gst_task_join (testData->test_task); |
| gst_object_unref (testData->test_task); |
| testData->test_task = NULL; |
| } |
| testData->signal_context = NULL; |
| testData->test_task_state = TEST_TASK_STATE_NOT_STARTED; |
| testData->threshold_for_seek = 0; |
| gst_event_replace (&testData->seek_event, NULL); |
| testData->signal_context = NULL; |
| } |
| |
| |
| static void |
| gst_adaptive_demux_test_case_dispose (GObject * object) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (object); |
| |
| gst_adaptive_demux_test_case_clear (testData); |
| |
| GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object)); |
| } |
| |
| static void |
| gst_adaptive_demux_test_case_finalize (GObject * object) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (object); |
| |
| g_cond_clear (&testData->test_task_state_cond); |
| g_mutex_clear (&testData->test_task_state_lock); |
| g_rec_mutex_clear (&testData->test_task_lock); |
| if (testData->test_task) { |
| gst_task_stop (testData->test_task); |
| gst_task_join (testData->test_task); |
| gst_object_unref (testData->test_task); |
| testData->test_task = NULL; |
| } |
| if (testData->output_streams) { |
| g_list_free (testData->output_streams); |
| testData->output_streams = NULL; |
| } |
| |
| GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object)); |
| } |
| |
| /** |
| * gst_adaptive_demux_test_case_new: |
| * |
| * Creates a new #GstAdaptiveDemuxTestCase. Free with g_object_unref(). |
| * |
| * Returns: (transfer full): a new #GstAdaptiveDemuxTestCase |
| */ |
| GstAdaptiveDemuxTestCase * |
| gst_adaptive_demux_test_case_new (void) |
| { |
| return g_object_newv (GST_TYPE_ADAPTIVE_DEMUX_TEST_CASE, 0, NULL); |
| } |
| |
| |
| GstAdaptiveDemuxTestExpectedOutput * |
| gst_adaptive_demux_test_find_test_data_by_stream (GstAdaptiveDemuxTestCase * |
| testData, GstAdaptiveDemuxTestOutputStream * stream, guint * index) |
| { |
| gchar *pad_name; |
| GstAdaptiveDemuxTestExpectedOutput *ret = NULL; |
| guint count = 0; |
| |
| pad_name = gst_pad_get_name (stream->pad); |
| fail_unless (pad_name != NULL); |
| for (GList * walk = testData->output_streams; walk; walk = g_list_next (walk)) { |
| GstAdaptiveDemuxTestExpectedOutput *td = walk->data; |
| if (strcmp (td->name, pad_name) == 0) { |
| ret = td; |
| if (index) |
| *index = count; |
| } |
| ++count; |
| } |
| g_free (pad_name); |
| return ret; |
| } |
| |
| /* function to validate data received by AppSink */ |
| gboolean |
| gst_adaptive_demux_test_check_received_data (GstAdaptiveDemuxTestEngine * |
| engine, GstAdaptiveDemuxTestOutputStream * stream, GstBuffer * buffer, |
| gpointer user_data) |
| { |
| GstMapInfo info; |
| guint pattern; |
| guint64 streamOffset; |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; |
| |
| fail_unless (stream != NULL); |
| fail_unless (engine->pipeline != NULL); |
| testOutputStreamData = |
| gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL); |
| fail_unless (testOutputStreamData != NULL); |
| streamOffset = stream->segment_start + stream->segment_received_size; |
| if (testOutputStreamData->expected_data) { |
| gsize size = gst_buffer_get_size (buffer); |
| if (gst_buffer_memcmp (buffer, 0, |
| &testOutputStreamData->expected_data[streamOffset], size) == 0) { |
| return TRUE; |
| } |
| /* If buffers do not match, fall back to a slower byte-based check |
| so that the test can output the position where the received data |
| diverges from expected_data |
| */ |
| } |
| |
| gst_buffer_map (buffer, &info, GST_MAP_READ); |
| |
| GST_DEBUG |
| ("segment_start = %" G_GUINT64_FORMAT " segment_received_size = %" |
| G_GUINT64_FORMAT " bufferSize=%d", |
| stream->segment_start, stream->segment_received_size, (gint) info.size); |
| |
| pattern = streamOffset - streamOffset % sizeof (pattern); |
| for (guint64 i = 0; i != info.size; ++i) { |
| guint received = info.data[i]; |
| guint expected; |
| |
| if (testOutputStreamData->expected_data) { |
| fail_unless (streamOffset + i < testOutputStreamData->expected_size); |
| expected = testOutputStreamData->expected_data[streamOffset + i]; |
| } else { |
| gchar pattern_byte_to_read; |
| |
| pattern_byte_to_read = (streamOffset + i) % sizeof (pattern); |
| if (pattern_byte_to_read == 0) { |
| pattern = streamOffset + i; |
| } |
| |
| expected = (pattern >> (pattern_byte_to_read * 8)) & 0xFF; |
| #if 0 |
| GST_DEBUG |
| ("received '0x%02x' expected '0x%02x' offset %" G_GUINT64_FORMAT |
| " pattern=%08x byte_to_read=%d", |
| received, expected, i, pattern, pattern_byte_to_read); |
| #endif |
| } |
| |
| fail_unless (received == expected, |
| "output validation failed: received '0x%02x' expected '0x%02x' byte %" |
| G_GUINT64_FORMAT " offset=%" G_GUINT64_FORMAT "\n", received, expected, |
| i, streamOffset); |
| } |
| |
| gst_buffer_unmap (buffer, &info); |
| return TRUE; |
| } |
| |
| /* function to check total size of data received by AppSink |
| * will be called when AppSink receives eos. |
| */ |
| void |
| gst_adaptive_demux_test_check_size_of_received_data (GstAdaptiveDemuxTestEngine |
| * engine, GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; |
| |
| testOutputStreamData = |
| gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL); |
| fail_unless (testOutputStreamData != NULL); |
| |
| fail_unless (stream->total_received_size == |
| testOutputStreamData->expected_size, |
| "size validation failed, expected %d received %d", |
| testOutputStreamData->expected_size, stream->total_received_size); |
| testData->count_of_finished_streams++; |
| if (testData->count_of_finished_streams == |
| g_list_length (testData->output_streams)) { |
| g_main_loop_quit (engine->loop); |
| } |
| } |
| |
| typedef struct _SeekTaskContext |
| { |
| GstElement *pipeline; |
| GstTask *task; |
| GstEvent *seek_event; |
| } SeekTaskContext; |
| |
| /* function to generate a seek event. Will be run in a separate thread */ |
| static void |
| testSeekTaskDoSeek (gpointer user_data) |
| { |
| SeekTaskContext *context = (SeekTaskContext *) user_data; |
| GstTask *task; |
| |
| GST_DEBUG ("testSeekTaskDoSeek calling seek"); |
| |
| fail_unless (GST_IS_EVENT (context->seek_event)); |
| fail_unless (GST_EVENT_TYPE (context->seek_event) == GST_EVENT_SEEK); |
| |
| if (!gst_element_send_event (GST_ELEMENT (context->pipeline), |
| context->seek_event)) |
| fail ("Seek failed!\n"); |
| GST_DEBUG ("seek ok"); |
| task = context->task; |
| g_slice_free (SeekTaskContext, context); |
| gst_task_stop (task); |
| } |
| |
| /* function to be called during seek test when demux sends data to AppSink |
| * It monitors the data sent and after a while will generate a seek request. |
| */ |
| static gboolean |
| testSeekAdaptiveDemuxSendsData (GstAdaptiveDemuxTestEngine * engine, |
| GstAdaptiveDemuxTestOutputStream * stream, |
| GstBuffer * buffer, gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| SeekTaskContext *seekContext; |
| GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; |
| guint index = 0; |
| |
| testOutputStreamData = |
| gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, |
| &index); |
| fail_unless (testOutputStreamData != NULL); |
| /* first entry in testData->output_streams is the |
| PAD on which to perform the seek */ |
| if (index == 0 && |
| testData->test_task == NULL && |
| (stream->total_received_size + stream->segment_received_size) >= |
| testData->threshold_for_seek) { |
| GstSeekFlags seek_flags; |
| |
| testData->threshold_for_seek = |
| stream->total_received_size + stream->segment_received_size; |
| |
| gst_event_parse_seek (testData->seek_event, NULL, NULL, &seek_flags, NULL, |
| NULL, NULL, NULL); |
| if (seek_flags & GST_SEEK_FLAG_FLUSH) |
| testOutputStreamData->expected_size += testData->threshold_for_seek; |
| |
| GST_DEBUG ("starting seek task"); |
| |
| g_mutex_lock (&testData->test_task_state_lock); |
| testData->test_task_state = |
| TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE; |
| g_mutex_unlock (&testData->test_task_state_lock); |
| |
| seekContext = g_slice_new (SeekTaskContext); |
| seekContext->pipeline = engine->pipeline; |
| seekContext->seek_event = gst_event_ref (testData->seek_event); |
| testData->test_task = seekContext->task = |
| gst_task_new ((GstTaskFunction) testSeekTaskDoSeek, seekContext, NULL); |
| gst_task_set_lock (testData->test_task, &testData->test_task_lock); |
| gst_task_start (testData->test_task); |
| |
| GST_DEBUG ("seek task started"); |
| |
| if (seek_flags & GST_SEEK_FLAG_FLUSH) { |
| g_mutex_lock (&testData->test_task_state_lock); |
| |
| GST_DEBUG ("waiting for seek task to change state on testsrc"); |
| |
| /* wait for test_task to run, send a flush start event to AppSink |
| * and change the testhttpsrc element state from PLAYING to PAUSED |
| */ |
| while (testData->test_task_state == |
| TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE) { |
| g_cond_wait (&testData->test_task_state_cond, |
| &testData->test_task_state_lock); |
| } |
| g_mutex_unlock (&testData->test_task_state_lock); |
| /* we can continue now, but this buffer will be rejected by AppSink |
| * because it is in flushing mode |
| */ |
| GST_DEBUG ("seek task changed state on testsrc, resuming"); |
| } |
| } |
| |
| return TRUE; |
| } |
| |
| static void |
| testSeekAdaptiveAppSinkEvent (GstAdaptiveDemuxTestEngine * engine, |
| GstAdaptiveDemuxTestOutputStream * stream, |
| GstEvent * event, gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; |
| guint index = 0; |
| |
| testOutputStreamData = |
| gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, |
| &index); |
| fail_unless (testOutputStreamData != NULL); |
| |
| if (testData->seek_event && GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT |
| && testOutputStreamData->post_seek_segment.format != GST_FORMAT_UNDEFINED |
| && gst_event_get_seqnum (event) == |
| gst_event_get_seqnum (testData->seek_event)) { |
| const GstSegment *seek_segment; |
| |
| |
| gst_event_parse_segment (event, &seek_segment); |
| fail_unless (seek_segment->format == |
| testOutputStreamData->post_seek_segment.format); |
| fail_unless (seek_segment->rate == |
| testOutputStreamData->post_seek_segment.rate); |
| fail_unless (seek_segment->start == |
| testOutputStreamData->post_seek_segment.start); |
| fail_unless (seek_segment->stop == |
| testOutputStreamData->post_seek_segment.stop); |
| fail_unless (seek_segment->base == |
| testOutputStreamData->post_seek_segment.base); |
| fail_unless (seek_segment->time == |
| testOutputStreamData->post_seek_segment.time); |
| |
| testOutputStreamData->segment_verification_needed = FALSE; |
| } |
| } |
| |
| /* callback called when main_loop detects a state changed event */ |
| static void |
| testSeekOnStateChanged (GstBus * bus, GstMessage * msg, gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| GstState old_state, new_state; |
| const char *srcName = GST_OBJECT_NAME (msg->src); |
| |
| gst_message_parse_state_changed (msg, &old_state, &new_state, NULL); |
| GST_DEBUG ("Element %s changed state from %s to %s", |
| GST_OBJECT_NAME (msg->src), |
| gst_element_state_get_name (old_state), |
| gst_element_state_get_name (new_state)); |
| |
| if (strstr (srcName, "srcbin") == srcName && |
| old_state == GST_STATE_PLAYING && new_state == GST_STATE_PAUSED) { |
| g_mutex_lock (&testData->test_task_state_lock); |
| if (testData->test_task_state == |
| TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE) { |
| GST_DEBUG ("changing test_task_state"); |
| testData->test_task_state = TEST_TASK_STATE_EXITING; |
| g_cond_signal (&testData->test_task_state_cond); |
| } |
| g_mutex_unlock (&testData->test_task_state_lock); |
| } |
| } |
| |
| /* |
| * Issue a seek request after media segment has started to be downloaded |
| * on the first pad listed in GstAdaptiveDemuxTestOutputStreamData and the |
| * first chunk of at least one byte has already arrived in AppSink |
| */ |
| static void |
| testSeekPreTestCallback (GstAdaptiveDemuxTestEngine * engine, |
| gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| GstBus *bus; |
| |
| /* register a callback to listen for state change events */ |
| bus = gst_pipeline_get_bus (GST_PIPELINE (engine->pipeline)); |
| gst_bus_add_signal_watch (bus); |
| g_signal_connect (bus, "message::state-changed", |
| G_CALLBACK (testSeekOnStateChanged), testData); |
| } |
| |
| static void |
| testSeekPostTestCallback (GstAdaptiveDemuxTestEngine * engine, |
| gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| for (GList * walk = testData->output_streams; walk; walk = g_list_next (walk)) { |
| GstAdaptiveDemuxTestExpectedOutput *td = walk->data; |
| |
| fail_if (td->segment_verification_needed); |
| } |
| } |
| |
| /* function to check total size of data received by AppSink |
| * will be called when AppSink receives eos. |
| */ |
| void gst_adaptive_demux_test_download_error_size_of_received_data |
| (GstAdaptiveDemuxTestEngine * engine, |
| GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data) |
| { |
| GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data); |
| GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData; |
| |
| testOutputStreamData = |
| gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL); |
| fail_unless (testOutputStreamData != NULL); |
| /* expect to receive more than 0 */ |
| fail_unless (stream->total_received_size > 0, |
| "size validation failed for %s, expected > 0, received %d", |
| testOutputStreamData->name, stream->total_received_size); |
| |
| /* expect to receive less than file size */ |
| fail_unless (stream->total_received_size < |
| testOutputStreamData->expected_size, |
| "size validation failed for %s, expected < %d received %d", |
| testOutputStreamData->name, testOutputStreamData->expected_size, |
| stream->total_received_size); |
| if (testData->count_of_finished_streams == |
| g_list_length (testData->output_streams)) { |
| g_main_loop_quit (engine->loop); |
| } |
| } |
| |
| void |
| gst_adaptive_demux_test_seek (const gchar * element_name, |
| const gchar * manifest_uri, GstAdaptiveDemuxTestCase * testData) |
| { |
| GstAdaptiveDemuxTestCallbacks cb = { 0 }; |
| cb.appsink_received_data = gst_adaptive_demux_test_check_received_data; |
| cb.appsink_eos = gst_adaptive_demux_test_check_size_of_received_data; |
| cb.appsink_event = testSeekAdaptiveAppSinkEvent; |
| cb.pre_test = testSeekPreTestCallback; |
| cb.post_test = testSeekPostTestCallback; |
| cb.demux_sent_data = testSeekAdaptiveDemuxSendsData; |
| gst_adaptive_demux_test_run (element_name, manifest_uri, &cb, testData); |
| /* the call to g_object_unref of testData will clean up the seek task */ |
| } |
| |
| void |
| gst_adaptive_demux_test_setup (void) |
| { |
| GstRegistry *registry; |
| gboolean ret; |
| |
| registry = gst_registry_get (); |
| ret = gst_test_http_src_register_plugin (registry, GST_TEST_HTTP_SRC_NAME); |
| fail_unless (ret); |
| } |
| |
| void |
| gst_adaptive_demux_test_teardown (void) |
| { |
| gst_test_http_src_install_callbacks (NULL, NULL); |
| gst_test_http_src_set_default_blocksize (0); |
| } |