blob: 08c5a9f8fc2052f27ac369970ebb07ae03049349 [file] [log] [blame]
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2017 (c) Stefan Profanter, fortiss GmbH
* Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
* Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
* Copyright 2018 (c) Fabian Arndt, Root-Core
*/
#include "ua_server_internal.h"
#include "ua_subscription.h"
#include "ua_types_encoding_binary.h"
#ifdef UA_ENABLE_DA
#include <math.h> // fabs
#endif
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
#define UA_VALUENCODING_MAXSTACK 512
#define ABS_SUBTRACT_TYPE_INDEPENDENT(a,b) ((a)>(b)?(a)-(b):(b)-(a))
static UA_Boolean
outOfDeadBand(const void *data1, const void *data2,
const UA_DataType *type, const UA_Double deadbandValue) {
if(type == &UA_TYPES[UA_TYPES_BOOLEAN]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Boolean*)data1, *(const UA_Boolean*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_SBYTE]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_SByte*)data1, *(const UA_SByte*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_BYTE]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Byte*)data1, *(const UA_Byte*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_INT16]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Int16*)data1, *(const UA_Int16*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_UINT16]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_UInt16*)data1, *(const UA_UInt16*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_INT32]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Int32*)data1, *(const UA_Int32*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_UINT32]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_UInt32*)data1, *(const UA_UInt32*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_INT64]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Int64*)data1, *(const UA_Int64*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_UINT64]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_UInt64*)data1, *(const UA_UInt64*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_FLOAT]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Float*)data1, *(const UA_Float*)data2) <= deadbandValue)
return false;
} else if(type == &UA_TYPES[UA_TYPES_DOUBLE]) {
if(ABS_SUBTRACT_TYPE_INDEPENDENT(*(const UA_Double*)data1, *(const UA_Double*)data2) <= deadbandValue)
return false;
}
return true;
}
static UA_INLINE UA_Boolean
updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue,
const UA_Double deadbandValue) {
if(value->arrayLength != oldValue->arrayLength)
return true;
if(value->type != oldValue->type)
return true;
size_t length = 1;
if(!UA_Variant_isScalar(value))
length = value->arrayLength;
uintptr_t data = (uintptr_t)value->data;
for(size_t i = 0; i < length; ++i) {
if(outOfDeadBand((const void*)data, oldValue->data, value->type, deadbandValue))
return true;
data += value->type->memSize;
}
return false;
}
#ifdef UA_ENABLE_DA
static UA_Boolean
updateNeededForStatusCode(const UA_DataValue *value, const UA_MonitoredItem *mon) {
if(UA_Variant_isScalar(&value->value) && value->status != mon->lastStatus)
return true;
return false;
}
#endif
/* When a change is detected, encoding contains the heap-allocated binary
* encoded value. The default for changed is false. */
static UA_StatusCode
detectValueChangeWithFilter(UA_Server *server, UA_Session *session, UA_MonitoredItem *mon,
UA_DataValue *value, UA_ByteString *encoding, UA_Boolean *changed) {
if(UA_DataType_isNumeric(value->value.type) &&
(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
if(!updateNeededForFilteredValue(&value->value, &mon->lastValue,
mon->filter.dataChangeFilter.deadbandValue))
return UA_STATUSCODE_GOOD;
}
#ifdef UA_ENABLE_DA
else if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
/* Browse for the percent range */
UA_QualifiedName qn = UA_QUALIFIEDNAME(0, "EURange");
UA_BrowsePathResult bpr = browseSimplifiedBrowsePath(server, mon->monitoredNodeId, 1, &qn);
if(bpr.statusCode != UA_STATUSCODE_GOOD || bpr.targetsSize < 1) {
UA_BrowsePathResult_deleteMembers(&bpr);
return UA_STATUSCODE_GOOD;
}
/* Read the range */
UA_ReadValueId rvi;
UA_ReadValueId_init(&rvi);
rvi.nodeId = bpr.targets->targetId.nodeId;
rvi.attributeId = UA_ATTRIBUTEID_VALUE;
UA_DataValue rangeVal = UA_Server_readWithSession(server, session, &rvi, UA_TIMESTAMPSTORETURN_NEITHER);
if(!UA_Variant_isScalar(&rangeVal.value) || rangeVal.value.type != &UA_TYPES[UA_TYPES_RANGE]) {
UA_DataValue_clear(&rangeVal);
return UA_STATUSCODE_GOOD;
}
/* Compute the max change */
UA_Range* euRange = (UA_Range*)rangeVal.value.data;
UA_Double maxDist = (mon->filter.dataChangeFilter.deadbandValue/100.0) * (euRange->high - euRange->low);
UA_DataValue_clear(&rangeVal);
/* Relevant change? */
if(!updateNeededForFilteredValue(&value->value, &mon->lastValue, maxDist) &&
!updateNeededForStatusCode(value, mon))
return UA_STATUSCODE_GOOD;
}
#endif
}
/* Stack-allocate some memory for the value encoding. We might heap-allocate
* more memory if needed. This is just enough for scalars and small
* structures. */
UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
UA_ByteString valueEncoding;
valueEncoding.data = stackValueEncoding;
valueEncoding.length = UA_VALUENCODING_MAXSTACK;
/* Encode the value */
UA_Byte *bufPos = valueEncoding.data;
const UA_Byte *bufEnd = &valueEncoding.data[valueEncoding.length];
UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
&bufPos, &bufEnd, NULL, NULL);
if(retval == UA_STATUSCODE_BADENCODINGERROR) {
size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
if(binsize == 0)
return UA_STATUSCODE_BADENCODINGERROR;
if(binsize > UA_VALUENCODING_MAXSTACK) {
retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
if(retval == UA_STATUSCODE_GOOD) {
bufPos = valueEncoding.data;
bufEnd = &valueEncoding.data[valueEncoding.length];
retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
&bufPos, &bufEnd, NULL, NULL);
}
}
}
if(retval != UA_STATUSCODE_GOOD) {
if(valueEncoding.data != stackValueEncoding)
UA_ByteString_deleteMembers(&valueEncoding);
return retval;
}
/* Has the value changed? */
valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
*changed = (!mon->lastSampledValue.data ||
!UA_String_equal(&valueEncoding, &mon->lastSampledValue));
/* No change */
if(!(*changed)) {
if(valueEncoding.data != stackValueEncoding)
UA_ByteString_deleteMembers(&valueEncoding);
return UA_STATUSCODE_GOOD;
}
/* Change detected. Copy encoding on the heap if necessary. */
if(valueEncoding.data == stackValueEncoding)
return UA_ByteString_copy(&valueEncoding, encoding);
*encoding = valueEncoding;
return UA_STATUSCODE_GOOD;
}
/* Has this sample changed from the last one? The method may allocate additional
* space for the encoding buffer. Detect the change in encoding->data. */
static UA_StatusCode
detectValueChange(UA_Server *server, UA_Session *session, UA_MonitoredItem *mon,
UA_DataValue value, UA_ByteString *encoding, UA_Boolean *changed) {
UA_LOCK_ASSERT(server->serviceMutex, 1);
/* Apply Filter */
if(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS)
value.hasValue = false;
value.hasServerTimestamp = false;
value.hasServerPicoseconds = false;
if(mon->filter.dataChangeFilter.trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
value.hasSourceTimestamp = false;
value.hasSourcePicoseconds = false;
}
/* Detect the value change */
return detectValueChangeWithFilter(server, session, mon, &value, encoding, changed);
}
/* movedValue returns whether the sample was moved to the notification. The
* default is false. */
static UA_StatusCode
sampleCallbackWithValue(UA_Server *server, UA_Session *session,
UA_Subscription *sub, UA_MonitoredItem *mon,
UA_DataValue *value, UA_Boolean *movedValue) {
UA_assert(mon->attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER);
/* Contains heap-allocated binary encoding of the value if a change was detected */
UA_ByteString binValueEncoding = UA_BYTESTRING_NULL;
/* Has the value changed? Allocates memory in binValueEncoding if necessary.
* value is edited internally so we make a shallow copy. */
UA_Boolean changed = false;
UA_StatusCode retval = detectValueChange(server, session, mon, *value, &binValueEncoding, &changed);
if(retval != UA_STATUSCODE_GOOD) {
UA_LOG_WARNING_SESSION(&server->config.logger, session, "Subscription %u | "
"MonitoredItem %i | Value change detection failed with StatusCode %s",
sub ? sub->subscriptionId : 0, mon->monitoredItemId,
UA_StatusCode_name(retval));
return retval;
}
if(!changed) {
UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Subscription %u | "
"MonitoredItem %i | The value has not changed",
sub ? sub->subscriptionId : 0, mon->monitoredItemId);
return UA_STATUSCODE_GOOD;
}
/* The MonitoredItem is attached to a subscription (not server-local).
* Prepare a notification and enqueue it. */
if(sub) {
/* Allocate a new notification */
UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
if(!newNotification) {
UA_ByteString_deleteMembers(&binValueEncoding);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
if(value->value.storageType == UA_VARIANT_DATA) {
newNotification->data.value = *value; /* Move the value to the notification */
*movedValue = true;
} else { /* => (value->value.storageType == UA_VARIANT_DATA_NODELETE) */
retval = UA_DataValue_copy(value, &newNotification->data.value);
if(retval != UA_STATUSCODE_GOOD) {
UA_ByteString_deleteMembers(&binValueEncoding);
UA_free(newNotification);
return retval;
}
}
/* <-- Point of no return --> */
UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Subscription %u | "
"MonitoredItem %i | Enqueue a new notification",
sub ? sub->subscriptionId : 0, mon->monitoredItemId);
newNotification->mon = mon;
UA_Notification_enqueue(server, sub, mon, newNotification);
}
/* Store the encoding for comparison */
UA_ByteString_deleteMembers(&mon->lastSampledValue);
mon->lastSampledValue = binValueEncoding;
/* Store the value for filter comparison (we don't want to decode
* lastSampledValue in every iteration). Don't test the return code here. If
* this fails, lastValue is empty and a notification will be forced for the
* next deadband comparison. */
if((mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_NONE ||
mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE ||
mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT) &&
(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS ||
mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
UA_Variant_deleteMembers(&mon->lastValue);
UA_Variant_copy(&value->value, &mon->lastValue);
#ifdef UA_ENABLE_DA
UA_StatusCode_deleteMembers(&mon->lastStatus);
UA_StatusCode_copy(&value->status, &mon->lastStatus);
#endif
}
/* Call the local callback if the MonitoredItem is not attached to a
* subscription. Do this at the very end. Because the callback might delete
* the subscription. */
if(!sub) {
UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) mon;
void *nodeContext = NULL;
UA_Server_getNodeContext(server, mon->monitoredNodeId, &nodeContext);
UA_UNLOCK(server->serviceMutex);
localMon->callback.dataChangeCallback(server, mon->monitoredItemId,
localMon->context,
&mon->monitoredNodeId,
nodeContext, mon->attributeId,
value);
UA_LOCK(server->serviceMutex);
}
return UA_STATUSCODE_GOOD;
}
void
UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem)
{
UA_LOCK(server->serviceMutex);
monitoredItem_sampleCallback(server, monitoredItem);
UA_UNLOCK(server->serviceMutex)
}
void
monitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
UA_Subscription *sub = monitoredItem->subscription;
UA_Session *session = &server->adminSession;
if(sub)
session = sub->session;
UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Subscription %u | "
"MonitoredItem %i | Sample callback called",
sub ? sub->subscriptionId : 0, monitoredItem->monitoredItemId);
UA_assert(monitoredItem->attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER);
/* Get the node */
const UA_Node *node = UA_Nodestore_getNode(server->nsCtx, &monitoredItem->monitoredNodeId);
/* Sample the value. The sample can still point into the node. */
UA_DataValue value;
UA_DataValue_init(&value);
if(node) {
UA_ReadValueId rvid;
UA_ReadValueId_init(&rvid);
rvid.nodeId = monitoredItem->monitoredNodeId;
rvid.attributeId = monitoredItem->attributeId;
rvid.indexRange = monitoredItem->indexRange;
ReadWithNode(node, server, session, monitoredItem->timestampsToReturn, &rvid, &value);
} else {
value.hasStatus = true;
value.status = UA_STATUSCODE_BADNODEIDUNKNOWN;
}
/* Operate on the sample */
UA_Boolean movedValue = false;
UA_StatusCode retval = sampleCallbackWithValue(server, session, sub, monitoredItem, &value, &movedValue);
if(retval != UA_STATUSCODE_GOOD) {
UA_LOG_WARNING_SESSION(&server->config.logger, session, "Subscription %u | "
"MonitoredItem %i | Sampling returned the statuscode %s",
sub ? sub->subscriptionId : 0, monitoredItem->monitoredItemId,
UA_StatusCode_name(retval));
}
/* Delete the sample if it was not moved to the notification. */
if(!movedValue)
UA_DataValue_deleteMembers(&value); /* Does nothing for UA_VARIANT_DATA_NODELETE */
if(node)
UA_Nodestore_releaseNode(server->nsCtx, node);
}
#endif /* UA_ENABLE_SUBSCRIPTIONS */