zstdmt: changed naming convention

to avoid confusion with blocks.

also:
- jobs are now cut into chunks of 512 KB, to reduce the number of mutex calls.
- fix function declaration ZSTD_getBlockSizeMax()
- fix outdated comment
Yann Collet 2018-01-30 14:43:36 -08:00
parent ba0cd8cf78
commit 2cb0740b6b
3 changed files with 90 additions and 90 deletions
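Where the 512 KB figure in the message comes from: a small arithmetic sketch (not part of the commit), using the ZSTD_BLOCKSIZE_MAX constant visible in the lib/zstd.h hunk below:

/* ZSTD_BLOCKSIZELOG_MAX = 17, so ZSTD_BLOCKSIZE_MAX = 1 << 17 = 128 KB.
 * The worker now publishes progress once every 4 blocks: */
size_t const chunkSize = 4 * ZSTD_BLOCKSIZE_MAX;   /* 4 * 128 KB = 512 KB */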

doc/zstd_manual.html

@@ -1153,7 +1153,7 @@ size_t ZSTD_DCtx_refPrefix_advanced(ZSTD_DCtx* dctx, const void* prefix, size_t
Use ZSTD_insertBlock() for such a case.
</p></pre><BR>
-<h3>Raw zstd block functions</h3><pre></pre><b><pre>size_t ZSTD_getBlockSize (const ZSTD_CCtx* cctx);
+<h3>Raw zstd block functions</h3><pre></pre><b><pre>size_t ZSTD_getBlockSizeMax(const ZSTD_CCtx* cctx);
size_t ZSTD_compressBlock (ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
size_t ZSTD_decompressBlock(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
size_t ZSTD_insertBlock (ZSTD_DCtx* dctx, const void* blockStart, size_t blockSize); </b>/**< insert uncompressed block into `dctx` history. Useful for multi-blocks decompression. */<b>
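The manual excerpt above documents the renamed ZSTD_getBlockSizeMax(). For orientation, a minimal round-trip sketch of the raw block API (illustrative only: error handling is abbreviated, the helper name roundTripBlock is hypothetical, and block functions require ZSTD_STATIC_LINKING_ONLY plus an initialized frame context):

#define ZSTD_STATIC_LINKING_ONLY   /* block functions sit in the static-only section */
#include <assert.h>
#include "zstd.h"

/* roundTripBlock (hypothetical name) : compress one raw block, then decompress it.
 * Assumes srcSize fits in a single block; cSize==0 ("not compressible") would
 * normally be handled by storing the block raw. */
static size_t roundTripBlock(void* dst, size_t dstCapacity,
                             void* rt,  size_t rtCapacity,
                             const void* src, size_t srcSize)
{
    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    ZSTD_DCtx* const dctx = ZSTD_createDCtx();
    size_t cSize;
    ZSTD_compressBegin(cctx, 3);                    /* block fns require a frame context */
    assert(srcSize <= ZSTD_getBlockSizeMax(cctx));  /* renamed from ZSTD_getBlockSize */
    cSize = ZSTD_compressBlock(cctx, dst, dstCapacity, src, srcSize);
    if (!ZSTD_isError(cSize) && (cSize > 0)) {
        ZSTD_decompressBegin(dctx);
        ZSTD_decompressBlock(dctx, rt, rtCapacity, dst, cSize);
    }
    ZSTD_freeCCtx(cctx);
    ZSTD_freeDCtx(dctx);
    return cSize;
}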

lib/compress/zstdmt_compress.c

@@ -319,8 +319,8 @@ typedef struct {
const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */
size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */
-unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
-unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
+unsigned firstJob; /* set by mtctx, then read by worker => no barrier */
+unsigned lastJob; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
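A note on the `no barrier` annotations above: the fields are written by mtctx before the job is posted and read by the worker only after it dequeues the job, so the thread pool's internal locking provides the ordering. A sketch of the happens-before chain (illustrative, not part of the commit):

/* mtctx thread:                         worker thread:
 *   job->firstJob = (u==0);
 *   job->lastJob  = (u==nbJobs-1);
 *   POOL_add(factory, fn, job);  ---->    job dequeued under POOL's mutex,
 *                                         then firstJob/lastJob read safely */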
@@ -328,8 +328,8 @@ typedef struct {
unsigned frameChecksumNeeded; /* used only by mtctx */
} ZSTDMT_jobDescription;
-/* ZSTDMT_compressChunk() is a POOL_function type */
-void ZSTDMT_compressChunk(void* jobDescription)
+/* ZSTDMT_compressionJob() is a POOL_function type */
+void ZSTDMT_compressionJob(void* jobDescription)
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
@@ -353,12 +353,12 @@ void ZSTDMT_compressChunk(void* jobDescription)
/* init */
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
-assert(job->firstChunk); /* only allowed for first job */
+assert(job->firstJob); /* only allowed for first job */
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */
-U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : job->srcSize;
+U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->srcSize;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
-{ size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
+{ size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
if (ZSTD_isError(forceWindowError)) {
job->cSize = forceWindowError;
goto _endJob;
@@ -371,44 +371,44 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = initError;
goto _endJob;
} } }
-if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */
+if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize);
DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
ZSTD_invalidateRepCodes(cctx);
}
/* compress */
-{ size_t const blockSize = ZSTD_BLOCKSIZE_MAX;
-int const nbBlocks = (int)((job->srcSize + (blockSize-1)) / blockSize);
+{ size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;
+int const nbChunks = (int)((job->srcSize + (chunkSize-1)) / chunkSize);
const BYTE* ip = (const BYTE*) src;
BYTE* const ostart = (BYTE*)dstBuff.start;
BYTE* op = ostart;
BYTE* oend = op + dstBuff.capacity;
-int blockNb;
-if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * blockSize); /* check overflow */
-DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
+int chunkNb;
+if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * chunkSize); /* check overflow */
+DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->srcSize, nbChunks);
assert(job->cSize == 0);
-for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
-size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, blockSize);
+for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
+size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
-ip += blockSize;
+ip += chunkSize;
op += cSize; assert(op < oend);
/* stats */
-ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); /* note : it's a mtctx mutex */
+ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->cSize += cSize;
-job->consumed = blockSize * blockNb;
-DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
+job->consumed = chunkSize * chunkNb;
+DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
/* last block */
-assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */
-if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
-size_t const lastBlockSize1 = job->srcSize & (blockSize-1);
-size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=blockSize)) ? blockSize : lastBlockSize1;
-size_t const cSize = (job->lastChunk) ?
+assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
+if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
+size_t const lastBlockSize1 = job->srcSize & (chunkSize-1);
+size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=chunkSize)) ? chunkSize : lastBlockSize1;
+size_t const cSize = (job->lastJob) ?
ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) :
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
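The wider stride is what cuts the mutex traffic: progress is now published once per 512 KB chunk rather than once per 128 KB block. A rough count for a hypothetical 2 MB job (values illustrative, not from the commit):

/* before : nbBlocks = ceil(2 MB / 128 KB) = 16  -> ~16 lock/signal rounds
 * after  : nbChunks = ceil(2 MB / 512 KB) =  4  ->  ~4 lock/signal rounds */
size_t const jobSize   = (size_t)2 << 20;                               /* 2 MB */
size_t const chunkSize = 4 * ZSTD_BLOCKSIZE_MAX;                        /* 512 KB */
int const nbChunks     = (int)((jobSize + (chunkSize-1)) / chunkSize);  /* == 4 */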
@@ -580,7 +580,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
@@ -709,16 +709,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
-static unsigned ZSTDMT_computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
+static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
assert(nbThreads>0);
-{ size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2);
-size_t const chunkMaxSize = chunkSizeTarget << 2;
-size_t const passSizeMax = chunkMaxSize * nbThreads;
+{ size_t const jobSizeTarget = (size_t)1 << (windowLog + 2);
+size_t const jobMaxSize = jobSizeTarget << 2;
+size_t const passSizeMax = jobMaxSize * nbThreads;
unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
-unsigned const nbChunksLarge = multiplier * nbThreads;
-unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1;
-unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads);
-return (multiplier>1) ? nbChunksLarge : nbChunksSmall;
+unsigned const nbJobsLarge = multiplier * nbThreads;
+unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
+unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads);
+return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
} }
/* ZSTDMT_compress_advanced_internal() :
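Plugging illustrative numbers into ZSTDMT_computeNbJobs() above makes the heuristic concrete (windowLog=21 and nbThreads=4 are assumptions for the example, not values from the commit):

/* windowLog=21, nbThreads=4 :
 *   jobSizeTarget = 1 << (21+2)        =   8 MB
 *   jobMaxSize    = jobSizeTarget << 2 =  32 MB
 *   passSizeMax   = 32 MB * 4          = 128 MB
 * srcSize =  64 MB : multiplier = 64/128 + 1 = 1  -> small path :
 *   nbJobsMax = 64/8 + 1 = 9 ; nbJobsSmall = MIN(9, 4) = 4 jobs
 * srcSize = 512 MB : multiplier = 512/128 + 1 = 5 -> large path :
 *   nbJobsLarge = 5 * 4 = 20 jobs */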
@@ -734,44 +734,44 @@ static size_t ZSTDMT_compress_advanced_internal(
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
-unsigned nbChunks = ZSTDMT_computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads);
-size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
-size_t const avgChunkSize = (((proposedChunkSize-1) & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
+unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads);
+size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
+size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */
const char* const srcStart = (const char*)src;
size_t remainingSrcSize = srcSize;
-unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
+unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0;
XXH64_state_t xxh64;
assert(jobParams.nbThreads == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize);
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
if ((nbChunks==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
}
-assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
-ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
+assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
+ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
XXH64_reset(&xxh64, 0);
-if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
-U32 nbJobs = nbChunks;
+if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */
+U32 jobsTableSize = nbJobs;
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
mtctx->jobIDMask = 0;
-mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
+mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem);
if (mtctx->jobs==NULL) return ERROR(memory_allocation);
-assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */
-mtctx->jobIDMask = nbJobs - 1;
+assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0)); /* ensure jobsTableSize is a power of 2 */
+mtctx->jobIDMask = jobsTableSize - 1;
}
{ unsigned u;
-for (u=0; u<nbChunks; u++) {
-size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
-size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
+for (u=0; u<nbJobs; u++) {
+size_t const jobSize = MIN(remainingSrcSize, avgJobSize);
+size_t const dstBufferCapacity = ZSTD_compressBound(jobSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0;
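The avgJobSize adjustment above bumps the job size by roughly 64 KB whenever the proposed size lands just past a 128 KB boundary, so the tail is absorbed by the regular jobs instead of producing a tiny last one. A numeric check (example value assumed):

/* proposedJobSize = 0x40001 (256 KB + 1 byte) :
 *   (0x40001 - 1) & 0x1FFFF = 0x00000  <  0x7FFF  -> bump
 *   avgJobSize = 0x40001 + 0xFFFF = 0x50000 (320 KB) */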
@@ -779,7 +779,7 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].srcBuff = g_nullBuffer;
mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
-mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */
+mtctx->jobs[u].srcSize = jobSize; assert(jobSize > 0); /* avoid job.srcSize == 0 */
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
@@ -790,51 +790,51 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].bufPool = mtctx->bufPool;
-mtctx->jobs[u].firstChunk = (u==0);
-mtctx->jobs[u].lastChunk = (u==nbChunks-1);
+mtctx->jobs[u].firstJob = (u==0);
+mtctx->jobs[u].lastJob = (u==nbJobs-1);
if (params.fParams.checksumFlag) {
-XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
+XXH64_update(&xxh64, srcStart + frameStartPos, jobSize);
}
DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize);
DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize);
DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12);
-POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
+POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]);
-frameStartPos += chunkSize;
+frameStartPos += jobSize;
dstBufferPos += dstBufferCapacity;
-remainingSrcSize -= chunkSize;
+remainingSrcSize -= jobSize;
} }
/* collect result */
{ size_t error = 0, dstPos = 0;
-unsigned chunkID;
-for (chunkID=0; chunkID<nbChunks; chunkID++) {
-DEBUGLOG(5, "waiting for chunk %u ", chunkID);
-ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex);
-while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
-DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
-ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex);
+unsigned jobID;
+for (jobID=0; jobID<nbJobs; jobID++) {
+DEBUGLOG(5, "waiting for job %u ", jobID);
+ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
+while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
+DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID);
+ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
-ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex);
-DEBUGLOG(5, "ready to write chunk %u ", chunkID);
+ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
+DEBUGLOG(5, "ready to write job %u ", jobID);
-mtctx->jobs[chunkID].prefixStart = NULL;
-{ size_t const cSize = mtctx->jobs[chunkID].cSize;
+mtctx->jobs[jobID].prefixStart = NULL;
+{ size_t const cSize = mtctx->jobs[jobID].cSize;
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
-if (chunkID) { /* note : chunk 0 is written directly at dst, which is correct position */
+if (jobID) { /* note : job 0 is written directly at dst, which is correct position */
if (!error)
-memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */
-if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */
-DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst);
-ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
+memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */
+if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */
+DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst);
+ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
} }
-mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
-mtctx->jobs[chunkID].cSize = 0;
+mtctx->jobs[jobID].dstBuff = g_nullBuffer;
+mtctx->jobs[jobID].cSize = 0;
dstPos += cSize ;
}
-} /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
+} /* for (jobID=0; jobID<nbJobs; jobID++) */
DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
if (params.fParams.checksumFlag) {
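Since each worker owns its own cctx, no single worker can produce the frame checksum in the multi-job path; the hunk above therefore runs the whole source through one shared XXH64 state on the mtctx side. The zstd frame checksum is the low 32 bits of XXH64 (seed 0) over the original content; a sketch of the pattern:

XXH64_state_t xxh64;
XXH64_reset(&xxh64, 0);                                   /* seed 0, per the frame format */
XXH64_update(&xxh64, srcStart + frameStartPos, jobSize);  /* called once per job */
U32 const checksum = (U32)XXH64_digest(&xxh64);           /* low 4 bytes appended to frame */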
@@ -1016,9 +1016,9 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
*/
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
-assert(job->lastChunk == 1);
-assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */
-assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */
+assert(job->lastJob == 1);
+assert(job->srcSize == 0); /* last job is empty -> will be simplified into a last empty block */
+assert(job->firstJob == 0); /* cannot be first job, as it also needs to create frame header */
/* A job created by streaming variant starts with a src buffer, but no dst buffer.
* It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx.
* When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx.
@@ -1063,8 +1063,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
mtctx->jobs[jobID].bufPool = mtctx->bufPool;
-mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0);
-mtctx->jobs[jobID].lastChunk = endFrame;
+mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
+mtctx->jobs[jobID].lastJob = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
mtctx->jobs[jobID].dstFlushed = 0;
@@ -1093,12 +1093,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->inBuff.prefixSize = 0;
mtctx->frameEnded = endFrame;
if (mtctx->nextJobID == 0) {
-/* single chunk exception : checksum is already calculated directly within worker thread */
+/* single job exception : checksum is already calculated directly within worker thread */
mtctx->params.fParams.checksumFlag = 0;
} }
if ( (srcSize == 0)
-&& (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
+&& (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
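Per the zstd frame format, the empty last block that ZSTDMT_writeLastEmptyBlock() emits is just a 3-byte block header: last-block bit set, block type Raw, size 0 (sketch, not part of the commit):

/* Block_Header, 24-bit little-endian : bit 0 = Last_Block,
 * bits 1-2 = Block_Type (0 = Raw), bits 3-23 = Block_Size.
 * last=1, type=0, size=0  ->  value 1  ->  bytes 01 00 00 */
unsigned char const lastEmptyBlock[3] = { 0x01, 0x00, 0x00 };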
@@ -1110,10 +1110,10 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))",
mtctx->nextJobID,
(U32)mtctx->jobs[jobID].srcSize,
-mtctx->jobs[jobID].lastChunk,
+mtctx->jobs[jobID].lastJob,
mtctx->nextJobID,
jobID);
-if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
+if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
} else {
@@ -1206,7 +1206,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */
if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */
if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */
-mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */
+mtctx->allJobsCompleted = mtctx->frameEnded; /* all jobs are entirely flushed => if this one is last one, frame is completed */
if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */
return 0; /* internal buffers fully flushed */
}

lib/zstd.h

@@ -1373,7 +1373,7 @@ ZSTDLIB_API void ZSTD_DCtx_reset(ZSTD_DCtx* dctx);
#define ZSTD_BLOCKSIZELOG_MAX 17
#define ZSTD_BLOCKSIZE_MAX (1<<ZSTD_BLOCKSIZELOG_MAX) /* define, for static allocation */
/*===== Raw zstd block functions =====*/
-ZSTDLIB_API size_t ZSTD_getBlockSize (const ZSTD_CCtx* cctx);
+ZSTDLIB_API size_t ZSTD_getBlockSizeMax(const ZSTD_CCtx* cctx);
ZSTDLIB_API size_t ZSTD_compressBlock (ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
ZSTDLIB_API size_t ZSTD_decompressBlock(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
ZSTDLIB_API size_t ZSTD_insertBlock (ZSTD_DCtx* dctx, const void* blockStart, size_t blockSize); /**< insert uncompressed block into `dctx` history. Useful for multi-blocks decompression. */