blob: 94e4c780290fada1f945b908d7e6be0c675bcc62 [file] [log] [blame]
#!/usr/bin/env python3
import argparse
import collections
import contextlib
import fcntl
import os
import select
import sys
import re
import termios
import time
import threading
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from functools import partial
from gi.repository import GLib, GObject, Gst, GstBase
from PIL import Image
Gst.init(None)
FILENAME_PREFIX = 'img'
FILENAME_SUFFIX = '.png'
AF_SYSFS_NODE = '/sys/module/ov5645_camera_mipi_v2/parameters/ov5645_af'
CAMERA_INIT_QUERY_SYSFS_NODE = '/sys/module/ov5645_camera_mipi_v2/parameters/ov5645_initialized'
HDMI_SYSFS_NODE = '/sys/class/drm/card0/card0-HDMI-A-1/status'
# No of initial frames to throw away before camera has stabilized
SCRAP_FRAMES = 1
SRC_WIDTH = 2592
SRC_HEIGHT = 1944
SRC_RATE = '15/1'
SRC_ELEMENT = 'v4l2src'
SINK_WIDTH = 2592
SINK_HEIGHT = 1944
SINK_ELEMENT = ('appsink name=appsink sync=false emit-signals=true '
'max-buffers=1 drop=true')
SCREEN_SINK = 'waylandsink fullscreen=true sync=false'
FAKE_SINK = 'fakesink sync=false'
SRC_CAPS = 'video/x-raw,format=YUY2,width={width},height={height},framerate={rate}'
SINK_CAPS = 'video/x-raw,format=RGB,width={width},height={height}'
LEAKY_Q = 'queue max-size-buffers=1 leaky=downstream'
PIPELINE = '''
{src_element} ! {src_caps} ! {leaky_q} ! tee name=t
t. ! {leaky_q} ! {screen_sink}
t. ! {leaky_q} ! videoconvert ! {sink_caps} ! {sink_element}
'''
Fraction = collections.namedtuple('Fraction', ('num', 'den'))
Fraction.__str__ = lambda self: '%s/%s' % (self.num, self.den)
Format = collections.namedtuple('Format', ('width', 'height', 'framerate'))
V4L2_DEVICE = re.compile(r'(?P<w>\d+)x(?P<h>\d+):(?P<num>\d+)/(?P<den>\d+)')
def parse_format(src):
match = V4L2_DEVICE.search(src)
if match:
return Format(width=int(match.group('w')),
height=int(match.group('h')),
framerate=Fraction(int(match.group('num')), int(match.group('den'))))
return None
def monitor_connected():
with open(HDMI_SYSFS_NODE, 'r') as hdmi_status:
status = hdmi_status.read()
return (status.rstrip() == 'connected')
def on_bus_message(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write('Warning: %s: %s\n' % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write('Error: %s: %s\n' % (err, debug))
loop.quit()
return True
def on_new_sample(sink, snapinfo):
if not snapinfo.save_frame():
# Throw away the frame
return Gst.FlowReturn.OK
sample = sink.emit('pull-sample')
buf = sample.get_buffer()
result, mapinfo = buf.map(Gst.MapFlags.READ)
if result:
imgfile = snapinfo.get_filename()
print('Saving image: ' + imgfile)
caps = sample.get_caps()
width = caps.get_structure(0).get_value('width')
height = caps.get_structure(0).get_value('height')
img = Image.frombytes('RGB', (width, height), mapinfo.data, 'raw')
img.save(imgfile)
img.close()
buf.unmap(mapinfo)
return Gst.FlowReturn.OK
def run_pipeline(snapinfo, camera_mode):
fmt = parse_format(camera_mode)
if fmt:
src_caps = SRC_CAPS.format(width=fmt.width, height=fmt.height, rate=fmt.framerate)
sink_caps = SINK_CAPS.format(width=fmt.width, height=fmt.height)
else:
src_caps = SRC_CAPS.format(width=SRC_WIDTH, height=SRC_HEIGHT, rate=SRC_RATE)
sink_caps = SINK_CAPS.format(width=SINK_WIDTH, height=SINK_HEIGHT)
if snapinfo.oneshot or not monitor_connected():
screen_sink = FAKE_SINK
else:
screen_sink = SCREEN_SINK
pipeline = PIPELINE.format(
leaky_q=LEAKY_Q,
src_element=SRC_ELEMENT,
src_caps=src_caps,
sink_caps=sink_caps,
sink_element=SINK_ELEMENT,
screen_sink=screen_sink)
pipeline = Gst.parse_launch(pipeline)
appsink = pipeline.get_by_name('appsink')
appsink.connect('new-sample', partial(on_new_sample, snapinfo=snapinfo))
loop = GLib.MainLoop()
# Set up a pipeline bus watch to catch errors.
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message', on_bus_message, loop)
# Connect the loop to the snaphelper
snapinfo.connect_loop(loop)
# Run pipeline.
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# Clean up.
pipeline.set_state(Gst.State.NULL)
while GLib.MainContext.default().iteration(False):
pass
@contextlib.contextmanager
def setup_keyboard():
fd = sys.stdin.fileno()
termattr = termios.tcgetattr(fd)
orgattr = list(termattr)
termattr[3] = termattr[3] & ~(termios.ICANON | termios.ECHO)
termios.tcsetattr(fd, termios.TCSANOW, termattr)
orgflags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, orgflags | os.O_NONBLOCK)
try:
yield
finally:
termios.tcsetattr(fd, termios.TCSAFLUSH, orgattr)
fcntl.fcntl(fd, fcntl.F_SETFL, orgflags)
class SnapHelper:
def __init__(self, sysfs, prefix='img', oneshot=True, suffix='jpg'):
self.prefix = prefix
self.oneshot = oneshot
self.suffix = suffix
self.snap_it = oneshot
self.num = 0
self.scrapframes = SCRAP_FRAMES
self.sysfs = sysfs
self.loop = None
self.thread = None
if not oneshot:
self.pipe_r, self.pipe_w = os.pipe()
self.thread = threading.Thread(target=self.read_keyboard)
self.thread.daemon = True
def get_filename(self):
while True:
filename = self.prefix + str(self.num).zfill(4) + '.' + self.suffix
self.num = self.num + 1
if not os.path.exists(filename):
break
return filename
def read_keyboard(self):
print('Press space to take a snap, r to refocus, or q to quit')
with setup_keyboard():
while True:
read_fd, _, _ = select.select([sys.stdin, self.pipe_r], [], [])
if self.pipe_r in read_fd:
break
c = sys.stdin.read()
if c == ' ':
self.snap_it = True
if c == 'r':
self.refocus()
if c == 'q':
while not self.loop.is_running():
time.sleep(0.01)
self.loop.quit()
break
def exit_keyboard_thread(self):
try:
os.close(self.pipe_w)
except:
pass
def check_af(self):
try:
self.sysfs.seek(0)
v = self.sysfs.read()
if int(v) != 0x10:
print('NO Focus')
except:
pass
def refocus(self):
try:
self.sysfs.write('1')
self.sysfs.flush()
except:
pass
def save_frame(self):
# We always want to throw away the initial frames to let the
# camera stabilize. This seemed empirically to be the right number
# when running on desktop.
if self.scrapframes > 0:
if self.scrapframes == SCRAP_FRAMES:
self.refocus()
self.scrapframes = self.scrapframes - 1
return False
if self.snap_it:
self.check_af()
self.snap_it = False
retval = True
else:
retval = False
if self.oneshot:
self.loop.quit()
return retval
def connect_loop(self, loop):
self.loop = loop
if self.thread:
self.thread.start()
def main(arguments):
parser = argparse.ArgumentParser('Take camera snapshots')
parser.add_argument(
'--prefix',
'-p',
dest='prefix',
help='Filename prefix',
default=FILENAME_PREFIX)
parser.add_argument(
'--oneshot',
dest='oneshot',
action='store_true',
help='One shot vs. interactive mode')
parser.add_argument(
'--format',
'-f',
dest='suffix',
default='jpg',
choices=('jpg', 'bmp', 'png'),
help='Format to save, default is JPEG')
parser.add_argument('--camera_mode',
help='WxH:N/D of the camera resolution and framerate',
default='2592x1944:15/1')
args = parser.parse_args()
try:
with open(CAMERA_INIT_QUERY_SYSFS_NODE) as init_file:
init_file.seek(0)
init = init_file.read()
if int(init) != 1:
raise Exception('Cannot find ov5645 CSI camera, ' +
'check that your camera is connected')
with open(AF_SYSFS_NODE, 'w+') as sysfs:
snap = SnapHelper(sysfs, args.prefix, args.oneshot, args.suffix)
run_pipeline(snap, args.camera_mode)
snap.exit_keyboard_thread()
except Exception as ex:
print(ex)
if __name__ == '__main__':
main(sys.argv)
sys.exit()