#include static unsigned __stdcall bh_thread_run(void *arg) { bh_tls_t *tls; bh_thread_data_t data; /* Fetch thread data, store it on stack and free from heap */ data = *(bh_thread_data_t *)arg; free(arg); /* Setup TLS */ tls = bh_tls_fetch(); /* Do the task, mark as done, and if required free it */ data.task->func(data.task->data); data.task->flags |= BH_THREAD_DONE; if (data.task->flags & BH_THREAD_CLEANUP) bh_task_free(data.task); /* Destroy our TLS */ bh_tls_cleanup(); /* Call thread specific end function (deallocate libc TLS) */ data.end(0); return 0; } int bh_thread_init_base(bh_thread_t *thread, bh_task_t *task, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { bh_thread_data_t *data; /* Allocate thread specific data */ data = malloc(sizeof(*data)); if (!data) return BH_OOM; /* Setup thread specific data */ data->task = task; data->end = end; /* Create and setup thread (relative to the callers libc) */ thread->allocated = 0; thread->handle = (HANDLE)_beginthreadex(NULL, 0, bh_thread_run, data, 0, NULL); /* Check for errors */ if (!thread->handle) { free(data); return BH_ERROR; } return BH_OK; } bh_thread_t *bh_thread_new_base(bh_task_t *task, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { bh_thread_t *result; /* Allocate thread object */ result = malloc(sizeof(*result)); if (result && bh_thread_init_base(result, task, begin, end)) { free(result); result = NULL; } /* Mark thread as allocated for deallocation in join/detach */ if (result) result->allocated = 1; return result; } int bh_thread_sleep(unsigned long timeout) { Sleep(timeout); return BH_OK; } int bh_thread_join(bh_thread_t *thread) { /* Join the thread */ WaitForSingleObject(thread->handle, INFINITE); CloseHandle(thread->handle); /* If thread is allocated, deallocate it */ if (thread->allocated) free(thread); return BH_OK; } int bh_thread_detach(bh_thread_t *thread) { /* Detach from thread */ CloseHandle(thread->handle); /* If thread is allocated, deallocate it */ if (thread->allocated) free(thread); return BH_OK; } int bh_mutex_init(bh_mutex_t *mutex) { /* TODO: Is this spincount needed or sane? */ if (!InitializeCriticalSectionAndSpinCount(&mutex->handle, 0x400)) return BH_ERROR; return BH_OK; } void bh_mutex_destroy(bh_mutex_t *mutex) { DeleteCriticalSection(&mutex->handle); } int bh_mutex_lock(bh_mutex_t *mutex) { EnterCriticalSection(&mutex->handle); return BH_OK; } int bh_mutex_try_lock(bh_mutex_t *mutex) { if (!TryEnterCriticalSection(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_mutex_unlock(bh_mutex_t *mutex) { LeaveCriticalSection(&mutex->handle); return BH_OK; } int bh_semaphore_init(bh_semaphore_t *semaphore, int count) { /* Create semaphore with max value of 32767 (to match POSIX) */ semaphore->handle = CreateSemaphore(NULL, count, 0x7FFF, NULL); if (!semaphore->handle) return BH_ERROR; return BH_OK; } void bh_semaphore_destroy(bh_semaphore_t *semaphore) { CloseHandle(semaphore->handle); } int bh_semaphore_post(bh_semaphore_t *semaphore) { if (!ReleaseSemaphore(semaphore->handle, 1, NULL)) return BH_ERROR; return BH_OK; } int bh_semaphore_wait(bh_semaphore_t *semaphore) { if (WaitForSingleObject(semaphore->handle, INFINITE)) return BH_ERROR; return BH_OK; } int bh_semaphore_wait_for(bh_semaphore_t *semaphore, unsigned long timeout) { /* FIXME: Check if we timed out or errored out */ if (WaitForSingleObject(semaphore->handle, timeout)) return BH_TIMEOUT; return BH_ERROR; } int bh_semaphore_try_wait(bh_semaphore_t *semaphore) { if (WaitForSingleObject(semaphore->handle, 0)) return BH_ERROR; return BH_OK; } #if WINVER >= _WIN32_WINNT_VISTA int bh_cond_init(bh_cond_t *cond) { InitializeConditionVariable(&cond->handle); return BH_OK; } void bh_cond_destroy(bh_cond_t *cond) { (void)cond; } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { return bh_cond_wait_for(cond, mutex, INFINITE); } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { /* FIXME: Check if we timed out or errored out */ if (!SleepConditionVariableCS(&cond->handle, &mutex->handle, timeout)) return BH_TIMEOUT; return BH_OK; } int bh_cond_signal(bh_cond_t *cond) { WakeConditionVariable(&cond->handle); return BH_OK; } int bh_cond_broadcast(bh_cond_t *cond) { WakeAllConditionVariable(&cond->handle); return BH_OK; } #else /* Condition variable implementation based on BeOS article * http://www-classic.be.com/aboutbe/benewsletter/volume_III/Issue40.html * * Slow, but correct implementation of CVs. */ int bh_cond_init(bh_cond_t *cond) { if (bh_mutex_init(&cond->lock)) return BH_ERROR; if (bh_semaphore_init(&cond->wait, 0)) { bh_mutex_destroy(&cond->lock); return BH_ERROR; } if (bh_semaphore_init(&cond->done, 0)) { bh_semaphore_destroy(&cond->wait); bh_mutex_destroy(&cond->lock); return BH_ERROR; } cond->waiting = 0; cond->signals = 0; return 0; } void bh_cond_destroy(bh_cond_t *cond) { bh_semaphore_destroy(&cond->done); bh_semaphore_destroy(&cond->wait); bh_mutex_destroy(&cond->lock); } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { int retval; bh_mutex_lock(&cond->lock); cond->waiting++; bh_mutex_unlock(&cond->lock); bh_mutex_unlock(mutex); retval = bh_semaphore_wait(&cond->wait); bh_mutex_lock(&cond->lock); if (cond->signals > 0) { if (retval) bh_semaphore_wait(&cond->wait); bh_semaphore_post(&cond->done); cond->signals--; } cond->waiting--; bh_mutex_unlock(&cond->lock); bh_mutex_lock(mutex); return retval; } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { int retval; bh_mutex_lock(&cond->lock); cond->waiting++; bh_mutex_unlock(&cond->lock); bh_mutex_unlock(mutex); retval = bh_semaphore_wait_for(&cond->wait, timeout); bh_mutex_lock(&cond->lock); if (cond->signals > 0) { if (retval) bh_semaphore_wait(&cond->wait); bh_semaphore_post(&cond->done); cond->signals--; } cond->waiting--; bh_mutex_unlock(&cond->lock); bh_mutex_lock(mutex); return retval; } int bh_cond_signal(bh_cond_t *cond) { bh_mutex_lock(&cond->lock); if (cond->waiting > cond->signals) { cond->signals++; bh_semaphore_post(&cond->wait); bh_mutex_unlock(&cond->lock); bh_semaphore_wait(&cond->done); } else bh_mutex_unlock(&cond->lock); return BH_OK; } int bh_cond_broadcast(bh_cond_t *cond) { int i, waiting; bh_mutex_lock(&cond->lock); if (cond->waiting > cond->signals) { waiting = cond->waiting - cond->signals; cond->signals = cond->waiting; for (i = 0; i < waiting; i++) bh_semaphore_post(&cond->wait); bh_mutex_unlock(&cond->lock); for (i = 0; i < waiting; i++) bh_semaphore_wait(&cond->done); } else bh_mutex_unlock(&cond->lock); return BH_OK; } #endif int bh_thread_pool_init_base(bh_thread_pool_t *pool, size_t size, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { size_t i; static bh_task_t pool_task; static int pool_task_init = 0; /* Initialize static thread pool task */ if (!pool_task_init) { bh_task_init(&pool_task, bh_thread_pool_worker, pool, 0); pool_task_init = 1; } /* Zero out pool structure */ memset(pool, 0, sizeof(*pool)); /* Initialize mutex and condition variables */ if (bh_mutex_init(&pool->lock)) goto lock_fail; if (bh_cond_init(&pool->new_task)) goto task_fail; if (bh_cond_init(&pool->done_task)) goto done_fail; /* Allocate array of threads */ pool->threads = malloc(size * sizeof(bh_thread_t)); pool->size = size; if (!pool->threads) goto thread_fail; /* Initialize queue */ bh_queue_init(&pool->tasks); if (bh_queue_reserve(&pool->tasks, 64)) goto queue_fail; /* Initialize threads */ for (i = 0; i < size; i++) { if (bh_thread_init_base(&pool->threads[i], &pool_task, begin, end)) { pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); for(; i; i--) bh_thread_join(&pool->threads[i - 1]); bh_queue_destroy(&pool->tasks); goto queue_fail; } } return BH_OK; queue_fail: free(pool->threads); thread_fail: bh_cond_destroy(&pool->done_task); done_fail: bh_cond_destroy(&pool->new_task); task_fail: bh_mutex_destroy(&pool->lock); lock_fail: return BH_ERROR; } bh_thread_pool_t *bh_thread_pool_new_base(size_t size, bh_thread_begin_cb_t begin, bh_thread_end_cb_t end) { bh_thread_pool_t *result; result = malloc(sizeof(*result)); if (result && bh_thread_pool_init_base(result, size, begin, end)) { free(result); result = NULL; } return result; } void bh_spinlock_init(bh_spinlock_t *lock) { lock->handle = 0; } void bh_spinlock_destroy(bh_spinlock_t *lock) { (void)lock; } int bh_spinlock_lock(bh_spinlock_t *lock) { while (InterlockedExchange(&lock->handle, 1)); return BH_OK; } int bh_spinlock_unlock(bh_spinlock_t *lock) { if (InterlockedExchange(&lock->handle, 0)) return BH_OK; return BH_ERROR; } int bh_spinlock_try_lock(bh_spinlock_t *lock) { if (InterlockedExchange(&lock->handle, 1)) return BH_ERROR; return BH_OK; } bh_tls_info_t *bh_tls_info(void) { static bh_tls_info_t info = {0, 0}; return &info; } bh_tls_t *bh_tls_fetch(void) { static bh_spinlock_t tls_lock = {0}; static DWORD tls_index = TLS_OUT_OF_INDEXES; bh_tls_t *tls; /* Protect TLS index by spinlock */ bh_spinlock_lock(&tls_lock); /* Allocate TLS index if it's not allocated */ if (tls_index == TLS_OUT_OF_INDEXES) tls_index = TlsAlloc(); /* Unlock spinlock */ bh_spinlock_unlock(&tls_lock); /* Check that TLS index is valid */ if (tls_index == TLS_OUT_OF_INDEXES) abort(); /* Get TLS pointer and setup TLS if neccesery */ tls = (bh_tls_t *)TlsGetValue(tls_index); if (tls == NULL) { /* Allocate space for TLS */ tls = malloc(sizeof(*tls)); if (!tls) abort(); /* Zero out TLS */ memset(tls, 0, sizeof(*tls)); /* Set TLS value */ if (!TlsSetValue(tls_index, tls)) abort(); } /* Return TLS */ return tls; }