diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 99b2e68f..04e0adfc 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -233,6 +233,7 @@ void ZSTDMT_compressChunk(void* jobDescription) DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); if (job->cdict) { size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize); + if (job->cdict) DEBUGLOG(3, "using CDict "); if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); @@ -296,6 +297,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) U32 const minNbJobs = nbThreads + 2; U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; U32 const nbJobs = 1 << nbJobsLog2; + //nbThreads = 1; /* for tests */ DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", nbThreads, minNbJobs, nbJobsLog2, nbJobs); if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; @@ -490,6 +492,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, ZSTD_parameters params, unsigned long long pledgedSrcSize) { ZSTD_customMem const cmem = { NULL, NULL, NULL }; + DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog); if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize); if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ ZSTDMT_waitForAllJobsCompleted(zcs); @@ -596,6 +599,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi } else { zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.filled = 0; + zcs->dictSize = 0; zcs->frameEnded = 1; if (zcs->nextJobID == 0) zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ @@ -698,12 +702,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame) { - size_t const srcSize = zcs->inBuff.filled; + size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; - if (srcSize) DEBUGLOG(1, "flushing : %u bytes left to compress", (U32)srcSize); + if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { - CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize - zcs->dictSize, endFrame) ); + CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); } /* check if there is any data available to flush */ diff --git a/programs/fileio.c b/programs/fileio.c index 86db12ac..db2bb55d 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -349,23 +349,22 @@ static int FIO_compressFilename_internal(cRess_t ress, readsize += inSize; DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20)); - /* Compress using buffered streaming */ { ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; - ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 }; while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */ + ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 }; #ifdef ZSTD_MULTITHREAD size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff); #else size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff); #endif if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); - } - /* Write cBlock */ - { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); - if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); } - compressedfilesize += outBuff.pos; - } + /* Write compressed stream */ + if (outBuff.pos) { + size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); + if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); + compressedfilesize += outBuff.pos; + } } } DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100); }