blob: b8a04263947caff9bfa5d5f342b25151c48335d6 [file] [log] [blame]
/*
* FreeRTOS MQTT V2.1.1
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_agent.c
* @brief MQTT Agent implementation. Provides backwards compatibility between
* MQTT v2 and MQTT v1.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* FreeRTOS includes. */
#include "FreeRTOS.h"
#include "semphr.h"
/* MQTT v1 includes. */
#include "iot_mqtt_agent.h"
#include "iot_mqtt_agent_config.h"
#include "iot_mqtt_agent_config_defaults.h"
/* MQTT v2 include. */
#include "iot_mqtt.h"
/* Platform network include. */
#include "platform/iot_network_freertos.h"
/*-----------------------------------------------------------*/
/**
* @brief Converts FreeRTOS ticks to milliseconds.
*/
#define mqttTICKS_TO_MS( xTicks ) ( xTicks * 1000 / configTICK_RATE_HZ )
/*-----------------------------------------------------------*/
/**
* @brief Stores data to convert between the MQTT v1 subscription callback
* and the MQTT v2 subscription callback.
*/
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
typedef struct MQTTCallback
{
BaseType_t xInUse; /**< Whether this instance is in-use. */
MQTTPublishCallback_t xFunction; /**< MQTT v1 callback function. */
void * pvParameter; /**< Parameter to xFunction. */
uint16_t usTopicFilterLength; /**< Length of pcTopicFilter. */
char pcTopicFilter[ mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH ]; /**< Topic filter. */
} MQTTCallback_t;
#endif
/**
* @brief Stores data on an active MQTT connection.
*/
typedef struct MQTTConnection
{
IotMqttConnection_t xMQTTConnection; /**< MQTT v2 connection handle. */
MQTTAgentCallback_t pxCallback; /**< MQTT v1 global callback. */
void * pvUserData; /**< Parameter to pxCallback. */
StaticSemaphore_t xConnectionMutex; /**< Protects from concurrent accesses. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
MQTTCallback_t xCallbacks /**< Conversion table of MQTT v1 to MQTT v2 subscription callbacks. */
[ mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS ];
#endif
} MQTTConnection_t;
/*-----------------------------------------------------------*/
/**
* @brief Convert an MQTT v2 return code to an MQTT v1 return code.
*
* @param[in] xMqttStatus The MQTT v2 return code.
*
* @return An equivalent MQTT v1 return code.
*/
static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus );
/**
* @brief Wraps an MQTT v1 publish callback.
*
* @param[in] pvParameter The MQTT connection.
* @param[in] pxPublish Information about the incoming publish.
*/
static void prvPublishCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * const pxPublish );
/**
* @brief Wraps an MQTT v1 disconnect callback.
*
* @param[in] pvCallbackContext The MQTT connection.
* @param[in] pxDisconnect Information about the disconnect.
*/
static void prvDisconnectCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * pxDisconnect );
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
/**
* @brief Store an MQTT v1 callback in the conversion table.
*
* @param[in] pxConnection Where to store the callback.
* @param[in] pcTopicFilter Topic filter to store.
* @param[in] usTopicFilterLength Length of pcTopicFilter.
* @param[in] xCallback MQTT v1 callback to store.
* @param[in] pvParameter Parameter to xCallback.
*
* @return pdPASS if the callback was successfully stored; pdFAIL otherwise.
*/
static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength,
MQTTPublishCallback_t xCallback,
void * pvParameter );
/**
* @brief Search the callback conversion table for the given topic filter.
*
* @param[in] pxConnection The connection containing the conversion table.
* @param[in] pcTopicFilter The topic filter to search for.
* @param[in] usTopicFilterLength The length of pcTopicFilter.
*
* @return A pointer to the callback entry if found; NULL otherwise.
* @note This function should be called with pxConnection->xConnectionMutex
* locked.
*/
static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength );
/**
* @brief Remove a topic filter from the callback conversion table.
*
* @param[in] pxConnection The connection containing the conversion table.
* @param[in] pcTopicFilter The topic filter to remove.
* @param[in] usTopicFilterLength The length of pcTopic.
*/
static void prvRemoveCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength );
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/*-----------------------------------------------------------*/
/**
* @brief The number of available MQTT brokers, controlled by the constant
* mqttconfigMAX_BROKERS;
*/
static UBaseType_t uxAvailableBrokers = mqttconfigMAX_BROKERS;
/*-----------------------------------------------------------*/
static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus )
{
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
switch( xMqttStatus )
{
case IOT_MQTT_SUCCESS:
case IOT_MQTT_STATUS_PENDING:
xStatus = eMQTTAgentSuccess;
break;
case IOT_MQTT_TIMEOUT:
xStatus = eMQTTAgentTimeout;
break;
default:
xStatus = eMQTTAgentFailure;
break;
}
return xStatus;
}
/*-----------------------------------------------------------*/
static void prvPublishCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * const pxPublish )
{
BaseType_t xStatus = pdPASS;
size_t xBufferSize = 0;
uint8_t * pucMqttBuffer = NULL;
MQTTBool_t xCallbackReturn = eMQTTFalse;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;
MQTTAgentCallbackParams_t xPublishData = { .xMQTTEvent = eMQTTAgentPublish };
/* Calculate the size of the MQTT buffer that must be allocated. */
if( xStatus == pdPASS )
{
xBufferSize = pxPublish->u.message.info.topicNameLength +
pxPublish->u.message.info.payloadLength;
/* Check for overflow. */
if( ( xBufferSize < pxPublish->u.message.info.topicNameLength ) ||
( xBufferSize < pxPublish->u.message.info.payloadLength ) )
{
mqttconfigDEBUG_LOG( ( "Incoming PUBLISH message and topic name length too large.\r\n" ) );
xStatus = pdFAIL;
}
}
/* Allocate an MQTT buffer for the callback. */
if( xStatus == pdPASS )
{
pucMqttBuffer = pvPortMalloc( xBufferSize );
if( pucMqttBuffer == NULL )
{
mqttconfigDEBUG_LOG( ( "Failed to allocate memory for MQTT buffer.\r\n" ) );
xStatus = pdFAIL;
}
else
{
/* Copy the topic name and payload. The topic name and payload must be
* copied in case the user decides to take ownership of the MQTT buffer.
* The original buffer containing the MQTT topic name and payload may
* contain further unprocessed packets and must remain property of the
* MQTT library. Therefore, the topic name and payload are copied into
* another buffer for the user. */
( void ) memcpy( pucMqttBuffer,
pxPublish->u.message.info.pTopicName,
pxPublish->u.message.info.topicNameLength );
( void ) memcpy( pucMqttBuffer + pxPublish->u.message.info.topicNameLength,
pxPublish->u.message.info.pPayload,
pxPublish->u.message.info.payloadLength );
/* Set the members of the callback parameter. */
xPublishData.xMQTTEvent = eMQTTAgentPublish;
xPublishData.u.xPublishData.pucTopic = pucMqttBuffer;
xPublishData.u.xPublishData.usTopicLength = pxPublish->u.message.info.topicNameLength;
xPublishData.u.xPublishData.pvData = pucMqttBuffer + pxPublish->u.message.info.topicNameLength;
xPublishData.u.xPublishData.ulDataLength = ( uint32_t ) pxPublish->u.message.info.payloadLength;
xPublishData.u.xPublishData.xQos = ( MQTTQoS_t ) pxPublish->u.message.info.qos;
xPublishData.u.xPublishData.xBuffer = pucMqttBuffer;
}
}
if( xStatus == pdPASS )
{
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
/* When subscription management is enabled, search for a matching subscription. */
MQTTCallback_t * pxCallbackEntry = prvFindCallback( pxConnection,
pxPublish->u.message.pTopicFilter,
pxPublish->u.message.topicFilterLength );
/* Check if a matching MQTT v1 subscription was found. */
if( pxCallbackEntry != NULL )
{
/* Invoke the topic-specific callback if it exists. */
if( pxCallbackEntry->xFunction != NULL )
{
xCallbackReturn = pxCallbackEntry->xFunction( pxCallbackEntry->pvParameter,
&( xPublishData.u.xPublishData ) );
}
else
{
/* Otherwise, invoke the global callback. */
if( pxConnection->pxCallback != NULL )
{
xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,
&xPublishData );
}
}
}
#else /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/* When subscription management is disabled, invoke the global callback
* if one exists. */
/* When subscription management is disabled, the topic filter must be "#". */
mqttconfigASSERT( *( xPublish.message.pTopicFilter ) == '#' );
mqttconfigASSERT( xPublish.message.topicFilterLength == 1 );
if( pxConnection->pxCallback != NULL )
{
xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,
&xPublishData );
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
}
/* Free the MQTT buffer if the user did not take ownership of it. */
if( ( xCallbackReturn == eMQTTFalse ) && ( pucMqttBuffer != NULL ) )
{
vPortFree( pucMqttBuffer );
}
}
/*-----------------------------------------------------------*/
static void prvDisconnectCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * pxDisconnect )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;
MQTTAgentCallbackParams_t xCallbackParams = { .xMQTTEvent = eMQTTAgentDisconnect };
( void ) pxDisconnect;
/* This function should only be called if a callback was set. */
mqttconfigASSERT( pxConnection->pxCallback != NULL );
/* Invoke the MQTT v1 callback. Ignore the return value. */
pxConnection->pxCallback( pxConnection->pvUserData,
&xCallbackParams );
}
/*-----------------------------------------------------------*/
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength,
MQTTPublishCallback_t xCallback,
void * pvParameter )
{
MQTTCallback_t * pxCallback = NULL;
BaseType_t xStatus = pdFAIL, i = 0;
/* Prevent other tasks from modifying stored callbacks while this function
* runs. */
if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),
portMAX_DELAY ) == pdTRUE )
{
/* Check if the topic filter already has an entry. */
pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );
if( pxCallback == NULL )
{
/* If no entry was found, find a free entry. */
for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )
{
if( pxConnection->xCallbacks[ i ].xInUse == pdFALSE )
{
pxConnection->xCallbacks[ i ].xInUse = pdTRUE;
pxCallback = &( pxConnection->xCallbacks[ i ] );
break;
}
}
}
/* Set the members of the callback entry. */
if( i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS )
{
pxCallback->pvParameter = pvParameter;
pxCallback->usTopicFilterLength = usTopicFilterLength;
pxCallback->xFunction = xCallback;
( void ) strncpy( pxCallback->pcTopicFilter, pcTopicFilter, usTopicFilterLength );
xStatus = pdPASS;
}
( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );
}
return xStatus;
}
/*-----------------------------------------------------------*/
static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength )
{
BaseType_t i = 0;
MQTTCallback_t * pxResult = NULL;
/* Search the callback conversion table for the topic filter. */
for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )
{
if( ( pxConnection->xCallbacks[ i ].usTopicFilterLength == usTopicFilterLength ) &&
( strncmp( pxConnection->xCallbacks[ i ].pcTopicFilter,
pcTopicFilter,
usTopicFilterLength ) == 0 ) )
{
pxResult = &( pxConnection->xCallbacks[ i ] );
break;
}
}
return pxResult;
}
/*-----------------------------------------------------------*/
static void prvRemoveCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength )
{
MQTTCallback_t * pxCallback = NULL;
/* Prevent other tasks from modifying stored callbacks while this function
* runs. */
if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),
portMAX_DELAY ) == pdTRUE )
{
/* Find the given topic filter. */
pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );
if( pxCallback != NULL )
{
/* Clear the callback entry. */
mqttconfigASSERT( pxCallback->xInUse == pdTRUE );
( void ) memset( pxCallback, 0x00, sizeof( MQTTCallback_t ) );
}
( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );
}
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/*-----------------------------------------------------------*/
IotMqttConnection_t MQTT_AGENT_Getv2Connection( MQTTAgentHandle_t xMQTTHandle )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
return pxConnection->xMQTTConnection;
}
/*-----------------------------------------------------------*/
BaseType_t MQTT_AGENT_Init( void )
{
BaseType_t xStatus = pdFALSE;
/* Call the initialization function of MQTT v2. */
if( IotMqtt_Init() == IOT_MQTT_SUCCESS )
{
xStatus = pdTRUE;
}
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Create( MQTTAgentHandle_t * const pxMQTTHandle )
{
MQTTConnection_t * pxNewConnection = NULL;
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
/* Check how many brokers are available; fail if all brokers are in use. */
taskENTER_CRITICAL();
{
if( uxAvailableBrokers == 0 )
{
xStatus = eMQTTAgentFailure;
}
else
{
uxAvailableBrokers--;
mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
}
}
taskEXIT_CRITICAL();
/* Allocate memory for an MQTT connection. */
if( xStatus == eMQTTAgentSuccess )
{
pxNewConnection = pvPortMalloc( sizeof( MQTTConnection_t ) );
if( pxNewConnection == NULL )
{
xStatus = eMQTTAgentFailure;
taskENTER_CRITICAL();
{
uxAvailableBrokers++;
mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
}
taskEXIT_CRITICAL();
}
else
{
( void ) memset( pxNewConnection, 0x00, sizeof( MQTTConnection_t ) );
pxNewConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
}
}
/* Create the connection mutex and set the output parameter. */
if( xStatus == eMQTTAgentSuccess )
{
( void ) xSemaphoreCreateMutexStatic( &( pxNewConnection->xConnectionMutex ) );
*pxMQTTHandle = ( MQTTAgentHandle_t ) pxNewConnection;
}
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Delete( MQTTAgentHandle_t xMQTTHandle )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
/* Clean up any allocated MQTT or network resources. */
if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )
{
IotMqtt_Disconnect( pxConnection->xMQTTConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );
pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
}
/* Free memory used by the MQTT connection. */
vPortFree( pxConnection );
/* Increment the number of available brokers. */
taskENTER_CRITICAL();
{
uxAvailableBrokers++;
mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
}
taskEXIT_CRITICAL();
return eMQTTAgentSuccess;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Connect( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentConnectParams_t * const pxConnectParams,
TickType_t xTimeoutTicks )
{
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotNetworkServerInfo_t xServerInfo = { 0 };
IotNetworkCredentials_t xCredentials = AWS_IOT_NETWORK_CREDENTIALS_AFR_INITIALIZER, * pxCredentials = NULL;
IotMqttNetworkInfo_t xNetworkInfo = IOT_MQTT_NETWORK_INFO_INITIALIZER;
IotMqttConnectInfo_t xMqttConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
/* Copy the global callback and parameter. */
pxConnection->pxCallback = pxConnectParams->pxCallback;
pxConnection->pvUserData = pxConnectParams->pvUserData;
/* Set the TLS info for a secured connection. */
if( ( pxConnectParams->xSecuredConnection == pdTRUE ) ||
( ( pxConnectParams->xFlags & mqttagentREQUIRE_TLS ) == mqttagentREQUIRE_TLS ) )
{
pxCredentials = &xCredentials;
/* Set the server certificate. Other credentials are set by the initializer. */
xCredentials.pRootCa = pxConnectParams->pcCertificate;
xCredentials.rootCaSize = ( size_t ) pxConnectParams->ulCertificateSize;
/* Disable ALPN if requested. */
if( ( pxConnectParams->xFlags & mqttagentUSE_AWS_IOT_ALPN_443 ) == 0 )
{
xCredentials.pAlpnProtos = NULL;
}
/* Disable SNI if requested. */
if( ( pxConnectParams->xURLIsIPAddress == pdTRUE ) ||
( ( pxConnectParams->xFlags & mqttagentURL_IS_IP_ADDRESS ) == mqttagentURL_IS_IP_ADDRESS ) )
{
xCredentials.disableSni = true;
}
}
/* Set the server info. */
xServerInfo.pHostName = pxConnectParams->pcURL;
xServerInfo.port = pxConnectParams->usPort;
/* Set the members of the network info. */
xNetworkInfo.createNetworkConnection = true;
xNetworkInfo.u.setup.pNetworkServerInfo = &xServerInfo;
xNetworkInfo.u.setup.pNetworkCredentialInfo = pxCredentials;
xNetworkInfo.pNetworkInterface = IOT_NETWORK_INTERFACE_AFR;
if( pxConnectParams->pxCallback != NULL )
{
xNetworkInfo.disconnectCallback.function = prvDisconnectCallbackWrapper;
xNetworkInfo.disconnectCallback.pCallbackContext = pxConnection;
}
/* Set the members of the MQTT connect info. */
xMqttConnectInfo.awsIotMqttMode = true;
xMqttConnectInfo.cleanSession = true;
xMqttConnectInfo.pClientIdentifier = ( const char * ) ( pxConnectParams->pucClientId );
xMqttConnectInfo.clientIdentifierLength = pxConnectParams->usClientIdLength;
xMqttConnectInfo.keepAliveSeconds = mqttconfigKEEP_ALIVE_INTERVAL_SECONDS;
#if SSS_USE_FTR_FILE
xMqttConnectInfo.awsIotMqttMode = false;
xMqttConnectInfo.pUserName = (const char*)( pxConnectParams->cUserName );
xMqttConnectInfo.userNameLength = pxConnectParams->uUsernamelength;
xMqttConnectInfo.pPassword = (const char*)( pxConnectParams->p_password );
xMqttConnectInfo.passwordLength = pxConnectParams->passwordlength;
#endif
/* Call MQTT v2's CONNECT function. */
xMqttStatus = IotMqtt_Connect( &xNetworkInfo,
&xMqttConnectInfo,
mqttTICKS_TO_MS( xTimeoutTicks ),
&( pxConnection->xMQTTConnection ) );
xStatus = prvConvertReturnCode( xMqttStatus );
/* Add a subscription to "#" to support the global callback when subscription
* manager is disabled. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 0 )
IotMqttSubscription_t xGlobalSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
IotMqttReference_t xGlobalSubscriptionRef = IOT_MQTT_REFERENCE_INITIALIZER;
if( xStatus == eMQTTAgentSuccess )
{
xGlobalSubscription.pTopicFilter = "#";
xGlobalSubscription.topicFilterLength = 1;
xGlobalSubscription.qos = 0;
xGlobalSubscription.callback.param1 = pxConnection;
xGlobalSubscription.callback.function = prvPublishCallbackWrapper;
xMqttStatus = IotMqtt_Subscribe( pxConnection->xMQTTConnection,
&xGlobalSubscription,
1,
IOT_MQTT_FLAG_WAITABLE,
NULL,
&xGlobalSubscriptionRef );
xStatus = prvConvertReturnCode( xMqttStatus );
}
/* Wait for the subscription to "#" to complete. */
if( xStatus == eMQTTAgentSuccess )
{
xMqttStatus = IotMqtt_Wait( xGlobalSubscriptionRef,
mqttTICKS_TO_MS( xTimeoutTicks ) );
xStatus = prvConvertReturnCode( xMqttStatus );
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Disconnect( MQTTAgentHandle_t xMQTTHandle,
TickType_t xTimeoutTicks )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
/* MQTT v2's DISCONNECT function does not have a timeout argument. */
( void ) xTimeoutTicks;
/* Check that the connection is established. */
if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )
{
/* Call MQTT v2's DISCONNECT function. */
IotMqtt_Disconnect( pxConnection->xMQTTConnection,
0 );
pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
}
return eMQTTAgentSuccess;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Subscribe( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentSubscribeParams_t * const pxSubscribeParams,
TickType_t xTimeoutTicks )
{
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
/* Store the topic filter if subscription management is enabled. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
/* Check topic filter length. */
if( pxSubscribeParams->usTopicLength > mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH )
{
xStatus = eMQTTAgentFailure;
}
/* Store the subscription. */
if( prvStoreCallback( pxConnection,
( const char * ) pxSubscribeParams->pucTopic,
pxSubscribeParams->usTopicLength,
pxSubscribeParams->pxPublishCallback,
pxSubscribeParams->pvPublishCallbackContext ) == pdFAIL )
{
xStatus = eMQTTAgentFailure;
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/* Call MQTT v2 blocking SUBSCRIBE function. */
if( xStatus == eMQTTAgentSuccess )
{
/* Set the members of the MQTT subscription. */
xSubscription.pTopicFilter = ( const char * ) ( pxSubscribeParams->pucTopic );
xSubscription.topicFilterLength = pxSubscribeParams->usTopicLength;
xSubscription.qos = ( IotMqttQos_t ) pxSubscribeParams->xQoS;
xSubscription.callback.pCallbackContext = pxConnection;
xSubscription.callback.function = prvPublishCallbackWrapper;
xMqttStatus = IotMqtt_TimedSubscribe( pxConnection->xMQTTConnection,
&xSubscription,
1,
0,
mqttTICKS_TO_MS( xTimeoutTicks ) );
xStatus = prvConvertReturnCode( xMqttStatus );
}
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Unsubscribe( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentUnsubscribeParams_t * const pxUnsubscribeParams,
TickType_t xTimeoutTicks )
{
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
/* Remove any subscription callback that may be registered. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
prvRemoveCallback( pxConnection,
( const char * ) ( pxUnsubscribeParams->pucTopic ),
pxUnsubscribeParams->usTopicLength );
#endif
/* Set the members of the subscription to remove. */
xSubscription.pTopicFilter = ( const char * ) ( pxUnsubscribeParams->pucTopic );
xSubscription.topicFilterLength = pxUnsubscribeParams->usTopicLength;
xSubscription.callback.pCallbackContext = pxConnection;
xSubscription.callback.function = prvPublishCallbackWrapper;
/* Call MQTT v2 blocking UNSUBSCRIBE function. */
xMqttStatus = IotMqtt_TimedUnsubscribe( pxConnection->xMQTTConnection,
&xSubscription,
1,
0,
mqttTICKS_TO_MS( xTimeoutTicks ) );
return prvConvertReturnCode( xMqttStatus );
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Publish( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentPublishParams_t * const pxPublishParams,
TickType_t xTimeoutTicks )
{
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotMqttPublishInfo_t xPublishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;
/* Set the members of the publish info. */
xPublishInfo.pTopicName = ( const char * ) pxPublishParams->pucTopic;
xPublishInfo.topicNameLength = pxPublishParams->usTopicLength;
xPublishInfo.qos = ( IotMqttQos_t ) pxPublishParams->xQoS;
xPublishInfo.pPayload = ( const void * ) pxPublishParams->pvData;
xPublishInfo.payloadLength = pxPublishParams->ulDataLength;
/* Call the MQTT v2 blocking PUBLISH function. */
xMqttStatus = IotMqtt_TimedPublish( pxConnection->xMQTTConnection,
&xPublishInfo,
0,
mqttTICKS_TO_MS( xTimeoutTicks ) );
return prvConvertReturnCode( xMqttStatus );
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_ReturnBuffer( MQTTAgentHandle_t xMQTTHandle,
MQTTBufferHandle_t xBufferHandle )
{
( void ) xMQTTHandle;
/* Free the MQTT buffer. */
vPortFree( xBufferHandle );
return eMQTTAgentSuccess;
}
/*-----------------------------------------------------------*/