zstdmt cli and API allow selection of section sizes

By default, section sizes are 4x window size.
This new setting allow manual selection of section sizes.
The larger they are, the (slightly) better the compression ratio,
but also the higher the memory allocation cost,
and eventually the lesser the nb of possible threads,
since each section is compressed by a single thread.

It also introduces a prototype to set generic parameters,
ZSTDMT_setMTCtxParameter()

The idea is that it's possible to add enums
to extend the list of parameters that can be set this way.
This is more long-term oriented than a fixed-size struct.
Consider it as a test.
This commit is contained in:
Yann Collet 2017-01-24 17:02:26 -08:00
parent 3488a4a473
commit 512cbe8c10
5 changed files with 67 additions and 13 deletions

View File

@ -9,10 +9,6 @@
/* ====== Tuning parameters ====== */
#ifndef ZSTDMT_SECTION_LOGSIZE_MIN
# define ZSTDMT_SECTION_LOGSIZE_MIN 20 /* minimum size for a full compression job (20==2^20==1 MB) */
#endif
#define ZSTDMT_NBTHREADS_MAX 128
@ -285,6 +281,7 @@ struct ZSTDMT_CCtx_s {
unsigned frameEnded;
unsigned allJobsCompleted;
unsigned long long frameContentSize;
size_t sectionSize;
ZSTD_CDict* cdict;
ZSTD_CStream* cstream;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
@ -304,6 +301,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
cctx->nbThreads = nbThreads;
cctx->jobIDMask = nbJobs - 1;
cctx->allJobsCompleted = 1;
cctx->sectionSize = 0;
cctx->factory = POOL_create(nbThreads, 1);
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
@ -356,6 +354,22 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
return 0;
}
unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
{
switch(parameter)
{
case ZSTDMT_p_sectionSize :
mtctx->sectionSize = value;
return 0;
default :
return ERROR(compressionParameter_unsupported);
}
}
/* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
@ -487,7 +501,8 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
if (zcs->cdict == NULL) return ERROR(memory_allocation);
} }
zcs->frameContentSize = pledgedSrcSize;
zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2));
zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);

View File

@ -7,6 +7,10 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* Note : All prototypes defined in this file shall be considered experimental.
* There is no guarantee of API continuity (yet) on any of these prototypes */
/* === Dependencies === */
#include <stddef.h> /* size_t */
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */
@ -27,12 +31,32 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
/* === Streaming functions === */
ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
/* === Advanced functions and parameters === */
#ifndef ZSTDMT_SECTION_SIZE_MIN
# define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */
#endif
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
/* ZSDTMT_parameter :
* List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
typedef enum { ZSTDMT_p_sectionSize /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */
} ZSDTMT_parameter;
/* ZSTDMT_setMTCtxParameter() :
* allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter.
* The function must be called typically after ZSTD_createCCtx().
* Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);

View File

@ -113,6 +113,16 @@ void FIO_setNbThreads(unsigned nbThreads) {
#endif
g_nbThreads = nbThreads;
}
static U32 g_blockSize = 0;
void FIO_setBlockSize(unsigned blockSize) {
if (blockSize && g_nbThreads==1)
DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
#ifdef ZSTD_MULTITHREAD
if (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1) /* intentional underflow */
DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10));
#endif
g_blockSize = blockSize;
}
/*-*************************************
@ -283,10 +293,12 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
#ifdef ZSTD_MULTITHREAD
{ size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize);
#else
{ size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
#endif
if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
#endif
} }
free(dictBuffer);
}

View File

@ -41,6 +41,7 @@ void FIO_setChecksumFlag(unsigned checksumFlag);
void FIO_setRemoveSrcFile(unsigned flag);
void FIO_setMemLimit(unsigned memLimit);
void FIO_setNbThreads(unsigned nbThreads);
void FIO_setBlockSize(unsigned blockSize);
/*-*************************************

View File

@ -118,6 +118,7 @@ static int usage_advanced(const char* programName)
DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
#ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : use # threads for compression (default:1) \n");
DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n");
#endif
#endif
#ifndef ZSTD_NODECOMPRESS
@ -625,6 +626,7 @@ int main(int argCount, const char* argv[])
if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS
FIO_setNbThreads(nbThreads);
FIO_setBlockSize((U32)blockSize);
if ((filenameIdx==1) && outFileName)
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
else