#include static unsigned __stdcall bh_thread_run(void *arg) { bh_thread_data_t data; data = *(bh_thread_data_t *)arg; free(arg); data.task->func(data.task->data); data.task->flags |= BH_THREAD_DONE; if (data.task->flags & BH_THREAD_CLEANUP) bh_task_free(data.task); data.end(0); return 0; } int bh_thread_init_base(bh_thread_t *thread, bh_task_t *task, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { bh_thread_data_t *data; data = malloc(sizeof(*data)); if (!data) return -1; data->task = task; data->end = end; thread->allocated = 0; thread->handle = (HANDLE)_beginthreadex(NULL, 0, bh_thread_run, data, 0, NULL); if (!thread->handle) { free(data); return -1; } return 0; } bh_thread_t *bh_thread_new_base(bh_task_t *task, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { bh_thread_t *result; result = malloc(sizeof(*result)); if (result && !bh_thread_init_base(result, task, begin, end)) { free(result); result = NULL; } if (result) result->allocated = 1; return result; } int bh_thread_join(bh_thread_t *thread) { WaitForSingleObject(thread->handle, INFINITE); CloseHandle(thread->handle); if (thread->allocated) free(thread); return 0; } int bh_thread_detach(bh_thread_t *thread) { CloseHandle(thread->handle); if (thread->allocated) free(thread); return 0; } int bh_mutex_init(bh_mutex_t *mutex) { if (!InitializeCriticalSectionAndSpinCount(&mutex->handle, 0x400)) return -1; return 0; } void bh_mutex_destroy(bh_mutex_t *mutex) { DeleteCriticalSection(&mutex->handle); } int bh_mutex_lock(bh_mutex_t *mutex) { EnterCriticalSection(&mutex->handle); return 0; } int bh_mutex_try_lock(bh_mutex_t *mutex) { if (!TryEnterCriticalSection(&mutex->handle)) return -1; return 0; } int bh_mutex_unlock(bh_mutex_t *mutex) { LeaveCriticalSection(&mutex->handle); return 0; } int bh_semaphore_init(bh_semaphore_t *semaphore, int count) { semaphore->handle = CreateSemaphore(NULL, count, 0x7FFF, NULL); if (!semaphore->handle) return -1; return 0; } void bh_semaphore_destroy(bh_semaphore_t *semaphore) { CloseHandle(semaphore->handle); } int bh_semaphore_post(bh_semaphore_t *semaphore) { if (!ReleaseSemaphore(semaphore->handle, 1, NULL)) return -1; return 0; } int bh_semaphore_wait(bh_semaphore_t *semaphore) { if (WaitForSingleObject(semaphore->handle, INFINITE)) return -1; return 0; } int bh_semaphore_wait_for(bh_semaphore_t *semaphore, unsigned long timeout) { if (WaitForSingleObject(semaphore->handle, timeout)) return -1; return 0; } int bh_semaphore_try_wait(bh_semaphore_t *semaphore) { if (WaitForSingleObject(semaphore->handle, 0)) return -1; return 0; } #if WINVER >= _WIN32_WINNT_VISTA int bh_cond_init(bh_cond_t *cond) { InitializeConditionVariable(&cond->handle); return 0; } void bh_cond_destroy(bh_cond_t *cond) { (void)cond; } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { return bh_cond_wait_for(cond, mutex, INFINITE); } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { if (!SleepConditionVariableCS(&cond->handle, &mutex->handle, timeout)) return -1; return 0; } int bh_cond_signal(bh_cond_t *cond) { WakeConditionVariable(&cond->handle); return 0; } int bh_cond_broadcast(bh_cond_t *cond) { WakeAllConditionVariable(&cond->handle); return 0; } #else /* Condition variable implementation based on BeOS article * http://www-classic.be.com/aboutbe/benewsletter/volume_III/Issue40.html */ int bh_cond_init(bh_cond_t *cond) { if (bh_mutex_init(&cond->lock)) return -1; if (bh_semaphore_init(&cond->wait, 0)) { bh_mutex_destroy(&cond->lock); return -1; } if (bh_semaphore_init(&cond->done, 0)) { bh_semaphore_destroy(&cond->wait); bh_mutex_destroy(&cond->lock); return -1; } cond->waiting = 0; cond->signals = 0; return 0; } void bh_cond_destroy(bh_cond_t *cond) { bh_semaphore_destroy(&cond->done); bh_semaphore_destroy(&cond->wait); bh_mutex_destroy(&cond->lock); } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { int retval; bh_mutex_lock(&cond->lock); cond->waiting++; bh_mutex_unlock(&cond->lock); bh_mutex_unlock(mutex); retval = bh_semaphore_wait(&cond->wait); bh_mutex_lock(&cond->lock); if (cond->signals > 0) { if (retval) bh_semaphore_wait(&cond->wait); bh_semaphore_post(&cond->done); cond->signals--; } cond->waiting--; bh_mutex_unlock(&cond->lock); bh_mutex_lock(mutex); return retval; } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { int retval; bh_mutex_lock(&cond->lock); cond->waiting++; bh_mutex_unlock(&cond->lock); bh_mutex_unlock(mutex); retval = bh_semaphore_wait_for(&cond->wait, timeout); bh_mutex_lock(&cond->lock); if (cond->signals > 0) { if (retval) bh_semaphore_wait(&cond->wait); bh_semaphore_post(&cond->done); cond->signals--; } cond->waiting--; bh_mutex_unlock(&cond->lock); bh_mutex_lock(mutex); return retval; } int bh_cond_signal(bh_cond_t *cond) { bh_mutex_lock(&cond->lock); if (cond->waiting > cond->signals) { cond->signals++; bh_semaphore_post(&cond->wait); bh_mutex_unlock(&cond->lock); bh_semaphore_wait(&cond->done); } else bh_mutex_unlock(&cond->lock); return 0; } int bh_cond_broadcast(bh_cond_t *cond) { int i, waiting; bh_mutex_lock(&cond->lock); if (cond->waiting > cond->signals) { waiting = cond->waiting - cond->signals; cond->signals = cond->waiting; for (i = 0; i < waiting; i++) bh_semaphore_post(&cond->wait); bh_mutex_unlock(&cond->lock); for (i = 0; i < waiting; i++) bh_semaphore_wait(&cond->done); } else bh_mutex_unlock(&cond->lock); return 0; } #endif int bh_thread_pool_init_base(bh_thread_pool_t *pool, size_t size, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { size_t i; static bh_task_t pool_task; static int pool_task_init = 0; /* Initialize static thread pool task */ if (!pool_task_init) { bh_task_init(&pool_task, bh_thread_pool_worker, pool, 0); pool_task_init = 1; } /* Zero out pool structure */ memset(pool, 0, sizeof(*pool)); /* Initialize mutex and condition variables */ if (bh_mutex_init(&pool->lock)) goto lock_fail; if (bh_cond_init(&pool->new_task)) goto task_fail; if (bh_cond_init(&pool->done_task)) goto done_fail; /* Allocate array of threads */ pool->threads = malloc(size * sizeof(bh_thread_t)); pool->size = size; if (!pool->threads) goto thread_fail; /* Initialize queue */ bh_queue_init(&pool->tasks); if (bh_queue_reserve(&pool->tasks, 64)) goto queue_fail; /* Initialize threads */ for (i = 0; i < size; i++) { if (bh_thread_init_base(&pool->threads[i], &pool_task, begin, end)) { pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); for(; i; i--) bh_thread_join(&pool->threads[i - 1]); bh_queue_destroy(&pool->tasks); goto queue_fail; } } return 0; queue_fail: free(pool->threads); thread_fail: bh_cond_destroy(&pool->done_task); done_fail: bh_cond_destroy(&pool->new_task); task_fail: bh_mutex_destroy(&pool->lock); lock_fail: return -1; } bh_thread_pool_t *bh_thread_pool_new_base(size_t size, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { bh_thread_pool_t *result; result = malloc(sizeof(*result)); if (result && bh_thread_pool_init_base(result, size, begin, end)) { free(result); result = NULL; } return result; }