/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
 * Copyright 2014-2016 (c) Sten Grüner
 * Copyright 2015 (c) Chris Iatrou
 * Copyright 2015 (c) Nick Goossens
 * Copyright 2015 (c) Jörg Schüler-Maroldt
 * Copyright 2015-2016 (c) Oleksiy Vasylyev
 * Copyright 2016-2017 (c) Florian Palm
 * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
 * Copyright 2016 (c) Lorenz Haas
 * Copyright 2017 (c) Jonas Green
 */

#ifndef UA_WORKQUEUE_H_
#define UA_WORKQUEUE_H_

#include "ua_util_internal.h"
#include "open62541_queue.h"

#if UA_MULTITHREADING >= 200
#include <pthread.h>
#endif

_UA_BEGIN_DECLS

/* Callback where the application is either a client or a server */
typedef void (*UA_ApplicationCallback)(void *application, void *data);

/* Delayed callbacks are executed when all previously enqueued work is finished.
 * This is used to free memory that might still be used by a parallel worker or
 * to which the current thread holds pointers until the current operation
 * finishes. */
typedef struct UA_DelayedCallback {
    SIMPLEQ_ENTRY(UA_DelayedCallback) next;
    UA_ApplicationCallback callback;
    void *application;
    void *data;
} UA_DelayedCallback;
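
/* A minimal illustrative sketch (not part of the API): a callback matching the
 * UA_ApplicationCallback signature that releases a heap allocation. The names
 * freeBufferCallback and the buffer itself are hypothetical. Such a callback
 * can be stored in a UA_DelayedCallback and handed to
 * UA_WorkQueue_enqueueDelayed (declared below).
 *
 *   static void
 *   freeBufferCallback(void *application, void *data) {
 *       (void)application; // e.g. the server or client instance, unused here
 *       UA_free(data);     // data points to the buffer handed over on enqueue
 *   }
 */
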
struct UA_WorkQueue;
typedef struct UA_WorkQueue UA_WorkQueue;

#if UA_MULTITHREADING >= 200

/* Workers take out callbacks from the work queue and execute them.
 *
 * Future Plans: Use work-stealing to load-balance between cores.
 * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
 * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
typedef struct {
    pthread_t thread;
    volatile UA_Boolean running;
    UA_WorkQueue *queue;
    UA_UInt32 counter;
    UA_UInt32 checkpointCounter; /* Counter when the last checkpoint was made
                                  * for the delayed callbacks */

    /* separate cache lines */
    char padding[64 - sizeof(void*) - sizeof(pthread_t) -
                 sizeof(UA_UInt32) - sizeof(UA_Boolean)];
} UA_Worker;

#endif

struct UA_WorkQueue {
    /* Worker threads and work queue. Without multithreading, work is executed
     * immediately. */
#if UA_MULTITHREADING >= 200
    UA_Worker *workers;
    size_t workersSize;
    /* Work queue */
    SIMPLEQ_HEAD(, UA_DelayedCallback) dispatchQueue; /* Dispatch queue for the worker threads */
    pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */
    pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
    pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */
#endif

    /* Delayed callbacks
     * To be executed after all currently dispatched work has finished */
    SIMPLEQ_HEAD(, UA_DelayedCallback) delayedCallbacks;
#if UA_MULTITHREADING >= 200
    pthread_mutex_t delayedCallbacks_accessMutex;
    UA_DelayedCallback *delayedCallbacks_checkpoint;
    size_t delayedCallbacks_sinceDispatch; /* How many have been added since we
                                            * tried to dispatch callbacks? */
#endif
};

void UA_WorkQueue_init(UA_WorkQueue *wq);

/* Enqueue a delayed callback. It is executed when all previous work in the
 * queue has been finished. The ``cb`` pointer is freed afterwards. ``cb`` can
 * have a NULL callback that is not executed.
 *
 * This method checks internally if existing delayed work can be moved from the
 * delayed queue to the worker dispatch queue. */
void UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb);
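
/* A minimal usage sketch (illustrative, not part of this header): hand over a
 * heap-allocated UA_DelayedCallback so that oldBuffer is freed only after all
 * currently enqueued work has drained. freeBufferCallback, oldBuffer and wq
 * are hypothetical names; the queue takes ownership of dc and frees it after
 * execution, so it must not be touched afterwards.
 *
 *   UA_DelayedCallback *dc = (UA_DelayedCallback*)
 *       UA_malloc(sizeof(UA_DelayedCallback));
 *   if(dc) {
 *       dc->callback = freeBufferCallback; // see the sketch above; may be NULL
 *       dc->application = NULL;
 *       dc->data = oldBuffer;
 *       UA_WorkQueue_enqueueDelayed(wq, dc); // wq frees dc once it has run
 *   }
 */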

/* Stop the workers, process all enqueued work in the calling thread, clean up
 * mutexes etc. */
void UA_WorkQueue_cleanup(UA_WorkQueue *wq);

#if UA_MULTITHREADING >= 200

/* Spin up a number of worker threads that listen on the work queue */
UA_StatusCode UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount);

void UA_WorkQueue_stop(UA_WorkQueue *wq);

/* Enqueue work for the worker threads */
void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
                          void *application, void *data);
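
/* Illustrative lifecycle sketch for the multithreaded case. processRequest and
 * request are hypothetical names and error handling is omitted; the comments
 * paraphrase the documentation above rather than adding new guarantees.
 *
 *   UA_WorkQueue wq;
 *   UA_WorkQueue_init(&wq);
 *   UA_StatusCode res = UA_WorkQueue_start(&wq, 4); // four worker threads
 *   if(res == UA_STATUSCODE_GOOD) {
 *       UA_WorkQueue_enqueue(&wq, processRequest, NULL, request);
 *       UA_WorkQueue_stop(&wq);   // signal the workers to finish
 *   }
 *   UA_WorkQueue_cleanup(&wq);    // stops workers, processes remaining work
 *                                 // in this thread, frees the mutexes
 */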

#else

/* Process all enqueued delayed work. This is not needed when workers are
 * running in the multithreading case. (UA_WorkQueue_cleanup still calls this
 * method during cleanup when the workers are shut down.) */
void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);
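
/* Illustrative single-threaded sketch (dc is a hypothetical, already filled-in
 * UA_DelayedCallback as sketched above; error handling omitted): without
 * workers, regular work runs immediately, and delayed callbacks are flushed
 * manually, e.g. once per server iteration, or implicitly by
 * UA_WorkQueue_cleanup.
 *
 *   UA_WorkQueue wq;
 *   UA_WorkQueue_init(&wq);
 *   UA_WorkQueue_enqueueDelayed(&wq, dc);      // queue takes ownership of dc
 *   UA_WorkQueue_manuallyProcessDelayed(&wq);  // executes and frees dc
 *   UA_WorkQueue_cleanup(&wq);
 */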

#endif

_UA_END_DECLS

#endif /* UA_WORKQUEUE_H_ */