| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| * |
| * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
| * Copyright 2015 (c) Chris Iatrou |
| * Copyright 2015-2016 (c) Sten Grüner |
| * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA |
| * Copyright 2015 (c) Joakim L. Gilje |
| * Copyright 2016-2017 (c) Florian Palm |
| * Copyright 2015-2016 (c) Oleksiy Vasylyev |
| * Copyright 2017 (c) frax2222 |
| * Copyright 2017 (c) Stefan Profanter, fortiss GmbH |
| * Copyright 2017 (c) Ari Breitkreuz, fortiss GmbH |
| * Copyright 2017 (c) Mattias Bornhager |
| * Copyright 2018 (c) Hilscher Gesellschaft für Systemautomation mbH (Author: Martin Lang) |
| */ |
| |
| #include "ua_server_internal.h" |
| #include "ua_subscription.h" |
| |
| #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ |
| |
| UA_Subscription * |
| UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) { |
| /* Allocate the memory */ |
| UA_Subscription *newSub = |
| (UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription)); |
| if(!newSub) |
| return NULL; |
| |
| /* Remaining members are covered by calloc zeroing out the memory */ |
| newSub->session = session; |
| newSub->subscriptionId = subscriptionId; |
| newSub->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */ |
| /* Even if the first publish response is a keepalive the sequence number is 1. |
| * This can happen by a subscription without a monitored item (see CTT test scripts). */ |
| newSub->nextSequenceNumber = 1; |
| TAILQ_INIT(&newSub->retransmissionQueue); |
| TAILQ_INIT(&newSub->notificationQueue); |
| return newSub; |
| } |
| |
| void |
| UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) { |
| Subscription_unregisterPublishCallback(server, sub); |
| |
| /* Delete monitored Items */ |
| UA_MonitoredItem *mon, *tmp_mon; |
| LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmp_mon) { |
| LIST_REMOVE(mon, listEntry); |
| UA_LOG_INFO_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | MonitoredItem %i | " |
| "Deleted the MonitoredItem", sub->subscriptionId, |
| mon->monitoredItemId); |
| UA_MonitoredItem_delete(server, mon); |
| } |
| UA_assert(server->numMonitoredItems >= sub->monitoredItemsSize); |
| server->numMonitoredItems -= sub->monitoredItemsSize; |
| sub->monitoredItemsSize = 0; |
| |
| /* Delete Retransmission Queue */ |
| UA_NotificationMessageEntry *nme, *nme_tmp; |
| TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) { |
| TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry); |
| UA_NotificationMessage_deleteMembers(&nme->message); |
| UA_free(nme); |
| --sub->session->totalRetransmissionQueueSize; |
| --sub->retransmissionQueueSize; |
| } |
| UA_assert(sub->retransmissionQueueSize == 0); |
| |
| UA_LOG_INFO_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Deleted the Subscription", |
| sub->subscriptionId); |
| } |
| |
| UA_MonitoredItem * |
| UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId) { |
| UA_MonitoredItem *mon; |
| LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
| if(mon->monitoredItemId == monitoredItemId) |
| break; |
| } |
| return mon; |
| } |
| |
| UA_StatusCode |
| UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub, |
| UA_UInt32 monitoredItemId) { |
| /* Find the MonitoredItem */ |
| UA_MonitoredItem *mon; |
| LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { |
| if(mon->monitoredItemId == monitoredItemId) |
| break; |
| } |
| if(!mon) |
| return UA_STATUSCODE_BADMONITOREDITEMIDINVALID; |
| |
| UA_LOG_INFO_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | MonitoredItem %i | " |
| "Delete the MonitoredItem", sub->subscriptionId, |
| mon->monitoredItemId); |
| |
| /* Remove the MonitoredItem */ |
| LIST_REMOVE(mon, listEntry); |
| UA_assert(sub->monitoredItemsSize > 0); |
| UA_assert(server->numMonitoredItems > 0); |
| sub->monitoredItemsSize--; |
| server->numMonitoredItems--; |
| |
| /* Remove content and delayed free */ |
| UA_MonitoredItem_delete(server, mon); |
| |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| void |
| UA_Subscription_addMonitoredItem(UA_Server *server, UA_Subscription *sub, UA_MonitoredItem *newMon) { |
| sub->monitoredItemsSize++; |
| server->numMonitoredItems++; |
| LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry); |
| } |
| |
| static void |
| removeOldestRetransmissionMessage(UA_Session *session) { |
| UA_NotificationMessageEntry *oldestEntry = NULL; |
| UA_Subscription *oldestSub = NULL; |
| |
| UA_Subscription *sub; |
| LIST_FOREACH(sub, &session->serverSubscriptions, listEntry) { |
| UA_NotificationMessageEntry *first = |
| TAILQ_LAST(&sub->retransmissionQueue, ListOfNotificationMessages); |
| if(!first) |
| continue; |
| if(!oldestEntry || oldestEntry->message.publishTime > first->message.publishTime) { |
| oldestEntry = first; |
| oldestSub = sub; |
| } |
| } |
| UA_assert(oldestEntry); |
| UA_assert(oldestSub); |
| |
| TAILQ_REMOVE(&oldestSub->retransmissionQueue, oldestEntry, listEntry); |
| UA_NotificationMessage_deleteMembers(&oldestEntry->message); |
| UA_free(oldestEntry); |
| --session->totalRetransmissionQueueSize; |
| --oldestSub->retransmissionQueueSize; |
| } |
| |
| static void |
| UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub, |
| UA_NotificationMessageEntry *entry) { |
| /* Release the oldest entry if there is not enough space */ |
| if(server->config.maxRetransmissionQueueSize > 0 && |
| sub->session->totalRetransmissionQueueSize >= server->config.maxRetransmissionQueueSize) { |
| UA_LOG_WARNING_SESSION(&server->config.logger, sub->session, "Subscription %u | " |
| "Retransmission queue overflow", sub->subscriptionId); |
| removeOldestRetransmissionMessage(sub->session); |
| } |
| |
| /* Add entry */ |
| TAILQ_INSERT_TAIL(&sub->retransmissionQueue, entry, listEntry); |
| ++sub->session->totalRetransmissionQueueSize; |
| ++sub->retransmissionQueueSize; |
| } |
| |
| UA_StatusCode |
| UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) { |
| /* Find the retransmission message */ |
| UA_NotificationMessageEntry *entry; |
| TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) { |
| if(entry->message.sequenceNumber == sequenceNumber) |
| break; |
| } |
| if(!entry) |
| return UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN; |
| |
| /* Remove the retransmission message */ |
| TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry); |
| --sub->session->totalRetransmissionQueueSize; |
| --sub->retransmissionQueueSize; |
| UA_NotificationMessage_deleteMembers(&entry->message); |
| UA_free(entry); |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| static UA_StatusCode |
| prepareNotificationMessage(UA_Server *server, UA_Subscription *sub, |
| UA_NotificationMessage *message, size_t notifications) { |
| UA_assert(notifications > 0); |
| |
| /* Allocate an ExtensionObject for events and data */ |
| message->notificationData = (UA_ExtensionObject*) |
| UA_Array_new(2, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]); |
| if(!message->notificationData) |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| message->notificationDataSize = 2; |
| |
| /* Pre-allocate DataChangeNotifications */ |
| size_t notificationDataIdx = 0; |
| UA_DataChangeNotification *dcn = NULL; |
| if(sub->dataChangeNotifications > 0) { |
| dcn = UA_DataChangeNotification_new(); |
| if(!dcn) { |
| UA_NotificationMessage_deleteMembers(message); |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| } |
| message->notificationData->encoding = UA_EXTENSIONOBJECT_DECODED; |
| message->notificationData->content.decoded.data = dcn; |
| message->notificationData->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]; |
| |
| size_t dcnSize = sub->dataChangeNotifications; |
| if(dcnSize > notifications) |
| dcnSize = notifications; |
| dcn->monitoredItems = (UA_MonitoredItemNotification*) |
| UA_Array_new(dcnSize, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]); |
| if(!dcn->monitoredItems) { |
| UA_NotificationMessage_deleteMembers(message); |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| } |
| dcn->monitoredItemsSize = dcnSize; |
| notificationDataIdx++; |
| } |
| |
| #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
| UA_EventNotificationList *enl = NULL; |
| UA_StatusChangeNotification *scn = NULL; |
| /* Pre-allocate either StatusChange or EventNotifications. Sending a |
| * (single) StatusChangeNotification has priority. */ |
| if(sub->statusChangeNotifications > 0) { |
| scn = UA_StatusChangeNotification_new(); |
| if(!scn) { |
| UA_NotificationMessage_deleteMembers(message); |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| } |
| message->notificationData[notificationDataIdx].encoding = UA_EXTENSIONOBJECT_DECODED; |
| message->notificationData[notificationDataIdx].content.decoded.data = scn; |
| message->notificationData[notificationDataIdx].content.decoded.type = &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]; |
| notificationDataIdx++; |
| } else if(sub->eventNotifications > 0) { |
| enl = UA_EventNotificationList_new(); |
| if(!enl) { |
| UA_NotificationMessage_deleteMembers(message); |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| } |
| message->notificationData[notificationDataIdx].encoding = UA_EXTENSIONOBJECT_DECODED; |
| message->notificationData[notificationDataIdx].content.decoded.data = enl; |
| message->notificationData[notificationDataIdx].content.decoded.type = &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]; |
| |
| size_t enlSize = sub->eventNotifications; |
| if(enlSize > notifications) |
| enlSize = notifications; |
| enl->events = (UA_EventFieldList*) UA_Array_new(enlSize, &UA_TYPES[UA_TYPES_EVENTFIELDLIST]); |
| if(!enl->events) { |
| UA_NotificationMessage_deleteMembers(message); |
| return UA_STATUSCODE_BADOUTOFMEMORY; |
| } |
| enl->eventsSize = enlSize; |
| notificationDataIdx++; |
| } |
| #endif |
| |
| UA_assert(notificationDataIdx > 0); |
| message->notificationDataSize = notificationDataIdx; |
| |
| /* <-- The point of no return --> */ |
| |
| size_t totalNotifications = 0; /* How many notifications were moved to the response overall? */ |
| size_t dcnPos = 0; /* How many DataChangeNotifications were put into the list? */ |
| #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
| size_t enlPos = 0; /* How many EventNotifications were moved into the list */ |
| #endif |
| UA_Notification *notification, *notification_tmp; |
| TAILQ_FOREACH_SAFE(notification, &sub->notificationQueue, globalEntry, notification_tmp) { |
| if(totalNotifications >= notifications) |
| break; |
| |
| UA_MonitoredItem *mon = notification->mon; |
| |
| /* Remove from the queues and decrease the counters */ |
| UA_Notification_dequeue(server, notification); |
| |
| /* Move the content to the response */ |
| #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
| if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) { |
| |
| UA_assert(enl != NULL); /* Have at least one event notification */ |
| |
| /* Move the content to the response */ |
| UA_EventFieldList *efl = &enl->events[enlPos]; |
| *efl = notification->data.event.fields; |
| UA_EventFieldList_init(¬ification->data.event.fields); |
| efl->clientHandle = mon->clientHandle; |
| |
| enlPos++; |
| } else |
| #endif |
| { |
| UA_assert(dcn != NULL); /* Have at least one change notification */ |
| /* Move the content to the response */ |
| UA_MonitoredItemNotification *min = &dcn->monitoredItems[dcnPos]; |
| min->clientHandle = mon->clientHandle; |
| min->value = notification->data.value; |
| UA_DataValue_init(¬ification->data.value); /* Reset after the value has been moved */ |
| dcnPos++; |
| } |
| |
| UA_Notification_delete(notification); |
| totalNotifications++; |
| } |
| |
| /* Set sizes */ |
| if(dcn) { |
| dcn->monitoredItemsSize = dcnPos; |
| if(dcnPos == 0) { |
| UA_free(dcn->monitoredItems); |
| dcn->monitoredItems = NULL; |
| } |
| } |
| |
| #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS |
| if(enl) { |
| enl->eventsSize = enlPos; |
| if(enlPos == 0) { |
| UA_free(enl->events); |
| enl->events = NULL; |
| } |
| } |
| #endif |
| |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) The value 0 is |
| * never used for the sequence number */ |
| static UA_UInt32 |
| UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) { |
| UA_UInt32 nextSequenceNumber = sequenceNumber + 1; |
| if(nextSequenceNumber == 0) |
| nextSequenceNumber = 1; |
| return nextSequenceNumber; |
| } |
| |
| static void |
| publishCallback(UA_Server *server, UA_Subscription *sub) { |
| sub->readyNotifications = sub->notificationQueueSize; |
| UA_Subscription_publish(server, sub); |
| } |
| |
| void |
| UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, "Subscription %u | " |
| "Publish Callback", sub->subscriptionId); |
| /* Dequeue a response */ |
| UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(sub->session); |
| if(pre) { |
| sub->currentLifetimeCount = 0; /* Reset the LifetimeCounter */ |
| } else { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | The publish queue is empty", |
| sub->subscriptionId); |
| ++sub->currentLifetimeCount; |
| |
| if(sub->currentLifetimeCount > sub->lifeTimeCount) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | End of lifetime " |
| "for subscription", sub->subscriptionId); |
| UA_Session_deleteSubscription(server, sub->session, sub->subscriptionId); |
| /* TODO: send a StatusChangeNotification with Bad_Timeout */ |
| return; |
| } |
| } |
| |
| /* If there are several late publish responses... */ |
| if(sub->readyNotifications > sub->notificationQueueSize) |
| sub->readyNotifications = sub->notificationQueueSize; |
| |
| /* Count the available notifications */ |
| UA_UInt32 notifications = sub->readyNotifications; |
| if(!sub->publishingEnabled) |
| notifications = 0; |
| |
| UA_Boolean moreNotifications = false; |
| if(notifications > sub->notificationsPerPublish) { |
| notifications = sub->notificationsPerPublish; |
| moreNotifications = true; |
| } |
| |
| /* Return if no notifications and no keepalive */ |
| if(notifications == 0) { |
| ++sub->currentKeepAliveCount; |
| if(sub->currentKeepAliveCount < sub->maxKeepAliveCount) { |
| if(pre) |
| UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */ |
| return; |
| } |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Sending a KeepAlive", |
| sub->subscriptionId); |
| } |
| |
| /* We want to send a response. Is the channel open? */ |
| UA_SecureChannel *channel = sub->session->header.channel; |
| if(!channel || !pre) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Want to send a publish response but can't. " |
| "The subscription is late.", sub->subscriptionId); |
| sub->state = UA_SUBSCRIPTIONSTATE_LATE; |
| if(pre) |
| UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */ |
| return; |
| } |
| |
| /* Prepare the response */ |
| UA_PublishResponse *response = &pre->response; |
| UA_NotificationMessage *message = &response->notificationMessage; |
| UA_NotificationMessageEntry *retransmission = NULL; |
| if(notifications > 0) { |
| if(server->config.enableRetransmissionQueue) { |
| /* Allocate the retransmission entry */ |
| retransmission = (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry)); |
| if(!retransmission) { |
| UA_LOG_WARNING_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Could not allocate memory for retransmission. " |
| "The subscription is late.", sub->subscriptionId); |
| sub->state = UA_SUBSCRIPTIONSTATE_LATE; |
| UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */ |
| return; |
| } |
| } |
| |
| /* Prepare the response */ |
| UA_StatusCode retval = prepareNotificationMessage(server, sub, message, notifications); |
| if(retval != UA_STATUSCODE_GOOD) { |
| UA_LOG_WARNING_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Could not prepare the notification message. " |
| "The subscription is late.", sub->subscriptionId); |
| /* If the retransmission queue is enabled a retransmission message is allocated */ |
| if(retransmission) |
| UA_free(retransmission); |
| sub->state = UA_SUBSCRIPTIONSTATE_LATE; |
| UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */ |
| return; |
| } |
| } |
| |
| /* <-- The point of no return --> */ |
| |
| /* Adjust the number of ready notifications */ |
| UA_assert(sub->readyNotifications >= notifications); |
| sub->readyNotifications -= notifications; |
| |
| /* Set up the response */ |
| response->responseHeader.timestamp = UA_DateTime_now(); |
| response->subscriptionId = sub->subscriptionId; |
| response->moreNotifications = moreNotifications; |
| message->publishTime = response->responseHeader.timestamp; |
| |
| /* Set sequence number to message. Started at 1 which is given |
| * during creating a new subscription. The 1 is required for |
| * initial publish response with or without an monitored item. */ |
| message->sequenceNumber = sub->nextSequenceNumber; |
| |
| if(notifications > 0) { |
| /* If the retransmission queue is enabled a retransmission message is allocated */ |
| if(retransmission) { |
| /* Put the notification message into the retransmission queue. This |
| * needs to be done here, so that the message itself is included in the |
| * available sequence numbers for acknowledgement. */ |
| retransmission->message = response->notificationMessage; |
| UA_Subscription_addRetransmissionMessage(server, sub, retransmission); |
| } |
| /* Only if a notification was created, the sequence number must be increased. |
| * For a keepalive the sequence number can be reused. */ |
| sub->nextSequenceNumber = UA_Subscription_nextSequenceNumber(sub->nextSequenceNumber); |
| } |
| |
| /* Get the available sequence numbers from the retransmission queue */ |
| size_t available = sub->retransmissionQueueSize; |
| UA_STACKARRAY(UA_UInt32, seqNumbers, available); |
| if(available > 0) { |
| response->availableSequenceNumbers = seqNumbers; |
| response->availableSequenceNumbersSize = available; |
| size_t i = 0; |
| UA_NotificationMessageEntry *nme; |
| TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) { |
| response->availableSequenceNumbers[i] = nme->message.sequenceNumber; |
| ++i; |
| } |
| } |
| |
| /* Send the response */ |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Sending out a publish response " |
| "with %u notifications", sub->subscriptionId, |
| (UA_UInt32)notifications); |
| UA_SecureChannel_sendSymmetricMessage(sub->session->header.channel, pre->requestId, |
| UA_MESSAGETYPE_MSG, response, |
| &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]); |
| |
| /* Reset subscription state to normal */ |
| sub->state = UA_SUBSCRIPTIONSTATE_NORMAL; |
| sub->currentKeepAliveCount = 0; |
| |
| /* Free the response */ |
| UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]); |
| UA_free(pre); /* No need for UA_PublishResponse_deleteMembers */ |
| |
| /* Repeat sending responses if there are more notifications to send */ |
| if(moreNotifications) |
| UA_Subscription_publish(server, sub); |
| } |
| |
| UA_Boolean |
| UA_Subscription_reachedPublishReqLimit(UA_Server *server, UA_Session *session) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Reached number of publish request limit"); |
| |
| /* Dequeue a response */ |
| UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(session); |
| |
| /* Cannot publish without a response */ |
| if(!pre) { |
| UA_LOG_FATAL_SESSION(&server->config.logger, session, "No publish requests available"); |
| return false; |
| } |
| |
| /* <-- The point of no return --> */ |
| |
| UA_PublishResponse *response = &pre->response; |
| UA_NotificationMessage *message = &response->notificationMessage; |
| |
| /* Set up the response. Note that this response has no related subscription id */ |
| response->responseHeader.timestamp = UA_DateTime_now(); |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS; |
| response->subscriptionId = 0; |
| response->moreNotifications = false; |
| message->publishTime = response->responseHeader.timestamp; |
| message->sequenceNumber = 0; |
| response->availableSequenceNumbersSize = 0; |
| |
| /* Send the response */ |
| UA_LOG_DEBUG_SESSION(&server->config.logger, session, |
| "Sending out a publish response triggered by too many publish requests"); |
| UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId, |
| UA_MESSAGETYPE_MSG, response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]); |
| |
| /* Free the response */ |
| UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]); |
| UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */ |
| |
| return true; |
| } |
| |
| UA_StatusCode |
| Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, |
| "Subscription %u | Register subscription " |
| "publishing callback", sub->subscriptionId); |
| |
| if(sub->publishCallbackIsRegistered) |
| return UA_STATUSCODE_GOOD; |
| |
| UA_StatusCode retval = |
| UA_Server_addRepeatedCallback(server, (UA_ServerCallback)publishCallback, |
| sub, (UA_UInt32)sub->publishingInterval, &sub->publishCallbackId); |
| if(retval != UA_STATUSCODE_GOOD) |
| return retval; |
| |
| sub->publishCallbackIsRegistered = true; |
| return UA_STATUSCODE_GOOD; |
| } |
| |
| void |
| Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) { |
| UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, "Subscription %u | " |
| "Unregister subscription publishing callback", sub->subscriptionId); |
| |
| if(!sub->publishCallbackIsRegistered) |
| return; |
| |
| UA_Server_removeRepeatedCallback(server, sub->publishCallbackId); |
| sub->publishCallbackIsRegistered = false; |
| } |
| |
| /* When the session has publish requests stored but the last subscription is |
| * deleted... Send out empty responses */ |
| void |
| UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session) { |
| /* No session or there are remaining subscriptions */ |
| if(!session || LIST_FIRST(&session->serverSubscriptions)) |
| return; |
| |
| /* Send a response for every queued request */ |
| UA_PublishResponseEntry *pre; |
| while((pre = UA_Session_dequeuePublishReq(session))) { |
| UA_PublishResponse *response = &pre->response; |
| response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION; |
| response->responseHeader.timestamp = UA_DateTime_now(); |
| UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId, UA_MESSAGETYPE_MSG, |
| response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]); |
| UA_PublishResponse_deleteMembers(response); |
| UA_free(pre); |
| } |
| } |
| |
| #endif /* UA_ENABLE_SUBSCRIPTIONS */ |