blob: 21ab945667f3a0819cfc9bfc5405d84183a487a4 [file] [log] [blame]
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2017-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2017 (c) Stefan Profanter, fortiss GmbH
* Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
* Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
* Copyright 2018 (c) Fabian Arndt, Root-Core
*/
#include "ua_server_internal.h"
#include "ua_subscription.h"
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
/****************/
/* Notification */
/****************/
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
static const UA_NodeId overflowEventType =
{0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE}};
static const UA_NodeId simpleOverflowEventType =
{0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE}};
static UA_Boolean
UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
UA_MonitoredItem *mon = n->mon;
if(mon->attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER)
return false;
UA_EventFieldList *efl = &n->data.event.fields;
if(efl->eventFieldsSize >= 1 &&
efl->eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
isNodeInTree(server->nsCtx, (const UA_NodeId *)efl->eventFields[0].data,
&overflowEventType, &subtypeId, 1)) {
return true;
}
return false;
}
/* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
* "is generated when the first Event has to be discarded [...] without
* discarding any other event". So only generate one for all deleted events. */
static UA_StatusCode
createEventOverflowNotification(UA_Server *server, UA_Subscription *sub,
UA_MonitoredItem *mon, UA_Notification *indicator) {
/* Avoid two redundant overflow events in a row */
if(UA_Notification_isOverflowEvent(server, indicator))
return UA_STATUSCODE_GOOD;
/* A notification is inserted into the queue which includes only the
* NodeId of the overflowEventType. It is up to the client to check for
* possible overflows. */
/* Allocate the notification */
UA_Notification *overflowNotification = (UA_Notification *)
UA_malloc(sizeof(UA_Notification));
if(!overflowNotification)
return UA_STATUSCODE_BADOUTOFMEMORY;;
/* Set the notification fields */
overflowNotification->mon = mon;
UA_EventFieldList_init(&overflowNotification->data.event.fields);
overflowNotification->data.event.fields.eventFields = UA_Variant_new();
if(!overflowNotification->data.event.fields.eventFields) {
UA_free(overflowNotification);
return UA_STATUSCODE_BADOUTOFMEMORY;;
}
overflowNotification->data.event.fields.eventFieldsSize = 1;
UA_StatusCode retval =
UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
&simpleOverflowEventType, &UA_TYPES[UA_TYPES_NODEID]);
if(retval != UA_STATUSCODE_GOOD) {
UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
UA_free(overflowNotification);
return retval;
}
/* Insert before the "indicator notification". This is either first in the
* queue (if the oldest notification was removed) or before the new event
* that remains the last element of the queue. */
TAILQ_INSERT_BEFORE(indicator, overflowNotification, listEntry);
++mon->eventOverflows;
++mon->queueSize;
TAILQ_NEXT(overflowNotification, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
++sub->notificationQueueSize;
++sub->eventNotifications;
}
return UA_STATUSCODE_GOOD;
}
#endif
/* !!! The enqueue and dequeue operations need to match the reporting
* disable/enable logic in Operation_SetMonitoringMode !!! */
void
UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
UA_MonitoredItem *mon, UA_Notification *n) {
/* Add to the MonitoredItem */
TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
++mon->queueSize;
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
UA_Notification_isOverflowEvent(server, n))
++mon->eventOverflows;
#endif
/* Add to the subscription if reporting is enabled */
TAILQ_NEXT(n, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
++sub->notificationQueueSize;
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
++sub->eventNotifications;
} else
#endif
{
++sub->dataChangeNotifications;
}
}
/* Ensure enough space is available in the MonitoredItem. Do this only after
* adding the new Notification. */
UA_MonitoredItem_ensureQueueSpace(server, mon);
}
void
UA_Notification_dequeue(UA_Server *server, UA_Notification *n) {
UA_MonitoredItem *mon = n->mon;
UA_Subscription *sub = mon->subscription;
/* Remove from the MonitoredItem queue */
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
UA_Notification_isOverflowEvent(server, n))
--mon->eventOverflows;
#endif
TAILQ_REMOVE(&mon->queue, n, listEntry);
--mon->queueSize;
/* Remove from the subscription's queue */
if(TAILQ_NEXT(n, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
--sub->eventNotifications;
} else
#endif
{
--sub->dataChangeNotifications;
}
TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
--sub->notificationQueueSize;
}
}
void
UA_Notification_delete(UA_Notification *n) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
UA_MonitoredItem *mon = n->mon;
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
UA_EventFieldList_deleteMembers(&n->data.event.fields);
/* EventFilterResult currently isn't being used
* UA_EventFilterResult_delete(notification->data.event->result); */
} else
#endif
{
UA_DataValue_deleteMembers(&n->data.value);
}
UA_free(n);
}
/*****************/
/* MonitoredItem */
/*****************/
void
UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
memset(mon, 0, sizeof(UA_MonitoredItem));
mon->subscription = sub;
TAILQ_INIT(&mon->queue);
}
void
UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
/* Remove the sampling callback */
UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
/* Remove the queued notifications if attached to a subscription (not a
* local MonitoredItem) */
if(monitoredItem->subscription) {
UA_Notification *notification, *notification_tmp;
TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
listEntry, notification_tmp) {
/* Remove the item from the queues and free the memory */
UA_Notification_dequeue(server, notification);
UA_Notification_delete(notification);
}
}
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
if(monitoredItem->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
/* Remove the monitored item from the node queue */
UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
UA_EventFilter_clear(&monitoredItem->filter.eventFilter);
} else
#endif
{
/* UA_DataChangeFilter does not hold dynamic content we need to free */
/* UA_DataChangeFilter_clear(&monitoredItem->filter.dataChangeFilter); */
}
/* Deregister MonitoredItem in userland */
if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
/* Get the session context. Local MonitoredItems don't have a subscription. */
UA_Session *session = NULL;
if(monitoredItem->subscription)
session = monitoredItem->subscription->session;
if(!session)
session = &server->adminSession;
/* Get the node context */
void *targetContext = NULL;
UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
/* Deregister */
server->config.monitoredItemRegisterCallback(server, &session->sessionId,
session->sessionHandle,
&monitoredItem->monitoredNodeId,
targetContext, monitoredItem->attributeId, true);
}
/* Remove the monitored item */
if(monitoredItem->listEntry.le_prev != NULL)
LIST_REMOVE(monitoredItem, listEntry);
UA_String_deleteMembers(&monitoredItem->indexRange);
UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
UA_Variant_deleteMembers(&monitoredItem->lastValue);
UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
/* No actual callback, just remove the structure */
monitoredItem->delayedFreePointers.callback = NULL;
UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
}
UA_StatusCode
UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
/* Assert: The eventoverflow are counted in the queue size; There can be
* only one eventoverflow more than normal entries */
UA_assert(mon->queueSize >= mon->eventOverflows);
UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
/* Nothing to do */
if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
return UA_STATUSCODE_GOOD;
#ifdef __clang_analyzer__
return UA_STATUSCODE_GOOD;
#endif
/* Remove notifications until the queue size is reached */
UA_Subscription *sub = mon->subscription;
while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
/* At least two notifications that are not eventOverflows in the queue */
UA_assert(mon->queueSize - mon->eventOverflows >= 2);
/* Select the next notification to delete. Skip over overflow events. */
UA_Notification *del = NULL;
if(mon->discardOldest) {
/* Remove the oldest */
del = TAILQ_FIRST(&mon->queue);
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
while(UA_Notification_isOverflowEvent(server, del))
del = TAILQ_NEXT(del, listEntry); /* skip overflow events */
#endif
} else {
/* Remove the second newest (to keep the up-to-date notification).
* The last entry is not an OverflowEvent -- we just added it. */
del = TAILQ_LAST(&mon->queue, NotificationQueue);
del = TAILQ_PREV(del, NotificationQueue, listEntry);
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
while(UA_Notification_isOverflowEvent(server, del))
del = TAILQ_PREV(del, NotificationQueue, listEntry); /* skip overflow events */
#endif
}
UA_assert(del); /* There must have been one entry that can be deleted */
/* If reporting is activated (entries are also in the subscriptions
* global queue): Move the entry after del in the per-MonitoredItem
* queue right after del in the global queue. (It is already right after
* del in the per-MonitoredItem queue.) This is required so we don't
* starve MonitoredItems with a high sampling interval by always
* removing their first appearance in the gloal queue for the
* Subscription. */
if(TAILQ_NEXT(del, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
UA_assert(after_del); /* There must be one remaining element after del */
TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
}
/* Delete the notification */
UA_Notification_dequeue(server, del);
UA_Notification_delete(del);
}
/* Get the element where the overflow shall be announced (infobits or
* overflowevent) */
UA_Notification *indicator;
if(mon->discardOldest)
indicator = TAILQ_FIRST(&mon->queue);
else
indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
UA_assert(indicator);
/* Create an overflow notification */
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
return createEventOverflowNotification(server, sub, mon, indicator);
} else
#endif
{
/* Set the infobits of a datachange notification */
if(mon->maxQueueSize > 1) {
/* Add the infobits either to the newest or the new last entry */
indicator->data.value.hasStatus = true;
indicator->data.value.status |=
(UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
}
}
return UA_STATUSCODE_GOOD;
}
UA_StatusCode
UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
if(mon->sampleCallbackIsRegistered)
return UA_STATUSCODE_GOOD;
/* Only DataChange MonitoredItems have a callback with a sampling interval */
if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
return UA_STATUSCODE_GOOD;
UA_StatusCode retval =
UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
mon, mon->samplingInterval, &mon->sampleCallbackId);
if(retval == UA_STATUSCODE_GOOD)
mon->sampleCallbackIsRegistered = true;
return retval;
}
void
UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
if(!mon->sampleCallbackIsRegistered)
return;
UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
mon->sampleCallbackIsRegistered = false;
}
#endif /* UA_ENABLE_SUBSCRIPTIONS */