change parameters for compression level adapt

This commit is contained in:
Paul Cruz 2017-07-17 17:59:50 -07:00
parent b3c9e02bb6
commit 5af04c57b0

View File

@ -25,7 +25,7 @@
#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
#define DEFAULT_ADAPT_PARAM 1
#define MAX_COMPRESSION_LEVEL_CHANGE 10
#define MAX_COMPRESSION_LEVEL_CHANGE 3
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@ -277,6 +277,15 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}
/* this function normalizes counters when compression level is changing */
static void reduceCounters(adaptCCtx* ctx)
{
unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
ctx->stats.writeCounter -= min;
ctx->stats.compressedCounter -= min;
ctx->stats.readyCounter -= min;
}
/*
* Compression level is changed depending on which part of the compression process is lagging
* Currently, three theads exist for job creation, compression, and file writing respectively.
@ -285,10 +294,10 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
* compression thread lag => decreased compression level
* detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
*/
static unsigned adaptCompressionLevel(adaptCCtx* ctx)
static void adaptCompressionLevel(adaptCCtx* ctx)
{
if (g_forceCompressionLevel) {
return g_compressionLevel;
ctx->compressionLevel = g_compressionLevel;
}
else {
unsigned reset = 0;
@ -296,10 +305,11 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting));
unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting));
unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting));
DEBUG(3, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
unsigned const writeSlow = (compressWaiting && createWaiting);
unsigned const compressSlow = (writeWaiting && createWaiting);
unsigned const createSlow = (compressWaiting && writeWaiting);
DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
if (allSlow) {
reset = 1;
}
@ -310,10 +320,10 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
}
else if (compressSlow && ctx->compressionLevel > 1) {
double const completion = ctx->completion;
unsigned const maxChange = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
DEBUG(2, "completion: %f\n", completion);
DEBUG(3, "completion: %f\n", completion);
ctx->compressionLevel -= change;
reset = 1;
}
@ -324,7 +334,6 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
ctx->completion = 1;
ctx->completionMeasured = 0;
}
return ctx->compressionLevel;
}
}
@ -348,6 +357,8 @@ static void* compressionThread(void* arg)
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
ctx->stats.waitReady++;
ctx->stats.readyCounter++;
reduceCounters(ctx);
adaptCompressionLevel(ctx);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
}
@ -357,13 +368,13 @@ static void* compressionThread(void* arg)
DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
/* compress the data */
{
unsigned const cLevel = adaptCompressionLevel(ctx);
unsigned const cLevel = ctx->compressionLevel;
DEBUG(3, "cLevel used: %u\n", cLevel);
DEBUG(3, "compression level used: %u\n", cLevel);
/* begin compression */
{
size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
DEBUG(2, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel);
size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
@ -443,11 +454,13 @@ static void* outputThread(void* arg)
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
reduceCounters(ctx);
if (!ctx->completionMeasured) {
ctx->completion = ZSTD_getCompletion(ctx->cctx);
ctx->completionMeasured = 1;
}
DEBUG(2, "output detected completion: %f\n", ctx->completion);
adaptCompressionLevel(ctx);
DEBUG(3, "output detected completion: %f\n", ctx->completion);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
@ -503,11 +516,13 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
reduceCounters(ctx);
if (!ctx->completionMeasured) {
ctx->completion = ZSTD_getCompletion(ctx->cctx);
ctx->completionMeasured = 1;
}
DEBUG(2, "job creation detected completion %f\n", ctx->completion);
adaptCompressionLevel(ctx);
DEBUG(3, "job creation detected completion %f\n", ctx->completion);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}