fix a subtle issue in continue mode

The deep fuzzer tests caught a subtle bug that was probably there for a long time.
The impact of the bug is not a crash, or any other clear error signal,
rather, it reduces performance, by cutting data into smaller blocks.
Eventually, the following test would fail because it produces too many 1-byte blocks,
requiring more space than buffer can provide :
`./zstreamtest_asan --mt -s3514 -t1678312 -i1678314`

The root scenario is as follows :
- Create context, initialize it using explicit parameters or a `cdict` to pin them down, set `pledgedSrcSize=1`
- The compression parameters will not be adapted, but `windowSize` and `blockSize` will be automatically set to `1`.
  `windowSize` and `blockSize` are dynamic values, set within `ZSTD_resetCCtx_internal()`.
  The automatic adaptation makes it possible to generate smaller contexts for smaller input sizes.
- Complete compression
- New compression with same context, using same parameters, but `pledgedSrcSize=ZSTD_CONTENTSIZE_UNKNOWN`
  trigger "continue mode"
- Continue mode doesn't modify blockSize, because it used to depend on `windowLog` only,
  but in fact, it also depends on `pledgedSrcSize`.
- The "old" blocksize (1) is still there,
  next compression will use this value to cut input into blocks,
  resulting in more blocks and worse performance than necessary performance.

Given the scenario, and its possible variants, I'm surprised it did not show up before.
But I suspect it did show up, it's just that it never triggered an error, because "worse performance" is not a trigger.
The above test is a special corner case, where performance is so impacted that it reaches an error case.

The fix works, but I'm not completely pleased.
I think the current code relies too much on implied relations between variables.
This will likely break again in the future when some related part of the code change.
Unfortunately, no time to make larger changes if we want to keep the release target for zstd v1.3.3.
So a longer term fix will have to be considered after the release.

To do : create a reliable test case which triggers this scenario for CI tests.
This commit is contained in:
Yann Collet 2017-12-19 09:43:03 +01:00
parent 569e06b91e
commit f299fa39ac
3 changed files with 47 additions and 37 deletions

View File

@ -57,20 +57,19 @@ extern "C" {
extern int g_debuglog_enable;
/* recommended values for ZSTD_DEBUG display levels :
* 1 : no display, enables assert() only
* 2 : reserved for currently active debugging path
* 3 : events once per object lifetime (CCtx, CDict)
* 2 : reserved for currently active debug path
* 3 : events once per object lifetime (CCtx, CDict, etc.)
* 4 : events once per frame
* 5 : events once per block
* 6 : events once per sequence (*very* verbose) */
# define RAWLOG(l, ...) { \
if ((g_debuglog_enable) & (l<=ZSTD_DEBUG)) { \
fprintf(stderr, __VA_ARGS__); \
# define RAWLOG(l, ...) { \
if ((g_debuglog_enable) & (l<=ZSTD_DEBUG)) { \
fprintf(stderr, __VA_ARGS__); \
} }
# define DEBUGLOG(l, ...) { \
if ((g_debuglog_enable) & (l<=ZSTD_DEBUG)) { \
fprintf(stderr, __FILE__ ": "); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, " \n"); \
# define DEBUGLOG(l, ...) { \
if ((g_debuglog_enable) & (l<=ZSTD_DEBUG)) { \
fprintf(stderr, __FILE__ ": " __VA_ARGS__); \
fprintf(stderr, " \n"); \
} }
#else
# define RAWLOG(l, ...) {} /* disabled */

View File

@ -728,10 +728,7 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) {
static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1,
ZSTD_compressionParameters cParams2)
{
U32 bslog1 = MIN(cParams1.windowLog, ZSTD_BLOCKSIZELOG_MAX);
U32 bslog2 = MIN(cParams2.windowLog, ZSTD_BLOCKSIZELOG_MAX);
return (bslog1 == bslog2) /* same block size */
& (cParams1.hashLog == cParams2.hashLog)
return (cParams1.hashLog == cParams2.hashLog)
& (cParams1.chainLog == cParams2.chainLog)
& (cParams1.strategy == cParams2.strategy) /* opt parser space */
& ((cParams1.searchLength==3) == (cParams2.searchLength==3)); /* hashlog3 space */
@ -755,24 +752,29 @@ typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e;
/* ZSTD_sufficientBuff() :
* check internal buffers exist for streaming if buffPol == ZSTDb_buffered .
* Note : they are assumed to be correctly sized if ZSTD_equivalentCParams()==1 */
static U32 ZSTD_sufficientBuff(size_t bufferSize, ZSTD_buffered_policy_e buffPol, ZSTD_compressionParameters cParams2, U64 pledgedSrcSize)
static U32 ZSTD_sufficientBuff(size_t bufferSize1, size_t blockSize1,
ZSTD_buffered_policy_e buffPol2,
ZSTD_compressionParameters cParams2,
U64 pledgedSrcSize)
{
size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << cParams2.windowLog), pledgedSrcSize));
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
size_t const neededBufferSize = (buffPol==ZSTDb_buffered) ? windowSize + blockSize : 0;
return (neededBufferSize <= bufferSize);
size_t const blockSize2 = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
size_t const neededBufferSize = (buffPol2==ZSTDb_buffered) ? windowSize + blockSize2 : 0;
return (blockSize2 <= blockSize1) /* seqStore space depends on blockSize */
& (neededBufferSize <= bufferSize1);
}
/** Equivalence for resetCCtx purposes */
static U32 ZSTD_equivalentParams(ZSTD_CCtx_params params1,
ZSTD_CCtx_params params2,
size_t buffSize1,
size_t buffSize1, size_t blockSize1,
ZSTD_buffered_policy_e buffPol2,
U64 pledgedSrcSize)
{
DEBUGLOG(4, "ZSTD_equivalentParams: pledgedSrcSize=%u", (U32)pledgedSrcSize);
return ZSTD_equivalentCParams(params1.cParams, params2.cParams) &&
ZSTD_equivalentLdmParams(params1.ldmParams, params2.ldmParams) &&
ZSTD_sufficientBuff(buffSize1, buffPol2, params2.cParams, pledgedSrcSize);
ZSTD_sufficientBuff(buffSize1, blockSize1, buffPol2, params2.cParams, pledgedSrcSize);
}
/*! ZSTD_continueCCtx() :
@ -780,7 +782,11 @@ static U32 ZSTD_equivalentParams(ZSTD_CCtx_params params1,
static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_CCtx_params params, U64 pledgedSrcSize)
{
U32 const end = (U32)(cctx->nextSrc - cctx->base);
DEBUGLOG(4, "continue mode");
size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize));
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
DEBUGLOG(4, "ZSTD_continueCCtx");
cctx->blockSize = blockSize; /* previous block size could be different even for same windowLog, due to pledgedSrcSize */
cctx->appliedParams = params;
cctx->pledgedSrcSizePlusOne = pledgedSrcSize+1;
cctx->consumedSrcSize = 0;
@ -813,8 +819,11 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
if (crp == ZSTDcrp_continue) {
if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff, pledgedSrcSize)) {
DEBUGLOG(4, "ZSTD_equivalentParams()==1 -> continue mode");
if (ZSTD_equivalentParams(params, zc->appliedParams,
zc->inBuffSize, zc->blockSize,
zbuff, pledgedSrcSize)) {
DEBUGLOG(4, "ZSTD_equivalentParams()==1 -> continue mode (wLog=%u, blockSize=%u)",
zc->appliedParams.cParams.windowLog, (U32)zc->blockSize);
assert(!(params.ldmParams.enableLdm &&
params.ldmParams.hashEveryLog == ZSTD_LDM_HASHEVERYLOG_NOTSET));
zc->entropy->hufCTable_repeatMode = HUF_repeat_none;
@ -865,8 +874,8 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
tableSpace + tokenSpace + bufferSpace;
DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables, and %uKB for buffers",
(U32)(neededSpace>>10), (U32)(tableSpace>>10), (U32)(bufferSpace>>10));
DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u - windowSize: %u",
(U32)chainSize, (U32)hSize, (U32)h3Size, (U32)windowSize);
DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u - windowSize: %u - blockSize: %u",
(U32)chainSize, (U32)hSize, (U32)h3Size, (U32)windowSize, (U32)blockSize);
if (zc->workSpaceSize < neededSpace) { /* too small : resize */
DEBUGLOG(4, "Need to update workSpaceSize from %uK to %uK",
@ -1666,6 +1675,7 @@ static size_t ZSTD_compress_frameChunk (ZSTD_CCtx* cctx,
U32 const maxDist = (U32)1 << cctx->appliedParams.cParams.windowLog;
assert(cctx->appliedParams.cParams.windowLog <= 31);
DEBUGLOG(5, "ZSTD_compress_frameChunk (blockSize=%u)", (U32)blockSize);
if (cctx->appliedParams.fParams.checksumFlag && srcSize)
XXH64_update(&cctx->xxhState, src, srcSize);
@ -1838,6 +1848,7 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx,
cctx->lowLimit = lowLimitMax;
}
DEBUGLOG(5, "ZSTD_compressContinue_internal (blockSize=%u)", (U32)cctx->blockSize);
{ size_t const cSize = frame ?
ZSTD_compress_frameChunk (cctx, dst, dstCapacity, src, srcSize, lastFrameChunk) :
ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize);
@ -2062,7 +2073,7 @@ size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
ZSTD_CCtx_params params, U64 pledgedSrcSize,
ZSTD_buffered_policy_e zbuff)
{
DEBUGLOG(4, "ZSTD_compressBegin_internal");
DEBUGLOG(4, "ZSTD_compressBegin_internal: wlog=%u", params.cParams.windowLog);
/* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */
@ -2086,7 +2097,7 @@ size_t ZSTD_compressBegin_advanced_internal(ZSTD_CCtx* cctx,
ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize)
{
DEBUGLOG(4, "ZSTD_compressBegin_advanced_internal");
DEBUGLOG(4, "ZSTD_compressBegin_advanced_internal: wlog=%u", params.cParams.windowLog);
/* compression parameters verification and optimization */
CHECK_F( ZSTD_checkCParams(params.cParams) );
return ZSTD_compressBegin_internal(cctx,

View File

@ -355,7 +355,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict");
DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict (windowLog=%u)", job->params.cParams.windowLog);
assert(job->firstChunk); /* only allowed for first job */
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */
@ -367,9 +367,11 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = forceWindowError;
goto _endJob;
}
/* load dictionary in "content-only" mode (no header analysis) */
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, NULL, jobParams, pledgedSrcSize);
DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal called with windowLog = %u ", jobParams.cParams.windowLog);
DEBUGLOG(5, "ZSTDMT_compressChunk: invoking ZSTD_compressBegin_advanced_internal with windowLog = %u ", jobParams.cParams.windowLog);
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
NULL,
jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) {
DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError));
job->cSize = initError;
@ -637,8 +639,7 @@ static size_t ZSTDMT_compress_advanced_internal(
assert(jobParams.nbThreads == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal");
DEBUGLOG(4, "nbChunks : %2u (raw chunkSize : %u bytes; fixed chunkSize: %u) ",
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize);
if (nbChunks==1) { /* fallback to single-thread mode */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
@ -688,7 +689,7 @@ static size_t ZSTDMT_compress_advanced_internal(
XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
}
DEBUGLOG(5, "posting job %u (%u bytes)", u, (U32)chunkSize);
DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize);
DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12);
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
@ -893,8 +894,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
{
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
DEBUGLOG(5, "ZSTDMT_createCompressionJob");
DEBUGLOG(5, "preparing job %u to compress %u bytes with %u preload ",
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
@ -945,7 +945,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->params.fParams.checksumFlag = 0;
} }
DEBUGLOG(4, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID,
(U32)zcs->jobs[jobID].srcSize,
zcs->jobs[jobID].lastChunk,