import collections
import contextlib
import fcntl
import functools
import os
import queue
import sys
import termios
import threading
import time
import numpy as np
import gi
gi.require_version('GLib', '2.0')
gi.require_version('GObject', '2.0')
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('GstPbutils', '1.0')
from gi.repository import GLib, GObject, Gst, GstBase
GObject.threads_init()  # Deprecated since PyGObject 3.11 (threads are always enabled); kept for older versions.
Gst.init(None)
from gi.repository import GstPbutils  # Must be imported after Gst.init().
from PIL import Image
from .gst import *
COMMAND_SAVE_FRAME = ' '
COMMAND_PRINT_INFO = 'p'
def set_nonblocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
return fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
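# Usage sketch: make stdin non-blocking so sys.stdin.read() returns whatever
# is buffered instead of waiting for EOF (see run_loop() below):
#   set_nonblocking(sys.stdin.fileno())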
@contextlib.contextmanager
def term_raw_mode(fd):
old = termios.tcgetattr(fd)
new = termios.tcgetattr(fd)
new[3] = new[3] & ~(termios.ICANON | termios.ECHO)
termios.tcsetattr(fd, termios.TCSANOW, new)
try:
yield
finally:
termios.tcsetattr(fd, termios.TCSAFLUSH, old)
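# Note: despite the name, this is cbreak rather than full raw mode (only
# ICANON and ECHO are cleared), which is enough for single-key commands:
#   with term_raw_mode(sys.stdin.fileno()):
#       ...  # Keypresses arrive immediately, without echo.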
def get_nowait(q):
try:
return q.get_nowait()
except queue.Empty:
return None
@contextlib.contextmanager
def Worker(process, maxsize=0):
commands = queue.Queue(maxsize)
def run():
while True:
args = commands.get()
if args is None:
break
process(*args)
commands.task_done()
thread = threading.Thread(target=run)
thread.start()
try:
yield commands
finally:
commands.put(None)
thread.join()
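# Usage sketch: Worker turns a function into a queue-fed background consumer;
# put argument tuples, and the context manager joins the thread on exit:
#   with Worker(save_frame) as images:
#       images.put((rgb, size, svg))  # Arguments for save_frame() below.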
def sink(fullscreen, sync=False):
    # The trailing comma makes this a 1-tuple, so it composes with the other
    # pipeline tuples below.
    return Filter('kmssink' if fullscreen else 'waylandsink', sync=sync),
def inference_pipeline(render_size, inference_size):
size = max_inner_size(render_size, inference_size)
return (
Filter('glfilterbin', filter='glcolorscale'),
Caps('video/x-raw', format='RGBA', width=size.width, height=size.height),
Filter('videoconvert'),
Caps('video/x-raw', format='RGB', width=size.width, height=size.height),
Filter('videobox', autocrop=True),
Caps('video/x-raw', width=inference_size.width, height=inference_size.height),
Filter('appsink', name='appsink', emit_signals=True, max_buffers=1, drop=True, sync=False)
)
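# The chain above runs GPU scaling (glcolorscale) to an intermediate size,
# converts RGBA to RGB, then videobox (autocrop=True) center-crops/pads the
# frame to exactly inference_size before handing it to the appsink.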
def image_file_pipeline(filename, render_size, inference_size, fullscreen):
size = max_inner_size(render_size, inference_size)
return (
Filter('filesrc', location=filename),
Filter('decodebin'),
Filter('videoconvert'),
Caps('video/x-raw', format='RGB'),
Filter('imagefreeze'),
Tee(pads=((
Queue(max_size_buffers=1),
Filter('videoconvert'),
Filter('videoscale'),
Filter('rsvgoverlay', name='overlay'),
Caps('video/x-raw', width=render_size.width, height=render_size.height),
sink(fullscreen),
),(
Queue(max_size_buffers=1),
Filter('videoconvert'),
Filter('videoscale'),
Caps('video/x-raw', width=size.width, height=size.height),
Filter('videobox', autocrop=True),
Caps('video/x-raw', width=inference_size.width, height=inference_size.height),
Filter('appsink', name='appsink', emit_signals=True, max_buffers=1, drop=True, sync=False)
)))
)
def video_file_pipeline(filename, render_size, inference_size, fullscreen):
return (
Filter('filesrc', location=filename),
Filter('qtdemux'),
Filter('h264parse'),
Filter('vpudec'),
Filter('glupload'),
Tee(pads=((
Queue(max_size_buffers=1),
Filter('glfilterbin', filter='glcolorscale'),
Filter('rsvgoverlay', name='overlay'),
Caps('video/x-raw', width=render_size.width, height=render_size.height),
sink(fullscreen),
),(
Queue(max_size_buffers=1, leaky='downstream'),
inference_pipeline(render_size, inference_size),
)))
)
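# Both Tee branches run concurrently: the first renders the decoded video with
# an rsvgoverlay element for SVG annotations, the second feeds the inference
# appsink; the leaky queue drops stale frames instead of stalling rendering.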
# v4l2-ctl --list-formats-ext --device /dev/video1
def v4l2_camera(fmt):
return (
Filter('v4l2src', device=fmt.device),
Caps('video/x-raw', format=fmt.pixel, width=fmt.size.width, height=fmt.size.height,
framerate='%d/%d' % fmt.framerate),
)
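# Example (hypothetical values): fmt with device='/dev/video1', pixel='YUY2',
# size=Size(640, 480) and framerate=(30, 1) produces caps equivalent to
# 'video/x-raw,format=YUY2,width=640,height=480,framerate=30/1'.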
def video_camera_pipeline(render_size, inference_size, fullscreen):
return (
# TODO(dkovalev): Queue(max_size_buffers=1, leaky='downstream'),
Filter('glupload'),
Tee(pads=((
Queue(max_size_buffers=1, leaky='downstream'),
Filter('glfilterbin', filter='glcolorscale'),
Filter('rsvgoverlay', name='overlay'),
sink(fullscreen),
),(
Queue(max_size_buffers=1, leaky='downstream'),
inference_pipeline(render_size, inference_size),
)))
)
def h264sink(display_decoded=False):
appsink = Filter('appsink', name='h264sink', emit_signals=True, max_buffers=1, drop=False, sync=False),
if display_decoded:
return Tee(pads=(
(Queue(), appsink),
(Queue(), Filter('vpudec'), Filter('kmssink', sync=False))
))
return appsink
def file_streaming_pipeline(filename, render_size, inference_size):
return (
Filter('filesrc', location=filename),
Filter('qtdemux'),
Tee(pads=((
Queue(max_size_buffers=1),
Filter('h264parse'),
Filter('vpudec'),
inference_pipeline(render_size, inference_size),
), (
Queue(max_size_buffers=1),
Filter('h264parse'),
Caps('video/x-h264', stream_format='byte-stream', alignment='nal'),
h264sink()
)))
)
def camera_streaming_pipeline(profile, bitrate, render_size, inference_size):
size = max_inner_size(render_size, inference_size)
return (
Tee(pads=((
Queue(),
inference_pipeline(render_size, inference_size)
), (
Queue(max_size_buffers=1, leaky='downstream'),
Filter('videoconvert'),
Filter('x264enc',
speed_preset='ultrafast',
tune='zerolatency',
threads=4,
key_int_max=5,
bitrate=int(bitrate / 1000), # kbit per second.
aud=False),
Caps('video/x-h264', profile=profile),
Filter('h264parse'),
Caps('video/x-h264', stream_format='byte-stream', alignment='nal'),
h264sink()
)))
)
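# x264enc is configured for low latency: tune='zerolatency' disables frame
# reordering, key_int_max=5 keeps keyframes frequent (useful for streaming
# clients joining mid-stream), and the bitrate property expects kbit/s,
# hence the division by 1000.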
def save_frame(rgb, size, overlay=None, ext='png'):
tag = '%010d' % int(time.monotonic() * 1000)
img = Image.frombytes('RGB', size, rgb, 'raw')
name = 'img-%s.%s' % (tag, ext)
img.save(name)
print('Frame saved as "%s"' % name)
if overlay:
name = 'img-%s.svg' % tag
with open(name, 'w') as f:
f.write(overlay)
print('Overlay saved as "%s"' % name)
def avg_fps_counter(window_size):
window = collections.deque(maxlen=window_size)
prev = time.monotonic()
yield 0.0 # First fps value.
while True:
curr = time.monotonic()
window.append(curr - prev)
prev = curr
yield len(window) / sum(window)
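# Usage sketch: call next() once per processed frame; the generator yields the
# average FPS over the last window_size frames (0.0 on the very first call):
#   fps_counter = avg_fps_counter(30)
#   rate = next(fps_counter)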
def caps_size(caps):
structure = caps.get_structure(0)
return Size(structure.get_value('width'),
structure.get_value('height'))
def get_video_info(filename):
    # Command line: gst-discoverer-1.0 -v ~/cars_highway.mp4
uri = 'file://' + filename
discoverer = GstPbutils.Discoverer()
info = discoverer.discover_uri(uri)
streams = info.get_video_streams()
assert len(streams) == 1
return streams[0]
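# Note: Discoverer.discover_uri() is synchronous, so get_video_info() blocks
# until the container metadata has been parsed.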
def loop():
return GLib.MainLoop.new(None, False)
@contextlib.contextmanager
def pull_sample(sink):
    sample = sink.emit('pull-sample')
    buf = sample.get_buffer()
    result, mapinfo = buf.map(Gst.MapFlags.READ)
    if not result:
        raise RuntimeError('Cannot map buffer for reading.')
    try:
        yield sample, mapinfo.data
    finally:
        buf.unmap(mapinfo)
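# Usage sketch, inside an appsink 'new-sample' handler:
#   with pull_sample(sink) as (sample, data):
#       ...  # 'data' is only valid inside the block while the buffer is mapped.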
def new_sample_callback(process):
def callback(sink, pipeline):
with pull_sample(sink) as (sample, data):
process(data, caps_size(sample.get_caps()))
return Gst.FlowReturn.OK
return callback
def on_bus_message(bus, message, loop):
if message.type == Gst.MessageType.EOS:
loop.quit()
elif message.type == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write('Warning: %s: %s\n' % (err, debug))
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write('Error: %s: %s\n' % (err, debug))
loop.quit()
return True
def run_pipeline(loop, pipeline, signals):
    # Create pipeline.
    pipeline = describe(pipeline)
    print(pipeline)
    pipeline = Gst.parse_launch(pipeline)
    # Attach signal handlers; signals maps element name -> {signal: handler}.
    for name, handlers in signals.items():
        component = pipeline.get_by_name(name)
        if component:
            for signal_name, signal_handler in handlers.items():
                component.connect(signal_name, signal_handler, pipeline)
# Set up a pipeline bus watch to catch errors.
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message', on_bus_message, loop)
# Run pipeline.
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except KeyboardInterrupt:
pass
finally:
pipeline.set_state(Gst.State.NULL)
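# Usage sketch (hypothetical test pipeline); an empty signals dict connects
# nothing:
#   run_pipeline(loop(), (Filter('videotestsrc'), sink(fullscreen=False)),
#                signals={})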
def on_keypress(fd, flags, commands):
    # GLib.io_add_watch callback: forward every typed character to the
    # command queue consumed by on_new_sample().
    for ch in sys.stdin.read():
        commands.put(ch)
    return True
def on_new_sample(sink, pipeline, render_overlay, render_size, images, commands, fps_counter):
with pull_sample(sink) as (sample, data):
inference_rate = next(fps_counter)
        custom_command = None
        save_frame_requested = False
        command = get_nowait(commands)
        if command == COMMAND_SAVE_FRAME:
            save_frame_requested = True
elif command == COMMAND_PRINT_INFO:
print('Timestamp: %.2f' % time.monotonic())
print('Inference FPS: %s' % inference_rate)
print('Render size: %d x %d' % render_size)
print('Inference size: %d x %d' % caps_size(sample.get_caps()))
else:
custom_command = command
svg = render_overlay(np.frombuffer(data, dtype=np.uint8),
inference_rate=inference_rate,
command=custom_command)
overlay = pipeline.get_by_name('overlay')
overlay.set_property('data', svg)
        if save_frame_requested:
images.put((data, caps_size(sample.get_caps()), svg))
return Gst.FlowReturn.OK
def run_gen(render_overlay_gen, *, source, downscale, fullscreen):
inference_size = render_overlay_gen.send(None) # Initialize.
return run(inference_size,
lambda tensor, size, window, inference_rate, command:
render_overlay_gen.send((tensor, size, window, inference_rate, command)),
source=source,
downscale=downscale,
fullscreen=fullscreen)
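# Generator protocol sketch for run_gen(): the generator first yields the
# inference size it wants, then receives frame tuples and yields SVG overlays
# (the size below is hypothetical):
#   def render_overlay_gen():
#       tensor, size, window, inference_rate, command = yield Size(300, 300)
#       while True:
#           svg = None  # Build an SVG overlay string from 'tensor' here.
#           tensor, size, window, inference_rate, command = yield svg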
def run(inference_size, render_overlay, *, source, downscale, fullscreen):
reg = Gst.Registry.get()
    for feature in reg.get_feature_list_by_plugin('vpu.imx'):
        # Demote the VPU features, otherwise decodebin picks vpudec to decode
        # JPEG images and fails.
        feature.set_rank(Gst.Rank.MARGINAL)
fmt = parse_format(source)
if fmt:
run_camera(inference_size, render_overlay, fmt, fullscreen)
return True
filename = os.path.expanduser(source)
if os.path.isfile(filename):
run_file(inference_size, render_overlay,
filename=filename,
downscale=downscale,
fullscreen=fullscreen)
return True
return False
def run_camera(inference_size, render_overlay, fmt, fullscreen):
inference_size = Size(*inference_size)
camera = v4l2_camera(fmt)
caps = next(x for x in camera if isinstance(x, Caps))
render_size = Size(caps.width, caps.height)
pipeline = camera + video_camera_pipeline(render_size, inference_size, fullscreen)
return run_loop(pipeline, inference_size, render_size, render_overlay)
def run_file(inference_size, render_overlay, *, filename, downscale, fullscreen):
inference_size = Size(*inference_size)
info = get_video_info(filename)
render_size = Size(info.get_width(), info.get_height()) / downscale
if info.is_image():
pipeline = image_file_pipeline(filename, render_size, inference_size, fullscreen)
else:
pipeline = video_file_pipeline(filename, render_size, inference_size, fullscreen)
return run_loop(pipeline, inference_size, render_size, render_overlay)
def run_loop(pipeline, inference_size, render_size, render_overlay):
loop = GLib.MainLoop()
commands = queue.Queue()
with contextlib.ExitStack() as stack:
images = stack.enter_context(Worker(save_frame))
if sys.stdin.isatty():
set_nonblocking(sys.stdin.fileno())
GLib.io_add_watch(sys.stdin.fileno(), GLib.IO_IN, on_keypress, commands)
stack.enter_context(term_raw_mode(sys.stdin.fileno()))
size = min_outer_size(inference_size, render_size)
window = center_inside(render_size, size)
        run_pipeline(loop, pipeline, {
            'appsink': {
                'new-sample': functools.partial(
                    on_new_sample,
                    render_overlay=functools.partial(render_overlay,
                                                     size=size,
                                                     window=window),
                    render_size=render_size,
                    images=images,
                    commands=commands,
                    fps_counter=avg_fps_counter(30)),
            },
        })
        # Drain any pending main-context events without blocking.
        while GLib.MainContext.default().iteration(False):
            pass
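# Overall usage sketch: run() drives the whole pipeline and invokes
# render_overlay for every inference-sized frame; source is either a camera
# format string (see parse_format in .gst) or a video/image file path:
#   def render_overlay(tensor, size, window, inference_rate, command):
#       return '<svg></svg>'  # SVG markup drawn on top of the video.
#   run((300, 300), render_overlay, source='~/video.mp4', downscale=1,
#       fullscreen=False)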