diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 17721f87..ab332f49 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -355,7 +355,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) double completion; pthread_mutex_lock(&ctx->completion_mutex.pMutex); completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion; - DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion); + DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); { unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; @@ -375,7 +375,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); - DEBUG(3, "completion: %f\n", completion); + DEBUG(2, "completion: %f\n", completion); ctx->compressionLevel -= change; reset = 1; } @@ -493,6 +493,13 @@ static void* compressionThread(void* arg) dstPos += ret; dictPos += actualBlockSize; blockNum++; + + /* update completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + if (!ctx->compressionCompletionMeasured) { + ctx->compressionCompletion = 1 - (double)remaining/job->src.size; + } + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } job->dst.size = job->compressedSize; @@ -549,11 +556,7 @@ static void* outputThread(void* arg) pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); - if (!ctx->compressionCompletionMeasured) { - ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx); - ctx->compressionCompletionMeasured = 1; - DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion); - } + ctx->compressionCompletionMeasured = 1; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);