diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index cc69f118..e1c73492 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -33,7 +33,8 @@ typedef enum { ZSTDcs_created=0, ZSTDcs_init, ZSTDcs_ongoing, ZSTDcs_ending } ZS ***************************************/ #define ZSTD_STATIC_ASSERT(c) { enum { ZSTD_static_assert = 1/(int)(!!(c)) }; } size_t ZSTD_compressBound(size_t srcSize) { - size_t const margin = (srcSize < 512 KB) ? 16 : 0; + size_t const lowLimit = 256 KB; + size_t const margin = (srcSize < lowLimit) ? (lowLimit-srcSize) >> 12 : 0; /* from 64 to 0 */ return srcSize + (srcSize >> 8) + margin; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 09a96547..c9c3e853 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -33,7 +33,7 @@ # include # include # include - static unsigned g_debugLevel = 2; + static unsigned g_debugLevel = 5; # define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); } # define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); } @@ -253,6 +253,7 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); + DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); _endJob: PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); @@ -399,7 +400,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ size_t remainingSrcSize = srcSize; const char* const srcStart = (const char*)src; - size_t frameStartPos = 0; + unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ + size_t frameStartPos = 0, dstBufferPos = 0; DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); @@ -413,9 +415,9 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, { unsigned u; for (u=0; ubuffPool, dstBufferCapacity) : dstAsBuffer; + size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize); + buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity }; + buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity); ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool); size_t dictSize = u ? overlapSize : 0; @@ -444,6 +446,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); frameStartPos += chunkSize; + dstBufferPos += dstBufferCapacity; remainingSrcSize -= chunkSize; } } /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */ @@ -467,8 +470,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); if (chunkID) { /* note : chunk 0 is already written directly into dst */ - if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); - ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); + if (!error) + memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap if chunk decompressed within dst */ + if (chunkID >= compressWithinDst) /* otherwise, it decompresses within dst */ + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); mtctx->jobs[chunkID].dstBuff = g_nullBuffer; } dstPos += cSize ; diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 370859ca..10bcbe0c 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -856,6 +856,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp /* some issues can only happen when reusing states */ if ((FUZ_rand(&lseed) & 0xFF) == 131) { U32 const nbThreads = (FUZ_rand(&lseed) % 6) + 1; + DISPLAYLEVEL(5, "Creating new context with %u threads \n", nbThreads); ZSTDMT_freeCCtx(zc); zc = ZSTDMT_createCCtx(nbThreads); resetAllowed=0; @@ -946,7 +947,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp outBuff.size = outBuff.pos + adjustedDstSize; DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize); { size_t const flushError = ZSTDMT_flushStream(zc, &outBuff); - CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError)); + CHECK (ZSTD_isError(flushError), "ZSTDMT_flushStream error : %s", ZSTD_getErrorName(flushError)); } } } /* final frame epilogue */ @@ -957,7 +958,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp outBuff.size = outBuff.pos + adjustedDstSize; DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize); remainingToFlush = ZSTDMT_endStream(zc, &outBuff); - CHECK (ZSTD_isError(remainingToFlush), "flush error : %s", ZSTD_getErrorName(remainingToFlush)); + CHECK (ZSTD_isError(remainingToFlush), "ZSTDMT_endStream error : %s", ZSTD_getErrorName(remainingToFlush)); DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush); } } DISPLAYLEVEL(5, "Frame completed \n");