Pass cctx parameters to MTCtx

This commit is contained in:
Stella Lau 2017-08-18 16:17:24 -07:00
parent 399ae013d4
commit 63b8c98531
3 changed files with 165 additions and 59 deletions

View File

@ -213,7 +213,7 @@ static ZSTD_parameters ZSTD_getParamsFromCCtxParams(const ZSTD_CCtx_params cctxP
}
// TODO: get rid of this function too
static ZSTD_CCtx_params ZSTD_makeCCtxParamsFromParams(ZSTD_parameters params) {
ZSTD_CCtx_params ZSTD_makeCCtxParamsFromParams(ZSTD_parameters params) {
ZSTD_CCtx_params cctxParams;
memset(&cctxParams, 0, sizeof(ZSTD_CCtx_params));
cctxParams.cParams = params.cParams;
@ -3474,6 +3474,17 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
return ZSTD_compress_insertDictionary(cctx, dict, dictSize, params.dictMode);
}
size_t ZSTD_compressBegin_advanced_opaque(ZSTD_CCtx* cctx,
const void* dict, size_t dictSize,
ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize)
{
/* compression parameters verification and optimization */
CHECK_F( ZSTD_checkCParams(params.cParams) );
return ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL,
params, pledgedSrcSize,
ZSTDb_not_buffered);
}
/*! ZSTD_compressBegin_advanced() :
* @return : 0, or an error code */
@ -3481,15 +3492,13 @@ size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx,
const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = cctx->requestedParams;
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
cctxParams.dictMode = ZSTD_dm_auto;
/* compression parameters verification and optimization */
CHECK_F(ZSTD_checkCParams(params.cParams));
return ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL,
cctxParams, pledgedSrcSize, ZSTDb_not_buffered);
return ZSTD_compressBegin_advanced_opaque(cctx, dict, dictSize, cctxParams,
pledgedSrcSize);
}
@ -3580,10 +3589,11 @@ static size_t ZSTD_compress_internal (ZSTD_CCtx* cctx,
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
cctxParams.dictMode = ZSTD_dm_auto;
CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL,
cctxParams, srcSize, ZSTDb_not_buffered) );
return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize);
return ZSTD_compress_advanced_opaque(cctx,
dst, dstCapacity,
src, srcSize,
dict, dictSize,
cctxParams);
}
size_t ZSTD_compress_advanced (ZSTD_CCtx* ctx,
@ -3596,6 +3606,18 @@ size_t ZSTD_compress_advanced (ZSTD_CCtx* ctx,
return ZSTD_compress_internal(ctx, dst, dstCapacity, src, srcSize, dict, dictSize, params);
}
/* Internal */
size_t ZSTD_compress_advanced_opaque(ZSTD_CCtx* cctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const void* dict,size_t dictSize,
ZSTD_CCtx_params params)
{
CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL,
params, srcSize, ZSTDb_not_buffered) );
return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize);
}
size_t ZSTD_compress_usingDict(ZSTD_CCtx* ctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize,
const void* dict, size_t dictSize, int compressionLevel)
{
@ -3920,14 +3942,13 @@ size_t ZSTD_CStreamOutSize(void)
return ZSTD_compressBound(ZSTD_BLOCKSIZE_MAX) + ZSTD_blockHeaderSize + 4 /* 32-bits hash */ ;
}
static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
static size_t ZSTD_resetCStream_internal_opaque(
ZSTD_CStream* zcs,
const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = ZSTD_makeCCtxParamsFromParams(params);
cctxParams.compressionLevel = zcs->requestedParams.compressionLevel;
cctxParams.dictMode = dictMode;
params.dictMode = dictMode;
DEBUGLOG(4, "ZSTD_resetCStream_internal");
/* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
@ -3936,7 +3957,7 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
CHECK_F( ZSTD_compressBegin_internal(zcs,
dict, dictSize,
cdict,
cctxParams, pledgedSrcSize,
params, pledgedSrcSize,
ZSTDb_buffered) );
zcs->inToCompress = 0;
@ -3948,6 +3969,19 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
return 0; /* ready to go */
}
static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = zcs->requestedParams;
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
cctxParams.dictMode = dictMode;
return ZSTD_resetCStream_internal_opaque(zcs, dict, dictSize, dictMode,
cdict, cctxParams, pledgedSrcSize);
}
size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize)
{
ZSTD_parameters params = ZSTD_getParamsFromCCtxParams(zcs->requestedParams);
@ -3959,13 +3993,11 @@ size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize)
return ZSTD_resetCStream_internal(zcs, NULL, 0, zcs->requestedParams.dictMode, zcs->cdict, params, pledgedSrcSize);
}
/*! ZSTD_initCStream_internal() :
* Note : not static, but hidden (not exposed). Used by zstdmt_compress.c
* Assumption 1 : params are valid
* Assumption 2 : either dict, or cdict, is defined, not both */
size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
size_t ZSTD_initCStream_internal_opaque(ZSTD_CStream* zcs,
const void* dict, size_t dictSize,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize)
{
DEBUGLOG(5, "ZSTD_initCStream_internal");
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
@ -3993,11 +4025,28 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
zcs->cdictLocal = NULL;
zcs->cdict = cdict;
}
zcs->requestedParams.cParams = params.cParams;
zcs->requestedParams.fParams = params.fParams;
zcs->requestedParams.compressionLevel = ZSTD_CLEVEL_CUSTOM;
zcs->requestedParams = params;
return ZSTD_resetCStream_internal(zcs, NULL, 0, zcs->requestedParams.dictMode, zcs->cdict, params, pledgedSrcSize);
return ZSTD_resetCStream_internal_opaque(
zcs, NULL, 0, zcs->requestedParams.dictMode, zcs->cdict,
params, pledgedSrcSize);
}
/*! ZSTD_initCStream_internal() :
* Note : not static, but hidden (not exposed). Used by zstdmt_compress.c
* Assumption 1 : params are valid
* Assumption 2 : either dict, or cdict, is defined, not both */
size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = zcs->requestedParams;
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
cctxParams.compressionLevel = ZSTD_CLEVEL_CUSTOM;
return ZSTD_initCStream_internal_opaque(zcs, dict, dictSize, cdict,
cctxParams, pledgedSrcSize);
}
/* ZSTD_initCStream_usingCDict_advanced() :
@ -4227,7 +4276,6 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize);
size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
ZSTD_outBuffer* output,
ZSTD_inBuffer* input,

View File

@ -186,6 +186,14 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
ZSTD_free(buf.start, bufPool->cMem);
}
static void ZSTDMT_zeroCCtxParams(ZSTD_CCtx_params* params)
{
params->forceWindow = 0;
params->dictMode = (ZSTD_dictMode_e)(0);
params->nbThreads = 0;
params->jobSize = 0;
params->overlapSizeLog = 0;
}
/* ===== CCtx Pool ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */
@ -292,7 +300,7 @@ typedef struct {
unsigned jobScanned;
pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond;
ZSTD_parameters params;
ZSTD_CCtx_params params;
const ZSTD_CDict* cdict;
ZSTDMT_CCtxPool* cctxPool;
ZSTDMT_bufferPool* bufPool;
@ -330,7 +338,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
} else { /* srcStart points at reloaded section */
if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */
{ size_t const dictModeError = ZSTD_setCCtxParameter(cctx, ZSTD_p_forceRawDict, 1); /* Force loading dictionary in "content-only" mode (no header analysis) */
size_t const initError = ZSTD_compressBegin_advanced(cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
size_t const initError = ZSTD_compressBegin_advanced_opaque(cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
if (ZSTD_isError(initError) || ZSTD_isError(dictModeError)) { job->cSize = initError; goto _endJob; }
ZSTD_setCCtxParameter(cctx, ZSTD_p_forceWindow, 1);
} }
@ -382,7 +390,7 @@ struct ZSTDMT_CCtx_s {
size_t dictSize;
size_t targetDictSize;
inBuff_t inBuff;
ZSTD_parameters params;
ZSTD_CCtx_params params;
XXH64_state_t xxhState;
unsigned nbThreads;
unsigned jobIDMask;
@ -528,17 +536,17 @@ static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbT
return (multiplier>1) ? nbChunksLarge : nbChunksSmall;
}
size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_parameters const params,
unsigned overlapLog)
static size_t ZSTDMT_compress_advanced_opaque(
ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params const cctxParams,
unsigned overlapLog)
{
unsigned const overlapRLog = (overlapLog>9) ? 0 : 9-overlapLog;
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, mtctx->nbThreads);
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (cctxParams.cParams.windowLog - overlapRLog);
unsigned nbChunks = computeNbChunks(srcSize, cctxParams.cParams.windowLog, mtctx->nbThreads);
size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
const char* const srcStart = (const char*)src;
@ -546,12 +554,15 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0;
XXH64_state_t xxh64;
ZSTD_CCtx_params requestedParams = cctxParams;
ZSTDMT_zeroCCtxParams(&requestedParams);
DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
if (nbChunks==1) { /* fallback to single-thread mode */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, params.fParams);
return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, cctxParams.fParams);
return ZSTD_compress_advanced_opaque(cctx, dst, dstCapacity, src, srcSize, NULL, 0, requestedParams);
}
assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is required for compressWithinDst */
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
@ -580,7 +591,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
mtctx->jobs[u].srcSize = chunkSize;
mtctx->jobs[u].cdict = mtctx->nextJobID==0 ? cdict : NULL;
mtctx->jobs[u].fullFrameSize = srcSize;
mtctx->jobs[u].params = params;
mtctx->jobs[u].params = requestedParams;
/* do not calculate checksum within sections, but write it in header for first section */
if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
mtctx->jobs[u].dstBuff = dstBuffer;
@ -592,7 +603,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
if (params.fParams.checksumFlag) {
if (cctxParams.fParams.checksumFlag) {
XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
}
@ -636,7 +647,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
} /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
if (params.fParams.checksumFlag) {
if (cctxParams.fParams.checksumFlag) {
U32 const checksum = (U32)XXH64_digest(&xxh64);
if (dstPos + 4 > dstCapacity) {
error = ERROR(dstSize_tooSmall);
@ -649,6 +660,23 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos);
return error ? error : dstPos;
}
}
size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_parameters const params,
unsigned overlapLog)
{
ZSTD_CCtx_params cctxParams = mtctx->params;
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
return ZSTDMT_compress_advanced_opaque(mtctx,
dst, dstCapacity,
src, srcSize,
cdict, cctxParams, overlapLog);
}
@ -683,23 +711,28 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
}
}
/** ZSTDMT_initCStream_internal() :
* internal usage only */
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
size_t ZSTDMT_initCStream_internal_opaque(
ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
const ZSTD_CDict* cdict, ZSTD_CCtx_params cctxParams,
unsigned long long pledgedSrcSize)
{
ZSTD_parameters params;
params.cParams = cctxParams.cParams;
params.fParams = cctxParams.fParams;
DEBUGLOG(4, "ZSTDMT_initCStream_internal");
/* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */
/* TODO: Set stuff to 0 to preserve old semantics. */
ZSTDMT_zeroCCtxParams(&cctxParams);
if (zcs->nbThreads==1) {
DEBUGLOG(4, "single thread mode");
return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
dict, dictSize, cdict,
params, pledgedSrcSize);
return ZSTD_initCStream_internal_opaque(zcs->cctxPool->cctx[0],
dict, dictSize, cdict,
cctxParams, pledgedSrcSize);
}
if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */
@ -708,7 +741,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
zcs->allJobsCompleted = 1;
}
zcs->params = params;
zcs->params = cctxParams;
zcs->frameContentSize = pledgedSrcSize;
if (dict) {
DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal);
@ -742,6 +775,21 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
zcs->allJobsCompleted = 0;
if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
return 0;
}
/** ZSTDMT_initCStream_internal() :
* internal usage only */
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, const ZSTD_CDict* cdict,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = zcs->params;
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
return ZSTDMT_initCStream_internal_opaque(zcs, dict, dictSize, cdict,
cctxParams, pledgedSrcSize);
}
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
@ -772,7 +820,8 @@ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
if (zcs->nbThreads==1)
return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
return ZSTDMT_initCStream_internal_opaque(zcs, NULL, 0, 0, zcs->params,
pledgedSrcSize);
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
@ -930,7 +979,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
&& (mtctx->inBuff.filled==0) /* nothing buffered */
&& (endOp==ZSTD_e_end) /* end order */
&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
size_t const cSize = ZSTDMT_compress_advanced(mtctx,
size_t const cSize = ZSTDMT_compress_advanced_opaque(mtctx,
(char*)output->dst + output->pos, output->size - output->pos,
(const char*)input->src + input->pos, input->size - input->pos,
mtctx->cdict, mtctx->params, mtctx->overlapLog);

View File

@ -69,6 +69,15 @@ ZSTDLIB_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
const ZSTD_CDict* cdict,
ZSTD_parameters const params,
unsigned overlapLog);
#if 0
ZSTDLIB_API size_t ZSTDMT_compress_advanced_opaque(
ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params* const params,
unsigned overlapLog);
#endif
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, /* dict can be released after init, a local copy is preserved within zcs */