| /* GStreamer |
| * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com> |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
| * Boston, MA 02111-1307, USA. |
| */ |
| |
| /** |
| * SECTION:element-curlsink |
| * @short_description: sink that uploads data to a server using libcurl |
| * @see_also: |
| * |
| * This is a network sink that uses libcurl as a client to upload data to |
| * a server (e.g. a HTTP/FTP server). |
| * |
| * <refsect2> |
| * <title>Example launch line (upload a JPEG file to an HTTP server)</title> |
| * |[ |
| * gst-launch filesrc location=image.jpg ! jpegparse ! curlsink \ |
| * file-name=image.jpg \ |
| * location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/ \ |
| * user=test passwd=test \ |
| * content-type=image/jpeg \ |
| * use-content-length=false |
| * ]| |
| * </refsect2> |
| */ |
| |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| |
| #include <curl/curl.h> |
| #include <string.h> |
| #include <stdio.h> |
| |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <netinet/in.h> |
| #include <unistd.h> |
| #include <netinet/ip.h> |
| #include <netinet/tcp.h> |
| #include <sys/stat.h> |
| #include <fcntl.h> |
| |
| #include "gstcurlbasesink.h" |
| |
| /* Default values */ |
| #define GST_CAT_DEFAULT gst_curl_base_sink_debug |
| #define DEFAULT_URL "localhost:5555" |
| #define DEFAULT_TIMEOUT 30 |
| #define DEFAULT_QOS_DSCP 0 |
| |
| #define DSCP_MIN 0 |
| #define DSCP_MAX 63 |
| |
| |
| /* Plugin specific settings */ |
| static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", |
| GST_PAD_SINK, |
| GST_PAD_ALWAYS, |
| GST_STATIC_CAPS_ANY); |
| |
| GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug); |
| |
| enum |
| { |
| PROP_0, |
| PROP_LOCATION, |
| PROP_USER_NAME, |
| PROP_USER_PASSWD, |
| PROP_FILE_NAME, |
| PROP_TIMEOUT, |
| PROP_QOS_DSCP |
| }; |
| |
| /* Object class function declarations */ |
| static void gst_curl_base_sink_finalize (GObject * gobject); |
| static void gst_curl_base_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec); |
| static void gst_curl_base_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec); |
| |
| /* BaseSink class function declarations */ |
| static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink, |
| GstBuffer * buf); |
| static gboolean gst_curl_base_sink_event (GstBaseSink * bsink, |
| GstEvent * event); |
| static gboolean gst_curl_base_sink_start (GstBaseSink * bsink); |
| static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink); |
| static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink); |
| static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink); |
| |
| /* private functions */ |
| |
| static gboolean gst_curl_base_sink_transfer_setup_unlocked |
| (GstCurlBaseSink * sink); |
| static gboolean gst_curl_base_sink_transfer_start_unlocked |
| (GstCurlBaseSink * sink); |
| static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink); |
| static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size, |
| size_t nmemb, void *stream); |
| static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size, |
| size_t nmemb, void *stream); |
| static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink, |
| void *curl_ptr, size_t block_size, guint * last_chunk); |
| static int gst_curl_base_sink_transfer_socket_cb (void *clientp, |
| curl_socket_t curlfd, curlsocktype purpose); |
| static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data); |
| static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink); |
| static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink); |
| |
| static gboolean gst_curl_base_sink_wait_for_data_unlocked |
| (GstCurlBaseSink * sink); |
| static void gst_curl_base_sink_new_file_notify_unlocked |
| (GstCurlBaseSink * sink); |
| static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked |
| (GstCurlBaseSink * sink); |
| static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink); |
| static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink); |
| static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink); |
| |
| static void handle_transfer (GstCurlBaseSink * sink); |
| static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf, |
| size_t max_bytes_to_send, guint * last_chunk); |
| |
| #define parent_class gst_curl_base_sink_parent_class |
| G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK); |
| |
| static void |
| gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass) |
| { |
| GObjectClass *gobject_class = G_OBJECT_CLASS (klass); |
| GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass; |
| GstElementClass *element_class = GST_ELEMENT_CLASS (klass); |
| |
| GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0, |
| "curl base sink element"); |
| GST_DEBUG_OBJECT (klass, "class_init"); |
| |
| gst_element_class_set_details_simple (element_class, |
| "Curl base sink", |
| "Sink/Network", |
| "Upload data over the network to a server using libcurl", |
| "Patricia Muscalu <patricia@axis.com>"); |
| |
| gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event); |
| gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render); |
| gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start); |
| gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop); |
| gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock); |
| gstbasesink_class->unlock_stop = |
| GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop); |
| gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize); |
| |
| gobject_class->set_property = gst_curl_base_sink_set_property; |
| gobject_class->get_property = gst_curl_base_sink_get_property; |
| |
| klass->handle_transfer = handle_transfer; |
| klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb; |
| klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer; |
| |
| /* FIXME: check against souphttpsrc and use same names for same properties */ |
| g_object_class_install_property (gobject_class, PROP_LOCATION, |
| g_param_spec_string ("location", "Location", |
| "URI location to write to", NULL, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_USER_NAME, |
| g_param_spec_string ("user", "User name", |
| "User name to use for server authentication", NULL, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_USER_PASSWD, |
| g_param_spec_string ("passwd", "User password", |
| "User password to use for server authentication", NULL, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_FILE_NAME, |
| g_param_spec_string ("file-name", "Base file name", |
| "The base file name for the uploaded images", NULL, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_TIMEOUT, |
| g_param_spec_int ("timeout", "Timeout", |
| "Number of seconds waiting to write before timeout", |
| 0, G_MAXINT, DEFAULT_TIMEOUT, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| g_object_class_install_property (gobject_class, PROP_QOS_DSCP, |
| g_param_spec_int ("qos-dscp", |
| "QoS diff srv code point", |
| "Quality of Service, differentiated services code point (0 default)", |
| DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP, |
| G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
| |
| gst_element_class_add_pad_template (element_class, |
| gst_static_pad_template_get (&sinktemplate)); |
| } |
| |
| static void |
| gst_curl_base_sink_init (GstCurlBaseSink * sink) |
| { |
| sink->transfer_buf = g_malloc (sizeof (TransferBuffer)); |
| sink->transfer_cond = g_malloc (sizeof (TransferCondition)); |
| g_cond_init (&sink->transfer_cond->cond); |
| sink->transfer_cond->data_sent = FALSE; |
| sink->transfer_cond->data_available = FALSE; |
| sink->transfer_cond->wait_for_response = FALSE; |
| sink->timeout = DEFAULT_TIMEOUT; |
| sink->qos_dscp = DEFAULT_QOS_DSCP; |
| sink->url = g_strdup (DEFAULT_URL); |
| sink->transfer_thread_close = FALSE; |
| sink->new_file = TRUE; |
| sink->flow_ret = GST_FLOW_OK; |
| sink->is_live = FALSE; |
| } |
| |
| static void |
| gst_curl_base_sink_finalize (GObject * gobject) |
| { |
| GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject); |
| |
| GST_DEBUG ("finalizing curlsink"); |
| if (this->transfer_thread != NULL) { |
| g_thread_join (this->transfer_thread); |
| } |
| |
| gst_curl_base_sink_transfer_cleanup (this); |
| g_cond_clear (&this->transfer_cond->cond); |
| g_free (this->transfer_cond); |
| g_free (this->transfer_buf); |
| |
| g_free (this->url); |
| g_free (this->user); |
| g_free (this->passwd); |
| g_free (this->file_name); |
| if (this->fdset != NULL) { |
| gst_poll_free (this->fdset); |
| this->fdset = NULL; |
| } |
| G_OBJECT_CLASS (parent_class)->finalize (gobject); |
| } |
| |
| void |
| gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink) |
| { |
| GST_LOG ("more data to send"); |
| |
| sink->transfer_cond->data_available = TRUE; |
| sink->transfer_cond->data_sent = FALSE; |
| sink->transfer_cond->wait_for_response = TRUE; |
| g_cond_signal (&sink->transfer_cond->cond); |
| } |
| |
| void |
| gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink) |
| { |
| GST_OBJECT_LOCK (sink); |
| GST_LOG_OBJECT (sink, "setting transfer thread close flag"); |
| sink->transfer_thread_close = TRUE; |
| g_cond_signal (&sink->transfer_cond->cond); |
| GST_OBJECT_UNLOCK (sink); |
| |
| if (sink->transfer_thread != NULL) { |
| GST_LOG_OBJECT (sink, "waiting for transfer thread to finish"); |
| g_thread_join (sink->transfer_thread); |
| sink->transfer_thread = NULL; |
| } |
| } |
| |
| void |
| gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live) |
| { |
| g_return_if_fail (GST_IS_CURL_BASE_SINK (sink)); |
| |
| GST_OBJECT_LOCK (sink); |
| sink->is_live = live; |
| GST_OBJECT_UNLOCK (sink); |
| } |
| |
| gboolean |
| gst_curl_base_sink_is_live (GstCurlBaseSink * sink) |
| { |
| gboolean result; |
| |
| g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE); |
| |
| GST_OBJECT_LOCK (sink); |
| result = sink->is_live; |
| GST_OBJECT_UNLOCK (sink); |
| |
| return result; |
| } |
| |
| static GstFlowReturn |
| gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf) |
| { |
| GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink); |
| GstMapInfo map; |
| guint8 *data; |
| size_t size; |
| GstFlowReturn ret; |
| |
| GST_LOG ("enter render"); |
| |
| sink = GST_CURL_BASE_SINK (bsink); |
| gst_buffer_map (buf, &map, GST_MAP_READ); |
| data = map.data; |
| size = map.size; |
| |
| GST_OBJECT_LOCK (sink); |
| |
| /* check if the transfer thread has encountered problems while the |
| * pipeline thread was working elsewhere */ |
| if (sink->flow_ret != GST_FLOW_OK) { |
| goto done; |
| } |
| |
| g_assert (sink->transfer_cond->data_available == FALSE); |
| |
| /* if there is no transfer thread created, lets create one */ |
| if (sink->transfer_thread == NULL) { |
| if (!gst_curl_base_sink_transfer_start_unlocked (sink)) { |
| sink->flow_ret = GST_FLOW_ERROR; |
| goto done; |
| } |
| } |
| |
| /* make data available for the transfer thread and notify */ |
| sink->transfer_buf->ptr = data; |
| sink->transfer_buf->len = size; |
| sink->transfer_buf->offset = 0; |
| gst_curl_base_sink_transfer_thread_notify_unlocked (sink); |
| |
| /* wait for the transfer thread to send the data. This will be notified |
| * either when transfer is completed by the curl read callback or by |
| * the thread function if an error has occured. */ |
| gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink); |
| |
| done: |
| ret = sink->flow_ret; |
| GST_OBJECT_UNLOCK (sink); |
| gst_buffer_unmap (buf, &map); |
| |
| GST_LOG ("exit render"); |
| |
| return ret; |
| } |
| |
| static gboolean |
| gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event) |
| { |
| GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink); |
| GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| |
| switch (event->type) { |
| case GST_EVENT_EOS: |
| GST_DEBUG_OBJECT (sink, "received EOS"); |
| gst_curl_base_sink_transfer_thread_close (sink); |
| gst_curl_base_sink_wait_for_response (sink); |
| break; |
| case GST_EVENT_CAPS: |
| if (klass->set_mime_type) { |
| GstCaps *caps; |
| gst_event_parse_caps (event, &caps); |
| klass->set_mime_type (sink, caps); |
| } |
| break; |
| default: |
| break; |
| } |
| |
| return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event); |
| } |
| |
| static gboolean |
| gst_curl_base_sink_start (GstBaseSink * bsink) |
| { |
| GstCurlBaseSink *sink; |
| |
| sink = GST_CURL_BASE_SINK (bsink); |
| |
| /* reset flags */ |
| sink->transfer_cond->data_sent = FALSE; |
| sink->transfer_cond->data_available = FALSE; |
| sink->transfer_cond->wait_for_response = FALSE; |
| sink->transfer_thread_close = FALSE; |
| sink->new_file = TRUE; |
| sink->flow_ret = GST_FLOW_OK; |
| |
| if ((sink->fdset = gst_poll_new (TRUE)) == NULL) { |
| GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE, |
| ("gst_poll_new failed: %s", g_strerror (errno)), (NULL)); |
| return FALSE; |
| } |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_curl_base_sink_stop (GstBaseSink * bsink) |
| { |
| GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink); |
| |
| gst_curl_base_sink_transfer_thread_close (sink); |
| if (sink->fdset != NULL) { |
| gst_poll_free (sink->fdset); |
| sink->fdset = NULL; |
| } |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_curl_base_sink_unlock (GstBaseSink * bsink) |
| { |
| GstCurlBaseSink *sink; |
| |
| sink = GST_CURL_BASE_SINK (bsink); |
| |
| GST_LOG_OBJECT (sink, "Flushing"); |
| gst_poll_set_flushing (sink->fdset, TRUE); |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_curl_base_sink_unlock_stop (GstBaseSink * bsink) |
| { |
| GstCurlBaseSink *sink; |
| |
| sink = GST_CURL_BASE_SINK (bsink); |
| |
| GST_LOG_OBJECT (sink, "No longer flushing"); |
| gst_poll_set_flushing (sink->fdset, FALSE); |
| |
| return TRUE; |
| } |
| |
| static void |
| gst_curl_base_sink_set_property (GObject * object, guint prop_id, |
| const GValue * value, GParamSpec * pspec) |
| { |
| GstCurlBaseSink *sink; |
| GstState cur_state; |
| |
| g_return_if_fail (GST_IS_CURL_BASE_SINK (object)); |
| sink = GST_CURL_BASE_SINK (object); |
| |
| gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0); |
| if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) { |
| GST_OBJECT_LOCK (sink); |
| |
| switch (prop_id) { |
| case PROP_LOCATION: |
| g_free (sink->url); |
| sink->url = g_value_dup_string (value); |
| GST_DEBUG_OBJECT (sink, "url set to %s", sink->url); |
| break; |
| case PROP_USER_NAME: |
| g_free (sink->user); |
| sink->user = g_value_dup_string (value); |
| GST_DEBUG_OBJECT (sink, "user set to %s", sink->user); |
| break; |
| case PROP_USER_PASSWD: |
| g_free (sink->passwd); |
| sink->passwd = g_value_dup_string (value); |
| GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd); |
| break; |
| case PROP_FILE_NAME: |
| g_free (sink->file_name); |
| sink->file_name = g_value_dup_string (value); |
| GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name); |
| break; |
| case PROP_TIMEOUT: |
| sink->timeout = g_value_get_int (value); |
| GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout); |
| break; |
| case PROP_QOS_DSCP: |
| sink->qos_dscp = g_value_get_int (value); |
| gst_curl_base_sink_setup_dscp_unlocked (sink); |
| GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp); |
| break; |
| default: |
| GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id); |
| break; |
| } |
| |
| GST_OBJECT_UNLOCK (sink); |
| |
| return; |
| } |
| |
| /* in PLAYING or PAUSED state */ |
| GST_OBJECT_LOCK (sink); |
| |
| switch (prop_id) { |
| case PROP_FILE_NAME: |
| g_free (sink->file_name); |
| sink->file_name = g_value_dup_string (value); |
| GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name); |
| gst_curl_base_sink_new_file_notify_unlocked (sink); |
| break; |
| case PROP_TIMEOUT: |
| sink->timeout = g_value_get_int (value); |
| GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout); |
| break; |
| case PROP_QOS_DSCP: |
| sink->qos_dscp = g_value_get_int (value); |
| gst_curl_base_sink_setup_dscp_unlocked (sink); |
| GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp); |
| break; |
| default: |
| GST_WARNING_OBJECT (sink, "cannot set property when PLAYING"); |
| break; |
| } |
| |
| GST_OBJECT_UNLOCK (sink); |
| } |
| |
| static void |
| gst_curl_base_sink_get_property (GObject * object, guint prop_id, |
| GValue * value, GParamSpec * pspec) |
| { |
| GstCurlBaseSink *sink; |
| |
| g_return_if_fail (GST_IS_CURL_BASE_SINK (object)); |
| sink = GST_CURL_BASE_SINK (object); |
| |
| switch (prop_id) { |
| case PROP_LOCATION: |
| g_value_set_string (value, sink->url); |
| break; |
| case PROP_USER_NAME: |
| g_value_set_string (value, sink->user); |
| break; |
| case PROP_USER_PASSWD: |
| g_value_set_string (value, sink->passwd); |
| break; |
| case PROP_FILE_NAME: |
| g_value_set_string (value, sink->file_name); |
| break; |
| case PROP_TIMEOUT: |
| g_value_set_int (value, sink->timeout); |
| break; |
| case PROP_QOS_DSCP: |
| g_value_set_int (value, sink->qos_dscp); |
| break; |
| default: |
| GST_DEBUG_OBJECT (sink, "invalid property id"); |
| break; |
| } |
| } |
| |
| static gboolean |
| gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink) |
| { |
| GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| |
| #ifdef DEBUG |
| curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1); |
| #endif |
| |
| curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url); |
| |
| curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout); |
| |
| /* using signals in a multithreaded application is dangeous */ |
| curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1); |
| |
| /* socket settings */ |
| curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink); |
| curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION, |
| gst_curl_base_sink_transfer_socket_cb); |
| |
| curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION, klass->transfer_read_cb); |
| curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink); |
| curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION, |
| gst_curl_base_sink_transfer_write_cb); |
| curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink); |
| |
| return TRUE; |
| } |
| |
| static gboolean |
| gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink) |
| { |
| gboolean res = FALSE; |
| GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| |
| gst_curl_base_sink_transfer_set_common_options_unlocked (sink); |
| |
| /* authentication settings */ |
| if (sink->user != NULL && strlen (sink->user)) { |
| curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user); |
| curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd); |
| } |
| |
| if (klass->set_options_unlocked) { |
| res = klass->set_options_unlocked (sink); |
| } |
| |
| return res; |
| } |
| |
| static size_t |
| transfer_data_buffer (void *curl_ptr, TransferBuffer * buf, |
| size_t max_bytes_to_send, guint * last_chunk) |
| { |
| guint buf_len = buf->len; |
| size_t bytes_to_send = MIN (max_bytes_to_send, buf->len); |
| |
| memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send); |
| buf->offset = buf->offset + bytes_to_send; |
| buf->len = buf->len - bytes_to_send; |
| |
| /* the last data chunk */ |
| if (bytes_to_send == buf_len) { |
| buf->offset = 0; |
| buf->len = 0; |
| *last_chunk = 1; |
| } |
| |
| GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send); |
| |
| return bytes_to_send; |
| } |
| |
| static size_t |
| gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink, |
| void *curl_ptr, size_t block_size, guint * last_chunk) |
| { |
| TransferBuffer *buffer; |
| size_t bytes_to_send; |
| |
| buffer = sink->transfer_buf; |
| GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT, |
| buffer->len, buffer->offset); |
| |
| if (buffer->len <= 0) { |
| GST_WARNING ("got zero- or negative-length buffer"); |
| |
| return 0; |
| } |
| |
| /* more data in buffer(s) */ |
| bytes_to_send = transfer_data_buffer (curl_ptr, sink->transfer_buf, |
| block_size, last_chunk); |
| |
| return bytes_to_send; |
| } |
| |
| static size_t |
| gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb, |
| void *stream) |
| { |
| GstCurlBaseSink *sink; |
| GstCurlBaseSinkClass *klass; |
| size_t max_bytes_to_send; |
| size_t bytes_to_send; |
| guint last_chunk = 0; |
| |
| sink = (GstCurlBaseSink *) stream; |
| klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| |
| max_bytes_to_send = size * nmemb; |
| |
| /* wait for data to come available, if new file or thread close is set |
| * then zero will be returned to indicate end of current transfer */ |
| GST_OBJECT_LOCK (sink); |
| if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) { |
| if (klass->flush_data_unlocked) { |
| bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr, |
| max_bytes_to_send, sink->new_file); |
| |
| GST_OBJECT_UNLOCK (sink); |
| |
| return bytes_to_send; |
| } |
| |
| GST_OBJECT_UNLOCK (sink); |
| GST_LOG ("returning 0, no more data to send in this file"); |
| |
| return 0; |
| } |
| |
| GST_OBJECT_UNLOCK (sink); |
| |
| bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr, |
| max_bytes_to_send, &last_chunk); |
| |
| /* the last data chunk */ |
| if (last_chunk) { |
| gst_curl_base_sink_data_sent_notify (sink); |
| } |
| |
| return bytes_to_send; |
| } |
| |
| static size_t |
| gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size, |
| size_t nmemb, void G_GNUC_UNUSED * stream) |
| { |
| GstCurlBaseSink *sink; |
| GstCurlBaseSinkClass *klass; |
| size_t realsize = size * nmemb; |
| |
| sink = (GstCurlBaseSink *) stream; |
| klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| |
| if (klass->transfer_verify_response_code) { |
| if (!klass->transfer_verify_response_code (sink)) { |
| GST_DEBUG_OBJECT (sink, "response error"); |
| GST_OBJECT_LOCK (sink); |
| sink->flow_ret = GST_FLOW_ERROR; |
| GST_OBJECT_UNLOCK (sink); |
| } |
| } |
| |
| GST_DEBUG ("response %s", (gchar *) ptr); |
| |
| return realsize; |
| } |
| |
| CURLcode |
| gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink) |
| { |
| CURLcode code = CURLE_OK; |
| CURL *easy; |
| CURLMsg *msg; |
| gint msgs_left; |
| gchar *eff_url = NULL; |
| |
| do { |
| easy = NULL; |
| while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) { |
| if (msg->msg == CURLMSG_DONE) { |
| easy = msg->easy_handle; |
| code = msg->data.result; |
| break; |
| } |
| } |
| if (easy) { |
| curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url); |
| GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url, |
| curl_easy_strerror (code), code); |
| } |
| } while (easy); |
| |
| return code; |
| } |
| |
| static void |
| handle_transfer (GstCurlBaseSink * sink) |
| { |
| GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| gint retval; |
| gint activated_fds; |
| gint running_handles; |
| gint timeout; |
| CURLMcode m_code; |
| CURLcode e_code; |
| |
| GST_OBJECT_LOCK (sink); |
| timeout = sink->timeout; |
| GST_OBJECT_UNLOCK (sink); |
| |
| /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data |
| available to send or receive - call simply curl_multi_perform before |
| poll() on more actions */ |
| do { |
| m_code = curl_multi_perform (sink->multi_handle, &running_handles); |
| } while (m_code == CURLM_CALL_MULTI_PERFORM); |
| |
| while (running_handles && (m_code == CURLM_OK)) { |
| if (klass->transfer_prepare_poll_wait) { |
| klass->transfer_prepare_poll_wait (sink); |
| } |
| |
| activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND); |
| if (G_UNLIKELY (activated_fds == -1)) { |
| if (errno == EAGAIN || errno == EINTR) { |
| GST_DEBUG_OBJECT (sink, "interrupted by signal"); |
| } else if (errno == EBUSY) { |
| GST_DEBUG_OBJECT (sink, "poll stopped"); |
| retval = GST_FLOW_EOS; |
| goto fail; |
| } else { |
| GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno)); |
| GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll failed"), (NULL)); |
| retval = GST_FLOW_ERROR; |
| goto fail; |
| } |
| } else if (G_UNLIKELY (activated_fds == 0)) { |
| GST_DEBUG_OBJECT (sink, "poll timed out"); |
| GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll timed out"), (NULL)); |
| retval = GST_FLOW_ERROR; |
| goto fail; |
| } |
| |
| /* readable/writable sockets */ |
| do { |
| m_code = curl_multi_perform (sink->multi_handle, &running_handles); |
| } while (m_code == CURLM_CALL_MULTI_PERFORM); |
| } |
| |
| if (m_code != CURLM_OK) { |
| GST_DEBUG_OBJECT (sink, "curl multi error"); |
| GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", |
| curl_multi_strerror (m_code)), (NULL)); |
| retval = GST_FLOW_ERROR; |
| goto fail; |
| } |
| |
| /* problems still might have occurred on individual transfers even when |
| * curl_multi_perform returns CURLM_OK */ |
| if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) { |
| GST_DEBUG_OBJECT (sink, "curl easy error"); |
| GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", |
| curl_easy_strerror (e_code)), (NULL)); |
| retval = GST_FLOW_ERROR; |
| goto fail; |
| } |
| |
| gst_curl_base_sink_got_response_notify (sink); |
| |
| return; |
| |
| fail: |
| GST_OBJECT_LOCK (sink); |
| if (sink->flow_ret == GST_FLOW_OK) { |
| sink->flow_ret = retval; |
| } |
| GST_OBJECT_UNLOCK (sink); |
| return; |
| } |
| |
| /* This function gets called by libcurl after the socket() call but before |
| * the connect() call. */ |
| static int |
| gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd, |
| curlsocktype G_GNUC_UNUSED purpose) |
| { |
| GstCurlBaseSink *sink; |
| gboolean ret = TRUE; |
| |
| sink = (GstCurlBaseSink *) clientp; |
| |
| g_assert (sink); |
| |
| if (curlfd < 0) { |
| /* signal an unrecoverable error to the library which will close the socket |
| and return CURLE_COULDNT_CONNECT |
| */ |
| return 1; |
| } |
| |
| gst_poll_fd_init (&sink->fd); |
| sink->fd.fd = curlfd; |
| |
| ret = ret && gst_poll_add_fd (sink->fdset, &sink->fd); |
| ret = ret && gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE); |
| ret = ret && gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE); |
| GST_DEBUG ("fd: %d", sink->fd.fd); |
| GST_OBJECT_LOCK (sink); |
| gst_curl_base_sink_setup_dscp_unlocked (sink); |
| GST_OBJECT_UNLOCK (sink); |
| |
| /* success */ |
| if (ret) { |
| return 0; |
| } else { |
| return 1; |
| } |
| } |
| |
| static gboolean |
| gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink) |
| { |
| GError *error = NULL; |
| gboolean ret = TRUE; |
| |
| GST_LOG ("creating transfer thread"); |
| sink->transfer_thread_close = FALSE; |
| sink->new_file = TRUE; |
| sink->transfer_thread = |
| g_thread_try_new ("Curl Transfer Thread", (GThreadFunc) |
| gst_curl_base_sink_transfer_thread_func, sink, &error); |
| |
| if (sink->transfer_thread == NULL || error != NULL) { |
| ret = FALSE; |
| if (error) { |
| GST_ERROR_OBJECT (sink, "could not create thread %s", error->message); |
| g_error_free (error); |
| } else { |
| GST_ERROR_OBJECT (sink, "could not create thread for unknown reason"); |
| } |
| } |
| |
| return ret; |
| } |
| |
| static gpointer |
| gst_curl_base_sink_transfer_thread_func (gpointer data) |
| { |
| GstCurlBaseSink *sink = (GstCurlBaseSink *) data; |
| GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink); |
| GstFlowReturn ret; |
| gboolean data_available; |
| |
| GST_LOG ("transfer thread started"); |
| GST_OBJECT_LOCK (sink); |
| if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) { |
| GST_DEBUG_OBJECT (sink, "curl setup error"); |
| GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("curl setup error"), (NULL)); |
| sink->flow_ret = GST_FLOW_ERROR; |
| goto done; |
| } |
| |
| while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) { |
| /* we are working on a new file, clearing flag and setting a new file |
| * name */ |
| sink->new_file = FALSE; |
| |
| /* wait for data to arrive for this new file, if we get a new file name |
| * again before getting data we will simply skip transfering anything |
| * for this file and go directly to the new file */ |
| data_available = gst_curl_base_sink_wait_for_data_unlocked (sink); |
| if (data_available) { |
| if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) { |
| sink->flow_ret = GST_FLOW_ERROR; |
| GST_OBJECT_UNLOCK (sink); |
| GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, ("Unexpected state."), |
| (NULL)); |
| GST_OBJECT_LOCK (sink); |
| goto done; |
| } |
| } |
| |
| /* stay unlocked while handling the actual transfer */ |
| GST_OBJECT_UNLOCK (sink); |
| |
| if (data_available) { |
| if (!gst_curl_base_sink_is_live (sink)) { |
| /* prepare transfer if needed */ |
| if (klass->prepare_transfer) { |
| GST_OBJECT_LOCK (sink); |
| if (!klass->prepare_transfer (sink)) { |
| sink->flow_ret = GST_FLOW_ERROR; |
| goto done; |
| } |
| GST_OBJECT_UNLOCK (sink); |
| } |
| curl_multi_add_handle (sink->multi_handle, sink->curl); |
| } |
| |
| /* Start driving the transfer. */ |
| klass->handle_transfer (sink); |
| |
| /* easy handle will be possibly re-used for next transfer, thus it needs to |
| * be removed from the multi stack and re-added again */ |
| if (!gst_curl_base_sink_is_live (sink)) { |
| curl_multi_remove_handle (sink->multi_handle, sink->curl); |
| } |
| } |
| |
| /* lock again before looping to check the thread closed flag */ |
| GST_OBJECT_LOCK (sink); |
| } |
| |
| if (sink->is_live) { |
| curl_multi_remove_handle (sink->multi_handle, sink->curl); |
| } |
| |
| done: |
| /* extract the error code so the lock does not have to be |
| * taken when calling the functions below that take the lock |
| * on their own */ |
| ret = sink->flow_ret; |
| GST_OBJECT_UNLOCK (sink); |
| |
| /* if there is a flow error, always notify the render function so it |
| * can return the flow error up along the pipeline. as an error has |
| * occurred there is no response to receive, so notify the event function |
| * so it doesn't block indefinitely waiting for a response. */ |
| if (ret != GST_FLOW_OK) { |
| gst_curl_base_sink_data_sent_notify (sink); |
| gst_curl_base_sink_got_response_notify (sink); |
| } |
| |
| GST_DEBUG ("exit thread func - transfer thread close flag: %d", |
| sink->transfer_thread_close); |
| |
| return NULL; |
| } |
| |
| static gboolean |
| gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink) |
| { |
| g_assert (sink); |
| |
| if (sink->curl == NULL) { |
| /* curl_easy_init automatically calls curl_global_init(3) */ |
| if ((sink->curl = curl_easy_init ()) == NULL) { |
| g_warning ("Failed to init easy handle"); |
| return FALSE; |
| } |
| } |
| |
| if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) { |
| g_warning ("Failed to setup easy handle"); |
| GST_OBJECT_UNLOCK (sink); |
| return FALSE; |
| } |
| |
| /* init a multi stack (non-blocking interface to liburl) */ |
| if (sink->multi_handle == NULL) { |
| if ((sink->multi_handle = curl_multi_init ()) == NULL) { |
| return FALSE; |
| } |
| } |
| |
| return TRUE; |
| } |
| |
| static void |
| gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink) |
| { |
| if (sink->curl != NULL) { |
| if (sink->multi_handle != NULL) { |
| curl_multi_remove_handle (sink->multi_handle, sink->curl); |
| } |
| curl_easy_cleanup (sink->curl); |
| sink->curl = NULL; |
| } |
| |
| if (sink->multi_handle != NULL) { |
| curl_multi_cleanup (sink->multi_handle); |
| sink->multi_handle = NULL; |
| } |
| } |
| |
| static gboolean |
| gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink) |
| { |
| gboolean data_available = FALSE; |
| |
| GST_LOG ("waiting for data"); |
| while (!sink->transfer_cond->data_available && |
| !sink->transfer_thread_close && !sink->new_file) { |
| g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink)); |
| } |
| |
| if (sink->transfer_thread_close) { |
| GST_LOG ("wait for data aborted due to thread close"); |
| } else if (sink->new_file) { |
| GST_LOG ("wait for data aborted due to new file name"); |
| } else { |
| GST_LOG ("wait for data completed"); |
| data_available = TRUE; |
| } |
| |
| return data_available; |
| } |
| |
| static void |
| gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink) |
| { |
| GST_LOG ("new file name"); |
| sink->new_file = TRUE; |
| g_cond_signal (&sink->transfer_cond->cond); |
| } |
| |
| static void |
| gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked |
| (GstCurlBaseSink * sink) |
| { |
| GST_LOG ("waiting for buffer send to complete"); |
| |
| /* this function should not check if the transfer thread is set to be closed |
| * since that flag only can be set by the EoS event (by the pipeline thread). |
| * This can therefore never happen while this function is running since this |
| * function also is called by the pipeline thread (in the render function) */ |
| while (!sink->transfer_cond->data_sent) { |
| g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink)); |
| } |
| GST_LOG ("buffer send completed"); |
| } |
| |
| static void |
| gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink) |
| { |
| GST_LOG ("transfer completed"); |
| GST_OBJECT_LOCK (sink); |
| sink->transfer_cond->data_available = FALSE; |
| sink->transfer_cond->data_sent = TRUE; |
| g_cond_signal (&sink->transfer_cond->cond); |
| GST_OBJECT_UNLOCK (sink); |
| } |
| |
| static void |
| gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink) |
| { |
| GST_LOG ("waiting for remote to send response code"); |
| |
| GST_OBJECT_LOCK (sink); |
| while (sink->transfer_cond->wait_for_response) { |
| g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink)); |
| } |
| GST_OBJECT_UNLOCK (sink); |
| |
| GST_LOG ("response code received"); |
| } |
| |
| static void |
| gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink) |
| { |
| GST_LOG ("got response code"); |
| |
| GST_OBJECT_LOCK (sink); |
| sink->transfer_cond->wait_for_response = FALSE; |
| g_cond_signal (&sink->transfer_cond->cond); |
| GST_OBJECT_UNLOCK (sink); |
| } |
| |
| static gint |
| gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink) |
| { |
| gint tos; |
| gint af; |
| gint ret = -1; |
| union |
| { |
| struct sockaddr sa; |
| struct sockaddr_in6 sa_in6; |
| struct sockaddr_storage sa_stor; |
| } sa; |
| socklen_t slen = sizeof (sa); |
| |
| if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) { |
| GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); |
| return ret; |
| } |
| af = sa.sa.sa_family; |
| |
| /* if this is an IPv4-mapped address then do IPv4 QoS */ |
| if (af == AF_INET6) { |
| GST_DEBUG_OBJECT (sink, "check IP6 socket"); |
| if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) { |
| GST_DEBUG_OBJECT (sink, "mapped to IPV4"); |
| af = AF_INET; |
| } |
| } |
| /* extract and shift 6 bits of the DSCP */ |
| tos = (sink->qos_dscp & 0x3f) << 2; |
| |
| switch (af) { |
| case AF_INET: |
| ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)); |
| break; |
| case AF_INET6: |
| #ifdef IPV6_TCLASS |
| ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, |
| sizeof (tos)); |
| break; |
| #endif |
| default: |
| GST_ERROR_OBJECT (sink, "unsupported AF"); |
| break; |
| } |
| if (ret) { |
| GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno)); |
| } |
| |
| return ret; |
| } |