#include #include #include static void *bh_thread_run(void *arg) { bh_task_t *task; task = (bh_task_t *)arg; task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); return NULL; } int bh_thread_init(bh_thread_t *thread, bh_task_t *task) { return pthread_create(&thread->handle, NULL, bh_thread_run, task); } int bh_thread_join(bh_thread_t *thread) { return pthread_join(thread->handle, NULL); } int bh_thread_detach(bh_thread_t *thread) { return pthread_detach(thread->handle); } int bh_mutex_init(bh_mutex_t *mutex) { return pthread_mutex_init(&mutex->handle, NULL); } void bh_mutex_destroy(bh_mutex_t *mutex) { pthread_mutex_destroy(&mutex->handle); } int bh_mutex_lock(bh_mutex_t *mutex) { return pthread_mutex_lock(&mutex->handle); } int bh_mutex_try_lock(bh_mutex_t *mutex) { return pthread_mutex_trylock(&mutex->handle); } int bh_mutex_unlock(bh_mutex_t *mutex) { return pthread_mutex_unlock(&mutex->handle); } int bh_cond_init(bh_cond_t *cond) { return pthread_cond_init(&cond->handle, NULL); } void bh_cond_destroy(bh_cond_t *cond) { pthread_cond_destroy(&cond->handle); } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { return pthread_cond_wait(&cond->handle, &mutex->handle); } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { struct timespec ts; ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout - ts.tv_sec * 1000) * 1000000; return pthread_cond_timedwait(&cond->handle, &mutex->handle, &ts); } int bh_cond_signal(bh_cond_t *cond) { return pthread_cond_signal(&cond->handle); } int bh_cond_broadcast(bh_cond_t *cond) { return pthread_cond_broadcast(&cond->handle); } int bh_thread_pool_init(bh_thread_pool_t *pool, size_t size) { size_t i; static bh_task_t pool_task; static int pool_task_init = 0; /* Initialize static thread pool task */ if (!pool_task_init) { bh_task_init(&pool_task, bh_thread_pool_worker, pool, 0); pool_task_init = 1; } /* Zero out pool structure */ memset(pool, 0, sizeof(*pool)); /* Initialize mutex and condition variables */ if (bh_mutex_init(&pool->lock)) goto lock_fail; if (bh_cond_init(&pool->new_task)) goto task_fail; if (bh_cond_init(&pool->done_task)) goto done_fail; /* Allocate array of threads */ pool->threads = malloc(size * sizeof(bh_thread_t)); pool->size = size; if (!pool->threads) goto thread_fail; /* Initialize queue */ bh_queue_init(&pool->tasks); if (bh_queue_reserve(&pool->tasks, 64)) goto queue_fail; /* Initialize threads */ for (i = 0; i < size; i++) { if (bh_thread_init(&pool->threads[i], &pool_task)) { pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); for(; i; i--) bh_thread_join(&pool->threads[i - 1]); bh_queue_destroy(&pool->tasks); goto queue_fail; } } return 0; queue_fail: free(pool->threads); thread_fail: bh_cond_destroy(&pool->done_task); done_fail: bh_cond_destroy(&pool->new_task); task_fail: bh_mutex_destroy(&pool->lock); lock_fail: return -1; } bh_thread_pool_t *bh_thread_pool_new(size_t size) { bh_thread_pool_t *result; result = malloc(sizeof(*result)); if (result && bh_thread_pool_init(result, size)) { free(result); result = NULL; } return result; }