/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2014-2016 (c) Sten Grüner
* Copyright 2015 (c) Chris Iatrou
* Copyright 2015 (c) Nick Goossens
* Copyright 2015 (c) Jörg Schüler-Maroldt
* Copyright 2015-2016 (c) Oleksiy Vasylyev
* Copyright 2016-2017 (c) Florian Palm
* Copyright 2017 (c) Stefan Profanter, fortiss GmbH
* Copyright 2016 (c) Lorenz Haas
* Copyright 2017 (c) Jonas Green
*/

#include "ua_workqueue.h"

void UA_WorkQueue_init(UA_WorkQueue *wq) {
    /* Initialize the linked list for delayed callbacks */
SIMPLEQ_INIT(&wq->delayedCallbacks);
#if UA_MULTITHREADING >= 200
wq->delayedCallbacks_checkpoint = NULL;
pthread_mutex_init(&wq->delayedCallbacks_accessMutex, NULL);
/* Initialize the dispatch queue for worker threads */
SIMPLEQ_INIT(&wq->dispatchQueue);
pthread_mutex_init(&wq->dispatchQueue_accessMutex, NULL);
pthread_cond_init(&wq->dispatchQueue_condition, NULL);
pthread_mutex_init(&wq->dispatchQueue_conditionMutex, NULL);
#endif
}
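
/* Usage sketch (illustrative only, not part of the library): the typical
 * lifecycle of a work queue. UA_WorkQueue_start/_stop exist only when
 * UA_MULTITHREADING >= 200; in single-threaded builds the queue merely
 * collects delayed callbacks until cleanup.
 *
 *     UA_WorkQueue wq;
 *     UA_WorkQueue_init(&wq);
 * #if UA_MULTITHREADING >= 200
 *     UA_WorkQueue_start(&wq, 4);   // spin up four worker threads
 *     // ... UA_WorkQueue_enqueue(...) from the main loop ...
 * #endif
 *     UA_WorkQueue_cleanup(&wq);    // stops workers (if any), drains the
 *                                   // dispatch queue, runs delayed callbacks
 */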

/* Forward declaration. UA_WorkQueue_cleanup calls this in every build
 * configuration, so the declaration is not guarded by UA_MULTITHREADING and
 * matches the non-static definition below. */
void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);

void UA_WorkQueue_cleanup(UA_WorkQueue *wq) {
#if UA_MULTITHREADING >= 200
/* Shut down workers */
UA_WorkQueue_stop(wq);
/* Execute remaining work in the dispatch queue */
while(true) {
pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
if(!dc) {
pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
break;
}
SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
        if(dc->callback)
            dc->callback(dc->application, dc->data);
UA_free(dc);
}
#endif
/* All workers are shut down. Execute remaining delayed work here. */
UA_WorkQueue_manuallyProcessDelayed(wq);
#if UA_MULTITHREADING >= 200
wq->delayedCallbacks_checkpoint = NULL;
pthread_mutex_destroy(&wq->dispatchQueue_accessMutex);
pthread_cond_destroy(&wq->dispatchQueue_condition);
pthread_mutex_destroy(&wq->dispatchQueue_conditionMutex);
pthread_mutex_destroy(&wq->delayedCallbacks_accessMutex);
#endif
}

/***********/
/* Workers */
/***********/

#if UA_MULTITHREADING >= 200

static void *
workerLoop(UA_Worker *worker) {
UA_WorkQueue *wq = worker->queue;
UA_UInt32 *counter = &worker->counter;
volatile UA_Boolean *running = &worker->running;
    /* Initialize the (thread-local) random seed with the RAM address of the
     * worker. Not for security-critical entropy! */
UA_random_seed((uintptr_t)worker);
while(*running) {
UA_atomic_addUInt32(counter, 1);
/* Remove a callback from the queue */
pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
if(dc)
SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
/* Nothing to do. Sleep until a callback is dispatched */
if(!dc) {
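            /* Note: UA_WorkQueue_enqueue broadcasts the condition without
             * holding dispatchQueue_conditionMutex, so a wakeup can fire
             * between the empty check above and this wait. Such a callback is
             * only picked up at the next broadcast, not immediately. */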
pthread_mutex_lock(&wq->dispatchQueue_conditionMutex);
pthread_cond_wait(&wq->dispatchQueue_condition,
&wq->dispatchQueue_conditionMutex);
pthread_mutex_unlock(&wq->dispatchQueue_conditionMutex);
continue;
}
/* Execute */
if(dc->callback)
dc->callback(dc->application, dc->data);
UA_free(dc);
}
return NULL;
}

/* Start the worker threads. Fails and returns an error if workers are
 * already running or if workersCount is zero. */
UA_StatusCode
UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount) {
if(wq->workersSize > 0 || workersCount == 0)
return UA_STATUSCODE_BADINTERNALERROR;
/* Create the worker array */
wq->workers = (UA_Worker*)UA_calloc(workersCount, sizeof(UA_Worker));
if(!wq->workers)
return UA_STATUSCODE_BADOUTOFMEMORY;
wq->workersSize = workersCount;
/* Spin up the workers */
for(size_t i = 0; i < workersCount; ++i) {
UA_Worker *w = &wq->workers[i];
w->queue = wq;
w->counter = 0;
w->running = true;
pthread_create(&w->thread, NULL, (void* (*)(void*))workerLoop, w);
}
return UA_STATUSCODE_GOOD;
}

void UA_WorkQueue_stop(UA_WorkQueue *wq) {
if(wq->workersSize == 0)
return;
/* Signal the workers to stop */
for(size_t i = 0; i < wq->workersSize; ++i)
wq->workers[i].running = false;
/* Wake up all workers */
pthread_cond_broadcast(&wq->dispatchQueue_condition);
/* Wait for the workers to finish, then clean up */
for(size_t i = 0; i < wq->workersSize; ++i)
pthread_join(wq->workers[i].thread, NULL);
UA_free(wq->workers);
wq->workers = NULL;
wq->workersSize = 0;
}

void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
void *application, void *data) {
UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
if(!dc) {
cb(application, data); /* Execute immediately if the memory could not be allocated */
return;
}
dc->callback = cb;
dc->application = application;
dc->data = data;
/* Enqueue for the worker threads */
pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, dc, next);
pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
/* Wake up sleeping workers */
pthread_cond_broadcast(&wq->dispatchQueue_condition);
}
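
/* Example (sketch, with hypothetical names): work enqueued here must match
 * the UA_ApplicationCallback signature used throughout this file, i.e.
 * void (*)(void *application, void *data).
 *
 *     static void
 *     logGreeting(void *application, void *data) {
 *         (void)application;
 *         printf("%s\n", (const char*)data);   // requires <stdio.h>
 *     }
 *
 *     UA_WorkQueue_enqueue(&wq, logGreeting, NULL, (void*)"hello");
 *
 * Because the callback runs synchronously in the calling thread when
 * allocation fails, it must be safe to execute in either context. */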
#endif

/*********************/
/* Delayed Callbacks */
/*********************/

#if UA_MULTITHREADING >= 200

/* Delayed callbacks are executed only when all callbacks that were dispatched
 * before them have finished. After every UA_MAX_DELAYED_SAMPLE delayed
 * callbacks added to the queue, we sample the counters of the workers and
 * compare them to the counters from the previous sample. If every worker has
 * advanced its counter since then, all delayed callbacks added before the
 * previous sample point are safe to execute. */

/* Sample the worker counters for every nth delayed callback. This tests that
 * all workers have **finished** the job they were executing at the sample
 * point before the delayed callbacks from before that point are processed. */
#define UA_MAX_DELAYED_SAMPLE 100
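
/* Worked example of the sampling scheme (for illustration): suppose two
 * workers whose counters read (5, 7) when a checkpoint is taken. At the next
 * sample the counters read (9, 8): every worker has advanced, so every job
 * that was in flight at the checkpoint has finished, and the delayed
 * callbacks from before the checkpoint can be handed to the dispatch queue.
 * Had one counter still read its checkpoint value, that worker might still
 * be inside a pre-checkpoint job and dispatching would be unsafe. */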
/* Call only with the mutex for the delayed callbacks
 * (delayedCallbacks_accessMutex) held */
static void
dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
    /* Have all workers advanced their counters since the last checkpoint?
     * If any worker has not, it may still be executing a job from before the
     * checkpoint, and the delayed callbacks are not yet safe. */
for(size_t i = 0; i < wq->workersSize; ++i) {
if(wq->workers[i].counter == wq->workers[i].checkpointCounter)
return;
}
    /* Move the delayed callbacks up to the checkpoint over to the dispatch
     * queue. Each entry is removed from the delayed queue before it is handed
     * to the workers, so that no node is linked into both queues at once.
     * TODO: Move over the entire queue up to the checkpoint in one step. */
    if(wq->delayedCallbacks_checkpoint != NULL) {
        UA_DelayedCallback *iter, *tmp_iter;
        SIMPLEQ_FOREACH_SAFE(iter, &wq->delayedCallbacks, next, tmp_iter) {
            /* iter is always the current head of the delayed queue */
            SIMPLEQ_REMOVE_HEAD(&wq->delayedCallbacks, next);
            pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
            SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, iter, next);
            pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
            if(iter == wq->delayedCallbacks_checkpoint)
                break;
        }
    }
}
/* Create the new sample point */
for(size_t i = 0; i < wq->workersSize; ++i)
wq->workers[i].checkpointCounter = wq->workers[i].counter;
wq->delayedCallbacks_checkpoint = cb;
}
#endif

void
UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
#if UA_MULTITHREADING >= 200
    pthread_mutex_lock(&wq->delayedCallbacks_accessMutex);
#endif
SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next);
#if UA_MULTITHREADING >= 200
wq->delayedCallbacks_sinceDispatch++;
if(wq->delayedCallbacks_sinceDispatch > UA_MAX_DELAYED_SAMPLE) {
dispatchDelayedCallbacks(wq, cb);
wq->delayedCallbacks_sinceDispatch = 0;
}
    pthread_mutex_unlock(&wq->delayedCallbacks_accessMutex);
#endif
}
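
/* Example (sketch, with hypothetical names): the typical use of a delayed
 * callback is to free memory that a worker thread might still be reading.
 *
 *     UA_DelayedCallback *dc = (UA_DelayedCallback*)
 *         UA_malloc(sizeof(UA_DelayedCallback));
 *     if(dc) {
 *         dc->callback = freeLater;   // void freeLater(void*, void *data)
 *         dc->application = NULL;
 *         dc->data = oldConfig;
 *         UA_WorkQueue_enqueueDelayed(&wq, dc);
 *     }
 *
 * oldConfig is then released by freeLater only after every job that was in
 * flight when the entry was enqueued has finished. */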
/* Assumes all workers are shut down */
void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq) {
UA_DelayedCallback *dc, *dc_tmp;
SIMPLEQ_FOREACH_SAFE(dc, &wq->delayedCallbacks, next, dc_tmp) {
SIMPLEQ_REMOVE_HEAD(&wq->delayedCallbacks, next);
if(dc->callback)
dc->callback(dc->application, dc->data);
UA_free(dc);
}
#if UA_MULTITHREADING >= 200
wq->delayedCallbacks_checkpoint = NULL;
#endif
}