blob: ad182118034e96ef51866cafc1689d2594bd83d7 [file] [log] [blame]
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .gst import *
def decoded_file_src(filename, render_size, freeze=False):
    """Return the element list for a decoded, GL-uploaded file source.

    Args:
      filename: passed to the 'file' Source as its `location`.
      render_size: object with `width` and `height`; used for the BGRA caps.
      freeze: when true an 'imagefreeze' filter is inserted after the BGRA
        caps, otherwise an 'identity' filter takes its place.

    Returns:
      A list of Source/Filter/Caps objects ending in RGBA GLMemory caps.
    """
    scaled_caps = 'video/x-raw,width=%d,height=%d,format=BGRA' % (
        render_size.width, render_size.height)
    hold_element = 'imagefreeze' if freeze else 'identity'

    elements = [
        Source('file', location=filename),
        Filter('decodebin'),
        Filter('v4l2convert'),
        Caps(scaled_caps),
        Filter(hold_element),
    ]
    # Upload to GL memory and convert to RGBA there.
    elements += [
        Filter('glupload'),
        Filter('glcolorconvert'),
        Caps('video/x-raw(memory:GLMemory),format=RGBA'),
    ]
    return elements
def v4l2_src(fmt, render_size):
    """Return the element list for a V4L2 source uploaded to GL memory.

    Args:
      fmt: object providing `device`, `pixel`, `size` (with `width` and
        `height`) and `framerate` (formatted with '%d/%d').
      render_size: object with `width` and `height`; used for the BGRA caps
        that follow 'v4l2convert'.

    Returns:
      A list of Source/Filter/Caps objects ending in RGBA GLMemory caps.
    """
    device_source = Source('v4l2', device=fmt.device)
    capture_caps = Caps(
        'video/x-raw',
        format=fmt.pixel,
        width=fmt.size.width,
        height=fmt.size.height,
        framerate='%d/%d' % fmt.framerate,
    )
    converter = Filter('v4l2convert')
    scaled_caps = Caps(
        'video/x-raw,width=%d,height=%d,format=BGRA'
        % (render_size.width, render_size.height)
    )
    return [
        device_source,
        capture_caps,
        converter,
        scaled_caps,
        Filter('glupload'),
        Filter('glcolorconvert'),
        Caps('video/x-raw(memory:GLMemory),format=RGBA'),
    ]
def display_sink(sync=True):
    """Return the on-screen branch: queue, SVG overlay and GL image sink.

    Args:
      sync: forwarded to the 'glimage' sink only. The 'glsvgoverlay' filter
        is always created with sync=True regardless of this argument.

    Returns:
      A three-element list: Queue, overlay Filter, 'glimage' Sink.
    """
    buffer_queue = Queue()
    svg_overlay = Filter('glsvgoverlay', name='overlay', sync=True)
    image_sink = Sink('glimage', name='glsink', sync=sync, qos=False,
                      max_lateness=-1)
    return [buffer_queue, svg_overlay, image_sink]
def h264_sink():
    """Return the 'app' Sink named 'h264sink'.

    Unlike the other helpers in this module, this returns a single Sink
    object rather than a list.
    """
    sink_options = dict(
        name='h264sink',
        emit_signals=True,
        max_buffers=1,
        drop=False,
        sync=False,
    )
    return Sink('app', **sink_options)
def inference_pipeline(layout, stillimage=False):
    """Return the branch that delivers RGB frames at inference size to the app.

    Args:
      layout: object providing `inference_size` (with `width` and `height`),
        which fixes the dimensions of the RGB caps.
      stillimage: accepted for backward compatibility; this function does
        not use it.

    Returns:
      A list: Queue, 'glfilterbin' Filter (filter='glbox'), RGB Caps at the
      inference size, Queue, and an 'app' Sink named 'appsink'.
    """
    # The previous implementation also computed
    # max_inner_size(layout.render_size, layout.inference_size) into a local
    # that was never read; that dead computation has been dropped.
    target = layout.inference_size
    return [
        Queue(),
        Filter('glfilterbin', filter='glbox'),
        Caps('video/x-raw', format='RGB', width=target.width, height=target.height),
        Queue(),
        Sink('app', name='appsink', emit_signals=True, sync=False),
    ]
# Display
def image_display_pipeline(filename, layout):
    """Return the branches for showing a still image while running inference.

    The file source is created with freeze=True and feeds a Tee named 't';
    the display branch is listed before the inference branch.

    Returns:
      A 3-tuple of lists: (source + tee, display branch, inference branch).
    """
    source_branch = [
        decoded_file_src(filename, layout.render_size, freeze=True),
        Tee(name='t'),
    ]
    display_branch = [Pad('t'), display_sink()]
    inference_branch = [Pad('t'), inference_pipeline(layout)]
    return (source_branch, display_branch, inference_branch)
def video_display_pipeline(filename, layout):
    """Return the branches for showing a video file while running inference.

    The file source feeds a Tee named 't'; the inference branch is listed
    before the display branch.

    Returns:
      A 3-tuple of lists: (source + tee, inference branch, display branch).
    """
    source_branch = [
        decoded_file_src(filename, layout.render_size),
        Tee(name='t'),
    ]
    inference_branch = [Pad('t'), inference_pipeline(layout)]
    display_branch = [Pad('t'), display_sink()]
    return (source_branch, inference_branch, display_branch)
def camera_display_pipeline(fmt, layout):
    """Return the branches for showing a camera feed while running inference.

    The V4L2 source feeds a Tee named 't'. The display sink is requested
    with sync disabled (display_sink(False)).

    Returns:
      A 3-tuple of lists: (source + tee, inference branch, display branch).
    """
    camera_branch = [
        v4l2_src(fmt, layout.render_size),
        Tee(name='t'),
    ]
    inference_branch = [Pad('t'), inference_pipeline(layout)]
    display_branch = [Pad('t'), display_sink(False)]
    return (camera_branch, inference_branch, display_branch)
# Headless
def image_headless_pipeline(filename, layout):
    """Return the display-less pipeline for a still image.

    Returns:
      A 1-tuple holding one list: the frozen file source followed by the
      inference branch.
    """
    frozen_source = decoded_file_src(filename, layout.render_size, freeze=True)
    only_branch = [frozen_source, inference_pipeline(layout)]
    return (only_branch,)
def video_headless_pipeline(filename, layout):
    """Return the display-less pipeline for a video file.

    Returns:
      A 1-tuple holding one list: the file source followed by the inference
      branch.
    """
    file_source = decoded_file_src(filename, layout.render_size)
    only_branch = [file_source, inference_pipeline(layout)]
    return (only_branch,)
def camera_headless_pipeline(fmt, layout):
    """Return the display-less pipeline for a camera feed.

    Returns:
      A 1-tuple holding one list: the V4L2 source followed by the inference
      branch.
    """
    camera_source = v4l2_src(fmt, layout.render_size)
    only_branch = [camera_source, inference_pipeline(layout)]
    return (only_branch,)
def video_streaming_pipeline(filename, layout):
    """Return the branches for streaming a file's H.264 while running inference.

    The file is demuxed with 'qtdemux' and split by a Tee named 't'. One
    branch parses the stream to byte-stream/NAL caps for the h264 sink; the
    other decodes it, converts to BGRA (caps carry no explicit size here),
    uploads to GL memory as RGBA and continues into the inference branch.

    Returns:
      A 3-tuple of lists: (source + demux + tee, H.264 branch,
      decode + inference branch).
    """
    demux_branch = [
        Source('file', location=filename),
        Filter('qtdemux'),
        Tee(name='t'),
    ]

    stream_branch = [
        Pad('t'),
        Queue(),
        Filter('h264parse'),
        Caps('video/x-h264', stream_format='byte-stream', alignment='nal'),
        h264_sink(),
    ]

    decode_branch = [Pad('t'), Queue()]
    decode_branch.append(Filter('decodebin'))
    decode_branch.append(Filter('v4l2convert'))
    decode_branch.append(Caps('video/x-raw,format=BGRA'))
    decode_branch.append(Filter('glupload'))
    decode_branch.append(Filter('glcolorconvert'))
    decode_branch.append(Caps('video/x-raw(memory:GLMemory),format=RGBA'))
    decode_branch.append(inference_pipeline(layout))

    return (demux_branch, stream_branch, decode_branch)
def camera_streaming_pipeline(fmt, profile, bitrate, layout):
    """Return the branches for encoding a camera feed to H.264 plus inference.

    Args:
      fmt: camera format object, forwarded to v4l2_src.
      profile: value for the `profile` field of the H.264 caps that follow
        the encoder.
      bitrate: divided by 1000 and truncated with int() before being handed
        to 'x264enc', which is given kbit per second.
      layout: provides `render_size` for the source and is forwarded to
        inference_pipeline.

    Returns:
      A 3-tuple of lists: (source + tee, encode branch, inference branch).
    """
    # Keyword order matches the original call so the encoder receives its
    # settings in the same sequence.
    encoder_settings = dict(
        speed_preset='ultrafast',
        tune='zerolatency',
        threads=4,
        key_int_max=5,
        bitrate=int(bitrate / 1000),  # kbit per second.
        aud=False,
    )

    capture_branch = [
        v4l2_src(fmt, layout.render_size),
        Tee(name='t'),
    ]

    encode_branch = [
        Pad('t'),
        Queue(max_size_buffers=1, leaky='downstream'),
        Filter('gldownload'),
        Filter('videoconvert'),
        Filter('x264enc', **encoder_settings),
        Caps('video/x-h264', profile=profile),
        Filter('h264parse'),
        Caps('video/x-h264', stream_format='byte-stream', alignment='nal'),
        h264_sink(),
    ]

    inference_branch = [Pad('t'), inference_pipeline(layout)]

    return (capture_branch, encode_branch, inference_branch)