ZSTDMT_initCStream_usingDict() can outlive dict

Like ZSTD_initCStream_usingDict(),
ZSTDMT_initCStream_usingDict() now keep a copy of dict internally.
This way, dict can be released :
it does not longer have to outlive all future compression sessions.
This commit is contained in:
Yann Collet 2017-01-22 16:40:06 -08:00
parent bd6bc22612
commit c593348722
4 changed files with 41 additions and 18 deletions

View File

@ -2603,7 +2603,6 @@ static size_t ZSTD_compress_insertDictionary(ZSTD_CCtx* zc, const void* dict, si
}
}
/*! ZSTD_compressBegin_internal() :
* @return : 0, or an error code */
static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
@ -2825,7 +2824,7 @@ static ZSTD_parameters ZSTD_getParamsFromCDict(const ZSTD_CDict* cdict) {
return ZSTD_getParamsFromCCtx(cdict->refContext);
}
size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, U64 pledgedSrcSize)
size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize)
{
if (cdict->dictContentSize) CHECK_F(ZSTD_copyCCtx(cctx, cdict->refContext, pledgedSrcSize))
else CHECK_F(ZSTD_compressBegin_advanced(cctx, NULL, 0, cdict->refContext->params, pledgedSrcSize));

View File

@ -217,6 +217,7 @@ typedef struct {
pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond;
ZSTD_parameters params;
ZSTD_CDict* cdict;
const void* dict;
size_t dictSize;
unsigned long long fullFrameSize;
@ -227,8 +228,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
buffer_t const dstBuff = job->dstBuff;
size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->dict, job->dictSize, job->params, job->fullFrameSize);
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize);
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else {
size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->dict, job->dictSize, job->params, job->fullFrameSize);
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
}
if (!job->firstChunk) { /* flush frame header */
size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
@ -271,8 +277,7 @@ struct ZSTDMT_CCtx_s {
unsigned frameEnded;
unsigned allJobsCompleted;
unsigned long long frameContentSize;
const void* dict;
size_t dictSize;
ZSTD_CDict* cdict;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
};
@ -324,6 +329,7 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
if (mtctx==NULL) return 0; /* compatible with free on NULL */
ZSTD_freeCDict(mtctx->cdict);
POOL_free(mtctx->factory);
if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */
@ -442,8 +448,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) {
}
}
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize) {
static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, unsigned updateDict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_customMem const cmem = { NULL, NULL, NULL };
if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
@ -451,8 +461,12 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t di
}
params.fParams.checksumFlag = 0; /* current limitation : no checksum (to be lifted in a later version) */
zcs->params = params;
zcs->dict = dict;
zcs->dictSize = dictSize;
if (updateDict) {
ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL;
if (dict && dictSize) {
zcs->cdict = ZSTD_createCDict_advanced(dict, dictSize, 0, params, cmem);
if (zcs->cdict == NULL) return ERROR(memory_allocation);
} }
zcs->frameContentSize = pledgedSrcSize;
zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2));
zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog);
@ -466,16 +480,23 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t di
return 0;
}
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
return ZSTDMT_initCStream_internal(zcs, dict, dictSize, 1, params, pledgedSrcSize);
}
/* ZSTDMT_resetCStream() :
* pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
return ZSTDMT_initCStream_advanced(zcs, zcs->dict, zcs->dictSize, zcs->params, pledgedSrcSize);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
return ZSTDMT_initCStream_advanced(zcs, NULL, 0, params, 0);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 1, params, 0);
}
@ -510,8 +531,9 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dict = zcs->nextJobID == 0 ? zcs->dict : NULL;
zcs->jobs[jobID].dictSize = zcs->nextJobID == 0 ? zcs->dictSize : 0;
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].dict = NULL;
zcs->jobs[jobID].dictSize = 0;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
@ -598,8 +620,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dict = zcs->nextJobID == 0 ? zcs->dict : NULL;
zcs->jobs[jobID].dictSize = zcs->nextJobID == 0 ? zcs->dictSize : 0;
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].dict = NULL;
zcs->jobs[jobID].dictSize = 0;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;

View File

@ -29,8 +29,8 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown ; current limitation : no checksum */
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init */
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< params current limitation : no checksum ; pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);

View File

@ -572,6 +572,7 @@ ZSTDLIB_API size_t ZSTD_compressBegin(ZSTD_CCtx* cctx, int compressionLevel);
ZSTDLIB_API size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel);
ZSTDLIB_API size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize);
ZSTDLIB_API size_t ZSTD_copyCCtx(ZSTD_CCtx* cctx, const ZSTD_CCtx* preparedCCtx, unsigned long long pledgedSrcSize);
ZSTDLIB_API size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize);
ZSTDLIB_API size_t ZSTD_compressContinue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
ZSTDLIB_API size_t ZSTD_compressEnd(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);