change control of threadLimit
now limits maximum nb of active threads even when queueSize > 1.
This commit is contained in:
parent
62469c9f41
commit
6b48eb12c0
@ -10,9 +10,10 @@
|
|||||||
|
|
||||||
|
|
||||||
/* ====== Dependencies ======= */
|
/* ====== Dependencies ======= */
|
||||||
#include <stddef.h> /* size_t */
|
#include <stddef.h> /* size_t */
|
||||||
#include "pool.h"
|
#include "debug.h" /* assert */
|
||||||
#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
|
#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
|
||||||
|
#include "pool.h"
|
||||||
|
|
||||||
/* ====== Compiler specifics ====== */
|
/* ====== Compiler specifics ====== */
|
||||||
#if defined(_MSC_VER)
|
#if defined(_MSC_VER)
|
||||||
@ -70,14 +71,14 @@ static void* POOL_thread(void* opaque) {
|
|||||||
/* Lock the mutex and wait for a non-empty queue or until shutdown */
|
/* Lock the mutex and wait for a non-empty queue or until shutdown */
|
||||||
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
|
||||||
|
|
||||||
while (ctx->queueEmpty && !ctx->shutdown) {
|
while ( ctx->queueEmpty
|
||||||
|
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
|
||||||
|
if (ctx->shutdown) {
|
||||||
|
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
||||||
|
return opaque;
|
||||||
|
}
|
||||||
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
|
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
|
||||||
}
|
}
|
||||||
/* empty => shutting down: so stop */
|
|
||||||
if (ctx->queueEmpty) {
|
|
||||||
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
|
||||||
return opaque;
|
|
||||||
}
|
|
||||||
/* Pop a job off the queue */
|
/* Pop a job off the queue */
|
||||||
{ POOL_job const job = ctx->queue[ctx->queueHead];
|
{ POOL_job const job = ctx->queue[ctx->queueHead];
|
||||||
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
|
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
|
||||||
@ -97,7 +98,7 @@ static void* POOL_thread(void* opaque) {
|
|||||||
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
|
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
|
||||||
} }
|
} }
|
||||||
} /* for (;;) */
|
} /* for (;;) */
|
||||||
/* Unreachable */
|
assert(0); /* Unreachable */
|
||||||
}
|
}
|
||||||
|
|
||||||
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
|
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
|
||||||
@ -237,7 +238,7 @@ static int isQueueFull(POOL_ctx const* ctx) {
|
|||||||
if (ctx->queueSize > 1) {
|
if (ctx->queueSize > 1) {
|
||||||
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
|
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
|
||||||
} else {
|
} else {
|
||||||
return ctx->numThreadsBusy == ctx->threadLimit ||
|
return (ctx->numThreadsBusy == ctx->threadLimit) ||
|
||||||
!ctx->queueEmpty;
|
!ctx->queueEmpty;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user