v4l2: Handle corrupted buffer with empty payload

This allow skipping buffer flagged with ERROR that has no payload.
This is typical behaviour when a recovererable error occured during
capture in the driver, but that no valid data was ever written into that
buffer. This patch also translate V4L2_BUF_FLAG_ERROR into
GST_BUFFER_FLAG_CORRUPTED. Hence decoding error produce
by decoder due to missing frames will now be correctly marked. Finally,
this fixes a buffer leak when EOS is reached.

https://bugzilla.gnome.org/show_bug.cgi?id=740040
diff --git a/sys/v4l2/gstv4l2bufferpool.c b/sys/v4l2/gstv4l2bufferpool.c
index 2ea9eb5..99d1cbf 100644
--- a/sys/v4l2/gstv4l2bufferpool.c
+++ b/sys/v4l2/gstv4l2bufferpool.c
@@ -1129,6 +1129,9 @@
       GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DELTA_UNIT);
   }
 
+  if (group->buffer.flags & V4L2_BUF_FLAG_ERROR)
+    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_CORRUPTED);
+
   GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
 
   *buffer = outbuf;
@@ -1595,8 +1598,12 @@
           GstBuffer *tmp;
 
           if ((*buf)->pool == bpool) {
-            if (gst_buffer_get_size (*buf) == 0)
-              goto eos;
+            if (gst_buffer_get_size (*buf) == 0) {
+              if (GST_BUFFER_FLAG_IS_SET (*buf, GST_BUFFER_FLAG_CORRUPTED))
+                goto buffer_corrupted;
+              else
+                goto eos;
+            }
 
             /* start copying buffers when we are running low on buffers */
             if (g_atomic_int_get (&pool->num_queued) < pool->copy_threshold) {
@@ -1632,7 +1639,11 @@
           /* An empty buffer on capture indicates the end of stream */
           if (gst_buffer_get_size (tmp) == 0) {
             gst_v4l2_buffer_pool_release_buffer (bpool, tmp);
-            goto eos;
+
+            if (GST_BUFFER_FLAG_IS_SET (*buf, GST_BUFFER_FLAG_CORRUPTED))
+              goto buffer_corrupted;
+            else
+              goto eos;
           }
 
           ret = gst_v4l2_buffer_pool_copy_buffer (pool, *buf, tmp);
@@ -1787,10 +1798,19 @@
     GST_ERROR_OBJECT (pool, "failed to copy buffer");
     return ret;
   }
+buffer_corrupted:
+  {
+    GST_WARNING_OBJECT (pool, "Dropping corrupted buffer without payload");
+    gst_buffer_unref (*buf);
+    *buf = NULL;
+    return GST_V4L2_FLOW_CORRUPTED_BUFFER;
+  }
 eos:
   {
     GST_DEBUG_OBJECT (pool, "end of stream reached");
-    return GST_FLOW_EOS;
+    gst_buffer_unref (*buf);
+    *buf = NULL;
+    return GST_V4L2_FLOW_LAST_BUFFER;
   }
 acquire_failed:
   {
diff --git a/sys/v4l2/gstv4l2bufferpool.h b/sys/v4l2/gstv4l2bufferpool.h
index 8601a32..76013ce 100644
--- a/sys/v4l2/gstv4l2bufferpool.h
+++ b/sys/v4l2/gstv4l2bufferpool.h
@@ -44,6 +44,16 @@
 #define GST_V4L2_BUFFER_POOL(obj)      (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_V4L2_BUFFER_POOL, GstV4l2BufferPool))
 #define GST_V4L2_BUFFER_POOL_CAST(obj) ((GstV4l2BufferPool*)(obj))
 
+/* This flow return is used to indicated that the last buffer of a
+ * drain or a resoltuion change has been found. This should normally
+ * only occure for mem-2-mem devices. */
+#define GST_V4L2_FLOW_LAST_BUFFER GST_FLOW_CUSTOM_SUCCESS
+
+/* This flow return is used to indicated that the returned buffer was marked
+ * with the error flag and had no payload. This error should be recovered by
+ * simply waiting for next buffer. */
+#define GST_V4L2_FLOW_CORRUPTED_BUFFER GST_FLOW_CUSTOM_SUCCESS_1
+
 struct _GstV4l2BufferPool
 {
   GstBufferPool parent;
diff --git a/sys/v4l2/gstv4l2src.c b/sys/v4l2/gstv4l2src.c
index 06d4f09..8775612 100644
--- a/sys/v4l2/gstv4l2src.c
+++ b/sys/v4l2/gstv4l2src.c
@@ -638,19 +638,22 @@
 {
   GstV4l2Src *v4l2src = GST_V4L2SRC (src);
   GstV4l2Object *obj = v4l2src->v4l2object;
+  GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL_CAST (obj->pool);
   GstFlowReturn ret;
   GstClock *clock;
   GstClockTime abs_time, base_time, timestamp, duration;
   GstClockTime delay;
 
-  ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC (src), 0,
-      obj->info.size, buf);
+  do {
+    ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC (src), 0,
+        obj->info.size, buf);
 
-  if (G_UNLIKELY (ret != GST_FLOW_OK))
-    goto alloc_failed;
+    if (G_UNLIKELY (ret != GST_FLOW_OK))
+      goto alloc_failed;
 
-  ret =
-      gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL_CAST (obj->pool), buf);
+    ret = gst_v4l2_buffer_pool_process (pool, buf);
+
+  } while (ret == GST_V4L2_FLOW_CORRUPTED_BUFFER);
 
   if (G_UNLIKELY (ret != GST_FLOW_OK))
     goto error;
@@ -788,8 +791,15 @@
   }
 error:
   {
-    GST_DEBUG_OBJECT (src, "error processing buffer %d (%s)", ret,
-        gst_flow_get_name (ret));
+    if (ret == GST_V4L2_FLOW_LAST_BUFFER) {
+      GST_ELEMENT_ERROR (src, RESOURCE, FAILED,
+          ("Driver returned a buffer with no payload, this most likely "
+              "indicate a bug in the driver."), (NULL));
+      ret = GST_FLOW_ERROR;
+    } else {
+      GST_DEBUG_OBJECT (src, "error processing buffer %d (%s)", ret,
+          gst_flow_get_name (ret));
+    }
     return ret;
   }
 }
diff --git a/sys/v4l2/gstv4l2transform.c b/sys/v4l2/gstv4l2transform.c
index f645532..cfcc3dd 100644
--- a/sys/v4l2/gstv4l2transform.c
+++ b/sys/v4l2/gstv4l2transform.c
@@ -468,20 +468,23 @@
   if (G_UNLIKELY (ret != GST_FLOW_OK))
     goto beach;
 
-  pool = gst_base_transform_get_buffer_pool (trans);
+  do {
+    pool = gst_base_transform_get_buffer_pool (trans);
 
-  if (!gst_buffer_pool_set_active (pool, TRUE))
-    goto activate_failed;
+    if (!gst_buffer_pool_set_active (pool, TRUE))
+      goto activate_failed;
 
-  GST_DEBUG_OBJECT (self, "Dequeue output buffer");
-  ret = gst_buffer_pool_acquire_buffer (pool, outbuf, NULL);
-  g_object_unref (pool);
+    GST_DEBUG_OBJECT (self, "Dequeue output buffer");
+    ret = gst_buffer_pool_acquire_buffer (pool, outbuf, NULL);
+    g_object_unref (pool);
 
-  if (ret != GST_FLOW_OK)
-    goto alloc_failed;
+    if (ret != GST_FLOW_OK)
+      goto alloc_failed;
 
-  pool = self->v4l2capture->pool;
-  ret = gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (pool), outbuf);
+    pool = self->v4l2capture->pool;
+    ret = gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (pool), outbuf);
+
+  } while (ret == GST_V4L2_FLOW_CORRUPTED_BUFFER);
 
   if (ret != GST_FLOW_OK) {
     gst_buffer_unref (*outbuf);
diff --git a/sys/v4l2/gstv4l2videodec.c b/sys/v4l2/gstv4l2videodec.c
index 5f45d6d..0ce3b32 100644
--- a/sys/v4l2/gstv4l2videodec.c
+++ b/sys/v4l2/gstv4l2videodec.c
@@ -358,6 +358,7 @@
 gst_v4l2_video_dec_loop (GstVideoDecoder * decoder)
 {
   GstV4l2VideoDec *self = GST_V4L2_VIDEO_DEC (decoder);
+  GstV4l2BufferPool *v4l2_pool = GST_V4L2_BUFFER_POOL (self->v4l2capture->pool);
   GstBufferPool *pool;
   GstVideoCodecFrame *frame;
   GstBuffer *buffer = NULL;
@@ -365,28 +366,29 @@
 
   GST_LOG_OBJECT (decoder, "Allocate output buffer");
 
-  /* We cannot use the base class allotate helper since it taking the internal
-   * stream lock. we know that the acquire may need to poll until more frames
-   * comes in and holding this lock would prevent that.
-   */
-  pool = gst_video_decoder_get_buffer_pool (decoder);
+  do {
+    /* We cannot use the base class allotate helper since it taking the internal
+     * stream lock. we know that the acquire may need to poll until more frames
+     * comes in and holding this lock would prevent that.
+     */
+    pool = gst_video_decoder_get_buffer_pool (decoder);
 
-  /* Pool may be NULL if we started going to READY state */
-  if (pool == NULL) {
-    ret = GST_FLOW_FLUSHING;
-    goto beach;
-  }
+    /* Pool may be NULL if we started going to READY state */
+    if (pool == NULL) {
+      ret = GST_FLOW_FLUSHING;
+      goto beach;
+    }
 
-  ret = gst_buffer_pool_acquire_buffer (pool, &buffer, NULL);
-  g_object_unref (pool);
+    ret = gst_buffer_pool_acquire_buffer (pool, &buffer, NULL);
+    g_object_unref (pool);
 
-  if (ret != GST_FLOW_OK)
-    goto beach;
+    if (ret != GST_FLOW_OK)
+      goto beach;
 
-  GST_LOG_OBJECT (decoder, "Process output buffer");
-  ret =
-      gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (self->
-          v4l2capture->pool), &buffer);
+    GST_LOG_OBJECT (decoder, "Process output buffer");
+    ret = gst_v4l2_buffer_pool_process (v4l2_pool, &buffer);
+
+  } while (ret == GST_V4L2_FLOW_CORRUPTED_BUFFER);
 
   if (ret != GST_FLOW_OK)
     goto beach;