| import collections |
| import contextlib |
| import enum |
| import fcntl |
| import functools |
| import os |
| import pathlib |
| import queue |
| import sys |
| import termios |
| import threading |
| import time |
| |
| import numpy as np |
| |
| import gi |
| gi.require_version('GLib', '2.0') |
| gi.require_version('GObject', '2.0') |
| gi.require_version('Gst', '1.0') |
| gi.require_version('GstBase', '1.0') |
| gi.require_version('GstPbutils', '1.0') |
| |
| from gi.repository import GLib, GObject, Gst, GstBase |
| |
| GObject.threads_init() |
| Gst.init(None) |
| |
| from gi.repository import GstPbutils # Must be called after Gst.init(). |
| |
| from PIL import Image |
| |
| from .gst import * |
| from .pipelines import * |
| |
| COMMAND_SAVE_FRAME = ' ' |
| COMMAND_PRINT_INFO = 'p' |
| COMMAND_QUIT = 'q' |
| |
| class Display(enum.Enum): |
| FULLSCREEN = 'fullscreen' |
| WINDOW = 'window' |
| NONE = 'none' |
| |
| def __str__(self): |
| return self.value |
| |
| def set_nonblocking(fd): |
| flags = fcntl.fcntl(fd, fcntl.F_GETFL) |
| return fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
| |
| @contextlib.contextmanager |
| def term_raw_mode(fd): |
| old = termios.tcgetattr(fd) |
| new = termios.tcgetattr(fd) |
| new[3] = new[3] & ~(termios.ICANON | termios.ECHO) |
| termios.tcsetattr(fd, termios.TCSANOW, new) |
| try: |
| yield |
| finally: |
| termios.tcsetattr(fd, termios.TCSAFLUSH, old) |
| |
| def get_nowait(q): |
| try: |
| return q.get_nowait() |
| except queue.Empty: |
| return None |
| |
| @contextlib.contextmanager |
| def Worker(process, maxsize=0): |
| commands = queue.Queue(maxsize) |
| |
| def run(): |
| while True: |
| args = commands.get() |
| if args is None: |
| break |
| process(*args) |
| commands.task_done() |
| |
| thread = threading.Thread(target=run) |
| thread.start() |
| try: |
| yield commands |
| finally: |
| commands.put(None) |
| thread.join() |
| |
| def save_frame(rgb, size, overlay=None, ext='png'): |
| tag = '%010d' % int(time.monotonic() * 1000) |
| img = Image.frombytes('RGB', size, rgb, 'raw') |
| name = 'img-%s.%s' % (tag, ext) |
| img.save(name) |
| print('Frame saved as "%s"' % name) |
| if overlay: |
| name = 'img-%s.svg' % tag |
| with open(name, 'w') as f: |
| f.write(overlay) |
| print('Overlay saved as "%s"' % name) |
| |
| Layout = collections.namedtuple('Layout', ('size', 'window', 'inference_size', 'render_size')) |
| |
| def make_layout(inference_size, render_size): |
| size = min_outer_size(inference_size, render_size) |
| window = center_inside(render_size, size) |
| return Layout(size=size, window=window, |
| inference_size=inference_size, render_size=render_size) |
| |
| def caps_size(caps): |
| structure = caps.get_structure(0) |
| return Size(structure.get_value('width'), |
| structure.get_value('height')) |
| |
| def get_video_info(filename): |
| uri = pathlib.Path(filename).absolute().as_uri() |
| discoverer = GstPbutils.Discoverer() |
| info = discoverer.discover_uri(uri) |
| |
| streams = info.get_video_streams() |
| assert len(streams) == 1 |
| return streams[0] |
| |
| def loop(): |
| return GLib.MainLoop.new(None, False) |
| |
| @contextlib.contextmanager |
| def pull_sample(sink): |
| sample = sink.emit('pull-sample') |
| buf = sample.get_buffer() |
| |
| result, mapinfo = buf.map(Gst.MapFlags.READ) |
| if result: |
| yield sample, mapinfo.data |
| buf.unmap(mapinfo) |
| |
| def new_sample_callback(process): |
| def callback(sink, pipeline, loop): |
| with pull_sample(sink) as (sample, data): |
| process(data, caps_size(sample.get_caps())) |
| return Gst.FlowReturn.OK |
| return callback |
| |
| def on_bus_message(bus, message, loop): |
| if message.type == Gst.MessageType.EOS: |
| loop.quit() |
| elif message.type == Gst.MessageType.WARNING: |
| err, debug = message.parse_warning() |
| sys.stderr.write('Warning: %s: %s\n' % (err, debug)) |
| elif message.type == Gst.MessageType.ERROR: |
| err, debug = message.parse_error() |
| sys.stderr.write('Error: %s: %s\n' % (err, debug)) |
| loop.quit() |
| return True |
| |
| def run_pipeline(loop, pipeline, signals): |
| # Create pipeline |
| pipeline = describe(pipeline) |
| print(pipeline) |
| pipeline = Gst.parse_launch(pipeline) |
| |
| # Attach signals |
| for name, signals in signals.items(): |
| component = pipeline.get_by_name(name) |
| if component: |
| for signal_name, signal_handler in signals.items(): |
| component.connect(signal_name, signal_handler, pipeline, loop) |
| |
| # Set up a pipeline bus watch to catch errors. |
| bus = pipeline.get_bus() |
| bus.add_signal_watch() |
| bus.connect('message', on_bus_message, loop) |
| |
| # Run pipeline. |
| pipeline.set_state(Gst.State.PLAYING) |
| try: |
| loop.run() |
| except KeyboardInterrupt: |
| pass |
| finally: |
| pipeline.set_state(Gst.State.NULL) |
| |
| # Process all pending operations on the loop. |
| while loop.get_context().iteration(False): |
| pass |
| |
| def on_keypress(fd, flags, commands): |
| for ch in sys.stdin.read(): |
| commands.put(ch) |
| return True |
| |
| def on_new_sample(sink, pipeline, loop, render_overlay, layout, images, commands): |
| with pull_sample(sink) as (sample, data): |
| custom_command = None |
| save_frame = False |
| |
| command = get_nowait(commands) |
| if command == COMMAND_SAVE_FRAME: |
| save_frame = True |
| elif command == COMMAND_PRINT_INFO: |
| print('Timestamp: %.2f' % time.monotonic()) |
| print('Render size: %d x %d' % layout.render_size) |
| print('Inference size: %d x %d' % layout.inference_size) |
| elif command == COMMAND_QUIT: |
| loop.quit() |
| else: |
| custom_command = command |
| |
| svg = render_overlay(np.frombuffer(data, dtype=np.uint8), |
| command=custom_command) |
| overlay = pipeline.get_by_name('overlay') |
| if overlay: |
| overlay.set_property('data', svg) |
| |
| if save_frame: |
| images.put((data, layout.inference_size, svg)) |
| |
| return Gst.FlowReturn.OK |
| |
| def run_gen(render_overlay_gen, *, source, downscale, display): |
| inference_size = render_overlay_gen.send(None) # Initialize. |
| return run(inference_size, |
| lambda tensor, layout, command: |
| render_overlay_gen.send((tensor, layout, command)), |
| source=source, |
| downscale=downscale, |
| display=display) |
| |
| def run(inference_size, render_overlay, *, source, downscale, display): |
| result = get_pipeline(source, inference_size, downscale, display) |
| if result: |
| layout, pipeline = result |
| run_loop(pipeline, layout, render_overlay) |
| return True |
| |
| return False |
| |
| def get_pipeline(source, inference_size, downscale, display): |
| inference_size = Size(*inference_size) |
| fmt = parse_format(source) |
| if fmt: |
| layout = make_layout(inference_size, fmt.size) |
| return layout, camera_pipeline(fmt, layout, display) |
| |
| filename = os.path.expanduser(source) |
| if os.path.isfile(filename): |
| info = get_video_info(filename) |
| render_size = Size(info.get_width(), info.get_height()) / downscale |
| layout = make_layout(inference_size, render_size) |
| return layout, file_pipline(filename, info, layout, display) |
| |
| return None |
| |
| def camera_pipeline(fmt, layout, display): |
| if display is Display.NONE: |
| return camera_headless_pipeline(fmt, layout) |
| else: |
| return camera_display_pipeline(fmt, layout, display is Display.FULLSCREEN) |
| |
| def file_pipline(filename, info, layout, display): |
| if display is Display.NONE: |
| if info.is_image(): |
| return image_headless_pipeline(filename, layout) |
| else: |
| return video_headless_pipeline(filename, layout) |
| else: |
| fullscreen = display is Display.FULLSCREEN |
| if info.is_image(): |
| return image_display_pipeline(filename, layout, fullscreen) |
| else: |
| return video_display_pipeline(filename, layout, fullscreen) |
| |
| def run_loop(pipeline, layout, render_overlay): |
| loop = GLib.MainLoop() |
| commands = queue.Queue() |
| |
| with contextlib.ExitStack() as stack: |
| images = stack.enter_context(Worker(save_frame)) |
| |
| if sys.stdin.isatty(): |
| set_nonblocking(sys.stdin.fileno()) |
| GLib.io_add_watch(sys.stdin.fileno(), GLib.IO_IN, on_keypress, commands) |
| stack.enter_context(term_raw_mode(sys.stdin.fileno())) |
| |
| run_pipeline(loop, pipeline, {'appsink': {'new-sample': |
| functools.partial(on_new_sample, |
| render_overlay=functools.partial(render_overlay, layout=layout), |
| layout=layout, |
| images=images, |
| commands=commands)} |
| }) |