#include #include #include static void *bh_thread_run(void *arg) { bh_tls_t *tls; bh_task_t *task; /* Fetch thread data */ task = (bh_task_t *)arg; /* Setup TLS */ tls = bh_tls_fetch(); /* Do the task, mark as done, and if required free it */ task->func(task->data); task->flags |= BH_THREAD_DONE; if (task->flags & BH_THREAD_CLEANUP) bh_task_free(task); /* Destroy TLS */ bh_tls_cleanup(); return NULL; } int bh_thread_init(bh_thread_t *thread, bh_task_t *task) { thread->allocated = 0; if (pthread_create(&thread->handle, NULL, bh_thread_run, task)) return BH_ERROR; return BH_OK; } bh_thread_t *bh_thread_new(bh_task_t *task) { bh_thread_t *result; /* Allocate thread object */ result = malloc(sizeof(*result)); if (result && bh_thread_init(result, task)) { free(result); result = NULL; } /* Mark thread as allocated for deallocation in join/detach */ if (result) result->allocated = 1; return result; } int bh_thread_join(bh_thread_t *thread) { /* Join the thread */ if (pthread_join(thread->handle, NULL)) return BH_ERROR; /* If thread is allocated, deallocate it */ if (thread->allocated) free(thread); return BH_OK; } int bh_thread_detach(bh_thread_t *thread) { /* Detach the thread */ if (pthread_detach(thread->handle)) return BH_ERROR; /* If thread is allocated, deallocate it */ if (thread->allocated) free(thread); return BH_OK; } int bh_mutex_init(bh_mutex_t *mutex) { if (pthread_mutex_init(&mutex->handle, NULL)) return BH_ERROR; return BH_OK; } void bh_mutex_destroy(bh_mutex_t *mutex) { pthread_mutex_destroy(&mutex->handle); } int bh_mutex_lock(bh_mutex_t *mutex) { if (pthread_mutex_lock(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_mutex_try_lock(bh_mutex_t *mutex) { if (pthread_mutex_trylock(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_mutex_unlock(bh_mutex_t *mutex) { if (pthread_mutex_unlock(&mutex->handle)) return BH_ERROR; return BH_OK; } int bh_semaphore_init(bh_semaphore_t *semaphore, int count) { if (sem_init(&semaphore->handle, 0, count)) return BH_ERROR; return BH_OK; } void bh_semaphore_destroy(bh_semaphore_t *semaphore) { sem_destroy(&semaphore->handle); } int bh_semaphore_post(bh_semaphore_t *semaphore) { if (sem_post(&semaphore->handle)) return BH_ERROR; return BH_OK; } int bh_semaphore_wait(bh_semaphore_t *semaphore) { if (sem_wait(&semaphore->handle)) return BH_ERROR; return BH_OK; } int bh_semaphore_wait_for(bh_semaphore_t *semaphore, unsigned long timeout) { struct timespec ts; ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout - ts.tv_sec * 1000) * 1000000; if (sem_timedwait(&semaphore->handle, &ts)) return BH_TIMEOUT; return BH_OK; } int bh_semaphore_try_wait(bh_semaphore_t *semaphore) { if (sem_trywait(&semaphore->handle)) return BH_ERROR; return BH_OK; } int bh_cond_init(bh_cond_t *cond) { if (pthread_cond_init(&cond->handle, NULL)) return BH_ERROR; return BH_OK; } void bh_cond_destroy(bh_cond_t *cond) { pthread_cond_destroy(&cond->handle); } int bh_cond_wait(bh_cond_t *cond, bh_mutex_t *mutex) { if (pthread_cond_wait(&cond->handle, &mutex->handle)) return BH_ERROR; return BH_OK; } int bh_cond_wait_for(bh_cond_t *cond, bh_mutex_t *mutex, unsigned long timeout) { struct timespec ts; ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout - ts.tv_sec * 1000) * 1000000; if (pthread_cond_timedwait(&cond->handle, &mutex->handle, &ts)) return BH_TIMEOUT; return BH_OK; } int bh_cond_signal(bh_cond_t *cond) { if (pthread_cond_signal(&cond->handle)) return BH_ERROR; return BH_OK; } int bh_cond_broadcast(bh_cond_t *cond) { if (pthread_cond_broadcast(&cond->handle)) return BH_ERROR; return BH_OK; } int bh_thread_pool_init(bh_thread_pool_t *pool, size_t size) { size_t i; static bh_task_t pool_task; static int pool_task_init = 0; /* Initialize static thread pool task */ if (!pool_task_init) { bh_task_init(&pool_task, bh_thread_pool_worker, pool, 0); pool_task_init = 1; } /* Zero out pool structure */ memset(pool, 0, sizeof(*pool)); /* Initialize mutex and condition variables */ if (bh_mutex_init(&pool->lock)) goto lock_fail; if (bh_cond_init(&pool->new_task)) goto task_fail; if (bh_cond_init(&pool->done_task)) goto done_fail; /* Allocate array of threads */ pool->threads = malloc(size * sizeof(bh_thread_t)); pool->size = size; if (!pool->threads) goto thread_fail; /* Initialize queue */ bh_queue_init(&pool->tasks); if (bh_queue_reserve(&pool->tasks, 64)) goto queue_fail; /* Initialize threads */ for (i = 0; i < size; i++) { if (bh_thread_init(&pool->threads[i], &pool_task)) { pool->shutdown = 1; bh_cond_broadcast(&pool->new_task); for(; i; i--) bh_thread_join(&pool->threads[i - 1]); bh_queue_destroy(&pool->tasks); goto queue_fail; } } return BH_OK; queue_fail: free(pool->threads); thread_fail: bh_cond_destroy(&pool->done_task); done_fail: bh_cond_destroy(&pool->new_task); task_fail: bh_mutex_destroy(&pool->lock); lock_fail: return BH_ERROR; } bh_thread_pool_t *bh_thread_pool_new(size_t size) { bh_thread_pool_t *result; result = malloc(sizeof(*result)); if (result && bh_thread_pool_init(result, size)) { free(result); result = NULL; } return result; } void bh_spinlock_init(bh_spinlock_t *lock) { lock->handle = 0; } void bh_spinlock_destroy(bh_spinlock_t *lock) { (void)lock; } int bh_spinlock_lock(bh_spinlock_t *lock) { /* TODO: Check on other OS and compilers */ #if defined(__clang__) || defined(__GNUC__) while (__sync_lock_test_and_set(&lock->handle, 1)); return BH_OK; #else #error "Spinlocks are not supported" return BH_NO_IMPL; #endif } int bh_spinlock_unlock(bh_spinlock_t *lock) { /* TODO: Check on other OS and compilers */ #if defined(__clang__) || defined(__GNUC__) if (__sync_lock_test_and_set(&lock->handle, 0)) return BH_OK; return BH_ERROR; #else #error "Spinlocks are not supported" return BH_NO_IMPL; #endif } int bh_spinlock_try_lock(bh_spinlock_t *lock) { /* TODO: Check on other OSs and compilers */ #if defined(__clang__) || defined(__GNUC__) if (__sync_lock_test_and_set(&lock->handle, 1)) return BH_ERROR; return BH_OK; #else #error "Spinlocks are not supported" return BH_NO_IMPL; #endif } bh_tls_info_t *bh_tls_info(void) { static bh_tls_info_t info = { 0, 0 }; return &info; } bh_tls_t *bh_tls_fetch(void) { static bh_spinlock_t tls_lock = { 0 }; static int tls_ready = 0; static pthread_key_t tls_key; bh_tls_t *tls; /* Protect pthread key by spinlock */ bh_spinlock_lock(&tls_lock); /* Allocate pthread key if it's not allocated */ if (tls_ready == 0) { pthread_key_create(&tls_key, NULL); tls_ready = 1; } /* Unlock spinlock */ bh_spinlock_unlock(&tls_lock); /* Get TLS pointer and setup TLS if neccesery */ tls = (bh_tls_t *)pthread_getspecific(tls_key); if (tls == NULL) { /* Allocate space for TLS */ tls = malloc(sizeof(*tls)); if (!tls) abort(); /* Zero out TLS */ memset(tls, 0, sizeof(*tls)); /* Set TLS value */ if (pthread_setspecific(tls_key, tls)) abort(); } /* Return TLS */ return tls; }