blob: 497e8027e394969f9aab15c78660972814f77ea9 [file] [log] [blame]
#
# Copyright 2018-2020 NXP
# SPDX-License-Identifier: Apache-2.0
#
#
"""License text"""
import ctypes
import os
import pickle
import logging
from . import sss_api as apis
from . import util
from . import const
from . import prepare_host_session
from .util import status_to_str
log = logging.getLogger(__name__)
class Session: # pylint: disable=too-many-instance-attributes
"""
Session open and close operation
"""
def __init__(self, from_pickle=True, host_session=False):
"""
Constructor
:param from_pickle: take session parameters from pickle file or not
:param host_session: whether session is host session or not
"""
if from_pickle:
# Check for host session pkl file, To support parallel session with se050 session
if host_session:
pkl = util.get_host_session_pkl_path()
else:
pkl = util.get_session_pkl_path()
if os.path.isfile(pkl):
pkl_session = open(pkl, 'rb')
session_params = pickle.load(pkl_session)
try:
pkl_v_major = session_params["pkl_v_major"]
except KeyError:
pkl_v_major = 0
if 1 == pkl_v_major: # pylint: disable=misplaced-comparison-constant
pass
else:
raise const.SSSUsageError("Session data is in old format. "
"Please close and open session again.")
# Public
self.subsystem = session_params['subsystem']
# Protected
self._connection_method = session_params['connection_method']
self._port_name = session_params['port_name']
self._auth_type = session_params['auth_type']
self._scpkey = session_params['scpkey']
self._tunnel = session_params['tunnel']
else:
log.error("No open session, try connecting first")
log.error("Run 'ssscli connect --help' for more information.")
raise Exception("No open session, try connecting first")
else:
self.subsystem = None
self._connection_method = None
self._port_name = None
self._auth_type = None
self._scpkey = None
self._tunnel = None
# Public
self.session_ctx = None
# Protected
self.session_policy = None
self._tunnel_session_ctx = None
self._host_session_ctx = None
self._connect_ctx = apis.SE_Connect_Ctx_t()
self.prepare_host_obj = prepare_host_session.PrepareHostSession()
def session_open(self):
"""
Session open
:return: status
"""
port_name = ctypes.create_string_buffer(1024)
port_name.value = self._port_name.encode('utf-8')
self._connect_ctx.portName = apis.String(port_name)
self._connect_ctx.connType = self._connection_method
if hasattr(self, "_scpkey"):
self.prepare_host_obj._scpkey = self._scpkey
if self._tunnel:
# open host session
status = self.prepare_host_obj.setup_counter_part_session()
if status != apis.kStatus_SSS_Success:
raise Exception("Could not open host session. Status = %s" % (status,))
# prepare host for tunnel session
self._connect_ctx.auth.authType = apis.kSE05x_AuthType_SCP03
self.prepare_host_obj.prepare_host(self._connect_ctx)
# apply session policy
if self.session_policy is not None:
self._connect_ctx.session_policy = ctypes.pointer(self.session_policy)
# open tunnel session
self._tunnel_session_ctx = self._session_open_internal(self._connect_ctx)
# prepare host for auth session
sss_connect_ctx = self._connect_ctx
sss_connect_ctx.auth.authType = self._auth_type
self.prepare_host_obj.prepare_host(sss_connect_ctx)
sss_connect_ctx.auth.ctx.scp03.pStatic_ctx = self._connect_ctx.auth.ctx.scp03.pStatic_ctx
sss_connect_ctx.auth.ctx.scp03.pDyn_ctx = self._connect_ctx.auth.ctx.scp03.pDyn_ctx
# Initialize tunnel context
tunnel_ctx = apis.sss_tunnel_t()
status = apis.sss_tunnel_context_init(ctypes.byref(tunnel_ctx),
ctypes.byref(self._tunnel_session_ctx))
if status != apis.kStatus_SSS_Success:
raise Exception("sss_tunnel_context_init failed. status: %s" % status_to_str(status))
sss_connect_ctx.connType = apis.kType_SE_Conn_Type_Channel
sss_connect_ctx.tunnelCtx = ctypes.pointer(tunnel_ctx)
# Open auth session
self.session_ctx = self._session_open_internal(sss_connect_ctx)
self.session_ctx.s_ctx.conn_ctx = self._tunnel_session_ctx.s_ctx.conn_ctx
else:
if self.subsystem == apis.kType_SSS_SE_SE05x:
self._connect_ctx.auth.authType = self._auth_type
if self._auth_type in [apis.kSE05x_AuthType_UserID,
apis.kSE05x_AuthType_AESKey,
apis.kSE05x_AuthType_ECKey,
apis.kSE05x_AuthType_SCP03]:
# open host session and prepare host
status = self.prepare_host_obj.setup_counter_part_session()
if status != apis.kStatus_SSS_Success:
raise Exception("Could not open host session. Status = %s" % (status,))
self.prepare_host_obj.prepare_host(self._connect_ctx)
# apply session policy
if self.session_policy is not None:
self._connect_ctx.session_policy = ctypes.pointer(self.session_policy)
# open session
self.session_ctx = self._session_open_internal(self._connect_ctx)
return apis.kStatus_SSS_Success
def _session_open_internal(self, connect_ctx): # pylint: disable=too-many-branches
"""
Opens session based on connect context parameter
:param connect_ctx: Connection parameters to open session
:return: session context
"""
session_ctx = apis.sss_session_t()
session_data = None
if self.subsystem in [apis.kType_SSS_SE_A71CH, apis.kType_SSS_SE_A71CL]:
session_data = ctypes.byref(connect_ctx)
elif self.subsystem == apis.kType_SSS_SE_SE05x:
session_ctx = apis.sss_se05x_session_t()
session_data = ctypes.byref(connect_ctx)
elif self.subsystem in [apis.kType_SSS_mbedTLS, apis.kType_SSS_OpenSSL]:
session_data = connect_ctx.portName
status = apis.sss_session_open(
ctypes.byref(session_ctx), self.subsystem, self.prepare_host_obj.auth_id,
self.prepare_host_obj.connection_type, session_data)
if status != apis.kStatus_SSS_Success:
subsystem_str = list(const.SUBSYSTEM_TYPE.keys())[
list(const.SUBSYSTEM_TYPE.values()).index(self.subsystem)]
connection_type_str = list(const.CONNECTION_TYPE.keys())[
list(const.CONNECTION_TYPE.values()).index(self._connection_method)]
log.warning("# Connection parameters:")
log.warning("# subsystem : %s" % subsystem_str)
log.warning("# connection_type : %s" % connection_type_str)
log.warning("# connection_data : %s" % self._port_name)
raise Exception("sss_session_open failed. status: %s" % status_to_str(status))
log.debug("sss_session_open %s", status_to_str(status))
return session_ctx
def session_close(self):
"""
Session close
:return: None
"""
if self.session_ctx:
log.debug("Closing port")
apis.sss_session_close(ctypes.byref(self.session_ctx))
self.session_ctx = None
if self.prepare_host_obj._host_keystore:
self.prepare_host_obj._host_keystore.free()
self.prepare_host_obj._host_keystore = None
if self.prepare_host_obj._host_session:
self.prepare_host_obj._host_session.session_close()
self.prepare_host_obj._host_session = None
def refresh_session(self):
"""
Refresh session
:return: Status
"""
status = apis.kStatus_SSS_Fail
if self.session_policy is not None:
status = apis.sss_se05x_refresh_session(ctypes.byref(self.session_ctx),
ctypes.byref(self.session_policy))
return status