| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| * |
| * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
| * Copyright 2015 (c) Oleksiy Vasylyev |
| * Copyright 2016 (c) Sten Grüner |
| * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA |
| * Copyright 2016-2017 (c) Florian Palm |
| * Copyright 2017 (c) Frank Meerkötter |
| * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
| */ |
| |
| #include <open62541/client_highlevel.h> |
| #include <open62541/client_highlevel_async.h> |
| |
| #include "ua_client_internal.h" |
| |
| #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ |
| |
| /*****************/ |
| /* Subscriptions */ |
| /*****************/ |
| |
| UA_CreateSubscriptionResponse UA_EXPORT |
| UA_Client_Subscriptions_create(UA_Client *client, |
| const UA_CreateSubscriptionRequest request, |
| void *subscriptionContext, |
| UA_Client_StatusChangeNotificationCallback statusChangeCallback, |
| UA_Client_DeleteSubscriptionCallback deleteCallback) { |
| UA_CreateSubscriptionResponse response; |
| UA_CreateSubscriptionResponse_init(&response); |
| |
| /* Allocate the internal representation */ |
| UA_Client_Subscription *newSub = (UA_Client_Subscription*) |
| UA_malloc(sizeof(UA_Client_Subscription)); |
| if(!newSub) { |
| response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
| return response; |
| } |
| |
| /* Send the request as a synchronous service call */ |
| __UA_Client_Service(client, |
| &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST], |
| &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]); |
| if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) { |
| UA_free(newSub); |
| return response; |
| } |
| |
| /* Prepare the internal representation */ |
| newSub->context = subscriptionContext; |
| newSub->subscriptionId = response.subscriptionId; |
| newSub->sequenceNumber = 0; |
| newSub->lastActivity = UA_DateTime_nowMonotonic(); |
| newSub->statusChangeCallback = statusChangeCallback; |
| newSub->deleteCallback = deleteCallback; |
| newSub->publishingInterval = response.revisedPublishingInterval; |
| newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount; |
| LIST_INIT(&newSub->monitoredItems); |
| LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry); |
| |
| return response; |
| } |
| |
| static UA_Client_Subscription * |
| findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) { |
| UA_Client_Subscription *sub = NULL; |
| LIST_FOREACH(sub, &client->subscriptions, listEntry) { |
| if(sub->subscriptionId == subscriptionId) |
| break; |
| } |
| return sub; |
| } |
| |
| UA_ModifySubscriptionResponse UA_EXPORT |
| UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) { |
| UA_ModifySubscriptionResponse response; |
| UA_ModifySubscriptionResponse_init(&response); |
| |
| /* Find the internal representation */ |
| UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId); |
| if(!sub) { |
| response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| return response; |
| } |
| |
| /* Call the service */ |
| __UA_Client_Service(client, |
| &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST], |
| &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]); |
| |
| /* Adjust the internal representation */ |
| sub->publishingInterval = response.revisedPublishingInterval; |
| sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount; |
| return response; |
| } |
| |
| static void |
| UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) { |
| /* Remove the MonitoredItems */ |
| UA_Client_MonitoredItem *mon, *mon_tmp; |
| LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) |
| UA_Client_MonitoredItem_remove(client, sub, mon); |
| |
| /* Call the delete callback */ |
| if(sub->deleteCallback) |
| sub->deleteCallback(client, sub->subscriptionId, sub->context); |
| |
| /* Remove */ |
| LIST_REMOVE(sub, listEntry); |
| UA_free(sub); |
| } |
| |
| UA_DeleteSubscriptionsResponse UA_EXPORT |
| UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) { |
| UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize); |
| memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize); |
| |
| /* temporary remove the subscriptions from the list */ |
| for(size_t i = 0; i < request.subscriptionIdsSize; i++) { |
| subs[i] = findSubscription(client, request.subscriptionIds[i]); |
| if (subs[i]) |
| LIST_REMOVE(subs[i], listEntry); |
| } |
| |
| /* Send the request */ |
| UA_DeleteSubscriptionsResponse response; |
| __UA_Client_Service(client, |
| &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST], |
| &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]); |
| if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) |
| goto cleanup; |
| |
| if(request.subscriptionIdsSize != response.resultsSize) { |
| response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; |
| goto cleanup; |
| } |
| |
| /* Loop over the removed subscriptions and remove internally */ |
| for(size_t i = 0; i < request.subscriptionIdsSize; i++) { |
| if(response.results[i] != UA_STATUSCODE_GOOD && |
| response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) { |
| /* Something was wrong, reinsert the subscription in the list */ |
| if (subs[i]) |
| LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry); |
| continue; |
| } |
| |
| if(!subs[i]) { |
| UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "No internal representation of subscription %u", |
| request.subscriptionIds[i]); |
| continue; |
| } |
| |
| LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry); |
| UA_Client_Subscription_deleteInternal(client, subs[i]); |
| } |
| |
| return response; |
| |
| cleanup: |
| for(size_t i = 0; i < request.subscriptionIdsSize; i++) { |
| if (subs[i]) { |
| LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry); |
| } |
| } |
| return response; |
| } |
| |
| UA_StatusCode UA_EXPORT |
| UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) { |
| UA_DeleteSubscriptionsRequest request; |
| UA_DeleteSubscriptionsRequest_init(&request); |
| request.subscriptionIds = &subscriptionId; |
| request.subscriptionIdsSize = 1; |
| |
| UA_DeleteSubscriptionsResponse response = |
| UA_Client_Subscriptions_delete(client, request); |
| |
| UA_StatusCode retval = response.responseHeader.serviceResult; |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_DeleteSubscriptionsResponse_deleteMembers(&response); |
| return retval; |
| } |
| |
| if(response.resultsSize != 1) { |
| UA_DeleteSubscriptionsResponse_deleteMembers(&response); |
| return UA_STATUSCODE_BADINTERNALERROR; |
| } |
| |
| retval = response.results[0]; |
| UA_DeleteSubscriptionsResponse_deleteMembers(&response); |
| return retval; |
| } |
| |
| /******************/ |
| /* MonitoredItems */ |
| /******************/ |
| |
| void |
| UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub, |
| UA_Client_MonitoredItem *mon) { |
| // NOLINTNEXTLINE |
| LIST_REMOVE(mon, listEntry); |
| if(mon->deleteCallback) |
| mon->deleteCallback(client, sub->subscriptionId, sub->context, |
| mon->monitoredItemId, mon->context); |
| UA_free(mon); |
| } |
| |
| static void |
| __UA_Client_MonitoredItems_create(UA_Client *client, |
| const UA_CreateMonitoredItemsRequest *request, |
| void **contexts, void **handlingCallbacks, |
| UA_Client_DeleteMonitoredItemCallback *deleteCallbacks, |
| UA_CreateMonitoredItemsResponse *response) { |
| UA_CreateMonitoredItemsResponse_init(response); |
| |
| if (!request->itemsToCreateSize) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; |
| return; |
| } |
| |
| /* Fix clang warning */ |
| size_t itemsToCreateSize = request->itemsToCreateSize; |
| UA_Client_Subscription *sub = NULL; |
| |
| /* Allocate the memory for internal representations */ |
| UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize); |
| memset(mis, 0, sizeof(void*) * itemsToCreateSize); |
| for(size_t i = 0; i < itemsToCreateSize; i++) { |
| mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem)); |
| if(!mis[i]) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
| goto cleanup; |
| } |
| } |
| |
| /* Get the subscription */ |
| sub = findSubscription(client, request->subscriptionId); |
| if(!sub) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| goto cleanup; |
| } |
| |
| /* Set the clientHandle */ |
| for(size_t i = 0; i < itemsToCreateSize; i++) |
| request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles); |
| |
| /* Call the service */ |
| __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST], |
| response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]); |
| if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) |
| goto cleanup; |
| |
| if(response->resultsSize != itemsToCreateSize) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; |
| goto cleanup; |
| } |
| |
| /* Add internally */ |
| for(size_t i = 0; i < itemsToCreateSize; i++) { |
| if(response->results[i].statusCode != UA_STATUSCODE_GOOD) { |
| if (deleteCallbacks[i]) |
| deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]); |
| UA_free(mis[i]); |
| mis[i] = NULL; |
| continue; |
| } |
| |
| UA_Client_MonitoredItem *newMon = mis[i]; |
| newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle; |
| newMon->monitoredItemId = response->results[i].monitoredItemId; |
| newMon->context = contexts[i]; |
| newMon->deleteCallback = deleteCallbacks[i]; |
| newMon->handler.dataChangeCallback = |
| (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i]; |
| newMon->isEventMonitoredItem = |
| (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER); |
| LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry); |
| |
| UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Subscription %u | Added a MonitoredItem with handle %u", |
| sub->subscriptionId, newMon->clientHandle); |
| } |
| |
| return; |
| |
| cleanup: |
| for(size_t i = 0; i < itemsToCreateSize; i++) { |
| if (deleteCallbacks[i]) { |
| if (sub) |
| deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]); |
| else |
| deleteCallbacks[i](client, 0, NULL, 0, contexts[i]); |
| } |
| if(mis[i]) |
| UA_free(mis[i]); |
| } |
| } |
| |
| UA_CreateMonitoredItemsResponse UA_EXPORT |
| UA_Client_MonitoredItems_createDataChanges(UA_Client *client, |
| const UA_CreateMonitoredItemsRequest request, void **contexts, |
| UA_Client_DataChangeNotificationCallback *callbacks, |
| UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) { |
| UA_CreateMonitoredItemsResponse response; |
| __UA_Client_MonitoredItems_create(client, &request, contexts, |
| (void**)(uintptr_t)callbacks, deleteCallbacks, &response); |
| return response; |
| } |
| |
| UA_MonitoredItemCreateResult UA_EXPORT |
| UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId, |
| UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item, |
| void *context, UA_Client_DataChangeNotificationCallback callback, |
| UA_Client_DeleteMonitoredItemCallback deleteCallback) { |
| UA_CreateMonitoredItemsRequest request; |
| UA_CreateMonitoredItemsRequest_init(&request); |
| request.subscriptionId = subscriptionId; |
| request.timestampsToReturn = timestampsToReturn; |
| request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item; |
| request.itemsToCreateSize = 1; |
| UA_CreateMonitoredItemsResponse response = |
| UA_Client_MonitoredItems_createDataChanges(client, request, &context, |
| &callback, &deleteCallback); |
| UA_MonitoredItemCreateResult result; |
| UA_MonitoredItemCreateResult_init(&result); |
| if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) |
| result.statusCode = response.responseHeader.serviceResult; |
| |
| if(result.statusCode == UA_STATUSCODE_GOOD && |
| response.resultsSize != 1) |
| result.statusCode = UA_STATUSCODE_BADINTERNALERROR; |
| |
| if(result.statusCode == UA_STATUSCODE_GOOD) |
| UA_MonitoredItemCreateResult_copy(&response.results[0] , &result); |
| UA_CreateMonitoredItemsResponse_deleteMembers(&response); |
| return result; |
| } |
| |
| UA_CreateMonitoredItemsResponse UA_EXPORT |
| UA_Client_MonitoredItems_createEvents(UA_Client *client, |
| const UA_CreateMonitoredItemsRequest request, void **contexts, |
| UA_Client_EventNotificationCallback *callback, |
| UA_Client_DeleteMonitoredItemCallback *deleteCallback) { |
| UA_CreateMonitoredItemsResponse response; |
| __UA_Client_MonitoredItems_create(client, &request, contexts, |
| (void**)(uintptr_t)callback, deleteCallback, &response); |
| return response; |
| } |
| |
| UA_MonitoredItemCreateResult UA_EXPORT |
| UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId, |
| UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item, |
| void *context, UA_Client_EventNotificationCallback callback, |
| UA_Client_DeleteMonitoredItemCallback deleteCallback) { |
| UA_CreateMonitoredItemsRequest request; |
| UA_CreateMonitoredItemsRequest_init(&request); |
| request.subscriptionId = subscriptionId; |
| request.timestampsToReturn = timestampsToReturn; |
| request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item; |
| request.itemsToCreateSize = 1; |
| UA_CreateMonitoredItemsResponse response = |
| UA_Client_MonitoredItems_createEvents(client, request, &context, |
| &callback, &deleteCallback); |
| UA_StatusCode retval = response.responseHeader.serviceResult; |
| UA_MonitoredItemCreateResult result; |
| UA_MonitoredItemCreateResult_init(&result); |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_CreateMonitoredItemsResponse_deleteMembers(&response); |
| result.statusCode = retval; |
| return result; |
| } |
| UA_MonitoredItemCreateResult_copy(response.results , &result); |
| UA_CreateMonitoredItemsResponse_deleteMembers(&response); |
| return result; |
| } |
| |
| UA_DeleteMonitoredItemsResponse UA_EXPORT |
| UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) { |
| /* Send the request */ |
| UA_DeleteMonitoredItemsResponse response; |
| __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST], |
| &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]); |
| if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) |
| return response; |
| |
| UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId); |
| if(!sub) { |
| UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "No internal representation of subscription %u", |
| request.subscriptionId); |
| return response; |
| } |
| |
| /* Loop over deleted MonitoredItems */ |
| for(size_t i = 0; i < response.resultsSize; i++) { |
| if(response.results[i] != UA_STATUSCODE_GOOD && |
| response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) { |
| continue; |
| } |
| |
| #ifndef __clang_analyzer__ |
| /* Delete the internal representation */ |
| UA_Client_MonitoredItem *mon; |
| LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
| // NOLINTNEXTLINE |
| if (mon->monitoredItemId == request.monitoredItemIds[i]) { |
| UA_Client_MonitoredItem_remove(client, sub, mon); |
| break; |
| } |
| } |
| #endif |
| } |
| |
| return response; |
| } |
| |
| UA_StatusCode UA_EXPORT |
| UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) { |
| UA_DeleteMonitoredItemsRequest request; |
| UA_DeleteMonitoredItemsRequest_init(&request); |
| request.subscriptionId = subscriptionId; |
| request.monitoredItemIds = &monitoredItemId; |
| request.monitoredItemIdsSize = 1; |
| |
| UA_DeleteMonitoredItemsResponse response = |
| UA_Client_MonitoredItems_delete(client, request); |
| |
| UA_StatusCode retval = response.responseHeader.serviceResult; |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_DeleteMonitoredItemsResponse_deleteMembers(&response); |
| return retval; |
| } |
| |
| if(response.resultsSize != 1) { |
| UA_DeleteMonitoredItemsResponse_deleteMembers(&response); |
| return UA_STATUSCODE_BADINTERNALERROR; |
| } |
| |
| retval = response.results[0]; |
| UA_DeleteMonitoredItemsResponse_deleteMembers(&response); |
| return retval; |
| } |
| |
| UA_ModifyMonitoredItemsResponse UA_EXPORT |
| UA_Client_MonitoredItems_modify(UA_Client *client, |
| const UA_ModifyMonitoredItemsRequest request) { |
| UA_ModifyMonitoredItemsResponse response; |
| |
| UA_Client_Subscription *sub = 0; |
| LIST_FOREACH(sub, &client->subscriptions, listEntry) { |
| if (sub->subscriptionId == request.subscriptionId) |
| break; |
| } |
| |
| if (!sub) { |
| UA_ModifyMonitoredItemsResponse_init(&response); |
| response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| return response; |
| } |
| |
| UA_ModifyMonitoredItemsRequest modifiedRequest; |
| UA_ModifyMonitoredItemsRequest_copy(&request, &modifiedRequest); |
| |
| for (size_t i = 0; i < modifiedRequest.itemsToModifySize; ++i) { |
| UA_Client_MonitoredItem *mon = 0; |
| LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
| if(mon->monitoredItemId == modifiedRequest.itemsToModify[i].monitoredItemId) { |
| modifiedRequest.itemsToModify[i].requestedParameters.clientHandle = mon->clientHandle; |
| break; |
| } |
| } |
| } |
| |
| __UA_Client_Service(client, |
| &modifiedRequest, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST], |
| &response, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE]); |
| |
| UA_ModifyMonitoredItemsRequest_deleteMembers(&modifiedRequest); |
| return response; |
| } |
| |
| /*************************************/ |
| /* Async Processing of Notifications */ |
| /*************************************/ |
| |
| /* Assume the request is already initialized */ |
| UA_StatusCode |
| UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) { |
| /* Count acks */ |
| UA_Client_NotificationsAckNumber *ack; |
| LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) |
| ++request->subscriptionAcknowledgementsSize; |
| |
| /* Create the array. Returns a sentinel pointer if the length is zero. */ |
| request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*) |
| UA_Array_new(request->subscriptionAcknowledgementsSize, |
| &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]); |
| if(!request->subscriptionAcknowledgements) { |
| request->subscriptionAcknowledgementsSize = 0; |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| } |
| |
| size_t i = 0; |
| UA_Client_NotificationsAckNumber *ack_tmp; |
| LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) { |
| request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber; |
| request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId; |
| ++i; |
| LIST_REMOVE(ack, listEntry); |
| UA_free(ack); |
| } |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */ |
| /* The value 0 is never used for the sequence number */ |
| static UA_UInt32 |
| UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) { |
| UA_UInt32 nextSequenceNumber = sequenceNumber + 1; |
| if(nextSequenceNumber == 0) |
| nextSequenceNumber = 1; |
| return nextSequenceNumber; |
| } |
| |
| static void |
| processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub, |
| UA_DataChangeNotification *dataChangeNotification) { |
| for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) { |
| UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j]; |
| |
| /* Find the MonitoredItem */ |
| UA_Client_MonitoredItem *mon; |
| LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
| if(mon->clientHandle == min->clientHandle) |
| break; |
| } |
| |
| if(!mon) { |
| UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Could not process a notification with clienthandle %u on subscription %u", |
| min->clientHandle, sub->subscriptionId); |
| continue; |
| } |
| |
| if(mon->isEventMonitoredItem) { |
| UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "MonitoredItem is configured for Events. But received a " |
| "DataChangeNotification."); |
| continue; |
| } |
| |
| mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context, |
| mon->monitoredItemId, mon->context, |
| &min->value); |
| } |
| } |
| |
| static void |
| processEventNotification(UA_Client *client, UA_Client_Subscription *sub, |
| UA_EventNotificationList *eventNotificationList) { |
| for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) { |
| UA_EventFieldList *eventFieldList = &eventNotificationList->events[j]; |
| |
| /* Find the MonitoredItem */ |
| UA_Client_MonitoredItem *mon; |
| LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
| if(mon->clientHandle == eventFieldList->clientHandle) |
| break; |
| } |
| |
| if(!mon) { |
| UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Could not process a notification with clienthandle %u on subscription %u", |
| eventFieldList->clientHandle, sub->subscriptionId); |
| continue; |
| } |
| |
| if(!mon->isEventMonitoredItem) { |
| UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "MonitoredItem is configured for DataChanges. But received a " |
| "EventNotification."); |
| continue; |
| } |
| |
| mon->handler.eventCallback(client, sub->subscriptionId, sub->context, |
| mon->monitoredItemId, mon->context, |
| eventFieldList->eventFieldsSize, |
| eventFieldList->eventFields); |
| } |
| } |
| |
| static void |
| processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub, |
| UA_ExtensionObject *msg) { |
| if(msg->encoding != UA_EXTENSIONOBJECT_DECODED) |
| return; |
| |
| /* Handle DataChangeNotification */ |
| if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) { |
| UA_DataChangeNotification *dataChangeNotification = |
| (UA_DataChangeNotification *)msg->content.decoded.data; |
| processDataChangeNotification(client, sub, dataChangeNotification); |
| return; |
| } |
| |
| /* Handle EventNotification */ |
| if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) { |
| UA_EventNotificationList *eventNotificationList = |
| (UA_EventNotificationList *)msg->content.decoded.data; |
| processEventNotification(client, sub, eventNotificationList); |
| return; |
| } |
| |
| /* Handle StatusChangeNotification */ |
| if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) { |
| if(sub->statusChangeCallback) { |
| sub->statusChangeCallback(client, sub->subscriptionId, sub->context, |
| (UA_StatusChangeNotification*)msg->content.decoded.data); |
| } else { |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Dropped a StatusChangeNotification since no callback is registered"); |
| } |
| return; |
| } |
| |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Unknown notification message type"); |
| } |
| |
| void |
| UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request, |
| UA_PublishResponse *response) { |
| UA_NotificationMessage *msg = &response->notificationMessage; |
| |
| client->currentlyOutStandingPublishRequests--; |
| |
| if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) { |
| if(client->config.outStandingPublishRequests > 1) { |
| client->config.outStandingPublishRequests--; |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Too many publishrequest, reduce outStandingPublishRequests to %d", |
| client->config.outStandingPublishRequests); |
| } else { |
| UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Too many publishrequest when outStandingPublishRequests = 1"); |
| UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId); |
| } |
| return; |
| } |
| |
| if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN) |
| return; |
| |
| if(!LIST_FIRST(&client->subscriptions)) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION; |
| return; |
| } |
| |
| if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) { |
| if(client->state >= UA_CLIENTSTATE_SESSION) { |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Received Publish Response with code %s", |
| UA_StatusCode_name(response->responseHeader.serviceResult)); |
| UA_Client_Subscription* sub = findSubscription(client, response->subscriptionId); |
| if (sub != NULL) |
| UA_Client_Subscription_deleteInternal(client, sub); |
| } |
| return; |
| } |
| |
| if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) { |
| UA_Client_disconnect(client); /* TODO: This should be handled before the process callback */ |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Received BadSessionIdInvalid"); |
| return; |
| } |
| |
| if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTIMEOUT) { |
| if (client->config.inactivityCallback) |
| client->config.inactivityCallback(client); |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Received Timeout for Publish Response"); |
| return; |
| } |
| |
| if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) { |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Received Publish Response with code %s", |
| UA_StatusCode_name(response->responseHeader.serviceResult)); |
| return; |
| } |
| |
| UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId); |
| if(!sub) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Received Publish Response for a non-existant subscription"); |
| return; |
| } |
| |
| sub->lastActivity = UA_DateTime_nowMonotonic(); |
| |
| /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */ |
| if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) { |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Invalid subscription sequence number: expected %u but got %u", |
| UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber), |
| msg->sequenceNumber); |
| /* This is an error. But we do not abort the connection. Some server |
| * SDKs misbehave from time to time and send out-of-order sequence |
| * numbers. (Probably some multi-threading synchronization issue.) */ |
| /* UA_Client_disconnect(client); |
| return; */ |
| } |
| /* According to f), a keep-alive message contains no notifications and has the sequence number |
| * of the next NotificationMessage that is to be sent => More than one consecutive keep-alive |
| * message or a NotificationMessage following a keep-alive message will share the same sequence |
| * number. */ |
| if (msg->notificationDataSize) |
| sub->sequenceNumber = msg->sequenceNumber; |
| |
| /* Process the notification messages */ |
| for(size_t k = 0; k < msg->notificationDataSize; ++k) |
| processNotificationMessage(client, sub, &msg->notificationData[k]); |
| |
| /* Add to the list of pending acks */ |
| for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) { |
| if(response->availableSequenceNumbers[i] != msg->sequenceNumber) |
| continue; |
| UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*) |
| UA_malloc(sizeof(UA_Client_NotificationsAckNumber)); |
| if(!tmpAck) { |
| UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Not enough memory to store the acknowledgement for a publish " |
| "message on subscription %u", sub->subscriptionId); |
| break; |
| } |
| tmpAck->subAck.sequenceNumber = msg->sequenceNumber; |
| tmpAck->subAck.subscriptionId = sub->subscriptionId; |
| LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry); |
| break; |
| } |
| } |
| |
| static void |
| processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId, |
| void *response) { |
| UA_PublishRequest *req = (UA_PublishRequest*)userdata; |
| UA_PublishResponse *res = (UA_PublishResponse*)response; |
| |
| /* Process the response */ |
| UA_Client_Subscriptions_processPublishResponse(client, req, res); |
| |
| /* Delete the cached request */ |
| UA_PublishRequest_delete(req); |
| |
| /* Fill up the outstanding publish requests */ |
| UA_Client_Subscriptions_backgroundPublish(client); |
| } |
| |
| void |
| UA_Client_Subscriptions_clean(UA_Client *client) { |
| UA_Client_NotificationsAckNumber *n, *tmp; |
| LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) { |
| LIST_REMOVE(n, listEntry); |
| UA_free(n); |
| } |
| |
| UA_Client_Subscription *sub, *tmps; |
| LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps) |
| UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */ |
| |
| client->monitoredItemHandles = 0; |
| } |
| |
| void |
| UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) { |
| if(client->state < UA_CLIENTSTATE_SESSION) |
| return; |
| |
| /* Is the lack of responses the client's fault? */ |
| if(client->currentlyOutStandingPublishRequests == 0) |
| return; |
| |
| UA_Client_Subscription *sub; |
| LIST_FOREACH(sub, &client->subscriptions, listEntry) { |
| UA_DateTime maxSilence = (UA_DateTime) |
| ((sub->publishingInterval * sub->maxKeepAliveCount) + |
| client->config.timeout) * UA_DATETIME_MSEC; |
| if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) { |
| /* Reset activity */ |
| sub->lastActivity = UA_DateTime_nowMonotonic(); |
| |
| if(client->config.subscriptionInactivityCallback) |
| client->config.subscriptionInactivityCallback(client, sub->subscriptionId, |
| sub->context); |
| UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT, |
| "Inactivity for Subscription %u.", sub->subscriptionId); |
| } |
| } |
| } |
| |
| UA_StatusCode |
| UA_Client_Subscriptions_backgroundPublish(UA_Client *client) { |
| if(client->state < UA_CLIENTSTATE_SESSION) |
| return UA_STATUSCODE_BADSERVERNOTCONNECTED; |
| |
| /* The session must have at least one subscription */ |
| if(!LIST_FIRST(&client->subscriptions)) |
| return UA_STATUSCODE_GOOD; |
| |
| while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) { |
| UA_PublishRequest *request = UA_PublishRequest_new(); |
| if (!request) |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| |
| request->requestHeader.timeoutHint=60000; |
| UA_StatusCode retval = UA_Client_preparePublishRequest(client, request); |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_PublishRequest_delete(request); |
| return retval; |
| } |
| |
| UA_UInt32 requestId; |
| client->currentlyOutStandingPublishRequests++; |
| |
| /* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */ |
| retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST], |
| processPublishResponseAsync, |
| &UA_TYPES[UA_TYPES_PUBLISHRESPONSE], |
| (void*)request, &requestId, 0); |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_PublishRequest_delete(request); |
| return retval; |
| } |
| } |
| |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| #endif /* UA_ENABLE_SUBSCRIPTIONS */ |