zstdmt : move on when not enough memory for a new input buffer
just continue operations without input forward progress, instead of an error that stops current compression session.
This commit is contained in:
parent
2cd15dd9a4
commit
8074261d00
@ -642,7 +642,7 @@ ZSTD_compressionParameters ZSTD_adjustCParams_internal(ZSTD_compressionParameter
|
|||||||
/* resize windowLog if src is small, to use less memory when necessary */
|
/* resize windowLog if src is small, to use less memory when necessary */
|
||||||
ZSTD_STATIC_ASSERT(ZSTD_CONTENTSIZE_UNKNOWN == (0ULL - 1));
|
ZSTD_STATIC_ASSERT(ZSTD_CONTENTSIZE_UNKNOWN == (0ULL - 1));
|
||||||
if ( (dictSize || (srcSize+1 > 1)) /* srcSize test depends on static assert condition */
|
if ( (dictSize || (srcSize+1 > 1)) /* srcSize test depends on static assert condition */
|
||||||
&& (srcSize-1 < (1ULL<<ZSTD_WINDOWLOG_MAX)) ) /* no correction is srcSize is large enough */ {
|
&& (srcSize-1 < (1ULL<<ZSTD_WINDOWLOG_MAX)) ) /* no correction when srcSize is large enough */ {
|
||||||
U32 const minSrcSize = (srcSize==0) ? 513 : 0;
|
U32 const minSrcSize = (srcSize==0) ? 513 : 0;
|
||||||
U64 const rSize = srcSize + dictSize + minSrcSize;
|
U64 const rSize = srcSize + dictSize + minSrcSize;
|
||||||
if (rSize < (1ULL<<ZSTD_WINDOWLOG_MAX)) {
|
if (rSize < (1ULL<<ZSTD_WINDOWLOG_MAX)) {
|
||||||
|
@ -976,7 +976,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
|
|||||||
|
|
||||||
|
|
||||||
/** ZSTDMT_compressStream_generic() :
|
/** ZSTDMT_compressStream_generic() :
|
||||||
* internal use only
|
* internal use only - exposed to be invoked from zstd_compress.c
|
||||||
* assumption : output and input are valid (pos <= size)
|
* assumption : output and input are valid (pos <= size)
|
||||||
* @return : minimum amount of data remaining to flush, 0 if none */
|
* @return : minimum amount of data remaining to flush, 0 if none */
|
||||||
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
||||||
@ -985,6 +985,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||||||
ZSTD_EndDirective endOp)
|
ZSTD_EndDirective endOp)
|
||||||
{
|
{
|
||||||
size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
|
size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
|
||||||
|
unsigned forwardInputProgress = 0;
|
||||||
assert(output->pos <= output->size);
|
assert(output->pos <= output->size);
|
||||||
assert(input->pos <= input->size);
|
assert(input->pos <= input->size);
|
||||||
if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
|
if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
|
||||||
@ -995,7 +996,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||||||
return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
|
return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* single-pass shortcut (note : this is synchronous-mode) */
|
/* single-pass shortcut (note : synchronous-mode) */
|
||||||
if ( (mtctx->nextJobID == 0) /* just started */
|
if ( (mtctx->nextJobID == 0) /* just started */
|
||||||
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
|
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
|
||||||
&& (endOp == ZSTD_e_end) /* end order */
|
&& (endOp == ZSTD_e_end) /* end order */
|
||||||
@ -1016,18 +1017,16 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||||||
/* fill input buffer */
|
/* fill input buffer */
|
||||||
if (input->size > input->pos) { /* support NULL input */
|
if (input->size > input->pos) { /* support NULL input */
|
||||||
if (mtctx->inBuff.buffer.start == NULL) {
|
if (mtctx->inBuff.buffer.start == NULL) {
|
||||||
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
|
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : may fail, in which case, no forward input progress */
|
||||||
if (mtctx->inBuff.buffer.start == NULL) {
|
|
||||||
ZSTDMT_waitForAllJobsCompleted(mtctx);
|
|
||||||
return ERROR(memory_allocation);
|
|
||||||
}
|
|
||||||
mtctx->inBuff.filled = 0;
|
mtctx->inBuff.filled = 0;
|
||||||
}
|
}
|
||||||
{ size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
|
if (mtctx->inBuff.buffer.start) {
|
||||||
|
size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
|
||||||
DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
|
DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
|
||||||
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
|
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
|
||||||
input->pos += toLoad;
|
input->pos += toLoad;
|
||||||
mtctx->inBuff.filled += toLoad;
|
mtctx->inBuff.filled += toLoad;
|
||||||
|
forwardInputProgress = toLoad>0;
|
||||||
} }
|
} }
|
||||||
|
|
||||||
if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
||||||
@ -1036,7 +1035,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* check for potential compressed data ready to be flushed */
|
/* check for potential compressed data ready to be flushed */
|
||||||
CHECK_F( ZSTDMT_flushNextJob(mtctx, output, (mtctx->inBuff.filled == mtctx->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */
|
CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */
|
||||||
|
|
||||||
if (input->pos < input->size) /* input not consumed : do not flush yet */
|
if (input->pos < input->size) /* input not consumed : do not flush yet */
|
||||||
endOp = ZSTD_e_continue;
|
endOp = ZSTD_e_continue;
|
||||||
|
Loading…
Reference in New Issue
Block a user