blob: 2e21862586d4cce9284ea7df47077e0321b28154 [file] [log] [blame]
#
# Copyright 2019,2020 NXP
# SPDX-License-Identifier: Apache-2.0
#
#
"""License text"""
import os
import sys
import traceback
import click
import func_timeout
import logging
# Turn on DEBUG-level logging as early as possible (before the sss imports
# below run) whenever a verbose flag is present on the raw command line.
if any(flag in sys.argv for flag in ('-v', '--verbose')):
    logging.basicConfig(level=logging.DEBUG)
try:
from sss.erasekey import Erase
from sss.connect import do_open_session, do_close_session
from sss.sign import Sign
from sss.verify import Verify
from sss.crypt import Crypt
from sss import const
import sss.session as session
import sss.sss_api as apis
from sss import plugandtrust_ver
from sss.const import AUTH_TYPE_MAP, CRYPT_ALGO
except ImportError:
print("Error: Library sssapisw not found !! Build sssapisw first !!")
sys.exit(1)
# Limit, in seconds, handed to func_timeout.func_timeout() for the session and
# key operations in this file.
TIME_OUT = 60 # Time out in seconds
# Longer limit intended for RSA key operations (not referenced in this part of the file).
TIME_OUT_RSA = 180 # Longer timeout for RSA key
# Version string shown by the top-level --version option.
VERSION = plugandtrust_ver.PLUGANDTRUST_VER_STR
class Context:
    """
    State object shared between CLI commands, with small stdout logging helpers.
    """

    def __init__(self):
        # Verbose output is disabled by default.
        self.verbose = False

    @classmethod
    def log(cls, msg, *args):
        """Print msg to stdout, %-interpolating args into it when any are given."""
        text = msg % args if args else msg
        print(text)  # pylint: disable=superfluous-parens

    def vlog(self, msg, *args):
        """Print msg through log(), but only when verbose mode is enabled."""
        if not self.verbose:
            return
        self.log(msg, *args)
# Decorator that passes the Context object (defined above) as the first
# argument of a command callback; ensure=True asks click to create one when
# none exists yet.
pass_context = click.make_pass_decorator(Context, ensure=True) # pylint: disable=invalid-name
def log_traceback(cli_ctx, ex):
    """Report an error on stdout and append the current traceback to the error log.

    Must be called from inside an ``except`` block, because the traceback is
    taken from the exception currently being handled.

    :param cli_ctx: Object providing ``log()``; used to print ``ex``.
    :param ex: Exception (or message) to print to stdout.
    """
    cli_ctx.log(ex)
    output_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "..", "..", "output")
    try:
        # makedirs also creates missing parent directories; attempting the
        # creation first avoids a race between an existence check and mkdir.
        os.makedirs(output_dir)
    except OSError:
        if not os.path.isdir(output_dir):
            raise
    error_file = os.path.join(output_dir, "error_log.txt")
    # Entries are separated by blank lines, but a fresh log starts without them.
    needs_separator = os.path.isfile(error_file)
    # 'a+' creates the file when it is missing; the with-block guarantees the
    # handle is closed even if writing the traceback fails.
    with open(error_file, 'a+') as err_write:
        if needs_separator:
            err_write.write("\n\n")
        traceback.print_exc(None, err_write)
def session_open(cli_ctx):
    """Create a Session, store it on cli_ctx.session and open it.

    The open call is run through func_timeout with TIME_OUT as the limit.
    Nothing is caught here, so any exception reaches the caller.
    """
    new_session = session.Session()
    cli_ctx.session = new_session
    func_timeout.func_timeout(TIME_OUT, new_session.session_open, None)
def session_close(cli_ctx):
    """
    Close the session stored on cli_ctx, never raising for a failed close.

    :param cli_ctx: Holds cli parameters, including the ``session`` to close
    :return: apis.kStatus_SSS_Success when the close call completed,
             apis.kStatus_SSS_Fail when it timed out or raised
    """
    # Assume failure; only a clean run of the close call upgrades the status.
    status = apis.kStatus_SSS_Fail
    try:
        func_timeout.func_timeout(TIME_OUT, cli_ctx.session.session_close, None)
    except func_timeout.FunctionTimedOut as timed_out:
        log_traceback(cli_ctx, timed_out.getMsg())
    except Exception as err:  # pylint: disable=broad-except
        log_traceback(cli_ctx, err)
    else:
        status = apis.kStatus_SSS_Success
    return status
@click.group()
@click.option('--verbose', '-v', is_flag=True,
              help='Enables verbose mode.')
@click.version_option(version=VERSION)
@pass_context
def cli(cli_ctx, verbose):
    """Command line interface for SE050"""
    # Top level Group
    # Receive the shared Context object (via pass_context) rather than click's
    # own context: sub-commands read ``verbose`` from that shared object, so
    # the flag has to be stored there for vlog() to ever print anything.
    cli_ctx.verbose = verbose
@cli.group()
@pass_context
def generate(cli_ctx):
    """ Generate ECC/RSA Key pair """
    # Group callback: only announces the group when verbose mode is on.
    banner = "Generate ECC/RSA Key pair"
    cli_ctx.vlog(banner)
@cli.group()
@pass_context
def set(cli_ctx):  # pylint: disable=redefined-builtin
    """ Set ECC/RSA/AES Keys or certificates """
    # Group callback: only announces the group when verbose mode is on.
    banner = "Set ECC/RSA/AES Keys or certificates"
    cli_ctx.vlog(banner)
@cli.command('erase', short_help='Erase ECC/RSA/AES Keys or Certificate (contents)')
@click.argument('keyid', type=str, metavar='keyid')
@pass_context
def erase(cli_ctx, keyid):
    """ Erase ECC/RSA/AES Keys or Certificate (contents) \n
    keyid = 32bit Key ID. Should be in hex format. Example: 20E8A001 \n
    """
    # Validate the key ID before anything else: the status messages below
    # format it with %X, which raises TypeError if it is still a string.
    try:
        keyid = int(keyid, 16)
    except ValueError:
        cli_ctx.log("ERROR! Invalid keyid '%s'. Should be in hex format. "
                    "Example: 20E8A001" % keyid)
        sys.exit(1)
    try:
        cli_ctx.log("Erasing Key entry from KeyID = 0x%08x" % keyid)
        session_open(cli_ctx)
        erase_obj = Erase(cli_ctx.session)
        status = func_timeout.func_timeout(TIME_OUT, erase_obj.erase_key, (keyid, ))
    except func_timeout.FunctionTimedOut as timeout_exc:
        log_traceback(cli_ctx, timeout_exc.getMsg())
        status = apis.kStatus_SSS_Fail
    except Exception as exc:  # pylint: disable=broad-except
        log_traceback(cli_ctx, exc)
        status = apis.kStatus_SSS_Fail
    # Always attempt to close the session, even when the operation failed.
    session_status = session_close(cli_ctx)
    if status == apis.kStatus_SSS_Success and session_status == apis.kStatus_SSS_Success:
        cli_ctx.log("Erased Key entry from KeyID = 0x%08X" % keyid)
        ret_value = 0
    else:
        cli_ctx.log("ERROR! Could not Erase Key Entry from KeyID 0x%08X " % keyid)
        ret_value = 1
    # The process exit status reflects success (0) or failure (1).
    sys.exit(ret_value)
@cli.group()
@pass_context
def get(cli_ctx):
    """ Get ECC/RSA/AES Keys or certificates """
    # Group callback: only announces the group when verbose mode is on.
    banner = "Get ECC/RSA/AES Keys or certificates"
    cli_ctx.vlog(banner)
@cli.group()
@pass_context
def cloud(cli_ctx):
    """ (Not Implemented) Cloud Specific utilities.
    This helps to handle GCP/AWS/Watson specific settings."""
    # Group callback: only announces the group when verbose mode is on.
    banner = "Cloud Specific utilities"
    cli_ctx.vlog(banner)
@cli.command()
@click.argument('subsystem', type=click.Choice(list(const.SUBSYSTEM_TYPE.keys())),
                metavar='subsystem')
@click.argument('method', type=click.Choice(list(const.CONNECTION_TYPE.keys())), metavar='method')
@click.argument('port_name', type=str, metavar='port_name')
@click.option('--auth_type', default='None', type=click.Choice(list(AUTH_TYPE_MAP.keys())),
              help="Authentication type. Default is \"None\". Can be one of \"None, UserID, "
                   "ECKey, AESKey, PlatformSCP, UserID_PlatformSCP, ECKey_PlatformSCP, AESKey_PlatformSCP\"")
@click.option('--scpkey', default='', type=str,
              help="File path of the platformscp keys for platformscp session")
@pass_context
def connect(cli_ctx, subsystem, method, port_name, auth_type, scpkey):
    """ Open Session.\n
    subsystem = Security subsystem is selected to be used. Can be one of "se05x, a71ch, mbedtls,
    openssl"\n
    method = Connection method to the system. Can be one of "none, sci2c, vcom, t1oi2c, jrcpv1,
    jrcpv2, pcsc"\n
    port_name = Subsystem specific connection parameters. Example: COM6, 127.0.0.1:8050.
    Use "None" where not applicable. e.g. SCI2C/T1oI2C. Default i2c port (i2c-1) will be used for port name = "None".
    """
    if subsystem == "se050":
        cli_ctx.log("Warning: subsystem 'se050' has been depreciated. Use 'se05x' instead")
    # Every PlatformSCP flavour needs a readable key file. Exit with a
    # non-zero status when it is missing so calling scripts can detect the
    # failure, as the other commands in this file do on error.
    if auth_type in ("PlatformSCP", "UserID_PlatformSCP", "ECKey_PlatformSCP", "AESKey_PlatformSCP"):
        if scpkey == '':
            cli_ctx.log("scpkey file is required for PlatformSCP session. "
                        "Use option '--scpkey' to enter scp key file path.")
            sys.exit(1)
        if not os.path.isfile(scpkey):
            cli_ctx.log("Invalid scpkey file ! "
                        "Use option '--scpkey' to enter scp key file path.")
            sys.exit(1)
    cli_ctx.vlog("Open Session")
    # Translate the user-facing names into the values do_open_session expects;
    # elements 0 and 2 of the AUTH_TYPE_MAP entry are forwarded as-is.
    do_open_session(const.SUBSYSTEM_TYPE[subsystem],
                    const.CONNECTION_TYPE[method],
                    port_name, False, AUTH_TYPE_MAP[auth_type][0], scpkey, AUTH_TYPE_MAP[auth_type][2])
@cli.command()
@pass_context
def disconnect(cli_ctx):
    """ Close session. """
    # Announce the action in verbose mode, then delegate to do_close_session().
    note = "Close session"
    cli_ctx.vlog(note)
    do_close_session()
@cli.group()
@pass_context
def se05x(cli_ctx):
    """ SE05X specific commands """
    # Group callback: only announces the group when verbose mode is on.
    banner = "SE05X specific commands"
    cli_ctx.vlog(banner)
@cli.group()
@pass_context
def a71ch(cli_ctx):
    """ A71CH specific commands """
    # Group callback: only announces the group when verbose mode is on.
    banner = "A71CH specific commands"
    cli_ctx.vlog(banner)
@cli.command('sign', short_help='Sign Operation')
@click.argument('keyid', type=str, metavar='keyid')
@click.argument('input_file', type=str, metavar='input_file')
@click.argument('signature_file', type=str, metavar='signature_file')
@click.option('--informat', default='',
              help="Input format. TEXT can be \"DER\" or \"PEM\".")
@click.option('--outformat', default='',
              help="Output file format. TEXT can be \"DER\" or \"PEM\"")
# pylint: disable=too-many-arguments
@click.option('--hashalgo', default='',
              help='Hash algorithm. TEXT can be one of \"SHA1, SHA224, SHA256, SHA384, '
                   'SHA512, \nRSASSA_PKCS1_V1_5_SHA1, RSASSA_PKCS1_V1_5_SHA224, \n'
                   'RSASSA_PKCS1_V1_5_SHA256, \nRSASSA_PKCS1_V1_5_SHA384, \n'
                   'RSASSA_PKCS1_V1_5_SHA512, \nRSASSA_PKCS1_PSS_MGF1_SHA1, \n'
                   'RSASSA_PKCS1_PSS_MGF1_SHA224, RSASSA_PKCS1_PSS_MGF1_SHA256, '
                   'RSASSA_PKCS1_PSS_MGF1_SHA384, RSASSA_PKCS1_PSS_MGF1_SHA512\"')
# pylint: enable=too-many-arguments
@pass_context
def sign(cli_ctx, keyid, input_file, signature_file, informat, outformat, hashalgo):
    """ Sign Operation \n
    keyid = 32bit Key ID. Should be in hex format. Example: 20E8A001 \n
    input_file = Input file to sign.
    By default filename with extension .pem and .cer considered as PEM format,
    others as DER/BINARY format.\n
    signature_file = File name to store signature data.
    By default filename with extension .pem in PEM format and others in DER format.
    """
    # Validate the key ID before anything else: the status messages below
    # format it with %X, which raises TypeError if it is still a string.
    try:
        keyid = int(keyid, 16)
    except ValueError:
        cli_ctx.log("ERROR! Invalid keyid '%s'. Should be in hex format. "
                    "Example: 20E8A001" % keyid)
        sys.exit(1)
    try:
        session_open(cli_ctx)
        sign_obj = Sign(cli_ctx.session)
        status = func_timeout.func_timeout(TIME_OUT, sign_obj.do_signature,
                                           (keyid, input_file, signature_file,
                                            informat, outformat, hashalgo))
    except func_timeout.FunctionTimedOut as timeout_exc:
        log_traceback(cli_ctx, timeout_exc.getMsg())
        status = apis.kStatus_SSS_Fail
    except Exception as exc:  # pylint: disable=broad-except
        log_traceback(cli_ctx, exc)
        status = apis.kStatus_SSS_Fail
    # Always attempt to close the session, even when the operation failed.
    session_status = session_close(cli_ctx)
    if status == apis.kStatus_SSS_Success and session_status == apis.kStatus_SSS_Success:
        cli_ctx.log("Signed from KeyID = 0x%08X" % keyid)
        ret_value = 0
    else:
        cli_ctx.log("ERROR! Could not Sign from KeyID 0x%08X " % keyid)
        ret_value = 1
    # The process exit status reflects success (0) or failure (1).
    sys.exit(ret_value)
@cli.command('verify', short_help='verify Operation')
@click.argument('keyid', type=str, metavar='keyid')
@click.argument('input_file', type=str, metavar='input_file')
@click.argument('signature_file', type=str, metavar='signature_file')
@click.option('--format', default='',
              help="input_file and signature file format. TEXT can be \"DER\" or \"PEM\"")
# pylint: disable=too-many-arguments
@click.option('--hashalgo', default='',
              help='Hash algorithm. TEXT can be one of \"SHA1, SHA224, SHA256, SHA384, '
                   'SHA512, \nRSASSA_PKCS1_V1_5_SHA1, RSASSA_PKCS1_V1_5_SHA224, \n'
                   'RSASSA_PKCS1_V1_5_SHA256, \nRSASSA_PKCS1_V1_5_SHA384, \n'
                   'RSASSA_PKCS1_V1_5_SHA512, \nRSASSA_PKCS1_PSS_MGF1_SHA1, \n'
                   'RSASSA_PKCS1_PSS_MGF1_SHA224, RSASSA_PKCS1_PSS_MGF1_SHA256, '
                   'RSASSA_PKCS1_PSS_MGF1_SHA384, RSASSA_PKCS1_PSS_MGF1_SHA512\"')
# pylint: enable=too-many-arguments
@pass_context
def verify(cli_ctx, keyid, input_file, signature_file, format, hashalgo):  # pylint: disable=redefined-builtin
    """ verify operation \n
    keyid = 32bit Key ID. Should be in hex format. Example: 20E8A001 \n
    input_file = Input file to verify.
    By default filename with extension .pem and .cer considered as PEM format,
    others as DER/BINARY format.\n
    signature_file = Signature data file for verification.
    By default filename with extension .pem in PEM format and others in DER format.
    """
    # Validate the key ID before anything else: the status messages below
    # format it with %X, which raises TypeError if it is still a string.
    try:
        keyid = int(keyid, 16)
    except ValueError:
        cli_ctx.log("ERROR! Invalid keyid '%s'. Should be in hex format. "
                    "Example: 20E8A001" % keyid)
        sys.exit(1)
    try:
        session_open(cli_ctx)
        verify_obj = Verify(cli_ctx.session)
        status = func_timeout.func_timeout(TIME_OUT, verify_obj.do_verification,
                                           (keyid, input_file, signature_file, format, hashalgo))
    except func_timeout.FunctionTimedOut as timeout_exc:
        log_traceback(cli_ctx, timeout_exc.getMsg())
        status = apis.kStatus_SSS_Fail
    except Exception as exc:  # pylint: disable=broad-except
        log_traceback(cli_ctx, exc)
        status = apis.kStatus_SSS_Fail
    # Always attempt to close the session, even when the operation failed.
    session_status = session_close(cli_ctx)
    if status == apis.kStatus_SSS_Success and session_status == apis.kStatus_SSS_Success:
        cli_ctx.log("Verified from KeyID = 0x%08X" % keyid)
        ret_value = 0
    else:
        cli_ctx.log("ERROR! Could not Verify from KeyID 0x%08X " % keyid)
        ret_value = 1
    # The process exit status reflects success (0) or failure (1).
    sys.exit(ret_value)
@cli.command('encrypt', short_help='Encrypt Operation')
@click.argument('keyid', type=str, metavar='keyid')
@click.argument('input_data', type=str, metavar='input_data')
@click.argument('filename', type=str, metavar='filename')
@click.option('--algo', default='',
              help='Algorithm. TEXT can be one of \"oaep\", \"rsaes\"')
@pass_context
def encrypt(cli_ctx, keyid, input_data, filename, algo):
    """ Encrypt Operation \n
    keyid = 32bit Key ID. Should be in hex format. Example: 20E8A001 \n
    input_data = Input data to Encrypt. can be raw string or in file.\n
    filename = Output file name to store encrypted data.
    Encrypted data will be stored in DER format.\n
    """
    # Validate the key ID before anything else: the status messages below
    # format it with %X, which raises TypeError if it is still a string.
    try:
        keyid = int(keyid, 16)
    except ValueError:
        cli_ctx.log("ERROR! Invalid keyid '%s'. Should be in hex format. "
                    "Example: 20E8A001" % keyid)
        sys.exit(1)
    try:
        session_open(cli_ctx)
        crypt_obj = Crypt(cli_ctx.session)
        # Only override the Crypt object's algorithm when one was requested;
        # an unknown name raises KeyError, handled by the broad except below.
        if algo != '':
            crypt_obj.algorithm = CRYPT_ALGO[algo]
        status = func_timeout.func_timeout(TIME_OUT, crypt_obj.do_encryption,
                                           (keyid, input_data, filename))
    except func_timeout.FunctionTimedOut as timeout_exc:
        log_traceback(cli_ctx, timeout_exc.getMsg())
        status = apis.kStatus_SSS_Fail
    except Exception as exc:  # pylint: disable=broad-except
        log_traceback(cli_ctx, exc)
        status = apis.kStatus_SSS_Fail
    # Always attempt to close the session, even when the operation failed.
    session_status = session_close(cli_ctx)
    if status == apis.kStatus_SSS_Success and session_status == apis.kStatus_SSS_Success:
        cli_ctx.log("Encrypted from KeyID = 0x%08X" % keyid)
        ret_value = 0
    else:
        cli_ctx.log("ERROR! Could not Encrypt from KeyID = 0x%08X " % keyid)
        ret_value = 1
    # The process exit status reflects success (0) or failure (1).
    sys.exit(ret_value)
@cli.command('decrypt', short_help='Decrypt Operation')
@click.argument('keyid', type=str, metavar='keyid')
@click.argument('encrypted_data', type=str, metavar='encrypted_data')
@click.argument('filename', type=str, metavar='filename')
@click.option('--algo', default='',
              help='Algorithm. TEXT can be one of \"oaep\", \"rsaes\"')
@pass_context
def decrypt(cli_ctx, keyid, encrypted_data, filename, algo):
    """ Decrypt Operation \n
    keyid = 32bit Key ID. Should be in hex format. Example: 20E8A001 \n
    encrypted_data = Encrypted data to Decrypt. can be raw data or in file.
    Input data should be in DER format.\n
    filename = Output file name to store Decrypted data.
    """
    # Validate the key ID before anything else: the status messages below
    # format it with %X, which raises TypeError if it is still a string.
    try:
        keyid = int(keyid, 16)
    except ValueError:
        cli_ctx.log("ERROR! Invalid keyid '%s'. Should be in hex format. "
                    "Example: 20E8A001" % keyid)
        sys.exit(1)
    try:
        session_open(cli_ctx)
        crypt_obj = Crypt(cli_ctx.session)
        # Only override the Crypt object's algorithm when one was requested;
        # an unknown name raises KeyError, handled by the broad except below.
        if algo != '':
            crypt_obj.algorithm = CRYPT_ALGO[algo]
        status = func_timeout.func_timeout(TIME_OUT, crypt_obj.do_decryption,
                                           (keyid, encrypted_data, filename))
    except func_timeout.FunctionTimedOut as timeout_exc:
        log_traceback(cli_ctx, timeout_exc.getMsg())
        status = apis.kStatus_SSS_Fail
    except Exception as exc:  # pylint: disable=broad-except
        log_traceback(cli_ctx, exc)
        status = apis.kStatus_SSS_Fail
    # Always attempt to close the session, even when the operation failed.
    session_status = session_close(cli_ctx)
    if status == apis.kStatus_SSS_Success and session_status == apis.kStatus_SSS_Success:
        cli_ctx.log("Decrypted from KeyID = 0x%08X" % keyid)
        ret_value = 0
    else:
        cli_ctx.log("ERROR! Could not Decrypt from KeyID = 0x%08X " % keyid)
        ret_value = 1
    # The process exit status reflects success (0) or failure (1).
    sys.exit(ret_value)
@cli.group()
@pass_context
def refpem(cli_ctx):
    """ Create Reference PEM/DER files (For OpenSSL Engine)."""
    # Group callback: only announces the group when verbose mode is on.
    banner = "Create Reference PEM/DER files (For OpenSSL Engine)"
    cli_ctx.vlog(banner)
@cli.group()
@pass_context
def policy(cli_ctx):
    """ Create/Dump Object Policy"""
    # Intentionally empty group callback: the docstring alone is a valid
    # function body, so no explicit ``pass`` is needed.