From 4885f591b30348cfef45fffff24a9a7dbb19ea40 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 18 Jan 2017 14:11:37 -0800 Subject: [PATCH] trap compression errors, collect back resources from workers --- lib/compress/zstdmt_compress.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index f880e852..3762f5a2 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -423,7 +423,6 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; if ((cctx==NULL) || (dstBuffer.start==NULL)) { - zcs->jobs[jobID].cSize = ERROR(memory_allocation); zcs->jobs[jobID].jobCompleted = 1; zcs->nextJobID++; ZSTDMT_waitForAllJobsCompleted(zcs); @@ -438,7 +437,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->jobs[jobID].params = zcs->params; zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].cctx = cctx; - zcs->jobs[jobID].firstChunk = (jobID==0); + zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].lastChunk = 0; zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].dstFlushed = 0; @@ -448,7 +447,6 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu /* get a new buffer for next input - save remaining into it */ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].cSize = ERROR(memory_allocation); zcs->jobs[jobID].jobCompleted = 1; zcs->nextJobID++; ZSTDMT_waitForAllJobsCompleted(zcs); @@ -472,6 +470,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->jobs[jobID].cctx = NULL; ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer; + if (ZSTD_isError(job.cSize)) { + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return job.cSize; + } memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; @@ -499,7 +502,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; if ((cctx==NULL) || (dstBuffer.start==NULL)) { - zcs->jobs[jobID].cSize = ERROR(memory_allocation); zcs->jobs[jobID].jobCompleted = 1; zcs->nextJobID++; ZSTDMT_waitForAllJobsCompleted(zcs); @@ -514,7 +516,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->jobs[jobID].params = zcs->params; zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].cctx = cctx; - zcs->jobs[jobID].firstChunk = (jobID==0); + zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].dstFlushed = 0; @@ -526,7 +528,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); zcs->inBuff.filled = 0; if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].cSize = ERROR(memory_allocation); zcs->jobs[jobID].jobCompleted = 1; zcs->nextJobID++; ZSTDMT_waitForAllJobsCompleted(zcs); @@ -543,6 +544,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp } /* check if there is any data available to flush */ + if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); while (zcs->jobs[wJobID].jobCompleted==0) { @@ -555,6 +557,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer; + if (ZSTD_isError(job.cSize)) { + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return job.cSize; + } memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite;