check/generic/states.c: Make sure all tasks are stopped.

Original commit message from CVS:
* check/generic/states.c: (GST_START_TEST):
Make sure all tasks are stopped.

* check/gst/gstbin.c: (GST_START_TEST):
Unref after usage for proper valgrinding.

* gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
Really wait for the task to stop before destroying the
mutex.

* gst/gstqueue.c: (gst_queue_sink_activate_push),
(gst_queue_src_activate_push):
Small cleanups. Don't stop the task when we did not start
it.

* gst/gsttask.c: (gst_task_get_type), (gst_task_init),
(gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
(gst_task_get_state), (gst_task_start), (gst_task_pause),
(gst_task_join):
* gst/gsttask.h:
Protect the stream lock with the object lock.
Disallow setting the stream lock when running.
Add cleanup_all to wait for the threadpool to finish.
Remove code to autoallocate a mutex if none was provided.
Add _join() to wait for a task to stop.
Protect the thread pool with a global lock.
diff --git a/ChangeLog b/ChangeLog
index e483ed8..98eed42 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,34 @@
 2005-08-24  Wim Taymans  <wim@fluendo.com>
 
+	* check/generic/states.c: (GST_START_TEST):
+	Make sure all tasks are stopped.
+
+	* check/gst/gstbin.c: (GST_START_TEST):
+	Unref after usage for proper valgrinding.
+
+	* gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
+	Really wait for the task to stop before destroying the
+	mutex.
+
+	* gst/gstqueue.c: (gst_queue_sink_activate_push),
+	(gst_queue_src_activate_push):
+	Small cleanups. Don't stop the task when we did not start
+	it.
+
+	* gst/gsttask.c: (gst_task_get_type), (gst_task_init),
+	(gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
+	(gst_task_get_state), (gst_task_start), (gst_task_pause),
+	(gst_task_join):
+	* gst/gsttask.h:
+	Protect the stream lock with the object lock.
+	Disallow setting the stream lock when running.
+	Add cleanup_all to wait for the threadpool to finish.
+	Remove code to autoallocate a mutex if none was provided.
+	Add _join() to wait for a task to stop.
+	Protect the thread pool with a global lock.
+
+2005-08-24  Wim Taymans  <wim@fluendo.com>
+
 	* gst/base/gstbasesink.c: (gst_base_sink_handle_object),
 	(gst_base_sink_get_times), (gst_base_sink_do_sync),
 	(gst_base_sink_handle_buffer), (gst_base_sink_change_state):
diff --git a/check/generic/states.c b/check/generic/states.c
index 3b9b961..a441686 100644
--- a/check/generic/states.c
+++ b/check/generic/states.c
@@ -50,6 +50,8 @@
     gst_element_set_state (element, GST_STATE_PAUSED);
     gst_element_set_state (element, GST_STATE_NULL);
     gst_object_unref (GST_OBJECT (element));
+
+    gst_task_cleanup_all ();
   }
 }
 
diff --git a/check/gst/gstbin.c b/check/gst/gstbin.c
index f39f5c0..fffada3 100644
--- a/check/gst/gstbin.c
+++ b/check/gst/gstbin.c
@@ -423,6 +423,10 @@
   /* check if pads really are linked */
   fail_unless (gst_pad_is_linked (srcpad));
   fail_unless (gst_pad_is_linked (sinkpad));
+
+  gst_object_unref (srcpad);
+  gst_object_unref (sinkpad);
+  gst_object_unref (pipeline);
 }
 
 GST_END_TEST;
diff --git a/gst/gstpad.c b/gst/gstpad.c
index d61418b..21ce573 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -295,6 +295,14 @@
 gst_pad_finalize (GObject * object)
 {
   GstPad *pad = GST_PAD (object);
+  GstTask *task;
+
+  /* in case the task is still around, clean it up */
+  if ((task = GST_PAD_TASK (pad))) {
+    gst_task_join (task);
+    GST_PAD_TASK (pad) = NULL;
+    gst_object_unref (task);
+  }
 
   if (pad->stream_rec_lock) {
     g_static_rec_mutex_free (pad->stream_rec_lock);
@@ -1878,8 +1886,8 @@
  * @pad: a  #GstPad to fixate
  * @caps: the  #GstCaps to fixate
  *
- * Fixate a caps on the given pad. Modifies the caps in place, so you should be
- * that the caps are actually writable (see gst_caps_make_writable()).
+ * Fixate a caps on the given pad. Modifies the caps in place, so you should
+ * make sure that the caps are actually writable (see gst_caps_make_writable()).
  */
 void
 gst_pad_fixate_caps (GstPad * pad, GstCaps * caps)
@@ -3759,7 +3767,11 @@
  * @pad: the #GstPad to stop the task of
  *
  * Stop the task of @pad. This function will also make sure that the 
- * function executed by the task will effectively stop.
+ * function executed by the task will effectively stop if not called
+ * from the GstTaskFunction.
+ *
+ * This function will deadlock if called from the GstTaskFunction of
+ * the task. Use #gst_task_pause() instead.
  *
  * Returns: a TRUE if the task could be stopped or FALSE when the pad
  * has no task.
@@ -3782,6 +3794,8 @@
   GST_STREAM_LOCK (pad);
   GST_STREAM_UNLOCK (pad);
 
+  gst_task_join (task);
+
   gst_object_unref (task);
 
   return TRUE;
diff --git a/gst/gstqueue.c b/gst/gstqueue.c
index 6286794..fe42421 100644
--- a/gst/gstqueue.c
+++ b/gst/gstqueue.c
@@ -927,25 +927,27 @@
 static gboolean
 gst_queue_sink_activate_push (GstPad * pad, gboolean active)
 {
-  gboolean result = FALSE;
+  gboolean result = TRUE;
   GstQueue *queue;
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_OK;
-    result = TRUE;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock chain function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
-    g_cond_signal (queue->item_del);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    /* step 2, make sure streaming finishes */
-    result = gst_pad_stop_task (pad);
+    /* and make sure the chain function finishes */
+    GST_STREAM_LOCK (pad);
+    GST_STREAM_UNLOCK (pad);
   }
+
   gst_object_unref (queue);
 
   return result;
@@ -971,9 +973,10 @@
     }
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock loop function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
+    /* the item add signal will unblock */
     g_cond_signal (queue->item_add);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
diff --git a/gst/gsttask.c b/gst/gsttask.c
index fec1f8e..23de33f 100644
--- a/gst/gsttask.c
+++ b/gst/gsttask.c
@@ -25,6 +25,9 @@
 #include "gstinfo.h"
 #include "gsttask.h"
 
+GST_DEBUG_CATEGORY (task_debug);
+#define GST_CAT_DEFAULT (task_debug)
+
 static void gst_task_class_init (GstTaskClass * klass);
 static void gst_task_init (GstTask * task);
 static void gst_task_finalize (GObject * object);
@@ -33,6 +36,8 @@
 
 static GstObjectClass *parent_class = NULL;
 
+static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
+
 GType
 gst_task_get_type (void)
 {
@@ -54,6 +59,8 @@
 
     _gst_task_type =
         g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
+
+    GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
   }
   return _gst_task_type;
 }
@@ -76,6 +83,7 @@
 static void
 gst_task_init (GstTask * task)
 {
+  task->running = FALSE;
   task->lock = NULL;
   task->cond = g_cond_new ();
   task->state = GST_TASK_STOPPED;
@@ -97,16 +105,28 @@
 static void
 gst_task_func (GstTask * task, GstTaskClass * tclass)
 {
+  GStaticRecMutex *lock;
+
   GST_DEBUG ("Entering task %p, thread %p", task, g_thread_self ());
 
+  /* we have to grab the lock to get the mutex. We also
+   * mark our state running so that nobody can mess with
+   * the mutex. */
+  GST_LOCK (task);
+  if (task->state == GST_TASK_STOPPED)
+    goto exit;
+  lock = GST_TASK_GET_LOCK (task);
+  task->running = TRUE;
+  GST_UNLOCK (task);
+
   /* locking order is TASK_LOCK, LOCK */
-  GST_TASK_LOCK (task);
+  g_static_rec_mutex_lock (lock);
   GST_LOCK (task);
   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
       gint t;
 
-      t = GST_TASK_UNLOCK_FULL (task);
+      t = g_static_rec_mutex_unlock_full (lock);
       if (t <= 0) {
         g_warning ("wrong STREAM_LOCK count %d", t);
       }
@@ -115,7 +135,7 @@
       GST_UNLOCK (task);
       /* locking order.. */
       if (t > 0)
-        GST_TASK_LOCK_FULL (task, t);
+        g_static_rec_mutex_lock_full (lock, t);
 
       GST_LOCK (task);
       if (task->state == GST_TASK_STOPPED)
@@ -129,7 +149,14 @@
   }
 done:
   GST_UNLOCK (task);
-  GST_TASK_UNLOCK (task);
+  g_static_rec_mutex_unlock (lock);
+
+  /* now we allow messing with the lock again */
+  GST_LOCK (task);
+  task->running = FALSE;
+exit:
+  GST_TASK_SIGNAL (task);
+  GST_UNLOCK (task);
 
   GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
 
@@ -137,6 +164,35 @@
 }
 
 /**
+ * gst_task_cleanup_all:
+ *
+ * Wait for all tasks to be stopped. This is mainly used internally
+ * to ensure proper cleanup of internal datastructures in testsuites.
+ *
+ * MT safe.
+ */
+void
+gst_task_cleanup_all (void)
+{
+  GstTaskClass *klass;
+
+  if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
+    g_static_mutex_lock (&pool_lock);
+    if (klass->pool) {
+      /* Shut down all the threads, we still process the ones scheduled
+       * because the unref happens in the thread function.
+       * Also wait for currently running ones to finish. */
+      g_thread_pool_free (klass->pool, FALSE, TRUE);
+      /* create new pool, so we can still do something after this
+       * call. */
+      klass->pool = g_thread_pool_new (
+          (GFunc) gst_task_func, klass, -1, FALSE, NULL);
+    }
+    g_static_mutex_unlock (&pool_lock);
+  }
+}
+
+/**
  * gst_task_create:
  * @func: The #GstTaskFunction to use
  * @data: User data to pass to @func
@@ -176,8 +232,19 @@
 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
 {
   GST_LOCK (task);
-  task->lock = mutex;
+  if (task->running)
+    goto is_running;
+  GST_TASK_GET_LOCK (task) = mutex;
   GST_UNLOCK (task);
+
+  return;
+
+  /* ERRORS */
+is_running:
+  {
+    g_warning ("cannot call set_lock on a running task");
+    GST_UNLOCK (task);
+  }
 }
 
 
@@ -218,39 +285,51 @@
 gboolean
 gst_task_start (GstTask * task)
 {
-  GstTaskClass *tclass;
   GstTaskState old;
-  GStaticRecMutex *lock;
 
   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
 
-  tclass = GST_TASK_GET_CLASS (task);
-
   GST_DEBUG_OBJECT (task, "Starting task %p", task);
 
   GST_LOCK (task);
-  if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) {
-    lock = g_new (GStaticRecMutex, 1);
-    g_static_rec_mutex_init (lock);
-    GST_TASK_GET_LOCK (task) = lock;
-  }
+  if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
+    goto no_lock;
 
   old = task->state;
   task->state = GST_TASK_STARTED;
   switch (old) {
     case GST_TASK_STOPPED:
+    {
+      GstTaskClass *tclass;
+
+      tclass = GST_TASK_GET_CLASS (task);
+
+      /* new task, push on threadpool. We ref before so
+       * that it remains alive while on the threadpool. */
       gst_object_ref (task);
+      g_static_mutex_lock (&pool_lock);
       g_thread_pool_push (tclass->pool, task, NULL);
+      g_static_mutex_unlock (&pool_lock);
       break;
+    }
     case GST_TASK_PAUSED:
+      /* PAUSE to PLAY, signal */
       GST_TASK_SIGNAL (task);
       break;
     case GST_TASK_STARTED:
+      /* was OK */
       break;
   }
   GST_UNLOCK (task);
 
   return TRUE;
+
+  /* ERRORS */
+no_lock:
+  {
+    g_warning ("starting task without a lock");
+    return FALSE;
+  }
 }
 
 /**
@@ -305,13 +384,10 @@
 gboolean
 gst_task_pause (GstTask * task)
 {
-  GstTaskClass *tclass;
   GstTaskState old;
 
   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
 
-  tclass = GST_TASK_GET_CLASS (task);
-
   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
 
   GST_LOCK (task);
@@ -319,9 +395,17 @@
   task->state = GST_TASK_PAUSED;
   switch (old) {
     case GST_TASK_STOPPED:
+    {
+      GstTaskClass *tclass;
+
+      tclass = GST_TASK_GET_CLASS (task);
+
       gst_object_ref (task);
+      g_static_mutex_lock (&pool_lock);
       g_thread_pool_push (tclass->pool, task, NULL);
+      g_static_mutex_unlock (&pool_lock);
       break;
+    }
     case GST_TASK_PAUSED:
       break;
     case GST_TASK_STARTED:
@@ -331,3 +415,35 @@
 
   return TRUE;
 }
+
+/**
+ * gst_task_join:
+ * @task: The #GstTask to join
+ *
+ * Joins @task. After this call, it is safe to unref the task
+ * and clean up the lock set with #gst_task_set_lock().
+ *
+ * The task will automatically be stopped with this call.
+ *
+ * This function cannot be called from within a task function.
+ *
+ * Returns: TRUE if the task could be joined.
+ *
+ * MT safe.
+ */
+gboolean
+gst_task_join (GstTask * task)
+{
+  g_return_val_if_fail (GST_IS_TASK (task), FALSE);
+
+  GST_DEBUG_OBJECT (task, "Joining task %p", task);
+
+  GST_LOCK (task);
+  task->state = GST_TASK_STOPPED;
+  GST_TASK_SIGNAL (task);
+  while (task->running)
+    GST_TASK_WAIT (task);
+  GST_UNLOCK (task);
+
+  return TRUE;
+}
diff --git a/gst/gsttask.h b/gst/gsttask.h
index c1210c3..33c26b0 100644
--- a/gst/gsttask.h
+++ b/gst/gsttask.h
@@ -55,10 +55,6 @@
 #define GST_TASK_BROADCAST(task)	g_cond_breadcast(GST_TASK_GET_COND (task))
 
 #define GST_TASK_GET_LOCK(task)		(GST_TASK_CAST(task)->lock)
-#define GST_TASK_LOCK(task)		g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
-#define GST_TASK_UNLOCK(task)		g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
-#define GST_TASK_UNLOCK_FULL(task)	g_static_rec_mutex_unlock_full(GST_TASK_GET_LOCK(task))
-#define GST_TASK_LOCK_FULL(task,t)	g_static_rec_mutex_lock_full(GST_TASK_GET_LOCK(task),(t))
 
 struct _GstTask {
   GstObject      object;
@@ -69,8 +65,10 @@
 
   GStaticRecMutex *lock;
 
-  GstTaskFunction func;
-  gpointer 	 data;
+  GstTaskFunction  func;
+  gpointer 	   data;
+
+  gboolean	   running;
 
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
@@ -86,6 +84,8 @@
   gpointer _gst_reserved[GST_PADDING];
 };
 
+void		gst_task_cleanup_all	(void);
+
 GType           gst_task_get_type       (void);
 
 GstTask*	gst_task_create		(GstTaskFunction func, gpointer data);
@@ -97,6 +97,8 @@
 gboolean	gst_task_stop		(GstTask *task);
 gboolean	gst_task_pause		(GstTask *task);
 
+gboolean	gst_task_join		(GstTask *task);
+
 G_END_DECLS
 
 #endif /* __GST_TASK_H__ */
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index 6286794..fe42421 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -927,25 +927,27 @@
 static gboolean
 gst_queue_sink_activate_push (GstPad * pad, gboolean active)
 {
-  gboolean result = FALSE;
+  gboolean result = TRUE;
   GstQueue *queue;
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_OK;
-    result = TRUE;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock chain function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
-    g_cond_signal (queue->item_del);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    /* step 2, make sure streaming finishes */
-    result = gst_pad_stop_task (pad);
+    /* and make sure the chain function finishes */
+    GST_STREAM_LOCK (pad);
+    GST_STREAM_UNLOCK (pad);
   }
+
   gst_object_unref (queue);
 
   return result;
@@ -971,9 +973,10 @@
     }
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock loop function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
+    /* the item add signal will unblock */
     g_cond_signal (queue->item_add);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
diff --git a/tests/check/generic/states.c b/tests/check/generic/states.c
index 3b9b961..a441686 100644
--- a/tests/check/generic/states.c
+++ b/tests/check/generic/states.c
@@ -50,6 +50,8 @@
     gst_element_set_state (element, GST_STATE_PAUSED);
     gst_element_set_state (element, GST_STATE_NULL);
     gst_object_unref (GST_OBJECT (element));
+
+    gst_task_cleanup_all ();
   }
 }
 
diff --git a/tests/check/gst/gstbin.c b/tests/check/gst/gstbin.c
index f39f5c0..fffada3 100644
--- a/tests/check/gst/gstbin.c
+++ b/tests/check/gst/gstbin.c
@@ -423,6 +423,10 @@
   /* check if pads really are linked */
   fail_unless (gst_pad_is_linked (srcpad));
   fail_unless (gst_pad_is_linked (sinkpad));
+
+  gst_object_unref (srcpad);
+  gst_object_unref (sinkpad);
+  gst_object_unref (pipeline);
 }
 
 GST_END_TEST;