blob: b71622dad6bb91b37d9e01b27a378e3d4e4a8722 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2009, 2020 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 support
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - binary password and will payload
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
/**
* @file
* \brief functions to deal with reading and writing of MQTT packets from and to sockets
*
* Some other related functions are in the MQTTPacket module
*/
#include "MQTTPacketOut.h"
#include "Log.h"
#include "StackTrace.h"
#include <string.h>
#include <stdlib.h>
#include "Heap.h"
/**
* Send an MQTT CONNECT packet down a socket for V5 or later
* @param client a structure from which to get all the required values
* @param MQTTVersion the MQTT version to connect with
* @param connectProperties MQTT V5 properties for the connect packet
* @param willProperties MQTT V5 properties for the will message, if any
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
char *buf, *ptr;
Connect packet;
int rc = SOCKET_ERROR, len;
FUNC_ENTRY;
packet.header.byte = 0;
packet.header.bits.type = CONNECT;
len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
if (client->will)
len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
if (client->username)
len += (int)strlen(client->username)+2;
if (client->password)
len += client->passwordlen+2;
if (MQTTVersion >= MQTTVERSION_5)
{
len += MQTTProperties_len(connectProperties);
if (client->will)
len += MQTTProperties_len(willProperties);
}
ptr = buf = malloc(len);
if (ptr == NULL)
goto exit_nofree;
if (MQTTVersion == MQTTVERSION_3_1)
{
writeUTF(&ptr, "MQIsdp");
writeChar(&ptr, (char)MQTTVERSION_3_1);
}
else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
{
writeUTF(&ptr, "MQTT");
writeChar(&ptr, (char)MQTTVersion);
}
else
goto exit;
packet.flags.all = 0;
if (MQTTVersion >= MQTTVERSION_5)
packet.flags.bits.cleanstart = client->cleanstart;
else
packet.flags.bits.cleanstart = client->cleansession;
packet.flags.bits.will = (client->will) ? 1 : 0;
if (packet.flags.bits.will)
{
packet.flags.bits.willQoS = client->will->qos;
packet.flags.bits.willRetain = client->will->retained;
}
if (client->username)
packet.flags.bits.username = 1;
if (client->password)
packet.flags.bits.password = 1;
writeChar(&ptr, packet.flags.all);
writeInt(&ptr, client->keepAliveInterval);
if (MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, connectProperties);
writeUTF(&ptr, client->clientID);
if (client->will)
{
if (MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, willProperties);
writeUTF(&ptr, client->will->topic);
writeData(&ptr, client->will->payload, client->will->payloadlen);
}
if (client->username)
writeUTF(&ptr, client->username);
if (client->password)
writeData(&ptr, client->password, client->passwordlen);
rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1, MQTTVersion);
Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID,
MQTTVersion, client->cleansession, rc);
exit:
if (rc != TCPSOCKET_INTERRUPTED)
free(buf);
exit_nofree:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Function used in the new packets table to create connack packets.
* @param MQTTVersion MQTT 5 or less?
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Connack* pack = NULL;
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
if ((pack = malloc(sizeof(Connack))) == NULL)
goto exit;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->flags.all = readChar(&curdata); /* connect flags */
pack->rc = readChar(&curdata); /* reason code */
if (MQTTVersion < MQTTVERSION_5)
{
if (datalen != 2)
{
free(pack);
pack = NULL;
}
}
else if (datalen > 2)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
pack = NULL; /* signal protocol error */
goto exit;
}
}
exit:
FUNC_EXIT;
return pack;
}
/**
* Free allocated storage for a connack packet.
* @param pack pointer to the connack packet structure
*/
void MQTTPacket_freeConnack(Connack* pack)
{
FUNC_ENTRY;
if (pack->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pack->properties);
free(pack);
FUNC_EXIT;
}
/**
* Send an MQTT PINGREQ packet down a socket.
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
{
Header header;
int rc = 0;
FUNC_ENTRY;
header.byte = 0;
header.bits.type = PINGREQ;
rc = MQTTPacket_send(net, header, NULL, 0, 0, MQTTVERSION_3_1_1);
Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Send an MQTT subscribe packet down a socket.
* @param topics list of topics
* @param qoss list of corresponding QoSs
* @param msgid the MQTT message id to use
* @param dup boolean - whether to set the MQTT DUP flag
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
int msgid, int dup, Clients* client)
{
Header header;
char *data, *ptr;
int rc = -1;
ListElement *elem = NULL, *qosElem = NULL;
int datalen, i = 0;
FUNC_ENTRY;
header.bits.type = SUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = 1;
header.bits.retain = 0;
datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
while (ListNextElement(topics, &elem))
datalen += (int)strlen((char*)(elem->content));
if (client->MQTTVersion >= MQTTVERSION_5)
datalen += MQTTProperties_len(props);
ptr = data = malloc(datalen);
if (ptr == NULL)
goto exit;
writeInt(&ptr, msgid);
if (client->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, props);
elem = NULL;
while (ListNextElement(topics, &elem))
{
char subopts = 0;
ListNextElement(qoss, &qosElem);
writeUTF(&ptr, (char*)(elem->content));
subopts = *(int*)(qosElem->content);
if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
{
subopts |= (opts[i].noLocal << 2); /* 1 bit */
subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
subopts |= (opts[i].retainHandling << 4); /* 2 bits */
}
writeChar(&ptr, subopts);
++i;
}
rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
if (rc != TCPSOCKET_INTERRUPTED)
free(data);
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Function used in the new packets table to create suback packets.
* @param MQTTVersion the version of MQTT
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Suback* pack = NULL;
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
if ((pack = malloc(sizeof(Suback))) == NULL)
goto exit;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->msgId = readInt(&curdata);
if (MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
pack = NULL; /* signal protocol error */
goto exit;
}
}
pack->qoss = ListInitialize();
while ((size_t)(curdata - data) < datalen)
{
unsigned int* newint;
newint = malloc(sizeof(unsigned int));
if (newint == NULL)
{
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
pack = NULL; /* signal protocol error */
goto exit;
}
*newint = (unsigned int)readChar(&curdata);
ListAppend(pack->qoss, newint, sizeof(unsigned int));
}
if (pack->qoss->count == 0)
{
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
ListFree(pack->qoss);
pack = NULL;
}
exit:
FUNC_EXIT;
return pack;
}
/**
* Send an MQTT unsubscribe packet down a socket.
* @param topics list of topics
* @param msgid the MQTT message id to use
* @param dup boolean - whether to set the MQTT DUP flag
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client)
{
Header header;
char *data, *ptr;
int rc = SOCKET_ERROR;
ListElement *elem = NULL;
int datalen;
FUNC_ENTRY;
header.bits.type = UNSUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = 1;
header.bits.retain = 0;
datalen = 2 + topics->count * 2; /* utf length == 2 */
while (ListNextElement(topics, &elem))
datalen += (int)strlen((char*)(elem->content));
if (client->MQTTVersion >= MQTTVERSION_5)
datalen += MQTTProperties_len(props);
ptr = data = malloc(datalen);
if (ptr == NULL)
goto exit;
writeInt(&ptr, msgid);
if (client->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, props);
elem = NULL;
while (ListNextElement(topics, &elem))
writeUTF(&ptr, (char*)(elem->content));
rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
if (rc != TCPSOCKET_INTERRUPTED)
free(data);
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Function used in the new packets table to create unsuback packets.
* @param MQTTVersion the version of MQTT
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Unsuback* pack = NULL;
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
if ((pack = malloc(sizeof(Unsuback))) == NULL)
goto exit;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->msgId = readInt(&curdata);
pack->reasonCodes = NULL;
if (MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
pack = NULL; /* signal protocol error */
goto exit;
}
pack->reasonCodes = ListInitialize();
while ((size_t)(curdata - data) < datalen)
{
enum MQTTReasonCodes* newrc;
newrc = malloc(sizeof(enum MQTTReasonCodes));
if (newrc == NULL)
{
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
pack = NULL; /* signal protocol error */
goto exit;
}
*newrc = (enum MQTTReasonCodes)readChar(&curdata);
ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
}
if (pack->reasonCodes->count == 0)
{
ListFree(pack->reasonCodes);
if (pack->properties.array)
free(pack->properties.array);
if (pack)
free(pack);
pack = NULL;
}
}
exit:
FUNC_EXIT;
return pack;
}