From d8b33a598d9b707732969bd3da02b0fea1b1985c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 30 Jun 2017 15:44:57 -0700 Subject: [PATCH] Optimized ZSTDMT single-pass mode speed on large sources by ensuring job sizes remain "not too large" --- lib/compress/zstdmt_compress.c | 55 +++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index f7ee7502..c96ef482 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -331,14 +331,20 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; +static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) +{ + U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; + U32 const nbJobs = 1 << nbJobsLog2; + *nbJobsPtr = nbJobs; + return (ZSTDMT_jobDescription*) ZSTD_calloc( + nbJobs * sizeof(ZSTDMT_jobDescription), cMem); +} + ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) { ZSTDMT_CCtx* mtctx; - U32 const minNbJobs = nbThreads + 2; - U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; - U32 const nbJobs = 1 << nbJobsLog2; - DEBUGLOG(5, "nbThreads: %u ; minNbJobs: %u ; nbJobsLog2: %u ; nbJobs: %u", - nbThreads, minNbJobs, nbJobsLog2, nbJobs); + U32 nbJobs = nbThreads + 2; + DEBUGLOG(3, "ZSTDMT_createCCtx_advanced"); if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL)) @@ -349,13 +355,12 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) if (!mtctx) return NULL; mtctx->cMem = cMem; mtctx->nbThreads = nbThreads; - mtctx->jobIDMask = nbJobs - 1; mtctx->allJobsCompleted = 1; mtctx->sectionSize = 0; mtctx->overlapRLog = 3; mtctx->factory = POOL_create(nbThreads, 1); - mtctx->jobs = (ZSTDMT_jobDescription*)ZSTD_calloc( - nbJobs * sizeof(*mtctx->jobs), cMem); + mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); + mtctx->jobIDMask = nbJobs - 1; mtctx->buffPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); if (!mtctx->factory | !mtctx->jobs | !mtctx->buffPool | !mtctx->cctxPool) { @@ -448,24 +453,39 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, unsigned overlapRLog) { size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); - unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1; - unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads); + size_t const chunkSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); + size_t const chunkMaxSize = chunkSizeTarget << 2; + size_t const passSizeMax = chunkMaxSize * mtctx->nbThreads; + unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; + unsigned nbChunksLarge = multiplier * mtctx->nbThreads; + unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; + unsigned nbChunksSmall = MIN(nbChunksMax, mtctx->nbThreads); + unsigned nbChunks = (multiplier>1) ? nbChunksLarge : nbChunksSmall; size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; - size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ - size_t remainingSrcSize = srcSize; + size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; + size_t remainingSrcSize = srcSize; unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; - DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); + DEBUGLOG(4, "windowLog : %2u => chunkSizeTarget : %u bytes ", params.cParams.windowLog, (U32)chunkSizeTarget); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); + assert(avgChunkSize >= 256 KB); /* required for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B) */ if (nbChunks==1) { /* fallback to single-thread mode */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params); } + if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */ + U32 nbJobs = nbChunks; + ZSTD_free(mtctx->jobs, mtctx->cMem); + mtctx->jobIDMask = 0; + mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem); + if (mtctx->jobs==NULL) return ERROR(memory_allocation); + mtctx->jobIDMask = nbJobs - 1; + } + { unsigned u; for (u=0; ujobs[u].fullFrameSize = srcSize; mtctx->jobs[u].params = params; /* do not calculate checksum within sections, but write it in header for first section */ - if (mtctx->nextJobID) mtctx->jobs[u].params.fParams.checksumFlag = 0; + if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0; mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctx = cctx; mtctx->jobs[u].firstChunk = (u==0); @@ -837,11 +857,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, if (mtctx->nbThreads==1) { return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp); } + + /* single-pass shortcut (note : this is blocking-mode) */ if ( (mtctx->nextJobID==0) /* just started */ && (mtctx->inBuff.filled==0) /* nothing buffered yet */ && (endOp==ZSTD_e_end) /* end order, immediately at beginning */ - && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) /* enough room */ - && (mtctx->cdict==NULL) ) { /* no dictionary */ + && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */ size_t const cSize = ZSTDMT_compress_advanced(mtctx, (char*)output->dst + output->pos, output->size - output->pos, (const char*)input->src + input->pos, input->size - input->pos,