zstdmt : refactor a few member names

for clarity
Yann Collet 2018-01-26 13:00:14 -08:00
parent 79b6e28b0a
commit 0d426f6b83
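
The net effect of the renames, assembled from the hunks below (a reference sketch for orientation, not part of the diff itself): buffer_t tracks a capacity rather than a size, and the input-buffer bookkeeping moves from loose ZSTDMT_CCtx_s members into inBuff_t:

    #include <stddef.h>

    typedef struct buffer_s {
        void* start;
        size_t capacity;         /* was : size */
    } buffer_t;

    typedef struct {
        buffer_t buffer;         /* provided by the pool; may be larger than targetCapacity */
        size_t targetCapacity;   /* was : mtctx->inBuffSize */
        size_t prefixSize;       /* was : mtctx->prefixSize */
        size_t filled;
    } inBuff_t;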

lib/compress/zstdmt_compress.c

@@ -83,7 +83,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
 typedef struct buffer_s {
     void* start;
-    size_t size;
+    size_t capacity;
 } buffer_t;
 static const buffer_t g_nullBuffer = { NULL, 0 };
@@ -136,7 +136,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
     size_t totalBufferSize = 0;
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     for (u=0; u<bufPool->totalBuffers; u++)
-        totalBufferSize += bufPool->bTable[u].size;
+        totalBufferSize += bufPool->bTable[u].capacity;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
     return poolSize + totalBufferSize;
@@ -165,12 +165,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers) {   /* try to use an existing buffer */
         buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
-        size_t const availBufferSize = buf.size;
+        size_t const availBufferSize = buf.capacity;
         bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer;
         if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) {
             /* large enough, but not too much */
             DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u",
-                    bufPool->nbBuffers, (U32)buf.size);
+                    bufPool->nbBuffers, (U32)buf.capacity);
             ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
             return buf;
         }
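
The reuse test in this hunk keeps a pooled buffer only when it is "large enough, but not too much": at least bSize, and no more than 8*bSize (that is what `(availBufferSize>>3) <= bSize` checks). A standalone illustration of that heuristic; the helper name is hypothetical, not from the source:

    #include <assert.h>
    #include <stddef.h>

    /* same condition as ZSTDMT_getBuffer : accept when bSize <= capacity <= 8*bSize */
    static int pooledBufferFits(size_t capacity, size_t bSize)
    {
        return (capacity >= bSize) & ((capacity >> 3) <= bSize);
    }

    int main(void)
    {
        assert( pooledBufferFits( 1024, 1024));   /* exact fit */
        assert( pooledBufferFits( 8192, 1024));   /* 8x the request : still accepted */
        assert(!pooledBufferFits(  512, 1024));   /* too small */
        assert(!pooledBufferFits(16384, 1024));   /* > 8x : too wasteful, allocate anew */
        return 0;
    }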
@@ -184,7 +184,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
     {   buffer_t buffer;
         void* const start = ZSTD_malloc(bSize, bufPool->cMem);
         buffer.start = start;   /* note : start can be NULL if malloc fails ! */
-        buffer.size = (start==NULL) ? 0 : bSize;
+        buffer.capacity = (start==NULL) ? 0 : bSize;
         if (start==NULL) {
             DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!");
         } else {
@@ -203,7 +203,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;   /* stored for later use */
         DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
-                (U32)buf.size, (U32)(bufPool->nbBuffers-1));
+                (U32)buf.capacity, (U32)(bufPool->nbBuffers-1));
         ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
         return;
     }
@@ -372,7 +372,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
             goto _endJob;
     }   }
     if (!job->firstChunk) {   /* flush and overwrite frame header when it's not first job */
-        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
+        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0);
         if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
         DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
@@ -386,7 +386,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
         const BYTE* ip = (const BYTE*) src;
         BYTE* const ostart = (BYTE*)dstBuff.start;
         BYTE* op = ostart;
-        BYTE* oend = op + dstBuff.size;
+        BYTE* oend = op + dstBuff.capacity;
         int blockNb;
         DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
         assert(job->cSize == 0);
@@ -437,6 +437,8 @@ _endJob:
 typedef struct {
     buffer_t buffer;
+    size_t targetCapacity;   /* note : buffers provided by the pool may be larger than target capacity */
+    size_t prefixSize;
     size_t filled;
 } inBuff_t;
@@ -449,8 +451,6 @@ struct ZSTDMT_CCtx_s {
     ZSTD_pthread_cond_t mtctx_cond;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
-    size_t inBuffSize;
-    size_t prefixSize;
     size_t targetPrefixSize;
     inBuff_t inBuff;
     int jobReady;   /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
@@ -663,13 +663,13 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
  *  Note : mutex will be acquired during statistics collection. */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
-    ZSTD_frameProgression fs;
+    ZSTD_frameProgression fps;
     DEBUGLOG(6, "ZSTDMT_getFrameProgression");
     ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
-    fs.consumed = mtctx->consumed;
-    fs.produced = mtctx->produced;
-    assert(mtctx->inBuff.filled >= mtctx->prefixSize);
-    fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
+    fps.consumed = mtctx->consumed;
+    fps.produced = mtctx->produced;
+    assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize);
+    fps.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->inBuff.prefixSize);
     {   unsigned jobNb;
         unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
         DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
@@ -678,13 +678,13 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
             size_t const cResult = mtctx->jobs[wJobID].cSize;
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-            fs.consumed += mtctx->jobs[wJobID].consumed;
-            fs.ingested += mtctx->jobs[wJobID].srcSize;
-            fs.produced += produced;
+            fps.consumed += mtctx->jobs[wJobID].consumed;
+            fps.ingested += mtctx->jobs[wJobID].srcSize;
+            fps.produced += produced;
         }
     }
     ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
-    return fs;
+    return fps;
 }
@@ -928,11 +928,11 @@ size_t ZSTDMT_initCStream_internal(
     if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
     if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;   /* job size must be >= overlap size */
     DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
-    mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize;
-    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10));
-    ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) );
+    mtctx->inBuff.targetCapacity = mtctx->targetPrefixSize + mtctx->targetSectionSize;
+    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuff.targetCapacity>>10));
+    ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuff.targetCapacity, ZSTD_compressBound(mtctx->targetSectionSize)) );
     mtctx->inBuff.buffer = g_nullBuffer;
-    mtctx->prefixSize = 0;
+    mtctx->inBuff.prefixSize = 0;
     mtctx->doneJobID = 0;
     mtctx->nextJobID = 0;
     mtctx->frameEnded = 0;
@@ -1009,10 +1009,10 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
      * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */
     assert(job->dstBuff.start == NULL);   /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
     assert(job->srcBuff.start != NULL);   /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
-    assert(job->srcBuff.size >= ZSTD_blockHeaderSize);   /* no buffer should ever be that small */
+    assert(job->srcBuff.capacity >= ZSTD_blockHeaderSize);   /* no buffer should ever be that small */
     job->dstBuff = job->srcBuff;
     job->srcBuff = g_nullBuffer;
-    job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size);
+    job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity);
     assert(!ZSTD_isError(job->cSize));
     assert(job->consumed == 0);
 }
@@ -1030,15 +1030,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     if (!mtctx->jobReady) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
-                    mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
+                    mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefixSize);
         assert(mtctx->jobs[jobID].srcBuff.start == NULL);   /* no buffer left : supposed already released */
         mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer;
         mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start;
-        mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
+        mtctx->jobs[jobID].prefixSize = mtctx->inBuff.prefixSize;
         mtctx->jobs[jobID].srcSize = srcSize;
+        assert(mtctx->inBuff.filled >= srcSize + mtctx->inBuff.prefixSize);
         mtctx->jobs[jobID].consumed = 0;
         mtctx->jobs[jobID].cSize = 0;
-        assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
         mtctx->jobs[jobID].params = mtctx->params;
         /* do not calculate checksum within sections, but write it in header for first section */
         if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0;
@@ -1055,28 +1055,28 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
         if (mtctx->params.fParams.checksumFlag)
-            XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize);
+            XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize);
         /* get a new buffer for next input */
         if (!endFrame) {
-            size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize);
+            size_t const newPrefixSize = MIN(mtctx->inBuff.filled, mtctx->targetPrefixSize);
             mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
-            if (mtctx->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
+            if (mtctx->inBuff.buffer.start == NULL) {   /* not enough memory to allocate a new input buffer */
                 mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0;
                 mtctx->nextJobID++;
                 ZSTDMT_waitForAllJobsCompleted(mtctx);
                 ZSTDMT_releaseAllJobResources(mtctx);
                 return ERROR(memory_allocation);
             }
-            mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
+            mtctx->inBuff.filled -= (mtctx->inBuff.prefixSize + srcSize) - newPrefixSize;
             memmove(mtctx->inBuff.buffer.start,   /* copy end of current job into next job, as "prefix" */
-                (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize,
+                (const char*)mtctx->jobs[jobID].prefixStart + mtctx->inBuff.prefixSize + srcSize - newPrefixSize,
                 mtctx->inBuff.filled);
-            mtctx->prefixSize = newPrefixSize;
+            mtctx->inBuff.prefixSize = newPrefixSize;
         } else {   /* endFrame==1 => no need for another input buffer */
             mtctx->inBuff.buffer = g_nullBuffer;
             mtctx->inBuff.filled = 0;
-            mtctx->prefixSize = 0;
+            mtctx->inBuff.prefixSize = 0;
             mtctx->frameEnded = endFrame;
             if (mtctx->nextJobID == 0) {
                 /* single chunk exception : checksum is already calculated directly within worker thread */
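
The buffer hand-off in the hunk above is the subtlest arithmetic in this commit: newPrefixSize is now clamped by inBuff.filled (which, per the new assert, is at least prefixSize + srcSize), and after dispatching srcSize bytes the tail of the old buffer slides to the front of the new one. A worked example with invented sizes; only the formulas come from the diff:

    #include <assert.h>
    #include <stddef.h>
    #define MIN(a,b) ((a)<(b)?(a):(b))

    int main(void)
    {
        size_t prefixSize = 128;   /* current overlap (mtctx->inBuff.prefixSize) */
        size_t srcSize    = 512;   /* bytes handed to the job */
        size_t filled     = 700;   /* prefixSize + srcSize + 60 bytes not yet dispatched */
        size_t const targetPrefixSize = 128;

        size_t const newPrefixSize = MIN(filled, targetPrefixSize);   /* = 128 */
        filled -= (prefixSize + srcSize) - newPrefixSize;             /* 700 - 512 = 188 */
        assert(filled == 188);   /* 128 bytes of overlap + 60 undispatched bytes */
        /* the memmove source offset is prefixSize + srcSize - newPrefixSize = 512 :
         * exactly the last 188 bytes of the old buffer slide to the front of the new one */
        assert(prefixSize + srcSize - newPrefixSize == 512);
        return 0;
    }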
@@ -1202,7 +1202,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                      ZSTD_inBuffer* input,
                                      ZSTD_EndDirective endOp)
 {
-    size_t const newJobThreshold = mtctx->prefixSize + mtctx->targetSectionSize;
+    size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize;
     unsigned forwardInputProgress = 0;
     DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u)", (U32)endOp);
     assert(output->pos <= output->size);
@@ -1240,16 +1240,18 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     if ( (!mtctx->jobReady)
       && (input->size > input->pos) ) {   /* support NULL input */
         if (mtctx->inBuff.buffer.start == NULL) {
-            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);   /* note : allocation can fail, in which case, no forward input progress */
+            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);   /* note : allocation can fail, in which case, buffer.start==NULL */
             mtctx->inBuff.filled = 0;
             if ( (mtctx->inBuff.buffer.start == NULL)          /* allocation failure */
               && (mtctx->doneJobID == mtctx->nextJobID) ) {    /* and nothing to flush */
                 return ERROR(memory_allocation);    /* no forward progress possible => output an error */
-        }   }
-        if (mtctx->inBuff.buffer.start != NULL) {
-            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
+            }
+            assert(mtctx->inBuff.buffer.capacity >= mtctx->inBuff.targetCapacity);   /* pool must provide a buffer >= targetCapacity */
+        }
+        if (mtctx->inBuff.buffer.start != NULL) {   /* no buffer for input, but it's possible to flush, and then reclaim the buffer */
+            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuff.targetCapacity - mtctx->inBuff.filled);
             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
-                    (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize);
+                    (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuff.targetCapacity);
             memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
             input->pos += toLoad;
             mtctx->inBuff.filled += toLoad;
@@ -1263,7 +1265,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
           || (mtctx->inBuff.filled >= newJobThreshold)   /* filled enough : let's compress */
           || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0))   /* something to flush : let's go */
           || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
-        size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->prefixSize, mtctx->targetSectionSize);
+        size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->inBuff.prefixSize, mtctx->targetSectionSize);
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
     }
@@ -1280,13 +1282,13 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_in
     CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
     /* recommended next input size : fill current input buffer */
-    return mtctx->inBuffSize - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
+    return mtctx->inBuff.targetCapacity - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
 }

 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame)
 {
-    size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize;
+    size_t const srcSize = mtctx->inBuff.filled - mtctx->inBuff.prefixSize;
     DEBUGLOG(5, "ZSTDMT_flushStream_internal");
     if ( mtctx->jobReady   /* one job ready for a worker to pick up */
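
With prefixSize and targetCapacity folded into inBuff, the streaming thresholds in the last three hunks all read off one struct: a job launches once filled reaches prefixSize + targetSectionSize, the recommended next input size is targetCapacity - filled, and a flush dispatches filled - prefixSize bytes. A minimal sketch of that accounting; the struct name and all numbers are invented for illustration, only the formulas come from the diff:

    #include <assert.h>
    #include <stddef.h>

    typedef struct {
        size_t targetCapacity;   /* targetPrefixSize + targetSectionSize */
        size_t prefixSize;       /* overlap carried from the previous job */
        size_t filled;           /* bytes currently buffered, prefix included */
    } inBuff_accounting;

    int main(void)
    {
        size_t const targetSectionSize = 2048;
        size_t const targetPrefixSize  = 512;
        inBuff_accounting b = { targetPrefixSize + targetSectionSize, 512, 1800 };

        /* ZSTDMT_compressStream_generic : launch a job once filled reaches this */
        size_t const newJobThreshold = b.prefixSize + targetSectionSize;   /* 2560 */
        assert(b.filled < newJobThreshold);   /* 1800 : keep buffering */

        /* ZSTDMT_compressStream : recommended next input size */
        assert(b.targetCapacity - b.filled == 760);

        /* ZSTDMT_flushStream_internal : flush payload excludes the prefix */
        assert(b.filled - b.prefixSize == 1288);
        return 0;
    }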