| /* GStreamer |
| * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> |
| * 2000 Wim Taymans <wtay@chello.be> |
| * 2003 Benjamin Otte <in7y118@public.uni-hamburg.de> |
| * |
| * gstthread.c: Threaded container object |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Library General Public |
| * License as published by the Free Software Foundation; either |
| * version 2 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Library General Public License for more details. |
| * |
| * You should have received a copy of the GNU Library General Public |
| * License along with this library; if not, write to the |
| * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
| * Boston, MA 02111-1307, USA. |
| */ |
| |
| #include "gst_private.h" |
| |
| #include "gstthread.h" |
| #include "gstmarshal.h" |
| #include "gstscheduler.h" |
| #include "gstinfo.h" |
| #include "gstlog.h" |
| |
| #define GST_CAT_DEFAULT GST_CAT_THREAD |
| #define STACK_SIZE 0x200000 |
| |
| static GstElementDetails gst_thread_details = GST_ELEMENT_DETAILS ( |
| "Threaded container", |
| "Generic/Bin", |
| "Container that creates/manages a thread", |
| "Erik Walthinsen <omega@cse.ogi.edu>, " |
| "Benjamin Otte <in7y118@informatik.uni-hamburg.de" |
| ); |
| |
| /* Thread signals and args */ |
| enum { |
| SHUTDOWN, |
| /* FILL ME */ |
| LAST_SIGNAL |
| }; |
| |
| enum { |
| SPINUP=0, |
| STATECHANGE, |
| STARTUP |
| }; |
| |
| enum { |
| ARG_0, |
| ARG_PRIORITY |
| }; |
| |
| |
| static void gst_thread_base_init (gpointer g_class); |
| static void gst_thread_class_init (gpointer g_class, gpointer class_data); |
| static void gst_thread_init (GTypeInstance *instance, gpointer g_class); |
| |
| static void gst_thread_dispose (GObject *object); |
| |
| static void gst_thread_set_property (GObject *object, guint prop_id, |
| const GValue *value, GParamSpec *pspec); |
| static void gst_thread_get_property (GObject *object, guint prop_id, |
| GValue *value, GParamSpec *pspec); |
| static GstElementStateReturn gst_thread_change_state (GstElement *element); |
| static void gst_thread_child_state_change (GstBin *bin, GstElementState oldstate, |
| GstElementState newstate, GstElement *element); |
| |
| static void gst_thread_catch (GstThread *thread); |
| static void gst_thread_release (GstThread *thread); |
| |
| #ifndef GST_DISABLE_LOADSAVE |
| static xmlNodePtr gst_thread_save_thyself (GstObject *object, |
| xmlNodePtr parent); |
| static void gst_thread_restore_thyself (GstObject *object, |
| xmlNodePtr self); |
| #endif |
| |
| static void * gst_thread_main_loop (void *arg); |
| |
| #define GST_TYPE_THREAD_PRIORITY (gst_thread_priority_get_type()) |
| static GType |
| gst_thread_priority_get_type(void) |
| { |
| static GType thread_priority_type = 0; |
| static GEnumValue thread_priority[] = |
| { |
| { G_THREAD_PRIORITY_LOW, "LOW", "Low Priority Scheduling" }, |
| { G_THREAD_PRIORITY_NORMAL, "NORMAL", "Normal Scheduling" }, |
| { G_THREAD_PRIORITY_HIGH, "HIGH", "High Priority Scheduling" }, |
| { G_THREAD_PRIORITY_URGENT, "URGENT", "Urgent Scheduling" }, |
| { 0, NULL, NULL }, |
| }; |
| if (!thread_priority_type) { |
| thread_priority_type = g_enum_register_static("GstThreadPriority", thread_priority); |
| } |
| return thread_priority_type; |
| } |
| |
| static GstBinClass *parent_class = NULL; |
| static guint gst_thread_signals[LAST_SIGNAL] = { 0 }; |
| GPrivate *gst_thread_current; |
| |
| GType |
| gst_thread_get_type(void) { |
| static GType thread_type = 0; |
| |
| if (!thread_type) { |
| static const GTypeInfo thread_info = { |
| sizeof (GstThreadClass), |
| gst_thread_base_init, |
| NULL, |
| gst_thread_class_init, |
| NULL, |
| NULL, |
| sizeof (GstThread), |
| 4, |
| gst_thread_init, |
| NULL |
| }; |
| thread_type = g_type_register_static (GST_TYPE_BIN, "GstThread", |
| &thread_info, 0); |
| } |
| return thread_type; |
| } |
| |
| static void |
| gst_thread_base_init (gpointer g_class) |
| { |
| GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); |
| |
| gst_element_class_set_details (gstelement_class, &gst_thread_details); |
| } |
| static void do_nothing (gpointer hi) {} |
| static void |
| gst_thread_class_init (gpointer g_class, gpointer class_data) |
| { |
| GObjectClass *gobject_class = G_OBJECT_CLASS (g_class); |
| GstObjectClass *gstobject_class = GST_OBJECT_CLASS (g_class); |
| GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class); |
| GstBinClass *gstbin_class = GST_BIN_CLASS (g_class); |
| GstThreadClass *klass = GST_THREAD_CLASS (g_class); |
| |
| /* setup gst_thread_current */ |
| gst_thread_current = g_private_new (do_nothing); |
| |
| parent_class = g_type_class_peek_parent (g_class); |
| |
| g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PRIORITY, |
| g_param_spec_enum ("priority", "Scheduling Policy", "The scheduling priority of the thread", |
| GST_TYPE_THREAD_PRIORITY, G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE)); |
| |
| gst_thread_signals[SHUTDOWN] = |
| g_signal_new ("shutdown", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, |
| G_STRUCT_OFFSET (GstThreadClass, shutdown), NULL, NULL, |
| gst_marshal_VOID__VOID, G_TYPE_NONE, 0); |
| |
| gobject_class->dispose = gst_thread_dispose; |
| |
| #ifndef GST_DISABLE_LOADSAVE |
| gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_thread_save_thyself); |
| gstobject_class->restore_thyself = GST_DEBUG_FUNCPTR (gst_thread_restore_thyself); |
| #endif |
| |
| gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_thread_change_state); |
| |
| gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_thread_set_property); |
| gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_thread_get_property); |
| |
| gstbin_class->child_state_change = GST_DEBUG_FUNCPTR (gst_thread_child_state_change); |
| } |
| |
| static void |
| gst_thread_init (GTypeInstance *instance, gpointer g_class) |
| { |
| GstScheduler *scheduler; |
| GstThread *thread = GST_THREAD (instance); |
| |
| GST_DEBUG ("initializing thread"); |
| |
| /* threads are managing bins and iterate themselves */ |
| /* CR1: the GstBin code checks these flags */ |
| GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER); |
| GST_FLAG_SET (thread, GST_BIN_SELF_SCHEDULABLE); |
| |
| scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (thread)); |
| g_assert (scheduler); |
| |
| thread->lock = g_mutex_new (); |
| thread->cond = g_cond_new (); |
| |
| thread->thread_id = (GThread *) NULL; /* set in NULL -> READY */ |
| thread->priority = G_THREAD_PRIORITY_NORMAL; |
| } |
| |
| static void |
| gst_thread_dispose (GObject *object) |
| { |
| GstThread *thread = GST_THREAD (object); |
| |
| GST_CAT_DEBUG (GST_CAT_REFCOUNTING, "GstThread: dispose"); |
| |
| G_OBJECT_CLASS (parent_class)->dispose (object); |
| |
| g_assert (GST_STATE (thread) == GST_STATE_NULL); |
| |
| g_mutex_free (thread->lock); |
| g_cond_free (thread->cond); |
| |
| gst_object_replace ((GstObject **)&GST_ELEMENT_SCHED (thread), NULL); |
| } |
| |
| /** |
| * gst_thread_set_priority: |
| * @thread: the thread to change |
| * @priority: the new priority for the thread |
| * |
| * change the thread's priority |
| */ |
| void |
| gst_thread_set_priority (GstThread *thread, GThreadPriority priority) |
| { |
| g_return_if_fail (GST_IS_THREAD (thread)); |
| |
| thread->priority = priority; |
| } |
| |
| static void |
| gst_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) |
| { |
| GstThread *thread; |
| |
| thread = GST_THREAD (object); |
| |
| switch (prop_id) { |
| case ARG_PRIORITY: |
| thread->priority = g_value_get_enum (value); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| } |
| |
| static void |
| gst_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) |
| { |
| GstThread *thread; |
| |
| thread = GST_THREAD (object); |
| |
| switch (prop_id) { |
| case ARG_PRIORITY: |
| g_value_set_enum (value, thread->priority); |
| break; |
| default: |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
| break; |
| } |
| } |
| |
| |
| /** |
| * gst_thread_new: |
| * @name: the name of the thread |
| * |
| * Create a new thread with the given name. |
| * |
| * Returns: The new thread |
| */ |
| GstElement* |
| gst_thread_new (const gchar *name) |
| { |
| return gst_element_factory_make ("thread", name); |
| } |
| |
| /** |
| * gst_thread_get_current: |
| * |
| * Create a new thread with the given name. |
| * |
| * Returns: The current GstThread or NULL if you are not running inside a |
| * #GstThread. |
| */ |
| GstThread * |
| gst_thread_get_current (void) |
| { |
| return (GstThread *) g_private_get (gst_thread_current); |
| } |
| |
| static inline void |
| gst_thread_release_children_locks (GstThread *thread) |
| { |
| GstRealPad *peer = NULL; |
| GstElement *peerelement; |
| GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread)); |
| |
| while (elements) { |
| GstElement *element = GST_ELEMENT (elements->data); |
| GList *pads; |
| |
| g_assert (element); |
| GST_DEBUG_OBJECT (thread, "waking element \"%s\"", GST_ELEMENT_NAME (element)); |
| elements = g_list_next (elements); |
| |
| if (!gst_element_release_locks (element)) |
| g_warning ("element %s could not release locks", GST_ELEMENT_NAME (element)); |
| |
| pads = GST_ELEMENT_PADS (element); |
| |
| while (pads) { |
| if (GST_PAD_PEER (pads->data)) { |
| peer = GST_REAL_PAD (GST_PAD_PEER (pads->data)); |
| pads = g_list_next (pads); |
| } else { |
| pads = g_list_next (pads); |
| continue; |
| } |
| |
| if (!peer) |
| continue; |
| |
| peerelement = GST_PAD_PARENT (peer); |
| if (!peerelement) |
| continue; /* FIXME: deal with case where there's no peer */ |
| |
| if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) { |
| GST_LOG_OBJECT (thread, "element \"%s\" has pad cross sched boundary", GST_ELEMENT_NAME (element)); |
| GST_LOG_OBJECT (thread, "waking element \"%s\"", GST_ELEMENT_NAME (peerelement)); |
| if (!gst_element_release_locks (peerelement)) |
| g_warning ("element %s could not release locks", GST_ELEMENT_NAME (peerelement)); |
| } |
| } |
| } |
| } |
| /* stops the main thread, if there is one and grabs the thread's mutex */ |
| static void |
| gst_thread_catch (GstThread *thread) |
| { |
| gboolean wait; |
| if (thread == gst_thread_get_current()) { |
| /* we're trying to catch ourself */ |
| if (!GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) { |
| g_mutex_lock (thread->lock); |
| GST_FLAG_SET (thread, GST_THREAD_MUTEX_LOCKED); |
| } |
| GST_DEBUG_OBJECT (thread, "catching itself"); |
| GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); |
| } else { |
| /* another thread is trying to catch us */ |
| g_mutex_lock (thread->lock); |
| wait = !GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING); |
| while (!wait) { |
| GTimeVal tv; |
| GST_LOG_OBJECT (thread, "catching thread..."); |
| GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); |
| g_cond_signal (thread->cond); |
| gst_thread_release_children_locks (thread); |
| g_get_current_time (&tv); |
| g_time_val_add (&tv, 1000); /* wait a millisecond to catch the thread */ |
| wait = g_cond_timed_wait (thread->cond, thread->lock, &tv); |
| } |
| GST_LOG_OBJECT (thread, "caught thread"); |
| } |
| g_assert (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)); |
| } |
| |
| static void |
| gst_thread_release (GstThread *thread) |
| { |
| if (thread != gst_thread_get_current()) { |
| g_cond_signal (thread->cond); |
| g_mutex_unlock (thread->lock); |
| } |
| } |
| |
| static GstElementStateReturn |
| gst_thread_change_state (GstElement *element) |
| { |
| GstThread *thread; |
| GstElementStateReturn ret; |
| gint transition; |
| |
| g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE); |
| transition = GST_STATE_TRANSITION (element); |
| |
| thread = GST_THREAD (element); |
| |
| GST_DEBUG_OBJECT (element, "changing state from %s to %s", |
| gst_element_state_get_name (GST_STATE (element)), |
| gst_element_state_get_name (GST_STATE_PENDING (element))); |
| |
| gst_thread_catch (thread); |
| |
| /* FIXME: (or GStreamers ideas about "threading"): the element variables are |
| commonly accessed by multiple threads at the same time (see bug #111146 |
| for an example) */ |
| if (transition != GST_STATE_TRANSITION (element)) { |
| g_warning ("inconsistent state information, fix threading please"); |
| } |
| |
| switch (transition) { |
| case GST_STATE_NULL_TO_READY: |
| /* create the thread */ |
| GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING); |
| thread->thread_id = g_thread_create_full(gst_thread_main_loop, |
| thread, STACK_SIZE, FALSE, TRUE, thread->priority, |
| NULL); |
| if (!thread->thread_id){ |
| GST_ERROR_OBJECT (element, "g_thread_create_full failed"); |
| goto error_out; |
| } |
| GST_LOG_OBJECT (element, "GThread created"); |
| |
| /* wait for it to 'spin up' */ |
| g_cond_wait (thread->cond, thread->lock); |
| break; |
| case GST_STATE_READY_TO_PAUSED: |
| break; |
| case GST_STATE_PAUSED_TO_PLAYING: |
| { |
| /* FIXME: recurse into sub-bins */ |
| GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread)); |
| while (elements) { |
| gst_element_enable_threadsafe_properties ((GstElement*)elements->data); |
| elements = g_list_next (elements); |
| } |
| /* reset self to spinning */ |
| if (thread == gst_thread_get_current()) |
| GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING); |
| break; |
| } |
| case GST_STATE_PLAYING_TO_PAUSED: |
| { |
| GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread)); |
| while (elements) { |
| gst_element_disable_threadsafe_properties ((GstElement*)elements->data); |
| elements = g_list_next (elements); |
| } |
| break; |
| } |
| case GST_STATE_PAUSED_TO_READY: |
| break; |
| case GST_STATE_READY_TO_NULL: |
| /* we can't join the threads here, because this could have been triggered |
| by ourself (ouch) */ |
| GST_LOG_OBJECT (thread, "destroying GThread %p", thread->thread_id); |
| GST_FLAG_SET (thread, GST_THREAD_STATE_REAPING); |
| thread->thread_id = NULL; |
| if (thread == gst_thread_get_current()) { |
| /* or should we continue? */ |
| g_warning ("Thread %s is destroying itself. Function call will not return!", GST_ELEMENT_NAME (thread)); |
| gst_scheduler_reset (GST_ELEMENT_SCHED (thread)); |
| |
| /* unlock and signal - we are out */ |
| gst_thread_release (thread); |
| |
| GST_INFO_OBJECT (thread, "GThread %p is exiting", g_thread_self()); |
| |
| g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0); |
| |
| g_thread_exit (NULL); |
| } |
| /* now wait for the thread to destroy itself */ |
| g_cond_signal (thread->cond); |
| g_cond_wait (thread->cond, thread->lock); |
| /* it should be dead now */ |
| break; |
| default: |
| GST_ERROR_OBJECT (element, "UNHANDLED STATE CHANGE! %x", |
| GST_STATE_TRANSITION (element)); |
| g_assert_not_reached (); |
| break; |
| } |
| |
| if (GST_ELEMENT_CLASS (parent_class)->change_state) { |
| ret = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (thread)); |
| } else { |
| ret = GST_STATE_SUCCESS; |
| } |
| |
| gst_thread_release (thread); |
| return ret; |
| |
| error_out: |
| GST_CAT_DEBUG (GST_CAT_STATES, "changing state from %s to %s failed for %s", |
| gst_element_state_get_name (GST_STATE (element)), |
| gst_element_state_get_name (GST_STATE_PENDING (element)), |
| GST_ELEMENT_NAME (element)); |
| gst_thread_release (thread); |
| return GST_STATE_FAILURE; |
| } |
| |
| /* state changes work this way: We grab the lock and stop the thread from |
| spinning (via gst_thread_catch) - then we change the state. After that the |
| thread may spin on. */ |
| static void |
| gst_thread_child_state_change (GstBin *bin, GstElementState oldstate, |
| GstElementState newstate, GstElement *element) |
| { |
| GST_LOG_OBJECT (bin, "(from thread %s) child %s changed state from %s to %s", |
| gst_thread_get_current() ? GST_ELEMENT_NAME (gst_thread_get_current()) : "(none)", |
| GST_ELEMENT_NAME (element), gst_element_state_get_name (oldstate), |
| gst_element_state_get_name (newstate)); |
| if (parent_class->child_state_change) |
| parent_class->child_state_change (bin, oldstate, newstate, element); |
| /* We'll wake up the main thread now. Note that we can't lock the thread here, |
| because we might be called from inside gst_thread_change_state when holding |
| the lock. But this doesn't cause any problems. */ |
| if (newstate == GST_STATE_PLAYING) |
| g_cond_signal (GST_THREAD (bin)->cond); |
| } |
| /** |
| * gst_thread_main_loop: |
| * @arg: the thread to start |
| * |
| * The main loop of the thread. The thread will iterate |
| * while the state is GST_THREAD_STATE_SPINNING. |
| */ |
| static void * |
| gst_thread_main_loop (void *arg) |
| { |
| GstThread *thread = NULL; |
| gboolean status; |
| |
| thread = GST_THREAD (arg); |
| g_mutex_lock (thread->lock); |
| GST_LOG_OBJECT (thread, "Started main loop"); |
| |
| /* initialize gst_thread_current */ |
| g_private_set (gst_thread_current, thread); |
| |
| /* set up the element's scheduler */ |
| gst_scheduler_setup (GST_ELEMENT_SCHED (thread)); |
| GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING); |
| |
| g_cond_signal (thread->cond); |
| while (!(GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING))) { |
| if (GST_STATE (thread) == GST_STATE_PLAYING) { |
| GST_FLAG_SET (thread, GST_THREAD_STATE_SPINNING); |
| status = TRUE; |
| GST_LOG_OBJECT (thread, "starting to iterate"); |
| while (status && GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) { |
| g_mutex_unlock (thread->lock); |
| status = gst_bin_iterate (GST_BIN (thread)); |
| if (GST_FLAG_IS_SET (thread, GST_THREAD_MUTEX_LOCKED)) { |
| GST_FLAG_UNSET (thread, GST_THREAD_MUTEX_LOCKED); |
| } else { |
| g_mutex_lock (thread->lock); |
| } |
| } |
| GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); |
| } |
| if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING)) |
| break; |
| GST_LOG_OBJECT (thread, "we're caught"); |
| g_cond_signal (thread->cond); |
| g_cond_wait (thread->cond, thread->lock); |
| } |
| |
| /* we need to destroy the scheduler here because it has mapped it's |
| * stack into the threads stack space */ |
| gst_scheduler_reset (GST_ELEMENT_SCHED (thread)); |
| |
| /* must do that before releasing the lock - we might get disposed before being done */ |
| g_signal_emit (G_OBJECT (thread), gst_thread_signals[SHUTDOWN], 0); |
| |
| /* unlock and signal - we are out */ |
| |
| GST_LOG_OBJECT (thread, "Thread %p exits main loop", g_thread_self()); |
| g_cond_signal (thread->cond); |
| g_mutex_unlock (thread->lock); |
| /* don't assume the GstThread object exists anymore now */ |
| |
| return NULL; |
| } |
| |
| #ifndef GST_DISABLE_LOADSAVE |
| static xmlNodePtr |
| gst_thread_save_thyself (GstObject *object, |
| xmlNodePtr self) |
| { |
| if (GST_OBJECT_CLASS (parent_class)->save_thyself) |
| GST_OBJECT_CLASS (parent_class)->save_thyself (object, self); |
| return NULL; |
| } |
| |
| static void |
| gst_thread_restore_thyself (GstObject *object, |
| xmlNodePtr self) |
| { |
| GST_LOG_OBJECT (object, "restoring"); |
| |
| if (GST_OBJECT_CLASS (parent_class)->restore_thyself) |
| GST_OBJECT_CLASS (parent_class)->restore_thyself (object, self); |
| } |
| #endif /* GST_DISABLE_LOADSAVE */ |