blob: f4b89dc915ee07dd9dd67a094258eacad34e3a43 [file] [log] [blame]
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import datetime
import logging
import os
import sys
import jwt
from coral.cloudiot.utils import ascii_hex_string
from coral.cloudiot.utils import split_equal_parts
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
try:
from cryptoauthlib import *
except:
pass
logger = logging.getLogger(__name__)
def _convert_ec_pub_to_pem(raw_pub_key):
"""
Convert to the key to PEM format. Expects bytes
"""
public_key_der = bytearray.fromhex(
'3059301306072A8648CE3D020106082A8648CE3D03010703420004') + raw_pub_key
public_key_b64 = base64.b64encode(public_key_der).decode('ascii')
public_key_pem = '-----BEGIN PUBLIC KEY-----\n%s\n-----END PUBLIC KEY-----' % '\n'.join(
split_equal_parts(public_key_b64, 64))
return public_key_pem
def _ecc608_check_address(bus, address):
cfg = cfg_ateccx08a_i2c_default()
# Cryptolib uses 8-bit address.
cfg.cfg.atcai2c.slave_address = address << 1
cfg.cfg.atcai2c.bus = bus # ARM I2C
cfg.devtype = 3 # ECC608
status = atcab_init(cfg)
if status == 0:
return True
return False
def ecc608_find_chip():
"""Returns the I2C address of the crypto chip or None if chip is not installed."""
# Bus and address can change for Pi/Dev Board as well as if using enviro.
for bus in (0, 1):
for addr in (0x30, 0x60, 0x62):
if _ecc608_check_address(bus, addr):
logger.info('Found crypto chip at 0x%x', addr)
return addr
logger.debug('ECC608 not detected')
return None
def ecc608_hw_sign(msg, key_id=0):
digest = bytearray(32)
status = atcab_sha(len(msg), msg, digest)
assert status == 0
signature = bytearray(64)
status = atcab_sign(key_id, digest, signature)
assert status == 0
return signature
def ecc608_man_jwt(claims):
header = '{"typ":"JWT","alg":"ES256"}'
for k, v in claims.items():
if type(v) is datetime.datetime:
claims[k] = int(v.timestamp())
payload = json.dumps(claims)
token = base64.urlsafe_b64encode(
header.encode('ascii')).replace(b'=', b'') + b'.'
token = token + \
base64.urlsafe_b64encode(payload.encode('ascii')).replace(b'=', b'')
signature = ecc608_hw_sign(token)
token = token + b'.' + \
base64.urlsafe_b64encode(signature).replace(b'=', b'')
return token
def ecc608_serial():
serial = bytearray(9)
assert atcab_read_serial_number(serial) == 0
return ascii_hex_string(serial)
def ecc608_public_key(key_id=0):
public_key = bytearray(64)
status = atcab_get_pubkey(key_id, public_key)
assert status == 0
return _convert_ec_pub_to_pem(public_key)
class HwEcAlgorithm(jwt.algorithms.Algorithm):
def __init__(self):
self.hash_alg = hashes.SHA256
def prepare_key(self, key):
return key
def sign(self, msg, key):
return ecc608_hw_sign(msg)
def verify(self, msg, key, sig):
try:
der_sig = jwt.utils.raw_to_der_signature(sig, key.curve)
except ValueError:
return False
try:
key.verify(der_sig, msg, ec.ECDSA(self.hash_alg()))
return True
except InvalidSignature:
return False
# On module import, load library.
try:
ecc608_i2c_address = None
ecc608_jwt_with_hw_alg = None
load_cryptoauthlib()
ecc608_i2c_address = ecc608_find_chip()
if ecc608_i2c_address is not None:
ecc608_jwt_with_hw_alg = jwt.PyJWT(algorithms=[])
ecc608_jwt_with_hw_alg.register_algorithm('ES256', HwEcAlgorithm())
except Exception:
logger.debug('Unable to load ECC608')