| /* GStreamer |
| * Copyright (C) <2009> Collabora Ltd |
| * @author: Olivier Crete <olivier.crete@collabora.co.uk |
| * Copyright (C) <2009> Nokia Inc |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| /** |
| * SECTION:element-shmsink |
| * @title: shmsink |
| * |
| * Send data over shared memory to the matching source. |
| * |
| * ## Example launch lines |
| * |[ |
| * gst-launch-1.0 -v videotestsrc ! "video/x-raw, format=YUY2, color-matrix=sdtv, \ |
| * chroma-site=mpeg2, width=(int)320, height=(int)240, framerate=(fraction)30/1" \ |
| * ! shmsink socket-path=/tmp/blah shm-size=2000000 |
| * ]| Send video to shm buffers. |
| * |
| */ |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| |
| #include "gstshmsink.h" |
| |
| #include <gst/gst.h> |
| |
| #include <string.h> |
| |
| /* signals */ |
| enum |
| { |
| SIGNAL_CLIENT_CONNECTED, |
| SIGNAL_CLIENT_DISCONNECTED, |
| LAST_SIGNAL |
| }; |
| |
| /* properties */ |
| enum |
| { |
| PROP_0, |
| PROP_SOCKET_PATH, |
| PROP_PERMS, |
| PROP_SHM_SIZE, |
| PROP_WAIT_FOR_CONNECTION, |
| PROP_BUFFER_TIME |
| }; |
| |
| struct GstShmClient |
| { |
| ShmClient *client; |
| GstPollFD pollfd; |
| }; |
| |
| #define DEFAULT_SIZE ( 64 * 1024 * 1024 ) |
| #define DEFAULT_WAIT_FOR_CONNECTION (TRUE) |
| /* Default is user read/write, group read */ |
| #define DEFAULT_PERMS ( S_IRUSR | S_IWUSR | S_IRGRP ) |
| |
| |
| GST_DEBUG_CATEGORY_STATIC (shmsink_debug); |
| #define GST_CAT_DEFAULT shmsink_debug |
| |
| static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", |
| GST_PAD_SINK, |
| GST_PAD_ALWAYS, |
| GST_STATIC_CAPS_ANY); |
| |
| #define gst_shm_sink_parent_class parent_class |
| G_DEFINE_TYPE (GstShmSink, gst_shm_sink, GST_TYPE_BASE_SINK); |
| |
| static void gst_shm_sink_finalize (GObject * object); |
| static void gst_shm_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec); |
| static void gst_shm_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec); |
| |
| static gboolean gst_shm_sink_start (GstBaseSink * bsink); |
| static gboolean gst_shm_sink_stop (GstBaseSink * bsink); |
| static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf); |
| |
| static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event); |
| static gboolean gst_shm_sink_unlock (GstBaseSink * bsink); |
| static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink); |
| static gboolean gst_shm_sink_propose_allocation (GstBaseSink * sink, |
| GstQuery * query); |
| |
| static gpointer pollthread_func (gpointer data); |
| |
| static guint signals[LAST_SIGNAL] = { 0 }; |
| |
| |
| |
| /******************** |
| * CUSTOM ALLOCATOR * |
| ********************/ |
| |
| #define GST_TYPE_SHM_SINK_ALLOCATOR \ |
| (gst_shm_sink_allocator_get_type()) |
| #define GST_SHM_SINK_ALLOCATOR(obj) \ |
| (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK_ALLOCATOR, \ |
| GstShmSinkAllocator)) |
| #define GST_SHM_SINK_ALLOCATOR_CLASS(klass) \ |
| (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK_ALLOCATOR, \ |
| GstShmSinkAllocatorClass)) |
| #define GST_IS_SHM_SINK_ALLOCATOR(obj) \ |
| (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK_ALLOCATOR)) |
| #define GST_IS_SHM_SINK_ALLOCATOR_CLASS(klass) \ |
| (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK_ALLOCATOR)) |
| |
| struct _GstShmSinkAllocator |
| { |
| GstAllocator parent; |
| |
| GstShmSink *sink; |
| }; |
| |
| typedef struct _GstShmSinkAllocatorClass |
| { |
| GstAllocatorClass parent; |
| } GstShmSinkAllocatorClass; |
| |
| typedef struct _GstShmSinkMemory |
| { |
| GstMemory mem; |
| |
| gchar *data; |
| GstShmSink *sink; |
| ShmBlock *block; |
| } GstShmSinkMemory; |
| |
| GType gst_shm_sink_allocator_get_type (void); |
| |
| G_DEFINE_TYPE (GstShmSinkAllocator, gst_shm_sink_allocator, GST_TYPE_ALLOCATOR); |
| |
| static void |
| gst_shm_sink_allocator_dispose (GObject * object) |
| { |
| GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (object); |
| |
| if (self->sink) |
| gst_object_unref (self->sink); |
| self->sink = NULL; |
| |
| G_OBJECT_CLASS (gst_shm_sink_allocator_parent_class)->dispose (object); |
| } |
| |
| static void |
| gst_shm_sink_allocator_free (GstAllocator * allocator, GstMemory * mem) |
| { |
| GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem; |
| |
| if (mymem->block) { |
| GST_OBJECT_LOCK (mymem->sink); |
| sp_writer_free_block (mymem->block); |
| GST_OBJECT_UNLOCK (mymem->sink); |
| gst_object_unref (mymem->sink); |
| } |
| gst_object_unref (mem->allocator); |
| |
| g_slice_free (GstShmSinkMemory, mymem); |
| } |
| |
| static gpointer |
| gst_shm_sink_allocator_mem_map (GstMemory * mem, gsize maxsize, |
| GstMapFlags flags) |
| { |
| GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem; |
| |
| return mymem->data; |
| } |
| |
| static void |
| gst_shm_sink_allocator_mem_unmap (GstMemory * mem) |
| { |
| } |
| |
| static GstMemory * |
| gst_shm_sink_allocator_mem_share (GstMemory * mem, gssize offset, gssize size) |
| { |
| GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem; |
| GstShmSinkMemory *mysub; |
| GstMemory *parent; |
| |
| /* find the real parent */ |
| if ((parent = mem->parent) == NULL) |
| parent = mem; |
| |
| if (size == -1) |
| size = mem->size - offset; |
| |
| mysub = g_slice_new0 (GstShmSinkMemory); |
| /* the shared memory is always readonly */ |
| gst_memory_init (GST_MEMORY_CAST (mysub), GST_MINI_OBJECT_FLAGS (parent) | |
| GST_MINI_OBJECT_FLAG_LOCK_READONLY, gst_object_ref (mem->allocator), |
| parent, mem->maxsize, mem->align, mem->offset + offset, size); |
| mysub->data = mymem->data; |
| |
| return (GstMemory *) mysub; |
| } |
| |
| static gboolean |
| gst_shm_sink_allocator_mem_is_span (GstMemory * mem1, GstMemory * mem2, |
| gsize * offset) |
| { |
| GstShmSinkMemory *mymem1 = (GstShmSinkMemory *) mem1; |
| GstShmSinkMemory *mymem2 = (GstShmSinkMemory *) mem2; |
| |
| if (offset) { |
| GstMemory *parent; |
| |
| parent = mem1->parent; |
| |
| *offset = mem1->offset - parent->offset; |
| } |
| |
| /* and memory is contiguous */ |
| return mymem1->data + mem1->offset + mem1->size == |
| mymem2->data + mem2->offset; |
| } |
| |
| static void |
| gst_shm_sink_allocator_init (GstShmSinkAllocator * self) |
| { |
| GstAllocator *allocator = GST_ALLOCATOR (self); |
| |
| allocator->mem_map = gst_shm_sink_allocator_mem_map; |
| allocator->mem_unmap = gst_shm_sink_allocator_mem_unmap; |
| allocator->mem_share = gst_shm_sink_allocator_mem_share; |
| allocator->mem_is_span = gst_shm_sink_allocator_mem_is_span; |
| } |
| |
| |
| static GstMemory * |
| gst_shm_sink_allocator_alloc_locked (GstShmSinkAllocator * self, gsize size, |
| GstAllocationParams * params) |
| { |
| GstMemory *memory = NULL; |
| ShmBlock *block = NULL; |
| gsize maxsize = size + params->prefix + params->padding; |
| gsize align = params->align; |
| |
| /* ensure configured alignment */ |
| align |= gst_memory_alignment; |
| /* allocate more to compensate for alignment */ |
| maxsize += align; |
| |
| block = sp_writer_alloc_block (self->sink->pipe, maxsize); |
| if (block) { |
| GstShmSinkMemory *mymem; |
| gsize aoffset, padding; |
| |
| GST_LOG_OBJECT (self, |
| "Allocated block %p with %" G_GSIZE_FORMAT " bytes at %p", block, size, |
| sp_writer_block_get_buf (block)); |
| |
| mymem = g_slice_new0 (GstShmSinkMemory); |
| memory = GST_MEMORY_CAST (mymem); |
| mymem->data = sp_writer_block_get_buf (block); |
| mymem->sink = gst_object_ref (self->sink); |
| mymem->block = block; |
| |
| /* do alignment */ |
| if ((aoffset = ((guintptr) mymem->data & align))) { |
| aoffset = (align + 1) - aoffset; |
| mymem->data += aoffset; |
| maxsize -= aoffset; |
| } |
| |
| if (params->prefix && (params->flags & GST_MEMORY_FLAG_ZERO_PREFIXED)) |
| memset (mymem->data, 0, params->prefix); |
| |
| padding = maxsize - (params->prefix + size); |
| if (padding && (params->flags & GST_MEMORY_FLAG_ZERO_PADDED)) |
| memset (mymem->data + params->prefix + size, 0, padding); |
| |
| gst_memory_init (memory, params->flags, |
| GST_ALLOCATOR_CAST (g_object_ref (self)), NULL, maxsize, align, |
| params->prefix, size); |
| } |
| |
| return memory; |
| } |
| |
| static GstMemory * |
| gst_shm_sink_allocator_alloc (GstAllocator * allocator, gsize size, |
| GstAllocationParams * params) |
| { |
| GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (allocator); |
| GstMemory *memory = NULL; |
| |
| GST_OBJECT_LOCK (self->sink); |
| memory = gst_shm_sink_allocator_alloc_locked (self, size, params); |
| GST_OBJECT_UNLOCK (self->sink); |
| |
| if (!memory) { |
| memory = gst_allocator_alloc (NULL, size, params); |
| GST_LOG_OBJECT (self, |
| "Not enough shared memory for GstMemory of %" G_GSIZE_FORMAT |
| "bytes, allocating using standard allocator", size); |
| } |
| |
| return memory; |
| } |
| |
| |
| static void |
| gst_shm_sink_allocator_class_init (GstShmSinkAllocatorClass * klass) |
| { |
| GstAllocatorClass *allocator_class = GST_ALLOCATOR_CLASS (klass); |
| GObjectClass *object_class = G_OBJECT_CLASS (klass); |
| |
| allocator_class->alloc = gst_shm_sink_allocator_alloc; |
| allocator_class->free = gst_shm_sink_allocator_free; |
| object_class->dispose = gst_shm_sink_allocator_dispose; |
| } |
| |
| static GstShmSinkAllocator * |
| gst_shm_sink_allocator_new (GstShmSink * sink) |
| { |
| GstShmSinkAllocator *self = g_object_new (GST_TYPE_SHM_SINK_ALLOCATOR, NULL); |
| |
| gst_object_ref_sink (self); |
| |
| self->sink = gst_object_ref (sink); |
| |
| return self; |
| } |
| |
| |
| /*************** |
| * MAIN OBJECT * |
| ***************/ |
| |
| static void |
| gst_shm_sink_init (GstShmSink * self) |
| { |
| g_cond_init (&self->cond); |
| self->size = DEFAULT_SIZE; |
| self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION; |
| self->perms = DEFAULT_PERMS; |
| |
| gst_allocation_params_init (&self->params); |
| } |
| |
| static void |
| gst_shm_sink_class_init (GstShmSinkClass * klass) |
| { |
| GObjectClass *gobject_class; |
| GstElementClass *gstelement_class; |
| GstBaseSinkClass *gstbasesink_class; |
| |
| gobject_class = (GObjectClass *) klass; |
| gstelement_class = (GstElementClass *) klass; |
| gstbasesink_class = (GstBaseSinkClass *) klass; |
| |
| gobject_class->finalize = gst_shm_sink_finalize; |
| gobject_class->set_property = gst_shm_sink_set_property; |
| gobject_class->get_property = gst_shm_sink_get_property; |
| |
| gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start); |
| gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop); |
| gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render); |
| gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event); |
| gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock); |
| gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop); |
| gstbasesink_class->propose_allocation = |
| GST_DEBUG_FUNCPTR (gst_shm_sink_propose_allocation); |
| |
| g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, |
| g_param_spec_string ("socket-path", |
| "Path to the control socket", |
| "The path to the control socket used to control the shared memory " |
| "transport. This may be modified during the NULL->READY transition", |
| NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_PERMS, |
| g_param_spec_uint ("perms", |
| "Permissions on the shm area", |
| "Permissions to set on the shm area", |
| 0, 07777, DEFAULT_PERMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_SHM_SIZE, |
| g_param_spec_uint ("shm-size", |
| "Size of the shm area", |
| "Size of the shared memory area", |
| 0, G_MAXUINT, DEFAULT_SIZE, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION, |
| g_param_spec_boolean ("wait-for-connection", |
| "Wait for a connection until rendering", |
| "Block the stream until the shm pipe is connected", |
| DEFAULT_WAIT_FOR_CONNECTION, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| g_object_class_install_property (gobject_class, PROP_BUFFER_TIME, |
| g_param_spec_int64 ("buffer-time", |
| "Buffer Time of the shm buffer", |
| "Maximum Size of the shm buffer in nanoseconds (-1 to disable)", |
| -1, G_MAXINT64, -1, |
| G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected", |
| GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, |
| g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); |
| |
| signals[SIGNAL_CLIENT_DISCONNECTED] = g_signal_new ("client-disconnected", |
| GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, |
| g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); |
| |
| gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); |
| |
| gst_element_class_set_static_metadata (gstelement_class, |
| "Shared Memory Sink", |
| "Sink", |
| "Send data over shared memory to the matching source", |
| "Olivier Crete <olivier.crete@collabora.co.uk>"); |
| |
| GST_DEBUG_CATEGORY_INIT (shmsink_debug, "shmsink", 0, "Shared Memory Sink"); |
| } |
| |
| static void |
| gst_shm_sink_finalize (GObject * object) |
| { |
| GstShmSink *self = GST_SHM_SINK (object); |
| |
| g_cond_clear (&self->cond); |
| g_free (self->socket_path); |
| |
| G_OBJECT_CLASS (parent_class)->finalize (object); |
| } |
| |
| /* |
| * Set the value of a property for the server sink. |
| */ |
| static void |
| gst_shm_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec) |
| { |
| GstShmSink *self = GST_SHM_SINK (object); |
| int ret = 0; |
| |
| switch (prop_id) { |
| case PROP_SOCKET_PATH: |
| GST_OBJECT_LOCK (object); |
| g_free (self->socket_path); |
| self->socket_path = g_value_dup_string (value); |
| GST_OBJECT_UNLOCK (object); |
| break; |
| case PROP_PERMS: |
| GST_OBJECT_LOCK (object); |
| self->perms = g_value_get_uint (value); |
| if (self->pipe) |
| ret = sp_writer_setperms_shm (self->pipe, self->perms); |
| GST_OBJECT_UNLOCK (object); |
| if (ret < 0) |
| GST_WARNING_OBJECT (object, "Could not set permissions on pipe: %s", |
| strerror (ret)); |
| break; |
| case PROP_SHM_SIZE: |
| GST_OBJECT_LOCK (object); |
| if (self->pipe) { |
| if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) { |
| /* Swap allocators, so we can know immediately if the memory is |
| * ours */ |
| gst_object_unref (self->allocator); |
| self->allocator = gst_shm_sink_allocator_new (self); |
| |
| GST_DEBUG_OBJECT (self, "Resized shared memory area from %u to " |
| "%u bytes", self->size, g_value_get_uint (value)); |
| } else { |
| GST_WARNING_OBJECT (self, "Could not resize shared memory area from" |
| "%u to %u bytes", self->size, g_value_get_uint (value)); |
| } |
| } |
| self->size = g_value_get_uint (value); |
| GST_OBJECT_UNLOCK (object); |
| break; |
| case PROP_WAIT_FOR_CONNECTION: |
| GST_OBJECT_LOCK (object); |
| self->wait_for_connection = g_value_get_boolean (value); |
| GST_OBJECT_UNLOCK (object); |
| g_cond_broadcast (&self->cond); |
| break; |
| case PROP_BUFFER_TIME: |
| GST_OBJECT_LOCK (object); |
| self->buffer_time = g_value_get_int64 (value); |
| GST_OBJECT_UNLOCK (object); |
| g_cond_broadcast (&self->cond); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| static void |
| gst_shm_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec) |
| { |
| GstShmSink *self = GST_SHM_SINK (object); |
| |
| GST_OBJECT_LOCK (object); |
| |
| switch (prop_id) { |
| case PROP_SOCKET_PATH: |
| g_value_set_string (value, self->socket_path); |
| break; |
| case PROP_PERMS: |
| g_value_set_uint (value, self->perms); |
| break; |
| case PROP_SHM_SIZE: |
| g_value_set_uint (value, self->size); |
| break; |
| case PROP_WAIT_FOR_CONNECTION: |
| g_value_set_boolean (value, self->wait_for_connection); |
| break; |
| case PROP_BUFFER_TIME: |
| g_value_set_int64 (value, self->buffer_time); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| |
| GST_OBJECT_UNLOCK (object); |
| } |
| |
| |
| |
| static gboolean |
| gst_shm_sink_start (GstBaseSink * bsink) |
| { |
| GstShmSink *self = GST_SHM_SINK (bsink); |
| GError *err = NULL; |
| |
| self->stop = FALSE; |
| |
| if (!self->socket_path) { |
| GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, |
| ("Could not open socket."), (NULL)); |
| return FALSE; |
| } |
| |
| GST_DEBUG_OBJECT (self, "Creating new socket at %s" |
| " with shared memory of %d bytes", self->socket_path, self->size); |
| |
| self->pipe = sp_writer_create (self->socket_path, self->size, self->perms); |
| |
| if (!self->pipe) { |
| GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, |
| ("Could not open socket."), (NULL)); |
| return FALSE; |
| } |
| |
| sp_set_data (self->pipe, self); |
| g_free (self->socket_path); |
| self->socket_path = g_strdup (sp_writer_get_path (self->pipe)); |
| |
| GST_DEBUG ("Created socket at %s", self->socket_path); |
| |
| self->poll = gst_poll_new (TRUE); |
| gst_poll_fd_init (&self->serverpollfd); |
| self->serverpollfd.fd = sp_get_fd (self->pipe); |
| gst_poll_add_fd (self->poll, &self->serverpollfd); |
| gst_poll_fd_ctl_read (self->poll, &self->serverpollfd, TRUE); |
| |
| self->pollthread = |
| g_thread_try_new ("gst-shmsink-poll-thread", pollthread_func, self, &err); |
| |
| if (!self->pollthread) |
| goto thread_error; |
| |
| self->allocator = gst_shm_sink_allocator_new (self); |
| |
| return TRUE; |
| |
| thread_error: |
| |
| sp_writer_close (self->pipe, NULL, NULL); |
| self->pipe = NULL; |
| gst_poll_free (self->poll); |
| |
| GST_ELEMENT_ERROR (self, CORE, THREAD, ("Could not start thread"), |
| ("%s", err->message)); |
| g_error_free (err); |
| return FALSE; |
| } |
| |
| |
| static gboolean |
| gst_shm_sink_stop (GstBaseSink * bsink) |
| { |
| GstShmSink *self = GST_SHM_SINK (bsink); |
| |
| self->stop = TRUE; |
| gst_poll_set_flushing (self->poll, TRUE); |
| |
| if (self->allocator) |
| gst_object_unref (self->allocator); |
| self->allocator = NULL; |
| |
| g_thread_join (self->pollthread); |
| self->pollthread = NULL; |
| |
| GST_DEBUG_OBJECT (self, "Stopping"); |
| |
| while (self->clients) { |
| struct GstShmClient *client = self->clients->data; |
| self->clients = g_list_remove (self->clients, client); |
| sp_writer_close_client (self->pipe, client->client, |
| (sp_buffer_free_callback) gst_buffer_unref, NULL); |
| g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, |
| client->pollfd.fd); |
| g_slice_free (struct GstShmClient, client); |
| } |
| |
| gst_poll_free (self->poll); |
| self->poll = NULL; |
| |
| sp_writer_close (self->pipe, NULL, NULL); |
| self->pipe = NULL; |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_shm_sink_can_render (GstShmSink * self, GstClockTime time) |
| { |
| ShmBuffer *b; |
| |
| if (time == GST_CLOCK_TIME_NONE || self->buffer_time == GST_CLOCK_TIME_NONE) |
| return TRUE; |
| |
| b = sp_writer_get_pending_buffers (self->pipe); |
| for (; b != NULL; b = sp_writer_get_next_buffer (b)) { |
| GstBuffer *buf = sp_writer_buf_get_tag (b); |
| if (GST_CLOCK_DIFF (time, GST_BUFFER_PTS (buf)) > self->buffer_time) |
| return FALSE; |
| } |
| |
| return TRUE; |
| } |
| |
| static GstFlowReturn |
| gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) |
| { |
| GstShmSink *self = GST_SHM_SINK (bsink); |
| int rv = 0; |
| GstMapInfo map; |
| gboolean need_new_memory = FALSE; |
| GstFlowReturn ret = GST_FLOW_OK; |
| GstMemory *memory = NULL; |
| GstBuffer *sendbuf = NULL; |
| gsize written_bytes; |
| |
| GST_OBJECT_LOCK (self); |
| while (self->wait_for_connection && !self->clients) { |
| g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); |
| if (self->unlock) { |
| GST_OBJECT_UNLOCK (self); |
| ret = gst_base_sink_wait_preroll (bsink); |
| if (ret == GST_FLOW_OK) |
| GST_OBJECT_LOCK (self); |
| else |
| return ret; |
| } |
| } |
| |
| while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) { |
| g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); |
| if (self->unlock) { |
| GST_OBJECT_UNLOCK (self); |
| ret = gst_base_sink_wait_preroll (bsink); |
| if (ret == GST_FLOW_OK) |
| GST_OBJECT_LOCK (self); |
| else |
| return ret; |
| } |
| } |
| |
| |
| if (gst_buffer_n_memory (buf) > 1) { |
| GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single" |
| " one, need to do a memcpy", buf, gst_buffer_n_memory (buf)); |
| need_new_memory = TRUE; |
| } else { |
| memory = gst_buffer_peek_memory (buf, 0); |
| |
| if (memory->allocator != GST_ALLOCATOR (self->allocator)) { |
| need_new_memory = TRUE; |
| GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by " |
| "%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator); |
| } |
| } |
| |
| if (need_new_memory) { |
| if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) { |
| gsize area_size = sp_writer_get_max_buf_size (self->pipe); |
| GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT, (NULL), |
| ("Shared memory area of size %" G_GSIZE_FORMAT " is smaller than" |
| "buffer of size %" G_GSIZE_FORMAT, area_size, |
| gst_buffer_get_size (buf))); |
| goto error; |
| } |
| |
| while ((memory = |
| gst_shm_sink_allocator_alloc_locked (self->allocator, |
| gst_buffer_get_size (buf), &self->params)) == NULL) { |
| g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); |
| if (self->unlock) { |
| GST_OBJECT_UNLOCK (self); |
| ret = gst_base_sink_wait_preroll (bsink); |
| if (ret == GST_FLOW_OK) |
| GST_OBJECT_LOCK (self); |
| else |
| return ret; |
| } |
| } |
| |
| while (self->wait_for_connection && !self->clients) { |
| g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); |
| if (self->unlock) { |
| GST_OBJECT_UNLOCK (self); |
| ret = gst_base_sink_wait_preroll (bsink); |
| if (ret == GST_FLOW_OK) { |
| GST_OBJECT_LOCK (self); |
| } else { |
| gst_memory_unref (memory); |
| return ret; |
| } |
| } |
| } |
| |
| if (!gst_memory_map (memory, &map, GST_MAP_WRITE)) { |
| GST_ELEMENT_ERROR (self, STREAM, FAILED, |
| (NULL), ("Failed to map memory")); |
| goto error; |
| } |
| |
| GST_DEBUG_OBJECT (self, |
| "Copying %" G_GSIZE_FORMAT " bytes into map of size %" G_GSIZE_FORMAT |
| " bytes.", gst_buffer_get_size (buf), map.size); |
| written_bytes = gst_buffer_extract (buf, 0, map.data, map.size); |
| GST_DEBUG_OBJECT (self, "Copied %" G_GSIZE_FORMAT " bytes.", written_bytes); |
| gst_memory_unmap (memory, &map); |
| |
| sendbuf = gst_buffer_new (); |
| if (!gst_buffer_copy_into (sendbuf, buf, GST_BUFFER_COPY_METADATA, 0, -1)) { |
| GST_ELEMENT_ERROR (self, STREAM, FAILED, |
| (NULL), ("Failed to copy data into send buffer")); |
| gst_buffer_unref (sendbuf); |
| goto error; |
| } |
| gst_buffer_append_memory (sendbuf, memory); |
| } else { |
| sendbuf = gst_buffer_ref (buf); |
| } |
| |
| if (!gst_buffer_map (sendbuf, &map, GST_MAP_READ)) { |
| GST_ELEMENT_ERROR (self, STREAM, FAILED, |
| (NULL), ("Failed to map data into send buffer")); |
| goto error; |
| } |
| |
| /* Make the memory readonly as of now as we've sent it to the other side |
| * We know it's not mapped for writing anywhere as we just mapped it for |
| * reading |
| */ |
| rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf); |
| if (rv == -1) { |
| GST_ELEMENT_ERROR (self, STREAM, FAILED, |
| (NULL), ("Failed to send data over SHM")); |
| gst_buffer_unmap (sendbuf, &map); |
| goto error; |
| } |
| |
| gst_buffer_unmap (sendbuf, &map); |
| |
| GST_OBJECT_UNLOCK (self); |
| |
| if (rv == 0) { |
| GST_DEBUG_OBJECT (self, "No clients connected, unreffing buffer"); |
| gst_buffer_unref (sendbuf); |
| } |
| |
| return ret; |
| |
| error: |
| GST_OBJECT_UNLOCK (self); |
| return GST_FLOW_ERROR; |
| } |
| |
| static void |
| free_buffer_locked (GstBuffer * buffer, void *data) |
| { |
| GSList **list = data; |
| |
| g_assert (buffer != NULL); |
| |
| *list = g_slist_prepend (*list, buffer); |
| } |
| |
| static gpointer |
| pollthread_func (gpointer data) |
| { |
| GstShmSink *self = GST_SHM_SINK (data); |
| GList *item; |
| GstClockTime timeout = GST_CLOCK_TIME_NONE; |
| int rv = 0; |
| |
| while (!self->stop) { |
| |
| do { |
| rv = gst_poll_wait (self->poll, timeout); |
| } while (rv < 0 && errno == EINTR); |
| |
| if (rv < 0) { |
| GST_ELEMENT_ERROR (self, RESOURCE, READ, |
| ("Failed waiting on fd activity"), |
| ("gst_poll_wait returned %d, errno: %d", rv, errno)); |
| return NULL; |
| } |
| |
| timeout = GST_CLOCK_TIME_NONE; |
| |
| if (self->stop) |
| return NULL; |
| |
| if (gst_poll_fd_has_closed (self->poll, &self->serverpollfd)) { |
| GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed read from shmsink"), |
| ("Control socket has closed")); |
| return NULL; |
| } |
| |
| if (gst_poll_fd_has_error (self->poll, &self->serverpollfd)) { |
| GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsink"), |
| ("Control socket has error")); |
| return NULL; |
| } |
| |
| if (gst_poll_fd_can_read (self->poll, &self->serverpollfd)) { |
| ShmClient *client; |
| struct GstShmClient *gclient; |
| |
| GST_OBJECT_LOCK (self); |
| client = sp_writer_accept_client (self->pipe); |
| GST_OBJECT_UNLOCK (self); |
| |
| if (!client) { |
| GST_ELEMENT_ERROR (self, RESOURCE, READ, |
| ("Failed to read from shmsink"), |
| ("Control socket returns wrong data")); |
| return NULL; |
| } |
| |
| gclient = g_slice_new (struct GstShmClient); |
| gclient->client = client; |
| gst_poll_fd_init (&gclient->pollfd); |
| gclient->pollfd.fd = sp_writer_get_client_fd (client); |
| gst_poll_add_fd (self->poll, &gclient->pollfd); |
| gst_poll_fd_ctl_read (self->poll, &gclient->pollfd, TRUE); |
| self->clients = g_list_prepend (self->clients, gclient); |
| g_signal_emit (self, signals[SIGNAL_CLIENT_CONNECTED], 0, |
| gclient->pollfd.fd); |
| /* we need to call gst_poll_wait before calling gst_poll_* status |
| functions on that new descriptor, so restart the loop, so _wait |
| will have been called on all elements of self->poll, whether |
| they have just been added or not. */ |
| timeout = 0; |
| continue; |
| } |
| |
| again: |
| for (item = self->clients; item; item = item->next) { |
| struct GstShmClient *gclient = item->data; |
| |
| if (gst_poll_fd_has_closed (self->poll, &gclient->pollfd)) { |
| GST_WARNING_OBJECT (self, "One client is gone, closing"); |
| goto close_client; |
| } |
| |
| if (gst_poll_fd_has_error (self->poll, &gclient->pollfd)) { |
| GST_WARNING_OBJECT (self, "One client fd has error, closing"); |
| goto close_client; |
| } |
| |
| if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) { |
| int rv; |
| gpointer tag = NULL; |
| |
| GST_OBJECT_LOCK (self); |
| rv = sp_writer_recv (self->pipe, gclient->client, &tag); |
| GST_OBJECT_UNLOCK (self); |
| |
| if (rv < 0) { |
| GST_WARNING_OBJECT (self, "One client has read error," |
| " closing (retval: %d errno: %d)", rv, errno); |
| goto close_client; |
| } |
| |
| g_assert (rv == 0 || tag == NULL); |
| |
| if (rv == 0) |
| gst_buffer_unref (tag); |
| } |
| continue; |
| close_client: |
| { |
| GSList *list = NULL; |
| GST_OBJECT_LOCK (self); |
| sp_writer_close_client (self->pipe, gclient->client, |
| (sp_buffer_free_callback) free_buffer_locked, (void **) &list); |
| GST_OBJECT_UNLOCK (self); |
| g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref); |
| } |
| |
| gst_poll_remove_fd (self->poll, &gclient->pollfd); |
| self->clients = g_list_remove (self->clients, gclient); |
| |
| g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, |
| gclient->pollfd.fd); |
| g_slice_free (struct GstShmClient, gclient); |
| |
| goto again; |
| } |
| |
| g_cond_broadcast (&self->cond); |
| } |
| |
| return NULL; |
| } |
| |
| static gboolean |
| gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event) |
| { |
| GstShmSink *self = GST_SHM_SINK (bsink); |
| |
| switch (GST_EVENT_TYPE (event)) { |
| case GST_EVENT_EOS: |
| GST_OBJECT_LOCK (self); |
| while (self->wait_for_connection && sp_writer_pending_writes (self->pipe) |
| && !self->unlock) |
| g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); |
| GST_OBJECT_UNLOCK (self); |
| break; |
| default: |
| break; |
| } |
| |
| return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event); |
| } |
| |
| |
| static gboolean |
| gst_shm_sink_unlock (GstBaseSink * bsink) |
| { |
| GstShmSink *self = GST_SHM_SINK (bsink); |
| |
| GST_OBJECT_LOCK (self); |
| self->unlock = TRUE; |
| GST_OBJECT_UNLOCK (self); |
| |
| g_cond_broadcast (&self->cond); |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_shm_sink_unlock_stop (GstBaseSink * bsink) |
| { |
| GstShmSink *self = GST_SHM_SINK (bsink); |
| |
| GST_OBJECT_LOCK (self); |
| self->unlock = FALSE; |
| GST_OBJECT_UNLOCK (self); |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_shm_sink_propose_allocation (GstBaseSink * sink, GstQuery * query) |
| { |
| GstShmSink *self = GST_SHM_SINK (sink); |
| |
| if (self->allocator) |
| gst_query_add_allocation_param (query, GST_ALLOCATOR (self->allocator), |
| NULL); |
| |
| return TRUE; |
| } |