/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
 *
 *    Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
 *    Copyright 2016-2017 (c) Florian Palm
 *    Copyright 2015 (c) Chris Iatrou
 *    Copyright 2015-2016 (c) Sten Grüner
 *    Copyright 2015-2016 (c) Oleksiy Vasylyev
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
 *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
 *    Copyright 2017 (c) Mattias Bornhager
 *    Copyright 2017 (c) Henrik Norrman
 *    Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
 *    Copyright 2018 (c) Fabian Arndt, Root-Core
 */

#include "ua_server_internal.h"
#include "ua_services.h"
#include "ua_subscription.h"

#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */

static UA_StatusCode
setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
                        UA_Double requestedPublishingInterval,
                        UA_UInt32 requestedLifetimeCount,
                        UA_UInt32 requestedMaxKeepAliveCount,
                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
    /* deregister the callback if required */
    Subscription_unregisterPublishCallback(server, subscription);

    /* re-parameterize the subscription */
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
                               requestedPublishingInterval, subscription->publishingInterval);
    /* check for nan*/
    if(requestedPublishingInterval != requestedPublishingInterval)
        subscription->publishingInterval = server->config.publishingIntervalLimits.min;
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
                               requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
                               requestedLifetimeCount, subscription->lifeTimeCount);
    if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
        subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
    subscription->notificationsPerPublish = maxNotificationsPerPublish;
    if(maxNotificationsPerPublish == 0 ||
       maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
        subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
    subscription->priority = priority;

    UA_StatusCode retval = Subscription_registerPublishCallback(server, subscription);
    if(retval != UA_STATUSCODE_GOOD) {
        UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session,
                             "Subscription %u | Could not register publish callback with error code %s",
                             subscription->subscriptionId, UA_StatusCode_name(retval));
    }
    return retval;
}

void
Service_CreateSubscription(UA_Server *server, UA_Session *session,
                           const UA_CreateSubscriptionRequest *request,
                           UA_CreateSubscriptionResponse *response) {
    UA_LOCK_ASSERT(server->serviceMutex, 1);

    /* Check limits for the number of subscriptions */
    if(((server->config.maxSubscriptions != 0) &&
        (server->numSubscriptions >= server->config.maxSubscriptions)) ||
       ((server->config.maxSubscriptionsPerSession != 0) &&
        (session->numSubscriptions >= server->config.maxSubscriptionsPerSession))) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
        return;
    }

    /* Create the subscription */
    UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
    if(!newSubscription) {
        UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                             "Processing CreateSubscriptionRequest failed");
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
        return;
    }

    UA_Session_addSubscription(server, session, newSubscription); /* Also assigns the subscription id */

    /* Set the subscription parameters */
    newSubscription->publishingEnabled = request->publishingEnabled;
    UA_StatusCode retval = setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
                                                   request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                                                   request->maxNotificationsPerPublish, request->priority);

    if(retval != UA_STATUSCODE_GOOD) {
        response->responseHeader.serviceResult = retval;
        return;
    }

    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount; /* set settings first */

    /* Prepare the response */
    response->subscriptionId = newSubscription->subscriptionId;
    response->revisedPublishingInterval = newSubscription->publishingInterval;
    response->revisedLifetimeCount = newSubscription->lifeTimeCount;
    response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;

    UA_LOG_INFO_SESSION(&server->config.logger, session, "Subscription %u | "
                        "Created the Subscription with a publishing interval of %.2f ms",
                        response->subscriptionId, newSubscription->publishingInterval);
}

void
Service_ModifySubscription(UA_Server *server, UA_Session *session,
                           const UA_ModifySubscriptionRequest *request,
                           UA_ModifySubscriptionResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing ModifySubscriptionRequest");
    UA_LOCK_ASSERT(server->serviceMutex, 1);

    UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);
    if(!sub) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
        return;
    }

    UA_StatusCode retval = setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
                                                   request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                                                   request->maxNotificationsPerPublish, request->priority);

    if(retval != UA_STATUSCODE_GOOD) {
        response->responseHeader.serviceResult = retval;
        return;
    }

    sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
    response->revisedPublishingInterval = sub->publishingInterval;
    response->revisedLifetimeCount = sub->lifeTimeCount;
    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
}

static void
Operation_SetPublishingMode(UA_Server *Server, UA_Session *session,
                            const UA_Boolean *publishingEnabled, const UA_UInt32 *subscriptionId,
                            UA_StatusCode *result) {
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId);
    if(!sub) {
        *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
        return;
    }

    sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
    sub->publishingEnabled = *publishingEnabled; /* Set the publishing mode */
}

void
Service_SetPublishingMode(UA_Server *server, UA_Session *session,
                          const UA_SetPublishingModeRequest *request,
                          UA_SetPublishingModeResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing SetPublishingModeRequest");
    UA_LOCK_ASSERT(server->serviceMutex, 1);

    UA_Boolean publishingEnabled = request->publishingEnabled; /* request is const */
    response->responseHeader.serviceResult =
        UA_Server_processServiceOperations(server, session, (UA_ServiceOperation)Operation_SetPublishingMode,
                                           &publishingEnabled,
                                           &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32],
                                           &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
}

/* TODO: Unify with senderror in ua_server_binary.c */
static void
subscriptionSendError(UA_SecureChannel *channel, UA_UInt32 requestHandle,
                      UA_UInt32 requestId, UA_StatusCode error) {
    UA_PublishResponse err_response;
    UA_PublishResponse_init(&err_response);
    err_response.responseHeader.requestHandle = requestHandle;
    err_response.responseHeader.timestamp = UA_DateTime_now();
    err_response.responseHeader.serviceResult = error;
    UA_SecureChannel_sendSymmetricMessage(channel, requestId, UA_MESSAGETYPE_MSG,
                                          &err_response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
}

void
Service_Publish(UA_Server *server, UA_Session *session,
                const UA_PublishRequest *request, UA_UInt32 requestId) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing PublishRequest");

    /* Return an error if the session has no subscription */
    if(LIST_EMPTY(&session->serverSubscriptions)) {
        subscriptionSendError(session->header.channel, request->requestHeader.requestHandle,
                              requestId, UA_STATUSCODE_BADNOSUBSCRIPTION);
        return;
    }

    /* Handle too many subscriptions to free resources before trying to allocate
     * resources for the new publish request. If the limit has been reached the
     * oldest publish request shall be responded */
    if((server->config.maxPublishReqPerSession != 0) &&
       (session->numPublishReq >= server->config.maxPublishReqPerSession)) {
        if(!UA_Subscription_reachedPublishReqLimit(server, session)) {
            subscriptionSendError(session->header.channel, requestId,
                                  request->requestHeader.requestHandle,
                                  UA_STATUSCODE_BADINTERNALERROR);
            return;
        }
    }

    /* Allocate the response to store it in the retransmission queue */
    UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *)
        UA_malloc(sizeof(UA_PublishResponseEntry));
    if(!entry) {
        subscriptionSendError(session->header.channel, requestId,
                              request->requestHeader.requestHandle,
                              UA_STATUSCODE_BADOUTOFMEMORY);
        return;
    }

    /* Prepare the response */
    entry->requestId = requestId;
    UA_PublishResponse *response = &entry->response;
    UA_PublishResponse_init(response);
    response->responseHeader.requestHandle = request->requestHeader.requestHandle;

    /* Allocate the results array to acknowledge the acknowledge */
    if(request->subscriptionAcknowledgementsSize > 0) {
        response->results = (UA_StatusCode *)
            UA_Array_new(request->subscriptionAcknowledgementsSize,
                         &UA_TYPES[UA_TYPES_STATUSCODE]);
        if(!response->results) {
            UA_free(entry);
            subscriptionSendError(session->header.channel, requestId,
                                  request->requestHeader.requestHandle,
                                  UA_STATUSCODE_BADOUTOFMEMORY);
            return;
        }
        response->resultsSize = request->subscriptionAcknowledgementsSize;
    }

    /* Delete Acknowledged Subscription Messages */
    for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) {
        UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
        UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId);
        if(!sub) {
            response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
            UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                                 "Cannot process acknowledgements subscription %u",
                                 ack->subscriptionId);
            continue;
        }
        /* Remove the acked transmission from the retransmission queue */
        response->results[i] = UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
    }

    /* Queue the publish response. It will be dequeued in a repeated publish
     * callback. This can also be triggered right now for a late
     * subscription. */
    UA_Session_queuePublishReq(session, entry, false);
    UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Queued a publication message");

    /* If there are late subscriptions, the new publish request is used to
     * answer them immediately. However, a single subscription that generates
     * many notifications must not "starve" other late subscriptions. Therefore
     * we keep track of the last subscription that got preferential treatment.
     * We start searching for late subscriptions **after** the last one. */

    UA_Subscription *immediate = NULL;
    if(session->lastSeenSubscriptionId > 0) {
        LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
            if(immediate->subscriptionId == session->lastSeenSubscriptionId) {
                immediate = LIST_NEXT(immediate, listEntry);
                break;
            }
        }
    }

    /* If no entry was found, start at the beginning and don't restart  */
    UA_Boolean found = false;
    if(!immediate)
        immediate = LIST_FIRST(&session->serverSubscriptions);
    else
        found = true;

 repeat:
    while(immediate) {
        if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
            session->lastSeenSubscriptionId = immediate->subscriptionId;
            UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                                 "Subscription %u | Response on a late subscription",
                                 immediate->subscriptionId);
            UA_Subscription_publish(server, immediate);
            return;
        }
        immediate = LIST_NEXT(immediate, listEntry);
    }

    /* Restart at the beginning of the list */
    if(found) {
        immediate = LIST_FIRST(&session->serverSubscriptions);
        found = false;
        goto repeat;
    }

    /* No late subscription this time */
    session->lastSeenSubscriptionId = 0;
}

static void
Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_,
                             const UA_UInt32 *subscriptionId, UA_StatusCode *result) {
    *result = UA_Session_deleteSubscription(server, session, *subscriptionId);
    if(*result == UA_STATUSCODE_GOOD) {
        UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                             "Subscription %u | Subscription deleted",
                             *subscriptionId);
    } else {
        UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                             "Deleting Subscription with Id %u failed with error code %s",
                             *subscriptionId, UA_StatusCode_name(*result));
    }
}

void
Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                            const UA_DeleteSubscriptionsRequest *request,
                            UA_DeleteSubscriptionsResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                         "Processing DeleteSubscriptionsRequest");
    UA_LOCK_ASSERT(server->serviceMutex, 1);

    response->responseHeader.serviceResult =
        UA_Server_processServiceOperations(server, session,
                  (UA_ServiceOperation)Operation_DeleteSubscription, NULL,
                  &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32],
                  &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);

    /* The session has at least one subscription */
    if(LIST_FIRST(&session->serverSubscriptions))
        return;

    /* Send remaining publish responses if the last subscription was removed */
    UA_Subscription_answerPublishRequestsNoSubscription(server, session);
}

void
Service_Republish(UA_Server *server, UA_Session *session,
                  const UA_RepublishRequest *request,
                  UA_RepublishResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                         "Processing RepublishRequest");
    UA_LOCK_ASSERT(server->serviceMutex, 1);

    /* Get the subscription */
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);
    if(!sub) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
        return;
    }

    /* Reset the subscription lifetime */
    sub->currentLifetimeCount = 0;

    /* Find the notification in the retransmission queue  */
    UA_NotificationMessageEntry *entry;
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
        if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
            break;
    }
    if(!entry) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
        return;
    }

    response->responseHeader.serviceResult =
        UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
}

#endif /* UA_ENABLE_SUBSCRIPTIONS */
