diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 5f0bf2ab..ca9bf6a2 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -273,6 +273,7 @@ struct ZSTDMT_CCtx_s { pthread_mutex_t jobCompleted_mutex; pthread_cond_t jobCompleted_cond; size_t targetSectionSize; + size_t marginSize; size_t inBuffSize; size_t dictSize; size_t targetDictSize; @@ -514,8 +515,9 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->frameContentSize = pledgedSrcSize; zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); + zcs->marginSize = zcs->targetSectionSize >> 2; zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0; - zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize; + zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize; zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); zcs->inBuff.filled = 0; @@ -680,6 +682,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { + size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input); @@ -690,7 +693,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->inBuff.filled += toLoad; } - if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */ + if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) ); }