zstdmt: added ability to flush current job before it's completed

however, zstdmt may still wait on next available worker,
so it's not smooth yet.
This commit is contained in:
Yann Collet 2018-01-18 11:03:27 -08:00
parent aa79c18e3f
commit 1b5d80d633

View File

@ -315,7 +315,7 @@ typedef struct {
unsigned firstChunk; unsigned firstChunk;
unsigned lastChunk; unsigned lastChunk;
unsigned jobCompleted; unsigned jobCompleted;
unsigned jobScanned; unsigned checksumWritten;
ZSTD_pthread_mutex_t* jobCompleted_mutex; ZSTD_pthread_mutex_t* jobCompleted_mutex;
ZSTD_pthread_cond_t* jobCompleted_cond; ZSTD_pthread_cond_t* jobCompleted_cond;
ZSTD_CCtx_params params; ZSTD_CCtx_params params;
@ -389,7 +389,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
BYTE* op = ostart; BYTE* op = ostart;
BYTE* oend = op + dstBuff.size; BYTE* oend = op + dstBuff.size;
int blockNb; int blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
assert(job->cSize == 0); assert(job->cSize == 0);
for (blockNb = 1; blockNb < nbBlocks; blockNb++) { for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
@ -400,10 +400,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize; job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
} }
/* last block */ /* last block */
if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) { if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
size_t const cSize = (job->lastChunk) ? size_t const cSize = (job->lastChunk) ?
@ -428,7 +431,6 @@ _endJob:
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->consumed = job->srcSize; job->consumed = job->srcSize;
job->jobCompleted = 1; job->jobCompleted = 1;
job->jobScanned = 0;
ZSTD_pthread_cond_signal(job->jobCompleted_cond); ZSTD_pthread_cond_signal(job->jobCompleted_cond);
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
} }
@ -1017,6 +1019,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].checksumWritten = 0;
zcs->jobs[jobID].dstFlushed = 0; zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
@ -1069,18 +1072,20 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{ {
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) { while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
assert(zcs->jobs[wJobID].jobCompleted==0);
if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */ ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
} }
ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
/* compression job completed : output can be flushed */ /* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID]; { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
if (!job.jobScanned) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
if (ZSTD_isError(job.cSize)) { if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s", DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize)); zcs->doneJobID, ZSTD_getErrorName(job.cSize));
@ -1088,24 +1093,27 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
ZSTDMT_releaseAllJobResources(zcs); ZSTDMT_releaseAllJobResources(zcs);
return job.cSize; return job.cSize;
} }
DEBUGLOG(5, "ZSTDMT_flushNextJob: zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); /* add frame checksum if necessary */
if (zcs->params.fParams.checksumFlag) { if ( zcs->frameEnded
if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ && (zcs->doneJobID+1 == zcs->nextJobID)
&& (zcs->params.fParams.checksumFlag)
&& (!job.checksumWritten) ) {
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4; job.cSize += 4;
zcs->jobs[wJobID].cSize += 4; zcs->jobs[wJobID].cSize += 4;
} } zcs->jobs[wJobID].checksumWritten = 1;
zcs->jobs[wJobID].jobScanned = 1;
} }
assert(job.cSize >= job.dstFlushed);
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite; output->pos += toWrite;
job.dstFlushed += toWrite; job.dstFlushed += toWrite;
} }
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ if ( job.jobCompleted
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer; zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0; zcs->jobs[wJobID].jobCompleted = 0;