| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| * |
| * Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
| * Copyright 2014-2016 (c) Sten Grüner |
| * Copyright 2015 (c) Chris Iatrou |
| * Copyright 2015 (c) Nick Goossens |
| * Copyright 2015 (c) Jörg Schüler-Maroldt |
| * Copyright 2015-2016 (c) Oleksiy Vasylyev |
| * Copyright 2016-2017 (c) Florian Palm |
| * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
| * Copyright 2016 (c) Lorenz Haas |
| * Copyright 2017 (c) Jonas Green |
| */ |
| |
| #ifndef UA_WORKQUEUE_H_ |
| #define UA_WORKQUEUE_H_ |
| |
| #include "ua_util_internal.h" |
| #include "open62541_queue.h" |
| |
| #if UA_MULTITHREADING >= 200 |
| #include <pthread.h> |
| #endif |
| |
| _UA_BEGIN_DECLS |
| |
| /* Callback where the application is either a client or a server */ |
| typedef void (*UA_ApplicationCallback)(void *application, void *data); |
| |
| /* Delayed callbacks are executed when all previously enqueue work is finished. |
| * This is used to free memory that might used by a parallel worker or where the |
| * current threat has remaining pointers to until the current operation |
| * finishes. */ |
| typedef struct UA_DelayedCallback { |
| SIMPLEQ_ENTRY(UA_DelayedCallback) next; |
| UA_ApplicationCallback callback; |
| void *application; |
| void *data; |
| } UA_DelayedCallback; |
| |
| struct UA_WorkQueue; |
| typedef struct UA_WorkQueue UA_WorkQueue; |
| |
| #if UA_MULTITHREADING >= 200 |
| |
| /* Workers take out callbacks from the work queue and execute them. |
| * |
| * Future Plans: Use work-stealing to load-balance between cores. |
| * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory |
| * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */ |
| typedef struct { |
| pthread_t thread; |
| volatile UA_Boolean running; |
| UA_WorkQueue *queue; |
| UA_UInt32 counter; |
| UA_UInt32 checkpointCounter; /* Counter when the last checkpoint was made |
| * for the delayed callbacks */ |
| |
| /* separate cache lines */ |
| char padding[64 - sizeof(void*) - sizeof(pthread_t) - |
| sizeof(UA_UInt32) - sizeof(UA_Boolean)]; |
| } UA_Worker; |
| |
| #endif |
| |
| struct UA_WorkQueue { |
| /* Worker threads and work queue. Without multithreading, work is executed |
| immediately. */ |
| #if UA_MULTITHREADING >= 200 |
| UA_Worker *workers; |
| size_t workersSize; |
| |
| /* Work queue */ |
| SIMPLEQ_HEAD(, UA_DelayedCallback) dispatchQueue; /* Dispatch queue for the worker threads */ |
| pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */ |
| pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */ |
| pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */ |
| #endif |
| |
| /* Delayed callbacks |
| * To be executed after all curretly dispatched works has finished */ |
| SIMPLEQ_HEAD(, UA_DelayedCallback) delayedCallbacks; |
| #if UA_MULTITHREADING >= 200 |
| pthread_mutex_t delayedCallbacks_accessMutex; |
| UA_DelayedCallback *delayedCallbacks_checkpoint; |
| size_t delayedCallbacks_sinceDispatch; /* How many have been added since we |
| * tried to dispatch callbacks? */ |
| #endif |
| }; |
| |
| void UA_WorkQueue_init(UA_WorkQueue *wq); |
| |
| /* Enqueue a delayed callback. It is executed when all previous work in the |
| * queue has been finished. The ``cb`` pointer is freed afterwards. ``cb`` can |
| * have a NULL callback that is not executed. |
| * |
| * This method checks internally if existing delayed work can be moved from the |
| * delayed queue to the worker dispatch queue. */ |
| void UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb); |
| |
| /* Stop the workers, process all enqueued work in the calling thread, clean up |
| * mutexes etc. */ |
| void UA_WorkQueue_cleanup(UA_WorkQueue *wq); |
| |
| #if UA_MULTITHREADING >= 200 |
| |
| /* Spin up a number of worker threads that listen on the work queue */ |
| UA_StatusCode UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount); |
| |
| void UA_WorkQueue_stop(UA_WorkQueue *wq); |
| |
| /* Enqueue work for the worker threads */ |
| void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb, |
| void *application, void *data); |
| |
| #else |
| |
| /* Process all enqueued delayed work. This is not needed when workers are |
| * running for the multithreading case. (UA_WorkQueue_cleanup still calls this |
| * method during cleanup when the workers are shut down.) */ |
| void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq); |
| |
| #endif |
| |
| _UA_END_DECLS |
| |
| #endif /* UA_SERVER_WORKQUEUE_H_ */ |