diff options
| -rw-r--r-- | CMakeLists.txt | 59 | ||||
| -rw-r--r-- | include/bh/bh.h | 1 | ||||
| -rw-r--r-- | include/bh/config.in | 7 | ||||
| -rw-r--r-- | include/bh/internal/thread.h | 45 | ||||
| -rw-r--r-- | include/bh/internal/thread_posix.h | 30 | ||||
| -rw-r--r-- | include/bh/thread.h | 251 | ||||
| -rw-r--r-- | main.c | 45 | ||||
| -rw-r--r-- | src/thread.c | 219 | ||||
| -rw-r--r-- | src/thread_posix.c | 182 |
9 files changed, 837 insertions, 2 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f3c061d..6b2ceff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,8 +5,12 @@ project(bhlib LANGUAGES C) set(CMAKE_C_STANDARD 90) set(CMAKE_C_STANDARD_REQUIRED ON) -# Check for IPO/LTO +# Project includes include(CheckIPOSupported) +include(CheckIncludeFile) +include(CheckSymbolExists) + +# Check for IPO/LTO check_ipo_supported(RESULT supported) if(supported) @@ -23,6 +27,7 @@ set(BHLIB_SOURCE src/algo.c src/hashmap.c src/queue.c + src/thread.c ) set(BHLIB_HEADER @@ -31,9 +36,59 @@ set(BHLIB_HEADER include/bh/queue.h ) +set(BHLIB_INCLUDE_DIRS + include + ${PROJECT_BINARY_DIR}/include +) + + +# Determine platform +if(WIN32) + message(STATUS "Platform - Win32") + + # Check multithreading support + check_symbol_exists(_beginthread process.h BHLIB_USE_WINTHREAD) + if(BHLIB_USE_WINTHREAD) + message(STATUS "Multithreading enabled") + list(APPEND BHLIB_SOURCE + src/thread_win.c + ) + else() + message(STATUS "Multithreading disabled") + list(APPEND BHLIB_SOURCE + src/thread_null.c + ) + endif() +elseif(UNIX) + message(STATUS "Platform: Unix (Linux/BSD/MacOS X)") + + # Check multithreading support + check_include_file(pthread.h BHLIB_USE_PTHREAD) + if(BHLIB_USE_PTHREAD) + message(STATUS "Multithreading enabled") + list(APPEND BHLIB_SOURCE + src/thread_posix.c + ) + else() + message(STATUS "Multithreading disabled") + list(APPEND BHLIB_SOURCE + src/thread_null.c + ) + endif() +else() + message(STATUS "Platform: Unknown") + message(STATUS "Multithreading disabled") + list(APPEND BHLIB_SOURCE + src/thread_null.c + ) +endif() + +# Configure library +configure_file(include/bh/config.in include/bh/config.h) + # Library add_library(bhlib STATIC ${BHLIB_SOURCE} ${BHLIB_HEADER}) -target_include_directories(bhlib PUBLIC include) +target_include_directories(bhlib PUBLIC ${BHLIB_INCLUDE_DIRS}) # Runtime definition add_executable(main diff --git a/include/bh/bh.h b/include/bh/bh.h index f9df864..d00dc5c 100644 --- a/include/bh/bh.h +++ b/include/bh/bh.h @@ -1,6 +1,7 @@ #ifndef BHLIB_H #define BHLIB_H +#include <bh/config.h> #include <stddef.h> #define BH_INT_TO_PTR(x) \ diff --git a/include/bh/config.in b/include/bh/config.in new file mode 100644 index 0000000..5868721 --- /dev/null +++ b/include/bh/config.in @@ -0,0 +1,7 @@ +#ifndef BHLIB_CONFIG_H +#define BHLIB_CONFIG_H + +#cmakedefine BHLIB_USE_WINTHREAD +#cmakedefine BHLIB_USE_PTHREAD + +#endif /* BHLIB_CONFIG_H */ diff --git a/include/bh/internal/thread.h b/include/bh/internal/thread.h new file mode 100644 index 0000000..fe2a9c8 --- /dev/null +++ b/include/bh/internal/thread.h @@ -0,0 +1,45 @@ +#ifndef BHLIB_INTERNAL_THREAD_H +#define BHLIB_INTERNAL_THREAD_H + +#include <bh/thread.h> +#include "queue.h" + +#define BH_THREAD_DONE (1 << 8) + +#if defined(BHLIB_USE_PTHREAD) +#include "thread_posix.h" +#elif defined(BHLIB_USE_WINTHREAD) +#include "thread_win.h" +#else +#include "thread_null.h" +#endif + +struct bh_task_s +{ + void (*func)(void *); + void *data; + int flags; +}; + +struct bh_thread_pool_s +{ + bh_thread_t *threads; + bh_queue_t tasks; + bh_mutex_t lock; + bh_cond_t new_task; + bh_cond_t done_task; + size_t size; + size_t active; + int shutdown; +}; + +void bh_task_init(bh_task_t *task, + void (*func)(void *), + void *data, + int flags); + +void bh_task_destroy(bh_task_t *task); + +void bh_thread_pool_worker(void *arg); + +#endif /* BHLIB_INTERNAL_THREAD_H */ diff --git a/include/bh/internal/thread_posix.h b/include/bh/internal/thread_posix.h new file mode 100644 index 0000000..311d0ae --- /dev/null +++ b/include/bh/internal/thread_posix.h @@ -0,0 +1,30 @@ +#include <pthread.h> + +struct bh_thread_s +{ + pthread_t handle; +}; + +struct bh_mutex_s +{ + pthread_mutex_t handle; +}; + +struct bh_cond_s +{ + pthread_cond_t handle; +}; + +int bh_thread_init(bh_thread_t *thread, + bh_task_t *task); + +int bh_mutex_init(bh_mutex_t *mutex); +void bh_mutex_destroy(bh_mutex_t *mutex); + +int bh_cond_init(bh_cond_t *cond); +void bh_cond_destroy(bh_cond_t *cond); + +int bh_thread_pool_init(bh_thread_pool_t *pool, + size_t size); + +void bh_thread_pool_destroy(bh_thread_pool_t *pool);
\ No newline at end of file diff --git a/include/bh/thread.h b/include/bh/thread.h new file mode 100644 index 0000000..c8547ce --- /dev/null +++ b/include/bh/thread.h @@ -0,0 +1,251 @@ +#ifndef BHLIB_THREAD_H +#define BHLIB_THREAD_H + +#include "bh.h" + +#define BH_THREAD_CLEANUP (1 << 0) + +typedef void (*bh_thread_cb_t)(void *); +typedef struct bh_thread_s bh_thread_t; +typedef struct bh_mutex_s bh_mutex_t; +typedef struct bh_cond_s bh_cond_t; +typedef struct bh_task_s bh_task_t; +typedef struct bh_thread_pool_s bh_thread_pool_t; + +#if defined(BHLIB_USE_PTHREAD) +bh_thread_t *bh_thread_new(bh_task_t *task); + +bh_thread_pool_t *bh_thread_pool_new(size_t size); +#elif defined(BHLIB_USE_WINTHREAD) +typedef uintptr_t (__cdecl *bh_thread_begin_cb_t)(void *, + unsigned, + unsigned (__stdcall *)(void *), + void *, + unsigned, + unsigned *); + +typedef uintptr_t (__cdecl *bh_thread_end_cb_t)(unsigned); + +bh_thread_t *bh_thread_new_base(bh_task_t *task, + bh_thread_begin_cb_t begin, + bh_thread_end_cb_t end); + +bh_thread_pool_t *bh_thread_pool_new_base(size_t size, + bh_thread_begin_cb_t begin, + bh_thread_end_cb_t end); + +#define bh_thread_new(task) \ + bh_thread_new_base((task), _beginthreadex, _endthreadex) + +#define bh_thread_pool_new(size) \ + bh_thread_pool_new_base((size), _beginthreadex, _endthreadex); + +#endif + +/** + * @function bh_thread_new + * + * @brief Create new thread. + * + * @param task Thread task + * + * @return Pointer to thread + * + * @sa bh_thread_join, bh_thread_detach + */ + +/** + * @function bh_thread_pool_new + * @brief Create new thread pool. + * + * @param pool Pointer to the thread pool + * @param size Amount of threads + * + * @return Pointer to thread pool + * + * @sa bh_thread_pool_add, bh_thread_pool_join, bh_thread_pool_free + */ + +/** + * @brief Join thread. + * + * @param thread Pointer to the thread + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_thread_detach + */ +int bh_thread_join(bh_thread_t *thread); + +/** + * @brief Detach thread. + * + * @param thread Pointer to the thread + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_thread_join + */ +int bh_thread_detach(bh_thread_t *thread); + +/** + * @brief Create mutex. + * + * @return Pointer to the mutex + * + * @sa bh_mutex_lock, bh_mutex_try_lock, bh_mutex_destroy + */ +bh_mutex_t *bh_mutex_new(void); + +/** + * @brief Free mutex. + * + * @param mutex Pointer ot the mutex + */ +void bh_mutex_free(bh_mutex_t *mutex); + +/** + * @brief Lock mutex. + * + * @param mutex Pointer to the mutex + * + * @return 0 on success, non-zero otherwise + * + * @note Locking already locked mutex will block thread until mutex is + * released + * + * @sa bh_mutex_try_lock, bh_mutex_unlock + */ +int bh_mutex_lock(bh_mutex_t *mutex); + +/** + * @brief Try to lock mutex. + * + * @param mutex Pointer to the mutex + * + * @return 0 on success, positive value if mutex is locked, negative value on + * error + * + * @sa bh_mutex_lock, bh_mutex_unlock + */ +int bh_mutex_try_lock(bh_mutex_t *mutex); + +/** + * @brief Unlock mutex. + * + * @param mutex Pointer to the mutex + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_mutex_lock, bh_mutex_try_lock + */ +int bh_mutex_unlock(bh_mutex_t *mutex); + +/** + * @brief Create condition variable. + * + * @return Pointer to the condition variable + */ +bh_cond_t *bh_cond_new(void); + +/** + * @brief Destroy condition variable. + * + * @param cond Pointer to the conditional variable + */ +void bh_cond_free(bh_cond_t *cond); + +/** + * @brief Block on conditional variable. + * + * @param cond Pointer to the condition variable + * @param mutex Pointer to the mutex + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_cond_wait_for, bh_cond_signal, bh_cond_broadcast + */ +int bh_cond_wait(bh_cond_t *cond, + bh_mutex_t *mutex); + +/** + * @brief Block on conditional variable for a period of the time. + * + * @param cond Pointer to the conditional variable + * @param mutex Pointer to the mutex + * @param timeout Timeout in miliseconds + * + * @return 0 on success, positive value on timeout, negative on error + * + * @sa bh_cond_wait, bh_cond_signal, bh_cond_broadcast + */ +int bh_cond_wait_for(bh_cond_t *cond, + bh_mutex_t *mutex, + unsigned long timeout); + +/** + * @brief Unblock (notify) thread. + * + * @param cond Pointer to the condition variable + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_cond_broadcast, bh_cond_wait, bh_cond_wait_for + */ +int bh_cond_signal(bh_cond_t *cond); + +/** + * @brief Unblock all threads. + * + * @param cond Pointer to the conditional variable + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_cond_signal, bh_cond_wait, bh_cond_wait_for + */ +int bh_cond_broadcast(bh_cond_t *cond); + +bh_task_t *bh_task_new(void (*func)(void *), + void *data, + int flags); + +void bh_task_free(bh_task_t *task); + +void bh_task_reuse(bh_task_t *task, + void (*func)(void *), + void *data); + +int bh_task_done(bh_task_t *task); + +/** + * @brief Submit task to the thread pool. + * + * @param pool Pointer to the thread pool + * @param func Task function + * @param data Task data + * + * @return 0 on success, non-zero otherwise + * + * @sa bh_thread_pool_join + */ +int bh_thread_pool_add(bh_thread_pool_t *pool, + bh_task_t *task); + +/** + * @brief Wait until all tasks are finished. + * + * @param pool Pointer to the thread pool + * @return 0 on success, non-zero otherwise + * + * @sa bh_thread_pool_add + */ +int bh_thread_pool_wait(bh_thread_pool_t *pool); + +/** + * @brief Destroy thread pool. + * + * @param pool Pointer to the thread pool + */ +void bh_thread_pool_free(bh_thread_pool_t *pool); + +#endif /* BHLIB_THREAD_H */ @@ -1,5 +1,6 @@ #include <bh/queue.h> #include <bh/hashmap.h> +#include <bh/thread.h> #include <stdio.h> #include <stdint.h> @@ -46,6 +47,46 @@ void foo() bh_hashmap_free(hashmap); } +int factor(int x) +{ + if (x < 2) + return 1; + + return factor(x - 1) + factor(x - 2); +} + +void factor_task(void *arg) +{ + printf("Task start\n"); + fflush(stdout); + printf("Factor: %d\n", factor(48)); + fflush(stdout); +} + +void bar() +{ + bh_thread_pool_t *pool; + bh_task_t *task; + size_t i; + + printf("Pool create\n"); + fflush(stdout); + pool = bh_thread_pool_new(16); + + printf("Prepare\n"); + fflush(stdout); + for (i = 0; i < 32; i++) + { + printf("Task create\n"); + fflush(stdout); + task = bh_task_new(factor_task, NULL, BH_THREAD_CLEANUP); + bh_thread_pool_add(pool, task); + } + + bh_thread_pool_wait(pool); + bh_thread_pool_free(pool); +} + int main() { bh_queue_t *queue; @@ -54,6 +95,10 @@ int main() foo(); + printf("Thread?\n"); + fflush(stdout); + bar(); + queue = bh_queue_new(); for (j = 0; j < 32; j++) diff --git a/src/thread.c b/src/thread.c new file mode 100644 index 0000000..9620085 --- /dev/null +++ b/src/thread.c @@ -0,0 +1,219 @@ +#include <bh/internal/thread.h> +#include <stdlib.h> + +bh_task_t *bh_task_new(void (*func)(void *), + void *data, + int flags) +{ + bh_task_t *result; + + result = malloc(sizeof(bh_task_t)); + if (result) + bh_task_init(result, func, data, flags); + + return result; +} + +void bh_task_free(bh_task_t *task) +{ + bh_task_destroy(task); + free(task); +} + +void bh_task_init(bh_task_t *task, + void (*func)(void *), + void *data, + int flags) +{ + task->func = func; + task->data = data; + task->flags = flags; +} + +void bh_task_destroy(bh_task_t *task) +{ + (void)task; +} + +void bh_task_reuse(bh_task_t *task, + void (*func)(void *), + void *data) +{ + if (task->flags & BH_THREAD_DONE) + bh_task_init(task, func, data, task->flags & ~(BH_THREAD_DONE)); +} + +int bh_task_done(bh_task_t *task) +{ + return (task->flags & BH_THREAD_DONE) != 0; +} + +bh_mutex_t *bh_mutex_new(void) +{ + bh_mutex_t *result; + + result = malloc(sizeof(*result)); + if (result && bh_mutex_init(result)) + { + free(result); + result = NULL; + } + + return result; +} + +void bh_mutex_free(bh_mutex_t *mutex) +{ + bh_mutex_destroy(mutex); + free(mutex); +} + +bh_cond_t *bh_cond_new(void) +{ + bh_cond_t *result; + + result = malloc(sizeof(*result)); + if (result && bh_cond_init(result)) + { + free(result); + result = NULL; + } + + return result; +} + +void bh_cond_free(bh_cond_t *cond) +{ + bh_cond_destroy(cond); + free(cond); +} + +int bh_thread_pool_add(bh_thread_pool_t *pool, + bh_task_t *task) +{ + void *iter; + bh_task_t *item; + + /* Queue task for execution */ + bh_mutex_lock(&pool->lock); + if (bh_queue_insert(&pool->tasks, task)) + { + bh_mutex_unlock(&pool->lock); + return -1; + } + + /* Signal new job */ + bh_mutex_unlock(&pool->lock); + bh_cond_signal(&pool->new_task); + + return 0; +} + +int bh_thread_pool_wait(bh_thread_pool_t *pool) +{ + /* Lock and check if there is jobs in the queue */ + bh_mutex_lock(&pool->lock); + while (!bh_queue_empty(&pool->tasks)) + { + bh_task_t *task; + + /* Fetch task from the queue */ + task = bh_queue_front(&pool->tasks); + bh_queue_remove(&pool->tasks); + + /* Unlock and do the task */ + bh_mutex_unlock(&pool->lock); + + task->func(task->data); + task->flags |= BH_THREAD_DONE; + if (task->flags & BH_THREAD_CLEANUP) + bh_task_free(task); + + /* Lock and check for the next task */ + bh_mutex_lock(&pool->lock); + } + + /* Check until there is no active threads */ + while (pool->active) + bh_cond_wait(&pool->done_task, &pool->lock); + + /* Unlock */ + bh_mutex_unlock(&pool->lock); + + return 0; +} + +void bh_thread_pool_destroy(bh_thread_pool_t *pool) +{ + size_t i; + + /* Broadcast shutdown */ + pool->shutdown = 1; + bh_cond_broadcast(&pool->new_task); + + /* Join and delete every thread */ + for (i = 0; i < pool->size; i++) + bh_thread_join(pool->threads + i); + + /* Destroy any tasks with cleanup flag */ + while (!bh_queue_empty(&pool->tasks)) + { + bh_task_t *task; + + task = bh_queue_front(&pool->tasks); + bh_queue_remove(&pool->tasks); + + if (task->flags & BH_THREAD_CLEANUP) + bh_task_free(task); + } + + /* Destroy thread array, queue, lock and condition variables */ + free(pool->threads); + bh_queue_destroy(&pool->tasks); + bh_cond_destroy(&pool->new_task); + bh_cond_destroy(&pool->done_task); + bh_mutex_destroy(&pool->lock); +} + +void bh_thread_pool_free(bh_thread_pool_t *pool) +{ + bh_thread_pool_destroy(pool); + free(pool); +} + +void bh_thread_pool_worker(void *arg) +{ + bh_thread_pool_t *pool; + + pool = (bh_thread_pool_t *)arg; + while (1) + { + bh_task_t *task; + + bh_mutex_lock(&pool->lock); + while (!pool->shutdown && bh_queue_empty(&pool->tasks)) + bh_cond_wait(&pool->new_task, &pool->lock); + + if (pool->shutdown) + { + bh_mutex_unlock(&pool->lock); + break; + } + + task = bh_queue_front(&pool->tasks); + bh_queue_remove(&pool->tasks); + pool->active++; + bh_mutex_unlock(&pool->lock); + + task->func(task->data); + task->flags |= BH_THREAD_DONE; + if (task->flags & BH_THREAD_CLEANUP) + bh_task_free(task); + + bh_mutex_lock(&pool->lock); + pool->active--; + + bh_mutex_unlock(&pool->lock); + bh_cond_broadcast(&pool->done_task); + } +}
\ No newline at end of file diff --git a/src/thread_posix.c b/src/thread_posix.c new file mode 100644 index 0000000..a20bd57 --- /dev/null +++ b/src/thread_posix.c @@ -0,0 +1,182 @@ +#include <bh/internal/thread.h> +#include <string.h> +#include <stdlib.h> + +static void *bh_thread_run(void *arg) +{ + bh_task_t *task; + + task = (bh_task_t *)arg; + + task->func(task->data); + task->flags |= BH_THREAD_DONE; + + if (task->flags & BH_THREAD_CLEANUP) + bh_task_free(task); + + return NULL; +} + +int bh_thread_init(bh_thread_t *thread, + bh_task_t *task) +{ + return pthread_create(&thread->handle, NULL, bh_thread_run, task); +} + +int bh_thread_join(bh_thread_t *thread) +{ + return pthread_join(thread->handle, NULL); +} + +int bh_thread_detach(bh_thread_t *thread) +{ + return pthread_detach(thread->handle); +} + +int bh_mutex_init(bh_mutex_t *mutex) +{ + return pthread_mutex_init(&mutex->handle, NULL); +} + +void bh_mutex_destroy(bh_mutex_t *mutex) +{ + pthread_mutex_destroy(&mutex->handle); +} + +int bh_mutex_lock(bh_mutex_t *mutex) +{ + return pthread_mutex_lock(&mutex->handle); +} + +int bh_mutex_try_lock(bh_mutex_t *mutex) +{ + return pthread_mutex_trylock(&mutex->handle); +} + +int bh_mutex_unlock(bh_mutex_t *mutex) +{ + return pthread_mutex_unlock(&mutex->handle); +} + +int bh_cond_init(bh_cond_t *cond) +{ + return pthread_cond_init(&cond->handle, NULL); +} + +void bh_cond_destroy(bh_cond_t *cond) +{ + pthread_cond_destroy(&cond->handle); +} + +int bh_cond_wait(bh_cond_t *cond, + bh_mutex_t *mutex) +{ + return pthread_cond_wait(&cond->handle, &mutex->handle); +} + +int bh_cond_wait_for(bh_cond_t *cond, + bh_mutex_t *mutex, + unsigned long timeout) +{ + struct timespec ts; + + ts.tv_sec = timeout / 1000; + ts.tv_nsec = (timeout - ts.tv_sec * 1000) * 1000000; + + return pthread_cond_timedwait(&cond->handle, &mutex->handle, &ts); +} + +int bh_cond_signal(bh_cond_t *cond) +{ + return pthread_cond_signal(&cond->handle); +} + +int bh_cond_broadcast(bh_cond_t *cond) +{ + return pthread_cond_broadcast(&cond->handle); +} + +int bh_thread_pool_init(bh_thread_pool_t *pool, + size_t size) +{ + size_t i; + static bh_task_t pool_task; + static int pool_task_init = 0; + + /* Initialize static thread pool task */ + if (!pool_task_init) + { + bh_task_init(&pool_task, bh_thread_pool_worker, pool, 0); + pool_task_init = 1; + } + + /* Zero out pool structure */ + memset(pool, 0, sizeof(*pool)); + + /* Initialize mutex and condition variables */ + if (bh_mutex_init(&pool->lock)) + goto lock_fail; + + if (bh_cond_init(&pool->new_task)) + goto task_fail; + + if (bh_cond_init(&pool->done_task)) + goto done_fail; + + /* Allocate array of threads */ + pool->threads = malloc(size * sizeof(bh_thread_t)); + pool->size = size; + + if (!pool->threads) + goto thread_fail; + + /* Initialize queue */ + bh_queue_init(&pool->tasks); + if (bh_queue_reserve(&pool->tasks, 64)) + goto queue_fail; + + /* Initialize threads */ + for (i = 0; i < size; i++) + { + if (bh_thread_init(&pool->threads[i], &pool_task)) + { + pool->shutdown = 1; + bh_cond_broadcast(&pool->new_task); + for(; i; i--) + bh_thread_join(&pool->threads[i - 1]); + + bh_queue_destroy(&pool->tasks); + goto queue_fail; + } + } + + return 0; + +queue_fail: + free(pool->threads); + +thread_fail: + bh_cond_destroy(&pool->done_task); + +done_fail: + bh_cond_destroy(&pool->new_task); + +task_fail: + bh_mutex_destroy(&pool->lock); + +lock_fail: + return -1; +} + +bh_thread_pool_t *bh_thread_pool_new(size_t size) +{ + bh_thread_pool_t *result; + result = malloc(sizeof(*result)); + if (result && bh_thread_pool_init(result, size)) + { + free(result); + result = NULL; + } + + return result; +} |
