split compression into smaller blocks

This commit is contained in:
Paul Cruz 2017-07-19 11:23:40 -07:00
parent 338951cd48
commit f1ac518b59

View File

@ -434,44 +434,66 @@ static void* compressionThread(void* arg)
/* adapt compression level */
adaptCompressionLevel(ctx);
/* compress the data */
{
size_t const compressionBlockSize = 4 << 17; /* 128 KB */
unsigned const cLevel = ctx->compressionLevel;
unsigned blockNum = 0;
size_t remaining = job->src.size;
size_t srcPos = 0;
size_t dstPos = 0;
size_t dictPos = 0;
DEBUG(3, "cLevel used: %u\n", cLevel);
DEBUG(3, "compression level used: %u\n", cLevel);
/* begin compression */
{
size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel);
size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
DISPLAY("Error: something went wrong while starting compression\n");
signalErrorToThreads(ctx);
return arg;
}
}
/* continue compression */
if (currJob != 0) { /* not first job flush/overwrite the frame header */
size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, 0);
if (ZSTD_isError(hSize)) {
DISPLAY("Error: something went wrong while continuing compression\n");
job->compressedSize = hSize;
signalErrorToThreads(ctx);
return arg;
/* reset compressed size */
job->compressedSize = 0;
while (remaining != 0) {
size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
/* begin compression */
{
size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize + dictPos, useDictSize, cLevel);
size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
DISPLAY("Error: something went wrong while starting compression\n");
signalErrorToThreads(ctx);
return arg;
}
}
/* continue compression */
if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
if (ZSTD_isError(hSize)) {
DISPLAY("Error: something went wrong while continuing compression\n");
job->compressedSize = hSize;
signalErrorToThreads(ctx);
return arg;
}
ZSTD_invalidateRepCodes(ctx->cctx);
}
{
size_t const ret = (job->lastJob && remaining <= compressionBlockSize) ?
ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
if (ZSTD_isError(ret)) {
DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
signalErrorToThreads(ctx);
return arg;
}
job->compressedSize += ret;
remaining -= actualBlockSize;
srcPos += actualBlockSize;
dstPos += ret;
dictPos += actualBlockSize;
blockNum++;
}
ZSTD_invalidateRepCodes(ctx->cctx);
}
job->compressedSize = (job->lastJob) ?
ZSTD_compressEnd (ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size) :
ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size);
if (ZSTD_isError(job->compressedSize)) {
DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize));
signalErrorToThreads(ctx);
return arg;
}
job->dst.size = job->compressedSize;
}