blob: e60ccf531d0aa71b54cd83ab29969d0e53ec5b22 [file] [log] [blame]
Sam Hurste74b3a02017-07-25 15:23:57 +01001/*
2 * GstCurlHttpSrc
3 * Copyright 2017 British Broadcasting Corporation - Research and Development
4 *
5 * Author: Sam Hurst <samuelh@rd.bbc.co.uk>
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Alternatively, the contents of this file may be used under the
26 * GNU Lesser General Public License Version 2.1 (the "LGPL"), in
27 * which case the following provisions apply instead of the ones
28 * mentioned above:
29 *
30 * This library is free software; you can redistribute it and/or
31 * modify it under the terms of the GNU Library General Public
32 * License as published by the Free Software Foundation; either
33 * version 2 of the License, or (at your option) any later version.
34 *
35 * This library is distributed in the hope that it will be useful,
36 * but WITHOUT ANY WARRANTY; without even the implied warranty of
37 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
38 * Library General Public License for more details.
39 *
40 * You should have received a copy of the GNU Library General Public
41 * License along with this library; if not, write to the
42 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
43 * Boston, MA 02111-1307, USA.
44 */
45
46/**
47 * SECTION:element-curlhttpsrc
48 *
49 * This plugin reads data from a remote location specified by a URI, when the
50 * protocol is 'http' or 'https'.
51 *
52 * It is based on the cURL project (http://curl.haxx.se/) and is specifically
53 * designed to be also used with nghttp2 (http://nghttp2.org) to enable HTTP/2
54 * support for GStreamer. Your libcurl library MUST be compiled against nghttp2
55 * for HTTP/2 support for this functionality. HTTPS support is dependent on
56 * cURL being built with SSL support (OpenSSL/PolarSSL/NSS/GnuTLS).
57 *
58 * An HTTP proxy must be specified by URL.
59 * If the "http_proxy" environment variable is set, its value is used.
60 * The #GstCurlHttpSrc:proxy property can be used to override the default.
61 *
62 * <refsect2>
63 * <title>Example launch line</title>
64 * |[
65 * gst-launch-1.0 curlhttpsrc location=http://127.0.1.1/index.html ! fakesink dump=1
66 * ]| The above pipeline reads a web page from the local machine using HTTP and
67 * dumps it to stdout.
68 * |[
69 * gst-launch-1.0 playbin uri=http://rdmedia.bbc.co.uk/dash/testmpds/multiperiod/bbb.php
70 * ]| The above pipeline will start up a DASH streaming session from the given
71 * MPD file. This requires GStreamer to have been built with dashdemux from
72 * gst-plugins-bad.
73 * </refsect2>
74 */
75
76#ifdef HAVE_CONFIG_H
77#include <config.h>
78#endif
79
Edward Hervey69d794b2017-11-21 16:30:17 +010080#include <gst/gst-i18n-plugin.h>
81
Sam Hurste74b3a02017-07-25 15:23:57 +010082#include "gstcurlhttpsrc.h"
83#include "gstcurlqueue.h"
Sam Hurstb922edc2017-07-26 17:43:19 +010084#include "gstcurldefaults.h"
Sam Hurste74b3a02017-07-25 15:23:57 +010085
86GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
87#define GST_CAT_DEFAULT gst_curl_http_src_debug
88GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
89
90/*
91 * Make a source pad template to be able to kick out recv'd data
92 */
93static GstStaticPadTemplate srcpadtemplate = GST_STATIC_PAD_TEMPLATE ("src",
94 GST_PAD_SRC,
95 GST_PAD_ALWAYS,
96 GST_STATIC_CAPS_ANY);
97
98/*
99 * Function Definitions
100 */
101/* Gstreamer generic element functions */
102static void gst_curl_http_src_set_property (GObject * object, guint prop_id,
103 const GValue * value, GParamSpec * pspec);
104static void gst_curl_http_src_get_property (GObject * object, guint prop_id,
105 GValue * value, GParamSpec * pspec);
106static void gst_curl_http_src_ref_multi (GstCurlHttpSrc * src);
107static void gst_curl_http_src_unref_multi (GstCurlHttpSrc * src);
108static void gst_curl_http_src_finalize (GObject * obj);
109static GstFlowReturn gst_curl_http_src_create (GstPushSrc * psrc,
110 GstBuffer ** outbuf);
111static GstFlowReturn gst_curl_http_src_handle_response (GstCurlHttpSrc * src);
112static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src);
113static GstStateChangeReturn gst_curl_http_src_change_state (GstElement *
114 element, GstStateChange transition);
115static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
116static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
117static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
118 guint64 * size);
119static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
120static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
121
122/* URI Handler functions */
123static void gst_curl_http_src_uri_handler_init (gpointer g_iface,
124 gpointer iface_data);
125static guint gst_curl_http_src_urihandler_get_type (GType type);
126static const gchar *const *gst_curl_http_src_urihandler_get_protocols (GType
127 type);
128static gchar *gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler);
129static gboolean gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
130 const gchar * uri, GError ** error);
131
132/* GstTask functions */
133static void gst_curl_http_src_curl_multi_loop (gpointer thread_data);
134static CURL *gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s);
135static inline void gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src);
136static size_t gst_curl_http_src_get_header (void *header, size_t size,
137 size_t nmemb, void *src);
138static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
139 size_t nmemb, void *src);
140static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
141static char *gst_curl_http_src_strcasestr (const char *haystack,
142 const char *needle);
143
Sam Hurstb922edc2017-07-26 17:43:19 +0100144curl_version_info_data *gst_curl_http_src_curl_capabilities;
Philippe Normand5bf092b2017-08-07 10:25:17 +0100145GstCurlHttpVersion pref_http_ver;
Sam Hurstb922edc2017-07-26 17:43:19 +0100146gchar *gst_curl_http_src_default_useragent;
147
Philippe Normand5bf092b2017-08-07 10:25:17 +0100148#define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
149static GType
150gst_curl_http_version_get_type (void)
151{
152 static GType gtype = 0;
153
154 if (!gtype) {
155 static const GEnumValue http_versions[] = {
156 {GSTCURL_HTTP_VERSION_1_0, "HTTP Version 1.0", "1.0"},
157 {GSTCURL_HTTP_VERSION_1_1, "HTTP Version 1.1", "1.1"},
158#ifdef CURL_VERSION_HTTP2
159 {GSTCURL_HTTP_VERSION_2_0, "HTTP Version 2.0", "2.0"},
160#endif
161 {0, NULL, NULL}
162 };
163 gtype = g_enum_register_static ("GstCurlHttpVersionType", http_versions);
164 }
165 return gtype;
166}
167
Sam Hurste74b3a02017-07-25 15:23:57 +0100168#define gst_curl_http_src_parent_class parent_class
169G_DEFINE_TYPE_WITH_CODE (GstCurlHttpSrc, gst_curl_http_src, GST_TYPE_PUSH_SRC,
170 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
171 gst_curl_http_src_uri_handler_init));
172
173static void
174gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
175{
176 GObjectClass *gobject_class;
177 GstElementClass *gstelement_class;
178 GstBaseSrcClass *gstbasesrc_class;
179 GstPushSrcClass *gstpushsrc_class;
180 const gchar *http_env;
Alex Ashley048d7032017-08-09 15:10:56 +0100181 GstCurlHttpVersion default_http_version;
Sam Hurste74b3a02017-07-25 15:23:57 +0100182
183 gobject_class = (GObjectClass *) klass;
184 gstelement_class = (GstElementClass *) klass;
185 gstbasesrc_class = (GstBaseSrcClass *) klass;
186 gstpushsrc_class = (GstPushSrcClass *) klass;
187
188 GST_DEBUG_CATEGORY_INIT (gst_curl_http_src_debug, "curlhttpsrc",
189 0, "UriHandler for libcURL");
190
191 GST_INFO_OBJECT (klass, "class_init started!");
192
193 gstelement_class->change_state =
194 GST_DEBUG_FUNCPTR (gst_curl_http_src_change_state);
195 gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_curl_http_src_create);
196 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
197 gstbasesrc_class->get_size =
198 GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
199 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
200 gstbasesrc_class->unlock_stop =
201 GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
202
203 gst_element_class_add_pad_template (gstelement_class,
204 gst_static_pad_template_get (&srcpadtemplate));
205
206 gst_curl_http_src_curl_capabilities = curl_version_info (CURLVERSION_NOW);
Alex Ashley048d7032017-08-09 15:10:56 +0100207#ifdef CURL_VERSION_HTTP2
208 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
209 default_http_version = GSTCURL_HTTP_VERSION_2_0;
210 } else
211#endif
212 default_http_version = GSTCURL_HTTP_VERSION_1_1;
213
Sam Hurste74b3a02017-07-25 15:23:57 +0100214 http_env = g_getenv ("GST_CURL_HTTP_VER");
215 if (http_env != NULL) {
Philippe Normand5bf092b2017-08-07 10:25:17 +0100216 GST_INFO_OBJECT (klass, "Seen env var GST_CURL_HTTP_VER with value %s",
217 http_env);
218 if (!strcmp (http_env, "1.0")) {
219 pref_http_ver = GSTCURL_HTTP_VERSION_1_0;
220 } else if (!strcmp (http_env, "1.1")) {
221 pref_http_ver = GSTCURL_HTTP_VERSION_1_1;
222 } else if (!strcmp (http_env, "2.0")) {
223#ifdef CURL_VERSION_HTTP2
224 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
225 pref_http_ver = GSTCURL_HTTP_VERSION_2_0;
226 } else {
227 goto unsupported_http_version;
228 }
229#endif
230 } else {
231 unsupported_http_version:
232 GST_WARNING_OBJECT (klass,
233 "Unsupported HTTP version: %s. Fallback to default", http_env);
Alex Ashley048d7032017-08-09 15:10:56 +0100234 pref_http_ver = default_http_version;
Philippe Normand5bf092b2017-08-07 10:25:17 +0100235 }
Sam Hurste74b3a02017-07-25 15:23:57 +0100236 } else {
Alex Ashley048d7032017-08-09 15:10:56 +0100237 pref_http_ver = default_http_version;
Sam Hurste74b3a02017-07-25 15:23:57 +0100238 }
239
240 gst_curl_http_src_default_useragent =
241 g_strdup_printf ("GStreamer curlhttpsrc libcurl/%s",
242 gst_curl_http_src_curl_capabilities->version);
243
244 gobject_class->set_property = gst_curl_http_src_set_property;
245 gobject_class->get_property = gst_curl_http_src_get_property;
246 gobject_class->finalize = gst_curl_http_src_finalize;
247
248 g_object_class_install_property (gobject_class, PROP_URI,
249 g_param_spec_string ("location", "Location", "URI of resource to read",
250 GSTCURL_HANDLE_DEFAULT_CURLOPT_URL,
251 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
252
253 g_object_class_install_property (gobject_class, PROP_USERNAME,
254 g_param_spec_string ("user-id", "user-id",
255 "HTTP location URI user id for authentication",
256 GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME,
257 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
258
259 g_object_class_install_property (gobject_class, PROP_PASSWORD,
260 g_param_spec_string ("user-pw", "user-pw",
261 "HTTP location URI password for authentication",
262 GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD,
263 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
264
265 g_object_class_install_property (gobject_class, PROP_PROXYURI,
266 g_param_spec_string ("proxy", "Proxy", "URI of HTTP proxy server",
267 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY,
268 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
269
270 g_object_class_install_property (gobject_class, PROP_PROXYUSERNAME,
271 g_param_spec_string ("proxy-id", "proxy-id",
272 "HTTP proxy URI user id for authentication",
273 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME,
274 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
275
276 g_object_class_install_property (gobject_class, PROP_PROXYPASSWORD,
277 g_param_spec_string ("proxy-pw", "proxy-pw",
278 "HTTP proxy URI password for authentication",
279 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD,
280 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
281
282 g_object_class_install_property (gobject_class, PROP_COOKIES,
283 g_param_spec_boxed ("cookies", "Cookies", "List of HTTP Cookies",
284 G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285
286 g_object_class_install_property (gobject_class, PROP_USERAGENT,
287 g_param_spec_string ("user-agent", "User-Agent",
288 "URI of resource requested", GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT,
289 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
290
291 g_object_class_install_property (gobject_class, PROP_COMPRESS,
292 g_param_spec_boolean ("compress", "Compress",
293 "Allow compressed content encodings",
294 GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING,
295 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296
297 g_object_class_install_property (gobject_class, PROP_REDIRECT,
298 g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
299 "Allow HTTP Redirections (HTTP Status Code 300 series)",
300 GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION,
301 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302
303 g_object_class_install_property (gobject_class, PROP_MAXREDIRECT,
304 g_param_spec_int ("max-redirect", "Max-Redirect",
305 "Maximum number of permitted redirections. -1 is unlimited.",
306 GSTCURL_HANDLE_MIN_CURLOPT_MAXREDIRS,
307 GSTCURL_HANDLE_MAX_CURLOPT_MAXREDIRS,
308 GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS,
309 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310
311 g_object_class_install_property (gobject_class, PROP_KEEPALIVE,
312 g_param_spec_boolean ("keep-alive", "Keep-Alive",
313 "Toggle keep-alive for connection reuse.",
314 GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE,
315 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316
317 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
318 g_param_spec_int ("timeout", "Timeout",
319 "Value in seconds before timeout a blocking request (0 = no timeout)",
320 GSTCURL_HANDLE_MIN_CURLOPT_TIMEOUT,
321 GSTCURL_HANDLE_MAX_CURLOPT_TIMEOUT,
322 GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT,
323 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324
325 g_object_class_install_property (gobject_class, PROP_HEADERS,
326 g_param_spec_boxed ("extra-headers", "Extra Headers",
327 "Extra headers to append to the HTTP request",
328 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
329
330 g_object_class_install_property (gobject_class, PROP_STRICT_SSL,
331 g_param_spec_boolean ("ssl-strict", "SSL Strict",
332 "Strict SSL certificate checking",
333 GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER,
334 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335
336 g_object_class_install_property (gobject_class, PROP_SSL_CA_FILE,
337 g_param_spec_string ("ssl-ca-file", "SSL CA File",
338 "Location of an SSL CA file to use for checking SSL certificates",
339 GSTCURL_HANDLE_DEFAULT_CURLOPT_CAINFO,
340 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
341
342 g_object_class_install_property (gobject_class, PROP_RETRIES,
343 g_param_spec_int ("retries", "Retries",
344 "Maximum number of retries until giving up (-1=infinite)",
345 GSTCURL_HANDLE_MIN_RETRIES, GSTCURL_HANDLE_MAX_RETRIES,
346 GSTCURL_HANDLE_DEFAULT_RETRIES,
347 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348
349 g_object_class_install_property (gobject_class, PROP_CONNECTIONMAXTIME,
350 g_param_spec_uint ("max-connection-time", "Max-Connection-Time",
351 "Maximum amount of time to keep-alive HTTP connections",
352 GSTCURL_MIN_CONNECTION_TIME, GSTCURL_MAX_CONNECTION_TIME,
353 GSTCURL_DEFAULT_CONNECTION_TIME,
354 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355
356 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_SERVER,
357 g_param_spec_uint ("max-connections-per-server",
358 "Max-Connections-Per-Server",
359 "Maximum number of connections allowed per server for HTTP/1.x",
360 GSTCURL_MIN_CONNECTIONS_SERVER, GSTCURL_MAX_CONNECTIONS_SERVER,
361 GSTCURL_DEFAULT_CONNECTIONS_SERVER,
362 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363
364 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_PROXY,
365 g_param_spec_uint ("max-connections-per-proxy",
366 "Max-Connections-Per-Proxy",
367 "Maximum number of concurrent connections allowed per proxy for HTTP/1.x",
368 GSTCURL_MIN_CONNECTIONS_PROXY, GSTCURL_MAX_CONNECTIONS_PROXY,
369 GSTCURL_DEFAULT_CONNECTIONS_PROXY,
370 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371
372 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_GLOBAL,
373 g_param_spec_uint ("max-connections", "Max-Connections",
374 "Maximum number of concurrent connections allowed for HTTP/1.x",
375 GSTCURL_MIN_CONNECTIONS_GLOBAL, GSTCURL_MAX_CONNECTIONS_GLOBAL,
376 GSTCURL_DEFAULT_CONNECTIONS_GLOBAL,
377 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
Philippe Normand5bf092b2017-08-07 10:25:17 +0100378
379 g_object_class_install_property (gobject_class, PROP_HTTPVERSION,
380 g_param_spec_enum ("http-version", "HTTP-Version",
381 "The preferred HTTP protocol version",
382 GST_TYPE_CURL_HTTP_VERSION, pref_http_ver,
383 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
Sam Hurste74b3a02017-07-25 15:23:57 +0100384
385 /* Add a debugging task so it's easier to debug in the Multi worker thread */
386 GST_DEBUG_CATEGORY_INIT (gst_curl_loop_debug, "curl_multi_loop", 0,
387 "libcURL loop thread debugging");
Edward Hervey07afe5d2017-07-26 09:19:42 +0200388#ifndef GST_DISABLE_GST_DEBUG
Sam Hurste74b3a02017-07-25 15:23:57 +0100389 gst_debug_log (gst_curl_loop_debug, GST_LEVEL_INFO, __FILE__, __func__,
390 __LINE__, NULL, "Testing the curl_multi_loop debugging prints");
Edward Hervey07afe5d2017-07-26 09:19:42 +0200391#endif
Sam Hurste74b3a02017-07-25 15:23:57 +0100392
393 g_mutex_init (&klass->multi_task_context.mutex);
394 g_cond_init (&klass->multi_task_context.signal);
395 g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
396
397 gst_element_class_set_static_metadata (gstelement_class,
398 "HTTP Client Source using libcURL",
399 "Source/Network",
400 "Receiver data as a client over a network via HTTP using cURL",
401 "Sam Hurst <samuelh@rd.bbc.co.uk>");
402}
403
404static void
405gst_curl_http_src_set_property (GObject * object, guint prop_id,
406 const GValue * value, GParamSpec * pspec)
407{
Sam Hurste74b3a02017-07-25 15:23:57 +0100408 GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
409 GSTCURL_FUNCTION_ENTRY (source);
410
411 switch (prop_id) {
412 case PROP_URI:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700413 g_free (source->uri);
Sam Hurste74b3a02017-07-25 15:23:57 +0100414 source->uri = g_value_dup_string (value);
415 break;
416 case PROP_USERNAME:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700417 g_free (source->username);
Sam Hurste74b3a02017-07-25 15:23:57 +0100418 source->username = g_value_dup_string (value);
419 break;
420 case PROP_PASSWORD:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700421 g_free (source->password);
Sam Hurste74b3a02017-07-25 15:23:57 +0100422 source->password = g_value_dup_string (value);
423 break;
424 case PROP_PROXYURI:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700425 g_free (source->proxy_uri);
Sam Hurste74b3a02017-07-25 15:23:57 +0100426 source->proxy_uri = g_value_dup_string (value);
427 break;
428 case PROP_PROXYUSERNAME:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700429 g_free (source->proxy_user);
Sam Hurste74b3a02017-07-25 15:23:57 +0100430 source->proxy_user = g_value_dup_string (value);
431 break;
432 case PROP_PROXYPASSWORD:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700433 g_free (source->proxy_pass);
Sam Hurste74b3a02017-07-25 15:23:57 +0100434 source->proxy_pass = g_value_dup_string (value);
435 break;
436 case PROP_COOKIES:
437 g_strfreev (source->cookies);
438 source->cookies = g_strdupv (g_value_get_boxed (value));
439 source->number_cookies = g_strv_length (source->cookies);
440 break;
441 case PROP_USERAGENT:
Reynaldo H. Verdejo Pinochetd82ae7a2017-07-26 16:51:50 -0700442 g_free (source->user_agent);
Sam Hurste74b3a02017-07-25 15:23:57 +0100443 source->user_agent = g_value_dup_string (value);
444 break;
445 case PROP_HEADERS:
446 {
447 const GstStructure *s = gst_value_get_structure (value);
448 if (source->request_headers)
449 gst_structure_free (source->request_headers);
450 source->request_headers = s ? gst_structure_copy (s) : NULL;
451 }
452 break;
453 case PROP_COMPRESS:
454 source->accept_compressed_encodings = g_value_get_boolean (value);
455 break;
456 case PROP_REDIRECT:
457 source->allow_3xx_redirect = g_value_get_boolean (value);
458 break;
459 case PROP_MAXREDIRECT:
460 source->max_3xx_redirects = g_value_get_int (value);
461 break;
462 case PROP_KEEPALIVE:
463 source->keep_alive = g_value_get_boolean (value);
464 break;
465 case PROP_TIMEOUT:
466 source->timeout_secs = g_value_get_int (value);
467 break;
468 case PROP_STRICT_SSL:
469 source->strict_ssl = g_value_get_boolean (value);
470 break;
471 case PROP_SSL_CA_FILE:
472 source->custom_ca_file = g_value_dup_string (value);
473 break;
474 case PROP_RETRIES:
475 source->total_retries = g_value_get_int (value);
476 break;
477 case PROP_CONNECTIONMAXTIME:
478 source->max_connection_time = g_value_get_uint (value);
479 break;
480 case PROP_MAXCONCURRENT_SERVER:
481 source->max_conns_per_server = g_value_get_uint (value);
482 break;
483 case PROP_MAXCONCURRENT_PROXY:
484 source->max_conns_per_proxy = g_value_get_uint (value);
485 break;
486 case PROP_MAXCONCURRENT_GLOBAL:
487 source->max_conns_global = g_value_get_uint (value);
488 break;
489 case PROP_HTTPVERSION:
Philippe Normand5bf092b2017-08-07 10:25:17 +0100490 source->preferred_http_version = g_value_get_enum (value);
Sam Hurste74b3a02017-07-25 15:23:57 +0100491 break;
492 default:
493 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
494 break;
495 }
496 GSTCURL_FUNCTION_EXIT (source);
497}
498
499static void
500gst_curl_http_src_get_property (GObject * object, guint prop_id,
501 GValue * value, GParamSpec * pspec)
502{
503 GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
504 GSTCURL_FUNCTION_ENTRY (source);
505
506 switch (prop_id) {
507 case PROP_URI:
508 g_value_set_string (value, source->uri);
509 break;
510 case PROP_USERNAME:
511 g_value_set_string (value, source->username);
512 break;
513 case PROP_PASSWORD:
514 g_value_set_string (value, source->password);
515 break;
516 case PROP_PROXYURI:
517 g_value_set_string (value, source->proxy_uri);
518 break;
519 case PROP_PROXYUSERNAME:
520 g_value_set_string (value, source->proxy_user);
521 break;
522 case PROP_PROXYPASSWORD:
523 g_value_set_string (value, source->proxy_pass);
524 break;
525 case PROP_COOKIES:
526 g_value_set_boxed (value, source->cookies);
527 break;
528 case PROP_USERAGENT:
529 g_value_set_string (value, source->user_agent);
530 break;
531 case PROP_HEADERS:
532 gst_value_set_structure (value, source->request_headers);
533 break;
534 case PROP_COMPRESS:
535 g_value_set_boolean (value, source->accept_compressed_encodings);
536 break;
537 case PROP_REDIRECT:
538 g_value_set_boolean (value, source->allow_3xx_redirect);
539 break;
540 case PROP_MAXREDIRECT:
541 g_value_set_int (value, source->max_3xx_redirects);
542 break;
543 case PROP_KEEPALIVE:
544 g_value_set_boolean (value, source->keep_alive);
545 break;
546 case PROP_TIMEOUT:
547 g_value_set_int (value, source->timeout_secs);
548 break;
549 case PROP_STRICT_SSL:
550 g_value_set_boolean (value, source->strict_ssl);
551 break;
552 case PROP_SSL_CA_FILE:
553 g_value_set_string (value, source->custom_ca_file);
554 break;
555 case PROP_RETRIES:
556 g_value_set_int (value, source->total_retries);
557 break;
558 case PROP_CONNECTIONMAXTIME:
559 g_value_set_uint (value, source->max_connection_time);
560 break;
561 case PROP_MAXCONCURRENT_SERVER:
562 g_value_set_uint (value, source->max_conns_per_server);
563 break;
564 case PROP_MAXCONCURRENT_PROXY:
565 g_value_set_uint (value, source->max_conns_per_proxy);
566 break;
567 case PROP_MAXCONCURRENT_GLOBAL:
568 g_value_set_uint (value, source->max_conns_global);
569 break;
570 case PROP_HTTPVERSION:
Philippe Normand5bf092b2017-08-07 10:25:17 +0100571 g_value_set_enum (value, source->preferred_http_version);
Sam Hurste74b3a02017-07-25 15:23:57 +0100572 break;
573 default:
574 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
575 break;
576 }
577 GSTCURL_FUNCTION_EXIT (source);
578}
579
580static void
581gst_curl_http_src_init (GstCurlHttpSrc * source)
582{
583 GSTCURL_FUNCTION_ENTRY (source);
584
585 /* Assume everything is already free'd */
586 source->uri = NULL;
587 source->redirect_uri = NULL;
588 source->username = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME;
589 source->password = GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD;
590 source->proxy_uri = NULL;
591 source->proxy_user = NULL;
592 source->proxy_pass = NULL;
593 source->cookies = NULL;
594 source->user_agent = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT;
595 source->number_cookies = 0;
596 source->request_headers = NULL;
597 source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
598 source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
599 source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
600 source->timeout_secs = GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT;
601 source->max_connection_time = GSTCURL_DEFAULT_CONNECTION_TIME;
602 source->max_conns_per_server = GSTCURL_DEFAULT_CONNECTIONS_SERVER;
603 source->max_conns_per_proxy = GSTCURL_DEFAULT_CONNECTIONS_PROXY;
604 source->max_conns_global = GSTCURL_DEFAULT_CONNECTIONS_GLOBAL;
605 source->strict_ssl = GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER;
606 source->custom_ca_file = NULL;
607 source->preferred_http_version = pref_http_ver;
608 source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
609 source->retries_remaining = source->total_retries;
610 source->slist = NULL;
611
612 gst_caps_replace (&source->caps, NULL);
613 gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
614
615 source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
616 source->no_proxy_list = g_strdup (g_getenv ("no_proxy"));
617
618 g_mutex_init (&source->uri_mutex);
619 g_mutex_init (&source->buffer_mutex);
620 g_cond_init (&source->signal);
621
622 source->buffer = NULL;
623 source->buffer_len = 0;
624 source->state = GSTCURL_NONE;
625 source->pending_state = GSTCURL_NONE;
626 source->status_code = 0;
627
628 source->http_headers = NULL;
629 source->hdrs_updated = FALSE;
630
631 source->curl_result = CURLE_OK;
632
633 GSTCURL_FUNCTION_EXIT (source);
634}
635
636/*
637 * Check if the Curl multi loop has been started. If not, initialise it and
638 * start it running. If it is already running, increment the refcount.
639 */
640static void
641gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
642{
643 GstCurlHttpSrcClass *klass;
644
645 GSTCURL_FUNCTION_ENTRY (src);
646
647 /*klass = (GstCurlHttpSrcClass) g_type_class_peek_parent (src); */
648 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
649 GstCurlHttpSrcClass);
650
651 g_mutex_lock (&klass->multi_task_context.mutex);
652 if (klass->multi_task_context.refcount == 0) {
653 /* Set up various in-task properties */
654
655 /* NULL is treated as the start of the list, no need to allocate. */
656 klass->multi_task_context.queue = NULL;
657
658 /* set up curl */
659 klass->multi_task_context.multi_handle = curl_multi_init ();
660
661 curl_multi_setopt (klass->multi_task_context.multi_handle,
662 CURLMOPT_PIPELINING, 1);
663#ifdef CURLMOPT_MAX_HOST_CONNECTIONS
664 curl_multi_setopt (klass->multi_task_context.multi_handle,
665 CURLMOPT_MAX_HOST_CONNECTIONS, 1);
666#endif
667
668 /* Start the thread */
669 klass->multi_task_context.task = gst_task_new (
670 (GstTaskFunction) gst_curl_http_src_curl_multi_loop,
671 (gpointer) & klass->multi_task_context, NULL);
672 gst_task_set_lock (klass->multi_task_context.task,
673 &klass->multi_task_context.task_rec_mutex);
674 if (gst_task_start (klass->multi_task_context.task) == FALSE) {
675 /*
676 * This is a pretty critical failure and is not recoverable, so commit
677 * sudoku and run away.
678 */
679 GSTCURL_ERROR_PRINT ("Couldn't start curl_multi task! Aborting.");
680 abort ();
681 }
682 GSTCURL_INFO_PRINT ("Curl multi loop has been correctly initialised!");
683 }
684 klass->multi_task_context.refcount++;
685 g_mutex_unlock (&klass->multi_task_context.mutex);
686
687 GSTCURL_FUNCTION_EXIT (src);
688}
689
690/*
691 * Decrement the reference count on the curl multi loop. If this is called by
692 * the last instance to hold a reference, shut down the worker. (Otherwise
693 * GStreamer can't close down with a thread still running). Also offers the
694 * "force_all" boolean parameter, which if TRUE removes all references and shuts
695 * down.
696 */
697static void
698gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
699{
700 GstCurlHttpSrcClass *klass;
701
702 GSTCURL_FUNCTION_ENTRY (src);
703
704 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
705 GstCurlHttpSrcClass);
706
707 g_mutex_lock (&klass->multi_task_context.mutex);
708 klass->multi_task_context.refcount--;
709 GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
710 klass->multi_task_context.refcount);
711
712 if (klass->multi_task_context.refcount <= 0) {
713 /* Everything's done! Clean up. */
714 gst_task_pause (klass->multi_task_context.task);
715 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
716 g_cond_signal (&klass->multi_task_context.signal);
717 g_mutex_unlock (&klass->multi_task_context.mutex);
718 gst_task_join (klass->multi_task_context.task);
719 } else {
720 g_mutex_unlock (&klass->multi_task_context.mutex);
721 }
722
723 GSTCURL_FUNCTION_EXIT (src);
724}
725
726static void
727gst_curl_http_src_finalize (GObject * obj)
728{
729 GstCurlHttpSrc *src = GST_CURLHTTPSRC (obj);
730
731 GSTCURL_FUNCTION_ENTRY (src);
732
733 /* Cleanup all memory allocated */
734 gst_curl_http_src_cleanup_instance (src);
735
736 GSTCURL_FUNCTION_EXIT (src);
737}
738
739/*
740 * Do the transfer. If the transfer hasn't begun yet, start a new curl handle
741 * and pass it to the multi queue to be operated on. Then wait for any blocks
742 * of data and push them to the source pad.
743 */
744static GstFlowReturn
745gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
746{
747 GstFlowReturn ret;
748 GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
749 GstCurlHttpSrcClass *klass;
Sam Hurst85eb69d2017-07-26 17:06:17 +0100750 GstStructure *empty_headers;
Sam Hurste74b3a02017-07-25 15:23:57 +0100751
752 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
753 GstCurlHttpSrcClass);
754
755 GSTCURL_FUNCTION_ENTRY (src);
756 ret = GST_FLOW_OK;
757
758 g_mutex_lock (&src->buffer_mutex);
759 if (src->state == GSTCURL_UNLOCK) {
760 ret = GST_FLOW_FLUSHING;
761 goto escape;
762 }
763
764retry:
765 if (!src->transfer_begun) {
766 GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
767 /* Create the Easy Handle and set up the session. */
768 src->curl_handle = gst_curl_http_src_create_easy_handle (src);
Sam Hurst85eb69d2017-07-26 17:06:17 +0100769 if (src->curl_handle == NULL) {
770 ret = GST_FLOW_ERROR;
771 goto escape;
772 }
Sam Hurste74b3a02017-07-25 15:23:57 +0100773
774 g_mutex_lock (&klass->multi_task_context.mutex);
775
776 if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
777 == FALSE) {
778 GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
Sam Hurst85eb69d2017-07-26 17:06:17 +0100779 ret = GST_FLOW_ERROR;
780 goto escape;
Sam Hurste74b3a02017-07-25 15:23:57 +0100781 }
782
783 /* Signal the worker thread */
784 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT;
785 g_cond_signal (&klass->multi_task_context.signal);
786 g_mutex_unlock (&klass->multi_task_context.mutex);
787
788 src->state = GSTCURL_OK;
789 src->transfer_begun = TRUE;
790 src->data_received = FALSE;
791
792 GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
793
Sam Hurst85eb69d2017-07-26 17:06:17 +0100794 empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
Sam Hurste74b3a02017-07-25 15:23:57 +0100795 src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
796 URI_NAME, G_TYPE_STRING, src->uri,
797 REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
Sam Hurst85eb69d2017-07-26 17:06:17 +0100798 RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
799 gst_structure_free (empty_headers);
Sam Hurste74b3a02017-07-25 15:23:57 +0100800 GST_INFO_OBJECT (src, "Created a new headers object");
801 }
802
803 /* Wait for data to become available, then punt it downstream */
804 while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)) {
805 g_cond_wait (&src->signal, &src->buffer_mutex);
806 }
807
808 if (src->state == GSTCURL_UNLOCK) {
809 if (src->buffer_len > 0) {
810 g_free (src->buffer);
811 src->buffer = NULL;
812 src->buffer_len = 0;
813 }
814 ret = GST_FLOW_FLUSHING;
815 goto escape;
816 }
817
818 ret = gst_curl_http_src_handle_response (src);
819 switch (ret) {
820 case GST_FLOW_ERROR:
821 goto escape; /* Don't attempt a retry, just bomb out */
822 case GST_FLOW_CUSTOM_ERROR:
823 if (src->data_received == TRUE) {
824 /*
825 * If data has already been received, we can't recall previously sent
826 * buffers so don't attempt a retry in this case.
827 *
828 * TODO: Remember the position we got to, and make a range request for
829 * the resource without the bit we've already received?
830 */
831 GST_WARNING_OBJECT (src,
832 "Failed mid-transfer, can't continue for URI %s", src->uri);
833 ret = GST_FLOW_ERROR;
834 goto escape;
835 }
836 src->retries_remaining--;
837 if (src->retries_remaining == 0) {
838 GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
839 ret = GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
840 goto escape;
841 }
842 GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
843 src->state = GSTCURL_NONE;
844 src->transfer_begun = FALSE;
845 src->status_code = 0;
846 src->hdrs_updated = FALSE;
847 if (src->http_headers != NULL) {
848 gst_structure_free (src->http_headers);
849 src->http_headers = NULL;
850 GST_INFO_OBJECT (src, "NULL'd the headers");
851 }
852 gst_curl_http_src_destroy_easy_handle (src);
853 g_mutex_unlock (&src->buffer_mutex);
854 goto retry; /* Attempt a retry! */
855 default:
856 break;
857 }
858
859 if (((src->state == GSTCURL_OK) || (src->state == GSTCURL_DONE)) &&
860 (src->buffer_len > 0)) {
861
862 GST_DEBUG_OBJECT (src, "Pushing %u bytes of transfer for URI %s to pad",
863 src->buffer_len, src->uri);
864 *outbuf = gst_buffer_new_allocate (NULL, src->buffer_len, NULL);
865 gst_buffer_fill (*outbuf, 0, src->buffer, src->buffer_len);
866
867 g_free (src->buffer);
868 src->buffer = NULL;
869 src->buffer_len = 0;
870 src->data_received = TRUE;
871
872 /* ret should still be GST_FLOW_OK */
873 } else if ((src->state == GSTCURL_DONE) && (src->buffer_len == 0)) {
874 GST_INFO_OBJECT (src, "Full body received, signalling EOS for URI %s.",
875 src->uri);
876 src->state = GSTCURL_NONE;
877 src->transfer_begun = FALSE;
878 src->status_code = 0;
879 src->hdrs_updated = FALSE;
880 gst_curl_http_src_destroy_easy_handle (src);
881 ret = GST_FLOW_EOS;
882 } else {
883 switch (src->state) {
884 case GSTCURL_NONE:
885 GST_WARNING_OBJECT (src, "Got unexpected GSTCURL_NONE state!");
886 break;
887 case GSTCURL_REMOVED:
888 GST_WARNING_OBJECT (src, "Transfer got removed from the curl queue");
889 ret = GST_FLOW_EOS;
890 break;
891 case GSTCURL_BAD_QUEUE_REQUEST:
892 GST_ERROR_OBJECT (src, "Bad Queue Request!");
893 ret = GST_FLOW_ERROR;
894 break;
895 case GSTCURL_TOTAL_ERROR:
896 GST_ERROR_OBJECT (src, "Critical, unrecoverable error!");
897 ret = GST_FLOW_ERROR;
898 break;
899 case GSTCURL_PIPELINE_NULL:
900 GST_ERROR_OBJECT (src, "Pipeline null");
901 break;
902 default:
903 GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
904 }
905 }
906
907escape:
908 g_mutex_unlock (&src->buffer_mutex);
909
910 GSTCURL_FUNCTION_EXIT (src);
911 return ret;
912}
913
914/*
915 * Convert header from a GstStructure type to a curl_slist type that curl will
916 * understand.
917 */
918static gboolean
919_headers_to_curl_slist (GQuark field_id, const GValue * value, gpointer ptr)
920{
921 gchar *field;
922 struct curl_slist **p_slist = ptr;
923
924 field = g_strdup_printf ("%s: %s", g_quark_to_string (field_id),
925 g_value_get_string (value));
926
927 *p_slist = curl_slist_append (*p_slist, field);
928
929 g_free (field);
930
931 return TRUE;
932}
933
934/*
935 * From the data in the queue element s, create a CURL easy handle and populate
936 * options with the URL, proxy data, login options, cookies,
937 */
938static CURL *
939gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
940{
941 CURL *handle;
942 gint i;
943 GSTCURL_FUNCTION_ENTRY (s);
944
945 handle = curl_easy_init ();
946 if (handle == NULL) {
947 GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
948 return NULL;
949 }
950 GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
951
952 /* This is mandatory and yet not default option, so if this is NULL
953 * then something very bad is going on. */
Sam Hurst85eb69d2017-07-26 17:06:17 +0100954 if (s->uri == NULL) {
955 GST_ERROR_OBJECT (s, "No URI for curl!");
956 return NULL;
957 }
958 gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
Sam Hurste74b3a02017-07-25 15:23:57 +0100959
960 gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
961 gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
962 gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
963 gst_curl_setopt_str (s, handle, CURLOPT_NOPROXY, s->no_proxy_list);
964 gst_curl_setopt_str (s, handle, CURLOPT_PROXYUSERNAME, s->proxy_user);
965 gst_curl_setopt_str (s, handle, CURLOPT_PROXYPASSWORD, s->proxy_pass);
966
967 for (i = 0; i < s->number_cookies; i++) {
968 gst_curl_setopt_str (s, handle, CURLOPT_COOKIELIST, s->cookies[i]);
969 }
970
971 /* curl_slist_append dynamically allocates memory, but I need to free it */
972 if (s->request_headers != NULL) {
973 gst_structure_foreach (s->request_headers, _headers_to_curl_slist,
974 &s->slist);
Sam Hurst85eb69d2017-07-26 17:06:17 +0100975 if (curl_easy_setopt (handle, CURLOPT_HTTPHEADER, s->slist) != CURLE_OK) {
976 GST_WARNING_OBJECT (s, "Failed to set HTTP headers!");
977 }
Sam Hurste74b3a02017-07-25 15:23:57 +0100978 }
979
980 gst_curl_setopt_str_default (s, handle, CURLOPT_USERAGENT, s->user_agent);
981
982 /*
983 * Unlike soup, this isn't a binary op, curl wants a string here. So if it's
984 * TRUE, simply set the value as an empty string as this allows both gzip and
985 * zlib compression methods.
986 */
987 if (s->accept_compressed_encodings == TRUE) {
Sam Hurst85eb69d2017-07-26 17:06:17 +0100988 gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "");
Sam Hurste74b3a02017-07-25 15:23:57 +0100989 } else {
Sam Hurst85eb69d2017-07-26 17:06:17 +0100990 gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "identity");
Sam Hurste74b3a02017-07-25 15:23:57 +0100991 }
992
993 gst_curl_setopt_int (s, handle, CURLOPT_FOLLOWLOCATION,
994 s->allow_3xx_redirect);
995 gst_curl_setopt_int_default (s, handle, CURLOPT_MAXREDIRS,
996 s->max_3xx_redirects);
Sam Hurst85eb69d2017-07-26 17:06:17 +0100997 gst_curl_setopt_bool (s, handle, CURLOPT_TCP_KEEPALIVE, s->keep_alive);
Sam Hurste74b3a02017-07-25 15:23:57 +0100998 gst_curl_setopt_int (s, handle, CURLOPT_TIMEOUT, s->timeout_secs);
Sam Hurst85eb69d2017-07-26 17:06:17 +0100999 gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
Sam Hurste74b3a02017-07-25 15:23:57 +01001000 gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
1001
1002 switch (s->preferred_http_version) {
1003 case GSTCURL_HTTP_VERSION_1_0:
1004 GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
1005 gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1006 CURL_HTTP_VERSION_1_0);
1007 break;
1008 case GSTCURL_HTTP_VERSION_1_1:
1009 GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.1");
1010 gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1011 CURL_HTTP_VERSION_1_1);
1012 break;
1013#ifdef CURL_VERSION_HTTP2
1014 case GSTCURL_HTTP_VERSION_2_0:
1015 GST_DEBUG_OBJECT (s, "Setting version as HTTP/2.0");
Sam Hurst6baa66a2017-08-07 16:41:27 +01001016 if (curl_easy_setopt (handle, CURLOPT_HTTP_VERSION,
1017 CURL_HTTP_VERSION_2_0) != CURLE_OK) {
1018 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
1019 GST_WARNING_OBJECT (s,
1020 "Cannot set unsupported option CURLOPT_HTTP_VERSION");
1021 } else {
1022 GST_INFO_OBJECT (s, "HTTP/2 unsupported by libcurl at this time");
1023 }
1024 }
Sam Hurste74b3a02017-07-25 15:23:57 +01001025 break;
1026#endif
1027 default:
1028 GST_WARNING_OBJECT (s,
1029 "Supplied a bogus HTTP version, using curl default!");
1030 }
1031
Sam Hurst85eb69d2017-07-26 17:06:17 +01001032 gst_curl_setopt_generic (s, handle, CURLOPT_HEADERFUNCTION,
Sam Hurste74b3a02017-07-25 15:23:57 +01001033 gst_curl_http_src_get_header);
Sam Hurst85eb69d2017-07-26 17:06:17 +01001034 gst_curl_setopt_str (s, handle, CURLOPT_HEADERDATA, s);
1035 gst_curl_setopt_generic (s, handle, CURLOPT_WRITEFUNCTION,
Sam Hurste74b3a02017-07-25 15:23:57 +01001036 gst_curl_http_src_get_chunks);
Sam Hurst85eb69d2017-07-26 17:06:17 +01001037 gst_curl_setopt_str (s, handle, CURLOPT_WRITEDATA, s);
Sam Hurste74b3a02017-07-25 15:23:57 +01001038
Sam Hurst85eb69d2017-07-26 17:06:17 +01001039 gst_curl_setopt_str (s, handle, CURLOPT_ERRORBUFFER, s->curl_errbuf);
Sam Hurste74b3a02017-07-25 15:23:57 +01001040
1041 GSTCURL_FUNCTION_EXIT (s);
1042 return handle;
1043}
1044
1045/*
1046 * Check the return type from the curl transfer. If it was okay, then deal with
1047 * any headers that were received. Headers should only be dealt with once - but
1048 * we might get a second set if there are trailing headers (RFC7230 Section 4.4)
1049 */
1050static GstFlowReturn
1051gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
1052{
1053 glong curl_info_long;
1054 gdouble curl_info_dbl;
1055 gchar *redirect_url;
1056 GstBaseSrc *basesrc;
1057 const GValue *response_headers;
1058 GstFlowReturn ret = GST_FLOW_OK;
1059
1060 GSTCURL_FUNCTION_ENTRY (src);
1061
1062 GST_TRACE_OBJECT (src, "status code: %d, curl return code %d",
1063 src->status_code, src->curl_result);
1064
1065 /* Check the curl result code first - anything not 0 is probably a failure */
1066 if (src->curl_result != 0) {
1067 GST_WARNING_OBJECT (src, "Curl failed the transfer (%d): %s",
1068 src->curl_result, curl_easy_strerror (src->curl_result));
1069 GST_DEBUG_OBJECT (src, "Reason for curl failure: %s", src->curl_errbuf);
1070 return GST_FLOW_ERROR;
1071 }
1072
1073 /*
1074 * What response code do we have?
1075 */
1076 if (src->status_code >= 400) {
1077 GST_WARNING_OBJECT (src, "Transfer for URI %s returned error status %u",
1078 src->uri, src->status_code);
1079 src->retries_remaining = 0;
1080 return GST_FLOW_ERROR;
1081 } else if (src->status_code == 0) {
1082 if (curl_easy_getinfo (src->curl_handle, CURLINFO_TOTAL_TIME,
1083 &curl_info_dbl) != CURLE_OK) {
1084 /* Curl cannot be relied on in this state, so return an error. */
1085 return GST_FLOW_ERROR;
1086 }
1087 if (curl_info_dbl > src->timeout_secs) {
1088 return GST_FLOW_CUSTOM_ERROR;
1089 }
1090
1091 if (curl_easy_getinfo (src->curl_handle, CURLINFO_OS_ERRNO,
1092 &curl_info_long) != CURLE_OK) {
1093 /* Curl cannot be relied on in this state, so return an error. */
1094 return GST_FLOW_ERROR;
1095
1096 }
1097
1098 GST_WARNING_OBJECT (src, "Errno for CONNECT call was %ld (%s)",
1099 curl_info_long, g_strerror ((gint) curl_info_long));
1100
1101 /* Some of these responses are retry-able, others not. Set the returned
1102 * state to ERROR so we crash out instead of fruitlessly retrying.
1103 */
1104 if (curl_info_long == ECONNREFUSED) {
1105 return GST_FLOW_ERROR;
1106 }
1107 ret = GST_FLOW_CUSTOM_ERROR;
1108 }
1109
1110
1111 if (ret == GST_FLOW_CUSTOM_ERROR) {
1112 src->hdrs_updated = FALSE;
1113 GSTCURL_FUNCTION_EXIT (src);
1114 return ret;
1115 }
1116
1117 /* Only do this once */
1118 if (src->hdrs_updated == FALSE) {
1119 GSTCURL_FUNCTION_EXIT (src);
1120 return GST_FLOW_OK;
1121 }
1122
1123 /*
1124 * Deal with redirections...
1125 */
1126 if (curl_easy_getinfo (src->curl_handle, CURLINFO_EFFECTIVE_URL,
1127 &redirect_url)
1128 == CURLE_OK) {
1129 size_t lena, lenb;
1130 lena = strlen (src->uri);
1131 lenb = strlen (redirect_url);
1132 if (g_ascii_strncasecmp (src->uri, redirect_url,
1133 (lena > lenb) ? lenb : lena) != 0) {
1134 GST_INFO_OBJECT (src, "Got a redirect to %s, setting as redirect URI",
1135 redirect_url);
1136 src->redirect_uri = g_strdup (redirect_url);
1137 gst_structure_remove_field (src->http_headers, REDIRECT_URI_NAME);
1138 gst_structure_set (src->http_headers, REDIRECT_URI_NAME,
1139 G_TYPE_STRING, redirect_url, NULL);
1140 }
1141 }
1142
1143 /*
1144 * Push the content length
1145 */
1146 if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD,
1147 &curl_info_dbl) == CURLE_OK) {
1148 if (curl_info_dbl == -1) {
1149 GST_WARNING_OBJECT (src,
1150 "No Content-Length was specified in the response.");
1151 } else {
1152 GST_INFO_OBJECT (src, "Content-Length was given as %.0f", curl_info_dbl);
1153 basesrc = GST_BASE_SRC_CAST (src);
1154 basesrc->segment.duration = curl_info_dbl;
1155 gst_element_post_message (GST_ELEMENT (src),
1156 gst_message_new_duration_changed (GST_OBJECT (src)));
1157 }
1158 }
1159
1160 /*
1161 * Push all the received headers down via a sicky event
1162 */
1163 response_headers = gst_structure_get_value (src->http_headers,
1164 RESPONSE_HEADERS_NAME);
1165 if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
1166 GstEvent *hdrs_event;
Sam Hurst85eb69d2017-07-26 17:06:17 +01001167 GstStructure *empty_headers;
Sam Hurste74b3a02017-07-25 15:23:57 +01001168
1169 gst_element_post_message (GST_ELEMENT_CAST (src),
1170 gst_message_new_element (GST_OBJECT_CAST (src),
1171 gst_structure_copy (src->http_headers)));
1172
1173 /* gst_event_new_custom takes ownership of our structure */
1174 hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
1175 src->http_headers);
1176 gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
1177 GST_INFO_OBJECT (src, "Pushed headers downstream");
Sam Hurst85eb69d2017-07-26 17:06:17 +01001178 empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
Sam Hurste74b3a02017-07-25 15:23:57 +01001179 src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
1180 URI_NAME, G_TYPE_STRING, src->uri,
1181 REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
Edward Hervey8d998672017-11-22 16:53:17 +01001182 RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
Sam Hurst85eb69d2017-07-26 17:06:17 +01001183 gst_structure_free (empty_headers);
Sam Hurste74b3a02017-07-25 15:23:57 +01001184 }
1185
1186 src->hdrs_updated = FALSE;
1187
1188 GSTCURL_FUNCTION_EXIT (src);
1189
1190 return ret;
1191}
1192
1193/*
1194 * "Negotiate" capabilities between us and the sink.
1195 * I.e. tell the sink device what data to expect. We can't be told what to send
1196 * unless we implement "only return to me if this type" property. Potential TODO
1197 */
1198static gboolean
1199gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
1200{
1201 GST_INFO_OBJECT (src, "Negotiating caps...");
1202 if (src->caps && src->http_headers) {
1203 const GValue *response_headers = gst_structure_get_value (src->http_headers,
1204 RESPONSE_HEADERS_NAME);
1205
1206 if (gst_structure_has_field (gst_value_get_structure (response_headers),
1207 "content-type") == TRUE) {
1208 const GValue *gv_content_type =
1209 gst_structure_get_value (gst_value_get_structure (response_headers),
1210 "content-type");
1211 if (G_VALUE_HOLDS_STRING (gv_content_type) == TRUE) {
1212 const gchar *content_type = g_value_get_string (gv_content_type);
1213 GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s",
1214 content_type);
1215 src->caps = gst_caps_make_writable (src->caps);
1216 gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
1217 content_type, NULL);
1218 if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
1219 GST_ERROR_OBJECT (src, "Setting caps failed!");
1220 return FALSE;
1221 }
1222 } else {
1223 GST_ERROR_OBJECT (src, "Content Type doesn't contain expected string");
1224 return FALSE;
1225 }
1226 }
1227 } else {
1228 GST_DEBUG_OBJECT (src, "No caps have been set, continue.");
1229 }
1230
1231 return TRUE;
1232}
1233
1234/*
1235 * Cleanup the CURL easy handle once we're done with it.
1236 */
1237static inline void
1238gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src)
1239{
1240 /* Thank you Handles, and well done. Well done, mate. */
1241 if (src->curl_handle != NULL) {
1242 curl_easy_cleanup (src->curl_handle);
1243 src->curl_handle = NULL;
1244 }
1245 /* In addition, clean up the curl header slist if it was used. */
1246 if (src->slist != NULL) {
1247 curl_slist_free_all (src->slist);
1248 src->slist = NULL;
1249 }
1250}
1251
1252static GstStateChangeReturn
1253gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
1254{
1255 GstStateChangeReturn ret;
1256 GstCurlHttpSrc *source = GST_CURLHTTPSRC (element);
1257 GSTCURL_FUNCTION_ENTRY (source);
1258
1259 switch (transition) {
1260 case GST_STATE_CHANGE_NULL_TO_READY:
1261 gst_curl_http_src_ref_multi (source);
1262 break;
Edward Herveyb8cb08c2017-11-21 15:31:05 +01001263 case GST_STATE_CHANGE_READY_TO_PAUSED:
Edward Herveyd4afba72017-11-21 16:30:35 +01001264 if (source->uri == NULL) {
1265 GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (_("No URL set.")),
1266 ("Missing URL"));
Edward Herveyb8cb08c2017-11-21 15:31:05 +01001267 return GST_STATE_CHANGE_FAILURE;
Edward Herveyd4afba72017-11-21 16:30:35 +01001268 }
Edward Herveyb8cb08c2017-11-21 15:31:05 +01001269 break;
Sam Hurste74b3a02017-07-25 15:23:57 +01001270 case GST_STATE_CHANGE_READY_TO_NULL:
1271 /* The pipeline has ended, so signal any running request to end. */
1272 gst_curl_http_src_request_remove (source);
1273 gst_curl_http_src_unref_multi (source);
1274 break;
1275 default:
1276 break;
1277 }
1278
1279 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1280
1281 GSTCURL_FUNCTION_EXIT (source);
1282 return ret;
1283}
1284
1285/*
1286 * Take care of any memory that may be left over from the instance that's now
1287 * closing before we leak it.
1288 */
1289static void
1290gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
1291{
1292 gint i;
1293 g_mutex_lock (&src->uri_mutex);
1294 g_free (src->uri);
1295 src->uri = NULL;
1296 g_free (src->redirect_uri);
1297 src->redirect_uri = NULL;
1298 g_mutex_unlock (&src->uri_mutex);
1299 g_mutex_clear (&src->uri_mutex);
1300
1301 g_free (src->proxy_uri);
1302 src->proxy_uri = NULL;
1303 g_free (src->no_proxy_list);
1304 src->no_proxy_list = NULL;
1305 g_free (src->proxy_user);
1306 src->proxy_user = NULL;
1307 g_free (src->proxy_pass);
1308 src->proxy_pass = NULL;
1309
1310 for (i = 0; i < src->number_cookies; i++) {
1311 g_free (src->cookies[i]);
1312 src->cookies[i] = NULL;
1313 }
1314 g_free (src->cookies);
1315 src->cookies = NULL;
1316
1317 g_mutex_clear (&src->buffer_mutex);
1318
1319 g_cond_clear (&src->signal);
1320
1321 g_free (src->buffer);
1322 src->buffer = NULL;
1323
1324 if (src->http_headers != NULL) {
1325 gst_structure_free (src->http_headers);
1326 src->http_headers = NULL;
1327 }
1328
1329 gst_curl_http_src_destroy_easy_handle (src);
1330}
1331
1332static gboolean
1333gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
1334{
1335 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1336 gboolean ret;
1337 GSTCURL_FUNCTION_ENTRY (src);
1338
1339 switch (GST_QUERY_TYPE (query)) {
1340 case GST_QUERY_URI:
1341 gst_query_set_uri (query, src->uri);
1342 if (src->redirect_uri != NULL) {
1343 gst_query_set_uri_redirection (query, src->redirect_uri);
1344 }
1345 ret = TRUE;
1346 break;
1347 default:
1348 ret = GST_BASE_SRC_CLASS (parent_class)->query (bsrc, query);
1349 break;
1350 }
1351
1352 GSTCURL_FUNCTION_EXIT (src);
1353 return ret;
1354}
1355
1356static gboolean
1357gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
1358{
1359 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1360 const GValue *response_headers;
1361 gboolean ret = FALSE;
1362
1363 if (src->http_headers == NULL) {
1364 return FALSE;
1365 }
1366
1367 response_headers = gst_structure_get_value (src->http_headers,
1368 RESPONSE_HEADERS_NAME);
1369 if (gst_structure_has_field (gst_value_get_structure (response_headers),
1370 "content-length") == TRUE) {
1371 const GValue *content_length =
1372 gst_structure_get_value (gst_value_get_structure (response_headers),
1373 "content-length");
1374 if (G_VALUE_HOLDS_STRING (content_length) == TRUE) {
1375 const gchar *len = g_value_get_string (content_length);
1376 *size = (guint64) g_ascii_strtoull (len, NULL, 10);
1377 ret = TRUE;
1378 } else {
1379 GST_ERROR_OBJECT (src, "Content Length doesn't contain expected string");
1380 }
1381 }
1382
1383 GST_DEBUG_OBJECT (src,
1384 "No content length has yet been set, or there was an error!");
1385 return ret;
1386}
1387
1388static void
1389gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1390{
1391 GstURIHandlerInterface *uri_iface = (GstURIHandlerInterface *) g_iface;
1392
1393 uri_iface->get_type = gst_curl_http_src_urihandler_get_type;
1394 uri_iface->get_protocols = gst_curl_http_src_urihandler_get_protocols;
1395 uri_iface->get_uri = gst_curl_http_src_urihandler_get_uri;
1396 uri_iface->set_uri = gst_curl_http_src_urihandler_set_uri;
1397}
1398
1399static guint
1400gst_curl_http_src_urihandler_get_type (GType type)
1401{
1402 return GST_URI_SRC;
1403}
1404
1405static const gchar *const *
1406gst_curl_http_src_urihandler_get_protocols (GType type)
1407{
1408 static const gchar *protocols[] = { "http", "https", NULL };
1409
1410 return protocols;
1411}
1412
1413static gchar *
1414gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler)
1415{
1416 gchar *ret;
1417 GstCurlHttpSrc *source;
1418
Edward Hervey26108d82017-11-21 15:33:08 +01001419 g_return_val_if_fail (GST_IS_URI_HANDLER (handler), NULL);
Sam Hurste74b3a02017-07-25 15:23:57 +01001420 source = GST_CURLHTTPSRC (handler);
1421
1422 GSTCURL_FUNCTION_ENTRY (source);
1423
1424 g_mutex_lock (&source->uri_mutex);
1425 ret = g_strdup (source->uri);
1426 g_mutex_unlock (&source->uri_mutex);
1427
1428 GSTCURL_FUNCTION_EXIT (source);
1429 return ret;
1430}
1431
1432static gboolean
1433gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
1434 const gchar * uri, GError ** error)
1435{
1436 GstCurlHttpSrc *source = GST_CURLHTTPSRC (handler);
1437 GSTCURL_FUNCTION_ENTRY (source);
1438
1439 g_return_val_if_fail (GST_IS_URI_HANDLER (handler), FALSE);
1440 g_return_val_if_fail (uri != NULL, FALSE);
1441
1442 g_mutex_lock (&source->uri_mutex);
1443
1444 if (source->uri != NULL) {
1445 GST_DEBUG_OBJECT (source,
1446 "URI already present as %s, updating to new URI %s", source->uri, uri);
1447 g_free (source->uri);
1448 }
1449
1450 source->uri = g_strdup (uri);
1451 if (source->uri == NULL) {
1452 return FALSE;
1453 }
1454 source->retries_remaining = source->total_retries;
1455
1456 g_mutex_unlock (&source->uri_mutex);
1457
1458 GSTCURL_FUNCTION_EXIT (source);
1459 return TRUE;
1460}
1461
1462/*
1463 * Cancel any currently running transfer, and then signal all the loops to drop
1464 * any received buffers. The ::create() method should return GST_FLOW_FLUSHING.
1465 */
1466static gboolean
1467gst_curl_http_src_unlock (GstBaseSrc * bsrc)
1468{
1469 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1470
1471 g_mutex_lock (&src->buffer_mutex);
1472 if (src->state != GSTCURL_UNLOCK) {
1473 if (src->state == GSTCURL_OK) {
1474 /* A transfer is running, cancel it */
1475 gst_curl_http_src_request_remove (src);
1476 }
1477 src->pending_state = src->state;
1478 src->state = GSTCURL_UNLOCK;
1479 }
1480 g_cond_signal (&src->signal);
1481 g_mutex_unlock (&src->buffer_mutex);
1482
1483 return TRUE;
1484}
1485
1486/*
1487 * Finish the unlock request above and return curlhttpsrc to the normal state.
1488 * This will probably be GSTCURL_DONE, and the next return from ::create() will
1489 * be GST_FLOW_EOS as we don't want to deliver parts of a HTTP body.
1490 */
1491static gboolean
1492gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
1493{
1494 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1495
1496 g_mutex_lock (&src->buffer_mutex);
1497 src->state = src->pending_state;
1498 src->pending_state = GSTCURL_NONE;
1499 g_cond_signal (&src->signal);
1500 g_mutex_unlock (&src->buffer_mutex);
1501
1502 return TRUE;
1503}
1504
1505/*****************************************************************************
1506 * Curl loop task functions begin
1507 *****************************************************************************/
1508static void
1509gst_curl_http_src_curl_multi_loop (gpointer thread_data)
1510{
1511 GstCurlHttpSrcMultiTaskContext *context;
1512 GstCurlHttpSrcQueueElement *qelement;
1513 int i, still_running;
1514 gboolean cond = FALSE;
1515 CURLMsg *curl_message;
1516
1517 context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
1518
1519 g_mutex_lock (&context->mutex);
1520
1521 /* Someone is holding a reference to us, but isn't using us so to avoid
1522 * unnecessary clock cycle wasting, sit in a conditional wait until woken.
1523 */
1524 while (context->state == GSTCURL_MULTI_LOOP_STATE_WAIT) {
1525 GSTCURL_DEBUG_PRINT ("Entering wait state...");
1526 g_cond_wait (&context->signal, &context->mutex);
1527 GSTCURL_DEBUG_PRINT ("Received wake up call!");
1528 }
1529
1530 if (context->state == GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) {
1531 GSTCURL_DEBUG_PRINT ("Received a new item on the queue!");
1532 if (context->queue == NULL) {
1533 GSTCURL_ERROR_PRINT ("Request Queue was empty on a Queue Event!");
1534 context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
1535 return;
1536 }
1537
1538 /*
1539 * Use the running mutex to lock access to each element, as the
1540 * mutex's memory barriers stop cache optimisations from meaning
1541 * flag values can't be trusted. The trylock will only let us in
1542 * once and should fail immediately prior.
1543 */
1544 qelement = context->queue;
1545 while (qelement != NULL) {
1546 if (g_mutex_trylock (&qelement->running) == TRUE) {
1547 GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
1548 cond = TRUE;
1549 curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
1550 }
1551 qelement = qelement->next;
1552 }
1553
1554 if (cond != TRUE) {
1555 GSTCURL_WARNING_PRINT ("All curl handles already added for QUEUE_EVENT!");
1556 } else {
1557 GSTCURL_DEBUG_PRINT ("Finished adding all handles, continuing.");
1558 context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
1559 }
1560 g_mutex_unlock (&context->mutex);
1561 } else if (context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
1562 struct timeval timeout;
1563 gint rc;
1564 fd_set fdread, fdwrite, fdexcep;
1565 int maxfd = -1;
1566 long curl_timeo = -1;
1567
1568 /* Because curl can possibly take some time here, be nice and let go of the
1569 * mutex so other threads can perform state/queue operations as we don't
1570 * care about those until the end of this. */
1571 g_mutex_unlock (&context->mutex);
1572
1573 FD_ZERO (&fdread);
1574 FD_ZERO (&fdwrite);
1575 FD_ZERO (&fdexcep);
1576
1577 timeout.tv_sec = 1;
1578 timeout.tv_usec = 0;
1579
1580 curl_multi_timeout (context->multi_handle, &curl_timeo);
1581 if (curl_timeo >= 0) {
1582 timeout.tv_sec = curl_timeo / 1000;
1583 if (timeout.tv_sec > 1) {
1584 timeout.tv_sec = 1;
1585 } else {
1586 timeout.tv_usec = (curl_timeo % 1000) * 1000;
1587 }
1588 }
1589
1590 /* get file descriptors from the transfers */
1591 curl_multi_fdset (context->multi_handle, &fdread, &fdwrite, &fdexcep,
1592 &maxfd);
1593
1594 rc = select (maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
1595
1596 switch (rc) {
1597 case -1:
1598 /* select error */
1599 break;
1600 case 0:
1601 default:
1602 /* timeout or readable/writable sockets */
1603 curl_multi_perform (context->multi_handle, &still_running);
1604 break;
1605 }
1606
1607 /*
1608 * Check the CURL message buffer to find out if any transfers have
1609 * completed. If they have, call the signal_finished function which
1610 * will signal the g_cond_wait call in that calling instance.
1611 */
1612 i = 0;
1613 while (cond != TRUE) {
1614 curl_message = curl_multi_info_read (context->multi_handle, &i);
1615 if (curl_message == NULL) {
1616 cond = TRUE;
1617 } else if (curl_message->msg == CURLMSG_DONE) {
1618 /* A hack, but I have seen curl_message->easy_handle being
1619 * NULL randomly, so check for that. */
1620 g_mutex_lock (&context->mutex);
1621 if (curl_message->easy_handle == NULL) {
1622 break;
1623 }
1624 curl_multi_remove_handle (context->multi_handle,
1625 curl_message->easy_handle);
1626 gst_curl_http_src_remove_queue_handle (&context->queue,
1627 curl_message->easy_handle, curl_message->data.result);
1628 g_mutex_unlock (&context->mutex);
1629 }
1630 }
1631
1632 if (still_running == 0) {
1633 /* We've finished processing, so set the state to wait.
1634 *
1635 * This is a little more complex, as we need to catch the edge
1636 * case of another thread adding a queue item while we've been
1637 * working.
1638 */
1639 g_mutex_lock (&context->mutex);
1640 if ((context->state != GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) &&
1641 (context->state != GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL)) {
1642 context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
1643 }
1644 g_mutex_unlock (&context->mutex);
1645 }
1646 }
1647 /* Is the following even necessary any more...? */
1648 else if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
1649 g_mutex_unlock (&context->mutex);
1650 /* Something wants us to shut down, so best to do a full cleanup as it
1651 * might be that something's gone bang.
1652 */
1653 /*gst_curl_http_src_unref_multi (NULL, GSTCURL_RETURN_PIPELINE_NULL, TRUE); */
1654 GSTCURL_INFO_PRINT ("Got instruction to shut down");
1655 } else if (context->state == GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL) {
1656 qelement = context->queue;
1657 while (qelement != NULL) {
1658 if (qelement->p == context->request_removal_element) {
1659 g_mutex_lock (&qelement->p->buffer_mutex);
1660 curl_multi_remove_handle (context->multi_handle,
1661 context->request_removal_element->curl_handle);
1662 if (qelement->p->state == GSTCURL_UNLOCK) {
1663 qelement->p->pending_state = GSTCURL_REMOVED;
1664 } else {
1665 qelement->p->state = GSTCURL_REMOVED;
1666 }
1667 g_cond_signal (&qelement->p->signal);
Sam Hurste74b3a02017-07-25 15:23:57 +01001668 g_mutex_unlock (&qelement->p->buffer_mutex);
Sam Hurst85eb69d2017-07-26 17:06:17 +01001669 gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
Sam Hurste74b3a02017-07-25 15:23:57 +01001670 }
Jun Xiee9909952018-02-27 10:51:07 +08001671 qelement = qelement->next;
Sam Hurste74b3a02017-07-25 15:23:57 +01001672 }
1673 context->request_removal_element = NULL;
Jun Xiee9909952018-02-27 10:51:07 +08001674 context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
Sam Hurste74b3a02017-07-25 15:23:57 +01001675 g_mutex_unlock (&context->mutex);
1676 } else {
1677 GSTCURL_WARNING_PRINT ("Curl Loop State was invalid or unsupported");
1678 GSTCURL_WARNING_PRINT ("Signal State is %d, resetting to RUNNING.",
1679 context->state);
1680 /* Reset to running, so if there isn't anything to do it'll be
1681 * changed the WAIT once curl_multi_perform says it has no active
1682 * handles. */
1683 context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
1684 g_mutex_unlock (&context->mutex);
1685 }
1686}
1687
1688/*
1689 * Receive headers from the remote server and put them into the http_headers
1690 * structure to be sent downstream when we've got them all and started receiving
1691 * the body (see ::_handle_response())
1692 */
1693static size_t
1694gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
1695 void *src)
1696{
1697 GstCurlHttpSrc *s = src;
1698 char *substr;
1699
1700 GST_DEBUG_OBJECT (s, "Received header: %s", (char *) header);
1701
1702 g_mutex_lock (&s->buffer_mutex);
1703
1704 if (s->state == GSTCURL_UNLOCK) {
1705 g_mutex_unlock (&s->buffer_mutex);
1706 return size * nmemb;
1707 }
1708
1709 if (s->http_headers == NULL) {
1710 /* Can't do anything here, so just silently swallow the header */
1711 GST_DEBUG_OBJECT (s, "HTTP Headers Structure has already been sent,"
1712 " ignoring header");
1713 g_mutex_unlock (&s->buffer_mutex);
1714 return size * nmemb;
1715 }
1716
1717 substr = gst_curl_http_src_strcasestr (header, "HTTP");
1718 if (substr == header) {
1719 /* We have a status line! */
1720 gchar **status_line_fields;
1721
1722 /* Have we already seen a status line? If so, delete any response headers */
1723 if (s->status_code > 0) {
Sam Hurst85eb69d2017-07-26 17:06:17 +01001724 GstStructure *empty_headers =
1725 gst_structure_new_empty (RESPONSE_HEADERS_NAME);
Sam Hurste74b3a02017-07-25 15:23:57 +01001726 gst_structure_remove_field (s->http_headers, RESPONSE_HEADERS_NAME);
1727 gst_structure_set (s->http_headers, RESPONSE_HEADERS_NAME,
Sam Hurst85eb69d2017-07-26 17:06:17 +01001728 GST_TYPE_STRUCTURE, empty_headers, NULL);
1729 gst_structure_free (empty_headers);
1730
Sam Hurste74b3a02017-07-25 15:23:57 +01001731 }
1732
1733 /* Process the status line */
1734 status_line_fields = g_strsplit ((gchar *) header, " ", 3);
1735 if (status_line_fields == NULL) {
1736 GST_ERROR_OBJECT (s, "Status line processing failed!");
1737 } else {
1738 s->status_code =
1739 (guint) g_ascii_strtoll (status_line_fields[1], NULL, 10);
1740 GST_INFO_OBJECT (s, "Received status %u for request for URI %s: %s",
1741 s->status_code, s->uri, status_line_fields[2]);
1742 gst_structure_set (s->http_headers, HTTP_STATUS_CODE,
1743 G_TYPE_UINT, s->status_code, NULL);
1744 g_strfreev (status_line_fields);
1745 }
1746 } else {
1747 /* Normal header line */
1748 gchar **header_tpl = g_strsplit ((gchar *) header, ": ", 2);
1749 if (header_tpl == NULL) {
1750 GST_ERROR_OBJECT (s, "Header processing failed! (%s)", (gchar *) header);
1751 } else {
1752 const GValue *gv_resp_hdrs = gst_structure_get_value (s->http_headers,
1753 RESPONSE_HEADERS_NAME);
1754 const GstStructure *response_headers =
1755 gst_value_get_structure (gv_resp_hdrs);
1756 /* Store header key lower case (g_ascii_strdown), makes searching through
1757 * later on easier - end applications shouldn't care, as all HTTP headers
1758 * are case-insensitive */
1759 gchar *header_key = g_ascii_strdown (header_tpl[0], -1);
1760 gchar *header_value;
1761
1762 /* If header field already exists, append to the end */
1763 if (gst_structure_has_field (response_headers, header_key) == TRUE) {
1764 header_value = g_strdup_printf ("%s, %s",
1765 g_value_get_string (gst_structure_get_value (response_headers,
1766 header_key)), header_tpl[1]);
1767 gst_structure_set ((GstStructure *) response_headers, header_key,
1768 G_TYPE_STRING, header_value, NULL);
1769 g_free (header_value);
1770 } else {
1771 header_value = header_tpl[1];
1772 gst_structure_set ((GstStructure *) response_headers, header_key,
1773 G_TYPE_STRING, header_value, NULL);
1774 }
1775
1776 /* We have some special cases - deal with them here */
1777 if (g_strcmp0 (header_key, "content-type") == 0) {
1778 gst_curl_http_src_negotiate_caps (src);
1779 }
1780
1781 g_free (header_key);
1782 g_strfreev (header_tpl);
1783 }
1784 }
1785
1786 s->hdrs_updated = TRUE;
1787
1788 g_mutex_unlock (&s->buffer_mutex);
1789
1790 return size * nmemb;
1791}
1792
1793/*
1794 * My own quick and dirty implementation of strcasestr. This is a GNU extension
1795 * (i.e. not portable) and not always guaranteed to be available.
1796 *
1797 * I know this doesn't work if the haystack and needle are the same size. But
1798 * this isn't necessarily a bad thing, as the only place we currently use this
1799 * is at a point where returning nothing even if a string match occurs but the
1800 * needle is the same size as the haystack actually saves us time.
1801 */
1802static char *
1803gst_curl_http_src_strcasestr (const char *haystack, const char *needle)
1804{
1805 int i, j, needle_len;
1806 char *location;
1807
1808 needle_len = (int) strlen (needle);
1809 i = 0;
1810 j = 0;
1811 location = NULL;
1812
1813 while (haystack[i] != '\0') {
1814 if (j == needle_len) {
1815 location = (char *) haystack + (i - j);
1816 }
1817 if (tolower (haystack[i]) == tolower (needle[j])) {
1818 j++;
1819 } else {
1820 j = 0;
1821 }
1822 i++;
1823 }
1824
1825 return location;
1826}
1827
1828/*
1829 * Receive chunks of the requested body and pass these back to the ::create()
1830 * loop
1831 */
1832static size_t
1833gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
1834{
1835 GstCurlHttpSrc *s = src;
1836 size_t chunk_len = size * nmemb;
1837 GST_TRACE_OBJECT (s,
1838 "Received curl chunk for URI %s of size %d", s->uri, (int) chunk_len);
1839 g_mutex_lock (&s->buffer_mutex);
1840 if (s->state == GSTCURL_UNLOCK) {
1841 g_mutex_unlock (&s->buffer_mutex);
1842 return chunk_len;
1843 }
1844 s->buffer =
1845 g_realloc (s->buffer, (s->buffer_len + chunk_len + 1) * sizeof (char));
1846 if (s->buffer == NULL) {
1847 GST_ERROR_OBJECT (s, "Realloc for cURL response message failed!\n");
1848 return 0;
1849 }
1850 memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
1851 s->buffer_len += chunk_len;
1852 g_cond_signal (&s->signal);
1853 g_mutex_unlock (&s->buffer_mutex);
1854 return chunk_len;
1855}
1856
1857/*
1858 * Request a cancellation of a currently running curl handle.
1859 */
1860static void
1861gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
1862{
1863 GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
1864 GST_TYPE_CURL_HTTP_SRC,
1865 GstCurlHttpSrcClass);
1866 g_mutex_lock (&klass->multi_task_context.mutex);
1867
1868 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL;
1869 klass->multi_task_context.request_removal_element = src;
1870 g_cond_signal (&klass->multi_task_context.signal);
1871 g_mutex_unlock (&klass->multi_task_context.mutex);
1872}