| /* GStreamer |
| * Copyright (C) 2015-2017 YouView TV Ltd |
| * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk> |
| * |
| * gstipcpipelinecomm.c: |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, |
| * Boston, MA 02110-1301, USA. |
| */ |
| |
| #ifdef HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #include <unistd.h> |
| #include <errno.h> |
| #include <string.h> |
| #include <gst/base/gstbytewriter.h> |
| #include <gst/gstprotection.h> |
| #include "gstipcpipelinecomm.h" |
| |
/* Debug category used by the GST_*_OBJECT logging calls in this file; it is
 * selected as the default category through GST_CAT_DEFAULT below. */
GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug

/* Ten times G_TIME_SPAN_SECOND.
 * NOTE(review): presumably the initial value of comm->ack_time, which bounds
 * ACK_TYPE_TIMED waits in comm_request_wait(); the assignment is not visible
 * in this part of the file - confirm. */
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)

/* NOTE(review): not referenced in the visible part of the file; where it is
 * initialised and used should be confirmed before relying on it. */
GQuark QUARK_ID;
| |
/* How a sender waits for the peer's reply after writing a request
 * (see gst_ipc_pipeline_comm_sync_fd() and comm_request_wait()). */
typedef enum
{
  ACK_TYPE_NONE,                /* do not wait at all */
  ACK_TYPE_TIMED,               /* wait until comm->ack_time has elapsed */
  ACK_TYPE_BLOCKING             /* wait without a timeout */
} AckType;
| |
/* Kind of request a reply belongs to. It selects how the 32 bit reply value
 * is interpreted: a GstFlowReturn for buffers, a GstStateChangeReturn for
 * state changes and a boolean for events, queries and messages (see
 * comm_request_ret_get_name() and comm_request_ret_get_failure_value()). */
typedef enum
{
  COMM_REQUEST_TYPE_BUFFER,
  COMM_REQUEST_TYPE_EVENT,
  COMM_REQUEST_TYPE_QUERY,
  COMM_REQUEST_TYPE_STATE_CHANGE,
  COMM_REQUEST_TYPE_MESSAGE,
} CommRequestType;
| |
/* Bookkeeping for one request that is waiting for its reply. */
typedef struct
{
  guint32 id;                   /* id the request was sent with */
  gboolean replied;             /* wait loop exits once this is TRUE */
  gboolean comm_error;          /* set when the wait ended without a reply */
  guint32 ret;                  /* reply value; starts as the failure value for 'type' */
  GstQuery *query;              /* query passed by the sender, may be NULL */
  CommRequestType type;         /* how 'ret' is to be interpreted */
  GCond cond;                   /* waited on together with comm->mutex */
} CommRequest;
| |
/* Forward declarations: both helpers are defined further down but are already
 * needed by comm_request_new() and comm_request_wait(). */
static const gchar *comm_request_ret_get_name (CommRequestType type,
    guint32 ret);
static guint32 comm_request_ret_get_failure_value (CommRequestType type);
| |
| static CommRequest * |
| comm_request_new (guint32 id, CommRequestType type, GstQuery * query) |
| { |
| CommRequest *req; |
| |
| req = g_malloc (sizeof (CommRequest)); |
| req->id = id; |
| g_cond_init (&req->cond); |
| req->replied = FALSE; |
| req->comm_error = FALSE; |
| req->query = query; |
| req->ret = comm_request_ret_get_failure_value (type); |
| req->type = type; |
| |
| return req; |
| } |
| |
| static guint32 |
| comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req, |
| AckType ack_type) |
| { |
| guint32 ret = comm_request_ret_get_failure_value (req->type); |
| guint64 end_time; |
| |
| if (ack_type == ACK_TYPE_TIMED) |
| end_time = g_get_monotonic_time () + comm->ack_time; |
| else |
| end_time = G_MAXUINT64; |
| |
| GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u", |
| req->id); |
| while (!req->replied) { |
| if (ack_type == ACK_TYPE_TIMED) { |
| if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time)) |
| break; |
| } else |
| g_cond_wait (&req->cond, &comm->mutex); |
| } |
| |
| if (req->replied) { |
| ret = req->ret; |
| GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)", |
| req->id, ret, comm_request_ret_get_name (req->type, ret)); |
| } else { |
| req->comm_error = TRUE; |
| GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u", |
| req->id); |
| } |
| |
| return ret; |
| } |
| |
| static void |
| comm_request_free (CommRequest * req) |
| { |
| g_cond_clear (&req->cond); |
| g_free (req); |
| } |
| |
| static const gchar * |
| comm_request_ret_get_name (CommRequestType type, guint32 ret) |
| { |
| switch (type) { |
| case COMM_REQUEST_TYPE_BUFFER: |
| return gst_flow_get_name (ret); |
| case COMM_REQUEST_TYPE_EVENT: |
| case COMM_REQUEST_TYPE_QUERY: |
| case COMM_REQUEST_TYPE_MESSAGE: |
| return ret ? "TRUE" : "FALSE"; |
| case COMM_REQUEST_TYPE_STATE_CHANGE: |
| return gst_element_state_change_return_get_name (ret); |
| default: |
| g_assert_not_reached (); |
| } |
| } |
| |
| static guint32 |
| comm_request_ret_get_failure_value (CommRequestType type) |
| { |
| switch (type) { |
| case COMM_REQUEST_TYPE_BUFFER: |
| return GST_FLOW_COMM_ERROR; |
| case COMM_REQUEST_TYPE_EVENT: |
| case COMM_REQUEST_TYPE_MESSAGE: |
| case COMM_REQUEST_TYPE_QUERY: |
| return FALSE; |
| case COMM_REQUEST_TYPE_STATE_CHANGE: |
| return GST_STATE_CHANGE_FAILURE; |
| default: |
| g_assert_not_reached (); |
| } |
| } |
| |
| static const gchar * |
| gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type) |
| { |
| switch (type) { |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: |
| return "ACK"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: |
| return "QUERY_RESULT"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: |
| return "BUFFER"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: |
| return "EVENT"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: |
| return "SINK_MESSAGE_EVENT"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: |
| return "QUERY"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: |
| return "STATE_CHANGE"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: |
| return "STATE_LOST"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: |
| return "MESSAGE"; |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: |
| return "GERROR_MESSAGE"; |
| default: |
| return "UNKNOWN"; |
| } |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id, |
| GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type) |
| { |
| CommRequest *req; |
| gboolean comm_error; |
| GHashTable *waiting_ids; |
| |
| if (ack_type == ACK_TYPE_NONE) |
| return TRUE; |
| |
| req = comm_request_new (id, type, query); |
| waiting_ids = g_hash_table_ref (comm->waiting_ids); |
| g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req); |
| *ret = comm_request_wait (comm, req, ack_type); |
| comm_error = req->comm_error; |
| g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id)); |
| g_hash_table_unref (waiting_ids); |
| return !comm_error; |
| } |
| |
| static gboolean |
| write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size) |
| { |
| size_t offset; |
| gboolean ret = TRUE; |
| |
| offset = 0; |
| GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size); |
| while (size) { |
| ssize_t written = |
| write (comm->fdout, (const unsigned char *) data + offset, size); |
| if (written < 0) { |
| if (errno == EAGAIN || errno == EINTR) |
| continue; |
| GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s", |
| strerror (errno)); |
| ret = FALSE; |
| goto done; |
| } |
| size -= written; |
| offset += written; |
| } |
| |
| done: |
| return ret; |
| } |
| |
| static gboolean |
| write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw) |
| { |
| guint8 *data; |
| gboolean ret; |
| guint size; |
| |
| size = gst_byte_writer_get_size (bw); |
| data = gst_byte_writer_reset_and_get_data (bw); |
| if (!data) |
| return FALSE; |
| ret = write_to_fd_raw (comm, data, size); |
| g_free (data); |
| return ret; |
| } |
| |
| static void |
| gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, |
| guint32 ret, CommRequestType type) |
| { |
| const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK; |
| guint32 size; |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| |
| GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id, |
| comm_request_ret_get_name (type, ret), ret); |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, id)) |
| goto write_failed; |
| size = sizeof (ret); |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, ret)) |
| goto write_failed; |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| gst_byte_writer_reset (&bw); |
| return; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| goto done; |
| } |
| |
| void |
| gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm, |
| guint32 id, GstFlowReturn ret) |
| { |
| gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, |
| COMM_REQUEST_TYPE_BUFFER); |
| } |
| |
| void |
| gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm, |
| guint32 id, gboolean ret) |
| { |
| gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, |
| COMM_REQUEST_TYPE_EVENT); |
| } |
| |
| void |
| gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm, |
| guint32 id, GstStateChangeReturn ret) |
| { |
| gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, |
| COMM_REQUEST_TYPE_STATE_CHANGE); |
| } |
| |
| void |
| gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm, |
| guint32 id, gboolean result, GstQuery * query) |
| { |
| const unsigned char payload_type = |
| GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT; |
| guint8 result8 = result; |
| guint32 size; |
| size_t len; |
| char *str = NULL; |
| guint32 type; |
| const GstStructure *structure; |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| |
| GST_TRACE_OBJECT (comm->element, |
| "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query); |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, id)) |
| goto write_failed; |
| structure = gst_query_get_structure (query); |
| if (structure) { |
| str = gst_structure_to_string (structure); |
| len = strlen (str); |
| } else { |
| str = NULL; |
| len = 0; |
| } |
| size = 1 + sizeof (guint32) + len + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint8 (&bw, result8)) |
| goto write_failed; |
| type = GST_QUERY_TYPE (query); |
| if (!gst_byte_writer_put_uint32_le (&bw, type)) |
| goto write_failed; |
| if (str) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1)) |
| goto write_failed; |
| } else { |
| if (!gst_byte_writer_put_uint8 (&bw, 0)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| gst_byte_writer_reset (&bw); |
| g_free (str); |
| return; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| goto done; |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm, |
| guint32 size, GstQuery ** query) |
| { |
| gchar *end = NULL; |
| GstStructure *structure; |
| guint8 result; |
| guint32 type; |
| const guint8 *payload = NULL; |
| guint32 mapped_size = size; |
| |
| /* this should not be called if we don't have enough yet */ |
| *query = NULL; |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE); |
| g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE); |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return FALSE; |
| result = *payload++; |
| memcpy (&type, payload, sizeof (type)); |
| payload += sizeof (type); |
| |
| size -= 1 + sizeof (guint32); |
| if (size == 0) |
| goto done; |
| |
| if (payload[size - 1]) { |
| result = FALSE; |
| goto done; |
| } |
| if (*payload) { |
| structure = gst_structure_from_string ((const char *) payload, &end); |
| } else { |
| structure = NULL; |
| } |
| if (!structure) { |
| result = FALSE; |
| goto done; |
| } |
| |
| *query = gst_query_new_custom (type, structure); |
| |
| done: |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| return result; |
| } |
| |
/* Description of one GstMeta that is going to be serialised along with a
 * buffer; filled in by build_meta(). */
typedef struct
{
  guint32 bytes;                /* serialised size of this entry, including this field */

  guint64 size;                 /* copied from GstMetaInfo::size */
  guint32 flags;                /* copied from GstMeta::flags */
  guint64 api;                  /* GstMetaInfo::api (a GType) */
  char *str;                    /* serialised meta contents, NULL if unsupported; owned */
} MetaBuildInfo;
| |
/* Accumulator passed as user data to build_meta() while iterating over the
 * metas of a buffer. */
typedef struct
{
  GstIpcPipelineComm *comm;     /* used for logging only */
  guint32 n_meta;               /* number of entries in 'info' */
  guint32 total_bytes;          /* running total of serialised bytes */
  MetaBuildInfo *info;          /* array of n_meta entries, grown with g_realloc */
} MetaListRepresentation;
| |
| static gboolean |
| build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data) |
| { |
| MetaListRepresentation *repr = user_data; |
| |
| repr->n_meta++; |
| repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo)); |
| repr->info[repr->n_meta - 1].bytes = |
| /* 4 byte bytes */ |
| 4 |
| /* 4 byte GstMetaFlags */ |
| + 4 |
| /* GstMetaInfo::api */ |
| + 4 + strlen (g_type_name ((*meta)->info->api)) + 1 |
| /* GstMetaInfo::size */ |
| + 8 |
| /* str length */ |
| + 4; |
| |
| repr->info[repr->n_meta - 1].flags = (*meta)->flags; |
| repr->info[repr->n_meta - 1].api = (*meta)->info->api; |
| repr->info[repr->n_meta - 1].size = (*meta)->info->size; |
| repr->info[repr->n_meta - 1].str = NULL; |
| |
| /* GstMeta is a base class, and actual useful classes are all different... |
| So we list a few of them we know we want and ignore the open ended rest */ |
| if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) { |
| GstProtectionMeta *m = (GstProtectionMeta *) * meta; |
| repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info); |
| repr->info[repr->n_meta - 1].bytes += |
| strlen (repr->info[repr->n_meta - 1].str) + 1; |
| GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s", |
| g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str); |
| } else { |
| GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s", |
| g_type_name ((*meta)->info->api)); |
| } |
| repr->total_bytes += repr->info[repr->n_meta - 1].bytes; |
| return TRUE; |
| } |
| |
/* Fixed-size block of buffer fields that precedes the buffer data on the
 * wire. The writer copies this struct as raw bytes and the reader memcpy()s
 * it back, so it travels in the sender's in-memory layout. */
typedef struct
{
  guint64 pts;                  /* GST_BUFFER_PTS */
  guint64 dts;                  /* GST_BUFFER_DTS */
  guint64 duration;             /* GST_BUFFER_DURATION */
  guint64 offset;               /* GST_BUFFER_OFFSET */
  guint64 offset_end;           /* GST_BUFFER_OFFSET_END */
  guint64 flags;                /* GST_BUFFER_FLAGS */
} CommBufferMetadata;
| |
| GstFlowReturn |
| gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm, |
| GstBuffer * buffer) |
| { |
| const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER; |
| GstMapInfo map; |
| guint32 ret32 = GST_FLOW_OK; |
| guint32 size, n; |
| CommBufferMetadata meta; |
| GstFlowReturn ret; |
| MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */ |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT, |
| comm->send_id, buffer); |
| |
| gst_byte_writer_init (&bw); |
| |
| meta.pts = GST_BUFFER_PTS (buffer); |
| meta.dts = GST_BUFFER_DTS (buffer); |
| meta.duration = GST_BUFFER_DURATION (buffer); |
| meta.offset = GST_BUFFER_OFFSET (buffer); |
| meta.offset_end = GST_BUFFER_OFFSET_END (buffer); |
| meta.flags = GST_BUFFER_FLAGS (buffer); |
| |
| /* work out meta size */ |
| gst_buffer_foreach_meta (buffer, build_meta, &repr); |
| |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| size = |
| gst_buffer_get_size (buffer) + sizeof (guint32) + |
| sizeof (CommBufferMetadata) + repr.total_bytes; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta))) |
| goto write_failed; |
| size = gst_buffer_get_size (buffer); |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) |
| goto map_failed; |
| ret = write_to_fd_raw (comm, map.data, map.size); |
| gst_buffer_unmap (buffer, &map); |
| if (!ret) |
| goto write_failed; |
| |
| /* meta */ |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta)) |
| goto write_failed; |
| for (n = 0; n < repr.n_meta; ++n) { |
| const MetaBuildInfo *info = repr.info + n; |
| guint32 len; |
| const char *s; |
| |
| if (!gst_byte_writer_put_uint32_le (&bw, info->bytes)) |
| goto write_failed; |
| |
| if (!gst_byte_writer_put_uint32_le (&bw, info->flags)) |
| goto write_failed; |
| |
| s = g_type_name (info->api); |
| len = strlen (s) + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, len)) |
| goto write_failed; |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) |
| goto write_failed; |
| |
| if (!gst_byte_writer_put_uint64_le (&bw, info->size)) |
| goto write_failed; |
| |
| s = info->str; |
| len = s ? (strlen (s) + 1) : 0; |
| if (!gst_byte_writer_put_uint32_le (&bw, len)) |
| goto write_failed; |
| if (len) |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, |
| ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER)) |
| goto wait_failed; |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| gst_byte_writer_reset (&bw); |
| for (n = 0; n < repr.n_meta; ++n) |
| g_free (repr.info[n].str); |
| g_free (repr.info); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = GST_FLOW_COMM_ERROR; |
| goto done; |
| |
| wait_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to wait for reply on socket")); |
| ret = GST_FLOW_COMM_ERROR; |
| goto done; |
| |
| map_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), |
| ("Failed to map buffer")); |
| ret = GST_FLOW_ERROR; |
| goto done; |
| } |
| |
| static GstBuffer * |
| gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size) |
| { |
| GstBuffer *buffer; |
| CommBufferMetadata meta; |
| guint32 n_meta, n; |
| const guint8 *payload = NULL; |
| guint32 mapped_size, buffer_data_size; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); |
| g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL); |
| |
| mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size); |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return NULL; |
| memcpy (&meta, payload, sizeof (CommBufferMetadata)); |
| payload += sizeof (CommBufferMetadata); |
| memcpy (&buffer_data_size, payload, sizeof (buffer_data_size)); |
| size -= mapped_size; |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| |
| if (buffer_data_size == 0) { |
| buffer = gst_buffer_new (); |
| } else { |
| buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size); |
| gst_adapter_flush (comm->adapter, buffer_data_size); |
| } |
| size -= buffer_data_size; |
| |
| GST_BUFFER_PTS (buffer) = meta.pts; |
| GST_BUFFER_DTS (buffer) = meta.dts; |
| GST_BUFFER_DURATION (buffer) = meta.duration; |
| GST_BUFFER_OFFSET (buffer) = meta.offset; |
| GST_BUFFER_OFFSET_END (buffer) = meta.offset_end; |
| GST_BUFFER_FLAGS (buffer) = meta.flags; |
| |
| /* If you don't call that, the GType isn't yet known at the |
| g_type_from_name below */ |
| gst_protection_meta_get_info (); |
| |
| mapped_size = size; |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) { |
| gst_buffer_unref (buffer); |
| return NULL; |
| } |
| memcpy (&n_meta, payload, sizeof (n_meta)); |
| payload += sizeof (n_meta); |
| |
| for (n = 0; n < n_meta; ++n) { |
| guint32 flags, len, bytes; |
| guint64 msize; |
| GType api; |
| GstMeta *meta; |
| GstStructure *structure = NULL; |
| |
| memcpy (&bytes, payload, sizeof (bytes)); |
| payload += sizeof (bytes); |
| |
| #define READ_FIELD(f) do { \ |
| memcpy (&f, payload, sizeof (f)); \ |
| payload += sizeof(f); \ |
| } while(0) |
| |
| READ_FIELD (flags); |
| READ_FIELD (len); |
| api = g_type_from_name ((const char *) payload); |
| payload = (const guint8 *) strchr ((const char *) payload, 0) + 1; |
| READ_FIELD (msize); |
| READ_FIELD (len); |
| if (len) { |
| structure = gst_structure_new_from_string ((const char *) payload); |
| payload += len + 1; |
| } |
| |
| /* Seems we can add a meta from the api nor type ? */ |
| if (api == GST_PROTECTION_META_API_TYPE) { |
| meta = |
| gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL); |
| ((GstProtectionMeta *) meta)->info = structure; |
| } else { |
| GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s", |
| g_type_name (api)); |
| if (structure) |
| gst_structure_free (structure); |
| } |
| |
| #undef READ_FIELD |
| |
| } |
| |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| |
| return buffer; |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm, |
| GstEvent * event) |
| { |
| const unsigned char payload_type = |
| GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT; |
| gboolean ret; |
| guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen; |
| char *str = NULL; |
| const GstStructure *structure; |
| GstMessage *message = NULL; |
| const char *name; |
| GstByteWriter bw; |
| |
| g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE, |
| FALSE); |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, |
| "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event); |
| |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| name = gst_structure_get_name (gst_event_get_structure (event)); |
| slen = strlen (name) + 1; |
| gst_event_parse_sink_message (event, &message); |
| structure = gst_message_get_structure (message); |
| if (structure) { |
| str = gst_structure_to_string (structure); |
| structure_slen = strlen (str); |
| } else { |
| str = NULL; |
| structure_slen = 0; |
| } |
| size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) + |
| strlen (name) + 1 + structure_slen + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| |
| type = GST_MESSAGE_TYPE (message); |
| if (!gst_byte_writer_put_uint32_le (&bw, type)) |
| goto write_failed; |
| size -= sizeof (type); |
| |
| eseqnum = GST_EVENT_SEQNUM (event); |
| if (!gst_byte_writer_put_uint32_le (&bw, eseqnum)) |
| goto write_failed; |
| size -= sizeof (eseqnum); |
| |
| mseqnum = GST_MESSAGE_SEQNUM (message); |
| if (!gst_byte_writer_put_uint32_le (&bw, mseqnum)) |
| goto write_failed; |
| size -= sizeof (mseqnum); |
| |
| if (!gst_byte_writer_put_uint32_le (&bw, slen)) |
| goto write_failed; |
| size -= sizeof (slen); |
| |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen)) |
| goto write_failed; |
| size -= slen; |
| |
| if (str) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) |
| goto write_failed; |
| } else { |
| if (!gst_byte_writer_put_uint8 (&bw, 0)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, |
| GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED, |
| COMM_REQUEST_TYPE_EVENT)) |
| goto write_failed; |
| |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| gst_byte_writer_reset (&bw); |
| g_free (str); |
| if (message) |
| gst_message_unref (message); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = FALSE; |
| goto done; |
| } |
| |
| static GstEvent * |
| gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm, |
| guint32 size) |
| { |
| GstMessage *message; |
| GstEvent *event = NULL; |
| gchar *end = NULL; |
| GstStructure *structure; |
| guint32 type, eseqnum, mseqnum, slen; |
| const char *name; |
| guint32 mapped_size = size; |
| const guint8 *payload; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); |
| g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL); |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return NULL; |
| memcpy (&type, payload, sizeof (type)); |
| payload += sizeof (type); |
| size -= sizeof (type); |
| if (size == 0) |
| goto done; |
| |
| memcpy (&eseqnum, payload, sizeof (eseqnum)); |
| payload += sizeof (eseqnum); |
| size -= sizeof (eseqnum); |
| if (size == 0) |
| goto done; |
| |
| memcpy (&mseqnum, payload, sizeof (mseqnum)); |
| payload += sizeof (mseqnum); |
| size -= sizeof (mseqnum); |
| if (size == 0) |
| goto done; |
| |
| memcpy (&slen, payload, sizeof (slen)); |
| payload += sizeof (slen); |
| size -= sizeof (slen); |
| if (size == 0) |
| goto done; |
| |
| if (payload[slen - 1]) |
| goto done; |
| name = (const char *) payload; |
| payload += slen; |
| size -= slen; |
| |
| if ((payload)[size - 1]) { |
| goto done; |
| } |
| if (*payload) { |
| structure = gst_structure_from_string ((const char *) payload, &end); |
| } else { |
| structure = NULL; |
| } |
| |
| message = |
| gst_message_new_custom (type, GST_OBJECT (comm->element), structure); |
| gst_message_set_seqnum (message, mseqnum); |
| event = gst_event_new_sink_message (name, message); |
| gst_event_set_seqnum (event, eseqnum); |
| gst_message_unref (message); |
| |
| done: |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| return event; |
| } |
| |
| gboolean |
| gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm, |
| gboolean upstream, GstEvent * event) |
| { |
| const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT; |
| gboolean ret; |
| guint32 type, size, ret32 = TRUE, seqnum, slen; |
| char *str = NULL; |
| const GstStructure *structure; |
| GstByteWriter bw; |
| |
| /* we special case sink-message event as gst can't serialize/de-serialize it */ |
| if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE) |
| return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event); |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT, |
| comm->send_id, event); |
| |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| structure = gst_event_get_structure (event); |
| if (structure) { |
| |
| if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { |
| GstStructure *s = gst_structure_copy (structure); |
| gst_structure_remove_field (s, "stream"); |
| str = gst_structure_to_string (s); |
| gst_structure_free (s); |
| } else { |
| str = gst_structure_to_string (structure); |
| } |
| |
| slen = strlen (str); |
| } else { |
| str = NULL; |
| slen = 0; |
| } |
| size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| |
| type = GST_EVENT_TYPE (event); |
| if (!gst_byte_writer_put_uint32_le (&bw, type)) |
| goto write_failed; |
| |
| seqnum = GST_EVENT_SEQNUM (event); |
| if (!gst_byte_writer_put_uint32_le (&bw, seqnum)) |
| goto write_failed; |
| |
| if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0)) |
| goto write_failed; |
| |
| if (str) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1)) |
| goto write_failed; |
| } else { |
| if (!gst_byte_writer_put_uint8 (&bw, 0)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| /* Upstream events get serialized, this is required to send seeks only |
| * one at a time. */ |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, |
| (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ? |
| ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT)) |
| goto write_failed; |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| g_free (str); |
| gst_byte_writer_reset (&bw); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = FALSE; |
| goto done; |
| } |
| |
| static GstEvent * |
| gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size, |
| gboolean * upstream) |
| { |
| GstEvent *event = NULL; |
| gchar *end = NULL; |
| GstStructure *structure; |
| guint32 type, seqnum; |
| guint32 mapped_size = size; |
| const guint8 *payload; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); |
| g_return_val_if_fail (size >= sizeof (type), NULL); |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return NULL; |
| |
| memcpy (&type, payload, sizeof (type)); |
| payload += sizeof (type); |
| size -= sizeof (type); |
| if (size == 0) |
| goto done; |
| |
| memcpy (&seqnum, payload, sizeof (seqnum)); |
| payload += sizeof (seqnum); |
| size -= sizeof (seqnum); |
| if (size == 0) |
| goto done; |
| |
| *upstream = (*payload) ? TRUE : FALSE; |
| payload += 1; |
| size -= 1; |
| if (size == 0) |
| goto done; |
| |
| if (payload[size - 1]) |
| goto done; |
| if (*payload) { |
| structure = gst_structure_from_string ((const char *) payload, &end); |
| } else { |
| structure = NULL; |
| } |
| |
| event = gst_event_new_custom (type, structure); |
| gst_event_set_seqnum (event, seqnum); |
| |
| done: |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| return event; |
| } |
| |
| gboolean |
| gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm, |
| gboolean upstream, GstQuery * query) |
| { |
| const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY; |
| gboolean ret; |
| guint32 type, size, ret32 = TRUE, slen; |
| char *str = NULL; |
| const GstStructure *structure; |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT, |
| comm->send_id, query); |
| |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| structure = gst_query_get_structure (query); |
| if (structure) { |
| str = gst_structure_to_string (structure); |
| slen = strlen (str); |
| } else { |
| str = NULL; |
| slen = 0; |
| } |
| size = sizeof (type) + 1 + slen + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| |
| type = GST_QUERY_TYPE (query); |
| if (!gst_byte_writer_put_uint32_le (&bw, type)) |
| goto write_failed; |
| |
| if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0)) |
| goto write_failed; |
| |
| if (str) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1)) |
| goto write_failed; |
| } else { |
| if (!gst_byte_writer_put_uint8 (&bw, 0)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32, |
| GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED, |
| COMM_REQUEST_TYPE_QUERY)) |
| goto write_failed; |
| |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| g_free (str); |
| gst_byte_writer_reset (&bw); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = FALSE; |
| goto done; |
| } |
| |
| static GstQuery * |
| gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size, |
| gboolean * upstream) |
| { |
| GstQuery *query = NULL; |
| gchar *end = NULL; |
| GstStructure *structure; |
| guint32 type; |
| guint32 mapped_size = size; |
| const guint8 *payload; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); |
| g_return_val_if_fail (size >= sizeof (type), NULL); |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return NULL; |
| |
| memcpy (&type, payload, sizeof (type)); |
| payload += sizeof (type); |
| size -= sizeof (type); |
| if (size == 0) |
| goto done; |
| |
| *upstream = (*payload) ? TRUE : FALSE; |
| payload += 1; |
| size -= 1; |
| if (size == 0) |
| goto done; |
| |
| if (payload[size - 1]) |
| goto done; |
| if (*payload) { |
| structure = gst_structure_from_string ((const char *) payload, &end); |
| } else { |
| structure = NULL; |
| } |
| |
| query = gst_query_new_custom (type, structure); |
| |
| /* CAPS queries contain a filter field, of GstCaps type, which can be NULL. |
| This does not play well with the serialization/deserialization system, |
| which will give us a non-NULL GstCaps which has a value of NULL. This |
| in turn wreaks havoc with any code that tests whether filter is NULL |
| (which basically means, am I being given an optional GstCaps ?). |
| So we look for non-NULL GstCaps which have NULL contents, and replace |
| them with NULL instead. */ |
| if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) { |
| GstCaps *filter; |
| gst_query_parse_caps (query, &filter); |
| if (filter |
| && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)), |
| "NULL")) { |
| gst_query_unref (query); |
| query = gst_query_new_caps (NULL); |
| } |
| } |
| |
| done: |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| return query; |
| } |
| |
| GstStateChangeReturn |
| gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm, |
| GstStateChange transition) |
| { |
| const unsigned char payload_type = |
| GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE; |
| GstStateChangeReturn ret; |
| guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS; |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s", |
| comm->send_id, |
| gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), |
| gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); |
| |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| size = sizeof (transition); |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, transition)) |
| goto write_failed; |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, |
| ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE)) |
| goto write_failed; |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| gst_byte_writer_reset (&bw); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = GST_STATE_CHANGE_FAILURE; |
| goto done; |
| } |
| |
| static gboolean |
| is_valid_state_change (GstStateChange transition) |
| { |
| if (transition == GST_STATE_CHANGE_NULL_TO_READY) |
| return TRUE; |
| if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) |
| return TRUE; |
| if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING) |
| return TRUE; |
| if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED) |
| return TRUE; |
| if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) |
| return TRUE; |
| if (transition == GST_STATE_CHANGE_READY_TO_NULL) |
| return TRUE; |
| if (GST_STATE_TRANSITION_CURRENT (transition) == |
| GST_STATE_TRANSITION_NEXT (transition)) |
| return TRUE; |
| return FALSE; |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm, |
| guint32 size, guint32 * transition) |
| { |
| guint32 mapped_size = size; |
| const guint8 *payload; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE); |
| g_return_val_if_fail (size >= sizeof (*transition), FALSE); |
| |
| payload = gst_adapter_map (comm->adapter, size); |
| if (!payload) |
| return FALSE; |
| memcpy (transition, payload, sizeof (*transition)); |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| return is_valid_state_change (*transition); |
| } |
| |
| void |
| gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm) |
| { |
| const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST; |
| guint32 size; |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id); |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| size = 0; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| gst_byte_writer_reset (&bw); |
| return; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| goto done; |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size) |
| { |
| /* no payload */ |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm, |
| GstMessage * message) |
| { |
| const unsigned char payload_type = |
| GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE; |
| gboolean ret; |
| guint32 code, size, ret32 = TRUE; |
| char *str = NULL; |
| GError *error; |
| char *extra_message; |
| const char *domain_string; |
| unsigned char msgtype; |
| GstByteWriter bw; |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) { |
| gst_message_parse_error (message, &error, &extra_message); |
| msgtype = 2; |
| } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) { |
| gst_message_parse_warning (message, &error, &extra_message); |
| msgtype = 1; |
| } else { |
| gst_message_parse_info (message, &error, &extra_message); |
| msgtype = 0; |
| } |
| code = error->code; |
| domain_string = g_quark_to_string (error->domain); |
| GST_TRACE_OBJECT (comm->element, |
| "Writing error %u: domain %s, code %u, message %s, extra message %s", |
| comm->send_id, domain_string, error->code, error->message, extra_message); |
| |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| |
| size = sizeof (size); |
| size += 1; |
| size += strlen (domain_string) + 1; |
| size += sizeof (code); |
| size += sizeof (size); |
| size += error->message ? strlen (error->message) + 1 : 0; |
| size += sizeof (size); |
| size += extra_message ? strlen (extra_message) + 1 : 0; |
| |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| |
| if (!gst_byte_writer_put_uint8 (&bw, msgtype)) |
| goto write_failed; |
| size = strlen (domain_string) + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, code)) |
| goto write_failed; |
| size = error->message ? strlen (error->message) + 1 : 0; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (error->message) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size)) |
| goto write_failed; |
| } |
| size = extra_message ? strlen (extra_message) + 1 : 0; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| if (extra_message) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, |
| ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) |
| goto write_failed; |
| |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| g_free (str); |
| g_error_free (error); |
| g_free (extra_message); |
| gst_byte_writer_reset (&bw); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = FALSE; |
| goto done; |
| } |
| |
| static GstMessage * |
| gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm, |
| guint32 size) |
| { |
| GstMessage *message = NULL; |
| guint32 code; |
| GQuark domain; |
| const char *msg, *extra_message; |
| GError *error; |
| unsigned char msgtype; |
| guint32 mapped_size = size; |
| const guint8 *payload; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); |
| g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1, |
| NULL); |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return NULL; |
| msgtype = *payload++; |
| memcpy (&size, payload, sizeof (size)); |
| payload += sizeof (size); |
| if (payload[size - 1]) |
| goto done; |
| domain = g_quark_from_string ((const char *) payload); |
| payload += size; |
| |
| memcpy (&code, payload, sizeof (code)); |
| payload += sizeof (code); |
| |
| memcpy (&size, payload, sizeof (size)); |
| payload += sizeof (size); |
| if (size) { |
| if (payload[size - 1]) |
| goto done; |
| msg = (const char *) payload; |
| } else { |
| msg = NULL; |
| } |
| payload += size; |
| |
| memcpy (&size, payload, sizeof (size)); |
| payload += sizeof (size); |
| if (size) { |
| if (payload[size - 1]) |
| goto done; |
| extra_message = (const char *) payload; |
| } else { |
| extra_message = NULL; |
| } |
| payload += size; |
| |
| error = g_error_new (domain, code, "%s", msg); |
| if (msgtype == 2) |
| message = |
| gst_message_new_error (GST_OBJECT (comm->element), error, |
| extra_message); |
| else if (msgtype == 1) |
| message = |
| gst_message_new_warning (GST_OBJECT (comm->element), error, |
| extra_message); |
| else |
| message = |
| gst_message_new_info (GST_OBJECT (comm->element), error, extra_message); |
| g_error_free (error); |
| |
| done: |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| |
| return message; |
| } |
| |
| gboolean |
| gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm, |
| GstMessage * message) |
| { |
| const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE; |
| gboolean ret; |
| guint32 type, size, ret32 = TRUE, slen; |
| char *str = NULL; |
| const GstStructure *structure; |
| GstByteWriter bw; |
| |
| /* we special case error as gst can't serialize/de-serialize it */ |
| if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR |
| || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING |
| || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO) |
| return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message); |
| |
| g_mutex_lock (&comm->mutex); |
| ++comm->send_id; |
| |
| GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT, |
| comm->send_id, message); |
| |
| gst_byte_writer_init (&bw); |
| if (!gst_byte_writer_put_uint8 (&bw, payload_type)) |
| goto write_failed; |
| if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) |
| goto write_failed; |
| structure = gst_message_get_structure (message); |
| if (structure) { |
| str = gst_structure_to_string (structure); |
| slen = strlen (str); |
| } else { |
| str = NULL; |
| slen = 0; |
| } |
| size = sizeof (type) + slen + 1; |
| if (!gst_byte_writer_put_uint32_le (&bw, size)) |
| goto write_failed; |
| |
| type = GST_MESSAGE_TYPE (message); |
| if (!gst_byte_writer_put_uint32_le (&bw, type)) |
| goto write_failed; |
| size -= sizeof (type); |
| if (str) { |
| if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) |
| goto write_failed; |
| } else { |
| if (!gst_byte_writer_put_uint8 (&bw, 0)) |
| goto write_failed; |
| } |
| |
| if (!write_byte_writer_to_fd (comm, &bw)) |
| goto write_failed; |
| |
| if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, |
| ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) |
| goto write_failed; |
| |
| ret = ret32; |
| |
| done: |
| g_mutex_unlock (&comm->mutex); |
| g_free (str); |
| gst_byte_writer_reset (&bw); |
| return ret; |
| |
| write_failed: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), |
| ("Failed to write to socket")); |
| ret = FALSE; |
| goto done; |
| } |
| |
| static GstMessage * |
| gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size) |
| { |
| GstMessage *message = NULL; |
| gchar *end = NULL; |
| GstStructure *structure; |
| guint32 type; |
| guint32 mapped_size = size; |
| const guint8 *payload; |
| |
| /* this should not be called if we don't have enough yet */ |
| g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); |
| g_return_val_if_fail (size >= sizeof (type), NULL); |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| if (!payload) |
| return NULL; |
| memcpy (&type, payload, sizeof (type)); |
| payload += sizeof (type); |
| size -= sizeof (type); |
| if (size == 0) |
| goto done; |
| |
| if (payload[size - 1]) |
| goto done; |
| if (*payload) { |
| structure = gst_structure_from_string ((const char *) payload, &end); |
| } else { |
| structure = NULL; |
| } |
| |
| message = |
| gst_message_new_custom (type, GST_OBJECT (comm->element), structure); |
| |
| done: |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| |
| return message; |
| } |
| |
| void |
| gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element) |
| { |
| g_mutex_init (&comm->mutex); |
| comm->element = element; |
| comm->fdin = comm->fdout = -1; |
| comm->ack_time = DEFAULT_ACK_TIME; |
| comm->waiting_ids = |
| g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, |
| (GDestroyNotify) comm_request_free); |
| comm->adapter = gst_adapter_new (); |
| comm->poll = gst_poll_new (TRUE); |
| gst_poll_fd_init (&comm->pollFDin); |
| } |
| |
| void |
| gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm) |
| { |
| g_hash_table_destroy (comm->waiting_ids); |
| gst_object_unref (comm->adapter); |
| gst_poll_free (comm->poll); |
| g_mutex_clear (&comm->mutex); |
| } |
| |
| static void |
| cancel_request (gpointer key, gpointer value, gpointer user_data, |
| GstFlowReturn fret) |
| { |
| GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data; |
| guint32 id = GPOINTER_TO_INT (key); |
| CommRequest *req = (CommRequest *) value; |
| |
| GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id, |
| req->type); |
| req->ret = fret; |
| req->replied = TRUE; |
| g_cond_signal (&req->cond); |
| } |
| |
| static void |
| cancel_request_error (gpointer key, gpointer value, gpointer user_data) |
| { |
| CommRequest *req = (CommRequest *) value; |
| GstFlowReturn fret = comm_request_ret_get_failure_value (req->type); |
| |
| cancel_request (key, value, user_data, fret); |
| } |
| |
| void |
| gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup) |
| { |
| g_mutex_lock (&comm->mutex); |
| g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm); |
| if (cleanup) { |
| g_hash_table_unref (comm->waiting_ids); |
| comm->waiting_ids = |
| g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, |
| (GDestroyNotify) comm_request_free); |
| } |
| g_mutex_unlock (&comm->mutex); |
| } |
| |
| static gboolean |
| set_field (GQuark field_id, const GValue * value, gpointer user_data) |
| { |
| GstStructure *structure = user_data; |
| |
| gst_structure_id_set_value (structure, field_id, value); |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id, |
| GstFlowReturn ret, GstQuery * query) |
| { |
| CommRequest *req; |
| |
| req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id)); |
| if (!req) { |
| GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id); |
| return FALSE; |
| } |
| |
| GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret, |
| comm_request_ret_get_name (req->type, ret), req->id); |
| req->replied = TRUE; |
| req->ret = ret; |
| if (query) { |
| if (req->query) { |
| /* We need to update the original query in place, as the caller |
| will expect the object to be the same */ |
| GstStructure *structure = gst_query_writable_structure (req->query); |
| gst_structure_remove_all_fields (structure); |
| gst_structure_foreach (gst_query_get_structure (query), set_field, |
| structure); |
| } else { |
| GST_WARNING_OBJECT (comm->element, |
| "Got query reply, but no query was in the request"); |
| } |
| } |
| g_cond_signal (&req->cond); |
| return TRUE; |
| } |
| |
| static gint |
| update_adapter (GstIpcPipelineComm * comm) |
| { |
| GstMemory *mem = NULL; |
| GstBuffer *buf; |
| GstMapInfo map; |
| ssize_t sz; |
| gint ret = 0; |
| |
| again: |
| /* update pollFDin if necessary (fdin changed or we lost our parent). |
| * we do not allow a parent-less element to communicate with its peer |
| * in order to avoid race conditions where the slave tries to change |
| * the state of its parent pipeline while it is not yet added in that |
| * pipeline. */ |
| if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) { |
| if (comm->pollFDin.fd != -1) { |
| GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d", |
| comm->pollFDin.fd); |
| gst_poll_remove_fd (comm->poll, &comm->pollFDin); |
| gst_poll_fd_init (&comm->pollFDin); |
| } |
| if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) { |
| GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin); |
| comm->pollFDin.fd = comm->fdin; |
| gst_poll_add_fd (comm->poll, &comm->pollFDin); |
| gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE); |
| } |
| } |
| |
| /* wait for activity on fdin or a flush */ |
| if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) { |
| if (errno == EAGAIN) |
| goto again; |
| /* error out, unless interrupted or flushing */ |
| if (errno != EINTR) |
| ret = (errno == EBUSY) ? 2 : 1; |
| } |
| |
| /* read from fdin if possible and push data to our adapter */ |
| if (comm->pollFDin.fd >= 0 |
| && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) { |
| if (!mem) |
| mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL); |
| |
| gst_memory_map (mem, &map, GST_MAP_WRITE); |
| sz = read (comm->pollFDin.fd, map.data, map.size); |
| gst_memory_unmap (mem, &map); |
| |
| if (sz <= 0) { |
| if (errno == EAGAIN) |
| goto again; |
| /* error out, unless interrupted */ |
| if (errno != EINTR) |
| ret = 1; |
| } else { |
| gst_memory_resize (mem, 0, sz); |
| buf = gst_buffer_new (); |
| gst_buffer_append_memory (buf, mem); |
| mem = NULL; |
| GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz); |
| gst_adapter_push (comm->adapter, buf); |
| } |
| } |
| |
| if (mem) |
| gst_memory_unref (mem); |
| |
| return ret; |
| } |
| |
| static gboolean |
| read_many (GstIpcPipelineComm * comm) |
| { |
| gboolean ret = TRUE; |
| gsize available; |
| const guint8 *payload; |
| |
| while (1) |
| switch (comm->state) { |
| case GST_IPC_PIPELINE_COMM_STATE_TYPE: |
| { |
| guint8 type; |
| guint32 mapped_size; |
| |
| available = gst_adapter_available (comm->adapter); |
| mapped_size = 1 + sizeof (gint32) * 2; |
| if (available < mapped_size) |
| goto done; |
| |
| payload = gst_adapter_map (comm->adapter, mapped_size); |
| type = *payload++; |
| g_mutex_lock (&comm->mutex); |
| memcpy (&comm->id, payload, sizeof (guint32)); |
| memcpy (&comm->payload_length, payload + 4, sizeof (guint32)); |
| g_mutex_unlock (&comm->mutex); |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, mapped_size); |
| GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u", |
| comm->id, type, comm->payload_length); |
| switch (type) { |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: |
| GST_TRACE_OBJECT (comm->element, "switching to state %s", |
| gst_ipc_pipeline_comm_data_type_get_name (type)); |
| comm->state = type; |
| break; |
| default: |
| goto out_of_sync; |
| } |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: |
| { |
| const guint8 *rets; |
| guint32 ret32; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| if (available < sizeof (guint32)) |
| goto ack_failed; |
| |
| rets = gst_adapter_map (comm->adapter, sizeof (guint32)); |
| memcpy (&ret32, rets, sizeof (ret32)); |
| gst_adapter_unmap (comm->adapter); |
| gst_adapter_flush (comm->adapter, sizeof (guint32)); |
| GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u", |
| gst_flow_get_name (ret32), comm->id); |
| |
| g_mutex_lock (&comm->mutex); |
| gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL); |
| g_mutex_unlock (&comm->mutex); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: |
| { |
| GstQuery *query = NULL; |
| gboolean qret; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| qret = |
| gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length, |
| &query); |
| |
| GST_TRACE_OBJECT (comm->element, |
| "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret, |
| query); |
| |
| g_mutex_lock (&comm->mutex); |
| gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query); |
| g_mutex_unlock (&comm->mutex); |
| |
| gst_query_unref (query); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: |
| { |
| GstBuffer *buf; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length); |
| if (!buf) |
| goto buffer_failed; |
| |
| /* set caps and push */ |
| GST_TRACE_OBJECT (comm->element, |
| "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT |
| ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT |
| ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT |
| ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), |
| GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf), |
| GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf), |
| GST_BUFFER_FLAGS (buf)); |
| |
| gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID, |
| GINT_TO_POINTER (comm->id), NULL); |
| |
| if (comm->on_buffer) |
| (*comm->on_buffer) (comm->id, buf, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: |
| { |
| GstEvent *event; |
| gboolean upstream; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length, |
| &upstream); |
| if (!event) |
| goto event_failed; |
| |
| GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s", |
| event, gst_event_type_get_name (event->type)); |
| |
| gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, |
| GINT_TO_POINTER (comm->id), NULL); |
| |
| if (comm->on_event) |
| (*comm->on_event) (comm->id, event, upstream, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: |
| { |
| GstEvent *event; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| event = gst_ipc_pipeline_comm_read_sink_message_event (comm, |
| comm->payload_length); |
| if (!event) |
| goto event_failed; |
| |
| GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p", |
| event); |
| |
| gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, |
| GINT_TO_POINTER (comm->id), NULL); |
| |
| if (comm->on_event) |
| (*comm->on_event) (comm->id, event, FALSE, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: |
| { |
| GstQuery *query; |
| gboolean upstream; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length, |
| &upstream); |
| if (!query) |
| goto query_failed; |
| |
| GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s", |
| query, gst_query_type_get_name (query->type)); |
| |
| gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID, |
| GINT_TO_POINTER (comm->id), NULL); |
| |
| if (comm->on_query) |
| (*comm->on_query) (comm->id, query, upstream, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: |
| { |
| guint32 transition; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| if (!gst_ipc_pipeline_comm_read_state_change (comm, |
| comm->payload_length, &transition)) |
| goto state_change_failed; |
| |
| GST_TRACE_OBJECT (comm->element, |
| "deserialized state change request: %s -> %s", |
| gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT |
| (transition)), |
| gst_element_state_get_name (GST_STATE_TRANSITION_NEXT |
| (transition))); |
| |
| if (comm->on_state_change) |
| (*comm->on_state_change) (comm->id, transition, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: |
| { |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length)) |
| goto event_failed; |
| |
| GST_TRACE_OBJECT (comm->element, "deserialized state-lost"); |
| |
| if (comm->on_state_lost) |
| (*comm->on_state_lost) (comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: |
| { |
| GstMessage *message; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| message = gst_ipc_pipeline_comm_read_message (comm, |
| comm->payload_length); |
| if (!message) |
| goto message_failed; |
| |
| GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", |
| message, gst_message_type_get_name (message->type)); |
| |
| if (comm->on_message) |
| (*comm->on_message) (comm->id, message, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: |
| { |
| GstMessage *message; |
| |
| available = gst_adapter_available (comm->adapter); |
| if (available < comm->payload_length) |
| goto done; |
| |
| message = gst_ipc_pipeline_comm_read_gerror_message (comm, |
| comm->payload_length); |
| if (!message) |
| goto message_failed; |
| |
| GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", |
| message, gst_message_type_get_name (message->type)); |
| |
| if (comm->on_message) |
| (*comm->on_message) (comm->id, message, comm->user_data); |
| |
| GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| break; |
| } |
| } |
| |
| done: |
| return ret; |
| |
| /* ERRORS */ |
| out_of_sync: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("Socket out of sync")); |
| ret = FALSE; |
| goto done; |
| } |
| state_change_failed: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("could not read state change from fd")); |
| ret = FALSE; |
| goto done; |
| } |
| ack_failed: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("could not read ack from fd")); |
| ret = FALSE; |
| goto done; |
| } |
| buffer_failed: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("could not read buffer from fd")); |
| ret = FALSE; |
| goto done; |
| } |
| event_failed: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("could not read event from fd")); |
| ret = FALSE; |
| goto done; |
| } |
| message_failed: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("could not read message from fd")); |
| ret = FALSE; |
| goto done; |
| } |
| query_failed: |
| { |
| GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), |
| ("could not read query from fd")); |
| ret = FALSE; |
| goto done; |
| } |
| } |
| |
| static gpointer |
| reader_thread (gpointer data) |
| { |
| GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data; |
| gboolean running = TRUE; |
| gint ret = 0; |
| |
| while (running) { |
| ret = update_adapter (comm); |
| switch (ret) { |
| case 1: |
| GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), |
| ("Failed to read from socket")); |
| running = FALSE; |
| break; |
| case 2: |
| GST_INFO_OBJECT (comm->element, "We're stopping, all good"); |
| running = FALSE; |
| break; |
| default: |
| read_many (comm); |
| break; |
| } |
| } |
| |
| GST_INFO_OBJECT (comm->element, "Reader thread ending"); |
| return NULL; |
| } |
| |
| gboolean |
| gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, |
| void (*on_buffer) (guint32, GstBuffer *, gpointer), |
| void (*on_event) (guint32, GstEvent *, gboolean, gpointer), |
| void (*on_query) (guint32, GstQuery *, gboolean, gpointer), |
| void (*on_state_change) (guint32, GstStateChange, gpointer), |
| void (*on_state_lost) (gpointer), |
| void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data) |
| { |
| if (comm->reader_thread) |
| return FALSE; |
| |
| comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; |
| comm->on_buffer = on_buffer; |
| comm->on_event = on_event; |
| comm->on_query = on_query; |
| comm->on_state_change = on_state_change; |
| comm->on_state_lost = on_state_lost; |
| comm->on_message = on_message; |
| comm->user_data = user_data; |
| gst_poll_set_flushing (comm->poll, FALSE); |
| comm->reader_thread = |
| g_thread_new ("reader", (GThreadFunc) reader_thread, comm); |
| return TRUE; |
| } |
| |
| void |
| gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm) |
| { |
| if (!comm->reader_thread) |
| return; |
| |
| gst_poll_set_flushing (comm->poll, TRUE); |
| g_thread_join (comm->reader_thread); |
| comm->reader_thread = NULL; |
| } |
| |
| static gchar * |
| gst_value_serialize_event (const GValue * value) |
| { |
| const GstStructure *structure; |
| GstEvent *ev; |
| gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s; |
| GValue val = G_VALUE_INIT; |
| |
| ev = g_value_get_boxed (value); |
| |
| g_value_init (&val, gst_event_type_get_type ()); |
| g_value_set_enum (&val, ev->type); |
| type = gst_value_serialize (&val); |
| g_value_unset (&val); |
| |
| g_value_init (&val, G_TYPE_UINT64); |
| g_value_set_uint64 (&val, ev->timestamp); |
| ts = gst_value_serialize (&val); |
| g_value_unset (&val); |
| |
| g_value_init (&val, G_TYPE_UINT); |
| g_value_set_uint (&val, ev->seqnum); |
| seqnum = gst_value_serialize (&val); |
| g_value_unset (&val); |
| |
| g_value_init (&val, G_TYPE_INT64); |
| g_value_set_int64 (&val, gst_event_get_running_time_offset (ev)); |
| rt_offset = gst_value_serialize (&val); |
| g_value_unset (&val); |
| |
| structure = gst_event_get_structure (ev); |
| str = gst_structure_to_string (structure); |
| str64 = g_base64_encode ((guchar *) str, strlen (str) + 1); |
| g_strdelimit (str64, "=", '_'); |
| g_free (str); |
| |
| s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64, |
| NULL); |
| |
| g_free (type); |
| g_free (ts); |
| g_free (seqnum); |
| g_free (rt_offset); |
| g_free (str64); |
| |
| return s; |
| } |
| |
| static gboolean |
| gst_value_deserialize_event (GValue * dest, const gchar * s) |
| { |
| GstEvent *ev = NULL; |
| GValue val = G_VALUE_INIT; |
| gboolean ret = FALSE; |
| gchar **fields; |
| gsize len; |
| |
| fields = g_strsplit (s, ":", -1); |
| if (g_strv_length (fields) != 5) |
| goto wrong_length; |
| |
| g_strdelimit (fields[4], "_", '='); |
| g_base64_decode_inplace (fields[4], &len); |
| |
| g_value_init (&val, gst_event_type_get_type ()); |
| if (!gst_value_deserialize (&val, fields[0])) |
| goto fail; |
| ev = gst_event_new_custom (g_value_get_enum (&val), |
| gst_structure_new_from_string (fields[4])); |
| |
| g_value_unset (&val); |
| g_value_init (&val, G_TYPE_UINT64); |
| if (!gst_value_deserialize (&val, fields[1])) |
| goto fail; |
| ev->timestamp = g_value_get_uint64 (&val); |
| |
| g_value_unset (&val); |
| g_value_init (&val, G_TYPE_UINT); |
| if (!gst_value_deserialize (&val, fields[2])) |
| goto fail; |
| ev->seqnum = g_value_get_uint (&val); |
| |
| g_value_unset (&val); |
| g_value_init (&val, G_TYPE_INT64); |
| if (!gst_value_deserialize (&val, fields[3])) |
| goto fail; |
| gst_event_set_running_time_offset (ev, g_value_get_int64 (&val)); |
| |
| g_value_take_boxed (dest, ev); |
| ev = NULL; |
| ret = TRUE; |
| |
| fail: |
| g_clear_pointer (&ev, gst_event_unref); |
| g_value_unset (&val); |
| |
| wrong_length: |
| g_strfreev (fields); |
| return ret; |
| } |
| |
/*
 * Register gst_value_serialize_<_type> / gst_value_deserialize_<_type> as
 * the GstValue (de)serialization functions for _gtype, with no compare
 * function. The table is a function-local static and is handed to
 * gst_value_register() by address.
 */
#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)                \
G_STMT_START {                                                          \
  static GstValueTable value_table = {                                  \
    0,                                  /* type, assigned below */      \
    NULL,                               /* compare */                   \
    gst_value_serialize_ ## _type,                                      \
    gst_value_deserialize_ ## _type                                     \
  };                                                                    \
                                                                        \
  value_table.type = _gtype;                                            \
  gst_value_register (&value_table);                                    \
} G_STMT_END
| |
| void |
| gst_ipc_pipeline_comm_plugin_init (void) |
| { |
| static volatile gsize once = 0; |
| |
| if (g_once_init_enter (&once)) { |
| GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0, |
| "ipc pipeline comm"); |
| QUARK_ID = g_quark_from_static_string ("ipcpipeline-id"); |
| REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event); |
| g_once_init_leave (&once, (gsize) 1); |
| } |
| } |