trap compression errors, collect back resources from workers

Yann Collet 2017-01-18 14:11:37 -08:00
parent 563ef8acf4
commit 4885f591b3

lib/compress/zstdmt_compress.c

@@ -423,7 +423,6 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -438,7 +437,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].firstChunk = (jobID==0);
+ zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = 0;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0;
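
The firstChunk change in this hunk: jobID is only the masked slot index (nextJobID & jobIDMask, as shown at the top of the first hunk), so it cycles back to 0 every time the ring of job slots is reused, whereas nextJobID==0 is true only for the very first job of the frame. A minimal standalone sketch of the difference, using a made-up mask of 3 (the real job-table size is not shown in this diff):

/* Illustrative only: 4 job slots is a hypothetical sizing, not the real one. */
#include <stdio.h>

int main(void)
{
    unsigned const jobIDMask = 3;   /* ring of 4 job slots (assumption) */
    unsigned nextJobID;
    for (nextJobID = 0; nextJobID < 6; nextJobID++) {
        unsigned const jobID = nextJobID & jobIDMask;
        printf("nextJobID=%u  slot=%u  old test (jobID==0)=%d  new test (nextJobID==0)=%d\n",
               nextJobID, jobID, jobID==0, nextJobID==0);
    }
    return 0;   /* the slot index wraps back to 0 at nextJobID==4, so the old test fires again */
}

Only the first line of output has both tests true; once the slot index wraps, the old test would mark a mid-frame chunk as the frame's first chunk again.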
@@ -448,7 +447,6 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
/* get a new buffer for next input - save remaining into it */
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -472,6 +470,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
zcs->jobs[jobID].cctx = NULL;
ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
+ if (ZSTD_isError(job.cSize)) {
+     ZSTDMT_waitForAllJobsCompleted(zcs);
+     ZSTDMT_releaseAllJobResources(zcs);
+     return job.cSize;
+ }
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
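
The five lines added in this hunk are the error trap at the point where a finished job is collected: the job's cctx and source buffer are handed back to their pools just above, and if the job carries an error code in cSize, the stream is aborted only after waiting for the remaining workers and collecting back every outstanding job resource, so nothing leaks on the error path. The same trap is added to ZSTDMT_flushStream_internal below. A minimal sketch of that shape, with simplified stand-in types (mt_stream_t, job_t, the pool helpers) rather than the real ZSTDMT_* structures:

/* Sketch only: every name here is a hypothetical stand-in, not the zstd API.
 * Error codes follow the ZSTD convention of "huge size_t value means error". */
#include <stddef.h>

typedef struct { size_t cSize; void* cctx; void* src; } job_t;
typedef struct { job_t jobs[4]; unsigned doneJobID; unsigned nextJobID; } mt_stream_t;

static int  is_error(size_t code) { return code > (size_t)-64; }   /* ZSTD_isError-like test (assumption) */
static void wait_for_all_jobs(mt_stream_t* s) { (void)s; /* block until every posted job reports completion */ }
static void release_all_job_resources(mt_stream_t* s) { (void)s; /* return every buffer and cctx to its pool */ }
static void release_cctx(void* cctx) { (void)cctx; }
static void release_buffer(void* buf) { (void)buf; }

/* Collect one completed job; on error, reclaim everything before reporting it. */
static size_t collect_one_job(mt_stream_t* s)
{
    job_t* const job = &s->jobs[s->doneJobID & 3];
    release_cctx(job->cctx);  job->cctx = NULL;    /* per-job resources go back first */
    release_buffer(job->src); job->src  = NULL;
    if (is_error(job->cSize)) {
        wait_for_all_jobs(s);            /* let in-flight workers finish... */
        release_all_job_resources(s);    /* ...then collect back all their resources */
        return job->cSize;               /* propagate the error code */
    }
    s->doneJobID++;
    return job->cSize;                   /* normal case: compressed size of this job */
}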
@@ -499,7 +502,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -514,7 +516,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
- zcs->jobs[jobID].firstChunk = (jobID==0);
+ zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0;
@@ -526,7 +528,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
zcs->inBuff.filled = 0;
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -543,6 +544,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
}
/* check if there is any data available to flush */
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) {
@@ -555,6 +557,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
+ if (ZSTD_isError(job.cSize)) {
+     ZSTDMT_waitForAllJobsCompleted(zcs);
+     ZSTDMT_releaseAllJobResources(zcs);
+     return job.cSize;
+ }
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;