diff --git a/.travis.yml b/.travis.yml index 36537cbe..6bf99f1b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ matrix: os: linux sudo: false - - env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-pool && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean" + - env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean" os: linux sudo: false language: cpp diff --git a/lib/common/pool.c b/lib/common/pool.c index 97ca7dda..e24691f7 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -51,21 +51,21 @@ static void* POOL_thread(void* opaque) { if (!ctx) { return NULL; } for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ - if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; } + pthread_mutex_lock(&ctx->queueMutex); while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { - if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; } + pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ if (ctx->queueHead == ctx->queueTail) { - if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } + pthread_mutex_unlock(&ctx->queueMutex); return opaque; } /* Pop a job off the queue */ { POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; /* Unlock the mutex, signal a pusher, and run the job */ - if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } - if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; } + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePushCond); job.function(job.opaque); } } @@ -73,7 +73,6 @@ static void* POOL_thread(void* opaque) { } POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { - int err = 0; POOL_ctx *ctx; /* Check the parameters */ if (!numThreads || !queueSize) { return NULL; } @@ -88,15 +87,15 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queueHead = 0; ctx->queueTail = 0; - err |= pthread_mutex_init(&ctx->queueMutex, NULL); - err |= pthread_cond_init(&ctx->queuePushCond, NULL); - err |= pthread_cond_init(&ctx->queuePopCond, NULL); + pthread_mutex_init(&ctx->queueMutex, NULL); + pthread_cond_init(&ctx->queuePushCond, NULL); + pthread_cond_init(&ctx->queuePopCond, NULL); ctx->shutdown = 0; /* Allocate space for the thread handles */ ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); ctx->numThreads = 0; /* Check for errors */ - if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; } + if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } /* Initialize the threads */ { size_t i; for (i = 0; i < numThreads; ++i) { diff --git a/lib/common/pool.h b/lib/common/pool.h index f4afc1ee..c26f543f 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -39,6 +39,7 @@ typedef void (*POOL_add_function)(void *, POOL_function, void *); /*! POOL_add() : Add the job `function(opaque)` to the thread pool. Possibly blocks until there is room in the queue. + Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed. */ void POOL_add(void *ctx, POOL_function function, void *opaque); diff --git a/lib/common/threading.c b/lib/common/threading.c index 1725650c..abad2c15 100644 --- a/lib/common/threading.c +++ b/lib/common/threading.c @@ -15,7 +15,7 @@ * This file will hold wrapper for systems, which do not support Pthreads */ -#ifdef _WIN32 +#if defined(ZSTD_PTHREAD) && defined(_WIN32) /** * Windows minimalist Pthread Wrapper, based on : diff --git a/lib/common/threading.h b/lib/common/threading.h index a8126eb7..d5dc8f75 100644 --- a/lib/common/threading.h +++ b/lib/common/threading.h @@ -24,24 +24,42 @@ extern "C" { * Windows minimalist Pthread Wrapper, based on : * http://www.cse.wustl.edu/~schmidt/win32-cv-1.html */ +#ifdef WINVER +# undef WINVER +#endif +#define WINVER 0x0600 + +#ifdef _WIN32_WINNT +# undef _WIN32_WINNT +#endif +#define _WIN32_WINNT 0x0600 #ifndef WIN32_LEAN_AND_MEAN # define WIN32_LEAN_AND_MEAN #endif + #include /* mutex */ #define pthread_mutex_t CRITICAL_SECTION #define pthread_mutex_init(a,b) InitializeCriticalSection((a)) #define pthread_mutex_destroy(a) DeleteCriticalSection((a)) -#define pthread_mutex_lock EnterCriticalSection -#define pthread_mutex_unlock LeaveCriticalSection +#define pthread_mutex_lock(a) EnterCriticalSection((a)) +#define pthread_mutex_unlock(a) LeaveCriticalSection((a)) + +/* condition variable */ +#define pthread_cond_t CONDITION_VARIABLE +#define pthread_cond_init(a, b) InitializeConditionVariable((a)) +#define pthread_cond_destroy(a) /* No delete */ +#define pthread_cond_wait(a, b) SleepConditionVariableCS((a), (b), INFINITE) +#define pthread_cond_signal(a) WakeConditionVariable((a)) +#define pthread_cond_broadcast(a) WakeAllConditionVariable((a)) /* pthread_create() and pthread_join() */ typedef struct { HANDLE handle; void* (*start_routine)(void*); - void*varg; + void* arg; } pthread_t; int pthread_create(pthread_t* thread, const void* unused, @@ -68,6 +86,13 @@ typedef int pthread_mutex_t; #define pthread_mutex_lock(a) #define pthread_mutex_unlock(a) +typedef int pthread_cond_t; +#define pthread_cond_init(a,b) +#define pthread_cond_destroy(a) +#define pthread_cond_wait(a,b) +#define pthread_cond_signal(a) +#define pthread_cond_broadcast(a) + /* do not use pthread_t */ #endif /* ZSTD_PTHREAD */ diff --git a/tests/Makefile b/tests/Makefile index 739944de..6312584a 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -48,8 +48,10 @@ ZDICT_FILES := $(ZSTDDIR)/dictBuilder/*.c # Define *.exe as extension for Windows systems ifneq (,$(filter Windows%,$(OS))) EXT =.exe +PTHREAD = -DZSTD_PTHREAD else EXT = +PTHREAD = -pthread -DZSTD_PTHREAD endif VOID = /dev/null @@ -158,8 +160,8 @@ else $(CC) $(FLAGS) $^ -o $@$(EXT) -Wl,-rpath=$(ZSTDDIR) $(ZSTDDIR)/libzstd.so endif -pool : pool.c $(ZSTDDIR)/common/pool.c - $(CC) $(FLAGS) -pthread -DZSTD_PTHREAD $^ -o $@$(EXT) +pool : pool.c $(ZSTDDIR)/common/pool.c $(ZSTDDIR)/common/threading.c + $(CC) $(FLAGS) $(PTHREAD) $^ -o $@$(EXT) namespaceTest: if $(CC) namespaceTest.c ../lib/common/xxhash.c -o $@ ; then echo compilation should fail; exit 1 ; fi @@ -225,7 +227,7 @@ zstd-playTests: datagen file $(ZSTD) ZSTD="$(QEMU_SYS) $(ZSTD)" ./playTests.sh $(ZSTDRTTEST) -test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries +test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries test-pool test32: test-zstd32 test-fullbench32 test-fuzzer32 test-zstream32 diff --git a/tests/pool.c b/tests/pool.c index ce38075d..adc5947d 100644 --- a/tests/pool.c +++ b/tests/pool.c @@ -1,5 +1,5 @@ #include "pool.h" -#include +#include "threading.h" #include #include @@ -14,7 +14,7 @@ struct data { pthread_mutex_t mutex; - unsigned data[1024]; + unsigned data[16]; size_t i; }; @@ -26,45 +26,45 @@ void fn(void *opaque) { pthread_mutex_unlock(&data->mutex); } -int testOrder(size_t numThreads, size_t queueLog) { +int testOrder(size_t numThreads, size_t queueSize) { struct data data; - POOL_ctx *ctx = POOL_create(numThreads, queueLog); + POOL_ctx *ctx = POOL_create(numThreads, queueSize); ASSERT_TRUE(ctx); data.i = 0; - ASSERT_FALSE(pthread_mutex_init(&data.mutex, NULL)); + pthread_mutex_init(&data.mutex, NULL); { size_t i; - for (i = 0; i < 1024; ++i) { + for (i = 0; i < 16; ++i) { POOL_add(ctx, &fn, &data); } } POOL_free(ctx); - ASSERT_EQ(1024, data.i); + ASSERT_EQ(16, data.i); { size_t i; for (i = 0; i < data.i; ++i) { ASSERT_EQ(i, data.data[i]); } } - ASSERT_FALSE(pthread_mutex_destroy(&data.mutex)); + pthread_mutex_destroy(&data.mutex); return 0; } int main(int argc, const char **argv) { size_t numThreads; - for (numThreads = 1; numThreads <= 8; ++numThreads) { - size_t queueLog; - for (queueLog = 1; queueLog <= 8; ++queueLog) { - if (testOrder(numThreads, queueLog)) { + for (numThreads = 1; numThreads <= 4; ++numThreads) { + size_t queueSize; + for (queueSize = 1; queueSize <= 2; ++queueSize) { + if (testOrder(numThreads, queueSize)) { printf("FAIL: testOrder\n"); return 1; } } } printf("PASS: testOrder\n"); - (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n") - : printf("PASS: testInvalid\n"); (void)argc; (void)argv; + return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1 + : printf("PASS: testInvalid\n"), 0; return 0; }