changed how completion is actually sampled

This commit is contained in:
Paul Cruz 2017-07-20 10:53:51 -07:00
parent dcf609f835
commit 7ab758a640

View File

@ -24,8 +24,8 @@
#define MAX_PATH 256
#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
#define DEFAULT_ADAPT_PARAM 1
#define MAX_COMPRESSION_LEVEL_CHANGE 3
#define DEFAULT_ADAPT_PARAM 0
#define MAX_COMPRESSION_LEVEL_CHANGE 4
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@ -87,9 +87,9 @@ typedef struct {
unsigned jobWriteID;
unsigned allJobsCompleted;
unsigned adaptParam;
unsigned compressionCompletionMeasured;
unsigned writeCompletionMeasured;
unsigned createCompletionMeasured;
double compressionCompletionMeasured;
double writeCompletionMeasured;
double createCompletionMeasured;
double compressionCompletion;
double writeCompletion;
double createCompletion;
@ -342,6 +342,9 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
DEBUG(2, "createWaiting %u\n", createWaiting);
DEBUG(2, "compressWaiting %u\n", compressWaiting);
DEBUG(2, "writeWaiting %u\n\n", writeWaiting);
{
unsigned const writeSlow = (compressWaiting && createWaiting);
unsigned const compressSlow = (writeWaiting && createWaiting);
@ -351,14 +354,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
reset = 1;
}
else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel);
double completion;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion;
DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion);
completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured;
DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
DEBUG(3, "write completion: %f\n", completion);
@ -369,13 +372,13 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
else if (compressSlow && ctx->compressionLevel > 1) {
double completion;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
completion = ctx->compressionCompletion;
completion = ctx->compressionCompletionMeasured;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
DEBUG(3, "completion: %f\n", completion);
DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel);
DEBUG(2, "completion: %f\n", completion);
ctx->compressionLevel -= change;
reset = 1;
}
@ -386,15 +389,6 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0;
ctx->compressionCompletionMeasured = 0;
ctx->writeCompletion = 0;
ctx->writeCompletionMeasured = 0;
ctx->createCompletion = 0;
ctx->createCompletionMeasured = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
}
}
@ -424,8 +418,8 @@ static void* compressionThread(void* arg)
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletionMeasured = 1;
DEBUG(2, "create completion: %f\n", ctx->createCompletion);
ctx->createCompletionMeasured = ctx->createCompletion;
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
@ -502,9 +496,8 @@ static void* compressionThread(void* arg)
/* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->compressionCompletionMeasured) {
ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
}
ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
} while (remaining != 0);
@ -562,8 +555,8 @@ static void* outputThread(void* arg)
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletionMeasured = 1;
DEBUG(2, "compressionCompletion %f\n", ctx->compressionCompletion);
ctx->compressionCompletionMeasured = ctx->compressionCompletion;
DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@ -580,7 +573,7 @@ static void* outputThread(void* arg)
}
{
// size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
size_t const blockSize = 64 << 10; /* 64 KB */
size_t const blockSize = compressedSize >> 7;
size_t pos = 0;
for ( ; ; ) {
size_t const writeSize = MIN(remaining, blockSize);
@ -591,9 +584,7 @@ static void* outputThread(void* arg)
/* update completion variable for writing */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->writeCompletionMeasured) {
ctx->writeCompletion = 1 - (double)remaining/compressedSize;
}
ctx->writeCompletion = 1 - (double)remaining/compressedSize;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
if (remaining == 0) break;
@ -643,8 +634,8 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletionMeasured = 1;
DEBUG(2, "writeCompletion: %f\n", ctx->writeCompletion);
ctx->writeCompletionMeasured = ctx->writeCompletion;
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
@ -736,9 +727,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
pos += ret;
remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->createCompletionMeasured) {
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
}
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}