Simplified compressChunk job

Minor refactoring: for the first chunk, the frame header and the compressed data are now produced by a single compression call.
This avoids the mutable hSize variable and the need to add it back into cSize at the end.
This commit is contained in:
Yann Collet 2017-01-19 10:18:17 -08:00
parent 6073b3e6b8
commit 37226c1e9f

View File

@ -10,8 +10,16 @@
# include <stdio.h> # include <stdio.h>
# include <unistd.h> # include <unistd.h>
# include <sys/times.h> # include <sys/times.h>
static unsigned g_debugLevel = 2; static unsigned g_debugLevel = 3;
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); } # define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); }
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
# define DEBUG_PRINTHEX(l,p,n) { \
unsigned debug_u; \
for (debug_u=0; debug_u<(n); debug_u++) \
DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
DEBUGLOGRAW(l, " \n"); \
}
static unsigned long long GetCurrentClockTimeMicroseconds() static unsigned long long GetCurrentClockTimeMicroseconds()
{ {
@ -39,6 +47,7 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
# define DEBUGLOG(l, ...) {} /* disabled */ # define DEBUGLOG(l, ...) {} /* disabled */
# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m) # define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
# define DEBUG_PRINTHEX(l,p,n) {}
#endif #endif
@ -184,22 +193,20 @@ typedef struct {
void ZSTDMT_compressChunk(void* jobDescription) void ZSTDMT_compressChunk(void* jobDescription)
{ {
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
buffer_t dstBuff = job->dstBuff; buffer_t const dstBuff = job->dstBuff;
size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize);
if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); /* flush frame header */ if (!job->firstChunk) {
if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); /* flush frame header */
if (job->firstChunk) { /* preserve frame header when it is first chunk */ if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
dstBuff.start = (char*)dstBuff.start + hSize; }
dstBuff.size -= hSize;
} else /* otherwise, overwrite */
hSize = 0;
DEBUGLOG(3, "Compressing : ");
DEBUG_PRINTHEX(3, job->srcStart, 12);
job->cSize = (job->lastChunk) ? /* last chunk signal */ job->cSize = (job->lastChunk) ? /* last chunk signal */
ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) : ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize); ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
if (!ZSTD_isError(job->cSize)) job->cSize += hSize; DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "chunk %u : compressed %u bytes into %u bytes ", (unsigned)job->lastChunk, (unsigned)job->srcSize, (unsigned)job->cSize);
_endJob: _endJob:
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
@ -271,8 +278,10 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx); ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx);
mtctx->jobs[jobID].cctx = NULL; mtctx->jobs[jobID].cctx = NULL;
} }
memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer);
mtctx->inBuff.buffer = g_nullBuffer; mtctx->inBuff.buffer = g_nullBuffer;
mtctx->allJobsCompleted = 1;
} }
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
@ -335,6 +344,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize); DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize);
DEBUG_PRINTHEX(3, mtctx->jobs[u].srcStart, 12);
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
frameStartPos += chunkSize; frameStartPos += chunkSize;
@ -345,14 +355,14 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
{ unsigned chunkID; { unsigned chunkID;
size_t error = 0, dstPos = 0; size_t error = 0, dstPos = 0;
for (chunkID=0; chunkID<nbChunks; chunkID++) { for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(3, "ready to write chunk %u ", chunkID); DEBUGLOG(3, "waiting for chunk %u ", chunkID);
PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
while (mtctx->jobs[chunkID].jobCompleted==0) { while (mtctx->jobs[chunkID].jobCompleted==0) {
DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID); DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID);
pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
} }
pthread_mutex_unlock(&mtctx->jobCompleted_mutex); pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
DEBUGLOG(3, "ready to write chunk %u ", chunkID);
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx); ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
mtctx->jobs[chunkID].cctx = NULL; mtctx->jobs[chunkID].cctx = NULL;
@ -422,6 +432,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad); memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad);
input->pos += toLoad; input->pos += toLoad;
zcs->inBuff.filled += toLoad;
} }
if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */ if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */
@ -438,6 +449,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
return ERROR(memory_allocation); return ERROR(memory_allocation);
} }
DEBUGLOG(1, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize; zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
@ -474,6 +486,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
ZSTDMT_jobDescription job = zcs->jobs[jobID]; ZSTDMT_jobDescription job = zcs->jobs[jobID];
if (job.jobCompleted) { /* job completed : output can be flushed */ if (job.jobCompleted) { /* job completed : output can be flushed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(1, "trying to flush compressed data from job %u \n", (U32)zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
zcs->jobs[jobID].cctx = NULL; zcs->jobs[jobID].cctx = NULL;
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
@ -489,6 +502,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
zcs->jobs[jobID].dstBuff = g_nullBuffer; zcs->jobs[jobID].dstBuff = g_nullBuffer;
zcs->jobs[jobID].jobCompleted = 0;
zcs->doneJobID++; zcs->doneJobID++;
} else { } else {
zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */ zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */
@ -503,6 +517,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
{ {
size_t const srcSize = zcs->inBuff.filled; size_t const srcSize = zcs->inBuff.filled;
DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize);
if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) { if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
@ -548,12 +563,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->frameEnded = 1; zcs->frameEnded = 1;
} }
DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize); DEBUGLOG(1, "posting job %u : %u bytes (end:%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
zcs->nextJobID++; zcs->nextJobID++;
} }
/* check if there is any data available to flush */ /* check if there is any data available to flush */
DEBUGLOG(1, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
@ -565,6 +581,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
{ /* job completed : output can be flushed */ { /* job completed : output can be flushed */
ZSTDMT_jobDescription job = zcs->jobs[wJobID]; ZSTDMT_jobDescription job = zcs->jobs[wJobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(1, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer; ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
if (ZSTD_isError(job.cSize)) { if (ZSTD_isError(job.cSize)) {
@ -577,6 +594,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
job.dstFlushed += toWrite; job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer; ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
zcs->doneJobID++; zcs->doneJobID++;
} else { } else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed; zcs->jobs[wJobID].dstFlushed = job.dstFlushed;