From 1b5d80d633827bdb4d934b54833d5bfbebc000bf Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 18 Jan 2018 11:03:27 -0800 Subject: [PATCH] zstdmt: added ability to flush current job before it's completed however, zstdmt may still wait on next available worker, so it's not smooth yet. --- lib/compress/zstdmt_compress.c | 66 +++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 5624b5e3..812bc89b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -315,7 +315,7 @@ typedef struct { unsigned firstChunk; unsigned lastChunk; unsigned jobCompleted; - unsigned jobScanned; + unsigned checksumWritten; ZSTD_pthread_mutex_t* jobCompleted_mutex; ZSTD_pthread_cond_t* jobCompleted_cond; ZSTD_CCtx_params params; @@ -389,7 +389,7 @@ void ZSTDMT_compressChunk(void* jobDescription) BYTE* op = ostart; BYTE* oend = op + dstBuff.size; int blockNb; - DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); + DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); assert(job->cSize == 0); for (blockNb = 1; blockNb < nbBlocks; blockNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); @@ -400,10 +400,13 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; + DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", + (U32)cSize, (U32)job->cSize); + ZSTD_pthread_cond_signal(job->jobCompleted_cond); ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); } /* last block */ - if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) { + if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) { size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; size_t const cSize = (job->lastChunk) ? @@ -428,7 +431,6 @@ _endJob: ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->consumed = job->srcSize; job->jobCompleted = 1; - job->jobScanned = 0; ZSTD_pthread_cond_signal(job->jobCompleted_cond); ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); } @@ -1017,6 +1019,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].jobCompleted = 0; + zcs->jobs[jobID].checksumWritten = 0; zcs->jobs[jobID].dstFlushed = 0; zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; @@ -1069,43 +1072,48 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); + DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); - while (zcs->jobs[wJobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); + while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { + DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)", + zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); + assert(zcs->jobs[wJobID].jobCompleted==0); if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */ } - ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); - /* compression job completed : output can be flushed */ + + /* some output is available to be flushed */ { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - if (!job.jobScanned) { - if (ZSTD_isError(job.cSize)) { - DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s", - zcs->doneJobID, ZSTD_getErrorName(job.cSize)); - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return job.cSize; - } - DEBUGLOG(5, "ZSTDMT_flushNextJob: zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); - if (zcs->params.fParams.checksumFlag) { - if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ - U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); - DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); - MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); - job.cSize += 4; - zcs->jobs[wJobID].cSize += 4; - } } - zcs->jobs[wJobID].jobScanned = 1; + ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); + if (ZSTD_isError(job.cSize)) { + DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s", + zcs->doneJobID, ZSTD_getErrorName(job.cSize)); + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return job.cSize; } + /* add frame checksum if necessary */ + if ( zcs->frameEnded + && (zcs->doneJobID+1 == zcs->nextJobID) + && (zcs->params.fParams.checksumFlag) + && (!job.checksumWritten) ) { + U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); + DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); + MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); + job.cSize += 4; + zcs->jobs[wJobID].cSize += 4; + zcs->jobs[wJobID].checksumWritten = 1; + } + assert(job.cSize >= job.dstFlushed); { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; } - if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ + if ( job.jobCompleted + && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer; zcs->jobs[wJobID].jobCompleted = 0;