| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| * |
| * Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
| * Copyright 2014-2016 (c) Sten Grüner |
| * Copyright 2015 (c) Chris Iatrou |
| * Copyright 2015 (c) Nick Goossens |
| * Copyright 2015 (c) Jörg Schüler-Maroldt |
| * Copyright 2015-2016 (c) Oleksiy Vasylyev |
| * Copyright 2016-2017 (c) Florian Palm |
| * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
| * Copyright 2016 (c) Lorenz Haas |
| * Copyright 2017 (c) Jonas Green |
| */ |
| |
| #include "ua_workqueue.h" |
| |
| void UA_WorkQueue_init(UA_WorkQueue *wq) { |
| /* Initialized the linked list for delayed callbacks */ |
| SIMPLEQ_INIT(&wq->delayedCallbacks); |
| |
| #if UA_MULTITHREADING >= 200 |
| wq->delayedCallbacks_checkpoint = NULL; |
| pthread_mutex_init(&wq->delayedCallbacks_accessMutex, NULL); |
| |
| /* Initialize the dispatch queue for worker threads */ |
| SIMPLEQ_INIT(&wq->dispatchQueue); |
| pthread_mutex_init(&wq->dispatchQueue_accessMutex, NULL); |
| pthread_cond_init(&wq->dispatchQueue_condition, NULL); |
| pthread_mutex_init(&wq->dispatchQueue_conditionMutex, NULL); |
| #endif |
| } |
| |
| #if UA_MULTITHREADING >= 200 |
| /* Forward declaration */ |
| static void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq); |
| #endif |
| |
| void UA_WorkQueue_cleanup(UA_WorkQueue *wq) { |
| #if UA_MULTITHREADING >= 200 |
| /* Shut down workers */ |
| UA_WorkQueue_stop(wq); |
| |
| /* Execute remaining work in the dispatch queue */ |
| while(true) { |
| pthread_mutex_lock(&wq->dispatchQueue_accessMutex); |
| UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue); |
| if(!dc) { |
| pthread_mutex_unlock(&wq->dispatchQueue_accessMutex); |
| break; |
| } |
| SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next); |
| pthread_mutex_unlock(&wq->dispatchQueue_accessMutex); |
| dc->callback(dc->application, dc->data); |
| UA_free(dc); |
| } |
| #endif |
| |
| /* All workers are shut down. Execute remaining delayed work here. */ |
| UA_WorkQueue_manuallyProcessDelayed(wq); |
| |
| #if UA_MULTITHREADING >= 200 |
| wq->delayedCallbacks_checkpoint = NULL; |
| pthread_mutex_destroy(&wq->dispatchQueue_accessMutex); |
| pthread_cond_destroy(&wq->dispatchQueue_condition); |
| pthread_mutex_destroy(&wq->dispatchQueue_conditionMutex); |
| pthread_mutex_destroy(&wq->delayedCallbacks_accessMutex); |
| #endif |
| } |
| |
| /***********/ |
| /* Workers */ |
| /***********/ |
| |
| #if UA_MULTITHREADING >= 200 |
| |
| static void * |
| workerLoop(UA_Worker *worker) { |
| UA_WorkQueue *wq = worker->queue; |
| UA_UInt32 *counter = &worker->counter; |
| volatile UA_Boolean *running = &worker->running; |
| |
| /* Initialize the (thread local) random seed with the ram address |
| * of the worker. Not for security-critical entropy! */ |
| UA_random_seed((uintptr_t)worker); |
| |
| while(*running) { |
| UA_atomic_addUInt32(counter, 1); |
| |
| /* Remove a callback from the queue */ |
| pthread_mutex_lock(&wq->dispatchQueue_accessMutex); |
| UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue); |
| if(dc) |
| SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next); |
| pthread_mutex_unlock(&wq->dispatchQueue_accessMutex); |
| |
| /* Nothing to do. Sleep until a callback is dispatched */ |
| if(!dc) { |
| pthread_mutex_lock(&wq->dispatchQueue_conditionMutex); |
| pthread_cond_wait(&wq->dispatchQueue_condition, |
| &wq->dispatchQueue_conditionMutex); |
| pthread_mutex_unlock(&wq->dispatchQueue_conditionMutex); |
| continue; |
| } |
| |
| /* Execute */ |
| if(dc->callback) |
| dc->callback(dc->application, dc->data); |
| UA_free(dc); |
| } |
| |
| return NULL; |
| } |
| |
| /* Can be called repeatedly and starts additional workers */ |
| UA_StatusCode |
| UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount) { |
| if(wq->workersSize > 0 || workersCount == 0) |
| return UA_STATUSCODE_BADINTERNALERROR; |
| |
| /* Create the worker array */ |
| wq->workers = (UA_Worker*)UA_calloc(workersCount, sizeof(UA_Worker)); |
| if(!wq->workers) |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| wq->workersSize = workersCount; |
| |
| /* Spin up the workers */ |
| for(size_t i = 0; i < workersCount; ++i) { |
| UA_Worker *w = &wq->workers[i]; |
| w->queue = wq; |
| w->counter = 0; |
| w->running = true; |
| pthread_create(&w->thread, NULL, (void* (*)(void*))workerLoop, w); |
| } |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| void UA_WorkQueue_stop(UA_WorkQueue *wq) { |
| if(wq->workersSize == 0) |
| return; |
| |
| /* Signal the workers to stop */ |
| for(size_t i = 0; i < wq->workersSize; ++i) |
| wq->workers[i].running = false; |
| |
| /* Wake up all workers */ |
| pthread_cond_broadcast(&wq->dispatchQueue_condition); |
| |
| /* Wait for the workers to finish, then clean up */ |
| for(size_t i = 0; i < wq->workersSize; ++i) |
| pthread_join(wq->workers[i].thread, NULL); |
| |
| UA_free(wq->workers); |
| wq->workers = NULL; |
| wq->workersSize = 0; |
| } |
| |
| void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb, |
| void *application, void *data) { |
| UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback)); |
| if(!dc) { |
| cb(application, data); /* Execute immediately if the memory could not be allocated */ |
| return; |
| } |
| |
| dc->callback = cb; |
| dc->application = application; |
| dc->data = data; |
| |
| /* Enqueue for the worker threads */ |
| pthread_mutex_lock(&wq->dispatchQueue_accessMutex); |
| SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, dc, next); |
| pthread_mutex_unlock(&wq->dispatchQueue_accessMutex); |
| |
| /* Wake up sleeping workers */ |
| pthread_cond_broadcast(&wq->dispatchQueue_condition); |
| } |
| |
| #endif |
| |
| /*********************/ |
| /* Delayed Callbacks */ |
| /*********************/ |
| |
| #if UA_MULTITHREADING >= 200 |
| |
| /* Delayed Callbacks are called only when all callbacks that were dispatched |
| * prior are finished. After every UA_MAX_DELAYED_SAMPLE delayed Callbacks that |
| * were added to the queue, we sample the counters from the workers. The |
| * counters are compared to the last counters that were sampled. If every worker |
| * has proceeded the counter, then we know that all delayed callbacks prior to |
| * the last sample-point are safe to execute. */ |
| |
| /* Sample the worker counter for every nth delayed callback. This is used to |
| * test that all workers have **finished** their current job before the delayed |
| * callback is processed. */ |
| #define UA_MAX_DELAYED_SAMPLE 100 |
| |
| /* Call only with a held mutex for the delayed callbacks */ |
| static void |
| dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) { |
| /* Are callbacks before the last checkpoint ready? */ |
| for(size_t i = 0; i < wq->workersSize; ++i) { |
| if(wq->workers[i].counter == wq->workers[i].checkpointCounter) |
| return; |
| } |
| |
| /* Dispatch all delayed callbacks up to the checkpoint. |
| * TODO: Move over the entire queue up to the checkpoint in one step. */ |
| if(wq->delayedCallbacks_checkpoint != NULL) { |
| UA_DelayedCallback *iter, *tmp_iter; |
| SIMPLEQ_FOREACH_SAFE(iter, &wq->delayedCallbacks, next, tmp_iter) { |
| pthread_mutex_lock(&wq->dispatchQueue_accessMutex); |
| SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, iter, next); |
| pthread_mutex_unlock(&wq->dispatchQueue_accessMutex); |
| if(iter == wq->delayedCallbacks_checkpoint) |
| break; |
| } |
| } |
| |
| /* Create the new sample point */ |
| for(size_t i = 0; i < wq->workersSize; ++i) |
| wq->workers[i].checkpointCounter = wq->workers[i].counter; |
| wq->delayedCallbacks_checkpoint = cb; |
| } |
| |
| #endif |
| |
| void |
| UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) { |
| #if UA_MULTITHREADING >= 200 |
| pthread_mutex_lock(&wq->dispatchQueue_accessMutex); |
| #endif |
| |
| SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next); |
| |
| #if UA_MULTITHREADING >= 200 |
| wq->delayedCallbacks_sinceDispatch++; |
| if(wq->delayedCallbacks_sinceDispatch > UA_MAX_DELAYED_SAMPLE) { |
| dispatchDelayedCallbacks(wq, cb); |
| wq->delayedCallbacks_sinceDispatch = 0; |
| } |
| pthread_mutex_unlock(&wq->dispatchQueue_accessMutex); |
| #endif |
| } |
| |
| /* Assumes all workers are shut down */ |
| void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq) { |
| UA_DelayedCallback *dc, *dc_tmp; |
| SIMPLEQ_FOREACH_SAFE(dc, &wq->delayedCallbacks, next, dc_tmp) { |
| SIMPLEQ_REMOVE_HEAD(&wq->delayedCallbacks, next); |
| if(dc->callback) |
| dc->callback(dc->application, dc->data); |
| UA_free(dc); |
| } |
| #if UA_MULTITHREADING >= 200 |
| wq->delayedCallbacks_checkpoint = NULL; |
| #endif |
| } |