diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d50f3511..808418a6 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -1468,14 +1468,18 @@ MEM_STATIC size_t ZSTD_compressSequences_internal(seqStore_t* seqStorePtr, entropy, cParams->strategy, op, dstCapacity, literals, litSize); if (ZSTD_isError(cSize)) return cSize; + assert(cSize <= dstCapacity); op += cSize; } /* Sequences Header */ - if ((oend-op) < 3 /*max nbSeq Size*/ + 1 /*seqHead */) return ERROR(dstSize_tooSmall); - if (nbSeq < 0x7F) *op++ = (BYTE)nbSeq; - else if (nbSeq < LONGNBSEQ) op[0] = (BYTE)((nbSeq>>8) + 0x80), op[1] = (BYTE)nbSeq, op+=2; - else op[0]=0xFF, MEM_writeLE16(op+1, (U16)(nbSeq - LONGNBSEQ)), op+=3; + if ((oend-op) < 3 /*max nbSeq Size*/ + 1 /*seqHead*/) return ERROR(dstSize_tooSmall); + if (nbSeq < 0x7F) + *op++ = (BYTE)nbSeq; + else if (nbSeq < LONGNBSEQ) + op[0] = (BYTE)((nbSeq>>8) + 0x80), op[1] = (BYTE)nbSeq, op+=2; + else + op[0]=0xFF, MEM_writeLE16(op+1, (U16)(nbSeq - LONGNBSEQ)), op+=3; if (nbSeq==0) return op - ostart; /* seqHead : flags for FSE encoding type */ @@ -1606,6 +1610,7 @@ static void ZSTD_storeLastLiterals(seqStore_t* seqStorePtr, static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, void* dst, size_t dstCapacity, const void* src, size_t srcSize) { + DEBUGLOG(5, "ZSTD_compressBlock_internal : dstCapacity = %u", (U32)dstCapacity); if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) return 0; /* don't even attempt compression below a certain srcSize */ ZSTD_resetSeqStore(&(zc->seqStore)); @@ -1867,7 +1872,7 @@ static size_t ZSTD_loadDictionaryContent(ZSTD_CCtx* zc, const void* src, size_t zc->lowLimit = zc->dictLimit; zc->dictLimit = (U32)(zc->nextSrc - zc->base); zc->dictBase = zc->base; - zc->base += ip - zc->nextSrc; + zc->base = ip - zc->dictLimit; zc->nextToUpdate = zc->dictLimit; zc->loadedDictEnd = zc->appliedParams.forceWindow ? 0 : (U32)(iend - zc->base); diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index af7be316..a855745b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -142,7 +142,10 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize) { + ZSTD_pthread_mutex_lock(&bufPool->poolMutex); + DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize); bufPool->bufferSize = bSize; + ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); } /** ZSTDMT_getBuffer() : @@ -150,28 +153,31 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize) static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) { size_t const bSize = bufPool->bufferSize; - DEBUGLOG(5, "ZSTDMT_getBuffer"); + DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize); ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers) { /* try to use an existing buffer */ buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)]; size_t const availBufferSize = buf.size; bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer; - if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) { + if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) { /* large enough, but not too much */ + DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u", + bufPool->nbBuffers, (U32)buf.size); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return buf; } /* size conditions not respected : scratch this buffer, create new one */ - DEBUGLOG(5, "existing buffer does not meet size conditions => freeing"); + DEBUGLOG(5, "ZSTDMT_getBuffer: existing buffer does not meet size conditions => freeing"); ZSTD_free(buf.start, bufPool->cMem); } ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); /* create new buffer */ - DEBUGLOG(5, "create a new buffer"); + DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer"); { buffer_t buffer; void* const start = ZSTD_malloc(bSize, bufPool->cMem); buffer.start = start; /* note : start can be NULL if malloc fails ! */ buffer.size = (start==NULL) ? 0 : bSize; + DEBUGLOG(5, "ZSTDMT_getBuffer: created buffer of size %u", (U32)bSize); return buffer; } } @@ -184,12 +190,14 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers < bufPool->totalBuffers) { bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ + DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", + (U32)buf.size, (U32)(bufPool->nbBuffers-1)); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return; } ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); /* Reached bufferPool capacity (should not happen) */ - DEBUGLOG(5, "buffer pool capacity reached => freeing "); + DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing "); ZSTD_free(buf.start, bufPool->cMem); } @@ -324,11 +332,10 @@ typedef struct { void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; - ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool); + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); const void* const src = (const char*)job->srcStart + job->dictSize; buffer_t dstBuff = job->dstBuff; - DEBUGLOG(5, "ZSTDMT_compressChunk") - DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", + DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); if (cctx==NULL) { @@ -343,11 +350,12 @@ void ZSTDMT_compressChunk(void* jobDescription) goto _endJob; } job->dstBuff = dstBuff; + DEBUGLOG(5, "ZSTDMT_compressChunk: allocated dstBuff of size %u", (U32)dstBuff.size); } if (job->cdict) { size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize); - DEBUGLOG(4, "ZSTDMT_compressChunk, using CDict"); + DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict"); assert(job->firstChunk); /* should only happen for first segment */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ @@ -367,7 +375,7 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_invalidateRepCodes(cctx); } - DEBUGLOG(5, "Compressing : "); + DEBUGLOG(5, "Compressing into dstBuff of size %u", (U32)dstBuff.size); DEBUG_PRINTHEX(6, job->srcStart, 12); job->cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : @@ -769,6 +777,7 @@ size_t ZSTDMT_initCStream_internal( dict, dictSize, cdict, singleThreadParams, pledgedSrcSize); } + DEBUGLOG(4, "multi - threading mode"); if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ ZSTDMT_waitForAllJobsCompleted(zcs); @@ -796,9 +805,10 @@ size_t ZSTDMT_initCStream_internal( DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); - zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); + zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); /* one job size must be at least overlap */ DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize; + DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10)); ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) ); zcs->inBuff.buffer = g_nullBuffer; zcs->dictSize = 0; @@ -863,7 +873,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi { unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", + DEBUGLOG(5, "ZSTDMT_createCompressionJob"); + DEBUGLOG(5, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; @@ -994,6 +1005,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, { size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; + DEBUGLOG(5, "ZSTDMT_compressStream_generic"); assert(output->pos <= output->size); assert(input->pos <= input->size); if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) { diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 53dbaf3b..384858ed 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -951,7 +951,7 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres for (n=0, cSize=0, totalTestSize=0 ; totalTestSize < maxTestSize ; n++) { /* compress random chunks into randomly sized dst buffers */ { size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const srcSize = MIN (maxTestSize-totalTestSize, randomSrcSize); + size_t const srcSize = MIN(maxTestSize-totalTestSize, randomSrcSize); size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize); size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize);