protected (mutex) read to jobCompleted, as suggested by @terrelln

This commit is contained in:
Yann Collet 2017-01-21 22:14:08 -08:00
parent 0cf74fa957
commit 9d6f7637ec

View File

@ -541,8 +541,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
/* check if there is any data available to flush */
{ unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
ZSTDMT_jobDescription job = zcs->jobs[jobID];
if (job.jobCompleted) { /* job completed : output can be flushed */
unsigned jobCompleted;
pthread_mutex_lock(&zcs->jobCompleted_mutex);
jobCompleted = zcs->jobs[jobID].jobCompleted;
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
if (jobCompleted) {
ZSTDMT_jobDescription const job = zcs->jobs[jobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
@ -556,15 +560,13 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
}
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
zcs->jobs[jobID].dstFlushed += toWrite;
DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed));
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
if (zcs->jobs[jobID].dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
zcs->jobs[jobID].dstBuff = g_nullBuffer;
zcs->jobs[jobID].jobCompleted = 0;
zcs->doneJobID++;
} else {
zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */
} } }
/* recommended next input size : fill current input buffer */