From 512cbe8c10b59b957ecb107b119af95720b6d470 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 24 Jan 2017 17:02:26 -0800 Subject: [PATCH] zstdmt cli and API allow selection of section sizes By default, section sizes are 4x window size. This new setting allow manual selection of section sizes. The larger they are, the (slightly) better the compression ratio, but also the higher the memory allocation cost, and eventually the lesser the nb of possible threads, since each section is compressed by a single thread. It also introduces a prototype to set generic parameters, ZSTDMT_setMTCtxParameter() The idea is that it's possible to add enums to extend the list of parameters that can be set this way. This is more long-term oriented than a fixed-size struct. Consider it as a test. --- lib/compress/zstdmt_compress.c | 25 +++++++++++++++++----- lib/compress/zstdmt_compress.h | 38 +++++++++++++++++++++++++++------- programs/fileio.c | 14 ++++++++++++- programs/fileio.h | 1 + programs/zstdcli.c | 2 ++ 5 files changed, 67 insertions(+), 13 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0b91ad4e..1baccf0f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -9,10 +9,6 @@ /* ====== Tuning parameters ====== */ -#ifndef ZSTDMT_SECTION_LOGSIZE_MIN -# define ZSTDMT_SECTION_LOGSIZE_MIN 20 /* minimum size for a full compression job (20==2^20==1 MB) */ -#endif - #define ZSTDMT_NBTHREADS_MAX 128 @@ -285,6 +281,7 @@ struct ZSTDMT_CCtx_s { unsigned frameEnded; unsigned allJobsCompleted; unsigned long long frameContentSize; + size_t sectionSize; ZSTD_CDict* cdict; ZSTD_CStream* cstream; ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ @@ -304,6 +301,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) cctx->nbThreads = nbThreads; cctx->jobIDMask = nbJobs - 1; cctx->allJobsCompleted = 1; + cctx->sectionSize = 0; cctx->factory = POOL_create(nbThreads, 1); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); @@ -356,6 +354,22 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) return 0; } +unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) +{ + switch(parameter) + { + case ZSTDMT_p_sectionSize : + mtctx->sectionSize = value; + return 0; + default : + return ERROR(compressionParameter_unsupported); + } +} + + +/* ------------------------------------------ */ +/* ===== Multi-threaded compression ===== */ +/* ------------------------------------------ */ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, @@ -487,7 +501,8 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, if (zcs->cdict == NULL) return ERROR(memory_allocation); } } zcs->frameContentSize = pledgedSrcSize; - zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2)); + zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); + zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 84d25f73..c00782e9 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -7,6 +7,10 @@ * of patent rights can be found in the PATENTS file in the same directory. */ + +/* Note : All prototypes defined in this file shall be considered experimental. + * There is no guarantee of API continuity (yet) on any of these prototypes */ + /* === Dependencies === */ #include /* size_t */ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */ @@ -27,12 +31,32 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, /* === Streaming functions === */ -ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); -ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ -ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */ - ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ +ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); +ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ -ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); +ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); -ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ -ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ + + +/* === Advanced functions and parameters === */ + +#ifndef ZSTDMT_SECTION_SIZE_MIN +# define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */ +#endif + +ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */ + ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ + +/* ZSDTMT_parameter : + * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ +typedef enum { ZSTDMT_p_sectionSize /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */ + } ZSDTMT_parameter; + +/* ZSTDMT_setMTCtxParameter() : + * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter. + * The function must be called typically after ZSTD_createCCtx(). + * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions. + * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ +ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value); diff --git a/programs/fileio.c b/programs/fileio.c index 3864a5fa..86db12ac 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -113,6 +113,16 @@ void FIO_setNbThreads(unsigned nbThreads) { #endif g_nbThreads = nbThreads; } +static U32 g_blockSize = 0; +void FIO_setBlockSize(unsigned blockSize) { + if (blockSize && g_nbThreads==1) + DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n"); +#ifdef ZSTD_MULTITHREAD + if (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1) /* intentional underflow */ + DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10)); +#endif + g_blockSize = blockSize; +} /*-************************************* @@ -283,10 +293,12 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1); #ifdef ZSTD_MULTITHREAD { size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); + if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); + ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize); #else { size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); -#endif if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); +#endif } } free(dictBuffer); } diff --git a/programs/fileio.h b/programs/fileio.h index 9ef44929..19f09c33 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -41,6 +41,7 @@ void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setRemoveSrcFile(unsigned flag); void FIO_setMemLimit(unsigned memLimit); void FIO_setNbThreads(unsigned nbThreads); +void FIO_setBlockSize(unsigned blockSize); /*-************************************* diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 785ecede..549dad01 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -118,6 +118,7 @@ static int usage_advanced(const char* programName) DISPLAY( "--[no-]check : integrity check (default:enabled) \n"); #ifdef ZSTD_MULTITHREAD DISPLAY( " -T# : use # threads for compression (default:1) \n"); + DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n"); #endif #endif #ifndef ZSTD_NODECOMPRESS @@ -625,6 +626,7 @@ int main(int argCount, const char* argv[]) if (operation==zom_compress) { #ifndef ZSTD_NOCOMPRESS FIO_setNbThreads(nbThreads); + FIO_setBlockSize((U32)blockSize); if ((filenameIdx==1) && outFileName) operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams); else