Fixed zstdmt CLI freeze with a large number of threads

fileio.c kept pushing more input without ever giving the compressed output a chance to be flushed.
This could block the job queue once input accumulated faster than it was written out, which requires a large number of threads to trigger.
Fix: fileio now flushes whatever compressed data it can after each input attempt.
Yann Collet 2017-01-25 12:31:07 -08:00
parent dc8dae596a
commit 943cff9c37
2 changed files with 14 additions and 11 deletions
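The fileio.c change below boils down to giving the read/compress loop the shape sketched here: write out whatever the compressor produced after every streaming call, instead of only once per input chunk, so finished jobs can be recycled rather than stalling behind unread output. The sketch is illustrative only; it uses the public single-threaded streaming API (ZSTD_compressStream / ZSTD_endStream) rather than the ZSTDMT path, and the helper name, buffers and error handling are hypothetical, with the caller assumed to have opened both files and initialized the ZSTD_CStream.

#include <stdio.h>
#include <zstd.h>

/* Illustrative sketch only (not the actual fileio.c code): flush the
 * output buffer after every compression call.  fin/fout are assumed
 * open, cstream is assumed initialized with ZSTD_initCStream(),
 * and inBuf/outBuf are caller-provided scratch buffers. */
static int compress_file_sketch(FILE* fin, FILE* fout, ZSTD_CStream* cstream,
                                void* inBuf, size_t inBufSize,
                                void* outBuf, size_t outBufSize)
{
    size_t readSize;
    while ((readSize = fread(inBuf, 1, inBufSize, fin)) > 0) {
        ZSTD_inBuffer input = { inBuf, readSize, 0 };
        while (input.pos < input.size) {
            ZSTD_outBuffer output = { outBuf, outBufSize, 0 };
            size_t const ret = ZSTD_compressStream(cstream, &output, &input);
            if (ZSTD_isError(ret)) return 1;
            /* write whatever was produced before asking for more input */
            if (output.pos && fwrite(outBuf, 1, output.pos, fout) != output.pos) return 1;
        }
    }
    /* end the frame, draining the compressor's internal buffers */
    for (;;) {
        ZSTD_outBuffer output = { outBuf, outBufSize, 0 };
        size_t const remaining = ZSTD_endStream(cstream, &output);   /* 0 when fully flushed */
        if (ZSTD_isError(remaining)) return 1;
        if (output.pos && fwrite(outBuf, 1, output.pos, fout) != output.pos) return 1;
        if (remaining == 0) break;
    }
    return 0;
}

The essential point is that the output buffer is emptied inside the inner loop, which is what lets the multithreaded compressor drain its job queue instead of freezing once it fills up.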

zstdmt_compress.c

@@ -233,6 +233,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
     DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize);
+        if (job->cdict) DEBUGLOG(3, "using CDict ");
         if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     } else {
         size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
@@ -296,6 +297,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     U32 const minNbJobs = nbThreads + 2;
     U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
+    //nbThreads = 1; /* for tests */
     DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
                 nbThreads, minNbJobs, nbJobsLog2, nbJobs);
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
@@ -490,6 +492,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
                                     ZSTD_parameters params, unsigned long long pledgedSrcSize)
 {
     ZSTD_customMem const cmem = { NULL, NULL, NULL };
+    DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog);
     if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
     if (zcs->allJobsCompleted == 0) {   /* previous job not correctly finished */
         ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -596,6 +599,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     } else {
         zcs->inBuff.buffer = g_nullBuffer;
         zcs->inBuff.filled = 0;
+        zcs->dictSize = 0;
         zcs->frameEnded = 1;
         if (zcs->nextJobID == 0)
             zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
@@ -698,12 +702,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
 {
-    size_t const srcSize = zcs->inBuff.filled;
-    if (srcSize) DEBUGLOG(1, "flushing : %u bytes left to compress", (U32)srcSize);
+    size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
+    if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
     if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
-        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize - zcs->dictSize, endFrame) );
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
     }
     /* check if there is any data available to flush */
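Side note on the hunk above: as the surrounding code suggests, zcs->inBuff.filled counts both the dictSize bytes kept at the start of the input buffer (used as dictionary for the next job) and the fresh data behind them, so the old code only subtracted dictSize at the createCompressionJob call, while the srcSize > 0 test and the log line still counted dictionary bytes. Computing srcSize = zcs->inBuff.filled - zcs->dictSize up front makes all three consistent. A toy illustration of that accounting, with made-up numbers:

#include <stdio.h>

int main(void)
{
    /* made-up example values, not from the commit */
    size_t const filled   = (512 + 64) * 1024; /* zcs->inBuff.filled : dictionary + fresh data */
    size_t const dictSize = 512 * 1024;        /* zcs->dictSize : bytes kept for the next job  */
    size_t const srcSize  = filled - dictSize; /* fresh data actually left to compress         */
    printf("flushing : %u bytes left to compress\n", (unsigned)srcSize); /* prints 65536 */
    return 0;
}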

fileio.c

@@ -349,23 +349,22 @@ static int FIO_compressFilename_internal(cRess_t ress,
         readsize += inSize;
         DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20));
-        /* Compress using buffered streaming */
         {   ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
-            ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
             while (inBuff.pos != inBuff.size) {  /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
+                ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
 #ifdef ZSTD_MULTITHREAD
                 size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
 #else
                 size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
 #endif
                 if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
-            }
-            /* Write cBlock */
-            { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-              if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); }
-            compressedfilesize += outBuff.pos;
-        }
+                /* Write compressed stream */
+                if (outBuff.pos) {
+                    size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+                    if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
+                    compressedfilesize += outBuff.pos;
+        }   }   }
         DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100);
     }