From 2bfc79ab8d480e7556ee75135407cc8aa0557b60 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 1 Feb 2018 16:13:04 -0800 Subject: [PATCH 01/10] removed bitstream.h dependency --- programs/fileio.c | 48 +++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/programs/fileio.c b/programs/fileio.c index 6039a75a..4e5d7c41 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -36,18 +36,21 @@ # include #endif -#include "bitstream.h" #include "mem.h" #include "fileio.h" #include "util.h" + #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #include "zstd.h" +#include "zstd_errors.h" /* ZSTD_error_frameParameter_windowTooLarge */ + #if defined(ZSTD_GZCOMPRESS) || defined(ZSTD_GZDECOMPRESS) # include # if !defined(z_const) # define z_const # endif #endif + #if defined(ZSTD_LZMACOMPRESS) || defined(ZSTD_LZMADECOMPRESS) # include #endif @@ -1142,33 +1145,46 @@ static unsigned FIO_passThrough(FILE* foutput, FILE* finput, void* buffer, size_ return 0; } -static void FIO_zstdErrorHelp(dRess_t* ress, size_t ret, char const* srcFileName) +/* FIO_highbit64() : + * gives position of highest bit. + * note : only works for v > 0 ! + */ +static unsigned FIO_highbit64(unsigned long long v) +{ + unsigned count = 0; + assert(v != 0); + v >>= 1; + while (v) { v >>= 1; count++; } + return count; +} + +/* FIO_zstdErrorHelp() : + * detailed error message when requested window size is too large */ +static void FIO_zstdErrorHelp(dRess_t* ress, size_t err, char const* srcFileName) { ZSTD_frameHeader header; - /* No special help for these errors */ - if (ZSTD_getErrorCode(ret) != ZSTD_error_frameParameter_windowTooLarge) + + /* Help message only for one specific error */ + if (ZSTD_getErrorCode(err) != ZSTD_error_frameParameter_windowTooLarge) return; + /* Try to decode the frame header */ - ret = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded); - if (ret == 0) { - U32 const windowSize = (U32)header.windowSize; - U32 const windowLog = BIT_highbit32(windowSize) + ((windowSize & (windowSize - 1)) != 0); - U32 const windowMB = (windowSize >> 20) + ((windowSize & ((1 MB) - 1)) != 0); - assert(header.windowSize <= (U64)((U32)-1)); + err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded); + if (err == 0) { + U64 const windowSize = header.windowSize; + U32 const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0); + U32 const windowMB = (U32)((windowSize >> 20) + ((windowSize & ((1 MB) - 1)) != 0)); + assert(windowSize < (U64)(1ULL << 52)); assert(g_memLimit > 0); DISPLAYLEVEL(1, "%s : Window size larger than maximum : %llu > %u\n", - srcFileName, header.windowSize, g_memLimit); + srcFileName, windowSize, g_memLimit); if (windowLog <= ZSTD_WINDOWLOG_MAX) { DISPLAYLEVEL(1, "%s : Use --long=%u or --memory=%uMB\n", srcFileName, windowLog, windowMB); return; } - } else if (ZSTD_getErrorCode(ret) != ZSTD_error_frameParameter_windowTooLarge) { - DISPLAYLEVEL(1, "%s : Error decoding frame header to read window size : %s\n", - srcFileName, ZSTD_getErrorName(ret)); - return; } - DISPLAYLEVEL(1, "%s : Window log larger than ZSTD_WINDOWLOG_MAX=%u not supported\n", + DISPLAYLEVEL(1, "%s : Window log larger than ZSTD_WINDOWLOG_MAX=%u; not supported\n", srcFileName, ZSTD_WINDOWLOG_MAX); } From 60fa90b6c04c69cbd9312155f70dad46f2ce92b5 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 1 Feb 2018 16:13:31 -0800 Subject: [PATCH 02/10] zstdmt: added ability to change compression parameters during compression --- lib/compress/zstd_compress.c | 7 +++++++ lib/compress/zstdmt_compress.c | 21 ++++++++++++++++++++- lib/compress/zstdmt_compress.h | 27 +++++++++++++++++++++------ lib/zstd.h | 2 +- 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 4a825f65..18da77f8 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -476,6 +476,9 @@ size_t ZSTD_CCtxParam_setParameter( /** ZSTD_CCtx_setParametersUsingCCtxParams() : * just applies `params` into `cctx` * no action is performed, parameters are merely stored. + * If ZSTDMT is enabled, parameters are pushed to cctx->mtctx. + * This is possible even if a compression is ongoing. + * In which case, new parameters will be applied on the fly, starting with next compression job. */ size_t ZSTD_CCtx_setParametersUsingCCtxParams( ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params) @@ -484,6 +487,10 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams( if (cctx->cdict) return ERROR(stage_wrong); cctx->requestedParams = *params; +#ifdef ZSTD_MULTITHREAD + if (cctx->mtctx) + ZSTDMT_MTCtx_setParametersUsingCCtxParams(cctx->mtctx, params); +#endif return 0; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index ec5f0bbb..32390a61 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -664,6 +664,25 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) return jobParams; } +/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() : + * Apply a ZSTD_CCtx_params to the compression context. + * This entry point is accessed while compression is ongoing, + * new parameters will be applied to next compression job. + * However, following parameters are NOT updated : + * - window size + * - pledgedSrcSize + * - nb threads + * - job size + * - overlap size + */ +void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params) +{ + U32 const wlog = mtctx->params.cParams.windowLog; + mtctx->params = *params; + mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! */ + /* note : other parameters not updated are simply not used beyond initialization */ +} + /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ @@ -856,7 +875,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, const ZSTD_CDict* cdict, - ZSTD_parameters const params, + ZSTD_parameters params, unsigned overlapLog) { ZSTD_CCtx_params cctxParams = mtctx->params; diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 34dfc488..dcff1138 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -38,7 +38,7 @@ ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx); ZSTDLIB_API size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx); -/* === Simple buffer-to-butter one-pass function === */ +/* === Simple one-pass compression function === */ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, @@ -50,7 +50,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, /* === Streaming functions === */ ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); -ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it may change in the future, to mean "empty" */ +ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */ ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); @@ -68,7 +68,7 @@ ZSTDLIB_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, const ZSTD_CDict* cdict, - ZSTD_parameters const params, + ZSTD_parameters params, unsigned overlapLog); ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, @@ -109,15 +109,30 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_EndDirective endOp); -/* === Private definitions; never ever use directly === */ +/* ======================================================== + * === Private interface, for use by ZSTD_compress.c === + * === Not exposed in libzstd. Never invoke directly === + * ======================================================== */ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); /* ZSTDMT_CCtxParam_setNbThreads() - * Set nbThreads, and clamp it correctly, - * also reset jobSize and overlapLog */ + * Set nbThreads, and clamp it. + * Also reset jobSize and overlapLog */ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads); +/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() : + * Apply a ZSTD_CCtx_params to the compression context. + * This works even during compression, and will be applied to next compression job. + * However, the following parameters will NOT be updated after compression has been started : + * - window size + * - pledgedSrcSize + * - nb threads + * - job size + * - overlap size + */ +void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params); + /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ diff --git a/lib/zstd.h b/lib/zstd.h index 29c3ced1..be73d25b 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1198,7 +1198,7 @@ ZSTDLIB_API size_t ZSTD_compress_generic_simpleArgs ( ZSTDLIB_API ZSTD_CCtx_params* ZSTD_createCCtxParams(void); /*! ZSTD_resetCCtxParams() : - * Reset params to default, with the default compression level. + * Reset params to default values. */ ZSTDLIB_API size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params); From 4b6a94f0ccb3de6ba0ff3b840febecefb6285d81 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 1 Feb 2018 17:07:27 -0800 Subject: [PATCH 03/10] clarified comments on LDM parameters --- lib/zstd.h | 34 +++++++++++++++++++--------------- programs/fileio.c | 9 ++++----- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/lib/zstd.h b/lib/zstd.h index be73d25b..c3630eb0 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1015,7 +1015,7 @@ typedef enum { /* advanced parameters - may not remain available after API update */ ZSTD_p_forceMaxWindow=1100, /* Force back-reference distances to remain < windowSize, * even when referencing into Dictionary content (default:0) */ - ZSTD_p_enableLongDistanceMatching=1200, /* Enable long distance matching. + ZSTD_p_enableLongDistanceMatching=1200, /* Enable long distance matching. * This parameter is designed to improve the compression * ratio for large inputs with long distance matches. * This increases the memory usage as well as window size. @@ -1025,25 +1025,29 @@ typedef enum { * other LDM parameters. Setting the compression level * after this parameter overrides the window log, though LDM * will remain enabled until explicitly disabled. */ - ZSTD_p_ldmHashLog, /* Size of the table for long distance matching, as a power of 2. - * Larger values increase memory usage and compression ratio, but decrease - * compression speed. - * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX - * (default: windowlog - 7). */ - ZSTD_p_ldmMinMatch, /* Minimum size of searched matches for long distance matcher. - * Larger/too small values usually decrease compression ratio. - * Must be clamped between ZSTD_LDM_MINMATCH_MIN - * and ZSTD_LDM_MINMATCH_MAX (default: 64). */ - ZSTD_p_ldmBucketSizeLog, /* Log size of each bucket in the LDM hash table for collision resolution. - * Larger values usually improve collision resolution but may decrease - * compression speed. - * The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). */ + ZSTD_p_ldmHashLog, /* Size of the table for long distance matching, as a power of 2. + * Larger values increase memory usage and compression ratio, but decrease + * compression speed. + * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX + * (default: windowlog - 7). + * Special: value 0 means "do not change ldmHashLog". */ + ZSTD_p_ldmMinMatch, /* Minimum size of searched matches for long distance matcher. + * Larger/too small values usually decrease compression ratio. + * Must be clamped between ZSTD_LDM_MINMATCH_MIN + * and ZSTD_LDM_MINMATCH_MAX (default: 64). + * Special: value 0 means "do not change ldmMinMatch". */ + ZSTD_p_ldmBucketSizeLog, /* Log size of each bucket in the LDM hash table for collision resolution. + * Larger values usually improve collision resolution but may decrease + * compression speed. + * The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). + * note : 0 is a valid value */ ZSTD_p_ldmHashEveryLog, /* Frequency of inserting/looking up entries in the LDM hash table. * The default is MAX(0, (windowLog - ldmHashLog)) to * optimize hash table usage. * Larger values improve compression speed. Deviating far from the * default value will likely result in a decrease in compression ratio. - * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. */ + * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. + * note : 0 is a valid value */ } ZSTD_cParameter; diff --git a/programs/fileio.c b/programs/fileio.c index 4e5d7c41..8bfdc20e 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -430,7 +430,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, if (!ress.srcBuffer || !ress.dstBuffer) EXM_THROW(31, "allocation error : not enough memory"); - /* Advances parameters, including dictionary */ + /* Advanced parameters, including dictionary */ { void* dictBuffer; size_t const dictBuffSize = FIO_createDictBuffer(&dictBuffer, dictFileName); /* works with dictFileName==NULL */ if (dictFileName && (dictBuffer==NULL)) @@ -442,8 +442,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, /* compression level */ CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, cLevel) ); /* long distance matching */ - CHECK( ZSTD_CCtx_setParameter( - ress.cctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag) ); + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_ldmHashLog, g_ldmHashLog) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch) ); if (g_ldmBucketSizeLog != FIO_LDM_PARAM_NOTSET) { @@ -461,13 +460,13 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_targetLength, comprParams->targetLength) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) ); /* multi-threading */ +#ifdef ZSTD_MULTITHREAD DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); -#ifdef ZSTD_MULTITHREAD CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); #endif /* dictionary */ - CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ + CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */ CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) ); CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, ZSTD_CONTENTSIZE_UNKNOWN) ); /* reset */ From 209df52ba29a0f1eacd9899dda5fcc0c0a1c313f Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 1 Feb 2018 19:29:30 -0800 Subject: [PATCH 04/10] Changed nbThreads for nbWorkers This makes it easier to explain that nbWorkers=0 --> single-threaded mode, while nbWorkers=1 --> asynchronous mode (one mode thread on top of the "main" caller thread). No need for an additional asynchronous mode flag. nbWorkers>=2 works the same as nbThreads>=2 previously. --- doc/zstd_manual.html | 57 +++++++------ lib/compress/zstd_compress.c | 56 +++++-------- lib/compress/zstd_compress_internal.h | 5 +- lib/compress/zstdmt_compress.c | 111 +++++++++++++------------- lib/compress/zstdmt_compress.h | 18 ++--- lib/zstd.h | 30 +++---- programs/bench.c | 10 +-- programs/bench.h | 2 +- programs/fileio.c | 17 ++-- programs/fileio.h | 2 +- programs/zstdcli.c | 22 ++--- tests/fullbench.c | 4 +- tests/fuzzer.c | 4 +- tests/roundTripCrash.c | 2 +- tests/zstreamtest.c | 8 +- 15 files changed, 165 insertions(+), 183 deletions(-) diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index 1dea249d..74856823 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -416,7 +416,7 @@ size_t ZSTD_estimateDCtxSize(void); It will also consider src size to be arbitrarily "large", which is worst case. If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation. ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. - ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1. + ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1. Note : CCtx size estimation is only correct for single-threaded compression.


@@ -429,7 +429,7 @@ size_t ZSTD_estimateDStreamSize_fromFrame(const void* src, size_t srcSize); It will also consider src size to be arbitrarily "large", which is worst case. If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation. ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. - ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1. + ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1. Note : CStream size estimation is only correct for single-threaded compression. ZSTD_DStream memory budget depends on window Size. This information can be passed manually, using ZSTD_estimateDStreamSize, @@ -800,18 +800,11 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long /* multi-threading parameters */ /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). * They return an error otherwise. */ - ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) + ZSTD_p_nbWorkers=400, /* Select how many threads will be spawned to compress in parallel. + * Triggers asynchronous mode, even with nbWorkers = 1. + * Can only be set to a value >= 1 if ZSTD_MULTITHREAD is enabled. * More threads improve speed, but also increase memory usage. - * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. - * Special: value 0 means "do not change nbThreads" */ - ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : - * it finishes its job as much as possible, and only then gives back control to caller. - * In contrast, multi-thread is by default "non-blocking" : - * it takes some input, flush some output if available, and immediately gives back control to caller. - * Compression work is performed in parallel, within worker threads. - * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking) - * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected. - * It allows the caller to do other tasks while the worker thread compresses in parallel. */ + * Default value is `0`, aka "blocking mode" : no worker is spawned, compression is performed inside Caller's thread */ ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. * Each compression job is completed in parallel, so indirectly controls the nb of active threads. * 0 means default, which is dynamically determined based on compression parameters. @@ -823,7 +816,7 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long /* advanced parameters - may not remain available after API update */ ZSTD_p_forceMaxWindow=1100, /* Force back-reference distances to remain < windowSize, * even when referencing into Dictionary content (default:0) */ - ZSTD_p_enableLongDistanceMatching=1200, /* Enable long distance matching. + ZSTD_p_enableLongDistanceMatching=1200, /* Enable long distance matching. * This parameter is designed to improve the compression * ratio for large inputs with long distance matches. * This increases the memory usage as well as window size. @@ -833,25 +826,29 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long * other LDM parameters. Setting the compression level * after this parameter overrides the window log, though LDM * will remain enabled until explicitly disabled. */ - ZSTD_p_ldmHashLog, /* Size of the table for long distance matching, as a power of 2. - * Larger values increase memory usage and compression ratio, but decrease - * compression speed. - * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX - * (default: windowlog - 7). */ - ZSTD_p_ldmMinMatch, /* Minimum size of searched matches for long distance matcher. - * Larger/too small values usually decrease compression ratio. - * Must be clamped between ZSTD_LDM_MINMATCH_MIN - * and ZSTD_LDM_MINMATCH_MAX (default: 64). */ - ZSTD_p_ldmBucketSizeLog, /* Log size of each bucket in the LDM hash table for collision resolution. - * Larger values usually improve collision resolution but may decrease - * compression speed. - * The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). */ + ZSTD_p_ldmHashLog, /* Size of the table for long distance matching, as a power of 2. + * Larger values increase memory usage and compression ratio, but decrease + * compression speed. + * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX + * (default: windowlog - 7). + * Special: value 0 means "do not change ldmHashLog". */ + ZSTD_p_ldmMinMatch, /* Minimum size of searched matches for long distance matcher. + * Larger/too small values usually decrease compression ratio. + * Must be clamped between ZSTD_LDM_MINMATCH_MIN + * and ZSTD_LDM_MINMATCH_MAX (default: 64). + * Special: value 0 means "do not change ldmMinMatch". */ + ZSTD_p_ldmBucketSizeLog, /* Log size of each bucket in the LDM hash table for collision resolution. + * Larger values usually improve collision resolution but may decrease + * compression speed. + * The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). + * note : 0 is a valid value */ ZSTD_p_ldmHashEveryLog, /* Frequency of inserting/looking up entries in the LDM hash table. * The default is MAX(0, (windowLog - ldmHashLog)) to * optimize hash table usage. * Larger values improve compression speed. Deviating far from the * default value will likely result in a decrease in compression ratio. - * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. */ + * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. + * note : 0 is a valid value */ } ZSTD_cParameter;
@@ -1000,7 +997,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t


size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params);
-

Reset params to default, with the default compression level. +

Reset params to default values.


@@ -1030,7 +1027,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t

Apply a set of ZSTD_CCtx_params to the compression context. This must be done before the dictionary is loaded. The pledgedSrcSize is treated as unknown. - Multithreading parameters are applied only if nbThreads > 1. + Multithreading parameters are applied only if nbWorkers >= 1.


diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 18da77f8..e66b4df8 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -281,13 +281,12 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v * default : 0 when using a CDict, 1 when using a Prefix */ return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); - case ZSTD_p_nbThreads: - if ((value > 1) && cctx->staticSize) { + case ZSTD_p_nbWorkers: + if ((value>0) && cctx->staticSize) { return ERROR(parameter_unsupported); /* MT not compatible with static alloc */ } return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); - case ZSTD_p_nonBlockingMode: case ZSTD_p_jobSize: case ZSTD_p_overlapSizeLog: return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); @@ -403,21 +402,12 @@ size_t ZSTD_CCtxParam_setParameter( CCtxParams->forceWindow = (value > 0); return CCtxParams->forceWindow; - case ZSTD_p_nbThreads : - if (value == 0) return CCtxParams->nbThreads; + case ZSTD_p_nbWorkers : #ifndef ZSTD_MULTITHREAD - if (value > 1) return ERROR(parameter_unsupported); - return 1; + if (value > 0) return ERROR(parameter_unsupported); + return 0; #else - return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); -#endif - - case ZSTD_p_nonBlockingMode : -#ifndef ZSTD_MULTITHREAD - return ERROR(parameter_unsupported); -#else - CCtxParams->nonBlockingMode = (value>0); - return CCtxParams->nonBlockingMode; + return ZSTDMT_CCtxParam_setNbWorkers(CCtxParams, value); #endif case ZSTD_p_jobSize : @@ -489,7 +479,7 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams( cctx->requestedParams = *params; #ifdef ZSTD_MULTITHREAD if (cctx->mtctx) - ZSTDMT_MTCtx_setParametersUsingCCtxParams(cctx->mtctx, params); + ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(cctx->mtctx, params); #endif return 0; @@ -687,7 +677,7 @@ static size_t ZSTD_sizeof_matchState(ZSTD_compressionParameters const* cParams, size_t ZSTD_estimateCCtxSize_usingCCtxParams(const ZSTD_CCtx_params* params) { /* Estimate CCtx size is supported for single-threaded compression only. */ - if (params->nbThreads > 1) { return ERROR(GENERIC); } + if (params->nbWorkers > 0) { return ERROR(GENERIC); } { ZSTD_compressionParameters const cParams = ZSTD_getCParamsFromCCtxParams(*params, 0, 0); size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << cParams.windowLog); @@ -736,7 +726,7 @@ size_t ZSTD_estimateCCtxSize(int compressionLevel) size_t ZSTD_estimateCStreamSize_usingCCtxParams(const ZSTD_CCtx_params* params) { - if (params->nbThreads > 1) { return ERROR(GENERIC); } + if (params->nbWorkers > 0) { return ERROR(GENERIC); } { size_t const CCtxSize = ZSTD_estimateCCtxSize_usingCCtxParams(params); size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params->cParams.windowLog); size_t const inBuffSize = ((size_t)1 << params->cParams.windowLog) + blockSize; @@ -775,7 +765,7 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) { ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx) { #ifdef ZSTD_MULTITHREAD - if ((cctx->appliedParams.nbThreads > 1) || (cctx->appliedParams.nonBlockingMode)) { + if (cctx->appliedParams.nbWorkers > 0) { return ZSTDMT_getFrameProgression(cctx->mtctx); } #endif @@ -3166,28 +3156,26 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, #ifdef ZSTD_MULTITHREAD if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) { - params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ - params.nonBlockingMode = 0; + params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */ } - if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) { - if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { - DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u", - params.nbThreads); + if (params.nbWorkers > 0) { + if (cctx->mtctx == NULL || (params.nbWorkers != ZSTDMT_getNbWorkers(cctx->mtctx))) { + DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbWorkers=%u", + params.nbWorkers); if (cctx->mtctx != NULL) - DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u", - ZSTDMT_getNbThreads(cctx->mtctx)); + DEBUGLOG(4, "ZSTD_compress_generic: previous nbWorkers was %u", + ZSTDMT_getNbWorkers(cctx->mtctx)); ZSTDMT_freeCCtx(cctx->mtctx); - cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); + cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbWorkers, cctx->customMem); if (cctx->mtctx == NULL) return ERROR(memory_allocation); } - DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbThreads=%u", params.nbThreads); + DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers); CHECK_F( ZSTDMT_initCStream_internal( cctx->mtctx, prefixDict.dict, prefixDict.dictSize, ZSTD_dm_rawContent, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->streamStage = zcss_load; - cctx->appliedParams.nbThreads = params.nbThreads; - cctx->appliedParams.nonBlockingMode = params.nonBlockingMode; + cctx->appliedParams.nbWorkers = params.nbWorkers; } else #endif { CHECK_F( ZSTD_resetCStream_internal( @@ -3195,12 +3183,12 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, prefixDict.dictMode, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); assert(cctx->streamStage == zcss_load); - assert(cctx->appliedParams.nbThreads <= 1); + assert(cctx->appliedParams.nbWorkers == 0); } } /* compression stage */ #ifdef ZSTD_MULTITHREAD - if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) { + if (cctx->appliedParams.nbWorkers > 0) { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); if ( ZSTD_isError(flushMin) || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index cf593658..eb651fb5 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -145,12 +145,11 @@ struct ZSTD_CCtx_params_s { ZSTD_frameParameters fParams; int compressionLevel; - U32 forceWindow; /* force back-references to respect limit of + int forceWindow; /* force back-references to respect limit of * 1<= 1 , checked by caller ZSTDMT_createCCtx() */ -static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads, + * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */ +static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers, ZSTD_customMem cMem) { ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( - sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem); + sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem); + assert(nbWorkers > 0); if (!cctxPool) return NULL; if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) { ZSTD_free(cctxPool, cMem); return NULL; } cctxPool->cMem = cMem; - cctxPool->totalCCtx = nbThreads; + cctxPool->totalCCtx = nbWorkers; cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } - DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads); + DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers); return cctxPool; } @@ -260,15 +261,16 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads, static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) { ZSTD_pthread_mutex_lock(&cctxPool->poolMutex); - { unsigned const nbThreads = cctxPool->totalCCtx; + { unsigned const nbWorkers = cctxPool->totalCCtx; size_t const poolSize = sizeof(*cctxPool) - + (nbThreads-1)*sizeof(ZSTD_CCtx*); + + (nbWorkers-1) * sizeof(ZSTD_CCtx*); unsigned u; size_t totalCCtxSize = 0; - for (u=0; ucctx[u]); } ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex); + assert(nbWorkers > 0); return poolSize + totalCCtxSize; } } @@ -295,8 +297,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) if (pool->availCCtx < pool->totalCCtx) pool->cctx[pool->availCCtx++] = cctx; else { - /* pool overflow : should not happen, since totalCCtx==nbThreads */ - DEBUGLOG(5, "CCtx pool overflow : free cctx"); + /* pool overflow : should not happen, since totalCCtx==nbWorkers */ + DEBUGLOG(4, "CCtx pool overflow : free cctx"); ZSTD_freeCCtx(cctx); } ZSTD_pthread_mutex_unlock(&pool->poolMutex); @@ -502,52 +504,52 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom return jobTable; } -/* ZSTDMT_CCtxParam_setNbThreads(): +/* ZSTDMT_CCtxParam_setNbWorkers(): * Internal use only */ -size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads) +size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) { - if (nbThreads > ZSTDMT_NBTHREADS_MAX) nbThreads = ZSTDMT_NBTHREADS_MAX; - if (nbThreads < 1) nbThreads = 1; - params->nbThreads = nbThreads; + if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX; + if (nbWorkers < 1) nbWorkers = 1; + params->nbWorkers = nbWorkers; params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT; params->jobSize = 0; - return nbThreads; + return nbWorkers; } -ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) +ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem) { ZSTDMT_CCtx* mtctx; - U32 nbJobs = nbThreads + 2; - DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbThreads = %u)", nbThreads); + U32 nbJobs = nbWorkers + 2; + DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers); - if (nbThreads < 1) return NULL; - nbThreads = MIN(nbThreads , ZSTDMT_NBTHREADS_MAX); + if (nbWorkers < 1) return NULL; + nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX); if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL)) /* invalid custom allocator */ return NULL; mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem); if (!mtctx) return NULL; - ZSTDMT_CCtxParam_setNbThreads(&mtctx->params, nbThreads); + ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers); mtctx->cMem = cMem; mtctx->allJobsCompleted = 1; - mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); + mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem); mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; - mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); - mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); + mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); + mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) { ZSTDMT_freeCCtx(mtctx); return NULL; } - DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads); + DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers); return mtctx; } -ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads) +ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers) { - return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem); + return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem); } @@ -649,8 +651,8 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, } } -/* Sets parameters relevant to the compression job, initializing others to - * default values. Notably, nbThreads should probably be zero. */ +/* Sets parameters relevant to the compression job, + * initializing others to default values. */ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) { ZSTD_CCtx_params jobParams; @@ -664,7 +666,7 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) return jobParams; } -/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() : +/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() : * Apply a ZSTD_CCtx_params to the compression context. * This entry point is accessed while compression is ongoing, * new parameters will be applied to next compression job. @@ -675,21 +677,23 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) * - job size * - overlap size */ -void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params) +void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params) { U32 const wlog = mtctx->params.cParams.windowLog; + U32 const nbWorkers = mtctx->params.nbWorkers; mtctx->params = *params; - mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! */ + mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! Frame must keep same wlog during the whole process ! */ + mtctx->params.nbWorkers = nbWorkers; /* Do not modify nbWorkers, it must remain synchronized with CCtx Pool ! */ /* note : other parameters not updated are simply not used beyond initialization */ } -/* ZSTDMT_getNbThreads(): +/* ZSTDMT_getNbWorkers(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) +unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx) { assert(mtctx != NULL); - return mtctx->params.nbThreads; + return mtctx->params.nbWorkers; } /* ZSTDMT_getFrameProgression(): @@ -728,15 +732,15 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) { - assert(nbThreads>0); +static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbWorkers) { + assert(nbWorkers>0); { size_t const jobSizeTarget = (size_t)1 << (windowLog + 2); size_t const jobMaxSize = jobSizeTarget << 2; - size_t const passSizeMax = jobMaxSize * nbThreads; + size_t const passSizeMax = jobMaxSize * nbWorkers; unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned const nbJobsLarge = multiplier * nbThreads; + unsigned const nbJobsLarge = multiplier * nbWorkers; unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; - unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads); + unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers); return (multiplier>1) ? nbJobsLarge : nbJobsSmall; } } @@ -753,7 +757,7 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads); + unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbWorkers); size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; @@ -761,13 +765,13 @@ static size_t ZSTDMT_compress_advanced_internal( unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; XXH64_state_t xxh64; - assert(jobParams.nbThreads == 0); - assert(mtctx->cctxPool->totalCCtx == params.nbThreads); + assert(jobParams.nbWorkers == 0); + assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", nbJobs, (U32)proposedJobSize, (U32)avgJobSize); - if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ + if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); @@ -911,11 +915,12 @@ size_t ZSTDMT_initCStream_internal( const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) { - DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u)", (U32)pledgedSrcSize); + DEBUGLOG(2, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)", + (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ - assert(mtctx->cctxPool->totalCCtx == params.nbThreads); + assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ if (params.jobSize == 0) { if (params.cParams.windowLog >= 29) @@ -928,12 +933,12 @@ size_t ZSTDMT_initCStream_internal( if (mtctx->singleBlockingThread) { ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); - assert(singleThreadParams.nbThreads == 0); + assert(singleThreadParams.nbWorkers == 0); return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], dict, dictSize, cdict, singleThreadParams, pledgedSrcSize); } - DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads); + DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers); if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ ZSTDMT_waitForAllJobsCompleted(mtctx); @@ -1012,8 +1017,6 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) { if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; - if (mtctx->params.nbThreads==1) - return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize); return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params, pledgedSrcSize); } diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index dcff1138..3fef2ed6 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -30,8 +30,8 @@ /* === Memory management === */ typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; -ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads); -ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, +ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers); +ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem); ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx); @@ -116,12 +116,12 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); -/* ZSTDMT_CCtxParam_setNbThreads() - * Set nbThreads, and clamp it. +/* ZSTDMT_CCtxParam_setNbWorkers() + * Set nbWorkers, and clamp it. * Also reset jobSize and overlapLog */ -size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads); +size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); -/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() : +/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() : * Apply a ZSTD_CCtx_params to the compression context. * This works even during compression, and will be applied to next compression job. * However, the following parameters will NOT be updated after compression has been started : @@ -131,12 +131,12 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread * - job size * - overlap size */ -void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params); +void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params); -/* ZSTDMT_getNbThreads(): +/* ZSTDMT_getNbWorkers(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); +unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx); /* ZSTDMT_getFrameProgression(): * tells how much data has been consumed (input) and produced (output) for current frame. diff --git a/lib/zstd.h b/lib/zstd.h index c3630eb0..be93a7ee 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -506,7 +506,7 @@ ZSTDLIB_API size_t ZSTD_sizeof_DDict(const ZSTD_DDict* ddict); * It will also consider src size to be arbitrarily "large", which is worst case. * If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation. * ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. - * ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1. + * ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1. * Note : CCtx size estimation is only correct for single-threaded compression. */ ZSTDLIB_API size_t ZSTD_estimateCCtxSize(int compressionLevel); ZSTDLIB_API size_t ZSTD_estimateCCtxSize_usingCParams(ZSTD_compressionParameters cParams); @@ -518,7 +518,7 @@ ZSTDLIB_API size_t ZSTD_estimateDCtxSize(void); * It will also consider src size to be arbitrarily "large", which is worst case. * If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation. * ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. - * ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1. + * ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1. * Note : CStream size estimation is only correct for single-threaded compression. * ZSTD_DStream memory budget depends on window Size. * This information can be passed manually, using ZSTD_estimateDStreamSize, @@ -992,18 +992,13 @@ typedef enum { /* multi-threading parameters */ /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). * They return an error otherwise. */ - ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) - * More threads improve speed, but also increase memory usage. - * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. - * Special: value 0 means "do not change nbThreads" */ - ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : - * it finishes its job as much as possible, and only then gives back control to caller. - * In contrast, multi-thread is by default "non-blocking" : - * it takes some input, flush some output if available, and immediately gives back control to caller. - * Compression work is performed in parallel, within worker threads. - * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking) - * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected. - * It allows the caller to do other tasks while the worker thread compresses in parallel. */ + ZSTD_p_nbWorkers=400, /* Select how many threads will be spawned to compress in parallel. + * When nbWorkers >= 1, triggers asynchronous mode : + * ZSTD_compress_generic() consumes some input, flush some output if possible, and immediately gives back control to caller, + * while compression work is performed in parallel, within worker threads. + * (note : a strong exception to this rule is when first invocation sets ZSTD_e_end : it becomes a blocking call). + * More workers improve speed, but also increase memory usage. + * Default value is `0`, aka "single-threaded mode" : no worker is spawned, compression is performed inside Caller's thread, all invocations are blocking */ ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. * Each compression job is completed in parallel, so indirectly controls the nb of active threads. * 0 means default, which is dynamically determined based on compression parameters. @@ -1231,9 +1226,10 @@ ZSTDLIB_API size_t ZSTD_CCtxParam_setParameter(ZSTD_CCtx_params* params, ZSTD_cP /*! ZSTD_CCtx_setParametersUsingCCtxParams() : * Apply a set of ZSTD_CCtx_params to the compression context. - * This must be done before the dictionary is loaded. - * The pledgedSrcSize is treated as unknown. - * Multithreading parameters are applied only if nbThreads > 1. + * This can be done even after compression is started, + * if nbWorkers==0, this will have no impact until a new compression is started. + * if nbWorkers>=1, new parameters will be picked up at next job, + * with a few restrictions (windowLog, pledgedSrcSize, nbWorkers, jobSize, and overlapLog are not updated). */ ZSTDLIB_API size_t ZSTD_CCtx_setParametersUsingCCtxParams( ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params); diff --git a/programs/bench.c b/programs/bench.c index 843920c8..bf3dcb47 100644 --- a/programs/bench.c +++ b/programs/bench.c @@ -122,12 +122,12 @@ void BMK_setBlockSize(size_t blockSize) void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); } -static U32 g_nbThreads = 1; -void BMK_setNbThreads(unsigned nbThreads) { +static U32 g_nbWorkers = 0; +void BMK_setNbWorkers(unsigned nbWorkers) { #ifndef ZSTD_MULTITHREAD - if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); + if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); #endif - g_nbThreads = nbThreads; + g_nbWorkers = nbWorkers; } static U32 g_realTime = 0; @@ -295,7 +295,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, if (!cCompleted) { /* still some time to do compression tests */ U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1; U32 nbLoops = 0; - ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbThreads, g_nbThreads); + ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbWorkers, g_nbWorkers); ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel); ZSTD_CCtx_setParameter(ctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag); ZSTD_CCtx_setParameter(ctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch); diff --git a/programs/bench.h b/programs/bench.h index d2ec94ee..bf108701 100644 --- a/programs/bench.h +++ b/programs/bench.h @@ -22,7 +22,7 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles, const char* di /* Set Parameters */ void BMK_setNbSeconds(unsigned nbLoops); void BMK_setBlockSize(size_t blockSize); -void BMK_setNbThreads(unsigned nbThreads); +void BMK_setNbWorkers(unsigned nbWorkers); void BMK_setRealTime(unsigned priority); void BMK_setNotificationLevel(unsigned level); void BMK_setSeparateFiles(unsigned separate); diff --git a/programs/fileio.c b/programs/fileio.c index 8bfdc20e..bd880f30 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -218,23 +218,23 @@ static U32 g_removeSrcFile = 0; void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); } static U32 g_memLimit = 0; void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; } -static U32 g_nbThreads = 1; -void FIO_setNbThreads(unsigned nbThreads) { +static U32 g_nbWorkers = 1; +void FIO_setNbWorkers(unsigned nbWorkers) { #ifndef ZSTD_MULTITHREAD - if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); + if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); #endif - g_nbThreads = nbThreads; + g_nbWorkers = nbWorkers; } static U32 g_blockSize = 0; void FIO_setBlockSize(unsigned blockSize) { - if (blockSize && g_nbThreads==1) + if (blockSize && g_nbWorkers==0) DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n"); g_blockSize = blockSize; } #define FIO_OVERLAP_LOG_NOTSET 9999 static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET; void FIO_setOverlapLog(unsigned overlapLog){ - if (overlapLog && g_nbThreads==1) + if (overlapLog && g_nbWorkers==0) DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n"); g_overlapLog = overlapLog; } @@ -461,9 +461,8 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) ); /* multi-threading */ #ifdef ZSTD_MULTITHREAD - DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); - CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); - CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); + DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbWorkers); + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) ); #endif /* dictionary */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */ diff --git a/programs/fileio.h b/programs/fileio.h index 9b9c7ea2..69c83f71 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -54,7 +54,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag); void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setRemoveSrcFile(unsigned flag); void FIO_setMemLimit(unsigned memLimit); -void FIO_setNbThreads(unsigned nbThreads); +void FIO_setNbWorkers(unsigned nbWorkers); void FIO_setBlockSize(unsigned blockSize); void FIO_setOverlapLog(unsigned overlapLog); void FIO_setLdmFlag(unsigned ldmFlag); diff --git a/programs/zstdcli.c b/programs/zstdcli.c index c80c36ac..015dc5e4 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -377,7 +377,7 @@ int main(int argCount, const char* argv[]) nextArgumentsAreFiles=0, ultra=0, lastCommand = 0, - nbThreads = 1, + nbWorkers = 1, setRealTimePrio = 0, separateFiles = 0, ldmFlag = 0; @@ -422,7 +422,7 @@ int main(int argCount, const char* argv[]) programName = lastNameFromPath(programName); /* preset behaviors */ - if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbThreads=0; + if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbWorkers=0; if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress; if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* supports multiple formats */ if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* behave like zcat, also supports multiple formats */ @@ -515,7 +515,7 @@ int main(int argCount, const char* argv[]) continue; } #endif - if (longCommandWArg(&argument, "--threads=")) { nbThreads = readU32FromChar(&argument); continue; } + if (longCommandWArg(&argument, "--threads=")) { nbWorkers = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; } @@ -648,7 +648,7 @@ int main(int argCount, const char* argv[]) /* nb of threads (hidden option) */ case 'T': argument++; - nbThreads = readU32FromChar(&argument); + nbWorkers = readU32FromChar(&argument); break; /* Dictionary Selection level */ @@ -716,10 +716,10 @@ int main(int argCount, const char* argv[]) /* Welcome message (if verbose) */ DISPLAYLEVEL(3, WELCOME_MESSAGE); - if (nbThreads == 0) { - /* try to guess */ - nbThreads = UTIL_countPhysicalCores(); - DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbThreads); + if (nbWorkers == 0) { + /* automatically set # workers based on # of reported cpus */ + nbWorkers = UTIL_countPhysicalCores(); + DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbWorkers); } g_utilDisplayLevel = g_displayLevel; @@ -763,7 +763,7 @@ int main(int argCount, const char* argv[]) BMK_setNotificationLevel(g_displayLevel); BMK_setSeparateFiles(separateFiles); BMK_setBlockSize(blockSize); - BMK_setNbThreads(nbThreads); + BMK_setNbWorkers(nbWorkers); BMK_setRealTime(setRealTimePrio); BMK_setNbSeconds(bench_nbSeconds); BMK_setLdmFlag(ldmFlag); @@ -791,7 +791,7 @@ int main(int argCount, const char* argv[]) zParams.dictID = dictID; if (cover) { int const optimize = !coverParams.k || !coverParams.d; - coverParams.nbThreads = nbThreads; + coverParams.nbThreads = nbWorkers; coverParams.zParams = zParams; operationResult = DiB_trainFromFiles(outFileName, maxDictSize, filenameTable, filenameIdx, blockSize, NULL, &coverParams, optimize); } else { @@ -835,7 +835,7 @@ int main(int argCount, const char* argv[]) FIO_setNotificationLevel(g_displayLevel); if (operation==zom_compress) { #ifndef ZSTD_NOCOMPRESS - FIO_setNbThreads(nbThreads); + FIO_setNbWorkers(nbWorkers); FIO_setBlockSize((U32)blockSize); FIO_setLdmFlag(ldmFlag); FIO_setLdmHashLog(g_ldmHashLog); diff --git a/tests/fullbench.c b/tests/fullbench.c index 2ec3ce55..199f4966 100644 --- a/tests/fullbench.c +++ b/tests/fullbench.c @@ -181,7 +181,7 @@ static size_t local_ZSTD_compress_generic_T2_end(void* dst, size_t dstCapacity, ZSTD_inBuffer buffIn; (void)buff2; ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); - ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2); buffOut.dst = dst; buffOut.size = dstCapacity; buffOut.pos = 0; @@ -198,7 +198,7 @@ static size_t local_ZSTD_compress_generic_T2_continue(void* dst, size_t dstCapac ZSTD_inBuffer buffIn; (void)buff2; ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); - ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2); buffOut.dst = dst; buffOut.size = dstCapacity; buffOut.pos = 0; diff --git a/tests/fuzzer.c b/tests/fuzzer.c index 3982f8bb..f42b8b7a 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -226,7 +226,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part) ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 }; CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) ); - CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) ); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) ); while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {} ZSTD_freeCCtx(cctx); DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ", @@ -246,7 +246,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part) ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 }; CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) ); - CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) ); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) ); CHECK_Z( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue) ); while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {} ZSTD_freeCCtx(cctx); diff --git a/tests/roundTripCrash.c b/tests/roundTripCrash.c index 180fa9b6..7d937fce 100644 --- a/tests/roundTripCrash.c +++ b/tests/roundTripCrash.c @@ -94,7 +94,7 @@ static size_t cctxParamRoundTripTest(void* resultBuff, size_t resultBuffCapacity /* Set parameters */ CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) ); - CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbThreads, 2) ); + CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbWorkers, 2) ); CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) ); diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index a4fb5589..db87e2c5 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -753,9 +753,9 @@ static int basicUnitTests(U32 seed, double compressibility) DISPLAYLEVEL(3, "OK \n"); /* Complex multithreading + dictionary test */ - { U32 const nbThreads = 2; + { U32 const nbWorkers = 2; size_t const jobSize = 4 * 1 MB; - size_t const srcSize = jobSize * nbThreads; /* we want each job to have predictable size */ + size_t const srcSize = jobSize * nbWorkers; /* we want each job to have predictable size */ size_t const segLength = 2 KB; size_t const offset = 600 KB; /* must be larger than window defined in cdict */ size_t const start = jobSize + (offset-1); @@ -763,7 +763,7 @@ static int basicUnitTests(U32 seed, double compressibility) BYTE* const dst = (BYTE*)CNBuffer + start - offset; DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads + dictionary : ", testNb++, (U32)srcSize); CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_compressionLevel, 3) ); - CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbThreads, 2) ); + CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbWorkers, nbWorkers) ); CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_jobSize, jobSize) ); assert(start > offset); assert(start + segLength < COMPRESSIBLE_NOISE_LENGTH); @@ -1672,7 +1672,7 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double U32 const nbThreadsAdjusted = (windowLogMalus < nbThreadsCandidate) ? nbThreadsCandidate - windowLogMalus : 1; U32 const nbThreads = MIN(nbThreadsAdjusted, nbThreadsMax); DISPLAYLEVEL(5, "t%u: nbThreads : %u \n", testNb, nbThreads); - CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbThreads, nbThreads, useOpaqueAPI) ); + CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbWorkers, nbThreads, useOpaqueAPI) ); if (nbThreads > 1) { U32 const jobLog = FUZ_rand(&lseed) % (testLog+1); CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_overlapSizeLog, FUZ_rand(&lseed) % 10, useOpaqueAPI) ); From 6c492af284230dfc1356bd686c84c017c9695cb1 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 1 Feb 2018 20:16:00 -0800 Subject: [PATCH 05/10] fixed minor conversion warning --- programs/fileio.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/programs/fileio.c b/programs/fileio.c index bd880f30..9fc0fc99 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -1169,7 +1169,7 @@ static void FIO_zstdErrorHelp(dRess_t* ress, size_t err, char const* srcFileName /* Try to decode the frame header */ err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded); if (err == 0) { - U64 const windowSize = header.windowSize; + unsigned long long const windowSize = header.windowSize; U32 const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0); U32 const windowMB = (U32)((windowSize >> 20) + ((windowSize & ((1 MB) - 1)) != 0)); assert(windowSize < (U64)(1ULL << 52)); From 90eca318a776a1e365955e8a2c4f3245e5840bd7 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 2 Feb 2018 14:24:56 -0800 Subject: [PATCH 06/10] fileio: create dedicated function to generate zstd frames like other formats --- lib/compress/zstd_compress.c | 65 +++++++++++---- lib/compress/zstd_compress_internal.h | 4 +- lib/compress/zstdmt_compress.c | 27 ++---- lib/compress/zstdmt_compress.h | 16 ++-- lib/zstd.h | 4 +- programs/fileio.c | 115 +++++++++++++++----------- 6 files changed, 137 insertions(+), 94 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index e66b4df8..67cc49b7 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -160,13 +160,6 @@ static void ZSTD_cLevelToCCtxParams_srcSize(ZSTD_CCtx_params* CCtxParams, U64 sr CCtxParams->compressionLevel = ZSTD_CLEVEL_CUSTOM; } -static void ZSTD_cLevelToCParams(ZSTD_CCtx* cctx) -{ - DEBUGLOG(4, "ZSTD_cLevelToCParams: level=%i", cctx->requestedParams.compressionLevel); - ZSTD_cLevelToCCtxParams_srcSize( - &cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1); -} - static void ZSTD_cLevelToCCtxParams(ZSTD_CCtx_params* CCtxParams) { DEBUGLOG(4, "ZSTD_cLevelToCCtxParams"); @@ -246,10 +239,51 @@ static ZSTD_CCtx_params ZSTD_assignParamsToCCtxParams( return ERROR(parameter_outOfBound); \ } } + +static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param) +{ + switch(param) + { + case ZSTD_p_compressionLevel: + case ZSTD_p_hashLog: + case ZSTD_p_chainLog: + case ZSTD_p_searchLog: + case ZSTD_p_minMatch: + case ZSTD_p_targetLength: + case ZSTD_p_compressionStrategy: + return 1; + + case ZSTD_p_format : + case ZSTD_p_windowLog: + case ZSTD_p_contentSizeFlag: + case ZSTD_p_checksumFlag: + case ZSTD_p_dictIDFlag: + case ZSTD_p_forceMaxWindow : + case ZSTD_p_nbWorkers: + case ZSTD_p_jobSize: + case ZSTD_p_overlapSizeLog: + case ZSTD_p_enableLongDistanceMatching: + case ZSTD_p_ldmHashLog: + case ZSTD_p_ldmMinMatch: + case ZSTD_p_ldmBucketSizeLog: + case ZSTD_p_ldmHashEveryLog: + return 0; + + default: + assert(0); + return 0; + } +} + size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value) { DEBUGLOG(4, "ZSTD_CCtx_setParameter (%u, %u)", (U32)param, value); - if (cctx->streamStage != zcss_init) return ERROR(stage_wrong); + if (cctx->streamStage != zcss_init) { + if (ZSTD_isUpdateAuthorized(param)) { + cctx->cParamsChanged = 1; + } else { + return ERROR(stage_wrong); + } } switch(param) { @@ -268,7 +302,9 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v case ZSTD_p_targetLength: case ZSTD_p_compressionStrategy: if (cctx->cdict) return ERROR(stage_wrong); - if (value>0) ZSTD_cLevelToCParams(cctx); /* Can optimize if srcSize is known */ + if (value>0) { + ZSTD_cLevelToCCtxParams_srcSize(&cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1); /* Optimize cParams when srcSize is known */ + } return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_contentSizeFlag: @@ -293,7 +329,8 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v case ZSTD_p_enableLongDistanceMatching: if (cctx->cdict) return ERROR(stage_wrong); - if (value>0) ZSTD_cLevelToCParams(cctx); + if (value>0) + ZSTD_cLevelToCCtxParams_srcSize(&cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1); /* Optimize cParams when srcSize is known */ return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_ldmHashLog: @@ -477,11 +514,6 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams( if (cctx->cdict) return ERROR(stage_wrong); cctx->requestedParams = *params; -#ifdef ZSTD_MULTITHREAD - if (cctx->mtctx) - ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(cctx->mtctx, params); -#endif - return 0; } @@ -2497,7 +2529,8 @@ size_t ZSTD_compress_advanced_internal( const void* dict,size_t dictSize, ZSTD_CCtx_params params) { - DEBUGLOG(4, "ZSTD_compress_advanced_internal"); + DEBUGLOG(4, "ZSTD_compress_advanced_internal (srcSize:%u)", + (U32)srcSize); CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, ZSTD_dm_auto, NULL, params, srcSize, ZSTDb_not_buffered) ); return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize); diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index eb651fb5..ad2a4be0 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -158,14 +158,14 @@ struct ZSTD_CCtx_params_s { /* For use with createCCtxParams() and freeCCtxParams() only */ ZSTD_customMem customMem; - }; /* typedef'd to ZSTD_CCtx_params within "zstd.h" */ struct ZSTD_CCtx_s { ZSTD_compressionStage_e stage; - U32 dictID; + int cParamsChanged; /* == 1 if cParams(except wlog) or compression level are changed in requestedParams. Triggers transmission of new params to ZSTDMT (if available) then reset to 0. */ ZSTD_CCtx_params requestedParams; ZSTD_CCtx_params appliedParams; + U32 dictID; void* workSpace; size_t workSpaceSize; size_t blockSize; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 90084b9b..3542623e 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -666,25 +666,16 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) return jobParams; } -/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() : - * Apply a ZSTD_CCtx_params to the compression context. - * This entry point is accessed while compression is ongoing, - * new parameters will be applied to next compression job. - * However, following parameters are NOT updated : - * - window size - * - pledgedSrcSize - * - nb threads - * - job size - * - overlap size - */ -void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params) +/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() : + * Update compression level and parameters (except wlog) + * while compression is ongoing. + * New parameters will be applied to next compression job. */ +void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams) { - U32 const wlog = mtctx->params.cParams.windowLog; - U32 const nbWorkers = mtctx->params.nbWorkers; - mtctx->params = *params; + U32 const wlog = cParams.windowLog; + mtctx->params.cParams = cParams; mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! Frame must keep same wlog during the whole process ! */ - mtctx->params.nbWorkers = nbWorkers; /* Do not modify nbWorkers, it must remain synchronized with CCtx Pool ! */ - /* note : other parameters not updated are simply not used beyond initialization */ + mtctx->params.compressionLevel = compressionLevel; } /* ZSTDMT_getNbWorkers(): @@ -915,7 +906,7 @@ size_t ZSTDMT_initCStream_internal( const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) { - DEBUGLOG(2, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)", + DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)", (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 3fef2ed6..c25521d3 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -121,17 +121,11 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_param * Also reset jobSize and overlapLog */ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); -/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() : - * Apply a ZSTD_CCtx_params to the compression context. - * This works even during compression, and will be applied to next compression job. - * However, the following parameters will NOT be updated after compression has been started : - * - window size - * - pledgedSrcSize - * - nb threads - * - job size - * - overlap size - */ -void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params); +/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() : + * Update compression level and parameters (except wlog) + * while compression is ongoing. + * New parameters will be applied to next compression job. */ +void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams); /* ZSTDMT_getNbWorkers(): * @return nb threads currently active in mtctx. diff --git a/lib/zstd.h b/lib/zstd.h index be93a7ee..986b8198 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1049,8 +1049,10 @@ typedef enum { /*! ZSTD_CCtx_setParameter() : * Set one compression parameter, selected by enum ZSTD_cParameter. + * Setting a parameter is generally only possible during frame initialization (before starting compression), + * except for a few exceptions which can be updated during compression: compressionLevel, hashLog, chainLog, searchLog, minMatch, targetLength and strategy. * Note : when `value` is an enum, cast it to unsigned for proper type checking. - * @result : informational value (typically, the one being set, possibly corrected), + * @result : informational value (typically, value being set clamped correctly), * or an error code (which can be tested with ZSTD_isError()). */ ZSTDLIB_API size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value); diff --git a/programs/fileio.c b/programs/fileio.c index 9fc0fc99..f1c9b922 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -736,56 +736,22 @@ static unsigned long long FIO_compressLz4Frame(cRess_t* ress, * @return : 0 : compression completed correctly, * 1 : missing or pb opening srcFileName */ -static int FIO_compressFilename_internal(cRess_t ress, - const char* dstFileName, const char* srcFileName, int compressionLevel) +static unsigned long long +FIO_compressZstdFrame(const cRess_t* ressPtr, + const char* srcFileName, U64 fileSize, + int compressionLevel, U64* readsize) { + cRess_t const ress = *ressPtr; FILE* const srcFile = ress.srcFile; FILE* const dstFile = ress.dstFile; - U64 readsize = 0; U64 compressedfilesize = 0; - U64 const fileSize = UTIL_getFileSize(srcFileName); ZSTD_EndDirective directive = ZSTD_e_continue; - DISPLAYLEVEL(5, "%s: %u bytes \n", srcFileName, (U32)fileSize); - - switch (g_compressionType) { - case FIO_zstdCompression: - break; - - case FIO_gzipCompression: -#ifdef ZSTD_GZCOMPRESS - compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize); -#else - (void)compressionLevel; - EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n", - srcFileName); -#endif - goto finish; - - case FIO_xzCompression: - case FIO_lzmaCompression: -#ifdef ZSTD_LZMACOMPRESS - compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, g_compressionType==FIO_lzmaCompression); -#else - (void)compressionLevel; - EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n", - srcFileName); -#endif - goto finish; - - case FIO_lz4Compression: -#ifdef ZSTD_LZ4COMPRESS - compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, &readsize); -#else - (void)compressionLevel; - EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n", - srcFileName); -#endif - goto finish; - } + DISPLAYLEVEL(6, "compression using zstd format \n"); /* init */ if (fileSize != UTIL_FILESIZE_UNKNOWN) ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize); + (void)compressionLevel; (void)srcFileName; /* Main compression loop */ do { @@ -794,9 +760,9 @@ static int FIO_compressFilename_internal(cRess_t ress, size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize); - readsize += inSize; + *readsize += inSize; - if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize)) + if ((inSize == 0) || (*readsize == fileSize)) directive = ZSTD_e_end; result = 1; @@ -810,7 +776,7 @@ static int FIO_compressFilename_internal(cRess_t ress, if (outBuff.pos) { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); if (sizeCheck!=outBuff.pos) - EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); + EXM_THROW(25, "Write error : cannot write compressed block"); compressedfilesize += outBuff.pos; } if (READY_FOR_UPDATE()) { @@ -824,10 +790,67 @@ static int FIO_compressFilename_internal(cRess_t ress, } } while (directive != ZSTD_e_end); -finish: + return compressedfilesize; +} + +/*! FIO_compressFilename_internal() : + * same as FIO_compressFilename_extRess(), with `ress.desFile` already opened. + * @return : 0 : compression completed correctly, + * 1 : missing or pb opening srcFileName + */ +static int +FIO_compressFilename_internal(cRess_t ress, + const char* dstFileName, const char* srcFileName, + int compressionLevel) +{ + U64 readsize = 0; + U64 compressedfilesize = 0; + U64 const fileSize = UTIL_getFileSize(srcFileName); + DISPLAYLEVEL(5, "%s: %u bytes \n", srcFileName, (U32)fileSize); + + /* compression format selection */ + switch (g_compressionType) { + default: + case FIO_zstdCompression: + compressedfilesize = FIO_compressZstdFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize); + break; + + case FIO_gzipCompression: +#ifdef ZSTD_GZCOMPRESS + compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize); +#else + (void)compressionLevel; + EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n", + srcFileName); +#endif + break; + + case FIO_xzCompression: + case FIO_lzmaCompression: +#ifdef ZSTD_LZMACOMPRESS + compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, g_compressionType==FIO_lzmaCompression); +#else + (void)compressionLevel; + EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n", + srcFileName); +#endif + break; + + case FIO_lz4Compression: +#ifdef ZSTD_LZ4COMPRESS + compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, &readsize); +#else + (void)compressionLevel; + EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n", + srcFileName); +#endif + break; + } + /* Status */ DISPLAYLEVEL(2, "\r%79s\r", ""); - DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n", srcFileName, + DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n", + srcFileName, (double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100, (unsigned long long)readsize, (unsigned long long) compressedfilesize, dstFileName); From 4b525af53afbc6fd5f13b021b8f707ebe2ac4514 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 2 Feb 2018 15:58:13 -0800 Subject: [PATCH 07/10] zstdmt: applies new parameters on the fly when invoked from ZSTD_compress_generic() --- lib/compress/zstd_compress.c | 16 ++++++++++------ lib/compress/zstdmt_compress.c | 4 ++-- lib/compress/zstdmt_compress.h | 4 ++-- programs/fileio.c | 3 ++- tests/fuzzer.c | 4 ++-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 67cc49b7..fe916b50 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3222,13 +3222,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, /* compression stage */ #ifdef ZSTD_MULTITHREAD if (cctx->appliedParams.nbWorkers > 0) { - size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); - if ( ZSTD_isError(flushMin) - || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ - ZSTD_startNewCompression(cctx); + if (cctx->cParamsChanged) { + ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, cctx->requestedParams.compressionLevel, cctx->requestedParams.cParams); + cctx->cParamsChanged = 0; } - return flushMin; - } + { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); + if ( ZSTD_isError(flushMin) + || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ + ZSTD_startNewCompression(cctx); + } + return flushMin; + } } #endif CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) ); DEBUGLOG(5, "completed ZSTD_compress_generic"); diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3542623e..a2deac14 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -666,11 +666,11 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) return jobParams; } -/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() : +/*! ZSTDMT_updateCParams_whileCompressing() : * Update compression level and parameters (except wlog) * while compression is ongoing. * New parameters will be applied to next compression job. */ -void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams) +void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams) { U32 const wlog = cParams.windowLog; mtctx->params.cParams = cParams; diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index c25521d3..4364f100 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -121,11 +121,11 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_param * Also reset jobSize and overlapLog */ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); -/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() : +/*! ZSTDMT_updateCParams_whileCompressing() : * Update compression level and parameters (except wlog) * while compression is ongoing. * New parameters will be applied to next compression job. */ -void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams); +void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams); /* ZSTDMT_getNbWorkers(): * @return nb threads currently active in mtctx. diff --git a/programs/fileio.c b/programs/fileio.c index f1c9b922..0cc807a1 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -781,7 +781,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, } if (READY_FOR_UPDATE()) { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); - DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", + DISPLAYUPDATE(2, "\r(%i) Read :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", + compressionLevel, (U32)(zfp.ingested >> 20), (U32)(zfp.consumed >> 20), (U32)(zfp.produced >> 20), diff --git a/tests/fuzzer.c b/tests/fuzzer.c index f42b8b7a..e7c92edc 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -53,7 +53,7 @@ static const U32 nbTestsDefault = 30000; /*-************************************ * Display Macros **************************************/ -#define DISPLAY(...) fprintf(stdout, __VA_ARGS__) +#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) #define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } static U32 g_displayLevel = 2; @@ -63,7 +63,7 @@ static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; #define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \ if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \ { g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \ - if (g_displayLevel>=4) fflush(stdout); } } + if (g_displayLevel>=4) fflush(stderr); } } #undef MIN From 5188749e1cf38c7fc1faddc5415940c0bf74c449 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 2 Feb 2018 16:31:20 -0800 Subject: [PATCH 08/10] ensure compression parameters are updated when only compression level is changed --- lib/compress/zstd_compress.c | 8 ++++---- lib/compress/zstd_compress_internal.h | 5 +++-- lib/compress/zstd_double_fast.c | 4 ++-- lib/compress/zstd_fast.c | 4 ++-- lib/compress/zstd_lazy.c | 4 ++-- lib/compress/zstdmt_compress.c | 15 +++++++++++---- 6 files changed, 24 insertions(+), 16 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index fe916b50..9e768edd 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -140,8 +140,6 @@ size_t ZSTD_sizeof_CStream(const ZSTD_CStream* zcs) /* private API call, for dictBuilder only */ const seqStore_t* ZSTD_getSeqStore(const ZSTD_CCtx* ctx) { return &(ctx->seqStore); } -#define ZSTD_CLEVEL_CUSTOM 999 - static ZSTD_compressionParameters ZSTD_getCParamsFromCCtxParams( ZSTD_CCtx_params CCtxParams, U64 srcSizeHint, size_t dictSize) { @@ -3010,7 +3008,7 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, /** ZSTD_compressStream_generic(): * internal function for all *compressStream*() variants and *compress_generic() - * non-static, because can be called from zstdmt.c + * non-static, because can be called from zstdmt_compress.c * @return : hint size for next input */ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, ZSTD_outBuffer* output, @@ -3192,6 +3190,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */ } if (params.nbWorkers > 0) { + /* mt context creation */ if (cctx->mtctx == NULL || (params.nbWorkers != ZSTDMT_getNbWorkers(cctx->mtctx))) { DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbWorkers=%u", params.nbWorkers); @@ -3202,6 +3201,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbWorkers, cctx->customMem); if (cctx->mtctx == NULL) return ERROR(memory_allocation); } + /* mt compression */ DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers); CHECK_F( ZSTDMT_initCStream_internal( cctx->mtctx, @@ -3258,7 +3258,7 @@ size_t ZSTD_compress_generic_simpleArgs ( /*====== Finalize ======*/ /*! ZSTD_flushStream() : -* @return : amount of data remaining to flush */ + * @return : amount of data remaining to flush */ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { ZSTD_inBuffer input = { NULL, 0, 0 }; diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index ad2a4be0..22213c22 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -30,8 +30,9 @@ extern "C" { /*-************************************* * Constants ***************************************/ -static const U32 g_searchStrength = 8; -#define HASH_READ_SIZE 8 +#define kSearchStrength 8 +#define HASH_READ_SIZE 8 +#define ZSTD_CLEVEL_CUSTOM 999 /*-************************************* diff --git a/lib/compress/zstd_double_fast.c b/lib/compress/zstd_double_fast.c index 834a64b0..6415480e 100644 --- a/lib/compress/zstd_double_fast.c +++ b/lib/compress/zstd_double_fast.c @@ -113,7 +113,7 @@ size_t ZSTD_compressBlock_doubleFast_generic( while (((ip>anchor) & (match>lowest)) && (ip[-1] == match[-1])) { ip--; match--; mLength++; } /* catch up */ } } else { - ip += ((ip-anchor) >> g_searchStrength) + 1; + ip += ((ip-anchor) >> kSearchStrength) + 1; continue; } @@ -264,7 +264,7 @@ static size_t ZSTD_compressBlock_doubleFast_extDict_generic( ZSTD_storeSeq(seqStore, ip-anchor, anchor, offset + ZSTD_REP_MOVE, mLength-MINMATCH); } else { - ip += ((ip-anchor) >> g_searchStrength) + 1; + ip += ((ip-anchor) >> kSearchStrength) + 1; continue; } } diff --git a/lib/compress/zstd_fast.c b/lib/compress/zstd_fast.c index 8941326e..b60f2158 100644 --- a/lib/compress/zstd_fast.c +++ b/lib/compress/zstd_fast.c @@ -79,7 +79,7 @@ size_t ZSTD_compressBlock_fast_generic( } else { U32 offset; if ( (matchIndex <= lowestIndex) || (MEM_read32(match) != MEM_read32(ip)) ) { - ip += ((ip-anchor) >> g_searchStrength) + 1; + ip += ((ip-anchor) >> kSearchStrength) + 1; continue; } mLength = ZSTD_count(ip+4, match+4, iend) + 4; @@ -185,7 +185,7 @@ static size_t ZSTD_compressBlock_fast_extDict_generic( } else { if ( (matchIndex < lowestIndex) || (MEM_read32(match) != MEM_read32(ip)) ) { - ip += ((ip-anchor) >> g_searchStrength) + 1; + ip += ((ip-anchor) >> kSearchStrength) + 1; continue; } { const BYTE* matchEnd = matchIndex < dictLimit ? dictEnd : iend; diff --git a/lib/compress/zstd_lazy.c b/lib/compress/zstd_lazy.c index 7f59eb34..7cb53b2e 100644 --- a/lib/compress/zstd_lazy.c +++ b/lib/compress/zstd_lazy.c @@ -537,7 +537,7 @@ size_t ZSTD_compressBlock_lazy_generic( } if (matchLength < 4) { - ip += ((ip-anchor) >> g_searchStrength) + 1; /* jump faster over incompressible sections */ + ip += ((ip-anchor) >> kSearchStrength) + 1; /* jump faster over incompressible sections */ continue; } @@ -710,7 +710,7 @@ size_t ZSTD_compressBlock_lazy_extDict_generic( } if (matchLength < 4) { - ip += ((ip-anchor) >> g_searchStrength) + 1; /* jump faster over incompressible sections */ + ip += ((ip-anchor) >> kSearchStrength) + 1; /* jump faster over incompressible sections */ continue; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index a2deac14..4f327c07 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -672,10 +672,14 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) * New parameters will be applied to next compression job. */ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams) { - U32 const wlog = cParams.windowLog; - mtctx->params.cParams = cParams; - mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! Frame must keep same wlog during the whole process ! */ + U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */ + DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)", + compressionLevel); mtctx->params.compressionLevel = compressionLevel; + if (compressionLevel != ZSTD_CLEVEL_CUSTOM) + cParams = ZSTD_getCParams(compressionLevel, mtctx->frameContentSize, 0 /* should be dictSize */ ); + cParams. windowLog = saved_wlog; + mtctx->params.cParams = cParams; } /* ZSTDMT_getNbWorkers(): @@ -912,7 +916,8 @@ size_t ZSTDMT_initCStream_internal( assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); - mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ + + /* init */ if (params.jobSize == 0) { if (params.cParams.windowLog >= 29) params.jobSize = ZSTDMT_JOBSIZE_MAX; @@ -921,6 +926,7 @@ size_t ZSTDMT_initCStream_internal( } if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; + mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ if (mtctx->singleBlockingThread) { ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); @@ -929,6 +935,7 @@ size_t ZSTDMT_initCStream_internal( dict, dictSize, cdict, singleThreadParams, pledgedSrcSize); } + DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers); if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ From c72091556bb5735ef51b9cfd5a79a54f69a875bd Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 9 Feb 2018 09:46:08 -0800 Subject: [PATCH 09/10] fixed minor nit as per @terrelln's comments --- lib/compress/zstd_compress.c | 5 +---- lib/compress/zstdmt_compress.c | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 9e768edd..0994d78f 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -251,7 +251,7 @@ static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param) case ZSTD_p_compressionStrategy: return 1; - case ZSTD_p_format : + case ZSTD_p_format: case ZSTD_p_windowLog: case ZSTD_p_contentSizeFlag: case ZSTD_p_checksumFlag: @@ -265,10 +265,7 @@ static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param) case ZSTD_p_ldmMinMatch: case ZSTD_p_ldmBucketSizeLog: case ZSTD_p_ldmHashEveryLog: - return 0; - default: - assert(0); return 0; } } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 4f327c07..7ae27888 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -677,8 +677,8 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLe compressionLevel); mtctx->params.compressionLevel = compressionLevel; if (compressionLevel != ZSTD_CLEVEL_CUSTOM) - cParams = ZSTD_getCParams(compressionLevel, mtctx->frameContentSize, 0 /* should be dictSize */ ); - cParams. windowLog = saved_wlog; + cParams = ZSTD_getCParams(compressionLevel, mtctx->frameContentSize, 0 /* dictSize */ ); + cParams.windowLog = saved_wlog; mtctx->params.cParams = cParams; } From 75689838e4e3fd1657315cc1d0afc383bb8e0375 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 9 Feb 2018 15:53:27 -0800 Subject: [PATCH 10/10] specify new command --single-thread --- programs/fileio.c | 2 +- programs/zstd.1.md | 10 ++++++++-- programs/zstdcli.c | 19 ++++++++++--------- tests/playTests.sh | 1 + 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/programs/fileio.c b/programs/fileio.c index 0cc807a1..d344b8f6 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -461,7 +461,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) ); /* multi-threading */ #ifdef ZSTD_MULTITHREAD - DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbWorkers); + DISPLAYLEVEL(5,"set nb workers = %u \n", g_nbWorkers); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) ); #endif /* dictionary */ diff --git a/programs/zstd.1.md b/programs/zstd.1.md index 4d9ec2fc..c970c5cb 100644 --- a/programs/zstd.1.md +++ b/programs/zstd.1.md @@ -116,10 +116,16 @@ the last one takes effect. Note: If `windowLog` is set to larger than 27, `--long=windowLog` or `--memory=windowSize` needs to be passed to the decompressor. * `-T#`, `--threads=#`: - Compress using `#` threads (default: 1). + Compress using `#` working threads (default: 1). If `#` is 0, attempt to detect and use the number of physical CPU cores. - In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==256. + In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==200. This modifier does nothing if `zstd` is compiled without multithread support. +* `--single-thread`: + Does not spawn a thread for compression, use caller thread instead. + This is the only available mode when multithread support is disabled. + In this mode, compression is serialized with I/O. + (This is different from `-T1`, which spawns 1 compression thread in parallel of I/O). + Single-thread mode also features lower memory usage. * `-D file`: use `file` as Dictionary to compress or decompress FILE(s) * `--nodictID`: diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 015dc5e4..6d7957c8 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -135,7 +135,7 @@ static int usage_advanced(const char* programName) DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel()); DISPLAY( "--long[=#] : enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog); #ifdef ZSTD_MULTITHREAD - DISPLAY( " -T# : use # threads for compression (default: 1) \n"); + DISPLAY( " -T# : spawns # compression threads (default: 1) \n"); DISPLAY( " -B# : select size of each job (default: 0==automatic) \n"); #endif DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n"); @@ -366,21 +366,21 @@ typedef enum { zom_compress, zom_decompress, zom_test, zom_bench, zom_train, zom int main(int argCount, const char* argv[]) { int argNb, - forceStdout=0, followLinks=0, + forceStdout=0, + lastCommand = 0, + ldmFlag = 0, main_pause=0, - nextEntryIsDictionary=0, - operationResult=0, + nbWorkers = 1, nextArgumentIsOutFileName=0, nextArgumentIsMaxDict=0, nextArgumentIsDictID=0, nextArgumentsAreFiles=0, - ultra=0, - lastCommand = 0, - nbWorkers = 1, - setRealTimePrio = 0, + nextEntryIsDictionary=0, + operationResult=0, separateFiles = 0, - ldmFlag = 0; + setRealTimePrio = 0, + ultra=0; unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */ size_t blockSize = 0; zstd_operation_mode operation = zom_compress; @@ -481,6 +481,7 @@ int main(int argCount, const char* argv[]) if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; } if (!strcmp(argument, "--rm")) { FIO_setRemoveSrcFile(1); continue; } if (!strcmp(argument, "--priority=rt")) { setRealTimePrio = 1; continue; } + if (!strcmp(argument, "--single-thread")) { nbWorkers = 0; continue; } #ifdef ZSTD_GZCOMPRESS if (!strcmp(argument, "--format=gzip")) { suffix = GZ_EXTENSION; FIO_setCompressionType(FIO_gzipCompression); continue; } #endif diff --git a/tests/playTests.sh b/tests/playTests.sh index 0e12ec8e..258c9c75 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -634,6 +634,7 @@ roundTripTest -g518K "19 --long" fileRoundTripTest -g5M "3 --long" +roundTripTest -g96K "5 --single-thread" if [ -n "$hasMT" ] then $ECHO "\n===> zstdmt round-trip tests "