blob: 144ff1ef82ae874916c7b59b5f2b76d5767878ba [file] [log] [blame]
/******************************************************************************
* Copyright 2017 Google
* Copyright 2019,2020 NXP
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#define _XOPEN_SOURCE 500
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "MQTTClient.h"
#include "jwt.h"
#include "jsmn.h"
#include "openssl/conf.h"
#include "openssl/ec.h"
#include "openssl/evp.h"
#include "mqtt_azure.h"
static const int kQos = 1;
static const unsigned long kTimeout = 10000L;
static const unsigned long kInitialConnectIntervalMillis = 500L;
static const unsigned long kMaxConnectIntervalMillis = 6000L;
static const unsigned long kMaxConnectRetryTimeElapsedMillis = 900000L;
static const float kIntervalMultiplier = 1.5f;
volatile MQTTClient_deliveryToken deliveredtoken;
register_state state = ASSIGNING;
volatile uint16_t msgnum = 0;
/**
* 1) Register Azure device:
* Required parameters (either in JSON file or via command line):
* -Registration ID of the device
* -Id_Scope the Azuer IoTHub DPS
* -Path to keyref file
* -Path to device certificate file
* -Path to azure root certificate file
* 2) Publish a given message, passed in as payload, to Cloud IoT Core using the
* values passed to the sample, stored in the global opts structure. Returns
* the result code from the MQTT client
*/
// [START iot_mqtt_register]
bool get_json_tagValue(char *js, const char * key, char * value)
{
jsmn_parser p;
jsmntok_t tokens[50]; /* We expect no more than 50 JSON tokens */
jsmn_init(&p);
int count = jsmn_parse(&p, js, strlen(js), tokens, 50);
for (int i = 1; i < count; i += 2)
{
jsmntok_t *t = &tokens[i];
char *tag = js + t->start;
if (!memcmp(tag, key, t->end - t->start))
{
t = &tokens[i + 1];
memcpy(value, js + t->start, t->end - t->start);
value[t->end - t->start] = '\0';
return true;
}
}
printf("%s not found\n", key);
return false;
}
bool write_registration_configuration(char* payload)
{
char assignedhub[256];
char deviceId[256];
char registrationId[256];
char status[256];
if (!get_json_tagValue(payload, "registrationId", registrationId) ||
!get_json_tagValue(payload, "status", status) ||
!get_json_tagValue(payload, "assignedHub", assignedhub) ||
!get_json_tagValue(payload, "deviceId", deviceId))
{
return false;
}
char * filename = malloc(strlen(deviceId) + 4);
strcpy(filename, deviceId);
filename = strcat(filename, ".txt");
FILE* fp = fopen(filename, "wb");
fprintf(fp, "{\n");
fprintf(fp, " \"assignedHub\": \"%s\",\n", assignedhub);
fprintf(fp, " \"deviceId\": \"%s\",\n", deviceId);
fprintf(fp, " \"registration_id\": \"%s\",\n", registrationId);
fprintf(fp, " \"status\": \"%s\",\n", status);
fprintf(fp, " \"keypath\": \"%s\",\n", opts.keypath);
fprintf(fp, " \"devcert\": \"%s\",\n", opts.devcert);
fprintf(fp, " \"rootpath\": \"%s\",\n", opts.rootpath);
fprintf(fp, " \"payload\": \"hello message from device %s\"\n", deviceId);
fprintf(fp, "}");
fclose(fp);
return true;
}
void delivered(void *context, MQTTClient_deliveryToken dt)
{
(void) (context);
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(
void *client, char *topicName, int topicLen, MQTTClient_message *message)
{
(void) (topicLen);
int i;
char *payloadptr;
MQTTClient_deliveryToken token = { 0 };
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = "";
pubmsg.payloadlen = 0;
pubmsg.qos = 1;
pubmsg.retained = 0;
printf("Message #%d arrived \n", msgnum++);
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for (i = 0; i < message->payloadlen; i++) {
putchar(*payloadptr++);
}
putchar('\n');
switch (state)
{
case ASSIGNING:
pubmsg.qos = 1;
char *message_ptr = malloc(message->payloadlen + 1);
memcpy(message_ptr, message->payload, message->payloadlen);
message_ptr[message->payloadlen] = '\0';
char opid[256];
char status[64];
if (!(get_json_tagValue(message_ptr, "status", status) &&
get_json_tagValue(message_ptr, "operationId", opid)))
{
printf("Registartion failed with message: %s\n", message_ptr);
exit(EXIT_FAILURE);
}
if (!strcmp(status, "assigning"))
{
char *topicOperationId = malloc(strlen(MQTT_PUBLISH_MSG_OPID_AZURE) + strlen(opid) + 1); // +1 for the null-terminator
if (topicOperationId == NULL)
{
printf("malloc error\n");
exit(EXIT_FAILURE);
}
strcpy(topicOperationId, MQTT_PUBLISH_MSG_OPID_AZURE);
strcat(topicOperationId, opid);
// Prevent Throttling by backend: As per this link, maximum 5 requests per 10 seconds allowed:
// https://github.com/MicrosoftDocs/azure-docs/blob/master/includes/iot-dps-limits.md
// A better approach would be to react appropriately in case throttling occurs
{
struct timespec req;
const struct timespec *pReq;
req.tv_sec = 1;
req.tv_nsec = 0;
pReq = &req;
for (unsigned int i = 0; i < (msgnum % 5); i++)
{
nanosleep(pReq, NULL);
}
}
int rc = MQTTClient_publishMessage((MQTTClient *)client, topicOperationId, &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS) {
free(topicOperationId);
exit(EXIT_FAILURE);
}
printf("Device State is assigning\n");
state = ASSIGNING;
free(topicOperationId);
}
else if (!strcmp(status, "assigned"))
{
printf("Device State is now ASSIGNED\n");
if (!write_registration_configuration(message_ptr))
{
printf("Error while writing file\n");
free(message_ptr);
exit(EXIT_FAILURE);
}
state = STOP;
}
else
{
free(message_ptr);
printf("Error while assigning\n");
exit(EXIT_FAILURE);
}
free(message_ptr);
break;
case STOP:
break;
default:
printf("Error state\n");
exit(EXIT_FAILURE);
break;
}
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
(void) (context);
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int register_device(bool registerDevice)
{
int rc = -1;
MQTTClient client = {0};
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token = {0};
#ifdef DEBUG
printf("MQTTClient_create opts.address = %s \n", opts.address);
printf("MQTTClient_create opts.username = %s \n", opts.username);
printf("MQTTClient_create opts.clientid = %s \n", opts.clientid);
printf("MQTTClient_create opts.keypath = %s \n", opts.keypath);
printf("MQTTClient_create opts.devcert = %s \n", opts.devcert);
printf("MQTTClient_create opts.rootpath = %s \n", opts.rootpath);
#endif
if ((MQTTClient_create(&client,
opts.address,
opts.clientid,
MQTTCLIENT_PERSISTENCE_NONE,
NULL)) != MQTTCLIENT_SUCCESS) {
printf("MQTTClient_create failed \n");
exit(EXIT_FAILURE);
}
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 1;
conn_opts.reliable = 0;
conn_opts.username = opts.username;
conn_opts.password = NULL;
MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;
sslopts.enableServerCertAuth = 0;
sslopts.trustStore = opts.rootpath;
sslopts.privateKey = opts.keypath;
sslopts.keyStore = opts.devcert;
conn_opts.ssl = &sslopts;
unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
unsigned long total_retry_time_ms = 0;
MQTTClient_setCallbacks(client, client, connlost, msgarrvd, delivered);
while (
(rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
if (rc == 3) { // connection refused: server unavailable
sleep(retry_interval_ms / 1000);
total_retry_time_ms += retry_interval_ms;
if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
printf("Failed to connect, maximum retry time exceeded.");
exit(EXIT_FAILURE);
}
retry_interval_ms *= kIntervalMultiplier;
if (retry_interval_ms > kMaxConnectIntervalMillis) {
retry_interval_ms = kMaxConnectIntervalMillis;
}
}
else {
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
pubmsg.qos = kQos;
pubmsg.retained = 0;
if (registerDevice)
{
pubmsg.payload = "";
pubmsg.payloadlen = 0;
rc = MQTTClient_publishMessage(client, MQTT_REGISTRATION_MSG_AZURE, &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS) {
exit(EXIT_FAILURE);
}
MQTTClient_subscribe(client, MQTT_SUBSCRIBE_MSG_AZURE, 1);
struct timespec req;
const struct timespec *pReq;
req.tv_sec = 1;
req.tv_nsec = 0;
pReq = &req;
unsigned int registration_time_sec = 0;
while (state != STOP) {
nanosleep(pReq, NULL);
registration_time_sec++;
if (registration_time_sec > REGISTRATION_TIMEOUT)
{
printf("Failed to complete registration, maximum wait time exceeded.\n");
exit(EXIT_FAILURE);
}
}
MQTTClient_unsubscribe(client, MQTT_SUBSCRIBE_MSG_AZURE);
}
else
{
pubmsg.payload = opts.payload;
pubmsg.payloadlen = strlen(opts.payload);
state = STOP;
rc = MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
printf("MQTTClient_publishMessage done with error %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS) {
exit(EXIT_FAILURE);
}
printf(
"Waiting for up to %lu seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(kTimeout / 1000),
opts.payload,
opts.topic,
opts.clientid);
}
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
// [END iot_mqtt_register]