Render full size overlays, draw with glvideomixer

With this approach there's no need for downscaling as video and
overlay rendering are decoupled in glvideomixer, so we can do
1080p@30 in real time. Video frames are displayed at their
normal rate and overlays are rendered at a potentially slower
rate if we can't keep up without the video stalling.

The main improvement here is a custom source element that enables
us to hit a fast GL texture upload path where we draw the overlay
with cairo/rsvg straight into a dma-buf (allocated by ion) and use
that as a texture in glvideomixer.

Change-Id: Icf69e29ab55a95122754a6e9f2c63ac7adc2d465
diff --git a/edgetpuvision/apps.py b/edgetpuvision/apps.py
index 9c5febd..e84df8a 100644
--- a/edgetpuvision/apps.py
+++ b/edgetpuvision/apps.py
@@ -43,8 +43,6 @@
     parser.add_argument('--source',
                         help='/dev/videoN:FMT:WxH:N/D or .mp4 file or image file',
                         default='/dev/video0:YUY2:1280x720:30/1')
-    parser.add_argument('--downscale', type=float, default=2.0,
-                        help='Downscale factor for .mp4 file rendering')
     parser.add_argument('--loop',  default=False, action='store_true',
                         help='Loop input video file')
     parser.add_argument('--displaymode', type=Display, choices=Display, default=Display.FULLSCREEN,
@@ -54,7 +52,6 @@
 
     if not run_gen(render_gen(args),
                    source=args.source,
-                   downscale=args.downscale,
                    loop=args.loop,
                    display=args.displaymode):
         print('Invalid source argument:', args.source)
diff --git a/edgetpuvision/gst_native.py b/edgetpuvision/gst_native.py
index 5a0c562..acce316 100644
--- a/edgetpuvision/gst_native.py
+++ b/edgetpuvision/gst_native.py
@@ -1,6 +1,7 @@
 import cairo
 import contextlib
 import ctypes
+import threading
 
 import gi
 gi.require_version('Gdk', '3.0')
@@ -53,16 +54,12 @@
 libcairo.cairo_surface_flush.argtypes = [ctypes.c_void_p]
 libcairo.cairo_surface_destroy.restype = None
 libcairo.cairo_surface_destroy.argtypes = [ctypes.c_void_p]
-libcairo.cairo_surface_status.restype = ctypes.c_int
-libcairo.cairo_surface_status.argtypes = [ctypes.c_void_p]
 libcairo.cairo_format_stride_for_width.restype = ctypes.c_int
 libcairo.cairo_format_stride_for_width.argtypes = [ctypes.c_int, ctypes.c_int]
 libcairo.cairo_create.restype = ctypes.c_void_p
 libcairo.cairo_create.argtypes = [ctypes.c_void_p]
 libcairo.cairo_destroy.restype = None
 libcairo.cairo_destroy.argtypes = [ctypes.c_void_p]
-libcairo.cairo_scale.restype = None
-libcairo.cairo_scale.argtypes = [ctypes.c_void_p, ctypes.c_double, ctypes.c_double]
 
 librsvg = ctypes.CDLL('librsvg-2.so.2')
 librsvg.rsvg_handle_new_from_data.restype = ctypes.c_void_p
@@ -102,22 +99,19 @@
     finally:
         libgst.gst_buffer_unmap(ptr, mapping)
 
-# GStreamer Element that attaches VideoOverlayComposition to buffers passing by.
-class OverlayInjector(GstBase.BaseTransform):
+class OverlaySource(GstBase.BaseSrc):
     __gstmetadata__ = ('<longname>', '<class>', '<description>', '<author>')
     __gsttemplates__ = (Gst.PadTemplate.new('src',
                                                Gst.PadDirection.SRC,
                                                Gst.PadPresence.ALWAYS,
-                                               Gst.Caps.new_any()),
-                        Gst.PadTemplate.new('sink',
-                                               Gst.PadDirection.SINK,
-                                               Gst.PadPresence.ALWAYS,
-                                               Gst.Caps.new_any()))
+                                               Gst.Caps.from_string(
+                                                'video/x-raw,format=BGRA,framerate=0/1'
+                                                )))
 
     @staticmethod
     def _plugin_init(plugin):
-        gtype = GObject.type_register(OverlayInjector)
-        Gst.Element.register(plugin, 'overlayinjector', 0, gtype)
+        gtype = GObject.type_register(OverlaySource)
+        Gst.Element.register(plugin, 'overlaysrc', 0, gtype)
         return True
 
     @staticmethod
@@ -127,7 +121,7 @@
             version[0], version[1],         # GStreamer version
             '',                             # name
             '',                             # description
-            OverlayInjector._plugin_init,   # init_func
+            OverlaySource._plugin_init,     # init_func
             '',                             # version
             'unknown',                      # license
             '',                             # source
@@ -136,60 +130,135 @@
         )
 
     def __init__(self):
-        GstBase.BaseTransform.__init__(self)
-        GstBase.BaseTransform.set_in_place(self, True)
-        self.render_size = None
+        GstBase.BaseSrc.__init__(self)
+        self.set_format(Gst.Format.TIME)
+        self.set_do_timestamp(False)
+        self.set_live(True)
+        self.cond = threading.Condition()
+        self.width = 0
+        self.height = 0
+        self.flushing = False
+        self.eos = False
         self.svg = None
-        self.rendered_svg = None
-        self.composition = None
-        self.scale_factor = 0.75
-
-    def set_svg(self, svg, render_size):
-        self.svg = svg
-        self.render_size = render_size
-
-    def do_transform_ip(self, frame_buf):
-        self.render()
-        if self.composition:
-            # Note: Buffer IS writable (ref is 1 in native land). However gst-python
-            # took an additional ref so it's now 2 and gst_buffer_is_writable
-            # returns false. We can't modify the buffer without fiddling with refcount.
-            if frame_buf.mini_object.refcount != 2:
-                return Gst.FlowReturn.ERROR
-            frame_buf.mini_object.refcount -= 1
-            GstVideo.buffer_add_video_overlay_composition_meta(frame_buf, self.composition)
-            frame_buf.mini_object.refcount += 1
-        return Gst.FlowReturn.OK
+        self.pts = 0
 
 
-    def render(self):
-        if not self.svg:
-            self.composition = None
-            self.rendered_svg = None
-            return
+    def do_decide_allocation(self, query):
+        if query.get_n_allocation_pools() > 0:
+            pool, size, min_buffers, max_buffers = query.parse_nth_allocation_pool(0)
+            query.set_nth_allocation_pool(0, pool, size, min_buffers, min(max_buffers, 3))
+        return GstBase.BaseSrc.do_decide_allocation(self, query)
 
-        if self.svg == self.rendered_svg:
-            return
+    def do_event(self, event):  # Handle SEEK locally; every other event goes to BaseSrc.
+        if event.type == Gst.EventType.SEEK:
+            _, _, flags, _, _, _, _ = event.parse_seek()
+            if flags & Gst.SeekFlags.FLUSH:  # Bit test: '|' was always truthy, flushing on every seek.
+                self.send_event(Gst.Event.new_flush_start())
+                self.send_event(Gst.Event.new_flush_stop(True))
+            return True  # Returned for every SEEK, flushing or not.
+        return GstBase.BaseSrc.do_event(self, event)
 
-        overlay_size = self.render_size * self.scale_factor
-        stride = libcairo.cairo_format_stride_for_width(
-                int(cairo.FORMAT_ARGB32), overlay_size.width)
-        overlay_buffer = Gst.Buffer.new_allocate(None,
-                stride * overlay_size.height)
-        with _gst_buffer_map(overlay_buffer, Gst.MapFlags.WRITE) as mapped:
-            # Fill with transparency and create surface from buffer.
+    def set_eos(self):  # Flag end-of-stream; do_fill reports it via get_flow_return_locked.
+        with self.cond:
+            self.eos = True
+            self.cond.notify_all()  # Wake a do_fill blocked in cond.wait(), as set_svg/set_flushing do.
+    def do_start (self):
+        self.set_svg(None, 0)
+        return True
+
+    def do_stop (self):
+        self.set_svg(None, 0)
+        return True
+
+    def set_svg(self, svg, pts):
+        with self.cond:
+            self.svg = svg
+            self.pts = pts
+            self.eos = False
+            self.cond.notify_all()
+
+    def set_flushing(self, flushing):
+        with self.cond:
+            self.flushing = flushing
+            self.cond.notify_all()
+
+    def do_set_caps(self, caps):
+        structure = caps.get_structure(0)
+        self.width = structure.get_value('width')
+        self.height = structure.get_value('height')
+        return True
+
+    def do_unlock(self):
+        self.set_flushing(True)
+        return True
+
+    def do_unlock_stop(self):
+        self.set_flushing(False)
+        return True
+
+    def get_flow_return_locked(self, default=None):
+        if self.eos:
+            self.eos = False
+            self.svg = None
+            return Gst.FlowReturn.EOS
+        if self.flushing:
+            return Gst.FlowReturn.FLUSHING
+        return default
+
+    def do_fill(self, offset, size, buf):
+        with self.cond:
+            result = self.get_flow_return_locked()
+            if result:
+                return result
+
+            while self.svg is None:
+                self.cond.wait()
+                result = self.get_flow_return_locked()
+                if result:
+                    return result
+
+            assert self.svg is not None
+            svg = self.svg
+            pts = self.pts
+            self.svg = None
+
+        # Note: Buffer IS writable (ref is 1 in native land). However gst-python
+        # took an additional ref so it's now 2 and gst_buffer_is_writable
+        # returns false. We can't modify the buffer without fiddling with refcount.
+        assert buf.mini_object.refcount == 2
+        buf.mini_object.refcount = 1
+        try:
+            self.render_svg(svg, buf)
+            buf.pts = pts
+        finally:
+            buf.mini_object.refcount = 2
+
+        with self.cond:
+            return self.get_flow_return_locked(Gst.FlowReturn.OK)
+
+    def render_svg(self, svg, buf):
+        with _gst_buffer_map(buf, Gst.MapFlags.WRITE) as mapped:
+            stride = libcairo.cairo_format_stride_for_width(
+                    int(cairo.FORMAT_ARGB32), self.width)
+            assert len(mapped) >= stride * self.height
+
+            # Fill with transparency.
             ctypes.memset(ctypes.addressof(mapped), 0, ctypes.sizeof(mapped))
+
+            # If svg is '' (can't be None here) we return 100% transparency.
+            if not svg:
+                return
+
             surface = libcairo.cairo_image_surface_create_for_data(
                     ctypes.addressof(mapped),
                     int(cairo.FORMAT_ARGB32),
-                    overlay_size.width,
-                    overlay_size.height,
+                    self.width,
+                    self.height,
                     stride)
 
             # Render the SVG overlay.
-            data = self.svg.encode('utf-8')
+            data = svg.encode('utf-8')
             context = libcairo.cairo_create(surface)
-            libcairo.cairo_scale(context, self.scale_factor, self.scale_factor)
             handle = librsvg.rsvg_handle_new_from_data(data, len(data), 0)
             librsvg.rsvg_handle_render_cairo(handle, context)
             librsvg.rsvg_handle_close(handle, 0)
@@ -198,14 +267,4 @@
             libcairo.cairo_surface_destroy(surface)
             libcairo.cairo_destroy(context)
 
-            # Attach overlay to VideoOverlayComposition.
-            GstVideo.buffer_add_video_meta(overlay_buffer,
-                    GstVideo.VideoFrameFlags.NONE, GstVideo.VideoFormat.BGRA,
-                    overlay_size.width, overlay_size.height)
-            rect = GstVideo.VideoOverlayRectangle.new_raw(overlay_buffer,
-                    0, 0, self.render_size.width, self.render_size.height,
-                    GstVideo.VideoOverlayFormatFlags.PREMULTIPLIED_ALPHA)
-            self.composition = GstVideo.VideoOverlayComposition.new(rect)
-            self.rendered_svg = self.svg
-
-OverlayInjector.plugin_register()
+OverlaySource.plugin_register()
diff --git a/edgetpuvision/gstreamer.py b/edgetpuvision/gstreamer.py
index 6c0018f..11d2560 100644
--- a/edgetpuvision/gstreamer.py
+++ b/edgetpuvision/gstreamer.py
@@ -145,12 +145,15 @@
     assert len(streams) == 1
     return streams[0]
 
-def is_seekable(element):
+def get_seek_element(pipeline):  # Return the element to seek on, or None when seeking is unavailable.
+    element = pipeline.get_by_name('glsink')
+    if not element:
+        element = pipeline  # No element named 'glsink': query the pipeline itself.
     query = Gst.Query.new_seeking(Gst.Format.TIME)
     if element.query(query):
         _,  seekable, _, _ = query.parse_seeking()
-        return seekable
-    return False
+        return element if seekable else None  # Honor the seekable flag the query reported.
+    return None
 
 @contextlib.contextmanager
 def pull_sample(sink):
@@ -159,21 +162,22 @@
 
     result, mapinfo = buf.map(Gst.MapFlags.READ)
     if result:
-        yield sample, mapinfo.data
+        yield sample, mapinfo.data, buf.pts
     buf.unmap(mapinfo)
 
 def new_sample_callback(process):
     def callback(sink, pipeline):
-        with pull_sample(sink) as (sample, data):
+        with pull_sample(sink) as (sample, data, pts):
             process(data, caps_size(sample.get_caps()))
         return Gst.FlowReturn.OK
     return callback
 
 def on_bus_message(bus, message, pipeline, loop):
     if message.type == Gst.MessageType.EOS:
-        if loop and is_seekable(pipeline):
+        seek_element = get_seek_element(pipeline)
+        if loop and seek_element:
             flags = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT
-            if not pipeline.seek_simple(Gst.Format.TIME, flags, 0):
+            if not seek_element.seek_simple(Gst.Format.TIME, flags, 0):
                 Gtk.main_quit()
         else:
             Gtk.main_quit()
@@ -185,8 +189,13 @@
         sys.stderr.write('Error: %s: %s\n' % (err, debug))
         Gtk.main_quit()
 
+def on_sink_eos(sink, pipeline):
+    overlay = pipeline.get_by_name('overlay')
+    if overlay:
+        overlay.set_eos()
+
 def on_new_sample(sink, pipeline, render_overlay, layout, images, get_command):
-    with pull_sample(sink) as (sample, data):
+    with pull_sample(sink) as (sample, data, pts):
         custom_command = None
         save_frame = False
 
@@ -206,25 +215,25 @@
                              command=custom_command)
         overlay = pipeline.get_by_name('overlay')
         if overlay:
-            overlay.set_svg(svg, layout.render_size)
+            overlay.set_svg(svg, pts)
 
         if save_frame:
             images.put((data, layout.inference_size, svg))
 
     return Gst.FlowReturn.OK
 
-def run_gen(render_overlay_gen, *, source, downscale, loop, display):
+def run_gen(render_overlay_gen, *, source, loop, display):
     inference_size = render_overlay_gen.send(None)  # Initialize.
+    next(render_overlay_gen)
     return run(inference_size,
         lambda tensor, layout, command:
             render_overlay_gen.send((tensor, layout, command)),
         source=source,
-        downscale=downscale,
         loop=loop,
         display=display)
 
-def run(inference_size, render_overlay, *, source, downscale, loop, display):
-    result = get_pipeline(source, inference_size, downscale, display)
+def run(inference_size, render_overlay, *, source, loop, display):
+    result = get_pipeline(source, inference_size, display)
     if result:
         layout, pipeline = result
         run_pipeline(pipeline, layout, loop, render_overlay, display)
@@ -232,7 +241,7 @@
 
     return False
 
-def get_pipeline(source, inference_size, downscale, display):
+def get_pipeline(source, inference_size, display):
     fmt = parse_format(source)
     if fmt:
         layout = make_layout(inference_size, fmt.size)
@@ -241,7 +250,7 @@
     filename = os.path.expanduser(source)
     if os.path.isfile(filename):
         info = get_video_info(filename)
-        render_size = Size(info.get_width(), info.get_height()) / downscale
+        render_size = caps_size(info.get_caps())
         layout = make_layout(inference_size, render_size)
         return layout, file_pipline(info.is_image(), filename, layout, display)
 
@@ -315,7 +324,8 @@
                 render_overlay=functools.partial(render_overlay, layout=layout),
                 layout=layout,
                 images=images,
-                get_command=get_command)},
+                get_command=get_command),
+             'eos' : on_sink_eos},
             **(signals or {})
         }
 
diff --git a/edgetpuvision/pipelines.py b/edgetpuvision/pipelines.py
index d79bf2b..739d400 100644
--- a/edgetpuvision/pipelines.py
+++ b/edgetpuvision/pipelines.py
@@ -45,6 +45,8 @@
 # Display
 def image_display_pipeline(filename, layout):
     return (
+        [Filter('glvideomixer', name='mixer', background='black'),
+         display_sink(sync=False)],
         [decoded_file_src(filename),
          Tee(name='t')],
         [Pad('t'),
@@ -53,24 +55,35 @@
          Filter('videoscale'),
          Caps('video/x-raw', format='RGBA', width=layout.render_size.width, height=layout.render_size.height),
          Filter('imagefreeze'),
-         Filter('overlayinjector', name='overlay'),
-         display_sink()],
+         Filter('glupload'),
+         Pad('mixer')],
+        [Source('overlay', name='overlay'),
+         Caps('video/x-raw', format='BGRA', width=layout.render_size.width, height=layout.render_size.height),
+         Filter('glupload'),
+         Queue(max_size_buffers=1),
+         Pad('mixer')],
         [Pad('t'),
          Queue(),
          inference_pipeline(layout, stillimage=True)],
     )
 
-def video_display_pipeline(filename, layout,):
+
+def video_display_pipeline(filename, layout):
     return (
+        [Filter('glvideomixer', name='mixer', background='black'),
+         display_sink(sync=True)],
         [decoded_file_src(filename),
          Filter('glupload'),
          Tee(name='t')],
         [Pad('t'),
+         Filter('glupload'),
+         Queue(),
+         Pad('mixer')],
+        [Source('overlay', name='overlay'),
+         Caps('video/x-raw', format='BGRA', width=layout.render_size.width, height=layout.render_size.height),
+         Filter('glupload'),
          Queue(max_size_buffers=1),
-         Filter('glfilterbin', filter='glcolorscale'),
-         Filter('overlayinjector', name='overlay'),
-         Caps('video/x-raw', width=layout.render_size.width, height=layout.render_size.height),
-         display_sink()],
+         Pad('mixer')],
         [Pad('t'),
          Queue(max_size_buffers=1, leaky='downstream'),
          inference_pipeline(layout)],
@@ -78,14 +91,20 @@
 
 def camera_display_pipeline(fmt, layout):
     return (
+        [Filter('glvideomixer', name='mixer', background='black'),
+         display_sink(sync=False)],
         [v4l2_src(fmt),
          Filter('glupload'),
          Tee(name='t')],
-        [Pad(name='t'),
-         Queue(max_size_buffers=1, leaky='downstream'),
-         Filter('glfilterbin', filter='glcolorscale'),
-         Filter('overlayinjector', name='overlay'),
-         display_sink()],
+        [Pad('t'),
+         Filter('glupload'),
+         Queue(),
+         Pad('mixer')],
+        [Source('overlay', name='overlay'),
+         Caps('video/x-raw', format='BGRA', width=layout.render_size.width, height=layout.render_size.height),
+         Filter('glupload'),
+         Queue(max_size_buffers=1),
+         Pad('mixer')],
         [Pad(name='t'),
          Queue(max_size_buffers=1, leaky='downstream'),
          inference_pipeline(layout)],