optimize ZSTDMT_compress() memory usage

does no longer allocate temporary buffers
when there is enough room in dstBuffer to decompress directly there.
(previous method would skip that for 1st chunk only).

Also : fix ZSTD_compressBound() for small srcSize
This commit is contained in:
Yann Collet 2017-03-31 18:27:03 -07:00
parent 3f75d52527
commit 30c7698970
3 changed files with 17 additions and 10 deletions

View File

@ -33,7 +33,8 @@ typedef enum { ZSTDcs_created=0, ZSTDcs_init, ZSTDcs_ongoing, ZSTDcs_ending } ZS
***************************************/
#define ZSTD_STATIC_ASSERT(c) { enum { ZSTD_static_assert = 1/(int)(!!(c)) }; }
size_t ZSTD_compressBound(size_t srcSize) {
size_t const margin = (srcSize < 512 KB) ? 16 : 0;
size_t const lowLimit = 256 KB;
size_t const margin = (srcSize < lowLimit) ? (lowLimit-srcSize) >> 12 : 0; /* from 64 to 0 */
return srcSize + (srcSize >> 8) + margin;
}

View File

@ -33,7 +33,7 @@
# include <stdio.h>
# include <unistd.h>
# include <sys/times.h>
static unsigned g_debugLevel = 2;
static unsigned g_debugLevel = 5;
# define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); }
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
@ -253,6 +253,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
_endJob:
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
@ -399,7 +400,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
size_t remainingSrcSize = srcSize;
const char* const srcStart = (const char*)src;
size_t frameStartPos = 0;
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0;
DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize);
DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
@ -413,9 +415,9 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
{ unsigned u;
for (u=0; u<nbChunks; u++) {
size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
size_t const dstBufferCapacity = u ? ZSTD_compressBound(chunkSize) : dstCapacity;
buffer_t const dstAsBuffer = { dst, dstCapacity };
buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : dstAsBuffer;
size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity);
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
size_t dictSize = u ? overlapSize : 0;
@ -444,6 +446,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
frameStartPos += chunkSize;
dstBufferPos += dstBufferCapacity;
remainingSrcSize -= chunkSize;
} }
/* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */
@ -467,7 +470,9 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
if (chunkID) { /* note : chunk 0 is already written directly into dst */
if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);
if (!error)
memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap if chunk decompressed within dst */
if (chunkID >= compressWithinDst) /* otherwise, it decompresses within dst */
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
}

View File

@ -856,6 +856,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
/* some issues can only happen when reusing states */
if ((FUZ_rand(&lseed) & 0xFF) == 131) {
U32 const nbThreads = (FUZ_rand(&lseed) % 6) + 1;
DISPLAYLEVEL(5, "Creating new context with %u threads \n", nbThreads);
ZSTDMT_freeCCtx(zc);
zc = ZSTDMT_createCCtx(nbThreads);
resetAllowed=0;
@ -946,7 +947,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize);
{ size_t const flushError = ZSTDMT_flushStream(zc, &outBuff);
CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError));
CHECK (ZSTD_isError(flushError), "ZSTDMT_flushStream error : %s", ZSTD_getErrorName(flushError));
} } }
/* final frame epilogue */
@ -957,7 +958,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize);
remainingToFlush = ZSTDMT_endStream(zc, &outBuff);
CHECK (ZSTD_isError(remainingToFlush), "flush error : %s", ZSTD_getErrorName(remainingToFlush));
CHECK (ZSTD_isError(remainingToFlush), "ZSTDMT_endStream error : %s", ZSTD_getErrorName(remainingToFlush));
DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush);
} }
DISPLAYLEVEL(5, "Frame completed \n");