check/generic/states.c: Make sure all tasks are stopped.
Original commit message from CVS:
* check/generic/states.c: (GST_START_TEST):
Make sure all tasks are stopped.
* check/gst/gstbin.c: (GST_START_TEST):
Unref after usage for proper valgrinding.
* gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
Really wait for the task to stop before destroying the
mutex.
* gst/gstqueue.c: (gst_queue_sink_activate_push),
(gst_queue_src_activate_push):
Small cleanups. Don't stop the task when we did not start
it.
* gst/gsttask.c: (gst_task_get_type), (gst_task_init),
(gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
(gst_task_get_state), (gst_task_start), (gst_task_pause),
(gst_task_join):
* gst/gsttask.h:
Protect the stream lock with the object lock.
Disallow setting the stream lock when running.
Add cleanup_all to wait for the threadpool to finish.
Remove code to autoallocate a mutex if none was provided.
Add _join() to wait for a task to stop.
Protect the thread pool with a global lock.
diff --git a/ChangeLog b/ChangeLog
index e483ed8..98eed42 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,34 @@
2005-08-24 Wim Taymans <wim@fluendo.com>
+ * check/generic/states.c: (GST_START_TEST):
+ Make sure all tasks are stopped.
+
+ * check/gst/gstbin.c: (GST_START_TEST):
+ Unref after usage for proper valgrinding.
+
+ * gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
+ Really wait for the task to stop before destroying the
+ mutex.
+
+ * gst/gstqueue.c: (gst_queue_sink_activate_push),
+ (gst_queue_src_activate_push):
+ Small cleanups. Don't stop the task when we did not start
+ it.
+
+ * gst/gsttask.c: (gst_task_get_type), (gst_task_init),
+ (gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
+ (gst_task_get_state), (gst_task_start), (gst_task_pause),
+ (gst_task_join):
+ * gst/gsttask.h:
+ Protect the stream lock with the object lock.
+ Disallow setting the stream lock when running.
+ Add cleanup_all to wait for the threadpool to finish.
+ Remove code to autoallocate a mutex if none was provided.
+ Add _join() to wait for a task to stop.
+ Protect the thread pool with a global lock.
+
+2005-08-24 Wim Taymans <wim@fluendo.com>
+
* gst/base/gstbasesink.c: (gst_base_sink_handle_object),
(gst_base_sink_get_times), (gst_base_sink_do_sync),
(gst_base_sink_handle_buffer), (gst_base_sink_change_state):
diff --git a/check/generic/states.c b/check/generic/states.c
index 3b9b961..a441686 100644
--- a/check/generic/states.c
+++ b/check/generic/states.c
@@ -50,6 +50,8 @@
gst_element_set_state (element, GST_STATE_PAUSED);
gst_element_set_state (element, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (element));
+
+ gst_task_cleanup_all ();
}
}
diff --git a/check/gst/gstbin.c b/check/gst/gstbin.c
index f39f5c0..fffada3 100644
--- a/check/gst/gstbin.c
+++ b/check/gst/gstbin.c
@@ -423,6 +423,10 @@
/* check if pads really are linked */
fail_unless (gst_pad_is_linked (srcpad));
fail_unless (gst_pad_is_linked (sinkpad));
+
+ gst_object_unref (srcpad);
+ gst_object_unref (sinkpad);
+ gst_object_unref (pipeline);
}
GST_END_TEST;
diff --git a/gst/gstpad.c b/gst/gstpad.c
index d61418b..21ce573 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -295,6 +295,14 @@
gst_pad_finalize (GObject * object)
{
GstPad *pad = GST_PAD (object);
+ GstTask *task;
+
+ /* in case the task is still around, clean it up */
+ if ((task = GST_PAD_TASK (pad))) {
+ gst_task_join (task);
+ GST_PAD_TASK (pad) = NULL;
+ gst_object_unref (task);
+ }
if (pad->stream_rec_lock) {
g_static_rec_mutex_free (pad->stream_rec_lock);
@@ -1878,8 +1886,8 @@
* @pad: a #GstPad to fixate
* @caps: the #GstCaps to fixate
*
- * Fixate a caps on the given pad. Modifies the caps in place, so you should be
- * that the caps are actually writable (see gst_caps_make_writable()).
+ * Fixate a caps on the given pad. Modifies the caps in place, so you should
+ * make sure that the caps are actually writable (see gst_caps_make_writable()).
*/
void
gst_pad_fixate_caps (GstPad * pad, GstCaps * caps)
@@ -3759,7 +3767,11 @@
* @pad: the #GstPad to stop the task of
*
* Stop the task of @pad. This function will also make sure that the
- * function executed by the task will effectively stop.
+ * function executed by the task will effectively stop if not called
+ * from the GstTaskFunction.
+ *
+ * This function will deadlock if called from the GstTaskFunction of
+ * the task. Use #gst_task_pause() instead.
*
* Returns: a TRUE if the task could be stopped or FALSE when the pad
* has no task.
@@ -3782,6 +3794,8 @@
GST_STREAM_LOCK (pad);
GST_STREAM_UNLOCK (pad);
+ gst_task_join (task);
+
gst_object_unref (task);
return TRUE;
diff --git a/gst/gstqueue.c b/gst/gstqueue.c
index 6286794..fe42421 100644
--- a/gst/gstqueue.c
+++ b/gst/gstqueue.c
@@ -927,25 +927,27 @@
static gboolean
gst_queue_sink_activate_push (GstPad * pad, gboolean active)
{
- gboolean result = FALSE;
+ gboolean result = TRUE;
GstQueue *queue;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
+ GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
- result = TRUE;
+ GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
- /* step 1, unblock chain and loop functions */
+ /* step 1, unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
- g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK (queue);
- /* step 2, make sure streaming finishes */
- result = gst_pad_stop_task (pad);
+ /* and make sure the chain function finishes */
+ GST_STREAM_LOCK (pad);
+ GST_STREAM_UNLOCK (pad);
}
+
gst_object_unref (queue);
return result;
@@ -971,9 +973,10 @@
}
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
- /* step 1, unblock chain and loop functions */
+ /* step 1, unblock loop function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
+ /* the item add signal will unblock */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK (queue);
diff --git a/gst/gsttask.c b/gst/gsttask.c
index fec1f8e..23de33f 100644
--- a/gst/gsttask.c
+++ b/gst/gsttask.c
@@ -25,6 +25,9 @@
#include "gstinfo.h"
#include "gsttask.h"
+GST_DEBUG_CATEGORY (task_debug);
+#define GST_CAT_DEFAULT (task_debug)
+
static void gst_task_class_init (GstTaskClass * klass);
static void gst_task_init (GstTask * task);
static void gst_task_finalize (GObject * object);
@@ -33,6 +36,8 @@
static GstObjectClass *parent_class = NULL;
+static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
+
GType
gst_task_get_type (void)
{
@@ -54,6 +59,8 @@
_gst_task_type =
g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
+
+ GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
}
return _gst_task_type;
}
@@ -76,6 +83,7 @@
static void
gst_task_init (GstTask * task)
{
+ task->running = FALSE;
task->lock = NULL;
task->cond = g_cond_new ();
task->state = GST_TASK_STOPPED;
@@ -97,16 +105,28 @@
static void
gst_task_func (GstTask * task, GstTaskClass * tclass)
{
+ GStaticRecMutex *lock;
+
GST_DEBUG ("Entering task %p, thread %p", task, g_thread_self ());
+ /* we have to grab the lock to get the mutex. We also
+ * mark our state running so that nobody can mess with
+ * the mutex. */
+ GST_LOCK (task);
+ if (task->state == GST_TASK_STOPPED)
+ goto exit;
+ lock = GST_TASK_GET_LOCK (task);
+ task->running = TRUE;
+ GST_UNLOCK (task);
+
/* locking order is TASK_LOCK, LOCK */
- GST_TASK_LOCK (task);
+ g_static_rec_mutex_lock (lock);
GST_LOCK (task);
while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
gint t;
- t = GST_TASK_UNLOCK_FULL (task);
+ t = g_static_rec_mutex_unlock_full (lock);
if (t <= 0) {
g_warning ("wrong STREAM_LOCK count %d", t);
}
@@ -115,7 +135,7 @@
GST_UNLOCK (task);
/* locking order.. */
if (t > 0)
- GST_TASK_LOCK_FULL (task, t);
+ g_static_rec_mutex_lock_full (lock, t);
GST_LOCK (task);
if (task->state == GST_TASK_STOPPED)
@@ -129,7 +149,14 @@
}
done:
GST_UNLOCK (task);
- GST_TASK_UNLOCK (task);
+ g_static_rec_mutex_unlock (lock);
+
+ /* now we allow messing with the lock again */
+ GST_LOCK (task);
+ task->running = FALSE;
+exit:
+ GST_TASK_SIGNAL (task);
+ GST_UNLOCK (task);
GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
@@ -137,6 +164,35 @@
}
/**
+ * gst_task_cleanup_all:
+ *
+ * Wait for all tasks to be stopped. This is mainly used internally
+ * to ensure proper cleanup of internal datastructures in testsuites.
+ *
+ * MT safe.
+ */
+void
+gst_task_cleanup_all (void)
+{
+ GstTaskClass *klass;
+
+ if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
+ g_static_mutex_lock (&pool_lock);
+ if (klass->pool) {
+ /* Shut down all the threads, we still process the ones scheduled
+ * because the unref happens in the thread function.
+ * Also wait for currently running ones to finish. */
+ g_thread_pool_free (klass->pool, FALSE, TRUE);
+ /* create new pool, so we can still do something after this
+ * call. */
+ klass->pool = g_thread_pool_new (
+ (GFunc) gst_task_func, klass, -1, FALSE, NULL);
+ }
+ g_static_mutex_unlock (&pool_lock);
+ }
+}
+
+/**
* gst_task_create:
* @func: The #GstTaskFunction to use
* @data: User data to pass to @func
@@ -176,8 +232,19 @@
gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
{
GST_LOCK (task);
- task->lock = mutex;
+ if (task->running)
+ goto is_running;
+ GST_TASK_GET_LOCK (task) = mutex;
GST_UNLOCK (task);
+
+ return;
+
+ /* ERRORS */
+is_running:
+ {
+ g_warning ("cannot call set_lock on a running task");
+ GST_UNLOCK (task);
+ }
}
@@ -218,39 +285,51 @@
gboolean
gst_task_start (GstTask * task)
{
- GstTaskClass *tclass;
GstTaskState old;
- GStaticRecMutex *lock;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
- tclass = GST_TASK_GET_CLASS (task);
-
GST_DEBUG_OBJECT (task, "Starting task %p", task);
GST_LOCK (task);
- if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) {
- lock = g_new (GStaticRecMutex, 1);
- g_static_rec_mutex_init (lock);
- GST_TASK_GET_LOCK (task) = lock;
- }
+ if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
+ goto no_lock;
old = task->state;
task->state = GST_TASK_STARTED;
switch (old) {
case GST_TASK_STOPPED:
+ {
+ GstTaskClass *tclass;
+
+ tclass = GST_TASK_GET_CLASS (task);
+
+ /* new task, push on threadpool. We ref before so
+ * that it remains alive while on the threadpool. */
gst_object_ref (task);
+ g_static_mutex_lock (&pool_lock);
g_thread_pool_push (tclass->pool, task, NULL);
+ g_static_mutex_unlock (&pool_lock);
break;
+ }
case GST_TASK_PAUSED:
+ /* PAUSE to PLAY, signal */
GST_TASK_SIGNAL (task);
break;
case GST_TASK_STARTED:
+ /* was OK */
break;
}
GST_UNLOCK (task);
return TRUE;
+
+ /* ERRORS */
+no_lock:
+ {
+ g_warning ("starting task without a lock");
+ return FALSE;
+ }
}
/**
@@ -305,13 +384,10 @@
gboolean
gst_task_pause (GstTask * task)
{
- GstTaskClass *tclass;
GstTaskState old;
g_return_val_if_fail (GST_IS_TASK (task), FALSE);
- tclass = GST_TASK_GET_CLASS (task);
-
GST_DEBUG_OBJECT (task, "Pausing task %p", task);
GST_LOCK (task);
@@ -319,9 +395,17 @@
task->state = GST_TASK_PAUSED;
switch (old) {
case GST_TASK_STOPPED:
+ {
+ GstTaskClass *tclass;
+
+ tclass = GST_TASK_GET_CLASS (task);
+
gst_object_ref (task);
+ g_static_mutex_lock (&pool_lock);
g_thread_pool_push (tclass->pool, task, NULL);
+ g_static_mutex_unlock (&pool_lock);
break;
+ }
case GST_TASK_PAUSED:
break;
case GST_TASK_STARTED:
@@ -331,3 +415,35 @@
return TRUE;
}
+
+/**
+ * gst_task_join:
+ * @task: The #GstTask to join
+ *
+ * Joins @task. After this call, it is safe to unref the task
+ * and clean up the lock set with #gst_task_set_lock().
+ *
+ * The task will automatically be stopped with this call.
+ *
+ * This function cannot be called from within a task function.
+ *
+ * Returns: TRUE if the task could be joined.
+ *
+ * MT safe.
+ */
+gboolean
+gst_task_join (GstTask * task)
+{
+ g_return_val_if_fail (GST_IS_TASK (task), FALSE);
+
+ GST_DEBUG_OBJECT (task, "Joining task %p", task);
+
+ GST_LOCK (task);
+ task->state = GST_TASK_STOPPED;
+ GST_TASK_SIGNAL (task);
+ while (task->running)
+ GST_TASK_WAIT (task);
+ GST_UNLOCK (task);
+
+ return TRUE;
+}
diff --git a/gst/gsttask.h b/gst/gsttask.h
index c1210c3..33c26b0 100644
--- a/gst/gsttask.h
+++ b/gst/gsttask.h
@@ -55,10 +55,6 @@
#define GST_TASK_BROADCAST(task) g_cond_breadcast(GST_TASK_GET_COND (task))
#define GST_TASK_GET_LOCK(task) (GST_TASK_CAST(task)->lock)
-#define GST_TASK_LOCK(task) g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
-#define GST_TASK_UNLOCK(task) g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
-#define GST_TASK_UNLOCK_FULL(task) g_static_rec_mutex_unlock_full(GST_TASK_GET_LOCK(task))
-#define GST_TASK_LOCK_FULL(task,t) g_static_rec_mutex_lock_full(GST_TASK_GET_LOCK(task),(t))
struct _GstTask {
GstObject object;
@@ -69,8 +65,10 @@
GStaticRecMutex *lock;
- GstTaskFunction func;
- gpointer data;
+ GstTaskFunction func;
+ gpointer data;
+
+ gboolean running;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
@@ -86,6 +84,8 @@
gpointer _gst_reserved[GST_PADDING];
};
+void gst_task_cleanup_all (void);
+
GType gst_task_get_type (void);
GstTask* gst_task_create (GstTaskFunction func, gpointer data);
@@ -97,6 +97,8 @@
gboolean gst_task_stop (GstTask *task);
gboolean gst_task_pause (GstTask *task);
+gboolean gst_task_join (GstTask *task);
+
G_END_DECLS
#endif /* __GST_TASK_H__ */
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index 6286794..fe42421 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -927,25 +927,27 @@
static gboolean
gst_queue_sink_activate_push (GstPad * pad, gboolean active)
{
- gboolean result = FALSE;
+ gboolean result = TRUE;
GstQueue *queue;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
+ GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
- result = TRUE;
+ GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
- /* step 1, unblock chain and loop functions */
+ /* step 1, unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
- g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK (queue);
- /* step 2, make sure streaming finishes */
- result = gst_pad_stop_task (pad);
+ /* and make sure the chain function finishes */
+ GST_STREAM_LOCK (pad);
+ GST_STREAM_UNLOCK (pad);
}
+
gst_object_unref (queue);
return result;
@@ -971,9 +973,10 @@
}
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
- /* step 1, unblock chain and loop functions */
+ /* step 1, unblock loop function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
+ /* the item add signal will unblock */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK (queue);
diff --git a/tests/check/generic/states.c b/tests/check/generic/states.c
index 3b9b961..a441686 100644
--- a/tests/check/generic/states.c
+++ b/tests/check/generic/states.c
@@ -50,6 +50,8 @@
gst_element_set_state (element, GST_STATE_PAUSED);
gst_element_set_state (element, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (element));
+
+ gst_task_cleanup_all ();
}
}
diff --git a/tests/check/gst/gstbin.c b/tests/check/gst/gstbin.c
index f39f5c0..fffada3 100644
--- a/tests/check/gst/gstbin.c
+++ b/tests/check/gst/gstbin.c
@@ -423,6 +423,10 @@
/* check if pads really are linked */
fail_unless (gst_pad_is_linked (srcpad));
fail_unless (gst_pad_is_linked (sinkpad));
+
+ gst_object_unref (srcpad);
+ gst_object_unref (sinkpad);
+ gst_object_unref (pipeline);
}
GST_END_TEST;