From f91854549111eb90ba5f7f15d142b8687e4ab74f Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Mon, 10 Jul 2017 18:16:42 -0700 Subject: [PATCH] made some progress on improving compression ratio, but problems exist with speed limits, and for some reason higher compression levels are really slow --- contrib/adaptive-compression/adapt.c | 102 ++++++++++++++++++--------- 1 file changed, 69 insertions(+), 33 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 8f3acd5c..2bf8bb81 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -1,6 +1,6 @@ #define DISPLAY(...) fprintf(stderr, __VA_ARGS__) #define PRINT(...) fprintf(stdout, __VA_ARGS__) -#define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } +#define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } #define FILE_CHUNK_SIZE 4 << 20 #define MAX_NUM_JOBS 2; #define stdinmark "/*stdin*\\" @@ -15,7 +15,7 @@ typedef unsigned char BYTE; #include /* malloc, free */ #include /* pthread functions */ #include /* memset */ -#include "zstd.h" +#include "zstd_internal.h" #include "util.h" static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; @@ -48,6 +48,7 @@ typedef struct { typedef struct { buffer_t src; buffer_t dst; + buffer_t dict; unsigned compressionLevel; unsigned jobID; unsigned lastJob; @@ -87,6 +88,7 @@ static void freeCompressionJobs(adaptCCtx* ctx) jobDescription job = ctx->jobs[u]; free(job.dst.start); free(job.src.start); + free(job.dict.start); } } @@ -142,8 +144,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) jobDescription* job = &ctx->jobs[jobNum]; job->src.start = malloc(FILE_CHUNK_SIZE); job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE)); + job->dict.start = malloc(FILE_CHUNK_SIZE); job->lastJob = 0; - if (!job->src.start || !job->dst.start) { + if (!job->src.start || !job->dst.start || !job->dict.start) { DISPLAY("Could not allocate buffers for jobs\n"); freeCCtx(ctx); return NULL; @@ -207,17 +210,17 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx) unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)) ? 1 : 0; unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)) ? 1 : 0; unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)) ? 1 : 0; - DEBUGLOG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter); + DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter); if (allSlow) { reset = 1; } else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { - DEBUGLOG(2, "increasing compression level %u\n", ctx->compressionLevel); + DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel); ctx->compressionLevel++; reset = 1; } else if (compressSlow && ctx->compressionLevel > 1) { - DEBUGLOG(2, "decreasing compression level %u\n", ctx->compressionLevel); + DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel); ctx->compressionLevel--; reset = 1; } @@ -236,38 +239,63 @@ static void* compressionThread(void* arg) for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; - DEBUGLOG(2, "compressionThread(): waiting on job ready\n"); + DEBUG(2, "compressionThread(): waiting on job ready\n"); pthread_mutex_lock(&ctx->jobReady_mutex); while(currJob + 1 > ctx->jobReadyID) { ctx->stats.waitReady++; ctx->stats.readyCounter++; - DEBUGLOG(2, "waiting on job ready, nextJob: %u\n", currJob); + DEBUG(2, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex); } pthread_mutex_unlock(&ctx->jobReady_mutex); - DEBUGLOG(2, "compressionThread(): continuing after job ready\n"); + DEBUG(2, "compressionThread(): continuing after job ready\n"); /* compress the data */ { unsigned const cLevel = adaptCompressionLevel(ctx); - DEBUGLOG(2, "cLevel used: %u\n", cLevel); - size_t const compressedSize = ZSTD_compressCCtx(ctx->cctx, job->dst.start, job->dst.size, job->src.start, job->src.size, cLevel); - if (ZSTD_isError(compressedSize)) { + ZSTD_parameters params = ZSTD_getParams(cLevel, job->src.size, 0); + DEBUG(2, "cLevel used: %u\n", cLevel); + + /* begin compression */ + { + size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1); + size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->dict.start, job->dict.size, params, 0); + size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1); + if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) { + DISPLAY("Error: something went wrong while starting compression\n"); + ctx->threadError = 1; + return arg; + } + } + + /* continue compression */ + if (currJob != 0) { /* not first job */ + size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.size, job->src.start, 0); + if (ZSTD_isError(hSize)) { + job->compressedSize = hSize; + ctx->threadError = 1; + return arg; + } + ZSTD_invalidateRepCodes(ctx->cctx); + } + job->compressedSize = (job->lastJob) ? + ZSTD_compressEnd (ctx->cctx, job->dst.start, job->dst.size, job->src.start, job->src.size) : + ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.size, job->src.start, job->src.size); + if (ZSTD_isError(job->compressedSize)) { + DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize)); ctx->threadError = 1; - DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(compressedSize)); return arg; } - job->compressedSize = compressedSize; } pthread_mutex_lock(&ctx->jobCompressed_mutex); ctx->jobCompressedID++; - DEBUGLOG(2, "signaling for job %u\n", currJob); + DEBUG(2, "signaling for job %u\n", currJob); pthread_cond_signal(&ctx->jobCompressed_cond); pthread_mutex_unlock(&ctx->jobCompressed_mutex); - DEBUGLOG(2, "finished job compression %u\n", currJob); + DEBUG(2, "finished job compression %u\n", currJob); currJob++; if (job->lastJob || ctx->threadError) { /* finished compressing all jobs */ - DEBUGLOG(2, "all jobs finished compressing\n"); + DEBUG(2, "all jobs finished compressing\n"); break; } } @@ -299,16 +327,16 @@ static void* outputThread(void* arg) for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; - DEBUGLOG(2, "outputThread(): waiting on job compressed\n"); + DEBUG(2, "outputThread(): waiting on job compressed\n"); pthread_mutex_lock(&ctx->jobCompressed_mutex); while (currJob + 1 > ctx->jobCompressedID) { ctx->stats.waitCompressed++; ctx->stats.compressedCounter++; - DEBUGLOG(2, "waiting on job compressed, nextJob: %u\n", currJob); + DEBUG(2, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond, &ctx->jobCompressed_mutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex); - DEBUGLOG(2, "outputThread(): continuing after job compressed\n"); + DEBUG(2, "outputThread(): continuing after job compressed\n"); { size_t const compressedSize = job->compressedSize; if (ZSTD_isError(compressedSize)) { @@ -325,19 +353,19 @@ static void* outputThread(void* arg) } } } - DEBUGLOG(2, "finished job write %u\n", currJob); + DEBUG(2, "finished job write %u\n", currJob); currJob++; displayProgress(currJob, ctx->compressionLevel, job->lastJob); - DEBUGLOG(2, "locking job write mutex\n"); + DEBUG(2, "locking job write mutex\n"); pthread_mutex_lock(&ctx->jobWrite_mutex); ctx->jobWriteID++; pthread_cond_signal(&ctx->jobWrite_cond); pthread_mutex_unlock(&ctx->jobWrite_mutex); - DEBUGLOG(2, "unlocking job write mutex\n"); + DEBUG(2, "unlocking job write mutex\n"); if (job->lastJob || ctx->threadError) { /* finished with all jobs */ - DEBUGLOG(2, "all jobs finished writing\n"); + DEBUG(2, "all jobs finished writing\n"); pthread_mutex_lock(&ctx->allJobsCompleted_mutex); ctx->allJobsCompleted = 1; pthread_cond_signal(&ctx->allJobsCompleted_cond); @@ -353,17 +381,17 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) unsigned const nextJob = ctx->nextJobID; unsigned const nextJobIndex = nextJob % ctx->numJobs; jobDescription* job = &ctx->jobs[nextJobIndex]; - DEBUGLOG(2, "createCompressionJob(): wait for job write\n"); + DEBUG(2, "createCompressionJob(): wait for job write\n"); pthread_mutex_lock(&ctx->jobWrite_mutex); - DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs); + DEBUG(2, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs); while (nextJob - ctx->jobWriteID >= ctx->numJobs) { ctx->stats.waitWrite++; ctx->stats.writeCounter++; - DEBUGLOG(2, "waiting on job Write, nextJob: %u\n", nextJob); + DEBUG(2, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex); } pthread_mutex_unlock(&ctx->jobWrite_mutex); - DEBUGLOG(2, "createCompressionJob(): continuing after job write\n"); + DEBUG(2, "createCompressionJob(): continuing after job write\n"); job->compressionLevel = ctx->compressionLevel; @@ -371,13 +399,21 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) job->dst.size = ZSTD_compressBound(srcSize); job->jobID = nextJob; job->lastJob = last; - memcpy(job->src.start, ctx->input.buffer.start, srcSize); + memcpy(job->src.start, ctx->input.buffer.start + ctx->input.filled, srcSize); + job->dict.size = ctx->input.filled; + memcpy(job->dict.start, ctx->input.buffer.start, ctx->input.filled); pthread_mutex_lock(&ctx->jobReady_mutex); ctx->jobReadyID++; pthread_cond_signal(&ctx->jobReady_cond); pthread_mutex_unlock(&ctx->jobReady_mutex); - DEBUGLOG(2, "finished job creation %u\n", nextJob); + DEBUG(2, "finished job creation %u\n", nextJob); ctx->nextJobID++; + + /* if not on the last job, reuse data as dictionary in next job */ + if (!last) { + ctx->input.filled = srcSize; + memmove(ctx->input.buffer.start, ctx->input.buffer.start + ctx->input.filled, srcSize); + } return 0; } @@ -447,7 +483,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst /* creating jobs */ for ( ; ; ) { - size_t const readSize = fread(ctx->input.buffer.start, 1, FILE_CHUNK_SIZE, srcFile); + size_t const readSize = fread(ctx->input.buffer.start + ctx->input.filled, 1, FILE_CHUNK_SIZE, srcFile); if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n"); ctx->threadError = 1; @@ -466,7 +502,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst } } if (feof(srcFile)) { - DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); + DEBUG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); break; } } @@ -563,7 +599,7 @@ int main(int argCount, const char* argv[]) case 'i': argument += 2; g_compressionLevel = readU32FromChar(&argument); - DEBUGLOG(2, "g_compressionLevel: %u\n", g_compressionLevel); + DEBUG(2, "g_compressionLevel: %u\n", g_compressionLevel); break; case 's': g_displayStats = 1;