created ZSTDMT_toFlushNow()

tells in a non-blocking way if there is something ready to flush right now.
only works with multi-threading for the time being.

Useful to know if flush speed will be limited by lack of production.
This commit is contained in:
Yann Collet 2018-08-17 18:11:54 -07:00
parent 09e63c58ac
commit 105677c6db
5 changed files with 75 additions and 12 deletions

View File

@ -906,6 +906,20 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
return fp; return fp;
} } } }
/*! ZSTD_toFlushNow()
* Only useful for multithreading scenarios currently (nbWorkers >= 1).
*/
size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx)
{
#ifdef ZSTD_MULTITHREAD
if (cctx->appliedParams.nbWorkers > 0) {
return ZSTDMT_toFlushNow(cctx->mtctx);
}
#endif
return 0; /* over-simplification; could also check if context is currently running in streaming mode, and in which case, report how many bytes are left to be flushed within output buffer */
}
static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1, static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1,
ZSTD_compressionParameters cParams2) ZSTD_compressionParameters cParams2)

View File

@ -1096,20 +1096,45 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize; { size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
assert(flushed <= produced);
fps.ingested += jobPtr->src.size; fps.ingested += jobPtr->src.size;
fps.consumed += jobPtr->consumed; fps.consumed += jobPtr->consumed;
fps.produced += produced; fps.produced += produced;
fps.flushed += jobPtr->dstFlushed; fps.flushed += flushed;
fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size); fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
} }
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
} }
} }
DEBUGLOG(5, "ZSTDMT_getFrameProgression : completed");
return fps; return fps;
} }
size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
{
size_t toFlush;
unsigned const jobID = mtctx->doneJobID;
assert(jobID <= mtctx->nextJobID);
if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */
{ unsigned const wJobID = jobID & mtctx->jobIDMask;
ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
assert(flushed <= produced);
toFlush = produced - flushed;
if (toFlush==0) assert(jobPtr->consumed < jobPtr->src.size); /* if toFlush==0, doneJobID should still be active: if doneJobID is completed and fully flushed, ZSTDMT_flushProduced() should have already moved to next job */
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}
return toFlush;
}
/* ------------------------------------------ */ /* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */ /* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */ /* ------------------------------------------ */

View File

@ -119,11 +119,21 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
* === Not exposed in libzstd. Never invoke directly === * === Not exposed in libzstd. Never invoke directly ===
* ======================================================== */ * ======================================================== */
/*! ZSTDMT_toFlushNow()
* Tell how many bytes are ready to be flushed immediately.
* Probe the oldest active job (not yet entirely flushed) and check its output buffer.
* If return 0, it means there is no active job,
* or, it means oldest job is still active, but everything produced has been flushed so far,
* therefore flushing is limited by speed of oldest job. */
size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_CCtxParam_setMTCtxParameter()
* like ZSTDMT_setMTCtxParameter(), but into a ZSTD_CCtx_Params */
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
/* ZSTDMT_CCtxParam_setNbWorkers() /*! ZSTDMT_CCtxParam_setNbWorkers()
* Set nbWorkers, and clamp it. * Set nbWorkers, and clamp it.
* Also reset jobSize and overlapLog */ * Also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
/*! ZSTDMT_updateCParams_whileCompressing() : /*! ZSTDMT_updateCParams_whileCompressing() :
@ -131,9 +141,9 @@ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorker
* New parameters will be applied to next compression job. */ * New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams); void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams);
/* ZSTDMT_getFrameProgression(): /*! ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame. * tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads. * able to count progression inside worker threads.
*/ */
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx); ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx);

View File

@ -746,7 +746,16 @@ typedef struct {
* Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed. * Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
* Can report progression inside worker threads (multi-threading and non-blocking mode). * Can report progression inside worker threads (multi-threading and non-blocking mode).
*/ */
ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx); ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
/*! ZSTD_toFlushNow()
* Tell how many bytes are ready to be flushed immediately.
* Useful for multithreading scenarios (nbWorkers >= 1).
* Probe the oldest active job (not yet entirely flushed) and check its output buffer.
* If return 0, it means there is no active job, or
* it means oldest job is still active, but everything produced has been flushed so far,
* therefore flushing is limited by speed of oldest job. */
ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx);

View File

@ -747,6 +747,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
/* stats */ /* stats */
typedef enum { noChange, slower, faster } speedChange_e; typedef enum { noChange, slower, faster } speedChange_e;
speedChange_e speedChange = noChange; speedChange_e speedChange = noChange;
unsigned flushWaiting = 0;
unsigned inputPresented = 0; unsigned inputPresented = 0;
unsigned inputBlocked = 0; unsigned inputBlocked = 0;
unsigned lastJobID = 0; unsigned lastJobID = 0;
@ -777,11 +778,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
size_t const oldIPos = inBuff.pos; size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
/* count stats */ /* count stats */
inputPresented++; inputPresented++;
if (oldIPos == inBuff.pos) inputBlocked++; if (oldIPos == inBuff.pos) inputBlocked++;
if (!toFlushNow) flushWaiting = 1;
/* Write compressed stream */ /* Write compressed stream */
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
@ -817,11 +820,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
speedChange = slower; speedChange = slower;
} }
if ( (newlyProduced > (newlyFlushed * 9 / 8)) if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
&& (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) { && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush); ) {
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
speedChange = slower; speedChange = slower;
} }
flushWaiting = 0;
} }
/* course correct only if there is at least one new job completed */ /* course correct only if there is at least one new job completed */