fixed some complex scenarios

Fixed : multithreading to compress some small data with dictionary
Fixed : ZSTD_initCStream_usingCDict()
Improved streaming memory usage when pledgedSrcSize is known.
This commit is contained in:
Yann Collet 2017-11-16 15:02:28 -08:00
parent 05dffe43a7
commit 15768cabb5
4 changed files with 44 additions and 39 deletions

View File

@ -249,7 +249,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_compressionLevel:
if (value == 0) return 0; /* special value : 0 means "don't change anything" */
if (cctx->cdict) return ERROR(stage_wrong);
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
@ -260,7 +259,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
case ZSTD_p_minMatch:
case ZSTD_p_targetLength:
case ZSTD_p_compressionStrategy:
if (value == 0) return 0; /* special value : 0 means "don't change anything" */
if (cctx->cdict) return ERROR(stage_wrong);
ZSTD_cLevelToCParams(cctx); /* Can optimize if srcSize is known */
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
@ -277,8 +275,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_nbThreads:
if (value==0) return 0;
DEBUGLOG(5, " setting nbThreads : %u", value);
if (value > 1 && cctx->staticSize) {
return ERROR(parameter_unsupported); /* MT not compatible with static alloc */
}
@ -288,22 +284,15 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_overlapSizeLog:
DEBUGLOG(5, " setting overlap with nbThreads == %u", cctx->requestedParams.nbThreads);
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_enableLongDistanceMatching:
if (cctx->cdict) return ERROR(stage_wrong);
if (value != 0) {
ZSTD_cLevelToCParams(cctx);
}
if (value>0) ZSTD_cLevelToCParams(cctx);
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_ldmHashLog:
case ZSTD_p_ldmMinMatch:
if (value == 0) return 0; /* special value : 0 means "don't change anything" */
if (cctx->cdict) return ERROR(stage_wrong);
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_ldmBucketSizeLog:
case ZSTD_p_ldmHashEveryLog:
if (cctx->cdict) return ERROR(stage_wrong);
@ -779,24 +768,27 @@ static U32 ZSTD_equivalentLdmParams(ldmParams_t ldmParams1,
typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e;
/* ZSTD_equivalentBuffPolicy() :
/* ZSTD_sufficientBuff() :
* check internal buffers exist for streaming if buffPol == ZSTDb_buffered .
* Note : they are assumed to be correctly sized if ZSTD_equivalentCParams()==1 */
static U32 ZSTD_equivalentBuffPolicy(size_t bufferSize, ZSTD_buffered_policy_e buffPol)
static U32 ZSTD_sufficientBuff(size_t bufferSize, ZSTD_buffered_policy_e buffPol, ZSTD_compressionParameters cParams2, U64 pledgedSrcSize)
{
if ((buffPol == ZSTDb_buffered) & (!bufferSize)) return 0;
return 1;
size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << cParams2.windowLog), pledgedSrcSize));
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
size_t const neededBufferSize = (buffPol==ZSTDb_buffered) ? windowSize + blockSize : 0;
return (neededBufferSize <= bufferSize);
}
/** Equivalence for resetCCtx purposes */
static U32 ZSTD_equivalentParams(ZSTD_CCtx_params params1,
ZSTD_CCtx_params params2,
size_t buffSize1,
ZSTD_buffered_policy_e buffPol2)
ZSTD_buffered_policy_e buffPol2,
U64 pledgedSrcSize)
{
return ZSTD_equivalentCParams(params1.cParams, params2.cParams) &&
ZSTD_equivalentLdmParams(params1.ldmParams, params2.ldmParams) &&
ZSTD_equivalentBuffPolicy(buffSize1, buffPol2);
ZSTD_sufficientBuff(buffSize1, buffPol2, params2.cParams, pledgedSrcSize);
}
/*! ZSTD_continueCCtx() :
@ -837,7 +829,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
if (crp == ZSTDcrp_continue) {
if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff)) {
if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff, pledgedSrcSize)) {
DEBUGLOG(4, "ZSTD_equivalentParams()==1 -> continue mode");
assert(!(params.ldmParams.enableLdm &&
params.ldmParams.hashEveryLog == ZSTD_LDM_HASHEVERYLOG_NOTSET));
@ -858,7 +850,8 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
}
{ size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params.cParams.windowLog);
{ size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize));
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
U32 const divider = (params.cParams.searchLength==3) ? 3 : 4;
size_t const maxNbSeq = blockSize / divider;
size_t const tokenSpace = blockSize + 11*maxNbSeq;
@ -870,7 +863,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
size_t const h3Size = ((size_t)1) << hashLog3;
size_t const tableSpace = (chainSize + hSize + h3Size) * sizeof(U32);
size_t const buffOutSize = (zbuff==ZSTDb_buffered) ? ZSTD_compressBound(blockSize)+1 : 0;
size_t const buffInSize = (zbuff==ZSTDb_buffered) ? ((size_t)1 << params.cParams.windowLog) + blockSize : 0;
size_t const buffInSize = (zbuff==ZSTDb_buffered) ? windowSize + blockSize : 0;
void* ptr;
/* Check if workSpace is large enough, alloc a new one if needed */
@ -886,10 +879,10 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
: 0;
size_t const neededSpace = entropySpace + optSpace + ldmSpace +
tableSpace + tokenSpace + bufferSpace;
DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables",
(U32)(neededSpace>>10), (U32)(tableSpace>>10));
DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u",
(U32)chainSize, (U32)hSize, (U32)h3Size);
DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables, and %uKB for buffers",
(U32)(neededSpace>>10), (U32)(tableSpace>>10), (U32)(bufferSpace>>10));
DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u - windowSize: %u",
(U32)chainSize, (U32)hSize, (U32)h3Size, (U32)windowSize);
if (zc->workSpaceSize < neededSpace) { /* too small : resize */
DEBUGLOG(4, "Need to update workSpaceSize from %uK to %uK",
@ -917,7 +910,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
zc->consumedSrcSize = 0;
if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN)
zc->appliedParams.fParams.contentSizeFlag = 0;
DEBUGLOG(5, "pledged content size : %u ; flag : %u",
DEBUGLOG(4, "pledged content size : %u ; flag : %u",
(U32)pledgedSrcSize, zc->appliedParams.fParams.contentSizeFlag);
zc->blockSize = blockSize;
@ -1016,8 +1009,8 @@ void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) {
/*! ZSTD_copyCCtx_internal() :
* Duplicate an existing context `srcCCtx` into another one `dstCCtx`.
* The "context", in this case, refers to the hash and chain tables, entropy
* tables, and dictionary offsets.
* The "context", in this case, refers to the hash and chain tables,
* entropy tables, and dictionary offsets.
* Only works during stage ZSTDcs_init (i.e. after creation, but before first call to ZSTD_compressContinue()).
* pledgedSrcSize=0 means "empty".
* @return : 0, or an error code */
@ -2079,6 +2072,7 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
assert(!((dict) && (cdict))); /* either dict or cdict, not both */
if (cdict && cdict->dictContentSize>0) {
cctx->requestedParams = params;
return ZSTD_copyCCtx_internal(cctx, cdict->refContext,
params.fParams, pledgedSrcSize,
zbuff);
@ -2525,9 +2519,9 @@ size_t ZSTD_CStreamOutSize(void)
}
static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict,
const ZSTD_CCtx_params params, unsigned long long pledgedSrcSize)
const void* const dict, size_t const dictSize, ZSTD_dictMode_e const dictMode,
const ZSTD_CDict* const cdict,
ZSTD_CCtx_params const params, unsigned long long const pledgedSrcSize)
{
DEBUGLOG(4, "ZSTD_resetCStream_internal");
/* params are supposed to be fully validated at this point */
@ -2606,8 +2600,9 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs,
const ZSTD_CDict* cdict,
ZSTD_frameParameters fParams,
unsigned long long pledgedSrcSize)
{ /* cannot handle NULL cdict (does not know what to do) */
if (!cdict) return ERROR(dictionary_wrong);
{
DEBUGLOG(4, "ZSTD_initCStream_usingCDict_advanced");
if (!cdict) return ERROR(dictionary_wrong); /* cannot handle NULL cdict (does not know what to do) */
{ ZSTD_CCtx_params params = zcs->requestedParams;
params.cParams = ZSTD_getCParamsFromCDict(cdict);
params.fParams = fParams;
@ -2620,8 +2615,9 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs,
/* note : cdict must outlive compression session */
size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict)
{
ZSTD_frameParameters const fParams = { 0 /* contentSize */, 0 /* checksum */, 0 /* hideDictID */ };
return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, 0); /* note : will check that cdict != NULL */
ZSTD_frameParameters const fParams = { 0 /* contentSizeFlag */, 0 /* checksum */, 0 /* hideDictID */ };
DEBUGLOG(4, "ZSTD_initCStream_usingCDict");
return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, ZSTD_CONTENTSIZE_UNKNOWN); /* note : will check that cdict != NULL */
}
/* ZSTD_initCStream_advanced() :
@ -2853,7 +2849,8 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/);
#ifdef ZSTD_MULTITHREAD
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN)
params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
if (params.nbThreads > 1) {
if (cctx->mtctx == NULL || cctx->appliedParams.nbThreads != params.nbThreads) {
ZSTDMT_freeCCtx(cctx->mtctx);
@ -2874,6 +2871,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
prefixDict.dictMode, cctx->cdict, params,
cctx->pledgedSrcSizePlusOne-1) );
assert(cctx->streamStage == zcss_load);
assert(cctx->appliedParams.nbThreads <= 1);
} }
/* compression stage */

View File

@ -231,7 +231,7 @@ static U32 ZSTD_insertAndFindFirstIndexHash3 (ZSTD_CCtx* zc, const BYTE* ip)
/*-*************************************
* Binary Tree search
***************************************/
//FORCE_INLINE_TEMPLATE
FORCE_INLINE_TEMPLATE
U32 ZSTD_insertBtAndGetAllMatches (
ZSTD_CCtx* zc,
const BYTE* const ip, const BYTE* const iLimit, const int extDict,

View File

@ -112,6 +112,9 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
/* ZSTDMT_CCtxParam_setNbThreads()
* Set nbThreads, and clamp it correctly,
* but also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads);
/*! ZSTDMT_initCStream_internal() :

View File

@ -473,6 +473,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
DISPLAYLEVEL(3, "test%3i : digested dictionary : ", testNb++);
{ ZSTD_CDict* const cdict = ZSTD_createCDict(dictionary.start, dictionary.filled, 1 /*byRef*/ );
size_t const initError = ZSTD_initCStream_usingCDict(zc, cdict);
DISPLAYLEVEL(5, "ZSTD_initCStream_usingCDict result : %u ", (U32)initError);
if (ZSTD_isError(initError)) goto _output_error;
cSize = 0;
outBuff.dst = compressedBuffer;
@ -481,10 +482,13 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
inBuff.src = CNBuffer;
inBuff.size = CNBufferSize;
inBuff.pos = 0;
DISPLAYLEVEL(5, "- starting ZSTD_compressStream ");
CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) );
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
{ size_t const r = ZSTD_endStream(zc, &outBuff);
if (r != 0) goto _output_error; } /* error, or some data not flushed */
{ size_t const r = ZSTD_endStream(zc, &outBuff);
DISPLAYLEVEL(5, "- ZSTD_endStream result : %u ", (U32)r);
if (r != 0) goto _output_error; /* error, or some data not flushed */
}
cSize = outBuff.pos;
ZSTD_freeCDict(cdict);
DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);