updated ZSTDMT streaming API

ZSTDMT streaming API is now similar
and has same capabilites as single-thread streaming API.
It makes it easier to blend them together.
This commit is contained in:
Yann Collet 2017-06-03 01:15:02 -07:00
parent 58e8d793e1
commit 8c910d2097
8 changed files with 132 additions and 77 deletions

View File

@ -801,7 +801,7 @@ size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t di
size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize, size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize); </b>/**< pledgedSrcSize is optional and can be 0 (meaning unknown). note: if the contentSizeFlag is set, pledgedSrcSize == 0 means the source size is actually 0 */<b> ZSTD_parameters params, unsigned long long pledgedSrcSize); </b>/**< pledgedSrcSize is optional and can be 0 (meaning unknown). note: if the contentSizeFlag is set, pledgedSrcSize == 0 means the source size is actually 0 */<b>
size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict); </b>/**< note : cdict will just be referenced, and must outlive compression session */<b> size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict); </b>/**< note : cdict will just be referenced, and must outlive compression session */<b>
size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize, ZSTD_frameParameters fParams); </b>/**< same as ZSTD_initCStream_usingCDict(), with control over frame parameters */<b> size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_frameParameters fParams, unsigned long long pledgedSrcSize); </b>/**< same as ZSTD_initCStream_usingCDict(), with control over frame parameters */<b>
</pre></b><BR> </pre></b><BR>
<pre><b>size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize); <pre><b>size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);
</b><p> start a new compression job, using same parameters from previous job. </b><p> start a new compression job, using same parameters from previous job.

View File

@ -75,6 +75,7 @@ extern "C" {
#define STREAM_ACCUMULATOR_MIN_64 57 #define STREAM_ACCUMULATOR_MIN_64 57
#define STREAM_ACCUMULATOR_MIN ((U32)(MEM_32bits() ? STREAM_ACCUMULATOR_MIN_32 : STREAM_ACCUMULATOR_MIN_64)) #define STREAM_ACCUMULATOR_MIN ((U32)(MEM_32bits() ? STREAM_ACCUMULATOR_MIN_32 : STREAM_ACCUMULATOR_MIN_64))
/*-****************************************** /*-******************************************
* bitStream encoding API (write forward) * bitStream encoding API (write forward)
********************************************/ ********************************************/

View File

@ -305,4 +305,33 @@ MEM_STATIC U32 ZSTD_highbit32(U32 val)
void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx); void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx);
/*! ZSTD_compressBegin_internal() :
* innermost initialization function. Private use only.
* expects params to be valid.
* must receive dict, or cdict, or none, but not both.
* @return : 0, or an error code */
typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e;
size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
const void* dict, size_t dictSize,
const ZSTD_CDict* cdict,
ZSTD_parameters params, U64 pledgedSrcSize,
ZSTD_buffered_policy_e zbuff);
/*! ZSTD_initCStream_internal() :
* Private use only. Init streaming operation.
* expects params to be valid.
* must receive dict, or cdict, or none, but not both.
* @return : 0, or an error code */
size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize,
const ZSTD_CDict* cdict,
ZSTD_parameters params, U64 pledgedSrcSize);
/*! ZSTD_getParamsFromCDict() :
* as the name implies */
ZSTD_parameters ZSTD_getParamsFromCDict(const ZSTD_CDict* cdict);
#endif /* ZSTD_CCOMMON_H_MODULE */ #endif /* ZSTD_CCOMMON_H_MODULE */

View File

@ -74,7 +74,7 @@ struct ZSTD_CDict_s {
const void* dictContent; const void* dictContent;
size_t dictContentSize; size_t dictContentSize;
ZSTD_CCtx* refContext; ZSTD_CCtx* refContext;
}; /* typedef'd tp ZSTD_CDict within "zstd.h" */ }; /* typedef'd to ZSTD_CDict within "zstd.h" */
struct ZSTD_CCtx_s { struct ZSTD_CCtx_s {
const BYTE* nextSrc; /* next block here to continue on current prefix */ const BYTE* nextSrc; /* next block here to continue on current prefix */
@ -533,8 +533,6 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_parameters params, U64 fra
typedef enum { ZSTDcrp_continue, ZSTDcrp_noMemset } ZSTD_compResetPolicy_e; typedef enum { ZSTDcrp_continue, ZSTDcrp_noMemset } ZSTD_compResetPolicy_e;
typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e;
/*! ZSTD_resetCCtx_internal() : /*! ZSTD_resetCCtx_internal() :
note : `params` are assumed fully validated at this stage */ note : `params` are assumed fully validated at this stage */
static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
@ -3089,7 +3087,7 @@ static size_t ZSTD_compress_insertDictionary(ZSTD_CCtx* cctx, const void* dict,
/*! ZSTD_compressBegin_internal() : /*! ZSTD_compressBegin_internal() :
* @return : 0, or an error code */ * @return : 0, or an error code */
static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
const ZSTD_CDict* cdict, const ZSTD_CDict* cdict,
ZSTD_parameters params, U64 pledgedSrcSize, ZSTD_parameters params, U64 pledgedSrcSize,
@ -3392,7 +3390,7 @@ ZSTD_CDict* ZSTD_initStaticCDict(void* workspace, size_t workspaceSize,
return cdict; return cdict;
} }
static ZSTD_parameters ZSTD_getParamsFromCDict(const ZSTD_CDict* cdict) { ZSTD_parameters ZSTD_getParamsFromCDict(const ZSTD_CDict* cdict) {
return ZSTD_getParamsFromCCtx(cdict->refContext); return ZSTD_getParamsFromCCtx(cdict->refContext);
} }
@ -3505,38 +3503,12 @@ size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize)
return ZSTD_resetCStream_internal(zcs, params, pledgedSrcSize); return ZSTD_resetCStream_internal(zcs, params, pledgedSrcSize);
} }
/* ZSTD_initCStream_usingCDict_advanced() : size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
* same as ZSTD_initCStream_usingCDict(), with control over frame parameters */ const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, ZSTD_parameters params, unsigned long long pledgedSrcSize)
const ZSTD_CDict* cdict,
unsigned long long pledgedSrcSize,
ZSTD_frameParameters fParams)
{ /* cannot handle NULL cdict (does not know what to do) */
if (!cdict) return ERROR(dictionary_wrong);
{ ZSTD_parameters params = ZSTD_getParamsFromCDict(cdict);
params.fParams = fParams;
zcs->requestedParams = params;
zcs->compressionLevel = ZSTD_CLEVEL_CUSTOM;
zcs->cdict = cdict;
return ZSTD_resetCStream_internal(zcs, params, pledgedSrcSize);
}
}
/* note : cdict must outlive compression session */
size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict)
{
ZSTD_frameParameters const fParams = { 0 /* contentSize */, 0 /* checksum */, 0 /* hideDictID */ };
/* cannot handle NULL cdict (does not know what to do) */
if (!cdict) return ERROR(dictionary_wrong);
return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, 0, fParams);
}
static size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{ {
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
zcs->cdict = NULL; assert(!((dict) && (cdict))); /* either dict or cdict, not both */
if (dict && dictSize >= 8) { if (dict && dictSize >= 8) {
if (zcs->staticSize) { /* static CCtx : never uses malloc */ if (zcs->staticSize) { /* static CCtx : never uses malloc */
@ -3544,14 +3516,46 @@ static size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
return ERROR(memory_allocation); return ERROR(memory_allocation);
} }
ZSTD_freeCDict(zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdict = NULL;
zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, 0 /* copy */, params.cParams, zcs->customMem); zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, 0 /* copy */, params.cParams, zcs->customMem);
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
zcs->cdict = zcs->cdictLocal; zcs->cdict = zcs->cdictLocal;
} else {
if (cdict) {
ZSTD_parameters const cdictParams = ZSTD_getParamsFromCDict(cdict);
params.cParams = cdictParams.cParams; /* cParams are enforced from cdict */
}
zcs->cdict = cdict;
} }
zcs->requestedParams = params;
zcs->compressionLevel = ZSTD_CLEVEL_CUSTOM;
return ZSTD_resetCStream_internal(zcs, params, pledgedSrcSize); return ZSTD_resetCStream_internal(zcs, params, pledgedSrcSize);
} }
/* ZSTD_initCStream_usingCDict_advanced() :
* same as ZSTD_initCStream_usingCDict(), with control over frame parameters */
size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs,
const ZSTD_CDict* cdict,
ZSTD_frameParameters fParams,
unsigned long long pledgedSrcSize)
{ /* cannot handle NULL cdict (does not know what to do) */
if (!cdict) return ERROR(dictionary_wrong);
{ ZSTD_parameters params = ZSTD_getParamsFromCDict(cdict);
params.fParams = fParams;
return ZSTD_initCStream_internal(zcs,
NULL, 0, cdict,
params, pledgedSrcSize);
}
}
/* note : cdict must outlive compression session */
size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict)
{
ZSTD_frameParameters const fParams = { 0 /* contentSize */, 0 /* checksum */, 0 /* hideDictID */ };
return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, 0); /* note : will check that cdict != NULL */
}
size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize) ZSTD_parameters params, unsigned long long pledgedSrcSize)
@ -3559,27 +3563,27 @@ size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
CHECK_F( ZSTD_checkCParams(params.cParams) ); CHECK_F( ZSTD_checkCParams(params.cParams) );
zcs->requestedParams = params; zcs->requestedParams = params;
zcs->compressionLevel = ZSTD_CLEVEL_CUSTOM; zcs->compressionLevel = ZSTD_CLEVEL_CUSTOM;
return ZSTD_initCStream_internal(zcs, dict, dictSize, params, pledgedSrcSize); return ZSTD_initCStream_internal(zcs, dict, dictSize, NULL, params, pledgedSrcSize);
} }
size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel) size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel)
{ {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, dictSize); ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, dictSize);
zcs->compressionLevel = compressionLevel; zcs->compressionLevel = compressionLevel;
return ZSTD_initCStream_internal(zcs, dict, dictSize, params, 0); return ZSTD_initCStream_internal(zcs, dict, dictSize, NULL, params, 0);
} }
size_t ZSTD_initCStream_srcSize(ZSTD_CStream* zcs, int compressionLevel, unsigned long long pledgedSrcSize) size_t ZSTD_initCStream_srcSize(ZSTD_CStream* zcs, int compressionLevel, unsigned long long pledgedSrcSize)
{ {
ZSTD_parameters params = ZSTD_getParams(compressionLevel, pledgedSrcSize, 0); ZSTD_parameters params = ZSTD_getParams(compressionLevel, pledgedSrcSize, 0);
params.fParams.contentSizeFlag = (pledgedSrcSize>0); params.fParams.contentSizeFlag = (pledgedSrcSize>0);
return ZSTD_initCStream_internal(zcs, NULL, 0, params, pledgedSrcSize); return ZSTD_initCStream_internal(zcs, NULL, 0, NULL, params, pledgedSrcSize);
} }
size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel) size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel)
{ {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
return ZSTD_initCStream_internal(zcs, NULL, 0, params, 0); return ZSTD_initCStream_internal(zcs, NULL, 0, NULL, params, 0);
} }
/*====== Compression ======*/ /*====== Compression ======*/
@ -3606,6 +3610,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
U32 someMoreWork = 1; U32 someMoreWork = 1;
/* check expectations */ /* check expectations */
DEBUGLOG(5, "ZSTD_compressStream_generic");
assert(zcs->inBuff != NULL); assert(zcs->inBuff != NULL);
assert(zcs->outBuff!= NULL); assert(zcs->outBuff!= NULL);
assert(output->pos <= output->size); assert(output->pos <= output->size);

View File

@ -27,19 +27,12 @@
/* ====== Debug ====== */ /* ====== Debug ====== */
#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=2) #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
# include <stdio.h> # include <stdio.h>
# include <unistd.h> # include <unistd.h>
# include <sys/times.h> # include <sys/times.h>
# define DEBUGLOGRAW(l, ...) if (l<=ZSTDMT_DEBUG) { fprintf(stderr, __VA_ARGS__); } # define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); }
# define DEBUGLOG(l, ...) { \
if (l<=ZSTDMT_DEBUG) { \
fprintf(stderr, __FILE__ ": "); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, " \n"); \
} \
}
# define DEBUG_PRINTHEX(l,p,n) { \ # define DEBUG_PRINTHEX(l,p,n) { \
unsigned debug_u; \ unsigned debug_u; \
@ -59,7 +52,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
#define MUTEX_WAIT_TIME_DLEVEL 5 #define MUTEX_WAIT_TIME_DLEVEL 5
#define PTHREAD_MUTEX_LOCK(mutex) { \ #define PTHREAD_MUTEX_LOCK(mutex) { \
if (ZSTDMT_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \ if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \ unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
pthread_mutex_lock(mutex); \ pthread_mutex_lock(mutex); \
{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \ { unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
@ -73,7 +66,6 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
#else #else
# define DEBUGLOG(l, ...) {} /* disabled */
# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m) # define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
# define DEBUG_PRINTHEX(l,p,n) {} # define DEBUG_PRINTHEX(l,p,n) {}
@ -259,7 +251,7 @@ typedef struct {
pthread_mutex_t* jobCompleted_mutex; pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond; pthread_cond_t* jobCompleted_cond;
ZSTD_parameters params; ZSTD_parameters params;
ZSTD_CDict* cdict; const ZSTD_CDict* cdict;
unsigned long long fullFrameSize; unsigned long long fullFrameSize;
} ZSTDMT_jobDescription; } ZSTDMT_jobDescription;
@ -273,7 +265,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
if (job->cdict) { /* should only happen for first segment */ if (job->cdict) { /* should only happen for first segment */
size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize);
if (job->cdict) DEBUGLOG(3, "using CDict"); DEBUGLOG(3, "using CDict");
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */ } else { /* srcStart points at reloaded section */
if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */ if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */
@ -335,7 +327,8 @@ struct ZSTDMT_CCtx_s {
unsigned long long frameContentSize; unsigned long long frameContentSize;
size_t sectionSize; size_t sectionSize;
ZSTD_customMem cMem; ZSTD_customMem cMem;
ZSTD_CDict* cdict; ZSTD_CDict* cdictLocal;
const ZSTD_CDict* cdict;
}; };
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
@ -407,7 +400,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */
ZSTD_free(mtctx->jobs, mtctx->cMem); ZSTD_free(mtctx->jobs, mtctx->cMem);
ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdict); ZSTD_freeCDict(mtctx->cdictLocal);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex); pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond); pthread_cond_destroy(&mtctx->jobCompleted_cond);
ZSTD_free(mtctx, mtctx->cMem); ZSTD_free(mtctx, mtctx->cMem);
@ -422,7 +415,7 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
+ ZSTDMT_sizeof_bufferPool(mtctx->buffPool) + ZSTDMT_sizeof_bufferPool(mtctx->buffPool)
+ (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
+ ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
+ ZSTD_sizeof_CDict(mtctx->cdict); + ZSTD_sizeof_CDict(mtctx->cdictLocal);
} }
size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
@ -567,28 +560,38 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, unsigned updateDict, const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize) ZSTD_parameters params, unsigned long long pledgedSrcSize)
{ {
if (zcs->nbThreads==1) /* params are supposed to be fully validated at this point */
return ZSTD_initCStream_advanced(zcs->cctxPool->cctx[0], assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
dict, dictSize, assert(!((dict) && (cdict))); /* either dict or cdict, not both */
params, pledgedSrcSize);
if (zcs->nbThreads==1) {
return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
dict, dictSize, cdict,
params, pledgedSrcSize);
}
if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs); ZSTDMT_releaseAllJobResources(zcs);
zcs->allJobsCompleted = 1; zcs->allJobsCompleted = 1;
} }
zcs->params = params; zcs->params = params;
if (updateDict) {
ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL;
if (dict && dictSize) {
zcs->cdict = ZSTD_createCDict_advanced(dict, dictSize, 0 /* byRef */,
params.cParams, zcs->cMem);
if (zcs->cdict == NULL) return ERROR(memory_allocation);
} }
zcs->frameContentSize = pledgedSrcSize; zcs->frameContentSize = pledgedSrcSize;
if (dict) {
ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdict = NULL;
zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, 0 /* byRef */,
params.cParams, zcs->cMem);
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
zcs->cdict = zcs->cdictLocal;
} else {
zcs->cdict = cdict;
}
zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog); zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog);
DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog); DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog);
DEBUGLOG(3, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); DEBUGLOG(3, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10));
@ -610,13 +613,25 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
return 0; return 0;
} }
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize) ZSTD_parameters params, unsigned long long pledgedSrcSize)
{ {
return ZSTDMT_initCStream_internal(zcs, dict, dictSize, 1, params, pledgedSrcSize); return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, NULL, params, pledgedSrcSize);
} }
size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
const ZSTD_CDict* cdict,
ZSTD_frameParameters fParams,
unsigned long long pledgedSrcSize)
{
ZSTD_parameters params = ZSTD_getParamsFromCDict(cdict);
if (cdict==NULL) return ERROR(GENERIC); /* method incompatible with NULL cdict */
params.fParams = fParams;
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, cdict, params, pledgedSrcSize);
}
/* ZSTDMT_resetCStream() : /* ZSTDMT_resetCStream() :
* pledgedSrcSize is optional and can be zero == unknown */ * pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
@ -628,7 +643,7 @@ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 1, params, 0); return ZSTDMT_initCStream_internal(zcs, NULL, 0, NULL, params, 0);
} }
@ -780,8 +795,9 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
if (zcs->frameEnded) if (zcs->frameEnded)
/* current frame being ended. Only flush is allowed. Restart with init */ /* current frame being ended. Only flush is allowed. Restart with init */
return ERROR(stage_wrong); return ERROR(stage_wrong);
if (zcs->nbThreads==1) if (zcs->nbThreads==1) {
return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input); return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
}
/* fill input buffer */ /* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);

View File

@ -65,6 +65,10 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
ZSTD_parameters params, ZSTD_parameters params,
unsigned long long pledgedSrcSize); /* pledgedSrcSize is optional and can be zero == unknown */ unsigned long long pledgedSrcSize); /* pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
const ZSTD_CDict* cdict,
ZSTD_frameParameters fparams,
unsigned long long pledgedSrcSize); /* note : zero means empty */
/* ZSDTMT_parameter : /* ZSDTMT_parameter :
* List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */

View File

@ -932,7 +932,7 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dic
ZSTDLIB_API size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize, ZSTDLIB_API size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be 0 (meaning unknown). note: if the contentSizeFlag is set, pledgedSrcSize == 0 means the source size is actually 0 */ ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be 0 (meaning unknown). note: if the contentSizeFlag is set, pledgedSrcSize == 0 means the source size is actually 0 */
ZSTDLIB_API size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict); /**< note : cdict will just be referenced, and must outlive compression session */ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict); /**< note : cdict will just be referenced, and must outlive compression session */
ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize, ZSTD_frameParameters fParams); /**< same as ZSTD_initCStream_usingCDict(), with control over frame parameters */ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_frameParameters fParams, unsigned long long pledgedSrcSize); /**< same as ZSTD_initCStream_usingCDict(), with control over frame parameters */
/*! ZSTD_resetCStream() : /*! ZSTD_resetCStream() :
* start a new compression job, using same parameters from previous job. * start a new compression job, using same parameters from previous job.

View File

@ -486,7 +486,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
{ ZSTD_compressionParameters const cParams = ZSTD_getCParams(1, CNBufferSize, dictionary.filled); { ZSTD_compressionParameters const cParams = ZSTD_getCParams(1, CNBufferSize, dictionary.filled);
ZSTD_frameParameters const fParams = { 1 /* contentSize */, 1 /* checksum */, 1 /* noDictID */}; ZSTD_frameParameters const fParams = { 1 /* contentSize */, 1 /* checksum */, 1 /* noDictID */};
ZSTD_CDict* const cdict = ZSTD_createCDict_advanced(dictionary.start, dictionary.filled, 1 /* byReference */, cParams, customMem); ZSTD_CDict* const cdict = ZSTD_createCDict_advanced(dictionary.start, dictionary.filled, 1 /* byReference */, cParams, customMem);
size_t const initError = ZSTD_initCStream_usingCDict_advanced(zc, cdict, CNBufferSize, fParams); size_t const initError = ZSTD_initCStream_usingCDict_advanced(zc, cdict, fParams, CNBufferSize);
if (ZSTD_isError(initError)) goto _output_error; if (ZSTD_isError(initError)) goto _output_error;
cSize = 0; cSize = 0;
outBuff.dst = compressedBuffer; outBuff.dst = compressedBuffer;
@ -497,7 +497,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
inBuff.pos = 0; inBuff.pos = 0;
{ size_t const r = ZSTD_compressStream(zc, &outBuff, &inBuff); { size_t const r = ZSTD_compressStream(zc, &outBuff, &inBuff);
if (ZSTD_isError(r)) goto _output_error; } if (ZSTD_isError(r)) goto _output_error; }
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
{ size_t const r = ZSTD_endStream(zc, &outBuff); { size_t const r = ZSTD_endStream(zc, &outBuff);
if (r != 0) goto _output_error; } /* error, or some data not flushed */ if (r != 0) goto _output_error; } /* error, or some data not flushed */
cSize = outBuff.pos; cSize = outBuff.pos;
@ -991,8 +991,8 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog; U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog;
U32 const cLevel = (FUZ_rand(&lseed) % U32 const cLevel = (FUZ_rand(&lseed) %
(ZSTD_maxCLevel() - (ZSTD_maxCLevel() -
(MAX(testLog, dictLog) / cLevelLimiter))) + (MAX(testLog, dictLog) / cLevelLimiter))) +
1; 1;
maxTestSize = FUZ_rLogLength(&lseed, testLog); maxTestSize = FUZ_rLogLength(&lseed, testLog);
oldTestLog = testLog; oldTestLog = testLog;