diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index e5790807..99b2e68f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -209,6 +209,7 @@ typedef struct { buffer_t src; const void* srcStart; size_t srcSize; + size_t dictSize; buffer_t dstBuff; size_t cSize; size_t dstFlushed; @@ -220,8 +221,6 @@ typedef struct { pthread_cond_t* jobCompleted_cond; ZSTD_parameters params; ZSTD_CDict* cdict; - const void* dict; - size_t dictSize; unsigned long long fullFrameSize; } ZSTDMT_jobDescription; @@ -229,16 +228,18 @@ typedef struct { void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; + const void* const src = (const char*)job->srcStart + job->dictSize; buffer_t const dstBuff = job->dstBuff; + DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); if (job->cdict) { size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize); if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { - size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->dict, job->dictSize, job->params, job->fullFrameSize); + size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } if (!job->firstChunk) { /* flush frame header */ - size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); + size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } ZSTD_invalidateRepCodes(job->cctx); } @@ -246,8 +247,8 @@ void ZSTDMT_compressChunk(void* jobDescription) DEBUGLOG(4, "Compressing : "); DEBUG_PRINTHEX(4, job->srcStart, 12); job->cSize = (job->lastChunk) ? /* last chunk signal */ - ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) : - ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize); + ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : + ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); _endJob: @@ -271,6 +272,8 @@ struct ZSTDMT_CCtx_s { pthread_cond_t jobCompleted_cond; size_t targetSectionSize; size_t inBuffSize; + size_t dictSize; + size_t targetDictSize; inBuff_t inBuff; ZSTD_parameters params; XXH64_state_t xxhState; @@ -354,7 +357,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) return 0; } -unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) +size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) { switch(parameter) { @@ -503,10 +506,14 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->frameContentSize = pledgedSrcSize; zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); - zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog); + //zcs->targetDictSize = ((size_t)1 << zcs->params.cParams.windowLog); /* full window size, for test */ + zcs->targetDictSize = ((size_t)1 << zcs->params.cParams.windowLog) >> 3; /* fixed currently */ + //zcs->targetDictSize = 0; + zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize; zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); zcs->inBuff.filled = 0; + zcs->dictSize = 0; zcs->doneJobID = 0; zcs->nextJobID = 0; zcs->frameEnded = 0; @@ -551,15 +558,14 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi return ERROR(memory_allocation); } - DEBUGLOG(4, "preparing job %u to compress %u bytes \n", zcs->nextJobID, (U32)srcSize); + DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].dictSize = zcs->dictSize; /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */ zcs->jobs[jobID].params = zcs->params; if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; - zcs->jobs[jobID].dict = NULL; - zcs->jobs[jobID].dictSize = 0; zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].cctx = cctx; @@ -572,6 +578,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi /* get a new buffer for next input */ if (!endFrame) { + size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ zcs->jobs[jobID].jobCompleted = 1; @@ -580,8 +587,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi ZSTDMT_releaseAllJobResources(zcs); return ERROR(memory_allocation); } - zcs->inBuff.filled -= srcSize; - memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + srcSize, zcs->inBuff.filled); + DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled); + zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; + DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize)); + memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled); + DEBUGLOG(5, "new inBuff pre-filled"); + zcs->dictSize = newDictSize; } else { zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.filled = 0; @@ -625,7 +636,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi zcs->jobs[wJobID].cctx = NULL; DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); if (zcs->params.fParams.checksumFlag) { - XXH64_update(&zcs->xxhState, job.srcStart, job.srcSize); + XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize); if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); DEBUGLOG(4, "writing checksum : %08X \n", checksum); @@ -689,10 +700,10 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp { size_t const srcSize = zcs->inBuff.filled; - DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); + if (srcSize) DEBUGLOG(1, "flushing : %u bytes left to compress", (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { - CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); + CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize - zcs->dictSize, endFrame) ); } /* check if there is any data available to flush */ diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index c00782e9..1288c1ed 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -59,4 +59,4 @@ typedef enum { ZSTDMT_p_sectionSize /* size of input "section". Each section * The function must be called typically after ZSTD_createCCtx(). * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions. * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ -ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value); +ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);