reduced zstdmt latency when using small custom section sizes with high compression levels

Previous version was requiring a fairly large initial amount of input data
before starting to create compression jobs.
This new version starts the process much sooner.
This commit is contained in:
Yann Collet 2017-01-27 15:55:30 -08:00
parent 5d9b894e46
commit f6d4a786fc

View File

@ -273,6 +273,7 @@ struct ZSTDMT_CCtx_s {
pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond;
size_t targetSectionSize;
size_t marginSize;
size_t inBuffSize;
size_t dictSize;
size_t targetDictSize;
@ -514,8 +515,9 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
zcs->frameContentSize = pledgedSrcSize;
zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
zcs->marginSize = zcs->targetSectionSize >> 2;
zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0;
zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize;
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize;
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
zcs->inBuff.filled = 0;
@ -680,6 +682,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
@ -690,7 +693,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
zcs->inBuff.filled += toLoad;
}
if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */
if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
}