Merge pull request #995 from facebook/progressiveMT

Progressive mt
This commit is contained in:
Yann Collet 2018-01-18 17:59:49 -08:00 committed by GitHub
commit d19dc1903c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 139 additions and 23 deletions

View File

@ -792,10 +792,20 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
ZSTD_p_dictIDFlag, </b>/* When applicable, dictionary's ID is written into frame header (default:1) */<b>
</b>/* multi-threading parameters */<b>
</b>/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).<b>
* They return an error otherwise. */
ZSTD_p_nbThreads=400, </b>/* Select how many threads a compression job can spawn (default:1)<b>
* More threads improve speed, but also increase memory usage.
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
* Special: value 0 means "do not change nbThreads" */
ZSTD_p_nonBlockingMode, </b>/* Single thread mode is by default "blocking" :<b>
* it finishes its job as much as possible, and only then gives back control to caller.
* In contrast, multi-thread is by default "non-blocking" :
* it takes some input, flush some output if available, and immediately gives back control to caller.
* Compression work is performed in parallel, within worker threads.
* (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
* Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
* It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, </b>/* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.<b>
* Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters.

View File

@ -754,6 +754,29 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) {
return memBudget;
}
/* ZSTD_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads (non-blocking mode).
*/
ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
{
#ifdef ZSTD_MULTITHREAD
if ((cctx->appliedParams.nbThreads > 1) || (cctx->appliedParams.nonBlockingMode)) {
return ZSTDMT_getFrameProgression(cctx->mtctx);
}
#endif
{ ZSTD_frameProgression fp;
size_t const buffered = (cctx->inBuff == NULL) ? 0 :
cctx->inBuffPos - cctx->inToCompress;
if (buffered) assert(cctx->inBuffPos >= cctx->inToCompress);
assert(buffered <= ZSTD_BLOCKSIZE_MAX);
fp.ingested = cctx->consumedSrcSize + buffered;
fp.consumed = cctx->consumedSrcSize;
fp.produced = cctx->producedCSize;
return fp;
} }
static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1,
ZSTD_compressionParameters cParams2)
{
@ -850,6 +873,7 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_CCtx_params params, U64 pl
cctx->appliedParams = params;
cctx->pledgedSrcSizePlusOne = pledgedSrcSize+1;
cctx->consumedSrcSize = 0;
cctx->producedCSize = 0;
if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN)
cctx->appliedParams.fParams.contentSizeFlag = 0;
DEBUGLOG(4, "pledged content size : %u ; flag : %u",
@ -1007,6 +1031,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
zc->appliedParams = params;
zc->pledgedSrcSizePlusOne = pledgedSrcSize+1;
zc->consumedSrcSize = 0;
zc->producedCSize = 0;
if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN)
zc->appliedParams.fParams.contentSizeFlag = 0;
DEBUGLOG(4, "pledged content size : %u ; flag : %u",
@ -2063,6 +2088,7 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx,
ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize);
if (ZSTD_isError(cSize)) return cSize;
cctx->consumedSrcSize += srcSize;
cctx->producedCSize += (cSize + fhSize);
return cSize + fhSize;
}
}

View File

@ -170,8 +170,9 @@ struct ZSTD_CCtx_s {
void* workSpace;
size_t workSpaceSize;
size_t blockSize;
U64 pledgedSrcSizePlusOne; /* this way, 0 (default) == unknown */
U64 consumedSrcSize;
unsigned long long pledgedSrcSizePlusOne; /* this way, 0 (default) == unknown */
unsigned long long consumedSrcSize;
unsigned long long producedCSize;
XXH64_state_t xxhState;
ZSTD_customMem customMem;
size_t staticSize;

View File

@ -308,7 +308,7 @@ typedef struct {
const void* srcStart;
size_t prefixSize;
size_t srcSize;
size_t readSize;
size_t consumed;
buffer_t dstBuff;
size_t cSize;
size_t dstFlushed;
@ -382,8 +382,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
#else
if (sizeof(size_t) > sizeof(int))
assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
{ int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
const BYTE* ip = (const BYTE*) src;
BYTE* const ostart = (BYTE*)dstBuff.start;
@ -391,15 +390,17 @@ void ZSTDMT_compressChunk(void* jobDescription)
BYTE* oend = op + dstBuff.size;
int blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
job->cSize = 0;
assert(job->cSize == 0);
for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
ip += ZSTD_BLOCKSIZE_MAX;
op += cSize; assert(op < oend);
/* stats */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->readSize = ZSTD_BLOCKSIZE_MAX * blockNb;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
/* last block */
if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) {
@ -410,9 +411,11 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = job->srcSize;
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
job->readSize = job->srcSize;
}
#endif
@ -423,6 +426,7 @@ _endJob:
job->src = g_nullBuffer; job->srcStart = NULL;
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->consumed = job->srcSize;
job->jobCompleted = 1;
job->jobScanned = 0;
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
@ -460,6 +464,8 @@ struct ZSTDMT_CCtx_s {
unsigned frameEnded;
unsigned allJobsCompleted;
unsigned long long frameContentSize;
unsigned long long consumed;
unsigned long long produced;
ZSTD_customMem cMem;
ZSTD_CDict* cdictLocal;
const ZSTD_CDict* cdict;
@ -501,15 +507,6 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
return nbThreads;
}
/* ZSTDMT_getNbThreads():
* @return nb threads currently active in mtctx.
* mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
{
assert(mtctx != NULL);
return mtctx->params.nbThreads;
}
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
{
ZSTDMT_CCtx* mtctx;
@ -553,6 +550,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem);
}
/* ZSTDMT_releaseAllJobResources() :
* note : ensure all workers are killed first ! */
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
@ -652,6 +650,43 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
}
}
/* ZSTDMT_getNbThreads():
* @return nb threads currently active in mtctx.
* mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
{
assert(mtctx != NULL);
return mtctx->params.nbThreads;
}
/* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads.
* Note : mutex will be acquired during statistics collection. */
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
ZSTD_frameProgression fs;
DEBUGLOG(5, "ZSTDMT_getFrameProgression");
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
fs.consumed = mtctx->consumed;
fs.produced = mtctx->produced;
assert(mtctx->inBuff.filled >= mtctx->prefixSize);
fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
{ unsigned jobNb;
for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
size_t const cResult = mtctx->jobs[wJobID].cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
fs.consumed += mtctx->jobs[wJobID].consumed;
fs.ingested += mtctx->jobs[wJobID].srcSize;
fs.produced += produced;
}
}
ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
return fs;
}
/* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
@ -727,6 +762,8 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize;
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].fullFrameSize = srcSize;
mtctx->jobs[u].params = jobParams;
@ -900,6 +937,8 @@ size_t ZSTDMT_initCStream_internal(
zcs->nextJobID = 0;
zcs->frameEnded = 0;
zcs->allJobsCompleted = 0;
zcs->consumed = 0;
zcs->produced = 0;
if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
return 0;
}
@ -963,7 +1002,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].readSize = 0;
zcs->jobs[jobID].consumed = 0;
zcs->jobs[jobID].cSize = 0;
zcs->jobs[jobID].prefixSize = zcs->prefixSize;
assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
zcs->jobs[jobID].params = zcs->params;
@ -1070,6 +1110,8 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
zcs->doneJobID++;
zcs->consumed += job.srcSize;
zcs->produced += job.cSize;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
}

View File

@ -122,6 +122,13 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
* mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
/* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads.
*/
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_initCStream_internal() :
* Private use only. Init streaming operation.
* expects params to be valid.

View File

@ -713,11 +713,27 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const
* If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
* If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
* For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
* but it may change to mean "empty" in some future version, so prefer using macro ZSTD_CONTENTSIZE_UNKNOWN.
* but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);
typedef struct {
unsigned long long ingested;
unsigned long long consumed;
unsigned long long produced;
} ZSTD_frameProgression;
/* ZSTD_getFrameProgression():
* tells how much data has been ingested (read from input)
* consumed (input actually compressed) and produced (output) for current frame.
* Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
* Can report progression inside worker threads (multi-threading and non-blocking mode).
*/
ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
/*===== Advanced Streaming decompression functions =====*/
typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e;
ZSTDLIB_API size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue); /* obsolete : this API will be removed in a future version */

View File

@ -84,10 +84,13 @@ void FIO_setNotificationLevel(unsigned level) { g_displayLevel=level; }
static const U64 g_refreshRate = SEC_TO_MICRO / 6;
static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
#define DISPLAYUPDATE(l, ...) { if (g_displayLevel>=l) { \
if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \
{ g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
if (g_displayLevel>=4) fflush(stderr); } } }
#define READY_FOR_UPDATE() (UTIL_clockSpanMicro(g_displayClock) > g_refreshRate)
#define DISPLAYUPDATE(l, ...) { \
if (g_displayLevel>=l) { \
if (READY_FOR_UPDATE() || (g_displayLevel>=4)) { \
g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
if (g_displayLevel>=4) fflush(stderr); \
} } }
#undef MIN /* in case it would be already defined */
#define MIN(a,b) ((a) < (b) ? (a) : (b))
@ -809,12 +812,23 @@ static int FIO_compressFilename_internal(cRess_t ress,
compressedfilesize += outBuff.pos;
}
}
#if 1
if (READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
(U32)(zfp.ingested >> 20),
(U32)(zfp.consumed >> 20),
(U32)(zfp.produced >> 20),
(double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
}
#else
if (fileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20));
} else {
DISPLAYUPDATE(2, "\rRead : %u / %u MB",
(U32)(readsize>>20), (U32)(fileSize>>20));
}
#endif
} while (directive != ZSTD_e_end);
finish: