#include #include #include static void *bh_thread_run(void *arg) { bh_task_t *task; task = (bh_task_t *)arg; /* Do the task, mark as done, and if required free it */ task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); return NULL; } int bh_thread_init(bh_thread_t *thread, bh_task_t *task) { thread->allocated = 0; if (pthread_create(&thread->handle, NULL, bh_thread_run, task)) return BH_ERROR; return BH_OK; } bh_thread_t *bh_thread_new(bh_task_t *task) { bh_thread_t *result; /* Allocate thread object */ result = malloc(sizeof(*result)); if (result && bh_thread_init(result, task)) { free(result); result = NULL; } /* Mark thread as allocated for deallocation in join/detach */ if (result) result->allocated = 1; return result; } int bh_thread_join(bh_thread_t *thread) { /* Join the thread */ if (pthread_join(thread->handle, NULL)) return BH_ERROR; /* If thread is allocated, deallocate it */ if (thread->allocated) free(thread); return BH_OK; } int bh_thread_detach(bh_thread_t *thread) { /* Detach the thread */ if (pthread_detach(thread->handle)) return BH_ERROR; /* If thread is allocated, deallocate it */ if (thread->allocated) free(thread); return BH_OK; } int bh_mutex_init(bh_mutex_t *mutex) { if (pthread_mutex_init(&mutex->handle, NULL)) return BH_ERROR; return BH_OK; } void bh_mutex_destroy(bh_mutex_t *mutex) { pthread_mutex_destroy(&mutex->handle); } int bh_mutex_lock(bh_mutex_t *mutex) { if (pthread_mutex_lock(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_mutex_try_lock(bh_mutex_t *mutex) { if (pthread_mutex_trylock(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_mutex_unlock(bh_mutex_t *mutex) { if (pthread_mutex_unlock(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_semaphore_init(bh_semaphore_t *semaphore, int count) { if (sem_init(&semaphore->handle, 0, count)) return BH_ERROR; return BH_OK; } void bh_semaphore_destroy(bh_semaphore_t *semaphore) { sem_destroy(&semaphore->handle); } int bh_semaphore_post(bh_semaphore_t *semaphore) { if (sem_post(&semaphore->handle)) return BH_ERROR; return BH_OK; } int bh_semaphore_wait(bh_semaphore_t *semaphore) { if (sem_wait(&semaphore->handle)) return BH_ERROR; return BH_OK; } int bh_semaphore_wait_for(bh_semaphore_t *semaphore, unsigned long timeout) { struct timespec ts; ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout - ts.tv_sec * 1000) * 1000000; if (sem_timedwait(&semaphore->handle, &ts)) return BH_TIMEOUT; return BH_OK; } int bh_semaphore_try_wait(bh_semaphore_t *semaphore) { if (sem_trywait(&semaphore->handle)) return BH_ERROR; return BH_OK; } int bh_cond_init(bh_cond_t *cond) { if (pthread_cond_init(&cond->handle, NULL)) return BH_ERROR; return BH_OK; } void bh_cond_destroy(bh_cond_t *cond) { pthread_cond_destroy(&cond->handle); } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { if (pthread_cond_wait(&cond->handle, &mutex->handle)) return BH_ERROR; return BH_OK; } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { struct timespec ts; ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout - ts.tv_sec * 1000) * 1000000; if (pthread_cond_timedwait(&cond->handle, &mutex->handle, &ts)) return BH_TIMEOUT; return BH_OK; } int bh_cond_signal(bh_cond_t *cond) { if (pthread_cond_signal(&cond->handle)) return BH_ERROR; return BH_OK; } int bh_cond_broadcast(bh_cond_t *cond) { if (pthread_cond_broadcast(&cond->handle)) return BH_ERROR; return BH_OK; } int bh_thread_pool_init(bh_thread_pool_t *pool, size_t size) { size_t i; static bh_task_t pool_task; static int pool_task_init = 0; /* Initialize static thread pool task */ if (!pool_task_init) { bh_task_init(&pool_task, bh_thread_pool_worker, pool, 0); pool_task_init = 1; } /* Zero out pool structure */ memset(pool, 0, sizeof(*pool)); /* Initialize mutex and condition variables */ if (bh_mutex_init(&pool->lock)) goto lock_fail; if (bh_cond_init(&pool->new_task)) goto task_fail; if (bh_cond_init(&pool->done_task)) goto done_fail; /* Allocate array of threads */ pool->threads = malloc(size * sizeof(bh_thread_t)); pool->size = size; if (!pool->threads) goto thread_fail; /* Initialize queue */ bh_queue_init(&pool->tasks); if (bh_queue_reserve(&pool->tasks, 64)) goto queue_fail; /* Initialize threads */ for (i = 0; i < size; i++) { if (bh_thread_init(&pool->threads[i], &pool_task)) { pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); for(; i; i--) bh_thread_join(&pool->threads[i - 1]); bh_queue_destroy(&pool->tasks); goto queue_fail; } } return BH_OK; queue_fail: free(pool->threads); thread_fail: bh_cond_destroy(&pool->done_task); done_fail: bh_cond_destroy(&pool->new_task); task_fail: bh_mutex_destroy(&pool->lock); lock_fail: return BH_ERROR; } bh_thread_pool_t *bh_thread_pool_new(size_t size) { bh_thread_pool_t *result; result = malloc(sizeof(*result)); if (result && bh_thread_pool_init(result, size)) { free(result); result = NULL; } return result; }