blob: c43b34de06a0f8905f84bd79dbe01d87b9b1087d [file] [log] [blame]
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2015 (c) Oleksiy Vasylyev
* Copyright 2016 (c) Sten Grüner
* Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
* Copyright 2016-2017 (c) Florian Palm
* Copyright 2017 (c) Frank Meerkötter
* Copyright 2017 (c) Stefan Profanter, fortiss GmbH
*/
#include <open62541/client_highlevel.h>
#include <open62541/client_highlevel_async.h>
#include "ua_client_internal.h"
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
/*****************/
/* Subscriptions */
/*****************/
UA_CreateSubscriptionResponse UA_EXPORT
UA_Client_Subscriptions_create(UA_Client *client,
const UA_CreateSubscriptionRequest request,
void *subscriptionContext,
UA_Client_StatusChangeNotificationCallback statusChangeCallback,
UA_Client_DeleteSubscriptionCallback deleteCallback) {
UA_CreateSubscriptionResponse response;
UA_CreateSubscriptionResponse_init(&response);
/* Allocate the internal representation */
UA_Client_Subscription *newSub = (UA_Client_Subscription*)
UA_malloc(sizeof(UA_Client_Subscription));
if(!newSub) {
response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
return response;
}
/* Send the request as a synchronous service call */
__UA_Client_Service(client,
&request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
&response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
UA_free(newSub);
return response;
}
/* Prepare the internal representation */
newSub->context = subscriptionContext;
newSub->subscriptionId = response.subscriptionId;
newSub->sequenceNumber = 0;
newSub->lastActivity = UA_DateTime_nowMonotonic();
newSub->statusChangeCallback = statusChangeCallback;
newSub->deleteCallback = deleteCallback;
newSub->publishingInterval = response.revisedPublishingInterval;
newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
LIST_INIT(&newSub->monitoredItems);
LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
return response;
}
static UA_Client_Subscription *
findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
UA_Client_Subscription *sub = NULL;
LIST_FOREACH(sub, &client->subscriptions, listEntry) {
if(sub->subscriptionId == subscriptionId)
break;
}
return sub;
}
UA_ModifySubscriptionResponse UA_EXPORT
UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
UA_ModifySubscriptionResponse response;
UA_ModifySubscriptionResponse_init(&response);
/* Find the internal representation */
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
if(!sub) {
response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
return response;
}
/* Call the service */
__UA_Client_Service(client,
&request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
&response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
/* Adjust the internal representation */
sub->publishingInterval = response.revisedPublishingInterval;
sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
return response;
}
static void
UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
/* Remove the MonitoredItems */
UA_Client_MonitoredItem *mon, *mon_tmp;
LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
UA_Client_MonitoredItem_remove(client, sub, mon);
/* Call the delete callback */
if(sub->deleteCallback)
sub->deleteCallback(client, sub->subscriptionId, sub->context);
/* Remove */
LIST_REMOVE(sub, listEntry);
UA_free(sub);
}
UA_DeleteSubscriptionsResponse UA_EXPORT
UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) {
UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize);
memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize);
/* temporary remove the subscriptions from the list */
for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
subs[i] = findSubscription(client, request.subscriptionIds[i]);
if (subs[i])
LIST_REMOVE(subs[i], listEntry);
}
/* Send the request */
UA_DeleteSubscriptionsResponse response;
__UA_Client_Service(client,
&request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
&response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
goto cleanup;
if(request.subscriptionIdsSize != response.resultsSize) {
response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
goto cleanup;
}
/* Loop over the removed subscriptions and remove internally */
for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
if(response.results[i] != UA_STATUSCODE_GOOD &&
response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
/* Something was wrong, reinsert the subscription in the list */
if (subs[i])
LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
continue;
}
if(!subs[i]) {
UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"No internal representation of subscription %u",
request.subscriptionIds[i]);
continue;
}
LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
UA_Client_Subscription_deleteInternal(client, subs[i]);
}
return response;
cleanup:
for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
if (subs[i]) {
LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
}
}
return response;
}
UA_StatusCode UA_EXPORT
UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
UA_DeleteSubscriptionsRequest request;
UA_DeleteSubscriptionsRequest_init(&request);
request.subscriptionIds = &subscriptionId;
request.subscriptionIdsSize = 1;
UA_DeleteSubscriptionsResponse response =
UA_Client_Subscriptions_delete(client, request);
UA_StatusCode retval = response.responseHeader.serviceResult;
if(retval != UA_STATUSCODE_GOOD) {
UA_DeleteSubscriptionsResponse_deleteMembers(&response);
return retval;
}
if(response.resultsSize != 1) {
UA_DeleteSubscriptionsResponse_deleteMembers(&response);
return UA_STATUSCODE_BADINTERNALERROR;
}
retval = response.results[0];
UA_DeleteSubscriptionsResponse_deleteMembers(&response);
return retval;
}
/******************/
/* MonitoredItems */
/******************/
void
UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
UA_Client_MonitoredItem *mon) {
// NOLINTNEXTLINE
LIST_REMOVE(mon, listEntry);
if(mon->deleteCallback)
mon->deleteCallback(client, sub->subscriptionId, sub->context,
mon->monitoredItemId, mon->context);
UA_free(mon);
}
static void
__UA_Client_MonitoredItems_create(UA_Client *client,
const UA_CreateMonitoredItemsRequest *request,
void **contexts, void **handlingCallbacks,
UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
UA_CreateMonitoredItemsResponse *response) {
UA_CreateMonitoredItemsResponse_init(response);
if (!request->itemsToCreateSize) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
return;
}
/* Fix clang warning */
size_t itemsToCreateSize = request->itemsToCreateSize;
UA_Client_Subscription *sub = NULL;
/* Allocate the memory for internal representations */
UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize);
memset(mis, 0, sizeof(void*) * itemsToCreateSize);
for(size_t i = 0; i < itemsToCreateSize; i++) {
mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
if(!mis[i]) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
goto cleanup;
}
}
/* Get the subscription */
sub = findSubscription(client, request->subscriptionId);
if(!sub) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
goto cleanup;
}
/* Set the clientHandle */
for(size_t i = 0; i < itemsToCreateSize; i++)
request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
/* Call the service */
__UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
goto cleanup;
if(response->resultsSize != itemsToCreateSize) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
goto cleanup;
}
/* Add internally */
for(size_t i = 0; i < itemsToCreateSize; i++) {
if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
if (deleteCallbacks[i])
deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
UA_free(mis[i]);
mis[i] = NULL;
continue;
}
UA_Client_MonitoredItem *newMon = mis[i];
newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
newMon->monitoredItemId = response->results[i].monitoredItemId;
newMon->context = contexts[i];
newMon->deleteCallback = deleteCallbacks[i];
newMon->handler.dataChangeCallback =
(UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
newMon->isEventMonitoredItem =
(request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Subscription %u | Added a MonitoredItem with handle %u",
sub->subscriptionId, newMon->clientHandle);
}
return;
cleanup:
for(size_t i = 0; i < itemsToCreateSize; i++) {
if (deleteCallbacks[i]) {
if (sub)
deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
else
deleteCallbacks[i](client, 0, NULL, 0, contexts[i]);
}
if(mis[i])
UA_free(mis[i]);
}
}
UA_CreateMonitoredItemsResponse UA_EXPORT
UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
const UA_CreateMonitoredItemsRequest request, void **contexts,
UA_Client_DataChangeNotificationCallback *callbacks,
UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
UA_CreateMonitoredItemsResponse response;
__UA_Client_MonitoredItems_create(client, &request, contexts,
(void**)(uintptr_t)callbacks, deleteCallbacks, &response);
return response;
}
UA_MonitoredItemCreateResult UA_EXPORT
UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
void *context, UA_Client_DataChangeNotificationCallback callback,
UA_Client_DeleteMonitoredItemCallback deleteCallback) {
UA_CreateMonitoredItemsRequest request;
UA_CreateMonitoredItemsRequest_init(&request);
request.subscriptionId = subscriptionId;
request.timestampsToReturn = timestampsToReturn;
request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
request.itemsToCreateSize = 1;
UA_CreateMonitoredItemsResponse response =
UA_Client_MonitoredItems_createDataChanges(client, request, &context,
&callback, &deleteCallback);
UA_MonitoredItemCreateResult result;
UA_MonitoredItemCreateResult_init(&result);
if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
result.statusCode = response.responseHeader.serviceResult;
if(result.statusCode == UA_STATUSCODE_GOOD &&
response.resultsSize != 1)
result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
if(result.statusCode == UA_STATUSCODE_GOOD)
UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
UA_CreateMonitoredItemsResponse_deleteMembers(&response);
return result;
}
UA_CreateMonitoredItemsResponse UA_EXPORT
UA_Client_MonitoredItems_createEvents(UA_Client *client,
const UA_CreateMonitoredItemsRequest request, void **contexts,
UA_Client_EventNotificationCallback *callback,
UA_Client_DeleteMonitoredItemCallback *deleteCallback) {
UA_CreateMonitoredItemsResponse response;
__UA_Client_MonitoredItems_create(client, &request, contexts,
(void**)(uintptr_t)callback, deleteCallback, &response);
return response;
}
UA_MonitoredItemCreateResult UA_EXPORT
UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
void *context, UA_Client_EventNotificationCallback callback,
UA_Client_DeleteMonitoredItemCallback deleteCallback) {
UA_CreateMonitoredItemsRequest request;
UA_CreateMonitoredItemsRequest_init(&request);
request.subscriptionId = subscriptionId;
request.timestampsToReturn = timestampsToReturn;
request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
request.itemsToCreateSize = 1;
UA_CreateMonitoredItemsResponse response =
UA_Client_MonitoredItems_createEvents(client, request, &context,
&callback, &deleteCallback);
UA_StatusCode retval = response.responseHeader.serviceResult;
UA_MonitoredItemCreateResult result;
UA_MonitoredItemCreateResult_init(&result);
if(retval != UA_STATUSCODE_GOOD) {
UA_CreateMonitoredItemsResponse_deleteMembers(&response);
result.statusCode = retval;
return result;
}
UA_MonitoredItemCreateResult_copy(response.results , &result);
UA_CreateMonitoredItemsResponse_deleteMembers(&response);
return result;
}
UA_DeleteMonitoredItemsResponse UA_EXPORT
UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) {
/* Send the request */
UA_DeleteMonitoredItemsResponse response;
__UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
&response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
return response;
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
if(!sub) {
UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"No internal representation of subscription %u",
request.subscriptionId);
return response;
}
/* Loop over deleted MonitoredItems */
for(size_t i = 0; i < response.resultsSize; i++) {
if(response.results[i] != UA_STATUSCODE_GOOD &&
response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
continue;
}
#ifndef __clang_analyzer__
/* Delete the internal representation */
UA_Client_MonitoredItem *mon;
LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
// NOLINTNEXTLINE
if (mon->monitoredItemId == request.monitoredItemIds[i]) {
UA_Client_MonitoredItem_remove(client, sub, mon);
break;
}
}
#endif
}
return response;
}
UA_StatusCode UA_EXPORT
UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) {
UA_DeleteMonitoredItemsRequest request;
UA_DeleteMonitoredItemsRequest_init(&request);
request.subscriptionId = subscriptionId;
request.monitoredItemIds = &monitoredItemId;
request.monitoredItemIdsSize = 1;
UA_DeleteMonitoredItemsResponse response =
UA_Client_MonitoredItems_delete(client, request);
UA_StatusCode retval = response.responseHeader.serviceResult;
if(retval != UA_STATUSCODE_GOOD) {
UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
return retval;
}
if(response.resultsSize != 1) {
UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
return UA_STATUSCODE_BADINTERNALERROR;
}
retval = response.results[0];
UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
return retval;
}
UA_ModifyMonitoredItemsResponse UA_EXPORT
UA_Client_MonitoredItems_modify(UA_Client *client,
const UA_ModifyMonitoredItemsRequest request) {
UA_ModifyMonitoredItemsResponse response;
UA_Client_Subscription *sub = 0;
LIST_FOREACH(sub, &client->subscriptions, listEntry) {
if (sub->subscriptionId == request.subscriptionId)
break;
}
if (!sub) {
UA_ModifyMonitoredItemsResponse_init(&response);
response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
return response;
}
UA_ModifyMonitoredItemsRequest modifiedRequest;
UA_ModifyMonitoredItemsRequest_copy(&request, &modifiedRequest);
for (size_t i = 0; i < modifiedRequest.itemsToModifySize; ++i) {
UA_Client_MonitoredItem *mon = 0;
LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
if(mon->monitoredItemId == modifiedRequest.itemsToModify[i].monitoredItemId) {
modifiedRequest.itemsToModify[i].requestedParameters.clientHandle = mon->clientHandle;
break;
}
}
}
__UA_Client_Service(client,
&modifiedRequest, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST],
&response, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE]);
UA_ModifyMonitoredItemsRequest_deleteMembers(&modifiedRequest);
return response;
}
/*************************************/
/* Async Processing of Notifications */
/*************************************/
/* Assume the request is already initialized */
UA_StatusCode
UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
/* Count acks */
UA_Client_NotificationsAckNumber *ack;
LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
++request->subscriptionAcknowledgementsSize;
/* Create the array. Returns a sentinel pointer if the length is zero. */
request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
UA_Array_new(request->subscriptionAcknowledgementsSize,
&UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
if(!request->subscriptionAcknowledgements) {
request->subscriptionAcknowledgementsSize = 0;
return UA_STATUSCODE_BADOUTOFMEMORY;
}
size_t i = 0;
UA_Client_NotificationsAckNumber *ack_tmp;
LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
++i;
LIST_REMOVE(ack, listEntry);
UA_free(ack);
}
return UA_STATUSCODE_GOOD;
}
/* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
/* The value 0 is never used for the sequence number */
static UA_UInt32
UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
if(nextSequenceNumber == 0)
nextSequenceNumber = 1;
return nextSequenceNumber;
}
static void
processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
UA_DataChangeNotification *dataChangeNotification) {
for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
/* Find the MonitoredItem */
UA_Client_MonitoredItem *mon;
LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
if(mon->clientHandle == min->clientHandle)
break;
}
if(!mon) {
UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Could not process a notification with clienthandle %u on subscription %u",
min->clientHandle, sub->subscriptionId);
continue;
}
if(mon->isEventMonitoredItem) {
UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"MonitoredItem is configured for Events. But received a "
"DataChangeNotification.");
continue;
}
mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
mon->monitoredItemId, mon->context,
&min->value);
}
}
static void
processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
UA_EventNotificationList *eventNotificationList) {
for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
/* Find the MonitoredItem */
UA_Client_MonitoredItem *mon;
LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
if(mon->clientHandle == eventFieldList->clientHandle)
break;
}
if(!mon) {
UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Could not process a notification with clienthandle %u on subscription %u",
eventFieldList->clientHandle, sub->subscriptionId);
continue;
}
if(!mon->isEventMonitoredItem) {
UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"MonitoredItem is configured for DataChanges. But received a "
"EventNotification.");
continue;
}
mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
mon->monitoredItemId, mon->context,
eventFieldList->eventFieldsSize,
eventFieldList->eventFields);
}
}
static void
processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
UA_ExtensionObject *msg) {
if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
return;
/* Handle DataChangeNotification */
if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
UA_DataChangeNotification *dataChangeNotification =
(UA_DataChangeNotification *)msg->content.decoded.data;
processDataChangeNotification(client, sub, dataChangeNotification);
return;
}
/* Handle EventNotification */
if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
UA_EventNotificationList *eventNotificationList =
(UA_EventNotificationList *)msg->content.decoded.data;
processEventNotification(client, sub, eventNotificationList);
return;
}
/* Handle StatusChangeNotification */
if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
if(sub->statusChangeCallback) {
sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
(UA_StatusChangeNotification*)msg->content.decoded.data);
} else {
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Dropped a StatusChangeNotification since no callback is registered");
}
return;
}
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Unknown notification message type");
}
void
UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
UA_PublishResponse *response) {
UA_NotificationMessage *msg = &response->notificationMessage;
client->currentlyOutStandingPublishRequests--;
if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
if(client->config.outStandingPublishRequests > 1) {
client->config.outStandingPublishRequests--;
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Too many publishrequest, reduce outStandingPublishRequests to %d",
client->config.outStandingPublishRequests);
} else {
UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Too many publishrequest when outStandingPublishRequests = 1");
UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
}
return;
}
if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
return;
if(!LIST_FIRST(&client->subscriptions)) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
return;
}
if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) {
if(client->state >= UA_CLIENTSTATE_SESSION) {
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Received Publish Response with code %s",
UA_StatusCode_name(response->responseHeader.serviceResult));
UA_Client_Subscription* sub = findSubscription(client, response->subscriptionId);
if (sub != NULL)
UA_Client_Subscription_deleteInternal(client, sub);
}
return;
}
if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
UA_Client_disconnect(client); /* TODO: This should be handled before the process callback */
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Received BadSessionIdInvalid");
return;
}
if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTIMEOUT) {
if (client->config.inactivityCallback)
client->config.inactivityCallback(client);
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Received Timeout for Publish Response");
return;
}
if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Received Publish Response with code %s",
UA_StatusCode_name(response->responseHeader.serviceResult));
return;
}
UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
if(!sub) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Received Publish Response for a non-existant subscription");
return;
}
sub->lastActivity = UA_DateTime_nowMonotonic();
/* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Invalid subscription sequence number: expected %u but got %u",
UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber),
msg->sequenceNumber);
/* This is an error. But we do not abort the connection. Some server
* SDKs misbehave from time to time and send out-of-order sequence
* numbers. (Probably some multi-threading synchronization issue.) */
/* UA_Client_disconnect(client);
return; */
}
/* According to f), a keep-alive message contains no notifications and has the sequence number
* of the next NotificationMessage that is to be sent => More than one consecutive keep-alive
* message or a NotificationMessage following a keep-alive message will share the same sequence
* number. */
if (msg->notificationDataSize)
sub->sequenceNumber = msg->sequenceNumber;
/* Process the notification messages */
for(size_t k = 0; k < msg->notificationDataSize; ++k)
processNotificationMessage(client, sub, &msg->notificationData[k]);
/* Add to the list of pending acks */
for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
continue;
UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
if(!tmpAck) {
UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Not enough memory to store the acknowledgement for a publish "
"message on subscription %u", sub->subscriptionId);
break;
}
tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
tmpAck->subAck.subscriptionId = sub->subscriptionId;
LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
break;
}
}
static void
processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
void *response) {
UA_PublishRequest *req = (UA_PublishRequest*)userdata;
UA_PublishResponse *res = (UA_PublishResponse*)response;
/* Process the response */
UA_Client_Subscriptions_processPublishResponse(client, req, res);
/* Delete the cached request */
UA_PublishRequest_delete(req);
/* Fill up the outstanding publish requests */
UA_Client_Subscriptions_backgroundPublish(client);
}
void
UA_Client_Subscriptions_clean(UA_Client *client) {
UA_Client_NotificationsAckNumber *n, *tmp;
LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
LIST_REMOVE(n, listEntry);
UA_free(n);
}
UA_Client_Subscription *sub, *tmps;
LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
client->monitoredItemHandles = 0;
}
void
UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
if(client->state < UA_CLIENTSTATE_SESSION)
return;
/* Is the lack of responses the client's fault? */
if(client->currentlyOutStandingPublishRequests == 0)
return;
UA_Client_Subscription *sub;
LIST_FOREACH(sub, &client->subscriptions, listEntry) {
UA_DateTime maxSilence = (UA_DateTime)
((sub->publishingInterval * sub->maxKeepAliveCount) +
client->config.timeout) * UA_DATETIME_MSEC;
if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
/* Reset activity */
sub->lastActivity = UA_DateTime_nowMonotonic();
if(client->config.subscriptionInactivityCallback)
client->config.subscriptionInactivityCallback(client, sub->subscriptionId,
sub->context);
UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
"Inactivity for Subscription %u.", sub->subscriptionId);
}
}
}
UA_StatusCode
UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
if(client->state < UA_CLIENTSTATE_SESSION)
return UA_STATUSCODE_BADSERVERNOTCONNECTED;
/* The session must have at least one subscription */
if(!LIST_FIRST(&client->subscriptions))
return UA_STATUSCODE_GOOD;
while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
UA_PublishRequest *request = UA_PublishRequest_new();
if (!request)
return UA_STATUSCODE_BADOUTOFMEMORY;
request->requestHeader.timeoutHint=60000;
UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
if(retval != UA_STATUSCODE_GOOD) {
UA_PublishRequest_delete(request);
return retval;
}
UA_UInt32 requestId;
client->currentlyOutStandingPublishRequests++;
/* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */
retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
processPublishResponseAsync,
&UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
(void*)request, &requestId, 0);
if(retval != UA_STATUSCODE_GOOD) {
UA_PublishRequest_delete(request);
return retval;
}
}
return UA_STATUSCODE_GOOD;
}
#endif /* UA_ENABLE_SUBSCRIPTIONS */