blob: 5e325f270d5d9ddfe2a6d7814c2df8b2350d78af [file] [log] [blame]
/*
* FreeRTOS Common V1.1.1
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_taskpool.c
* @brief Implements the task pool functions in iot_taskpool.h
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
/* Platform layer includes. */
#include "platform/iot_threads.h"
#include "platform/iot_clock.h"
/* Task pool internal include. */
#include "private/iot_taskpool_internal.h"
/**
* @brief Enter a critical section by locking a mutex.
*
*/
#define TASKPOOL_ENTER_CRITICAL() IotMutex_Lock( &( pTaskPool->lock ) )
/**
* @brief Exit a critical section by unlocking a mutex.
*
*/
#define TASKPOOL_EXIT_CRITICAL() IotMutex_Unlock( &( pTaskPool->lock ) )
/**
* @brief Maximum semaphore value for wait operations.
*/
#define TASKPOOL_MAX_SEM_VALUE 0xFFFF
/**
* @brief Reschedule delay in milliseconds for deferred jobs.
*/
#define TASKPOOL_JOB_RESCHEDULE_DELAY_MS ( 10ULL )
/* ---------------------------------------------------------------------------------- */
/**
* Doxygen should ignore this section.
*
* @brief The system task pool handle for all libraries to use.
* User application can use the system task pool as well knowing that the usage will be shared with
* the system libraries as well. The system task pool needs to be initialized before any library is used or
* before any code that posts jobs to the task pool runs.
*/
_taskPool_t _IotSystemTaskPool = { .dispatchQueue = IOT_DEQUEUE_INITIALIZER };
/* -------------- Convenience functions to create/recycle/destroy jobs -------------- */
/**
* @brief Initializes one instance of a Task pool cache.
*
* @param[in] pCache The pre-allocated instance of the cache to initialize.
*/
static void _initJobsCache( _taskPoolCache_t * const pCache );
/**
* @brief Initialize a job.
*
* @param[in] pJob The job to initialize.
* @param[in] userCallback The user callback for the job.
* @param[in] pUserContext The context tp be passed to the callback.
* @param[in] isStatic A flag to indicate whether the job is statically or dynamically allocated.
*/
static void _initializeJob( _taskPoolJob_t * const pJob,
IotTaskPoolRoutine_t userCallback,
void * pUserContext,
bool isStatic );
/**
* @brief Extracts and initializes one instance of a job from the cache or, if there is none available, it allocates and initializes a new one.
*
* @param[in] pCache The instance of the cache to extract the job from.
*/
static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache );
/**
* Recycles one instance of a job into the cache or, if the cache is full, it destroys it.
*
* @param[in] pCache The instance of the cache to recycle the job into.
* @param[in] pJob The job to recycle.
*
*/
static void _recycleJob( _taskPoolCache_t * const pCache,
_taskPoolJob_t * const pJob );
/**
* Destroys one instance of a job.
*
* @param[in] pJob The job to destroy.
*
*/
static void _destroyJob( _taskPoolJob_t * const pJob );
/* -------------- The worker thread procedure for a task pool thread -------------- */
/**
* The procedure for a task pool worker thread.
*
* @param[in] pUserContext The user context.
*
*/
static void _taskPoolWorker( void * pUserContext );
/* -------------- Convenience functions to handle timer events -------------- */
/**
* Comparer for the time list.
*
* param[in] pTimerEventLink1 The link to the first timer event.
* param[in] pTimerEventLink1 The link to the first timer event.
*/
static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,
const IotLink_t * const pTimerEventLink2 );
/**
* Reschedules the timer for handling deferred jobs to the next timeout.
*
* param[in] pTimer The timer to reschedule.
* param[in] pFirstTimerEvent The timer event that carries the timeout and job information.
*/
static void _rescheduleDeferredJobsTimer( IotTimer_t * const pTimer,
_taskPoolTimerEvent_t * const pFirstTimerEvent );
/**
* The task pool timer procedure for scheduling deferred jobs.
*
* param[in] pArgument An opaque pointer for timer procedure context.
*/
static void _timerThread( void * pArgument );
/* -------------- Convenience functions to create/initialize/destroy the task pool -------------- */
/**
* Parameter validation for a task pool initialization.
*
* @param[in] pInfo The initialization information for the task pool.
*
*/
IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo );
/**
* Initializes a pre-allocated instance of a task pool.
*
* @param[in] pInfo The initialization information for the task pool.
* @param[in] pTaskPool The pre-allocated instance of the task pool to initialize.
*
*/
static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,
_taskPool_t * const pTaskPool );
/**
* Initializes a pre-allocated instance of a task pool.
*
* @param[in] pInfo The initialization information for the task pool.
* @param[out] pTaskPool A pointer to the task pool data structure to initialize.
*
*/
static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,
_taskPool_t * const pTaskPool );
/**
* Destroys one instance of a task pool.
*
* @param[in] pTaskPool The task pool to destroy.
*
*/
static void _destroyTaskPool( _taskPool_t * const pTaskPool );
/**
* Check for the exit condition.
*
* @param[in] pTaskPool The task pool to destroy.
*
*/
static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool );
/**
* Set the exit condition.
*
* @param[in] pTaskPool The task pool to destroy.
* @param[in] threads The number of threads active in the task pool at shutdown time.
*
*/
static void _signalShutdown( _taskPool_t * const pTaskPool,
uint32_t threads );
/**
* Places a job in the dispatch queue.
*
* @param[in] pTaskPool The task pool to schedule the job with.
* @param[in] pJob The job to schedule.
* @param[in] flags The job flags.
*
*/
static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,
_taskPoolJob_t * const pJob,
uint32_t flags );
/**
* Matches a deferred job in the timer queue with its timer event wrapper.
*
* @param[in] pLink A pointer to the timer event link in the timer queue.
* @param[in] pMatch A pointer to the job to match.
*
*/
static bool _matchJobByPointer( const IotLink_t * const pLink,
void * pMatch );
/**
* Tries to cancel a job.
*
* @param[in] pTaskPool The task pool to cancel an operation against.
* @param[in] pJob The job to cancel.
* @param[out] pStatus The status of the job at the time of cancellation.
*
*/
static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,
_taskPoolJob_t * const pJob,
IotTaskPoolJobStatus_t * const pStatus );
/**
* Try to safely cancel and/or remove a job from the cache when the user calls API out of order.
*
* @param[in] pTaskPool The task pool to safely extract a job from.
* @param[in] pJob The job to extract.
* @param[in] atCompletion A flag to indicate whether the job is being scheduled or
* was completed already.
*
*/
static IotTaskPoolError_t _trySafeExtraction( _taskPool_t * const pTaskPool,
_taskPoolJob_t * const pJob,
bool atCompletion );
/* ---------------------------------------------------------------------------------------------- */
IotTaskPool_t IotTaskPool_GetSystemTaskPool( void )
{
return &_IotSystemTaskPool;
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_CreateSystemTaskPool( const IotTaskPoolInfo_t * const pInfo )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
/* Parameter checking. */
TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );
/* Create the system task pool pool. */
TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, &_IotSystemTaskPool ) );
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_Create( const IotTaskPoolInfo_t * const pInfo,
IotTaskPool_t * const pTaskPool )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTempTaskPool = NULL;
/* Verify that the task pool storage is valid. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
/* Parameter checking. */
TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );
/* Allocate the memory for the task pool */
pTempTaskPool = ( _taskPool_t * ) IotTaskPool_MallocTaskPool( sizeof( _taskPool_t ) );
if( pTempTaskPool == NULL )
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
memset( pTempTaskPool, 0x00, sizeof( _taskPool_t ) );
TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, pTempTaskPool ) );
TASKPOOL_FUNCTION_CLEANUP();
if( TASKPOOL_FAILED( status ) )
{
if( pTempTaskPool != NULL )
{
IotTaskPool_FreeTaskPool( pTempTaskPool );
}
}
else
{
*pTaskPool = pTempTaskPool;
}
TASKPOOL_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_Destroy( IotTaskPool_t taskPoolHandle )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
uint32_t count;
bool completeShutdown = true;
_taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
/* Track how many threads the task pool owns. */
uint32_t activeThreads;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
/* Destroying the task pool should be safe, and therefore we will grab the task pool lock.
* No worker thread or application thread should access any data structure
* in the task pool while the task pool is being destroyed. */
TASKPOOL_ENTER_CRITICAL();
{
IotLink_t * pItemLink;
/* Record how many active threads in the task pool. */
activeThreads = pTaskPool->activeThreads;
/* Destroying a Task pool happens in six (6) stages: First, (1) we clear the job queue and (2) the timer queue.
* Then (3) we clear the jobs cache. We will then (4) wait for all worker threads to signal exit,
* before (5) setting the exit condition and wake up all active worker threads. Finally (6) destroying
* all task pool data structures and release the associated memory.
*/
/* (1) Clear the job queue. */
do
{
pItemLink = NULL;
pItemLink = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
if( pItemLink != NULL )
{
_taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );
_destroyJob( pJob );
}
} while( pItemLink );
/* (2) Clear the timer queue. */
{
_taskPoolTimerEvent_t * pTimerEvent;
/* A deferred job may have fired already. Since deferred jobs will go through the same mutex
* the shutdown sequence is holding at this stage, there is no risk for race conditions. Yet, we
* need to let the deferred job to destroy the task pool. */
pItemLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
if( pItemLink != NULL )
{
uint64_t now = IotClock_GetTimeMs();
pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );
if( pTimerEvent->expirationTime <= now )
{
IotLogDebug( "Shutdown will be deferred to the timer thread" );
/* Timer may have fired already! Let the timer thread destroy
* complete the taskpool destruction sequence. */
completeShutdown = false;
}
/* Remove all timers from the timeout list. */
for( ; ; )
{
pItemLink = IotListDouble_RemoveHead( &pTaskPool->timerEventsList );
if( pItemLink == NULL )
{
break;
}
pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );
_destroyJob( pTimerEvent->pJob );
IotTaskPool_FreeTimerEvent( pTimerEvent );
}
}
}
/* (3) Clear the job cache. */
do
{
pItemLink = NULL;
pItemLink = IotListDouble_RemoveHead( &pTaskPool->jobsCache.freeList );
if( pItemLink != NULL )
{
_taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );
_destroyJob( pJob );
}
} while( pItemLink );
/* (4) Set the exit condition. */
_signalShutdown( pTaskPool, activeThreads );
}
TASKPOOL_EXIT_CRITICAL();
/* (5) Wait for all active threads to reach the end of their life-span. */
for( count = 0; count < activeThreads; ++count )
{
IotSemaphore_Wait( &pTaskPool->startStopSignal );
}
IotTaskPool_Assert( IotSemaphore_GetCount( &pTaskPool->startStopSignal ) == 0 );
IotTaskPool_Assert( pTaskPool->activeThreads == 0 );
/* (6) Destroy all signaling objects. */
if( completeShutdown == true )
{
_destroyTaskPool( pTaskPool );
/* Do not free the system task pool which is statically allocated. */
if( pTaskPool != &_IotSystemTaskPool )
{
IotTaskPool_FreeTaskPool( pTaskPool );
}
}
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_SetMaxThreads( IotTaskPool_t taskPoolHandle,
uint32_t maxThreads )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
uint32_t count, i;
_taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pTaskPool->minThreads > maxThreads );
TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( maxThreads < 1UL );
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
}
uint32_t previousMaxThreads = pTaskPool->maxThreads;
/* Reset the max threads counter. */
pTaskPool->maxThreads = maxThreads;
count = previousMaxThreads - maxThreads;
/* If the number of maximum threads in the pool is set to be smaller than the current value,
* then we need to signal all redundant threads to exit.
*/
if( maxThreads < previousMaxThreads )
{
IotLogDebug( "Setting max threads caused %d thread to exit.", count );
i = count;
while( i > 0UL )
{
IotSemaphore_Post( &pTaskPool->dispatchSignal );
--i;
}
}
}
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_CreateJob( IotTaskPoolRoutine_t userCallback,
void * pUserContext,
IotTaskPoolJobStorage_t * const pJobStorage,
IotTaskPoolJob_t * const ppJob )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobStorage );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );
/* Build a job around the user-provided storage. */
_initializeJob( ( _taskPoolJob_t * ) pJobStorage, userCallback, pUserContext, true );
*ppJob = ( IotTaskPoolJob_t ) pJobStorage;
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_CreateRecyclableJob( IotTaskPool_t taskPoolHandle,
IotTaskPoolRoutine_t userCallback,
void * pUserContext,
IotTaskPoolJob_t * const ppJob )
{
_taskPool_t * pTaskPool = NULL;
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
{
_taskPoolJob_t * pTempJob = NULL;
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
}
pTempJob = _fetchOrAllocateJob( &pTaskPool->jobsCache );
}
TASKPOOL_EXIT_CRITICAL();
if( pTempJob == NULL )
{
IotLogInfo( "Failed to allocate a job." );
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
_initializeJob( pTempJob, userCallback, pUserContext, false );
*ppJob = pTempJob;
}
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_DestroyRecyclableJob( IotTaskPool_t taskPoolHandle,
IotTaskPoolJob_t pJobHandle )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTaskPool = NULL;
_taskPoolJob_t * pJob1 = NULL;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobHandle );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
pJob1 = ( _taskPoolJob_t * ) pJobHandle;
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
status = IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS;
}
/* Do not destroy statically allocated jobs. */
else if( ( pJob1->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == IOT_TASK_POOL_INTERNAL_STATIC )
{
IotLogWarn( "Attempt to destroy a statically allocated job." );
status = IOT_TASKPOOL_ILLEGAL_OPERATION;
}
else
{
status = _trySafeExtraction( pTaskPool, pJob1, true );
}
}
TASKPOOL_EXIT_CRITICAL();
if( TASKPOOL_SUCCEEDED( status ) )
{
/* At this point, the job must not be in any queue or list. */
IotTaskPool_Assert( IotLink_IsLinked( &pJob1->link ) == false );
_destroyJob( pJob1 );
}
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_RecycleJob( IotTaskPool_t taskPoolHandle,
IotTaskPoolJob_t pJob )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTaskPool = NULL;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
status = IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS;
}
/* Do not recycle statically allocated jobs. */
else if( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0UL )
{
status = _trySafeExtraction( pTaskPool, pJob, true );
}
else
{
IotLogWarn( "Attempt to recycle a statically allocated job." );
status = IOT_TASKPOOL_ILLEGAL_OPERATION;
}
/* If all safety checks completed, proceed. */
if( TASKPOOL_SUCCEEDED( status ) )
{
/* At this point, the job must not be in any queue or list. */
IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
_recycleJob( &pTaskPool->jobsCache, pJob );
}
}
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_Schedule( IotTaskPool_t taskPoolHandle,
IotTaskPoolJob_t pJob,
uint32_t flags )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTaskPool = NULL;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( ( flags != 0UL ) && ( flags != IOT_TASKPOOL_JOB_HIGH_PRIORITY ) );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
status = IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS;
}
else
{
status = _trySafeExtraction( pTaskPool, pJob, false );
}
/* If all safety checks completed, proceed. */
if( TASKPOOL_SUCCEEDED( status ) )
{
status = _scheduleInternal( pTaskPool, pJob, flags );
}
}
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_ScheduleDeferred( IotTaskPool_t taskPoolHandle,
IotTaskPoolJob_t pJob,
uint32_t timeMs )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTaskPool = NULL;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
if( timeMs == 0UL )
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IotTaskPool_Schedule( pTaskPool, pJob, 0 ) );
}
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
}
/* If all safety checks completed, proceed. */
if( TASKPOOL_SUCCEEDED( _trySafeExtraction( pTaskPool, pJob, false ) ) )
{
IotLink_t * pTimerEventLink;
uint64_t now;
_taskPoolTimerEvent_t * pTimerEvent = ( _taskPoolTimerEvent_t * ) IotTaskPool_MallocTimerEvent( sizeof( _taskPoolTimerEvent_t ) );
if( pTimerEvent == NULL )
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
memset( pTimerEvent, 0x00, sizeof( _taskPoolTimerEvent_t ) );
now = IotClock_GetTimeMs();
pTimerEvent->link.pNext = NULL;
pTimerEvent->link.pPrevious = NULL;
pTimerEvent->expirationTime = now + timeMs;
pTimerEvent->pJob = ( _taskPoolJob_t * ) pJob;
/* Append the timer event to the timer list. */
IotListDouble_InsertSorted( &pTaskPool->timerEventsList, &pTimerEvent->link, _timerEventCompare );
/* Update the job status to 'scheduled'. */
pJob->status = IOT_TASKPOOL_STATUS_DEFERRED;
/* Peek the first event in the timer event list. There must be at least one,
* since we just inserted it. */
pTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
IotTaskPool_Assert( pTimerEventLink != NULL );
/* If the event we inserted is at the front of the queue, then
* we need to reschedule the underlying timer. */
if( pTimerEventLink == &pTimerEvent->link )
{
pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link );
_rescheduleDeferredJobsTimer( &pTaskPool->timer, pTimerEvent );
}
}
else
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_ILLEGAL_OPERATION );
}
}
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_GetStatus( IotTaskPool_t taskPoolHandle,
IotTaskPoolJob_t pJob,
IotTaskPoolJobStatus_t * const pStatus )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTaskPool = NULL;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pStatus );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
*pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;
TASKPOOL_ENTER_CRITICAL();
{
/* Bail out early if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
}
*pStatus = pJob->status;
}
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
IotTaskPoolError_t IotTaskPool_TryCancel( IotTaskPool_t taskPoolHandle,
IotTaskPoolJob_t pJob,
IotTaskPoolJobStatus_t * const pStatus )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
_taskPool_t * pTaskPool = NULL;
/* Parameter checking. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
pTaskPool = ( _taskPool_t * ) taskPoolHandle;
if( pStatus != NULL )
{
*pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;
}
TASKPOOL_ENTER_CRITICAL();
{
/* Check again if this task pool is shutting down. */
if( _IsShutdownStarted( pTaskPool ) )
{
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
}
status = _tryCancelInternal( pTaskPool, pJob, pStatus );
}
TASKPOOL_EXIT_CRITICAL();
TASKPOOL_NO_FUNCTION_CLEANUP();
}
IotTaskPoolJobStorage_t * IotTaskPool_GetJobStorageFromHandle( IotTaskPoolJob_t pJob )
{
return ( IotTaskPoolJobStorage_t * ) pJob;
}
const char * IotTaskPool_strerror( IotTaskPoolError_t status )
{
const char * pMessage = NULL;
switch( status )
{
case IOT_TASKPOOL_SUCCESS:
pMessage = "SUCCESS";
break;
case IOT_TASKPOOL_BAD_PARAMETER:
pMessage = "BAD PARAMETER";
break;
case IOT_TASKPOOL_ILLEGAL_OPERATION:
pMessage = "ILLEGAL OPERATION";
break;
case IOT_TASKPOOL_NO_MEMORY:
pMessage = "NO MEMORY";
break;
case IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS:
pMessage = "SHUTDOWN IN PROGRESS";
break;
case IOT_TASKPOOL_CANCEL_FAILED:
pMessage = "CANCEL FAILED";
break;
default:
pMessage = "INVALID STATUS";
break;
}
return pMessage;
}
/* ---------------------------------------------------------------------------------------------- */
/* ---------------------------------------------------------------------------------------------- */
/* ---------------------------------------------------------------------------------------------- */
IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
/* Check input values for consistency. */
TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pInfo );
TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads > pInfo->maxThreads );
TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads < 1UL );
TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->maxThreads < 1UL );
TASKPOOL_NO_FUNCTION_CLEANUP();
}
static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,
_taskPool_t * const pTaskPool )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
bool semStartStopInit = false;
bool lockInit = false;
bool semDispatchInit = false;
bool timerInit = false;
/* Zero out all data structures. */
memset( ( void * ) pTaskPool, 0x00, sizeof( _taskPool_t ) );
/* Initialize a job data structures that require no de-initialization.
* All other data structures carry a value of 'NULL' before initialization.
*/
IotDeQueue_Create( &pTaskPool->dispatchQueue );
IotListDouble_Create( &pTaskPool->timerEventsList );
pTaskPool->minThreads = pInfo->minThreads;
pTaskPool->maxThreads = pInfo->maxThreads;
pTaskPool->stackSize = pInfo->stackSize;
pTaskPool->priority = pInfo->priority;
_initJobsCache( &pTaskPool->jobsCache );
/* Initialize the semaphore to ensure all threads have started. */
if( IotSemaphore_Create( &pTaskPool->startStopSignal, 0, TASKPOOL_MAX_SEM_VALUE ) == true )
{
semStartStopInit = true;
if( IotMutex_Create( &pTaskPool->lock, true ) == true )
{
lockInit = true;
/* Initialize the semaphore for waiting for incoming work. */
if( IotSemaphore_Create( &pTaskPool->dispatchSignal, 0, TASKPOOL_MAX_SEM_VALUE ) == true )
{
semDispatchInit = true;
/* Create the timer mutex for a new connection. */
if( IotClock_TimerCreate( &( pTaskPool->timer ), _timerThread, pTaskPool ) == true )
{
timerInit = true;
}
else
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
}
else
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
}
else
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
}
else
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
TASKPOOL_FUNCTION_CLEANUP();
if( TASKPOOL_FAILED( status ) )
{
if( semStartStopInit == true )
{
IotSemaphore_Destroy( &pTaskPool->startStopSignal );
}
if( lockInit == true )
{
IotMutex_Destroy( &pTaskPool->lock );
}
if( semDispatchInit == true )
{
IotSemaphore_Destroy( &pTaskPool->dispatchSignal );
}
if( timerInit == true )
{
IotClock_TimerDestroy( &pTaskPool->timer );
}
}
TASKPOOL_FUNCTION_CLEANUP_END();
}
static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,
_taskPool_t * const pTaskPool )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
uint32_t count;
uint32_t threadsCreated = 0;
bool controlInit = false;
/* Initialize all internal data structure prior to creating all threads. */
TASKPOOL_ON_ERROR_GOTO_CLEANUP( _initTaskPoolControlStructures( pInfo, pTaskPool ) );
controlInit = true;
IotTaskPool_Assert( pInfo->minThreads == pTaskPool->minThreads );
IotTaskPool_Assert( pInfo->maxThreads == pTaskPool->maxThreads );
/* The task pool will initialize the minimum number of threads requested by the user upon start. */
/* When a thread is created, it will signal a semaphore to signify that it is about to wait on incoming */
/* jobs. A thread can be woken up for exit or for new jobs only at that point in time. */
/* The exit condition is setting the maximum number of threads to 0. */
/* Create the minimum number of threads specified by the user, and if one fails shutdown and return error. */
for( ; threadsCreated < pTaskPool->minThreads; )
{
/* Create one thread. */
if( Iot_CreateDetachedThread( _taskPoolWorker,
pTaskPool,
pTaskPool->priority,
pTaskPool->stackSize ) == false )
{
IotLogError( "Could not create worker thread! Exiting..." );
/* If creating one thread fails, set error condition and exit the loop. */
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
/* Upon successful thread creation, increase the number of active threads. */
pTaskPool->activeThreads++;
++threadsCreated;
}
TASKPOOL_FUNCTION_CLEANUP();
/* Wait for threads to be ready to wait on the condition, so that threads are actually able to receive messages. */
for( count = 0; count < threadsCreated; ++count )
{
IotSemaphore_Wait( &pTaskPool->startStopSignal );
}
/* In case of failure, wait on the created threads to exit. */
if( TASKPOOL_FAILED( status ) )
{
/* Set the exit condition for the newly created threads. */
_signalShutdown( pTaskPool, threadsCreated );
/* Signal all threads to exit. */
for( count = 0; count < threadsCreated; ++count )
{
IotSemaphore_Wait( &pTaskPool->startStopSignal );
}
if( controlInit == true )
{
_destroyTaskPool( pTaskPool );
}
}
TASKPOOL_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/
static void _destroyTaskPool( _taskPool_t * const pTaskPool )
{
IotClock_TimerDestroy( &pTaskPool->timer );
IotSemaphore_Destroy( &pTaskPool->dispatchSignal );
IotSemaphore_Destroy( &pTaskPool->startStopSignal );
IotMutex_Destroy( &pTaskPool->lock );
}
/* ---------------------------------------------------------------------------------------------- */
static void _taskPoolWorker( void * pUserContext )
{
IotTaskPool_Assert( pUserContext != NULL );
IotTaskPoolRoutine_t userCallback = NULL;
bool running = true;
/* Extract pTaskPool pointer from context. */
_taskPool_t * pTaskPool = ( _taskPool_t * ) pUserContext;
/* Signal that this worker completed initialization and it is ready to receive notifications. */
IotSemaphore_Post( &pTaskPool->startStopSignal );
/* OUTER LOOP: it controls the lifetime of the worker thread: exit condition for a worker thread
* is setting maxThreads to zero. A worker thread is running until the maximum number of allowed
* threads is not zero and the active threads are less than the maximum number of allowed threads.
*/
do
{
bool jobAvailable;
IotLink_t * pFirst;
_taskPoolJob_t * pJob = NULL;
/* Wait on incoming notifications. If waiting on the semaphore return with timeout, then
* it means that this thread should consider shutting down for the task pool to fold back
* to its minimum number of threads. */
jobAvailable = IotSemaphore_TimedWait( &pTaskPool->dispatchSignal, IOT_TASKPOOL_JOB_WAIT_TIMEOUT_MS );
/* Acquire the lock to check the exit condition, and release the lock if the exit condition is verified,
* or before waiting for incoming notifications.
*/
TASKPOOL_ENTER_CRITICAL();
{
/* If the exit condition is verified, update the number of active threads and exit the loop. */
if( _IsShutdownStarted( pTaskPool ) )
{
IotLogDebug( "Worker thread exiting because shutdown condition was set." );
/* Decrease the number of active threads. */
pTaskPool->activeThreads--;
TASKPOOL_EXIT_CRITICAL();
/* Signal that this worker is exiting. */
IotSemaphore_Post( &pTaskPool->startStopSignal );
/* On shutdown, abandon the OUTER LOOP immediately. */
break;
}
/* Check if this thread needs to exit because 'max threads' quota was exceeded.
* In that case, let it run once, so we can support the case for scheduling 'high priority'
* jobs that causes exceeding the max threads quota for the purpose of executing
* the high-priority task. */
if( pTaskPool->activeThreads > pTaskPool->maxThreads )
{
IotLogDebug( "Worker thread will exit because maximum quota was exceeded." );
/* Decrease the number of active threads pro-actively. */
pTaskPool->activeThreads--;
/* Mark this thread as dead. */
running = false;
}
/* Check if this thread needs to exit because the worker woke up after a timeout. */
else if( jobAvailable == false )
{
/* If there was a timeout, shrink back the task pool to the minimum number of threads. */
if( pTaskPool->activeThreads > pTaskPool->minThreads )
{
/* After waking up from a timeout, the thread will try and pick up a new job.
* But if there is no job available, the thread will exit to ensure that
* the taskpool does not have more than minimum number of active threads. */
IotLogDebug( "Worker will exit because task pool is shrinking." );
/* Decrease the number of active threads pro-actively. */
pTaskPool->activeThreads--;
/* Mark this thread as dead. */
running = false;
}
}
/* Only look for a job if waiting did not timed out. */
if( jobAvailable == true )
{
/* Dequeue the first job in FIFO order. */
pFirst = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
/* If there is indeed a job, then update status under lock, and release the lock before processing the job. */
if( pFirst != NULL )
{
/* Extract the job from its link. */
pJob = IotLink_Container( _taskPoolJob_t, pFirst, link );
/* Update status to 'executing'. */
pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
userCallback = pJob->userCallback;
}
}
}
TASKPOOL_EXIT_CRITICAL();
/* INNER LOOP: it controls the execution of jobs: the exit condition is the lack of a job to execute. */
while( pJob != NULL )
{
/* Process the job by invoking the associated callback with the user context.
* This task pool thread will not be available until the user callback returns.
*/
{
IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
IotTaskPool_Assert( userCallback != NULL );
userCallback( pTaskPool, pJob, pJob->pUserContext );
/* This job is finished, clear its pointer. */
pJob = NULL;
userCallback = NULL;
/* If this thread exceeded the quota, then let it terminate. */
if( running == false )
{
/* Abandon the INNER LOOP. Execution will tranfer back to the OUTER LOOP condition. */
break;
}
}
/* Acquire the lock before updating the job status. */
TASKPOOL_ENTER_CRITICAL();
{
/* Update the number of busy threads, so new requests can be served by creating new threads, up to maxThreads. */
pTaskPool->activeJobs--;
/* Try and dequeue the next job in the dispatch queue. */
IotLink_t * pItem = NULL;
/* Dequeue the next job from the dispatch queue. */
pItem = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
/* If there is no job left in the dispatch queue, update the worker status and leave. */
if( pItem == NULL )
{
TASKPOOL_EXIT_CRITICAL();
/* Abandon the INNER LOOP. Execution will tranfer back to the OUTER LOOP condition. */
break;
}
else
{
pJob = IotLink_Container( _taskPoolJob_t, pItem, link );
userCallback = pJob->userCallback;
}
pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
}
TASKPOOL_EXIT_CRITICAL();
}
} while( running == true );
}
/* ---------------------------------------------------------------------------------------------- */
static void _initJobsCache( _taskPoolCache_t * const pCache )
{
IotDeQueue_Create( &pCache->freeList );
pCache->freeCount = 0;
}
/*-----------------------------------------------------------*/
static void _initializeJob( _taskPoolJob_t * const pJob,
IotTaskPoolRoutine_t userCallback,
void * pUserContext,
bool isStatic )
{
pJob->link.pNext = NULL;
pJob->link.pPrevious = NULL;
pJob->userCallback = userCallback;
pJob->pUserContext = pUserContext;
if( isStatic )
{
pJob->flags = IOT_TASK_POOL_INTERNAL_STATIC;
pJob->status = IOT_TASKPOOL_STATUS_READY;
}
else
{
pJob->status = IOT_TASKPOOL_STATUS_READY;
}
}
static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache )
{
_taskPoolJob_t * pJob = NULL;
IotLink_t * pLink = IotListDouble_RemoveHead( &( pCache->freeList ) );
if( pLink != NULL )
{
pJob = IotLink_Container( _taskPoolJob_t, pLink, link );
}
/* If there is no available job in the cache, then allocate one. */
if( pJob == NULL )
{
pJob = ( _taskPoolJob_t * ) IotTaskPool_MallocJob( sizeof( _taskPoolJob_t ) );
if( pJob != NULL )
{
memset( pJob, 0x00, sizeof( _taskPoolJob_t ) );
}
else
{
/* Log allocation failure for troubleshooting purposes. */
IotLogInfo( "Failed to allocate job." );
}
}
/* If there was a job in the cache, then make sure we keep the counters up-to-date. */
else
{
IotTaskPool_Assert( pCache->freeCount > 0 );
pCache->freeCount--;
}
return pJob;
}
/*-----------------------------------------------------------*/
static void _recycleJob( _taskPoolCache_t * const pCache,
_taskPoolJob_t * const pJob )
{
/* We should never try and recycling a job that is linked into some queue. */
IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
/* We will recycle the job if there is space in the cache. */
if( pCache->freeCount < IOT_TASKPOOL_JOBS_RECYCLE_LIMIT )
{
/* Destroy user data, for added safety & security. */
pJob->userCallback = NULL;
pJob->pUserContext = NULL;
/* Reset the status for added debugability. */
pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED;
IotListDouble_InsertTail( &pCache->freeList, &pJob->link );
pCache->freeCount++;
IotTaskPool_Assert( pCache->freeCount >= 1 );
}
else
{
_destroyJob( pJob );
}
}
/*-----------------------------------------------------------*/
static void _destroyJob( _taskPoolJob_t * const pJob )
{
/* Destroy user data, for added safety & security. */
pJob->userCallback = NULL;
pJob->pUserContext = NULL;
/* Reset the status for added debugability. */
pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED;
/* Only dispose of dynamically allocated jobs. */
if( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0UL )
{
IotTaskPool_FreeJob( pJob );
}
}
/* ---------------------------------------------------------------------------------------------- */
static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool )
{
return( pTaskPool->maxThreads == 0UL );
}
/*-----------------------------------------------------------*/
static void _signalShutdown( _taskPool_t * const pTaskPool,
uint32_t threads )
{
uint32_t count;
/* Set the exit condition. */
pTaskPool->maxThreads = 0;
/* Broadcast to all active threads to wake-up. Active threads do check the exit condition right after waking up. */
for( count = 0; count < threads; ++count )
{
IotSemaphore_Post( &pTaskPool->dispatchSignal );
}
}
/* ---------------------------------------------------------------------------------------------- */
static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,
_taskPoolJob_t * const pJob,
uint32_t flags )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
bool mustGrow = false;
bool shouldGrow = false;
/* Update the job status to 'scheduled'. */
pJob->status = IOT_TASKPOOL_STATUS_SCHEDULED;
/* Update the number of active jobs optimistically, so new requests can be served by creating new threads. */
pTaskPool->activeJobs++;
/* If all threads are busy, try and create a new one. Failing to create a new thread
* only has performance implications on correctly executing the scheduled job.
*/
uint32_t activeThreads = pTaskPool->activeThreads;
if( activeThreads <= pTaskPool->activeJobs )
{
/* If the job scheduling is tagged as high priority, then we must grow the task pool,
* no matter how many threads are active already. */
if( ( flags & IOT_TASKPOOL_JOB_HIGH_PRIORITY ) == IOT_TASKPOOL_JOB_HIGH_PRIORITY )
{
mustGrow = true;
}
/* Grow the task pool up to the maximum number of threads indicated by the user.
* Growing the taskpool can safely fail, the existing threads will eventually pick up
* the job sometimes later. */
else if( activeThreads < pTaskPool->maxThreads )
{
shouldGrow = true;
}
else
{
/* Nothing to do. */
}
if( ( mustGrow == true ) || ( shouldGrow == true ) )
{
IotLogInfo( "Growing a Task pool with a new worker thread..." );
if( Iot_CreateDetachedThread( _taskPoolWorker,
pTaskPool,
pTaskPool->priority,
pTaskPool->stackSize ) )
{
IotSemaphore_Wait( &pTaskPool->startStopSignal );
pTaskPool->activeThreads++;
}
else
{
/* Failure to create a worker thread may not hinder functional correctness, but rather just responsiveness. */
IotLogWarn( "Task pool failed to create a worker thread." );
/* Failure to create a worker thread for a high priority job is considered a failure. */
if( mustGrow )
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
}
}
}
}
TASKPOOL_FUNCTION_CLEANUP();
if( TASKPOOL_SUCCEEDED( status ) )
{
/* Append the job to the dispatch queue.
* Put the job at the front, if it is a high priority job. */
if( mustGrow == true )
{
IotLogDebug( "High priority job: placing job at the head of the queue." );
IotDeQueue_EnqueueHead( &pTaskPool->dispatchQueue, &pJob->link );
}
else
{
IotDeQueue_EnqueueTail( &pTaskPool->dispatchQueue, &pJob->link );
}
/* Signal a worker to pick up the job. */
IotSemaphore_Post( &pTaskPool->dispatchSignal );
}
else
{
/* Scheduling can only fail to allocate a new worker, which is an error
* only for high priority tasks. */
IotTaskPool_Assert( mustGrow == true );
/* Revert updating the number of active jobs. */
pTaskPool->activeJobs--;
}
TASKPOOL_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/
static bool _matchJobByPointer( const IotLink_t * const pLink,
void * pMatch )
{
const _taskPoolJob_t * const pJob = ( _taskPoolJob_t * ) pMatch;
const _taskPoolTimerEvent_t * const pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link );
if( pJob == pTimerEvent->pJob )
{
return true;
}
return false;
}
/*-----------------------------------------------------------*/
static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,
_taskPoolJob_t * const pJob,
IotTaskPoolJobStatus_t * const pStatus )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
bool cancelable = false;
/* We can only cancel jobs that are either 'ready' (waiting to be scheduled). 'deferred', or 'scheduled'. */
IotTaskPoolJobStatus_t currentStatus = pJob->status;
switch( currentStatus )
{
case IOT_TASKPOOL_STATUS_READY:
case IOT_TASKPOOL_STATUS_DEFERRED:
case IOT_TASKPOOL_STATUS_SCHEDULED:
case IOT_TASKPOOL_STATUS_CANCELED:
cancelable = true;
break;
case IOT_TASKPOOL_STATUS_COMPLETED:
/* Log message for debugging purposes. */
IotLogWarn( "Attempt to cancel a job that is already executing, or canceled." );
break;
default:
/* Log message for debugging purposes. */
IotLogError( "Attempt to cancel a job with an undefined state." );
break;
}
/* Update the returned status to the current status of the job. */
if( pStatus != NULL )
{
*pStatus = currentStatus;
}
if( cancelable == false )
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_CANCEL_FAILED );
}
else
{
/* Update the status of the job. */
pJob->status = IOT_TASKPOOL_STATUS_CANCELED;
/* If the job is cancelable and its current status is 'scheduled' then unlink it from the dispatch
* queue and signal any waiting threads. */
if( currentStatus == IOT_TASKPOOL_STATUS_SCHEDULED )
{
/* A scheduled work items must be in the dispatch queue. */
IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) );
IotDeQueue_Remove( &pJob->link );
}
/* If the job current status is 'deferred' then the job has to be pending
* in the timeouts queue. */
else if( currentStatus == IOT_TASKPOOL_STATUS_DEFERRED )
{
/* Find the timer event associated with the current job. There MUST be one, hence assert if not. */
IotLink_t * pTimerEventLink = IotListDouble_FindFirstMatch( &pTaskPool->timerEventsList, NULL, _matchJobByPointer, pJob );
IotTaskPool_Assert( pTimerEventLink != NULL );
if( pTimerEventLink != NULL )
{
bool shouldReschedule = false;
/* If the job being cancelled was at the head of the timeouts queue, then we need to reschedule the timer
* with the next job timeout */
IotLink_t * pHeadLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
if( pHeadLink == pTimerEventLink )
{
shouldReschedule = true;
}
/* Remove the timer event associated with the canceled job and free the associated memory. */
IotListDouble_Remove( pTimerEventLink );
IotTaskPool_FreeTimerEvent( IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link ) );
if( shouldReschedule )
{
IotLink_t * pNextTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
if( pNextTimerEventLink != NULL )
{
_rescheduleDeferredJobsTimer( &pTaskPool->timer, IotLink_Container( _taskPoolTimerEvent_t, pNextTimerEventLink, link ) );
}
}
}
}
else
{
/* A cancelable job status should be either 'scheduled' or 'deferrred'. */
IotTaskPool_Assert( ( currentStatus == IOT_TASKPOOL_STATUS_READY ) || ( currentStatus == IOT_TASKPOOL_STATUS_CANCELED ) );
}
}
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
static IotTaskPoolError_t _trySafeExtraction( _taskPool_t * const pTaskPool,
_taskPoolJob_t * const pJob,
bool atCompletion )
{
TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
IotTaskPoolJobStatus_t currentStatus = pJob->status;
/* if the job is executing, we cannot touch it. */
if( ( atCompletion == false ) && ( currentStatus == IOT_TASKPOOL_STATUS_COMPLETED ) )
{
TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_ILLEGAL_OPERATION );
}
/* Do not destroy a job in the dispatch queue or the timer queue without cancelling first. */
else if( ( currentStatus == IOT_TASKPOOL_STATUS_SCHEDULED ) || ( currentStatus == IOT_TASKPOOL_STATUS_DEFERRED ) )
{
IotTaskPoolJobStatus_t statusAtCancellation;
/* Cancellation can fail, e.g. if a job is being executed when we are trying to cancel it. */
status = _tryCancelInternal( pTaskPool, pJob, &statusAtCancellation );
switch( status )
{
case IOT_TASKPOOL_SUCCESS:
/* Nothing to do. */
break;
case IOT_TASKPOOL_CANCEL_FAILED:
IotLogWarn( "Removing a scheduled job failed because the job could not be canceled, error %s.",
IotTaskPool_strerror( status ) );
status = IOT_TASKPOOL_ILLEGAL_OPERATION;
break;
default:
/* Nothing to do. */
break;
}
}
else if( IotLink_IsLinked( &pJob->link ) )
{
/* If the job is not in the dispatch or timer queue, it must be in the cache. */
IotTaskPool_Assert( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0 );
IotListDouble_Remove( &pJob->link );
}
else
{
/* Nothing to do */
}
TASKPOOL_NO_FUNCTION_CLEANUP();
}
/*-----------------------------------------------------------*/
static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,
const IotLink_t * const pTimerEventLink2 )
{
const _taskPoolTimerEvent_t * const pTimerEvent1 = IotLink_Container( _taskPoolTimerEvent_t,
pTimerEventLink1,
link );
const _taskPoolTimerEvent_t * const pTimerEvent2 = IotLink_Container( _taskPoolTimerEvent_t,
pTimerEventLink2,
link );
if( pTimerEvent1->expirationTime < pTimerEvent2->expirationTime )
{
return -1;
}
if( pTimerEvent1->expirationTime > pTimerEvent2->expirationTime )
{
return 1;
}
return 0;
}
/*-----------------------------------------------------------*/
static void _rescheduleDeferredJobsTimer( IotTimer_t * const pTimer,
_taskPoolTimerEvent_t * const pFirstTimerEvent )
{
uint64_t delta = 0;
uint64_t now = IotClock_GetTimeMs();
if( pFirstTimerEvent->expirationTime > now )
{
delta = pFirstTimerEvent->expirationTime - now;
}
if( delta < TASKPOOL_JOB_RESCHEDULE_DELAY_MS )
{
delta = TASKPOOL_JOB_RESCHEDULE_DELAY_MS; /* The job will be late... */
}
IotTaskPool_Assert( delta > 0 );
if( IotClock_TimerArm( pTimer, ( uint32_t ) delta, 0 ) == false )
{
IotLogWarn( "Failed to re-arm timer for task pool" );
}
}
/*-----------------------------------------------------------*/
static void _timerThread( void * pArgument )
{
_taskPool_t * pTaskPool = ( _taskPool_t * ) pArgument;
_taskPoolTimerEvent_t * pTimerEvent = NULL;
IotLogDebug( "Timer thread started for task pool %p.", pTaskPool );
/* Attempt to lock the timer mutex. Return immediately if the mutex cannot be locked.
* If this mutex cannot be locked it means that another thread is manipulating the
* timeouts list, and will reset the timer to fire again, although it will be late.
*/
TASKPOOL_ENTER_CRITICAL();
{
/* Check again for shutdown and bail out early in case. */
if( _IsShutdownStarted( pTaskPool ) )
{
TASKPOOL_EXIT_CRITICAL();
/* Complete the shutdown sequence. */
_destroyTaskPool( pTaskPool );
IotTaskPool_FreeTaskPool( pTaskPool );
return;
}
/* Dispatch all deferred job whose timer expired, then reset the timer for the next
* job down the line. */
for( ; ; )
{
/* Peek the first event in the timer event list. */
IotLink_t * pLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
/* Check if the timer misfired for any reason. */
if( pLink != NULL )
{
/* Record the current time. */
uint64_t now = IotClock_GetTimeMs();
/* Extract the job from its envelope. */
pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link );
/* Check if the first event should be processed now. */
if( pTimerEvent->expirationTime <= now )
{
/* Remove the timer event for immediate processing. */
IotListDouble_Remove( &( pTimerEvent->link ) );
}
else
{
/* The first element in the timer queue shouldn't be processed yet.
* Arm the timer for when it should be processed and leave altogether. */
_rescheduleDeferredJobsTimer( &pTaskPool->timer, pTimerEvent );
break;
}
}
/* If there are no timer events to process, terminate this thread. */
else
{
IotLogDebug( "No further timer events to process. Exiting timer thread." );
break;
}
IotLogDebug( "Scheduling job from timer event." );
/* Queue the job associated with the received timer event. */
( void ) _scheduleInternal( pTaskPool, pTimerEvent->pJob, 0 );
/* Free the timer event. */
IotTaskPool_FreeTimerEvent( pTimerEvent );
}
}
TASKPOOL_EXIT_CRITICAL();
}