# blob: 689742764b53dd7743783010b5afb1d208ff071d [file] [log] [blame]
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Python Library for connecting to Google Cloud IoT Core via MQTT, using JWT.
This library connects to Google Cloud IoT Core via MQTT, using a JWT for device
authentication. After connection, publish_message can be used to provide an
arbitrary message to a cloud project. Configuration must be done using a
configuration file.
"""
import argparse
import configparser
import datetime
import json
import jwt
import logging
import os
import paho.mqtt.client as mqtt
import threading
import time
from coral.cloudiot.ecc608 import ecc608_jwt_with_hw_alg
# Module-level logger, named after this module, used by CloudIot for status
# and warning messages.
logger = logging.getLogger(__name__)
class CloudIot:
    """MQTT connection to Google Cloud IoT Core, authenticated with a JWT.

    All settings are read from one section of an INI-style configuration
    file. When the section's ``Enabled`` option is false the instance is
    inert: no client is created and the public methods silently do nothing.

    The class can be used as a context manager; leaving the ``with`` block
    stops the background token-refresh thread and disconnects the client.
    """

    # Validity period, in minutes, written into each JWT's 'exp' claim.
    _TOKEN_LIFETIME_MINUTES = 60
    # Seconds between token refreshes; shorter than the token lifetime so a
    # fresh token is in place before the old one expires.
    _TOKEN_REFRESH_SECONDS = 50 * 60
    # Seconds to wait before retrying after a failed token refresh.
    _TOKEN_RETRY_SECONDS = 60
    # Keys accepted by register_message_callbacks; each one is also the name
    # of the attribute set on the MQTT client.
    _CALLBACK_NAMES = (
        'on_connect',
        'on_disconnect',
        'on_publish',
        'on_message',
        'on_subscribe',
        'on_unsubscribe',
        'on_log',
    )

    def __init__(self, config_file, config_section='DEFAULT'):
        """Reads the configuration and, if enabled, connects to the bridge.

        Args:
          config_file: Path of the configuration file to read.
          config_section: Name of the section holding the Cloud IoT options.

        Raises:
          configparser.Error: If the ``Enabled`` option is missing or is not
            a valid boolean.
          KeyError: If a required option is missing from the section.
          OSError: If the key file cannot be read or connecting fails.
        """
        # Stay disabled until the connection has actually been established.
        self._enabled = False
        self._config = configparser.ConfigParser()
        self._config.read(config_file)
        if not self._config.getboolean(config_section, 'Enabled'):
            logger.warning('Cloud IoT is disabled per configuration.')
            return

        config = self._config[config_section]
        self._project_id = config['ProjectID']
        self._cloud_region = config['CloudRegion']
        self._registry_id = config['RegistryID']
        self._device_id = config['DeviceID']
        self._ca_certs = config['CACerts']
        self._message_type = config['MessageType']
        self._mqtt_bridge_hostname = config['MQTTBridgeHostName']
        self._mqtt_bridge_port = config.getint('MQTTBridgePort')
        # Serialises client use between publishers and the refresh thread.
        self._mutex = threading.Lock()

        if ecc608_jwt_with_hw_alg:
            # For the HW Crypto chip, use ES256. No key is needed.
            self._algorithm = 'ES256'
            self._private_key = None
            self._jwt_inst = ecc608_jwt_with_hw_alg
        else:
            # For SW, use RS256 on a key file provided in the configuration.
            self._algorithm = 'RS256'
            with open(config['RSACertFile'], 'r') as f:
                self._private_key = f.read()
            self._jwt_inst = jwt.PyJWT()

        # Create our MQTT client. The client_id is a unique string that
        # identifies this device. For Google Cloud IoT Core, it must be in
        # the format below.
        self._client = mqtt.Client(
            client_id='projects/%s/locations/%s/registries/%s/devices/%s' %
            (self._project_id,
             self._cloud_region,
             self._registry_id,
             self._device_id))
        # With Google Cloud IoT Core, the username field is ignored, and the
        # password field is used to transmit a JWT to authorize the device.
        self._client.username_pw_set(
            username='unused', password=self._create_jwt())
        # Enable SSL/TLS support.
        self._client.tls_set(ca_certs=self._ca_certs)
        # Connect to the Google MQTT bridge.
        self._client.connect(self._mqtt_bridge_hostname,
                             self._mqtt_bridge_port)
        logger.info('Successfully connected to Cloud IoT')

        # Start the refresh thread only once the connection is up, so a
        # failure above cannot leave a stray thread behind. It is a daemon so
        # that it never keeps the interpreter alive if __exit__ is not called.
        self._term_event = threading.Event()
        self._token_thread = threading.Thread(
            target=self._token_update_loop, args=(self._term_event,),
            daemon=True)
        self._token_thread.start()
        self._enabled = True

    def __enter__(self):
        """Returns this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Stops the token-refresh thread and disconnects the client.

        Exceptions raised inside the ``with`` block are not suppressed.
        """
        if self._enabled:
            # Terminate token thread.
            self._term_event.set()
            self._token_thread.join()
            # Release the connection to the MQTT bridge.
            with self._mutex:
                self._client.disconnect()

    def enabled(self):
        """Returns True if Cloud IoT is enabled and the client connected."""
        return self._enabled

    def publish_message(self, message):
        """Publishes a message to this device's events or state topic.

        Does nothing when Cloud IoT is disabled.

        Args:
          message: JSON-serialisable object to send as the payload.

        Raises:
          TypeError: If ``message`` cannot be serialised to JSON.
        """
        if not self._enabled:
            return
        # Serialise before touching the client so that a bad message cannot
        # leave the network loop running.
        payload = json.dumps(message)
        # Publish to the events or state topic based on the flag.
        sub_topic = 'events' if self._message_type == 'event' else 'state'
        mqtt_topic = '/devices/%s/%s' % (self._device_id, sub_topic)
        with self._mutex:
            # Start the network loop.
            self._client.loop_start()
            try:
                # qos=1 means at least once delivery. Cloud IoT Core also
                # supports qos=0 for at most once delivery.
                self._client.publish(mqtt_topic, payload, qos=1)
            finally:
                # End the network loop even if publishing raised.
                self._client.loop_stop()

    def register_message_callbacks(self, callbacks):
        """Installs MQTT client callbacks from a dictionary.

        Does nothing when Cloud IoT is disabled, since no client exists.

        Args:
          callbacks: Mapping from callback name to callable. Recognised names
            are 'on_connect', 'on_disconnect', 'on_publish', 'on_message',
            'on_subscribe', 'on_unsubscribe' and 'on_log'; other keys are
            ignored.
        """
        if not self._enabled:
            return
        for name in self._CALLBACK_NAMES:
            if name in callbacks:
                setattr(self._client, name, callbacks[name])

    def _token_update_loop(self, term_event):
        """Reconnects with a fresh JWT until ``term_event`` is set.

        Runs on the background thread started by __init__.

        Args:
          term_event: threading.Event that ends the loop once it is set.
        """
        delay = self._TOKEN_REFRESH_SECONDS
        # Event.wait returns False on timeout (time to refresh) and True as
        # soon as termination has been requested.
        while not term_event.wait(delay):
            try:
                with self._mutex:
                    self._client.disconnect()
                    # Set new token.
                    self._client.username_pw_set(
                        username='unused', password=self._create_jwt())
                    # Connect to the Google MQTT bridge.
                    self._client.connect(
                        self._mqtt_bridge_hostname, self._mqtt_bridge_port)
            except Exception:
                # Thread boundary: log and retry soon instead of letting the
                # thread die, which would stop all further refreshes.
                logger.exception(
                    'Failed to re-establish connection with new token')
                delay = self._TOKEN_RETRY_SECONDS
            else:
                logger.info(
                    'Successfully re-established connection with new token')
                delay = self._TOKEN_REFRESH_SECONDS

    def _create_jwt(self):
        """Creates a JWT (https://jwt.io) to establish an MQTT connection.

        The token is issued for the configured project ID and signed with
        the configured algorithm ('RS256' or 'ES256') and private key.

        Returns:
          The encoded JWT, which expires 60 minutes after it was issued. The
          token update loop replaces it with a new one before that.
        """
        # Take one timestamp so that 'exp' is exactly the lifetime after
        # 'iat'.
        issued_at = datetime.datetime.now(datetime.timezone.utc)
        lifetime = datetime.timedelta(minutes=self._TOKEN_LIFETIME_MINUTES)
        token = {
            # The time that the token was issued at.
            'iat': issued_at,
            # The time the token expires.
            'exp': issued_at + lifetime,
            # The audience field should always be set to the GCP project id.
            'aud': self._project_id,
        }
        return self._jwt_inst.encode(
            token, self._private_key, algorithm=self._algorithm)