Optimized ZSTDMT single-pass mode speed on large sources
by ensuring job sizes remain "not too large"
This commit is contained in:
parent
d5c046c609
commit
d8b33a598d
@ -331,14 +331,20 @@ struct ZSTDMT_CCtx_s {
|
||||
const ZSTD_CDict* cdict;
|
||||
};
|
||||
|
||||
static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
|
||||
{
|
||||
U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
|
||||
U32 const nbJobs = 1 << nbJobsLog2;
|
||||
*nbJobsPtr = nbJobs;
|
||||
return (ZSTDMT_jobDescription*) ZSTD_calloc(
|
||||
nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
|
||||
}
|
||||
|
||||
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
|
||||
{
|
||||
ZSTDMT_CCtx* mtctx;
|
||||
U32 const minNbJobs = nbThreads + 2;
|
||||
U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
|
||||
U32 const nbJobs = 1 << nbJobsLog2;
|
||||
DEBUGLOG(5, "nbThreads: %u ; minNbJobs: %u ; nbJobsLog2: %u ; nbJobs: %u",
|
||||
nbThreads, minNbJobs, nbJobsLog2, nbJobs);
|
||||
U32 nbJobs = nbThreads + 2;
|
||||
DEBUGLOG(3, "ZSTDMT_createCCtx_advanced");
|
||||
|
||||
if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
|
||||
if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
|
||||
@ -349,13 +355,12 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
|
||||
if (!mtctx) return NULL;
|
||||
mtctx->cMem = cMem;
|
||||
mtctx->nbThreads = nbThreads;
|
||||
mtctx->jobIDMask = nbJobs - 1;
|
||||
mtctx->allJobsCompleted = 1;
|
||||
mtctx->sectionSize = 0;
|
||||
mtctx->overlapRLog = 3;
|
||||
mtctx->factory = POOL_create(nbThreads, 1);
|
||||
mtctx->jobs = (ZSTDMT_jobDescription*)ZSTD_calloc(
|
||||
nbJobs * sizeof(*mtctx->jobs), cMem);
|
||||
mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
|
||||
mtctx->jobIDMask = nbJobs - 1;
|
||||
mtctx->buffPool = ZSTDMT_createBufferPool(nbThreads, cMem);
|
||||
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
|
||||
if (!mtctx->factory | !mtctx->jobs | !mtctx->buffPool | !mtctx->cctxPool) {
|
||||
@ -448,24 +453,39 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
|
||||
unsigned overlapRLog)
|
||||
{
|
||||
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
|
||||
size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2);
|
||||
unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1;
|
||||
unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads);
|
||||
size_t const chunkSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
|
||||
size_t const chunkMaxSize = chunkSizeTarget << 2;
|
||||
size_t const passSizeMax = chunkMaxSize * mtctx->nbThreads;
|
||||
unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
|
||||
unsigned nbChunksLarge = multiplier * mtctx->nbThreads;
|
||||
unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1;
|
||||
unsigned nbChunksSmall = MIN(nbChunksMax, mtctx->nbThreads);
|
||||
unsigned nbChunks = (multiplier>1) ? nbChunksLarge : nbChunksSmall;
|
||||
size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
|
||||
size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
|
||||
size_t remainingSrcSize = srcSize;
|
||||
size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
|
||||
const char* const srcStart = (const char*)src;
|
||||
size_t remainingSrcSize = srcSize;
|
||||
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
|
||||
size_t frameStartPos = 0, dstBufferPos = 0;
|
||||
|
||||
DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize);
|
||||
DEBUGLOG(4, "windowLog : %2u => chunkSizeTarget : %u bytes ", params.cParams.windowLog, (U32)chunkSizeTarget);
|
||||
DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
|
||||
assert(avgChunkSize >= 256 KB); /* required for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B) */
|
||||
|
||||
if (nbChunks==1) { /* fallback to single-thread mode */
|
||||
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
|
||||
return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
|
||||
}
|
||||
|
||||
if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
|
||||
U32 nbJobs = nbChunks;
|
||||
ZSTD_free(mtctx->jobs, mtctx->cMem);
|
||||
mtctx->jobIDMask = 0;
|
||||
mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
|
||||
if (mtctx->jobs==NULL) return ERROR(memory_allocation);
|
||||
mtctx->jobIDMask = nbJobs - 1;
|
||||
}
|
||||
|
||||
{ unsigned u;
|
||||
for (u=0; u<nbChunks; u++) {
|
||||
size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
|
||||
@ -489,7 +509,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
|
||||
mtctx->jobs[u].fullFrameSize = srcSize;
|
||||
mtctx->jobs[u].params = params;
|
||||
/* do not calculate checksum within sections, but write it in header for first section */
|
||||
if (mtctx->nextJobID) mtctx->jobs[u].params.fParams.checksumFlag = 0;
|
||||
if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
|
||||
mtctx->jobs[u].dstBuff = dstBuffer;
|
||||
mtctx->jobs[u].cctx = cctx;
|
||||
mtctx->jobs[u].firstChunk = (u==0);
|
||||
@ -837,11 +857,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
||||
if (mtctx->nbThreads==1) {
|
||||
return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
|
||||
}
|
||||
|
||||
/* single-pass shortcut (note : this is blocking-mode) */
|
||||
if ( (mtctx->nextJobID==0) /* just started */
|
||||
&& (mtctx->inBuff.filled==0) /* nothing buffered yet */
|
||||
&& (endOp==ZSTD_e_end) /* end order, immediately at beginning */
|
||||
&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) /* enough room */
|
||||
&& (mtctx->cdict==NULL) ) { /* no dictionary */
|
||||
&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
|
||||
size_t const cSize = ZSTDMT_compress_advanced(mtctx,
|
||||
(char*)output->dst + output->pos, output->size - output->pos,
|
||||
(const char*)input->src + input->pos, input->size - input->pos,
|
||||
|
Loading…
Reference in New Issue
Block a user