#include #include /** * \defgroup thread Multithreading * * Multithreading API * \{ */ /** * Creates new task object with specified function \a func, \a data and \a * flags. * * If the \a flags contain BH_THREAD_CLEANUP, upon successful completion task * will be automaticly destroyed. * * \param func Task's function * \param data Task's data * \param flags Task's flags * * \return On success, returns new task object. * \return On failure, returns null pointer. */ bh_task_t *bh_task_new(void (*func)(void *), void *data, int flags) { bh_task_t *result; result = malloc(sizeof(bh_task_t)); if (result) bh_task_init(result, func, data, flags); return result; } /** * Frees the \a task object. * * \param task Pointer to the task object. */ void bh_task_free(bh_task_t *task) { bh_task_destroy(task); free(task); } /** * Initializes the \a task object with specified function \a func, \a data and * \a flags. * * If the \a flags contain BH_THREAD_CLEANUP, upon successful completion task * will be automaticly destroyed. * * \warning This is an internal function. * * \param task Pointer to the task object * \param func Task's function * \param data Task's data * \param flags Task's flags */ void bh_task_init(bh_task_t *task, void (*func)(void *), void *data, int flags) { task->func = func; task->data = data; task->flags = flags; } /** * Destroyes the \a task object. * * \warning This is an internal function. * * \param task Pointer to the task object */ void bh_task_destroy(bh_task_t *task) { (void)task; } /** * Reuses the \a task object for different purpose. * * \param task Pointer to the task object * \param func New task's function * \param data New task's data */ void bh_task_reuse(bh_task_t *task, void (*func)(void *), void *data) { /* If the task is done - reinitilize it with new data */ if (task->flags & BH_THREAD_DONE) bh_task_init(task, func, data, task->flags & ~(BH_THREAD_DONE)); } /** * Checks if the \a task is done. * * \param task Pointer to the task object * * \return If the task is done, returns non-zero. * \return If the task is not done, returns zero. */ int bh_task_done(bh_task_t *task) { return (task->flags & BH_THREAD_DONE) != 0; } /** * Creates the mutex object. * * \return On success, returns new mutex object. * \return On failure, returns null pointer. */ bh_mutex_t *bh_mutex_new(void) { bh_mutex_t *result; result = malloc(sizeof(*result)); if (result && bh_mutex_init(result)) { free(result); result = NULL; } return result; } /** * Frees the \a mutex object. * * \param mutex Pointer to the mutex object */ void bh_mutex_free(bh_mutex_t *mutex) { bh_mutex_destroy(mutex); free(mutex); } /** * Creates the semaphore object with initial \a count value. * * \param count Initial semaphore value * * \return On success, returns new semaphore object. * \return On failure, returns null pointer. * * \note Guranteed maximum \a count value is 32767. */ bh_semaphore_t *bh_semaphore_new(int count) { bh_semaphore_t *result; result = malloc(sizeof(*result)); if (result && bh_semaphore_init(result, count)) { free(result); result = NULL; } return result; } /** * Frees the \a semaphore object. * * \param semaphore Pointer to the semaphore object */ void bh_semaphore_free(bh_semaphore_t *semaphore) { bh_semaphore_destroy(semaphore); free(semaphore); } /** * Creates the condition variable object. * * \return On success, returns new condition variable object. * \return On failure, returns null pointer. */ bh_cond_t *bh_cond_new(void) { bh_cond_t *result; result = malloc(sizeof(*result)); if (result && bh_cond_init(result)) { free(result); result = NULL; } return result; } /** * Frees the condition variable object \a cond. * * \param cond Pointer to the condition variable object */ void bh_cond_free(bh_cond_t *cond) { bh_cond_destroy(cond); free(cond); } /** * Adds \a task to the thread \a pool. * * \param pool Pointer to the thread pool object * \param task Pointer to the task object * * \return On success, returns zero. * \return On failure, returns error code. */ int bh_thread_pool_add(bh_thread_pool_t *pool, bh_task_t *task) { /* Queue task for execution */ bh_mutex_lock(&pool->lock); if (bh_queue_insert(&pool->tasks, task)) { bh_mutex_unlock(&pool->lock); return BH_ERROR; } /* Signal new job */ bh_mutex_unlock(&pool->lock); bh_cond_signal(&pool->new_task); return BH_OK; } /** * Waits unitl all task in thread \a pool are complete. * * \param pool Pointer to the thread pool. * * \return On success, returns zero. * \return On failure, returns error code. */ int bh_thread_pool_wait(bh_thread_pool_t *pool) { /* Lock and check if there is jobs in the queue */ bh_mutex_lock(&pool->lock); while (!bh_queue_empty(&pool->tasks)) { bh_task_t *task; /* Fetch task from the queue */ task = bh_queue_front(&pool->tasks); bh_queue_remove(&pool->tasks); /* Unlock and do the task */ bh_mutex_unlock(&pool->lock); task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); /* Lock and check for the next task */ bh_mutex_lock(&pool->lock); } /* Check until there is no active threads */ while (pool->active) bh_cond_wait(&pool->done_task, &pool->lock); /* Unlock */ bh_mutex_unlock(&pool->lock); return BH_OK; } /** * Destroyes the thread \a pool object. * * \warning This is an internal function. * * \param pool Pointer to the thread pool object */ void bh_thread_pool_destroy(bh_thread_pool_t *pool) { size_t i; /* Broadcast shutdown */ pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); /* Join and delete every thread */ for (i = 0; i < pool->size; i++) bh_thread_join(pool->threads + i); /* Destroy any tasks with cleanup flag */ while (!bh_queue_empty(&pool->tasks)) { bh_task_t *task; task = bh_queue_front(&pool->tasks); bh_queue_remove(&pool->tasks); if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); } /* Destroy thread array, queue, lock and condition variables */ free(pool->threads); bh_queue_destroy(&pool->tasks); bh_cond_destroy(&pool->new_task); bh_cond_destroy(&pool->done_task); bh_mutex_destroy(&pool->lock); } /** * Frees the thread \a pool object. * * \param pool Pointer to the thread pool object */ void bh_thread_pool_free(bh_thread_pool_t *pool) { bh_thread_pool_destroy(pool); free(pool); } /** * \warning This is an internal function. * \warning Do not use this function directly! */ void bh_thread_pool_worker(void *arg) { bh_thread_pool_t *pool; pool = (bh_thread_pool_t *)arg; while (1) { bh_task_t *task; /* Wait unitl there is a task or shutdown signal */ bh_mutex_lock(&pool->lock); while (!pool->shutdown && bh_queue_empty(&pool->tasks)) bh_cond_wait(&pool->new_task, &pool->lock); /* If its shutdown signal - exit the loop */ if (pool->shutdown) { bh_mutex_unlock(&pool->lock); break; } /* Fetch task from the front of the queue, increase active counter */ task = bh_queue_front(&pool->tasks); bh_queue_remove(&pool->tasks); pool->active++; bh_mutex_unlock(&pool->lock); /* Do the task, mark as done, and if required - free it */ task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); /* Decrease active counter and broadcast that we are done */ bh_mutex_lock(&pool->lock); pool->active--; bh_mutex_unlock(&pool->lock); bh_cond_broadcast(&pool->done_task); } } bh_spinlock_t *bh_spinlock_new(void) { bh_spinlock_t *result; result = malloc(sizeof(*result)); if (result) bh_spinlock_init(result); return result; } void bh_spinlock_free(bh_spinlock_t *lock) { bh_spinlock_destroy(lock); free(lock); } int bh_tls_alloc(bh_generic_cb_t cleanup) { bh_tls_info_t *info; int result; /* Fetch TLS managment info */ info = bh_tls_info(); result = BH_ERROR; /* Get new counter value and setup cleanup function */ bh_spinlock_lock(&info->lock); if (info->counter < BH_MAX_TLS) { result = info->counter++; info->cleanup[result] = cleanup; } bh_spinlock_unlock(&info->lock); /* Return slot id */ return result; } void *bh_tls_get(int slot) { return bh_tls_fetch()->data[slot]; } void bh_tls_set(int slot, void *data) { bh_tls_fetch()->data[slot] = data; } void bh_tls_cleanup(void) { bh_tls_info_t *info; bh_tls_t *tls; size_t i; /* Fetch TLS managment info and TLS itself */ info = bh_tls_info(); tls = bh_tls_fetch(); /* Call cleanup for every slot */ for (i = 0; i < info->counter; i++) if (info->cleanup[i]) info->cleanup[i](tls->data[i]); /* Free tls */ free(tls); } /** * \} */