added extended POOL test

abrupt end + downsizing with running jobs remaining in queue.

also : POOL_resize() requires numThreads >= 1
This commit is contained in:
Yann Collet 2018-06-21 14:58:59 -07:00
parent 7d80ada5ca
commit 818e72b4d5
3 changed files with 16 additions and 9 deletions

View File

@ -73,10 +73,13 @@ static void* POOL_thread(void* opaque) {
while ( ctx->queueEmpty
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
if (ctx->shutdown) {
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
if (ctx->shutdown) {
/* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
* a few threads will be shutdown while !queueEmpty,
* but enough threads will remain active to finish the queue */
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* Pop a job off the queue */
@ -99,7 +102,7 @@ static void* POOL_thread(void* opaque) {
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
} /* for (;;) */
assert(0); /* Unreachable */
assert(0); /* Unreachable */
}
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
@ -187,10 +190,12 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
}
/* note : only works if no job is running ! */
/* @return : a working pool on success, NULL on failure
* note : starting context is considered consumed. */
static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
if (numThreads <= ctx->threadCapacity) {
if (!numThreads) return NULL;
ctx->threadLimit = numThreads;
return ctx;
}

View File

@ -45,6 +45,7 @@ void POOL_free(POOL_ctx* ctx);
* note : new pool context might have same address as original one, but it's not guaranteed.
* consider starting context as consumed, only rely on returned one.
* note 2 : only numThreads can be resized, queueSize is unchanged.
* note 3 : `numThreads` must be at least 1
*/
POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads);

View File

@ -169,7 +169,7 @@ typedef struct {
void waitIncFn(void *opaque) {
abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
UTIL_sleepMilli(1);
UTIL_sleepMilli(10);
ZSTD_pthread_mutex_lock(&test->mut);
test->val = test->val + 1;
ZSTD_pthread_mutex_unlock(&test->mut);
@ -179,14 +179,15 @@ static int testAbruptEnding_internal(abruptEndCanary_t test)
{
int const nbWaits = 16;
POOL_ctx* const ctx = POOL_create(2 /*numThreads*/, nbWaits /*queueSize*/);
POOL_ctx* ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
ASSERT_TRUE(ctx);
test.val = 0;
{ int i;
for (i=0; i<nbWaits; i++)
POOL_add(ctx, &waitLongFn, &test); /* all jobs either processed on in the queue */
POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */
}
ctx = POOL_resize(ctx, 1 /*numThreads*/); /* downsize numThreads, to try to break end condition */
POOL_free(ctx); /* must finish all jobs in queue before giving back control */
ASSERT_EQ(test.val, nbWaits);