flvmux: Wait for caps from both srcs before writing header

Wait for caps on all pads to start writing data even when source is live.

Includes unit test by Havard Graff that simulates it.

https://bugzilla.gnome.org/show_bug.cgi?id=794722
diff --git a/gst/flv/gstflvmux.c b/gst/flv/gstflvmux.c
index 6b248e4..826658f 100644
--- a/gst/flv/gstflvmux.c
+++ b/gst/flv/gstflvmux.c
@@ -1812,6 +1812,19 @@
 static GstClockTime
 gst_flv_mux_get_next_time (GstAggregator * aggregator)
 {
+  GstFlvMux *mux = GST_FLV_MUX (aggregator);
+
+  GST_OBJECT_LOCK (aggregator);
+  if (mux->state == GST_FLV_MUX_STATE_HEADER &&
+      ((mux->audio_pad && mux->audio_pad->codec == G_MAXUINT) ||
+          (mux->video_pad && mux->video_pad->codec == G_MAXUINT)))
+    goto wait_for_data;
+  GST_OBJECT_UNLOCK (aggregator);
+
   return gst_flv_mux_get_next_time_for_segment (aggregator,
       &GST_AGGREGATOR_PAD (aggregator->srcpad)->segment);
+
+wait_for_data:
+  GST_OBJECT_UNLOCK (aggregator);
+  return GST_CLOCK_TIME_NONE;
 }
diff --git a/tests/check/elements/flvmux.c b/tests/check/elements/flvmux.c
index 34b5d1c..f8b4059 100644
--- a/tests/check/elements/flvmux.c
+++ b/tests/check/elements/flvmux.c
@@ -169,43 +169,61 @@
   return buf;
 }
 
+guint8 speex_hdr0[] = {
+  0x53, 0x70, 0x65, 0x65, 0x78, 0x20, 0x20, 0x20,
+  0x31, 0x2e, 0x32, 0x72, 0x63, 0x31, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
+  0x50, 0x00, 0x00, 0x00, 0x80, 0x3e, 0x00, 0x00,
+  0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
+  0x01, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff,
+  0x40, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
+};
+
+guint8 speex_hdr1[] = {
+  0x1f, 0x00, 0x00, 0x00, 0x45, 0x6e, 0x63, 0x6f,
+  0x64, 0x65, 0x64, 0x20, 0x77, 0x69, 0x74, 0x68,
+  0x20, 0x47, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
+  0x65, 0x72, 0x20, 0x53, 0x70, 0x65, 0x65, 0x78,
+  0x65, 0x6e, 0x63, 0x00, 0x00, 0x00, 0x00, 0x01
+};
+
+guint8 speex_buf[] = {
+  0x36, 0x9d, 0x1b, 0x9a, 0x20, 0x00, 0x01, 0x68,
+  0xe8, 0xe8, 0xe8, 0xe8, 0xe8, 0xe8, 0xe8, 0x84,
+  0x00, 0xb4, 0x74, 0x74, 0x74, 0x74, 0x74, 0x74,
+  0x74, 0x42, 0x00, 0x5a, 0x3a, 0x3a, 0x3a, 0x3a,
+  0x3a, 0x3a, 0x3a, 0x21, 0x00, 0x2d, 0x1d, 0x1d,
+  0x1d, 0x1d, 0x1d, 0x1d, 0x1d, 0x1b, 0x3b, 0x60,
+  0xab, 0xab, 0xab, 0xab, 0xab, 0x0a, 0xba, 0xba,
+  0xba, 0xba, 0xb0, 0xab, 0xab, 0xab, 0xab, 0xab,
+  0x0a, 0xba, 0xba, 0xba, 0xba, 0xb7
+};
+
+guint8 h264_buf[] = {
+  0x00, 0x00, 0x00, 0x0b, 0x67, 0x42, 0xc0, 0x0c,
+  0x95, 0xa7, 0x20, 0x1e, 0x11, 0x08, 0xd4, 0x00,
+  0x00, 0x00, 0x04, 0x68, 0xce, 0x3c, 0x80, 0x00,
+  0x00, 0x00, 0x55, 0x65, 0xb8, 0x04, 0x0e, 0x7e,
+  0x1f, 0x22, 0x60, 0x34, 0x01, 0xe2, 0x00, 0x3c,
+  0xe1, 0xfc, 0x91, 0x40, 0xa6, 0x9e, 0x07, 0x42,
+  0x56, 0x44, 0x73, 0x75, 0x40, 0x9f, 0x0c, 0x87,
+  0x83, 0xc9, 0x52, 0x60, 0x6d, 0xd8, 0x98, 0x01,
+  0x16, 0xbd, 0x0f, 0xa6, 0xaf, 0x75, 0x83, 0xdd,
+  0xfa, 0xe7, 0x8f, 0xe3, 0x58, 0x10, 0x0f, 0x5c,
+  0x18, 0x2f, 0x41, 0x40, 0x23, 0x0b, 0x03, 0x70,
+  0x00, 0xff, 0xe4, 0xa6, 0x7d, 0x7f, 0x3f, 0x76,
+  0x01, 0xd0, 0x98, 0x2a, 0x0c, 0xb8, 0x02, 0x32,
+  0xbc, 0x56, 0xfd, 0x34, 0x4f, 0xcf, 0xfe, 0xa0,
+};
+
 GST_START_TEST (test_speex_streamable)
 {
   GstBuffer *buf;
   GstMapInfo map = GST_MAP_INFO_INIT;
 
-  guint8 header0[] = {
-    0x53, 0x70, 0x65, 0x65, 0x78, 0x20, 0x20, 0x20,
-    0x31, 0x2e, 0x32, 0x72, 0x63, 0x31, 0x00, 0x00,
-    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
-    0x50, 0x00, 0x00, 0x00, 0x80, 0x3e, 0x00, 0x00,
-    0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
-    0x01, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff,
-    0x40, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-    0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
-  };
-
-  guint8 header1[] = {
-    0x1f, 0x00, 0x00, 0x00, 0x45, 0x6e, 0x63, 0x6f,
-    0x64, 0x65, 0x64, 0x20, 0x77, 0x69, 0x74, 0x68,
-    0x20, 0x47, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
-    0x65, 0x72, 0x20, 0x53, 0x70, 0x65, 0x65, 0x78,
-    0x65, 0x6e, 0x63, 0x00, 0x00, 0x00, 0x00, 0x01
-  };
-
-  guint8 buffer[] = {
-    0x36, 0x9d, 0x1b, 0x9a, 0x20, 0x00, 0x01, 0x68,
-    0xe8, 0xe8, 0xe8, 0xe8, 0xe8, 0xe8, 0xe8, 0x84,
-    0x00, 0xb4, 0x74, 0x74, 0x74, 0x74, 0x74, 0x74,
-    0x74, 0x42, 0x00, 0x5a, 0x3a, 0x3a, 0x3a, 0x3a,
-    0x3a, 0x3a, 0x3a, 0x21, 0x00, 0x2d, 0x1d, 0x1d,
-    0x1d, 0x1d, 0x1d, 0x1d, 0x1d, 0x1b, 0x3b, 0x60,
-    0xab, 0xab, 0xab, 0xab, 0xab, 0x0a, 0xba, 0xba,
-    0xba, 0xba, 0xb0, 0xab, 0xab, 0xab, 0xab, 0xab,
-    0x0a, 0xba, 0xba, 0xba, 0xba, 0xb7
-  };
 
   GstCaps *caps = gst_caps_new_simple ("audio/x-speex",
       "rate", G_TYPE_INT, 16000,
@@ -221,18 +239,20 @@
   g_object_set (h->element, "streamable", 1, NULL);
 
   /* push speex header0 */
-  gst_harness_push (h, create_buffer (header0, sizeof (header0), base_time, 0));
+  gst_harness_push (h, create_buffer (speex_hdr0,
+          sizeof (speex_hdr0), base_time, 0));
 
   /* push speex header1 */
-  gst_harness_push (h, create_buffer (header1, sizeof (header1), base_time, 0));
+  gst_harness_push (h, create_buffer (speex_hdr1,
+          sizeof (speex_hdr1), base_time, 0));
 
   /* push speex data */
-  gst_harness_push (h, create_buffer (buffer, sizeof (buffer),
-          base_time, duration));
+  gst_harness_push (h, create_buffer (speex_buf,
+          sizeof (speex_buf), base_time, duration));
 
   /* push speex data 2 */
-  gst_harness_push (h, create_buffer (buffer, sizeof (buffer),
-          base_time + duration, duration));
+  gst_harness_push (h, create_buffer (speex_buf,
+          sizeof (speex_buf), base_time + duration, duration));
 
   /* pull out stream-start event */
   gst_event_unref (gst_harness_pull_event (h));
@@ -270,7 +290,8 @@
   /* 0xb2 means Speex, 16000Hz, Mono */
   fail_unless_equals_int (0xb2, map.data[11]);
   /* verify content is intact */
-  fail_unless_equals_int (0, memcmp (&map.data[12], header0, sizeof (header0)));
+  fail_unless_equals_int (0, memcmp (&map.data[12], speex_hdr0,
+          sizeof (speex_hdr0)));
   gst_buffer_unmap (buf, &map);
   gst_buffer_unref (buf);
 
@@ -287,7 +308,8 @@
   /* 0xb2 means Speex, 16000Hz, Mono */
   fail_unless_equals_int (0xb2, map.data[11]);
   /* verify content is intact */
-  fail_unless_equals_int (0, memcmp (&map.data[12], header1, sizeof (header1)));
+  fail_unless_equals_int (0, memcmp (&map.data[12], speex_hdr1,
+          sizeof (speex_hdr1)));
   gst_buffer_unmap (buf, &map);
   gst_buffer_unref (buf);
 
@@ -307,7 +329,8 @@
   /* 0xb2 means Speex, 16000Hz, Mono */
   fail_unless_equals_int (0xb2, map.data[11]);
   /* verify content is intact */
-  fail_unless_equals_int (0, memcmp (&map.data[12], buffer, sizeof (buffer)));
+  fail_unless_equals_int (0, memcmp (&map.data[12], speex_buf,
+          sizeof (speex_buf)));
   gst_buffer_unmap (buf, &map);
   gst_buffer_unref (buf);
 
@@ -327,7 +350,8 @@
   /* 0xb2 means Speex, 16000Hz, Mono */
   fail_unless_equals_int (0xb2, map.data[11]);
   /* verify content is intact */
-  fail_unless_equals_int (0, memcmp (&map.data[12], buffer, sizeof (buffer)));
+  fail_unless_equals_int (0, memcmp (&map.data[12], speex_buf,
+          sizeof (speex_buf)));
   gst_buffer_unmap (buf, &map);
   gst_buffer_unref (buf);
 
@@ -423,6 +447,112 @@
 
 GST_END_TEST;
 
+typedef struct
+{
+  GstHarness *a_sink;
+  GstHarness *v_sink;
+} DemuxHarnesses;
+
+static void
+flvdemux_pad_added (GstElement * flvdemux, GstPad * srcpad, DemuxHarnesses * h)
+{
+  GstCaps *caps = gst_pad_get_current_caps (srcpad);
+  const gchar *name = gst_structure_get_name (gst_caps_get_structure (caps, 0));
+
+  if (g_ascii_strncasecmp ("audio", name, 5) == 0)
+    gst_harness_add_element_src_pad (h->a_sink, srcpad);
+  else
+    gst_harness_add_element_src_pad (h->v_sink, srcpad);
+
+  gst_caps_unref (caps);
+}
+
+GST_START_TEST (test_video_caps_late)
+{
+  GstHarness *mux = gst_harness_new_with_padnames ("flvmux", NULL, "src");
+  GstHarness *a_src =
+      gst_harness_new_with_element (mux->element, "audio", NULL);
+  GstHarness *v_src =
+      gst_harness_new_with_element (mux->element, "video", NULL);
+  GstHarness *demux = gst_harness_new_with_padnames ("flvdemux", "sink", NULL);
+  GstHarness *a_sink =
+      gst_harness_new_with_element (demux->element, NULL, NULL);
+  GstHarness *v_sink =
+      gst_harness_new_with_element (demux->element, NULL, NULL);
+  DemuxHarnesses harnesses = { a_sink, v_sink };
+  guint i;
+  GstTestClock *tclock;
+
+  g_object_set (mux->element, "streamable", TRUE,
+      "latency", G_GUINT64_CONSTANT (1), NULL);
+  gst_harness_use_testclock (mux);
+
+  g_signal_connect (demux->element, "pad-added",
+      G_CALLBACK (flvdemux_pad_added), &harnesses);
+  gst_harness_add_sink_harness (mux, demux);
+
+  gst_harness_set_src_caps_str (a_src,
+      "audio/x-speex, rate=(int)16000, channels=(int)1");
+
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (a_src,
+          gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
+              speex_hdr0, sizeof (speex_hdr0), 0, sizeof (speex_hdr0), NULL,
+              NULL)));
+
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (a_src,
+          gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
+              speex_hdr1, sizeof (speex_hdr1), 0, sizeof (speex_hdr1), NULL,
+              NULL)));
+
+  /* Wait a little and make sure no clock was scheduled as this shouldn't happen
+   * before the caps are set */
+  g_usleep (40 * 1000);
+  tclock = gst_harness_get_testclock (mux);
+  fail_unless (gst_test_clock_get_next_entry_time (tclock) ==
+      GST_CLOCK_TIME_NONE);
+
+  gst_harness_set_src_caps_str (v_src,
+      "video/x-h264, stream-format=(string)avc, alignment=(string)au, "
+      "codec_data=(buffer)0142c00cffe1000b6742c00c95a7201e1108d401000468ce3c80");
+
+  gst_harness_crank_single_clock_wait (mux);
+
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (a_src,
+          gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
+              speex_buf, sizeof (speex_buf), 0, sizeof (speex_buf), NULL,
+              NULL)));
+
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (v_src,
+          gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
+              h264_buf, sizeof (h264_buf), 0, sizeof (h264_buf), NULL, NULL)));
+
+  gst_harness_crank_single_clock_wait (mux);
+  gst_harness_crank_single_clock_wait (mux);
+  gst_harness_crank_single_clock_wait (mux);
+
+
+  /* push from flvmux to demux */
+  for (i = 0; i < 6; i++)
+    gst_harness_push_to_sink (mux);
+
+  /* verify we got 2x audio and 1x video buffers out of flvdemux */
+  gst_buffer_unref (gst_harness_pull (a_sink));
+  gst_buffer_unref (gst_harness_pull (a_sink));
+  gst_buffer_unref (gst_harness_pull (v_sink));
+
+  fail_unless (gst_test_clock_get_next_entry_time (tclock) ==
+      GST_CLOCK_TIME_NONE);
+
+  g_clear_object (&tclock);
+  gst_harness_teardown (a_src);
+  gst_harness_teardown (v_src);
+  gst_harness_teardown (mux);
+  gst_harness_teardown (a_sink);
+  gst_harness_teardown (v_sink);
+}
+
+GST_END_TEST;
+
 static Suite *
 flvmux_suite (void)
 {
@@ -442,6 +572,7 @@
 
   tcase_add_test (tc_chain, test_speex_streamable);
   tcase_add_test (tc_chain, test_increasing_timestamp_when_pts_none);
+  tcase_add_test (tc_chain, test_video_caps_late);
 
   return s;
 }