blob: cf601034e2460f662105eca55fec55f9917206b9 [file] [log] [blame]
/* GStreamer
* Copyright (C) 2007 David Schleef <ds@schleef.org>
* (C) 2008 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:gstappsrc
* @title: GstAppSrc
* @short_description: Easy way for applications to inject buffers into a
* pipeline
* @see_also: #GstBaseSrc, appsink
*
* The appsrc element can be used by applications to insert data into a
* GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
* external API functions.
*
* appsrc can be used by linking with the libgstapp library to access the
* methods directly or by using the appsrc action signals.
*
* Before operating appsrc, the caps property must be set to fixed caps
* describing the format of the data that will be pushed with appsrc. An
* exception to this is when pushing buffers with unknown caps, in which case no
* caps should be set. This is typically true of file-like sources that push raw
* byte buffers. If you don't want to explicitly set the caps, you can use
* gst_app_src_push_sample. This method gets the caps associated with the
* sample and sets them on the appsrc replacing any previously set caps (if
* different from sample's caps).
*
* The main way of handing data to the appsrc element is by calling the
* gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
* This will put the buffer onto a queue from which appsrc will read from in its
* streaming thread. It is important to note that data transport will not happen
* from the thread that performed the push-buffer call.
*
* The "max-bytes" property controls how much data can be queued in appsrc
* before appsrc considers the queue full. A filled internal queue will always
* signal the "enough-data" signal, which signals the application that it should
* stop pushing data into appsrc. The "block" property will cause appsrc to
* block the push-buffer method until free data becomes available again.
*
* When the internal queue is running out of data, the "need-data" signal is
* emitted, which signals the application that it should start pushing more data
* into appsrc.
*
* In addition to the "need-data" and "enough-data" signals, appsrc can emit the
* "seek-data" signal when the "stream-mode" property is set to "seekable" or
* "random-access". The signal argument will contain the new desired position in
* the stream expressed in the unit set with the "format" property. After
* receiving the seek-data signal, the application should push-buffers from the
* new position.
*
* These signals allow the application to operate the appsrc in two different
* ways:
*
* The push mode, in which the application repeatedly calls the push-buffer/push-sample
* method with a new buffer/sample. Optionally, the queue size in the appsrc
* can be controlled with the enough-data and need-data signals by respectively
* stopping/starting the push-buffer/push-sample calls. This is a typical
* mode of operation for the stream-type "stream" and "seekable". Use this
* mode when implementing various network protocols or hardware devices.
*
* The pull mode, in which the need-data signal triggers the next push-buffer call.
* This mode is typically used in the "random-access" stream-type. Use this
* mode for file access or other randomly accessable sources. In this mode, a
* buffer of exactly the amount of bytes given by the need-data signal should be
* pushed into appsrc.
*
* In all modes, the size property on appsrc should contain the total stream
* size in bytes. Setting this property is mandatory in the random-access mode.
* For the stream and seekable modes, setting this property is optional but
* recommended.
*
* When the application has finished pushing data into appsrc, it should call
* gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
* this call, no more buffers can be pushed into appsrc until a flushing seek
* occurs or the state of the appsrc has gone through READY.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <gst/base/base.h>
#include <string.h>
#include "gstappsrc.h"
typedef enum
{
NOONE_WAITING = 0,
STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus;
struct _GstAppSrcPrivate
{
GCond cond;
GMutex mutex;
GstQueueArray *queue;
GstAppSrcWaitStatus wait_status;
GstCaps *last_caps;
GstCaps *current_caps;
gint64 size;
GstClockTime duration;
GstAppStreamType stream_type;
guint64 max_bytes;
GstFormat format;
gboolean block;
gchar *uri;
gboolean flushing;
gboolean started;
gboolean is_eos;
guint64 queued_bytes;
guint64 offset;
GstAppStreamType current_type;
guint64 min_latency;
guint64 max_latency;
gboolean emit_signals;
guint min_percent;
GstAppSrcCallbacks callbacks;
gpointer user_data;
GDestroyNotify notify;
};
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug
enum
{
/* signals */
SIGNAL_NEED_DATA,
SIGNAL_ENOUGH_DATA,
SIGNAL_SEEK_DATA,
/* actions */
SIGNAL_PUSH_BUFFER,
SIGNAL_END_OF_STREAM,
SIGNAL_PUSH_SAMPLE,
SIGNAL_PUSH_BUFFER_LIST,
LAST_SIGNAL
};
#define DEFAULT_PROP_SIZE -1
#define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES 200000
#define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK FALSE
#define DEFAULT_PROP_IS_LIVE FALSE
#define DEFAULT_PROP_MIN_LATENCY -1
#define DEFAULT_PROP_MAX_LATENCY -1
#define DEFAULT_PROP_EMIT_SIGNALS TRUE
#define DEFAULT_PROP_MIN_PERCENT 0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES 0
#define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
enum
{
PROP_0,
PROP_CAPS,
PROP_SIZE,
PROP_STREAM_TYPE,
PROP_MAX_BYTES,
PROP_FORMAT,
PROP_BLOCK,
PROP_IS_LIVE,
PROP_MIN_LATENCY,
PROP_MAX_LATENCY,
PROP_EMIT_SIGNALS,
PROP_MIN_PERCENT,
PROP_CURRENT_LEVEL_BYTES,
PROP_DURATION,
PROP_LAST
};
static GstStaticPadTemplate gst_app_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
static void gst_app_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);
static void gst_app_src_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_app_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
static void gst_app_src_set_latencies (GstAppSrc * appsrc,
gboolean do_min, guint64 min, gboolean do_max, guint64 max);
static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
GstCaps * filter);
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
guint size, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
GstBuffer * buffer);
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
GstBufferList * buffer_list);
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
GstSample * sample);
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
#define gst_app_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
static void
gst_app_src_class_init (GstAppSrcClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstElementClass *element_class = (GstElementClass *) klass;
GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
gobject_class->dispose = gst_app_src_dispose;
gobject_class->finalize = gst_app_src_finalize;
gobject_class->set_property = gst_app_src_set_property;
gobject_class->get_property = gst_app_src_get_property;
/**
* GstAppSrc::caps:
*
* The GstCaps that will negotiated downstream and will be put
* on outgoing buffers.
*/
g_object_class_install_property (gobject_class, PROP_CAPS,
g_param_spec_boxed ("caps", "Caps",
"The allowed caps for the src pad", GST_TYPE_CAPS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::format:
*
* The format to use for segment events. When the source is producing
* timestamped buffers this property should be set to GST_FORMAT_TIME.
*/
g_object_class_install_property (gobject_class, PROP_FORMAT,
g_param_spec_enum ("format", "Format",
"The format of the segment events and seek", GST_TYPE_FORMAT,
DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::size:
*
* The total size in bytes of the data stream. If the total size is known, it
* is recommended to configure it with this property.
*/
g_object_class_install_property (gobject_class, PROP_SIZE,
g_param_spec_int64 ("size", "Size",
"The size of the data stream in bytes (-1 if unknown)",
-1, G_MAXINT64, DEFAULT_PROP_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::stream-type:
*
* The type of stream that this source is producing. For seekable streams the
* application should connect to the seek-data signal.
*/
g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
g_param_spec_enum ("stream-type", "Stream Type",
"the type of the stream", GST_TYPE_APP_STREAM_TYPE,
DEFAULT_PROP_STREAM_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::max-bytes:
*
* The maximum amount of bytes that can be queued internally.
* After the maximum amount of bytes are queued, appsrc will emit the
* "enough-data" signal.
*/
g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
g_param_spec_uint64 ("max-bytes", "Max bytes",
"The maximum number of bytes to queue internally (0 = unlimited)",
0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::block:
*
* When max-bytes are queued and after the enough-data signal has been emitted,
* block any further push-buffer calls until the amount of queued bytes drops
* below the max-bytes limit.
*/
g_object_class_install_property (gobject_class, PROP_BLOCK,
g_param_spec_boolean ("block", "Block",
"Block push-buffer when max-bytes are queued",
DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::is-live:
*
* Instruct the source to behave like a live source. This includes that it
* will only push out buffers in the PLAYING state.
*/
g_object_class_install_property (gobject_class, PROP_IS_LIVE,
g_param_spec_boolean ("is-live", "Is Live",
"Whether to act as a live source",
DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::min-latency:
*
* The minimum latency of the source. A value of -1 will use the default
* latency calculations of #GstBaseSrc.
*/
g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
g_param_spec_int64 ("min-latency", "Min Latency",
"The minimum latency (-1 = default)",
-1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::max-latency:
*
* The maximum latency of the source. A value of -1 means an unlimited amout
* of latency.
*/
g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
g_param_spec_int64 ("max-latency", "Max Latency",
"The maximum latency (-1 = unlimited)",
-1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::emit-signals:
*
* Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
* This option is by default enabled for backwards compatibility reasons but
* can disabled when needed because signal emission is expensive.
*/
g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
g_param_spec_boolean ("emit-signals", "Emit signals",
"Emit need-data, enough-data and seek-data signals",
DEFAULT_PROP_EMIT_SIGNALS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::empty-percent:
*
* Make appsrc emit the "need-data" signal when the amount of bytes in the
* queue drops below this percentage of max-bytes.
*/
g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
g_param_spec_uint ("min-percent", "Min Percent",
"Emit need-data when queued bytes drops below this percent of max-bytes",
0, 100, DEFAULT_PROP_MIN_PERCENT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::current-level-bytes:
*
* The number of currently queued bytes inside appsrc.
*
* Since: 1.2
*/
g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
"The number of currently queued bytes",
0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::duration:
*
* The total duration in nanoseconds of the data stream. If the total duration is known, it
* is recommended to configure it with this property.
*
* Since: 1.10
*/
g_object_class_install_property (gobject_class, PROP_DURATION,
g_param_spec_uint64 ("duration", "Duration",
"The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
0, G_MAXUINT64, DEFAULT_PROP_DURATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::need-data:
* @appsrc: the appsrc element that emitted the signal
* @length: the amount of bytes needed.
*
* Signal that the source needs more data. In the callback or from another
* thread you should call push-buffer or end-of-stream.
*
* @length is just a hint and when it is set to -1, any number of bytes can be
* pushed into @appsrc.
*
* You can call push-buffer multiple times until the enough-data signal is
* fired.
*/
gst_app_src_signals[SIGNAL_NEED_DATA] =
g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, need_data),
NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstAppSrc::enough-data:
* @appsrc: the appsrc element that emitted the signal
*
* Signal that the source has enough data. It is recommended that the
* application stops calling push-buffer until the need-data signal is
* emitted again to avoid excessive buffer queueing.
*/
gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstAppSrc::seek-data:
* @appsrc: the appsrc element that emitted the signal
* @offset: the offset to seek to
*
* Seek to the given offset. The next push-buffer should produce buffers from
* the new @offset.
* This callback is only called for seekable stream types.
*
* Returns: %TRUE if the seek succeeded.
*/
gst_app_src_signals[SIGNAL_SEEK_DATA] =
g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
/**
* GstAppSrc::push-buffer:
* @appsrc: the appsrc
* @buffer: a buffer to push
*
* Adds a buffer to the queue of buffers that the appsrc element will
* push to its source pad. This function does not take ownership of the
* buffer so the buffer needs to be unreffed after calling this function.
*
* When the block property is TRUE, this function can block until free space
* becomes available in the queue.
*/
gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
push_buffer), NULL, NULL, NULL,
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
/**
* GstAppSrc::push-buffer-list:
* @appsrc: the appsrc
* @buffer_list: a buffer list to push
*
* Adds a buffer list to the queue of buffers and buffer lists that the
* appsrc element will push to its source pad. This function does not take
* ownership of the buffer list so the buffer list needs to be unreffed
* after calling this function.
*
* When the block property is TRUE, this function can block until free space
* becomes available in the queue.
*
* Since: 1.14
*/
gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
push_buffer_list), NULL, NULL, NULL,
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
/**
* GstAppSrc::push-sample:
* @appsrc: the appsrc
* @sample: a sample from which extract buffer to push
*
* Extract a buffer from the provided sample and adds the extracted buffer
* to the queue of buffers that the appsrc element will
* push to its source pad. This function set the appsrc caps based on the caps
* in the sample and reset the caps if they change.
* Only the caps and the buffer of the provided sample are used and not
* for example the segment in the sample.
* This function does not take ownership of the
* sample so the sample needs to be unreffed after calling this function.
*
* When the block property is TRUE, this function can block until free space
* becomes available in the queue.
*
* Since: 1.6
*
*/
gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
push_sample), NULL, NULL, NULL,
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
/**
* GstAppSrc::end-of-stream:
* @appsrc: the appsrc
*
* Notify @appsrc that no more buffer are available.
*/
gst_app_src_signals[SIGNAL_END_OF_STREAM] =
g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
end_of_stream), NULL, NULL, NULL,
GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
gst_element_class_set_static_metadata (element_class, "AppSrc",
"Generic/Source", "Allow the application to feed buffers to a pipeline",
"David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
gst_element_class_add_static_pad_template (element_class,
&gst_app_src_template);
element_class->send_event = gst_app_src_send_event;
basesrc_class->negotiate = gst_app_src_negotiate;
basesrc_class->get_caps = gst_app_src_internal_get_caps;
basesrc_class->create = gst_app_src_create;
basesrc_class->start = gst_app_src_start;
basesrc_class->stop = gst_app_src_stop;
basesrc_class->unlock = gst_app_src_unlock;
basesrc_class->unlock_stop = gst_app_src_unlock_stop;
basesrc_class->do_seek = gst_app_src_do_seek;
basesrc_class->is_seekable = gst_app_src_is_seekable;
basesrc_class->get_size = gst_app_src_do_get_size;
basesrc_class->query = gst_app_src_query;
basesrc_class->event = gst_app_src_event;
klass->push_buffer = gst_app_src_push_buffer_action;
klass->push_buffer_list = gst_app_src_push_buffer_list_action;
klass->push_sample = gst_app_src_push_sample_action;
klass->end_of_stream = gst_app_src_end_of_stream;
g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
}
static void
gst_app_src_init (GstAppSrc * appsrc)
{
GstAppSrcPrivate *priv;
priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
GstAppSrcPrivate);
g_mutex_init (&priv->mutex);
g_cond_init (&priv->cond);
priv->queue = gst_queue_array_new (16);
priv->wait_status = NOONE_WAITING;
priv->size = DEFAULT_PROP_SIZE;
priv->duration = DEFAULT_PROP_DURATION;
priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
priv->format = DEFAULT_PROP_FORMAT;
priv->block = DEFAULT_PROP_BLOCK;
priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
}
/* Must be called with priv->mutex */
static void
gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
{
GstMiniObject *obj;
GstAppSrcPrivate *priv = src->priv;
GstCaps *requeue_caps = NULL;
while (!gst_queue_array_is_empty (priv->queue)) {
obj = gst_queue_array_pop_head (priv->queue);
if (obj) {
if (GST_IS_CAPS (obj) && retain_last_caps) {
gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
}
gst_mini_object_unref (obj);
}
}
if (requeue_caps) {
gst_queue_array_push_tail (priv->queue, requeue_caps);
}
priv->queued_bytes = 0;
}
static void
gst_app_src_dispose (GObject * obj)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
GstAppSrcPrivate *priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
if (priv->current_caps) {
gst_caps_unref (priv->current_caps);
priv->current_caps = NULL;
}
if (priv->last_caps) {
gst_caps_unref (priv->last_caps);
priv->last_caps = NULL;
}
if (priv->notify) {
priv->notify (priv->user_data);
}
priv->user_data = NULL;
priv->notify = NULL;
GST_OBJECT_UNLOCK (appsrc);
g_mutex_lock (&priv->mutex);
gst_app_src_flush_queued (appsrc, FALSE);
g_mutex_unlock (&priv->mutex);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
static void
gst_app_src_finalize (GObject * obj)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
GstAppSrcPrivate *priv = appsrc->priv;
g_mutex_clear (&priv->mutex);
g_cond_clear (&priv->cond);
gst_queue_array_free (priv->queue);
g_free (priv->uri);
G_OBJECT_CLASS (parent_class)->finalize (obj);
}
static GstCaps *
gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
{
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
GstCaps *caps;
GST_OBJECT_LOCK (appsrc);
if ((caps = appsrc->priv->current_caps))
gst_caps_ref (caps);
GST_OBJECT_UNLOCK (appsrc);
if (filter) {
if (caps) {
GstCaps *intersection =
gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
gst_caps_unref (caps);
caps = intersection;
} else {
caps = gst_caps_ref (filter);
}
}
GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
return caps;
}
static void
gst_app_src_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
GstAppSrcPrivate *priv = appsrc->priv;
switch (prop_id) {
case PROP_CAPS:
gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
break;
case PROP_SIZE:
gst_app_src_set_size (appsrc, g_value_get_int64 (value));
break;
case PROP_STREAM_TYPE:
gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
break;
case PROP_MAX_BYTES:
gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
break;
case PROP_FORMAT:
priv->format = g_value_get_enum (value);
break;
case PROP_BLOCK:
priv->block = g_value_get_boolean (value);
break;
case PROP_IS_LIVE:
gst_base_src_set_live (GST_BASE_SRC (appsrc),
g_value_get_boolean (value));
break;
case PROP_MIN_LATENCY:
gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
FALSE, -1);
break;
case PROP_MAX_LATENCY:
gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
g_value_get_int64 (value));
break;
case PROP_EMIT_SIGNALS:
gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
break;
case PROP_MIN_PERCENT:
priv->min_percent = g_value_get_uint (value);
break;
case PROP_DURATION:
gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
GstAppSrcPrivate *priv = appsrc->priv;
switch (prop_id) {
case PROP_CAPS:
g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
break;
case PROP_SIZE:
g_value_set_int64 (value, gst_app_src_get_size (appsrc));
break;
case PROP_STREAM_TYPE:
g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
break;
case PROP_MAX_BYTES:
g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
break;
case PROP_FORMAT:
g_value_set_enum (value, priv->format);
break;
case PROP_BLOCK:
g_value_set_boolean (value, priv->block);
break;
case PROP_IS_LIVE:
g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
break;
case PROP_MIN_LATENCY:
{
guint64 min = 0;
gst_app_src_get_latency (appsrc, &min, NULL);
g_value_set_int64 (value, min);
break;
}
case PROP_MAX_LATENCY:
{
guint64 max = 0;
gst_app_src_get_latency (appsrc, NULL, &max);
g_value_set_int64 (value, max);
break;
}
case PROP_EMIT_SIGNALS:
g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
break;
case PROP_MIN_PERCENT:
g_value_set_uint (value, priv->min_percent);
break;
case PROP_CURRENT_LEVEL_BYTES:
g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
break;
case PROP_DURATION:
g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
gst_app_src_send_event (GstElement * element, GstEvent * event)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
GstAppSrcPrivate *priv = appsrc->priv;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
g_mutex_lock (&priv->mutex);
gst_app_src_flush_queued (appsrc, TRUE);
g_mutex_unlock (&priv->mutex);
break;
default:
break;
}
return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
event), FALSE);
}
static gboolean
gst_app_src_unlock (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
GstAppSrcPrivate *priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc, "unlock start");
priv->flushing = TRUE;
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
return TRUE;
}
static gboolean
gst_app_src_unlock_stop (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
GstAppSrcPrivate *priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc, "unlock stop");
priv->flushing = FALSE;
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
return TRUE;
}
static gboolean
gst_app_src_start (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
GstAppSrcPrivate *priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc, "starting");
priv->started = TRUE;
/* set the offset to -1 so that we always do a first seek. This is only used
* in random-access mode. */
priv->offset = -1;
priv->flushing = FALSE;
g_mutex_unlock (&priv->mutex);
gst_base_src_set_format (bsrc, priv->format);
return TRUE;
}
static gboolean
gst_app_src_stop (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
GstAppSrcPrivate *priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc, "stopping");
priv->is_eos = FALSE;
priv->flushing = TRUE;
priv->started = FALSE;
gst_app_src_flush_queued (appsrc, TRUE);
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
return TRUE;
}
static gboolean
gst_app_src_is_seekable (GstBaseSrc * src)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
GstAppSrcPrivate *priv = appsrc->priv;
gboolean res = FALSE;
switch (priv->stream_type) {
case GST_APP_STREAM_TYPE_STREAM:
break;
case GST_APP_STREAM_TYPE_SEEKABLE:
case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
res = TRUE;
break;
}
return res;
}
static gboolean
gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
*size = gst_app_src_get_size (appsrc);
return TRUE;
}
static gboolean
gst_app_src_query (GstBaseSrc * src, GstQuery * query)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
GstAppSrcPrivate *priv = appsrc->priv;
gboolean res;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
{
GstClockTime min, max;
gboolean live;
/* Query the parent class for the defaults */
res = gst_base_src_query_latency (src, &live, &min, &max);
/* overwrite with our values when we need to */
g_mutex_lock (&priv->mutex);
if (priv->min_latency != -1) {
min = priv->min_latency;
max = priv->max_latency;
}
g_mutex_unlock (&priv->mutex);
gst_query_set_latency (query, live, min, max);
break;
}
case GST_QUERY_SCHEDULING:
{
gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
switch (priv->stream_type) {
case GST_APP_STREAM_TYPE_STREAM:
case GST_APP_STREAM_TYPE_SEEKABLE:
break;
case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
break;
}
res = TRUE;
break;
}
case GST_QUERY_DURATION:
{
GstFormat format;
gst_query_parse_duration (query, &format, NULL);
if (format == GST_FORMAT_BYTES) {
gst_query_set_duration (query, format, priv->size);
res = TRUE;
} else if (format == GST_FORMAT_TIME) {
if (priv->duration != GST_CLOCK_TIME_NONE) {
gst_query_set_duration (query, format, priv->duration);
res = TRUE;
} else {
res = FALSE;
}
} else {
res = FALSE;
}
break;
}
default:
res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
break;
}
return res;
}
/* will be called in push mode */
static gboolean
gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
GstAppSrcPrivate *priv = appsrc->priv;
gint64 desired_position;
gboolean res = FALSE;
desired_position = segment->position;
/* no need to try to seek in streaming mode */
if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
return TRUE;
GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
desired_position, gst_format_get_name (segment->format));
if (priv->callbacks.seek_data)
res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
else {
gboolean emit;
g_mutex_lock (&priv->mutex);
emit = priv->emit_signals;
g_mutex_unlock (&priv->mutex);
if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
desired_position, &res);
}
if (res) {
GST_DEBUG_OBJECT (appsrc, "flushing queue");
g_mutex_lock (&priv->mutex);
gst_app_src_flush_queued (appsrc, TRUE);
g_mutex_unlock (&priv->mutex);
priv->is_eos = FALSE;
} else {
GST_WARNING_OBJECT (appsrc, "seek failed");
}
return res;
}
/* must be called with the appsrc mutex */
static gboolean
gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
{
gboolean res = FALSE;
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
emit = priv->emit_signals;
g_mutex_unlock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
priv->offset, offset);
if (priv->callbacks.seek_data)
res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res);
g_mutex_lock (&priv->mutex);
return res;
}
/* must be called with the appsrc mutex. After this call things can be
* flushing */
static void
gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
{
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
emit = priv->emit_signals;
g_mutex_unlock (&priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */
if (priv->callbacks.need_data)
priv->callbacks.need_data (appsrc, size, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL);
g_mutex_lock (&priv->mutex);
/* we can be flushing now because we released the lock */
}
/* must be called with the appsrc mutex */
static gboolean
gst_app_src_do_negotiate (GstBaseSrc * basesrc)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
GstAppSrcPrivate *priv = appsrc->priv;
gboolean result;
GstCaps *caps;
GST_OBJECT_LOCK (basesrc);
caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
GST_OBJECT_UNLOCK (basesrc);
/* Avoid deadlock by unlocking mutex
* otherwise we get deadlock between this and stream lock */
g_mutex_unlock (&priv->mutex);
if (caps) {
result = gst_base_src_set_caps (basesrc, caps);
gst_caps_unref (caps);
} else {
result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
}
g_mutex_lock (&priv->mutex);
return result;
}
static gboolean
gst_app_src_negotiate (GstBaseSrc * basesrc)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
GstAppSrcPrivate *priv = appsrc->priv;
gboolean result;
g_mutex_lock (&priv->mutex);
result = gst_app_src_do_negotiate (basesrc);
g_mutex_unlock (&priv->mutex);
return result;
}
static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
GstBuffer ** buf)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
GstAppSrcPrivate *priv = appsrc->priv;
GstFlowReturn ret;
GST_OBJECT_LOCK (appsrc);
if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
bsrc->segment.format == GST_FORMAT_BYTES)) {
GST_DEBUG_OBJECT (appsrc,
"Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
bsrc->segment.duration, priv->size);
bsrc->segment.duration = priv->size;
GST_OBJECT_UNLOCK (appsrc);
gst_element_post_message (GST_ELEMENT (appsrc),
gst_message_new_duration_changed (GST_OBJECT (appsrc)));
} else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
bsrc->segment.format == GST_FORMAT_TIME)) {
GST_DEBUG_OBJECT (appsrc,
"Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
bsrc->segment.duration = priv->duration;
GST_OBJECT_UNLOCK (appsrc);
gst_element_post_message (GST_ELEMENT (appsrc),
gst_message_new_duration_changed (GST_OBJECT (appsrc)));
} else {
GST_OBJECT_UNLOCK (appsrc);
}
g_mutex_lock (&priv->mutex);
/* check flushing first */
if (G_UNLIKELY (priv->flushing))
goto flushing;
if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
/* if we are dealing with a random-access stream, issue a seek if the offset
* changed. */
if (G_UNLIKELY (priv->offset != offset)) {
gboolean res;
/* do the seek */
res = gst_app_src_emit_seek (appsrc, offset);
if (G_UNLIKELY (!res))
/* failing to seek is fatal */
goto seek_error;
priv->offset = offset;
priv->is_eos = FALSE;
}
}
while (TRUE) {
/* return data as long as we have some */
if (!gst_queue_array_is_empty (priv->queue)) {
guint buf_size;
GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
if (GST_IS_CAPS (obj)) {
GstCaps *next_caps = GST_CAPS (obj);
gboolean caps_changed = TRUE;
if (next_caps && priv->current_caps)
caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
else
caps_changed = (next_caps != priv->current_caps);
gst_caps_replace (&priv->current_caps, next_caps);
if (next_caps) {
gst_caps_unref (next_caps);
}
if (caps_changed)
gst_app_src_do_negotiate (bsrc);
/* Lock has released so now may need
*- flushing
*- new caps change
*- check queue has data */
if (G_UNLIKELY (priv->flushing))
goto flushing;
/* Continue checks caps and queue */
continue;
}
if (GST_IS_BUFFER (obj)) {
*buf = GST_BUFFER (obj);
buf_size = gst_buffer_get_size (*buf);
GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size);
} else {
GstBufferList *buffer_list;
g_assert (GST_IS_BUFFER_LIST (obj));
buffer_list = GST_BUFFER_LIST (obj);
buf_size = gst_buffer_list_calculate_size (buffer_list);
GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers",
buffer_list, buf_size, gst_buffer_list_length (buffer_list));
gst_base_src_submit_buffer_list (bsrc, buffer_list);
*buf = NULL;
}
priv->queued_bytes -= buf_size;
/* only update the offset when in random_access mode */
if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
priv->offset += buf_size;
/* signal that we removed an item */
if ((priv->wait_status & APP_WAITING))
g_cond_broadcast (&priv->cond);
/* see if we go lower than the empty-percent */
if (priv->min_percent && priv->max_bytes) {
if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
/* ignore flushing state, we got a buffer and we will return it now.
* Errors will be handled in the next round */
gst_app_src_emit_need_data (appsrc, size);
}
ret = GST_FLOW_OK;
break;
} else {
gst_app_src_emit_need_data (appsrc, size);
/* we can be flushing now because we released the lock above */
if (G_UNLIKELY (priv->flushing))
goto flushing;
/* if we have a buffer now, continue the loop and try to return it. In
* random-access mode (where a buffer is normally pushed in the above
* signal) we can still be empty because the pushed buffer got flushed or
* when the application pushes the requested buffer later, we support both
* possibilities. */
if (!gst_queue_array_is_empty (priv->queue))
continue;
/* no buffer yet, maybe we are EOS, if not, block for more data. */
}
/* check EOS */
if (G_UNLIKELY (priv->is_eos))
goto eos;
/* nothing to return, wait a while for new data or flushing. */
priv->wait_status |= STREAM_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status &= ~STREAM_WAITING;
}
g_mutex_unlock (&priv->mutex);
return ret;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (appsrc, "we are flushing");
g_mutex_unlock (&priv->mutex);
return GST_FLOW_FLUSHING;
}
eos:
{
GST_DEBUG_OBJECT (appsrc, "we are EOS");
g_mutex_unlock (&priv->mutex);
return GST_FLOW_EOS;
}
seek_error:
{
g_mutex_unlock (&priv->mutex);
GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
GST_ERROR_SYSTEM);
return GST_FLOW_ERROR;
}
}
/* external API */
/**
* gst_app_src_set_caps:
* @appsrc: a #GstAppSrc
* @caps: caps to set
*
* Set the capabilities on the appsrc element. This function takes
* a copy of the caps structure. After calling this method, the source will
* only produce caps that match @caps. @caps must be fixed and the caps on the
* buffers must match the caps or left NULL.
*/
void
gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
{
GstAppSrcPrivate *priv;
gboolean caps_changed;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
GST_OBJECT_LOCK (appsrc);
if (caps && priv->last_caps)
caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
else
caps_changed = (caps != priv->last_caps);
if (caps_changed) {
GstCaps *new_caps;
gpointer t;
new_caps = caps ? gst_caps_copy (caps) : NULL;
GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
}
gst_queue_array_push_tail (priv->queue, new_caps);
gst_caps_replace (&priv->last_caps, new_caps);
}
GST_OBJECT_UNLOCK (appsrc);
g_mutex_unlock (&priv->mutex);
}
/**
* gst_app_src_get_caps:
* @appsrc: a #GstAppSrc
*
* Get the configured caps on @appsrc.
*
* Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
*/
GstCaps *
gst_app_src_get_caps (GstAppSrc * appsrc)
{
GstCaps *caps;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
GST_OBJECT_LOCK (appsrc);
if ((caps = appsrc->priv->last_caps))
gst_caps_ref (caps);
GST_OBJECT_UNLOCK (appsrc);
return caps;
}
/**
* gst_app_src_set_size:
* @appsrc: a #GstAppSrc
* @size: the size to set
*
* Set the size of the stream in bytes. A value of -1 means that the size is
* not known.
*/
void
gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
{
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
priv->size = size;
GST_OBJECT_UNLOCK (appsrc);
}
/**
* gst_app_src_get_size:
* @appsrc: a #GstAppSrc
*
* Get the size of the stream in bytes. A value of -1 means that the size is
* not known.
*
* Returns: the size of the stream previously set with gst_app_src_set_size();
*/
gint64
gst_app_src_get_size (GstAppSrc * appsrc)
{
gint64 size;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
size = priv->size;
GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
GST_OBJECT_UNLOCK (appsrc);
return size;
}
/**
* gst_app_src_set_duration:
* @appsrc: a #GstAppSrc
* @duration: the duration to set
*
* Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
* not known.
*
* Since: 1.10
*/
void
gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
{
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
priv->duration = duration;
GST_OBJECT_UNLOCK (appsrc);
}
/**
* gst_app_src_get_duration:
* @appsrc: a #GstAppSrc
*
* Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
* not known.
*
* Returns: the duration of the stream previously set with gst_app_src_set_duration();
*
* Since: 1.10
*/
GstClockTime
gst_app_src_get_duration (GstAppSrc * appsrc)
{
GstClockTime duration;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
duration = priv->duration;
GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
GST_OBJECT_UNLOCK (appsrc);
return duration;
}
/**
* gst_app_src_set_stream_type:
* @appsrc: a #GstAppSrc
* @type: the new state
*
* Set the stream type on @appsrc. For seekable streams, the "seek" signal must
* be connected to.
*
* A stream_type stream
*/
void
gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
{
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
priv->stream_type = type;
GST_OBJECT_UNLOCK (appsrc);
}
/**
* gst_app_src_get_stream_type:
* @appsrc: a #GstAppSrc
*
* Get the stream type. Control the stream type of @appsrc
* with gst_app_src_set_stream_type().
*
* Returns: the stream type.
*/
GstAppStreamType
gst_app_src_get_stream_type (GstAppSrc * appsrc)
{
gboolean stream_type;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
stream_type = priv->stream_type;
GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
GST_OBJECT_UNLOCK (appsrc);
return stream_type;
}
/**
* gst_app_src_set_max_bytes:
* @appsrc: a #GstAppSrc
* @max: the maximum number of bytes to queue
*
* Set the maximum amount of bytes that can be queued in @appsrc.
* After the maximum amount of bytes are queued, @appsrc will emit the
* "enough-data" signal.
*/
void
gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
{
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
if (max != priv->max_bytes) {
GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
priv->max_bytes = max;
/* signal the change */
g_cond_broadcast (&priv->cond);
}
g_mutex_unlock (&priv->mutex);
}
/**
* gst_app_src_get_max_bytes:
* @appsrc: a #GstAppSrc
*
* Get the maximum amount of bytes that can be queued in @appsrc.
*
* Returns: The maximum amount of bytes that can be queued.
*/
guint64
gst_app_src_get_max_bytes (GstAppSrc * appsrc)
{
guint64 result;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
result = priv->max_bytes;
GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
g_mutex_unlock (&priv->mutex);
return result;
}
/**
* gst_app_src_get_current_level_bytes:
* @appsrc: a #GstAppSrc
*
* Get the number of currently queued bytes inside @appsrc.
*
* Returns: The number of currently queued bytes.
*
* Since: 1.2
*/
guint64
gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
{
gint64 queued;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
queued = priv->queued_bytes;
GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
queued);
GST_OBJECT_UNLOCK (appsrc);
return queued;
}
static void
gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
gboolean do_max, guint64 max)
{
GstAppSrcPrivate *priv = appsrc->priv;
gboolean changed = FALSE;
g_mutex_lock (&priv->mutex);
if (do_min && priv->min_latency != min) {
priv->min_latency = min;
changed = TRUE;
}
if (do_max && priv->max_latency != max) {
priv->max_latency = max;
changed = TRUE;
}
g_mutex_unlock (&priv->mutex);
if (changed) {
GST_DEBUG_OBJECT (appsrc, "posting latency changed");
gst_element_post_message (GST_ELEMENT_CAST (appsrc),
gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
}
}
/**
* gst_app_src_set_latency:
* @appsrc: a #GstAppSrc
* @min: the min latency
* @max: the max latency
*
* Configure the @min and @max latency in @src. If @min is set to -1, the
* default latency calculations for pseudo-live sources will be used.
*/
void
gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
{
gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
}
/**
* gst_app_src_get_latency:
* @appsrc: a #GstAppSrc
* @min: (out): the min latency
* @max: (out): the max latency
*
* Retrieve the min and max latencies in @min and @max respectively.
*/
void
gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
{
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
if (min)
*min = priv->min_latency;
if (max)
*max = priv->max_latency;
g_mutex_unlock (&priv->mutex);
}
/**
* gst_app_src_set_emit_signals:
* @appsrc: a #GstAppSrc
* @emit: the new state
*
* Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
* by default disabled because signal emission is expensive and unneeded when
* the application prefers to operate in pull mode.
*/
void
gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
{
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
priv->emit_signals = emit;
g_mutex_unlock (&priv->mutex);
}
/**
* gst_app_src_get_emit_signals:
* @appsrc: a #GstAppSrc
*
* Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
*
* Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
* signals.
*/
gboolean
gst_app_src_get_emit_signals (GstAppSrc * appsrc)
{
gboolean result;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
result = priv->emit_signals;
g_mutex_unlock (&priv->mutex);
return result;
}
static GstFlowReturn
gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
GstBufferList * buflist, gboolean steal_ref)
{
gboolean first = TRUE;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
priv = appsrc->priv;
if (buffer != NULL)
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
else
g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
if (buflist != NULL) {
if (gst_buffer_list_length (buflist) == 0)
return GST_FLOW_OK;
buffer = gst_buffer_list_get (buflist, 0);
}
if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
GstClock *clock;
clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
if (clock) {
GstClockTime now;
GstClockTime base_time =
gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
now = gst_clock_get_time (clock);
if (now > base_time)
now -= base_time;
else
now = 0;
gst_object_unref (clock);
if (buflist == NULL) {
if (!steal_ref) {
buffer = gst_buffer_copy (buffer);
steal_ref = TRUE;
} else {
buffer = gst_buffer_make_writable (buffer);
}
} else {
if (!steal_ref) {
buflist = gst_buffer_list_copy (buflist);
steal_ref = TRUE;
} else {
buflist = gst_buffer_list_make_writable (buflist);
}
buffer = gst_buffer_list_get_writable (buflist, 0);
}
GST_BUFFER_PTS (buffer) = now;
GST_BUFFER_DTS (buffer) = now;
} else {
GST_WARNING_OBJECT (appsrc,
"do-timestamp=TRUE but buffers are provided before "
"reaching the PLAYING state and having a clock. Timestamps will "
"not be accurate!");
}
}
g_mutex_lock (&priv->mutex);
while (TRUE) {
/* can't accept buffers when we are flushing or EOS */
if (priv->flushing)
goto flushing;
if (priv->is_eos)
goto eos;
if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
GST_DEBUG_OBJECT (appsrc,
"queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
priv->queued_bytes, priv->max_bytes);
if (first) {
gboolean emit;
emit = priv->emit_signals;
/* only signal on the first push */
g_mutex_unlock (&priv->mutex);
if (priv->callbacks.enough_data)
priv->callbacks.enough_data (appsrc, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
NULL);
g_mutex_lock (&priv->mutex);
/* continue to check for flushing/eos after releasing the lock */
first = FALSE;
continue;
}
if (priv->block) {
GST_DEBUG_OBJECT (appsrc, "waiting for free space");
/* we are filled, wait until a buffer gets popped or when we
* flush. */
priv->wait_status |= APP_WAITING;
g_cond_wait (&priv->cond, &priv->mutex);
priv->wait_status &= ~APP_WAITING;
} else {
/* no need to wait for free space, we just pump more data into the
* queue hoping that the caller reacts to the enough-data signal and
* stops pushing buffers. */
break;
}
} else
break;
}
if (buflist != NULL) {
GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
if (!steal_ref)
gst_buffer_list_ref (buflist);
gst_queue_array_push_tail (priv->queue, buflist);
priv->queued_bytes += gst_buffer_list_calculate_size (buflist);
} else {
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
if (!steal_ref)
gst_buffer_ref (buffer);
gst_queue_array_push_tail (priv->queue, buffer);
priv->queued_bytes += gst_buffer_get_size (buffer);
}
if ((priv->wait_status & STREAM_WAITING))
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
if (steal_ref)
gst_buffer_unref (buffer);
g_mutex_unlock (&priv->mutex);
return GST_FLOW_FLUSHING;
}
eos:
{
GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
if (steal_ref)
gst_buffer_unref (buffer);
g_mutex_unlock (&priv->mutex);
return GST_FLOW_EOS;
}
}
static GstFlowReturn
gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
gboolean steal_ref)
{
return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
}
static GstFlowReturn
gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
{
GstBufferList *buffer_list;
GstBuffer *buffer;
GstCaps *caps;
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
caps = gst_sample_get_caps (sample);
if (caps != NULL) {
gst_app_src_set_caps (appsrc, caps);
} else {
GST_WARNING_OBJECT (appsrc, "received sample without caps");
}
buffer = gst_sample_get_buffer (sample);
if (buffer != NULL)
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
buffer_list = gst_sample_get_buffer_list (sample);
if (buffer_list != NULL)
return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
return GST_FLOW_OK;
}
/**
* gst_app_src_push_buffer:
* @appsrc: a #GstAppSrc
* @buffer: (transfer full): a #GstBuffer to push
*
* Adds a buffer to the queue of buffers that the appsrc element will
* push to its source pad. This function takes ownership of the buffer.
*
* When the block property is TRUE, this function can block until free
* space becomes available in the queue.
*
* Returns: #GST_FLOW_OK when the buffer was successfuly queued.
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
* #GST_FLOW_EOS when EOS occured.
*/
GstFlowReturn
gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
{
return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
}
/**
* gst_app_src_push_buffer_list:
* @appsrc: a #GstAppSrc
* @buffer_list: (transfer full): a #GstBufferList to push
*
* Adds a buffer list to the queue of buffers and buffer lists that the
* appsrc element will push to its source pad. This function takes ownership
* of @buffer_list.
*
* When the block property is TRUE, this function can block until free
* space becomes available in the queue.
*
* Returns: #GST_FLOW_OK when the buffer list was successfuly queued.
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
* #GST_FLOW_EOS when EOS occured.
*
* Since: 1.14
*/
GstFlowReturn
gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
{
return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
}
/**
* gst_app_src_push_sample:
* @appsrc: a #GstAppSrc
* @sample: (transfer none): a #GstSample from which buffer and caps may be
* extracted
*
* Extract a buffer from the provided sample and adds it to the queue of
* buffers that the appsrc element will push to its source pad. Any
* previous caps that were set on appsrc will be replaced by the caps
* associated with the sample if not equal.
*
* When the block property is TRUE, this function can block until free
* space becomes available in the queue.
*
* Returns: #GST_FLOW_OK when the buffer was successfuly queued.
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
* #GST_FLOW_EOS when EOS occured.
*
* Since: 1.6
*
*/
GstFlowReturn
gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
{
return gst_app_src_push_sample_internal (appsrc, sample);
}
/* push a buffer without stealing the ref of the buffer. This is used for the
* action signal. */
static GstFlowReturn
gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
{
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
}
/* push a buffer list without stealing the ref of the buffer list. This is
* used for the action signal. */
static GstFlowReturn
gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
GstBufferList * buffer_list)
{
return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
}
/* push a sample without stealing the ref. This is used for the
* action signal. */
static GstFlowReturn
gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
{
return gst_app_src_push_sample_internal (appsrc, sample);
}
/**
* gst_app_src_end_of_stream:
* @appsrc: a #GstAppSrc
*
* Indicates to the appsrc element that the last buffer queued in the
* element is the last buffer of the stream.
*
* Returns: #GST_FLOW_OK when the EOS was successfuly queued.
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
*/
GstFlowReturn
gst_app_src_end_of_stream (GstAppSrc * appsrc)
{
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
priv = appsrc->priv;
g_mutex_lock (&priv->mutex);
/* can't accept buffers when we are flushing. We can accept them when we are
* EOS although it will not do anything. */
if (priv->flushing)
goto flushing;
GST_DEBUG_OBJECT (appsrc, "sending EOS");
priv->is_eos = TRUE;
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
g_mutex_unlock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
return GST_FLOW_FLUSHING;
}
}
/**
* gst_app_src_set_callbacks: (skip)
* @appsrc: a #GstAppSrc
* @callbacks: the callbacks
* @user_data: a user_data argument for the callbacks
* @notify: a destroy notify function
*
* Set callbacks which will be executed when data is needed, enough data has
* been collected or when a seek should be performed.
* This is an alternative to using the signals, it has lower overhead and is thus
* less expensive, but also less flexible.
*
* If callbacks are installed, no signals will be emitted for performance
* reasons.
*/
void
gst_app_src_set_callbacks (GstAppSrc * appsrc,
GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{
GDestroyNotify old_notify;
GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_return_if_fail (callbacks != NULL);
priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc);
old_notify = priv->notify;
if (old_notify) {
gpointer old_data;
old_data = priv->user_data;
priv->user_data = NULL;
priv->notify = NULL;
GST_OBJECT_UNLOCK (appsrc);
old_notify (old_data);
GST_OBJECT_LOCK (appsrc);
}
priv->callbacks = *callbacks;
priv->user_data = user_data;
priv->notify = notify;
GST_OBJECT_UNLOCK (appsrc);
}
/*** GSTURIHANDLER INTERFACE *************************************************/
static GstURIType
gst_app_src_uri_get_type (GType type)
{
return GST_URI_SRC;
}
static const gchar *const *
gst_app_src_uri_get_protocols (GType type)
{
static const gchar *protocols[] = { "appsrc", NULL };
return protocols;
}
static gchar *
gst_app_src_uri_get_uri (GstURIHandler * handler)
{
GstAppSrc *appsrc = GST_APP_SRC (handler);
return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
}
static gboolean
gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
GError ** error)
{
GstAppSrc *appsrc = GST_APP_SRC (handler);
g_free (appsrc->priv->uri);
appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
return TRUE;
}
static void
gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_app_src_uri_get_type;
iface->get_protocols = gst_app_src_uri_get_protocols;
iface->get_uri = gst_app_src_uri_get_uri;
iface->set_uri = gst_app_src_uri_set_uri;
}
static gboolean
gst_app_src_event (GstBaseSrc * src, GstEvent * event)
{
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
GstAppSrcPrivate *priv = appsrc->priv;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
g_mutex_lock (&priv->mutex);
priv->is_eos = FALSE;
g_mutex_unlock (&priv->mutex);
break;
default:
break;
}
return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
}