From e11bf55d0bf0a83327031b921480625781133033 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Wed, 19 Jul 2017 10:10:47 -0700 Subject: [PATCH] added mechanism for measuring how much of a job has been created --- contrib/adaptive-compression/adapt.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index d3ffe6a8..de2e5e13 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -89,8 +89,10 @@ typedef struct { unsigned adaptParam; unsigned compressionCompletionMeasured; unsigned writeCompletionMeasured; + unsigned createCompletionMeasured; double compressionCompletion; double writeCompletion; + double createCompletion; mutex_t jobCompressed_mutex; cond_t jobCompressed_cond; mutex_t jobReady_mutex; @@ -344,7 +346,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) unsigned const writeSlow = (compressWaiting && createWaiting); unsigned const compressSlow = (writeWaiting && createWaiting); unsigned const createSlow = (compressWaiting && writeWaiting); - DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); + DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); if (allSlow) { reset = 1; } @@ -352,13 +354,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx) DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel); double completion; pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = ctx->writeCompletion; + completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion; + DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); { unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; - DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change); - DEBUG(2, "write completion: %f\n", completion); + DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change); + DEBUG(3, "write completion: %f\n", completion); ctx->compressionLevel += change; reset = 1; } @@ -418,6 +421,9 @@ static void* compressionThread(void* arg) ctx->stats.readyCounter++; pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->createCompletionMeasured = 1; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); adaptCompressionLevel(ctx); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); @@ -695,6 +701,12 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA } pos += ret; remaining -= ret; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + if (!ctx->createCompletionMeasured) { + ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); + } + DEBUG(3, "create completion: %f\n", ctx->createCompletion); + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } if (remaining != 0 && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n");