#include #include bh_task_t *bh_task_new(void (*func)(void *), void *data, int flags) { bh_task_t *result; result = malloc(sizeof(bh_task_t)); if (result) bh_task_init(result, func, data, flags); return result; } void bh_task_free(bh_task_t *task) { bh_task_destroy(task); free(task); } void bh_task_init(bh_task_t *task, void (*func)(void *), void *data, int flags) { task->func = func; task->data = data; task->flags = flags; } void bh_task_destroy(bh_task_t *task) { (void)task; } void bh_task_reuse(bh_task_t *task, void (*func)(void *), void *data) { if (task->flags & BH_THREAD_DONE) bh_task_init(task, func, data, task->flags & ~(BH_THREAD_DONE)); } int bh_task_done(bh_task_t *task) { return (task->flags & BH_THREAD_DONE) != 0; } bh_mutex_t *bh_mutex_new(void) { bh_mutex_t *result; result = malloc(sizeof(*result)); if (result && bh_mutex_init(result)) { free(result); result = NULL; } return result; } void bh_mutex_free(bh_mutex_t *mutex) { bh_mutex_destroy(mutex); free(mutex); } bh_cond_t *bh_cond_new(void) { bh_cond_t *result; result = malloc(sizeof(*result)); if (result && bh_cond_init(result)) { free(result); result = NULL; } return result; } void bh_cond_free(bh_cond_t *cond) { bh_cond_destroy(cond); free(cond); } int bh_thread_pool_add(bh_thread_pool_t *pool, bh_task_t *task) { void *iter; bh_task_t *item; /* Queue task for execution */ bh_mutex_lock(&pool->lock); if (bh_queue_insert(&pool->tasks, task)) { bh_mutex_unlock(&pool->lock); return -1; } /* Signal new job */ bh_mutex_unlock(&pool->lock); bh_cond_signal(&pool->new_task); return 0; } int bh_thread_pool_wait(bh_thread_pool_t *pool) { /* Lock and check if there is jobs in the queue */ bh_mutex_lock(&pool->lock); while (!bh_queue_empty(&pool->tasks)) { bh_task_t *task; /* Fetch task from the queue */ task = bh_queue_front(&pool->tasks); bh_queue_remove(&pool->tasks); /* Unlock and do the task */ bh_mutex_unlock(&pool->lock); task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); /* Lock and check for the next task */ bh_mutex_lock(&pool->lock); } /* Check until there is no active threads */ while (pool->active) bh_cond_wait(&pool->done_task, &pool->lock); /* Unlock */ bh_mutex_unlock(&pool->lock); return 0; } void bh_thread_pool_destroy(bh_thread_pool_t *pool) { size_t i; /* Broadcast shutdown */ pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); /* Join and delete every thread */ for (i = 0; i < pool->size; i++) bh_thread_join(pool->threads + i); /* Destroy any tasks with cleanup flag */ while (!bh_queue_empty(&pool->tasks)) { bh_task_t *task; task = bh_queue_front(&pool->tasks); bh_queue_remove(&pool->tasks); if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); } /* Destroy thread array, queue, lock and condition variables */ free(pool->threads); bh_queue_destroy(&pool->tasks); bh_cond_destroy(&pool->new_task); bh_cond_destroy(&pool->done_task); bh_mutex_destroy(&pool->lock); } void bh_thread_pool_free(bh_thread_pool_t *pool) { bh_thread_pool_destroy(pool); free(pool); } void bh_thread_pool_worker(void *arg) { bh_thread_pool_t *pool; pool = (bh_thread_pool_t *)arg; while (1) { bh_task_t *task; bh_mutex_lock(&pool->lock); while (!pool->shutdown && bh_queue_empty(&pool->tasks)) bh_cond_wait(&pool->new_task, &pool->lock); if (pool->shutdown) { bh_mutex_unlock(&pool->lock); break; } task = bh_queue_front(&pool->tasks); bh_queue_remove(&pool->tasks); pool->active++; bh_mutex_unlock(&pool->lock); task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); bh_mutex_lock(&pool->lock); pool->active--; bh_mutex_unlock(&pool->lock); bh_cond_broadcast(&pool->done_task); } }