| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| * |
| * Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
| * Copyright 2016-2017 (c) Florian Palm |
| * Copyright 2015 (c) Chris Iatrou |
| * Copyright 2015-2016 (c) Sten Grüner |
| * Copyright 2015-2016 (c) Oleksiy Vasylyev |
| * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
| * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH |
| * Copyright 2017 (c) Mattias Bornhager |
| * Copyright 2017 (c) Henrik Norrman |
| * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA |
| * Copyright 2018 (c) Fabian Arndt, Root-Core |
| */ |
| |
| #include "ua_server_internal.h" |
| #include "ua_services.h" |
| #include "ua_subscription.h" |
| |
| #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ |
| |
| static UA_StatusCode |
| setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription, |
| UA_Double requestedPublishingInterval, |
| UA_UInt32 requestedLifetimeCount, |
| UA_UInt32 requestedMaxKeepAliveCount, |
| UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) { |
| /* deregister the callback if required */ |
| Subscription_unregisterPublishCallback(server, subscription); |
| |
| /* re-parameterize the subscription */ |
| UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits, |
| requestedPublishingInterval, subscription->publishingInterval); |
| /* check for nan*/ |
| if(requestedPublishingInterval != requestedPublishingInterval) |
| subscription->publishingInterval = server->config.publishingIntervalLimits.min; |
| UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits, |
| requestedMaxKeepAliveCount, subscription->maxKeepAliveCount); |
| UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits, |
| requestedLifetimeCount, subscription->lifeTimeCount); |
| if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount) |
| subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount; |
| subscription->notificationsPerPublish = maxNotificationsPerPublish; |
| if(maxNotificationsPerPublish == 0 || |
| maxNotificationsPerPublish > server->config.maxNotificationsPerPublish) |
| subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish; |
| subscription->priority = priority; |
| |
| UA_StatusCode retval = Subscription_registerPublishCallback(server, subscription); |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session, |
| "Subscription %u | Could not register publish callback with error code %s", |
| subscription->subscriptionId, UA_StatusCode_name(retval)); |
| } |
| return retval; |
| } |
| |
| void |
| Service_CreateSubscription(UA_Server *server, UA_Session *session, |
| const UA_CreateSubscriptionRequest *request, |
| UA_CreateSubscriptionResponse *response) { |
| UA_LOCK_ASSERT(server->serviceMutex, 1); |
| |
| /* Check limits for the number of subscriptions */ |
| if(((server->config.maxSubscriptions != 0) && |
| (server->numSubscriptions >= server->config.maxSubscriptions)) || |
| ((server->config.maxSubscriptionsPerSession != 0) && |
| (session->numSubscriptions >= server->config.maxSubscriptionsPerSession))) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS; |
| return; |
| } |
| |
| /* Create the subscription */ |
| UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId); |
| if(!newSubscription) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Processing CreateSubscriptionRequest failed"); |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; |
| return; |
| } |
| |
| UA_Session_addSubscription(server, session, newSubscription); /* Also assigns the subscription id */ |
| |
| /* Set the subscription parameters */ |
| newSubscription->publishingEnabled = request->publishingEnabled; |
| UA_StatusCode retval = setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval, |
| request->requestedLifetimeCount, request->requestedMaxKeepAliveCount, |
| request->maxNotificationsPerPublish, request->priority); |
| |
| if(retval != UA_STATUSCODE_GOOD) { |
| response->responseHeader.serviceResult = retval; |
| return; |
| } |
| |
| newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount; /* set settings first */ |
| |
| /* Prepare the response */ |
| response->subscriptionId = newSubscription->subscriptionId; |
| response->revisedPublishingInterval = newSubscription->publishingInterval; |
| response->revisedLifetimeCount = newSubscription->lifeTimeCount; |
| response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount; |
| |
| UA_LOG_INFO_SESSION(&server->config.logger, session, "Subscription %u | " |
| "Created the Subscription with a publishing interval of %.2f ms", |
| response->subscriptionId, newSubscription->publishingInterval); |
| } |
| |
| void |
| Service_ModifySubscription(UA_Server *server, UA_Session *session, |
| const UA_ModifySubscriptionRequest *request, |
| UA_ModifySubscriptionResponse *response) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing ModifySubscriptionRequest"); |
| UA_LOCK_ASSERT(server->serviceMutex, 1); |
| |
| UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId); |
| if(!sub) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| return; |
| } |
| |
| UA_StatusCode retval = setSubscriptionSettings(server, sub, request->requestedPublishingInterval, |
| request->requestedLifetimeCount, request->requestedMaxKeepAliveCount, |
| request->maxNotificationsPerPublish, request->priority); |
| |
| if(retval != UA_STATUSCODE_GOOD) { |
| response->responseHeader.serviceResult = retval; |
| return; |
| } |
| |
| sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */ |
| response->revisedPublishingInterval = sub->publishingInterval; |
| response->revisedLifetimeCount = sub->lifeTimeCount; |
| response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount; |
| } |
| |
| static void |
| Operation_SetPublishingMode(UA_Server *Server, UA_Session *session, |
| const UA_Boolean *publishingEnabled, const UA_UInt32 *subscriptionId, |
| UA_StatusCode *result) { |
| UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId); |
| if(!sub) { |
| *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| return; |
| } |
| |
| sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */ |
| sub->publishingEnabled = *publishingEnabled; /* Set the publishing mode */ |
| } |
| |
| void |
| Service_SetPublishingMode(UA_Server *server, UA_Session *session, |
| const UA_SetPublishingModeRequest *request, |
| UA_SetPublishingModeResponse *response) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing SetPublishingModeRequest"); |
| UA_LOCK_ASSERT(server->serviceMutex, 1); |
| |
| UA_Boolean publishingEnabled = request->publishingEnabled; /* request is const */ |
| response->responseHeader.serviceResult = |
| UA_Server_processServiceOperations(server, session, (UA_ServiceOperation)Operation_SetPublishingMode, |
| &publishingEnabled, |
| &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32], |
| &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]); |
| } |
| |
| /* TODO: Unify with senderror in ua_server_binary.c */ |
| static void |
| subscriptionSendError(UA_SecureChannel *channel, UA_UInt32 requestHandle, |
| UA_UInt32 requestId, UA_StatusCode error) { |
| UA_PublishResponse err_response; |
| UA_PublishResponse_init(&err_response); |
| err_response.responseHeader.requestHandle = requestHandle; |
| err_response.responseHeader.timestamp = UA_DateTime_now(); |
| err_response.responseHeader.serviceResult = error; |
| UA_SecureChannel_sendSymmetricMessage(channel, requestId, UA_MESSAGETYPE_MSG, |
| &err_response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]); |
| } |
| |
| void |
| Service_Publish(UA_Server *server, UA_Session *session, |
| const UA_PublishRequest *request, UA_UInt32 requestId) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing PublishRequest"); |
| |
| /* Return an error if the session has no subscription */ |
| if(LIST_EMPTY(&session->serverSubscriptions)) { |
| subscriptionSendError(session->header.channel, request->requestHeader.requestHandle, |
| requestId, UA_STATUSCODE_BADNOSUBSCRIPTION); |
| return; |
| } |
| |
| /* Handle too many subscriptions to free resources before trying to allocate |
| * resources for the new publish request. If the limit has been reached the |
| * oldest publish request shall be responded */ |
| if((server->config.maxPublishReqPerSession != 0) && |
| (session->numPublishReq >= server->config.maxPublishReqPerSession)) { |
| if(!UA_Subscription_reachedPublishReqLimit(server, session)) { |
| subscriptionSendError(session->header.channel, requestId, |
| request->requestHeader.requestHandle, |
| UA_STATUSCODE_BADINTERNALERROR); |
| return; |
| } |
| } |
| |
| /* Allocate the response to store it in the retransmission queue */ |
| UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *) |
| UA_malloc(sizeof(UA_PublishResponseEntry)); |
| if(!entry) { |
| subscriptionSendError(session->header.channel, requestId, |
| request->requestHeader.requestHandle, |
| UA_STATUSCODE_BADOUTOFMEMORY); |
| return; |
| } |
| |
| /* Prepare the response */ |
| entry->requestId = requestId; |
| UA_PublishResponse *response = &entry->response; |
| UA_PublishResponse_init(response); |
| response->responseHeader.requestHandle = request->requestHeader.requestHandle; |
| |
| /* Allocate the results array to acknowledge the acknowledge */ |
| if(request->subscriptionAcknowledgementsSize > 0) { |
| response->results = (UA_StatusCode *) |
| UA_Array_new(request->subscriptionAcknowledgementsSize, |
| &UA_TYPES[UA_TYPES_STATUSCODE]); |
| if(!response->results) { |
| UA_free(entry); |
| subscriptionSendError(session->header.channel, requestId, |
| request->requestHeader.requestHandle, |
| UA_STATUSCODE_BADOUTOFMEMORY); |
| return; |
| } |
| response->resultsSize = request->subscriptionAcknowledgementsSize; |
| } |
| |
| /* Delete Acknowledged Subscription Messages */ |
| for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) { |
| UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i]; |
| UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId); |
| if(!sub) { |
| response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Cannot process acknowledgements subscription %u", |
| ack->subscriptionId); |
| continue; |
| } |
| /* Remove the acked transmission from the retransmission queue */ |
| response->results[i] = UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber); |
| } |
| |
| /* Queue the publish response. It will be dequeued in a repeated publish |
| * callback. This can also be triggered right now for a late |
| * subscription. */ |
| UA_Session_queuePublishReq(session, entry, false); |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Queued a publication message"); |
| |
| /* If there are late subscriptions, the new publish request is used to |
| * answer them immediately. However, a single subscription that generates |
| * many notifications must not "starve" other late subscriptions. Therefore |
| * we keep track of the last subscription that got preferential treatment. |
| * We start searching for late subscriptions **after** the last one. */ |
| |
| UA_Subscription *immediate = NULL; |
| if(session->lastSeenSubscriptionId > 0) { |
| LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) { |
| if(immediate->subscriptionId == session->lastSeenSubscriptionId) { |
| immediate = LIST_NEXT(immediate, listEntry); |
| break; |
| } |
| } |
| } |
| |
| /* If no entry was found, start at the beginning and don't restart */ |
| UA_Boolean found = false; |
| if(!immediate) |
| immediate = LIST_FIRST(&session->serverSubscriptions); |
| else |
| found = true; |
| |
| repeat: |
| while(immediate) { |
| if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) { |
| session->lastSeenSubscriptionId = immediate->subscriptionId; |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Subscription %u | Response on a late subscription", |
| immediate->subscriptionId); |
| UA_Subscription_publish(server, immediate); |
| return; |
| } |
| immediate = LIST_NEXT(immediate, listEntry); |
| } |
| |
| /* Restart at the beginning of the list */ |
| if(found) { |
| immediate = LIST_FIRST(&session->serverSubscriptions); |
| found = false; |
| goto repeat; |
| } |
| |
| /* No late subscription this time */ |
| session->lastSeenSubscriptionId = 0; |
| } |
| |
| static void |
| Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_, |
| const UA_UInt32 *subscriptionId, UA_StatusCode *result) { |
| *result = UA_Session_deleteSubscription(server, session, *subscriptionId); |
| if(*result == UA_STATUSCODE_GOOD) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Subscription %u | Subscription deleted", |
| *subscriptionId); |
| } else { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Deleting Subscription with Id %u failed with error code %s", |
| *subscriptionId, UA_StatusCode_name(*result)); |
| } |
| } |
| |
| void |
| Service_DeleteSubscriptions(UA_Server *server, UA_Session *session, |
| const UA_DeleteSubscriptionsRequest *request, |
| UA_DeleteSubscriptionsResponse *response) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Processing DeleteSubscriptionsRequest"); |
| UA_LOCK_ASSERT(server->serviceMutex, 1); |
| |
| response->responseHeader.serviceResult = |
| UA_Server_processServiceOperations(server, session, |
| (UA_ServiceOperation)Operation_DeleteSubscription, NULL, |
| &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32], |
| &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]); |
| |
| /* The session has at least one subscription */ |
| if(LIST_FIRST(&session->serverSubscriptions)) |
| return; |
| |
| /* Send remaining publish responses if the last subscription was removed */ |
| UA_Subscription_answerPublishRequestsNoSubscription(server, session); |
| } |
| |
| void |
| Service_Republish(UA_Server *server, UA_Session *session, |
| const UA_RepublishRequest *request, |
| UA_RepublishResponse *response) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Processing RepublishRequest"); |
| UA_LOCK_ASSERT(server->serviceMutex, 1); |
| |
| /* Get the subscription */ |
| UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId); |
| if(!sub) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; |
| return; |
| } |
| |
| /* Reset the subscription lifetime */ |
| sub->currentLifetimeCount = 0; |
| |
| /* Find the notification in the retransmission queue */ |
| UA_NotificationMessageEntry *entry; |
| TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
| if(entry->message.sequenceNumber == request->retransmitSequenceNumber) |
| break; |
| } |
| if(!entry) { |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE; |
| return; |
| } |
| |
| response->responseHeader.serviceResult = |
| UA_NotificationMessage_copy(&entry->message, &response->notificationMessage); |
| } |
| |
| #endif /* UA_ENABLE_SUBSCRIPTIONS */ |