From 6f1e260eddd08001631d6f8c5d760f9da6674e3a Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Tue, 25 Jul 2017 10:01:10 -0700 Subject: [PATCH] added mechanism for getting rid of spikes --- contrib/adaptive-compression/adapt.c | 38 +++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index f6bcddd1..32f54e67 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -26,6 +26,7 @@ #define DEFAULT_COMPRESSION_LEVEL 6 #define DEFAULT_ADAPT_PARAM 0 #define MAX_COMPRESSION_LEVEL_CHANGE 2 +#define CONVERGENCE_LOWER_BOUND 3 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; @@ -77,6 +78,7 @@ typedef struct { unsigned jobWriteID; unsigned allJobsCompleted; unsigned adaptParam; + unsigned convergenceCounter; double createWaitCompressionCompletion; double compressWaitCreateCompletion; double compressWaitWriteCompletion; @@ -213,6 +215,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) ctx->createCompletion = 1; ctx->writeCompletion = 1; ctx->compressionCompletion = 1; + ctx->convergenceCounter = 0; ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); @@ -323,6 +326,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) double compressWaitWriteCompletion; double writeWaitCompressionCompletion; double const threshold = 0.00001; + unsigned const prevCompressionLevel = ctx->compressionLevel; DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel); /* read and reset completion measurements */ @@ -347,7 +351,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; ctx->compressWaitCreateCompletion = 1; pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); - + DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); /* adaptation logic */ if (1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) { /* create or write waiting on compression */ @@ -355,7 +359,15 @@ static void adaptCompressionLevel(adaptCCtx* ctx) double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); - ctx->compressionLevel -= boundChange; + if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) { + /* reset convergence counter, might have been a spike */ + ctx->convergenceCounter = 0; + } + else if (boundChange != 0) { + ctx->compressionLevel -= boundChange; + ctx->convergenceCounter = 1; + } + DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange); } else if (1-compressWaitWriteCompletion > threshold) { @@ -363,7 +375,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx) double const completion = compressWaitWriteCompletion; unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); - ctx->compressionLevel += boundChange; + if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) { + ctx->convergenceCounter = 0; + } + else if (boundChange != 0) { + ctx->compressionLevel += boundChange; + ctx->convergenceCounter = 1; + } + DEBUG(2, "compress waiting on write, tried to increase compression level by %u\n\n", boundChange); } else if (1-compressWaitCreateCompletion > threshold) { @@ -372,10 +391,21 @@ static void adaptCompressionLevel(adaptCCtx* ctx) double const completion = compressWaitCreateCompletion; unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); - ctx->compressionLevel += boundChange; + if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) { + ctx->convergenceCounter = 0; + } + else if (boundChange != 0) { + ctx->compressionLevel += boundChange; + ctx->convergenceCounter = 1; + } + DEBUG(2, "compression waiting on create, tried to increase compression level by %u\n\n", boundChange); } + if (ctx->compressionLevel == prevCompressionLevel) { + ctx->convergenceCounter++; + } + if (g_forceCompressionLevel) { ctx->compressionLevel = g_compressionLevel; }