diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index c542c9a3..b466dbdd 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -25,7 +25,7 @@ #define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_COMPRESSION_LEVEL 6 #define DEFAULT_ADAPT_PARAM 0 -#define MAX_COMPRESSION_LEVEL_CHANGE 4 +#define MAX_COMPRESSION_LEVEL_CHANGE 3 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; @@ -77,9 +77,12 @@ typedef struct { unsigned jobWriteID; unsigned allJobsCompleted; unsigned adaptParam; - double compressionCompletionMeasured; - double writeCompletionMeasured; - double createCompletionMeasured; + double createWaitWriteCompletion; + double createWaitCompressionCompletion; + double compressWaitCreateCompletion; + double compressWaitWriteCompletion; + double writeWaitCreateCompletion; + double writeWaitCompressionCompletion; double compressionCompletion; double writeCompletion; double createCompletion; @@ -200,9 +203,14 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) ctx->jobCompressedID = 0; ctx->jobWriteID = 0; ctx->lastDictSize = 0; - ctx->createCompletionMeasured = 1; - ctx->compressionCompletionMeasured = 1; - ctx->writeCompletionMeasured = 1; + + + ctx->createWaitWriteCompletion = 1; + ctx->createWaitCompressionCompletion = 1; + ctx->compressWaitCreateCompletion = 1; + ctx->compressWaitWriteCompletion = 1; + ctx->writeWaitCreateCompletion = 1; + ctx->writeWaitCompressionCompletion = 1; ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); @@ -308,45 +316,61 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) */ static void adaptCompressionLevel(adaptCCtx* ctx) { - double createCompletion, compressionCompletion, writeCompletion; + double createWaitWriteCompletion; + double createWaitCompressionCompletion; + double compressWaitCreateCompletion; + double compressWaitWriteCompletion; + double writeWaitCreateCompletion; + double writeWaitCompressionCompletion; double const threshold = 0.00001; + + + /* read and reset completion measurements */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); - createCompletion = ctx->createCompletionMeasured; - compressionCompletion = ctx->compressionCompletionMeasured; - writeCompletion = ctx->writeCompletionMeasured; + DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion); + DEBUG(2, "rw %f\n", ctx->createWaitWriteCompletion); + DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion); + DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion); + DEBUG(2, "wr %f\n", ctx->writeWaitCreateCompletion); + DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion); + + createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; + createWaitWriteCompletion = ctx->createWaitWriteCompletion; + compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; + compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; + writeWaitCreateCompletion = ctx->writeWaitCreateCompletion; + writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; + + ctx->createWaitWriteCompletion = 1; + ctx->createWaitCompressionCompletion = 1; + ctx->compressWaitCreateCompletion = 1; + ctx->compressWaitWriteCompletion = 1; + ctx->writeWaitCreateCompletion = 1; + ctx->writeWaitCompressionCompletion = 1; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - DEBUG(2, "create completion: %f\n", createCompletion); - DEBUG(2, "compression completion: %f\n", compressionCompletion); - DEBUG(2, "write completion: %f\n", writeCompletion); - /* adapt compression based on bottleneck */ - if (1 - createCompletion > threshold) { - /* job creation was not finished, compression thread waited */ - unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - createCompletion * MAX_COMPRESSION_LEVEL_CHANGE; - unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); - DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change); - ctx->compressionLevel += boundChange; - } - else if (1 - writeCompletion > threshold) { - /* write thread was not finished, compression thread waited */ - unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - writeCompletion * MAX_COMPRESSION_LEVEL_CHANGE; - unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); - DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change); - ctx->compressionLevel += boundChange; - } - else if (1 - compressionCompletion > threshold) { - /* compression thread was not finished, one of the other two threads waited */ - unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - compressionCompletion * MAX_COMPRESSION_LEVEL_CHANGE; + /* adaptation logic */ + if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) { + /* both create and write threads waiting on compression */ + /* use writeWaitCompressionCompletion */ + unsigned const change = (unsigned)((1-writeWaitCompressionCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); - DEBUG(2, "decreasing compression level %u by %u\n", ctx->compressionLevel, change); ctx->compressionLevel -= boundChange; } - /* reset */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->createCompletionMeasured = 1; - ctx->compressionCompletionMeasured = 1; - ctx->writeCompletionMeasured = 1; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) { + /* both create and compression thread waiting on write */ + /* use createWaitWriteCompletion */ + unsigned const change = (unsigned)((1-createWaitWriteCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); + unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); + ctx->compressionLevel += boundChange; + } + else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) { + /* both compression and write waiting on create */ + /* use compressWaitCreateCompletion */ + unsigned const change = (unsigned)((1-compressWaitCreateCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); + unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); + ctx->compressionLevel += boundChange; + } if (g_forceCompressionLevel) { ctx->compressionLevel = g_compressionLevel; @@ -375,9 +399,9 @@ static void* compressionThread(void* arg) while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); /* compression thread is waiting, take measurements of write completion and read completion */ - ctx->createCompletionMeasured = ctx->createCompletion; - ctx->writeCompletionMeasured = ctx->writeCompletion; - DEBUG(3, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured); + ctx->compressWaitCreateCompletion = ctx->createCompletion; + ctx->compressWaitWriteCompletion = ctx->writeCompletion; + DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion); DEBUG(3, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); @@ -462,7 +486,7 @@ static void* compressionThread(void* arg) /* update completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletion = 1 - (double)remaining/job->src.size; - DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion); + DEBUG(3, "compression completion %f\n", ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } while (remaining != 0); @@ -517,8 +541,9 @@ static void* outputThread(void* arg) while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); /* write thread is waiting, take measurement of compression completion */ - ctx->compressionCompletionMeasured = ctx->compressionCompletion; - DEBUG(3, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); + ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; + ctx->writeWaitCreateCompletion = ctx->createCompletion; + DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); @@ -541,7 +566,7 @@ static void* outputThread(void* arg) } { // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); - size_t const blockSize = MAX(compressedSize >> 7, 64 << 10); + size_t const blockSize = MAX(compressedSize >> 7, 1 << 10); size_t pos = 0; for ( ; ; ) { size_t const writeSize = MIN(remaining, blockSize); @@ -553,6 +578,7 @@ static void* outputThread(void* arg) /* update completion variable for writing */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->writeCompletion = 1 - (double)remaining/compressedSize; + DEBUG(3, "write completion %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); if (remaining == 0) break; @@ -599,8 +625,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); /* creation thread is waiting, take measurement of compression completion */ - ctx->compressionCompletionMeasured = ctx->compressionCompletion; - DEBUG(3, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured); + ctx->createWaitCompressionCompletion = ctx->compressionCompletion; + ctx->createWaitWriteCompletion = ctx->writeCompletion; + DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);