diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html
index a2e0966d..473d5be8 100644
--- a/doc/zstd_manual.html
+++ b/doc/zstd_manual.html
@@ -792,10 +792,20 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
     ZSTD_p_dictIDFlag,       /* When applicable, dictionary's ID is written into frame header (default:1) */
 
     /* multi-threading parameters */
+    /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
+     * They return an error otherwise. */
     ZSTD_p_nbThreads=400,    /* Select how many threads a compression job can spawn (default:1)
                               * More threads improve speed, but also increase memory usage.
                               * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
                               * Special: value 0 means "do not change nbThreads" */
+    ZSTD_p_nonBlockingMode,  /* Single thread mode is by default "blocking" :
+                              * it finishes as much of its job as possible, and only then gives control back to the caller.
+                              * In contrast, multi-thread mode is by default "non-blocking" :
+                              * it takes some input, flushes some output if available, and immediately gives control back to the caller.
+                              * Compression work is performed in parallel, within worker threads.
+                              * (note : a strong exception to this rule is when the first job is started with ZSTD_e_end : it becomes blocking)
+                              * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
+                              * It allows the caller to do other tasks while the worker thread compresses in parallel. */
     ZSTD_p_jobSize,          /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
                               * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
                               * 0 means default, which is dynamically determined based on compression parameters.
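
As an illustration of the new parameter (a sketch, not taken from the patch; it assumes the experimental ZSTD_createCCtx() / ZSTD_CCtx_setParameter() API that accompanies these enums), a caller that wants single-threaded compression but a responsive main loop could request:

    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, 1);        /* one worker thread */
    ZSTD_CCtx_setParameter(cctx, ZSTD_p_nonBlockingMode, 1);  /* hand work to the worker, return immediately */

Each subsequent ZSTD_compress_generic(cctx, &output, &input, ZSTD_e_continue) call then only transfers buffers and returns, while compression proceeds inside the worker thread.
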
diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c
index 6d477874..5211384e 100644
--- a/lib/compress/zstd_compress.c
+++ b/lib/compress/zstd_compress.c
@@ -754,6 +754,29 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) {
     return memBudget;
 }
 
+/* ZSTD_getFrameProgression():
+ * tells how much data has been consumed (input) and produced (output) for current frame.
+ * able to count progression inside worker threads (non-blocking mode).
+ */
+ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
+{
+#ifdef ZSTD_MULTITHREAD
+    if ((cctx->appliedParams.nbThreads > 1) || (cctx->appliedParams.nonBlockingMode)) {
+        return ZSTDMT_getFrameProgression(cctx->mtctx);
+    }
+#endif
+    {   ZSTD_frameProgression fp;
+        size_t const buffered = (cctx->inBuff == NULL) ? 0 :
+                                cctx->inBuffPos - cctx->inToCompress;
+        if (buffered) assert(cctx->inBuffPos >= cctx->inToCompress);
+        assert(buffered <= ZSTD_BLOCKSIZE_MAX);
+        fp.ingested = cctx->consumedSrcSize + buffered;
+        fp.consumed = cctx->consumedSrcSize;
+        fp.produced = cctx->producedCSize;
+        return fp;
+}   }
+
+
 static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1,
                                   ZSTD_compressionParameters cParams2)
 {
@@ -850,6 +873,7 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_CCtx_params params, U64 pl
     cctx->appliedParams = params;
     cctx->pledgedSrcSizePlusOne = pledgedSrcSize+1;
     cctx->consumedSrcSize = 0;
+    cctx->producedCSize = 0;
     if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN)
         cctx->appliedParams.fParams.contentSizeFlag = 0;
     DEBUGLOG(4, "pledged content size : %u ; flag : %u",
@@ -1007,6 +1031,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
         zc->appliedParams = params;
         zc->pledgedSrcSizePlusOne = pledgedSrcSize+1;
         zc->consumedSrcSize = 0;
+        zc->producedCSize = 0;
         if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN)
             zc->appliedParams.fParams.contentSizeFlag = 0;
         DEBUGLOG(4, "pledged content size : %u ; flag : %u",
@@ -2063,6 +2088,7 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx,
                             ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize);
         if (ZSTD_isError(cSize)) return cSize;
         cctx->consumedSrcSize += srcSize;
+        cctx->producedCSize += (cSize + fhSize);
         return cSize + fhSize;
     }
 }
diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h
index eb18cbc5..64f03a0d 100644
--- a/lib/compress/zstd_compress_internal.h
+++ b/lib/compress/zstd_compress_internal.h
@@ -170,8 +170,9 @@ struct ZSTD_CCtx_s {
     void* workSpace;
     size_t workSpaceSize;
     size_t blockSize;
-    U64 pledgedSrcSizePlusOne;  /* this way, 0 (default) == unknown */
-    U64 consumedSrcSize;
+    unsigned long long pledgedSrcSizePlusOne;  /* this way, 0 (default) == unknown */
+    unsigned long long consumedSrcSize;
+    unsigned long long producedCSize;
     XXH64_state_t xxhState;
     ZSTD_customMem customMem;
     size_t staticSize;
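
To make the single-thread accounting above concrete (hypothetical numbers; ZSTD_BLOCKSIZE_MAX is 128 KB):

    /* after ingesting 300 KB into inBuff and compressing two full blocks : */
    consumedSrcSize = 2 * 131072               = 262144   /* input already compressed */
    buffered        = inBuffPos - inToCompress =  45056   /* received, not yet compressed */
    ingested        = 262144 + 45056           = 307200   /* everything read so far */

so (ingested - consumed) is exactly the data still sitting in the context's input buffer.
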
diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index b3ccfe39..f9a25270 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -308,7 +308,7 @@ typedef struct {
     const void* srcStart;
     size_t   prefixSize;
     size_t   srcSize;
-    size_t   readSize;
+    size_t   consumed;
     buffer_t dstBuff;
     size_t   cSize;
     size_t   dstFlushed;
@@ -382,8 +382,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
         ZSTD_compressEnd     (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
         ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
 #else
-    if (sizeof(size_t) > sizeof(int))
-        assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX);   /* check overflow */
+    if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX);   /* check overflow */
     {   int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
         const BYTE* ip = (const BYTE*) src;
         BYTE* const ostart = (BYTE*)dstBuff.start;
@@ -391,15 +390,17 @@ void ZSTDMT_compressChunk(void* jobDescription)
         BYTE* oend = op + dstBuff.size;
         int blockNb;
         DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
-        job->cSize = 0;
+        assert(job->cSize == 0);
         for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
             if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
             ip += ZSTD_BLOCKSIZE_MAX;
             op += cSize; assert(op < oend);
             /* stats */
+            ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
-            job->readSize = ZSTD_BLOCKSIZE_MAX * blockNb;
+            job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
+            ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
         }
         /* last block */
         if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) {
@@ -410,9 +411,11 @@ void ZSTDMT_compressChunk(void* jobDescription)
                 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
             if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
             /* stats */
+            ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
+            job->consumed = job->srcSize;
+            ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
         }
-        job->readSize = job->srcSize;
     }
 #endif
 
@@ -423,6 +426,7 @@ _endJob:
     job->src = g_nullBuffer; job->srcStart = NULL;
     /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+    job->consumed = job->srcSize;
     job->jobCompleted = 1;
     job->jobScanned = 0;
     ZSTD_pthread_cond_signal(job->jobCompleted_cond);
@@ -460,6 +464,8 @@ struct ZSTDMT_CCtx_s {
     unsigned frameEnded;
     unsigned allJobsCompleted;
     unsigned long long frameContentSize;
+    unsigned long long consumed;
+    unsigned long long produced;
     ZSTD_customMem cMem;
     ZSTD_CDict* cdictLocal;
     const ZSTD_CDict* cdict;
@@ -501,15 +507,6 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
     return nbThreads;
 }
 
-/* ZSTDMT_getNbThreads():
- * @return nb threads currently active in mtctx.
- * mtctx must be valid */
-unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
-{
-    assert(mtctx != NULL);
-    return mtctx->params.nbThreads;
-}
-
 ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
 {
     ZSTDMT_CCtx* mtctx;
@@ -553,6 +550,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
     return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem);
 }
 
+
 /* ZSTDMT_releaseAllJobResources() :
  * note : ensure all workers are killed first ! */
 static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
@@ -652,6 +650,43 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
     }
 }
 
+/* ZSTDMT_getNbThreads():
+ * @return nb threads currently active in mtctx.
+ * mtctx must be valid */
+unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
+{
+    assert(mtctx != NULL);
+    return mtctx->params.nbThreads;
+}
+
+/* ZSTDMT_getFrameProgression():
+ * tells how much data has been consumed (input) and produced (output) for current frame.
+ * able to count progression inside worker threads.
+ * Note : mutex will be acquired during statistics collection. */
+ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
+{
+    ZSTD_frameProgression fs;
+    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+    fs.consumed = mtctx->consumed;
+    fs.produced = mtctx->produced;
+    assert(mtctx->inBuff.filled >= mtctx->prefixSize);
+    fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
+    {   unsigned jobNb;
+        for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) {
+            unsigned const wJobID = jobNb & mtctx->jobIDMask;
+            size_t const cResult = mtctx->jobs[wJobID].cSize;
+            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+            fs.consumed += mtctx->jobs[wJobID].consumed;
+            fs.ingested += mtctx->jobs[wJobID].srcSize;
+            fs.produced += produced;
+        }
+    }
+    ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+    return fs;
+}
+
+
 /* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
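
To illustrate the aggregation above (hypothetical numbers): with two jobs already flushed, two jobs still in flight, and 300 KB waiting in inBuff (no prefix):

    mtctx->consumed = 2 MB                mtctx->produced = 800 KB
    job A (done, awaiting flush)  : srcSize = 1 MB, consumed = 1 MB,   cSize = 400 KB
    job B (still compressing)     : srcSize = 1 MB, consumed = 256 KB, cSize = 100 KB

    fs.consumed = 2 MB + 1 MB + 256 KB        = 3.25 MB
    fs.ingested = 2 MB + 300 KB + 1 MB + 1 MB = 4.29 MB
    fs.produced = 800 KB + 400 KB + 100 KB    = 1.27 MB

The doneJobID..nextJobID walk covers exactly the in-flight jobs, and taking jobCompleted_mutex here pairs with the locked stats updates in ZSTDMT_compressChunk(), so a job's consumed/cSize counters are never observed mid-update.
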
@@ -727,6 +762,8 @@ static size_t ZSTDMT_compress_advanced_internal(
         mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
         mtctx->jobs[u].prefixSize = dictSize;
         mtctx->jobs[u].srcSize = chunkSize;
+        mtctx->jobs[u].consumed = 0;
+        mtctx->jobs[u].cSize = 0;
         mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
         mtctx->jobs[u].fullFrameSize = srcSize;
         mtctx->jobs[u].params = jobParams;
@@ -900,6 +937,8 @@ size_t ZSTDMT_initCStream_internal(
     zcs->nextJobID = 0;
     zcs->frameEnded = 0;
     zcs->allJobsCompleted = 0;
+    zcs->consumed = 0;
+    zcs->produced = 0;
     if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
     return 0;
 }
@@ -963,7 +1002,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     zcs->jobs[jobID].src = zcs->inBuff.buffer;
     zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
     zcs->jobs[jobID].srcSize = srcSize;
-    zcs->jobs[jobID].readSize = 0;
+    zcs->jobs[jobID].consumed = 0;
+    zcs->jobs[jobID].cSize = 0;
     zcs->jobs[jobID].prefixSize = zcs->prefixSize;
     assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
     zcs->jobs[jobID].params = zcs->params;
@@ -1070,6 +1110,8 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
             zcs->jobs[wJobID].dstBuff = g_nullBuffer;
             zcs->jobs[wJobID].jobCompleted = 0;
             zcs->doneJobID++;
+            zcs->consumed += job.srcSize;
+            zcs->produced += job.cSize;
         } else {
             zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
         }
diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h
index 7716ea68..7c1e7e27 100644
--- a/lib/compress/zstdmt_compress.h
+++ b/lib/compress/zstdmt_compress.h
@@ -122,6 +122,13 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
  * mtctx must be valid */
 unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
 
+/* ZSTDMT_getFrameProgression():
+ * tells how much data has been consumed (input) and produced (output) for current frame.
+ * able to count progression inside worker threads.
+ */
+ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx);
+
+
 /*! ZSTDMT_initCStream_internal() :
  *  Private use only. Init streaming operation.
  *  expects params to be valid.
diff --git a/lib/zstd.h b/lib/zstd.h
index 11ed70e3..a1b8ea19 100644
--- a/lib/zstd.h
+++ b/lib/zstd.h
@@ -713,11 +713,27 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const
  *  If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
  *  If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
  *  For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
- *  but it may change to mean "empty" in some future version, so prefer using macro ZSTD_CONTENTSIZE_UNKNOWN.
+ *  but it will change to mean "empty" in a future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
  * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
 ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);
 
 
+typedef struct {
+    unsigned long long ingested;
+    unsigned long long consumed;
+    unsigned long long produced;
+} ZSTD_frameProgression;
+
+/* ZSTD_getFrameProgression():
+ * tells how much data has been ingested (read from input),
+ * consumed (input actually compressed) and produced (output) for current frame.
+ * Therefore, (ingested - consumed) is the amount of input data buffered internally, not yet compressed.
+ * Can report progression inside worker threads (multi-threading and non-blocking mode).
+ */
+ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
+
+
+
 /*=====   Advanced Streaming decompression functions  =====*/
 typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e;
 ZSTDLIB_API size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue);   /* obsolete : this API will be removed in a future version */
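
The declaration above is enough to drive a simple progress display from client code. A minimal sketch (not part of the patch; it assumes the experimental entry points exposed under ZSTD_STATIC_LINKING_ONLY in this development branch, namely ZSTD_CCtx_setParameter(), ZSTD_compress_generic(), ZSTD_CStreamInSize() and ZSTD_CStreamOutSize(); allocation and error handling are abbreviated):

    #define ZSTD_STATIC_LINKING_ONLY
    #include <stdio.h>
    #include <stdlib.h>
    #include "zstd.h"

    /* Compress fin into fout with 2 worker threads (requires a ZSTD_MULTITHREAD build),
     * printing the frame progression after every call. */
    static int compressWithProgress(FILE* fin, FILE* fout, int cLevel)
    {
        ZSTD_CCtx* const cctx = ZSTD_createCCtx();
        size_t const inSize  = ZSTD_CStreamInSize();
        size_t const outSize = ZSTD_CStreamOutSize();
        void* const inBuf  = malloc(inSize);
        void* const outBuf = malloc(outSize);
        int err = 0;

        ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (unsigned)cLevel);
        ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, 2);

        for (;;) {
            size_t const readSize = fread(inBuf, 1, inSize, fin);
            ZSTD_EndDirective const mode = (readSize < inSize) ? ZSTD_e_end : ZSTD_e_continue;
            ZSTD_inBuffer input = { inBuf, readSize, 0 };
            size_t remaining;
            do {
                ZSTD_outBuffer output = { outBuf, outSize, 0 };
                remaining = ZSTD_compress_generic(cctx, &output, &input, mode);
                if (ZSTD_isError(remaining)) { err = 1; goto _cleanup; }
                fwrite(outBuf, 1, output.pos, fout);
                {   ZSTD_frameProgression const fp = ZSTD_getFrameProgression(cctx);
                    fprintf(stderr, "\ringested %u MB  consumed %u MB  produced %u MB",
                            (unsigned)(fp.ingested >> 20),
                            (unsigned)(fp.consumed >> 20),
                            (unsigned)(fp.produced >> 20));
                }
            } while (mode == ZSTD_e_end ? (remaining != 0) : (input.pos < input.size));
            if (mode == ZSTD_e_end) break;   /* frame fully written and flushed */
        }
    _cleanup:
        free(inBuf); free(outBuf); ZSTD_freeCCtx(cctx);
        return err;
    }

With ZSTD_p_nbThreads=1 plus ZSTD_p_nonBlockingMode=1, as sketched after the manual excerpt above, the same loop stays responsive in single-threaded operation.
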
diff --git a/programs/fileio.c b/programs/fileio.c
index 326f0e67..7045a532 100644
--- a/programs/fileio.c
+++ b/programs/fileio.c
@@ -84,10 +84,13 @@ void FIO_setNotificationLevel(unsigned level) { g_displayLevel=level; }
 
 static const U64 g_refreshRate = SEC_TO_MICRO / 6;
 static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
-#define DISPLAYUPDATE(l, ...) { if (g_displayLevel>=l) { \
-            if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \
-            { g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
-            if (g_displayLevel>=4) fflush(stderr); } } }
+#define READY_FOR_UPDATE() (UTIL_clockSpanMicro(g_displayClock) > g_refreshRate)
+#define DISPLAYUPDATE(l, ...) {                              \
+        if (g_displayLevel>=l) {                             \
+            if (READY_FOR_UPDATE() || (g_displayLevel>=4)) { \
+                g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
+                if (g_displayLevel>=4) fflush(stderr);       \
+    }   }   }
 
 #undef MIN  /* in case it would be already defined */
 #define MIN(a,b)    ((a) < (b) ? (a) : (b))
@@ -809,12 +812,23 @@ static int FIO_compressFilename_internal(cRess_t ress,
                 compressedfilesize += outBuff.pos;
         }   }
 
+#if 1
+            if (READY_FOR_UPDATE()) {
+                ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+                DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
+                                (U32)(zfp.ingested >> 20),
+                                (U32)(zfp.consumed >> 20),
+                                (U32)(zfp.produced >> 20),
+                                (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
+            }
+#else
             if (fileSize == UTIL_FILESIZE_UNKNOWN) {
                 DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20));
             } else {
                 DISPLAYUPDATE(2, "\rRead : %u / %u MB",
                                     (U32)(readsize>>20), (U32)(fileSize>>20));
             }
+#endif
     } while (directive != ZSTD_e_end);
 
 finish:
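
One detail worth noting in the new display expression: the term "zfp.consumed + !zfp.consumed" is a branch-free guard, so the percentage stays defined before any input has been compressed (the denominator becomes 1 instead of 0). A hypothetical explicit equivalent:

    /* same value as the guarded expression above : divide by 1 when nothing has been consumed yet */
    double const ratio = (double)zfp.produced * 100 / (zfp.consumed ? (double)zfp.consumed : 1.0);
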