From 7303ed5d74fa185d6c37228ae4370e983921d26c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 29 Jun 2017 11:30:31 -0700 Subject: [PATCH 01/16] minor : specify compression level in tags --- tests/fullbench.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/fullbench.c b/tests/fullbench.c index 2cdad2e3..251d3ece 100644 --- a/tests/fullbench.c +++ b/tests/fullbench.c @@ -236,14 +236,14 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) switch(benchNb) { case 1: - benchFunction = local_ZSTD_compress; benchName = "ZSTD_compress"; + benchFunction = local_ZSTD_compress; benchName = "ZSTD_compress(1)"; break; case 2: benchFunction = local_ZSTD_decompress; benchName = "ZSTD_decompress"; break; #ifndef ZSTD_DLL_IMPORT case 11: - benchFunction = local_ZSTD_compressContinue; benchName = "ZSTD_compressContinue"; + benchFunction = local_ZSTD_compressContinue; benchName = "ZSTD_compressContinue(1)"; break; case 12: benchFunction = local_ZSTD_compressContinue_extDict; benchName = "ZSTD_compressContinue_extDict"; @@ -259,7 +259,7 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) break; #endif case 41: - benchFunction = local_ZSTD_compressStream; benchName = "ZSTD_compressStream"; + benchFunction = local_ZSTD_compressStream; benchName = "ZSTD_compressStream(1)"; break; case 42: benchFunction = local_ZSTD_decompressStream; benchName = "ZSTD_decompressStream"; From 2e84bec9ac6ed31b8cf809e1198b2860c76624ed Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 29 Jun 2017 13:03:10 -0700 Subject: [PATCH 02/16] updated fullbench to also measure ZSTD_compress_generic() will make it possible to visualize optimization opportunity for ZSTD_e_end --- tests/fullbench.c | 56 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/tests/fullbench.c b/tests/fullbench.c index 251d3ece..1c1807a5 100644 --- a/tests/fullbench.c +++ b/tests/fullbench.c @@ -153,6 +153,39 @@ size_t local_ZSTD_compressStream(void* dst, size_t dstCapacity, void* buff2, con return buffOut.pos; } +static size_t local_ZSTD_compress_generic_end(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) +{ + ZSTD_outBuffer buffOut; + ZSTD_inBuffer buffIn; + (void)buff2; + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); + buffOut.dst = dst; + buffOut.size = dstCapacity; + buffOut.pos = 0; + buffIn.src = src; + buffIn.size = srcSize; + buffIn.pos = 0; + ZSTD_compress_generic(g_cstream, &buffOut, &buffIn, ZSTD_e_end); + return buffOut.pos; +} + +static size_t local_ZSTD_compress_generic_continue(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) +{ + ZSTD_outBuffer buffOut; + ZSTD_inBuffer buffIn; + (void)buff2; + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); + buffOut.dst = dst; + buffOut.size = dstCapacity; + buffOut.pos = 0; + buffIn.src = src; + buffIn.size = srcSize; + buffIn.pos = 0; + ZSTD_compress_generic(g_cstream, &buffOut, &buffIn, ZSTD_e_continue); + ZSTD_compress_generic(g_cstream, &buffOut, &buffIn, ZSTD_e_end); + return buffOut.pos; +} + static ZSTD_DStream* g_dstream= NULL; static size_t local_ZSTD_decompressStream(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) { @@ -264,6 +297,12 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) case 42: benchFunction = local_ZSTD_decompressStream; benchName = "ZSTD_decompressStream"; break; + case 51: + benchFunction = local_ZSTD_compress_generic_continue; benchName = "ZSTD_compress_generic, continue"; + break; + case 52: + benchFunction = local_ZSTD_compress_generic_end; benchName = "ZSTD_compress_generic, end"; + break; default : return 0; } @@ -276,6 +315,10 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) free(dstBuff); free(buff2); return 12; } + if (g_zcc==NULL) g_zcc = ZSTD_createCCtx(); + if (g_zdc==NULL) g_zdc = ZSTD_createDCtx(); + if (g_cstream==NULL) g_cstream = ZSTD_createCStream(); + if (g_dstream==NULL) g_dstream = ZSTD_createDStream(); /* Preparation */ switch(benchNb) @@ -284,18 +327,10 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) g_cSize = ZSTD_compress(buff2, dstBuffSize, src, srcSize, 1); break; #ifndef ZSTD_DLL_IMPORT - case 11 : - if (g_zcc==NULL) g_zcc = ZSTD_createCCtx(); - break; - case 12 : - if (g_zcc==NULL) g_zcc = ZSTD_createCCtx(); - break; case 13 : - if (g_zdc==NULL) g_zdc = ZSTD_createDCtx(); g_cSize = ZSTD_compress(buff2, dstBuffSize, src, srcSize, 1); break; case 31: /* ZSTD_decodeLiteralsBlock */ - if (g_zdc==NULL) g_zdc = ZSTD_createDCtx(); { blockProperties_t bp; ZSTD_frameHeader zfp; size_t frameHeaderSize, skippedSize; @@ -313,7 +348,6 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) break; } case 32: /* ZSTD_decodeSeqHeaders */ - if (g_zdc==NULL) g_zdc = ZSTD_createDCtx(); { blockProperties_t bp; ZSTD_frameHeader zfp; const BYTE* ip = dstBuff; @@ -342,11 +376,7 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) case 31: goto _cleanOut; #endif - case 41 : - if (g_cstream==NULL) g_cstream = ZSTD_createCStream(); - break; case 42 : - if (g_dstream==NULL) g_dstream = ZSTD_createDStream(); g_cSize = ZSTD_compress(buff2, dstBuffSize, src, srcSize, 1); break; From a3d9926c405d664519089e20884beff0c5970e74 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 29 Jun 2017 14:44:49 -0700 Subject: [PATCH 03/16] compression optimization opportunity switch to single-pass mode directly into output buffer when outputSize >= ZSTD_compressBound(inputSize). Speed gains observed with fullbench (~+15% on level 1) --- lib/common/zstd_internal.h | 1 + lib/compress/zstd_compress.c | 33 ++++++++++++++++++++++++++------- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h index b2c7cfcd..59761b13 100644 --- a/lib/common/zstd_internal.h +++ b/lib/common/zstd_internal.h @@ -72,6 +72,7 @@ #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) # include /* recommended values for ZSTD_DEBUG display levels : + * 1 : no display, enables assert() only * 2 : reserved for currently active debugging path * 3 : events once per object lifetime (CCtx, CDict) * 4 : events once per frame diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index f492d92b..d4c029e9 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -2954,6 +2954,8 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx, const BYTE* const ip = (const BYTE*) src; size_t fhSize = 0; + DEBUGLOG(5, "ZSTD_compressContinue_internal"); + DEBUGLOG(5, "stage: %u", cctx->stage); if (cctx->stage==ZSTDcs_created) return ERROR(stage_wrong); /* missing init (ZSTD_compressBegin) */ if (frame && (cctx->stage==ZSTDcs_init)) { @@ -3211,9 +3213,9 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, ZSTD_parameters params, U64 pledgedSrcSize, ZSTD_buffered_policy_e zbuff) { - DEBUGLOG(5, "ZSTD_compressBegin_internal"); - DEBUGLOG(5, "dict ? %s", dict ? "dict" : cdict ? "cdict" : "none"); - DEBUGLOG(5, "dictMode : %u", (U32)dictMode); + DEBUGLOG(4, "ZSTD_compressBegin_internal"); + DEBUGLOG(4, "dict ? %s", dict ? "dict" : cdict ? "cdict" : "none"); + DEBUGLOG(4, "dictMode : %u", (U32)dictMode); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ @@ -3633,7 +3635,7 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize) { - DEBUGLOG(5, "ZSTD_resetCStream_internal"); + DEBUGLOG(4, "ZSTD_resetCStream_internal"); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ @@ -3766,6 +3768,9 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, return length; } +/** ZSTD_compressStream_generic(): + * internal function for all *compressStream*() variants and *compress_generic() + * @return : hint size for next input */ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input, @@ -3780,7 +3785,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, U32 someMoreWork = 1; /* check expectations */ - DEBUGLOG(5, "ZSTD_compressStream_generic"); + DEBUGLOG(5, "ZSTD_compressStream_generic, order %u", (U32)flushMode); assert(zcs->inBuff != NULL); assert(zcs->inBuffSize>0); assert(zcs->outBuff!= NULL); @@ -3796,6 +3801,20 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, return ERROR(init_missing); case zcss_load: + if ( (flushMode == ZSTD_e_end) + && ((size_t)(oend-op) >= ZSTD_compressBound(iend-ip)) + && (zcs->inBuffPos == 0) ) { + /* shortcut to compression pass directly into output buffer */ + size_t const cSize = ZSTD_compressEnd(zcs, + op, oend-op, ip, iend-ip); + DEBUGLOG(4, "ZSTD_compressEnd : %u", (U32)cSize); + if (ZSTD_isError(cSize)) return cSize; + ip = iend; + op += cSize; + zcs->frameEnded = 1; + ZSTD_startNewCompression(zcs); + someMoreWork = 0; break; + } /* complete inBuffer */ { size_t const toLoad = zcs->inBuffTarget - zcs->inBuffPos; size_t const loaded = ZSTD_limitCopy( @@ -3922,8 +3941,8 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, if (input->pos > input->size) return ERROR(GENERIC); assert(cctx!=NULL); + /* transparent initialization stage */ if (cctx->streamStage == zcss_init) { - /* transparent reset */ const void* const prefix = cctx->prefix; size_t const prefixSize = cctx->prefixSize; ZSTD_parameters params = cctx->requestedParams; @@ -3944,6 +3963,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, CHECK_F( ZSTD_resetCStream_internal(cctx, prefix, prefixSize, cctx->dictMode, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); } } + /* compression stage */ #ifdef ZSTD_MULTITHREAD if (cctx->nbThreads > 1) { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); @@ -3956,7 +3976,6 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, } #endif - DEBUGLOG(5, "calling ZSTD_compressStream_generic(%i,...)", endOp); CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) ); DEBUGLOG(5, "completed ZSTD_compress_generic"); return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */ From afb0aca7392a1407b62edd9f5fc6bef8e7b1a669 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 29 Jun 2017 18:19:09 -0700 Subject: [PATCH 04/16] zstreamtest : big tests are only enabled in 64-bits mode to avoid requesting too much memory in 32-bits mode during MT tests --- Makefile | 12 +++++++++--- tests/zstreamtest.c | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 9afdf658..ac3034c9 100644 --- a/Makefile +++ b/Makefile @@ -117,30 +117,36 @@ CMAKE_PARAMS = -DZSTD_BUILD_CONTRIB:BOOL=ON -DZSTD_BUILD_STATIC:BOOL=ON -DZSTD_B list: @$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$' | xargs -.PHONY: install uninstall travis-install clangtest gpptest armtest usan asan uasan +.PHONY: install clangtest gpptest armtest usan asan uasan install: @$(MAKE) -C $(ZSTDDIR) $@ @$(MAKE) -C $(PRGDIR) $@ +.PHONY: uninstall uninstall: @$(MAKE) -C $(ZSTDDIR) $@ @$(MAKE) -C $(PRGDIR) $@ +.PHONY: travis-install travis-install: $(MAKE) install PREFIX=~/install_test_dir +.PHONY: gppbuild gppbuild: clean g++ -v CC=g++ $(MAKE) -C programs all CFLAGS="-O3 -Wall -Wextra -Wundef -Wshadow -Wcast-align -Werror" +.PHONY: gcc5build gcc5build: clean gcc-5 -v CC=gcc-5 $(MAKE) all MOREFLAGS="-Werror" +.PHONY: gcc6build gcc6build: clean gcc-6 -v CC=gcc-6 $(MAKE) all MOREFLAGS="-Werror" +.PHONY: clangbuild clangbuild: clean clang -v CXX=clang++ CC=clang $(MAKE) all MOREFLAGS="-Werror -Wconversion -Wno-sign-conversion -Wdocumentation" @@ -227,7 +233,7 @@ msan: clean $(MAKE) test CC=clang MOREFLAGS="-g -fsanitize=memory -fno-omit-frame-pointer" HAVE_LZMA=0 # datagen.c fails this test for no obvious reason msan-%: clean - LDFLAGS=-fuse-ld=gold MOREFLAGS="-fno-sanitize-recover=all -fsanitize=memory -fno-omit-frame-pointer" FUZZER_FLAGS=--no-big-tests $(MAKE) -C $(TESTDIR) HAVE_LZMA=0 $* + LDFLAGS=-fuse-ld=gold MOREFLAGS="-g -fno-sanitize-recover=all -fsanitize=memory -fno-omit-frame-pointer" FUZZER_FLAGS=--no-big-tests $(MAKE) -C $(TESTDIR) HAVE_LZMA=0 $* asan32: clean $(MAKE) -C $(TESTDIR) test32 CC=clang MOREFLAGS="-g -fsanitize=address" @@ -236,7 +242,7 @@ uasan: clean $(MAKE) test CC=clang MOREFLAGS="-g -fno-sanitize-recover=all -fsanitize-recover=signed-integer-overflow -fsanitize=address,undefined" uasan-%: clean - LDFLAGS=-fuse-ld=gold MOREFLAGS="-Og -fno-sanitize-recover=all -fsanitize-recover=signed-integer-overflow -fsanitize=address,undefined" $(MAKE) -C $(TESTDIR) $* + LDFLAGS=-fuse-ld=gold MOREFLAGS="-g -fno-sanitize-recover=all -fsanitize-recover=signed-integer-overflow -fsanitize=address,undefined" $(MAKE) -C $(TESTDIR) $* tsan-%: clean LDFLAGS=-fuse-ld=gold MOREFLAGS="-g -fno-sanitize-recover=all -fsanitize=thread" $(MAKE) -C $(TESTDIR) $* FUZZER_FLAGS=--no-big-tests diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index f16cc4b0..9b2b8eaf 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1540,7 +1540,7 @@ int main(int argc, const char** argv) int proba = FUZ_COMPRESSIBILITY_DEFAULT; int result=0; int mainPause = 0; - int bigTests = 1; + int bigTests = (sizeof(size_t) == 8); e_api selected_api = simple_api; const char* const programName = argv[0]; ZSTD_customMem const customMem = { allocFunction, freeFunction, NULL }; From e7e5a8cef7b6e1ac3bcbb7a25a4401072fd02a36 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 29 Jun 2017 18:56:24 -0700 Subject: [PATCH 05/16] made fullbench compatible with multi-threading fullbench 61/62 measure speed of ZSTD_compress_generic with 2 threads --- tests/Makefile | 10 ++--- tests/fullbench.c | 96 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 72 insertions(+), 34 deletions(-) diff --git a/tests/Makefile b/tests/Makefile index 5c240800..0b8deec2 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -94,11 +94,11 @@ zstd-nolegacy: gzstd: $(MAKE) -C $(PRGDIR) $@ -fullbench : $(ZSTD_FILES) $(PRGDIR)/datagen.c fullbench.c - $(CC) $(FLAGS) $^ -o $@$(EXT) - -fullbench32 : $(ZSTD_FILES) $(PRGDIR)/datagen.c fullbench.c - $(CC) -m32 $(FLAGS) $^ -o $@$(EXT) +fullbench32: CPPFLAGS += -m32 +fullbench fullbench32 : CPPFLAGS += $(MULTITHREAD_CPP) +fullbench fullbench32 : LDFLAGS += $(MULTITHREAD_LD) +fullbench fullbench32 : $(ZSTD_FILES) $(PRGDIR)/datagen.c fullbench.c + $(CC) $(FLAGS) $^ -o $@$(EXT) fullbench-lib: $(PRGDIR)/datagen.c fullbench.c $(MAKE) -C $(ZSTDDIR) libzstd.a diff --git a/tests/fullbench.c b/tests/fullbench.c index 1c1807a5..176e5a62 100644 --- a/tests/fullbench.c +++ b/tests/fullbench.c @@ -14,9 +14,8 @@ #include "util.h" /* Compiler options, UTIL_GetFileSize */ #include /* malloc */ #include /* fprintf, fopen, ftello64 */ -#include /* clock_t, clock, CLOCKS_PER_SEC */ -#include "mem.h" +#include "mem.h" /* U32 */ #ifndef ZSTD_DLL_IMPORT #include "zstd_internal.h" /* ZSTD_blockHeaderSize, blockType_e, KB, MB */ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressBegin, ZSTD_compressContinue, etc. */ @@ -26,7 +25,8 @@ #define GB *(1U<<30) typedef enum { bt_raw, bt_rle, bt_compressed, bt_reserved } blockType_e; #endif -#include "zstd.h" /* ZSTD_VERSION_STRING */ +#include "zstd.h" /* ZSTD_versionString */ +#include "util.h" /* time functions */ #include "datagen.h" @@ -35,7 +35,7 @@ **************************************/ #define PROGRAM_DESCRIPTION "Zstandard speed analyzer" #define AUTHOR "Yann Collet" -#define WELCOME_MESSAGE "*** %s %s %i-bits, by %s (%s) ***\n", PROGRAM_DESCRIPTION, ZSTD_VERSION_STRING, (int)(sizeof(void*)*8), AUTHOR, __DATE__ +#define WELCOME_MESSAGE "*** %s %s %i-bits, by %s (%s) ***\n", PROGRAM_DESCRIPTION, ZSTD_versionString(), (int)(sizeof(void*)*8), AUTHOR, __DATE__ #define NBLOOPS 6 #define TIMELOOP_S 2 @@ -69,12 +69,6 @@ static void BMK_SetNbIterations(U32 nbLoops) /*_******************************************************* * Private functions *********************************************************/ -static clock_t BMK_clockSpan( clock_t clockStart ) -{ - return clock() - clockStart; /* works even if overflow, span limited to <= ~30mn */ -} - - static size_t BMK_findMaxMem(U64 requiredMem) { size_t const step = 64 MB; @@ -186,6 +180,41 @@ static size_t local_ZSTD_compress_generic_continue(void* dst, size_t dstCapacity return buffOut.pos; } +static size_t local_ZSTD_compress_generic_T2_end(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) +{ + ZSTD_outBuffer buffOut; + ZSTD_inBuffer buffIn; + (void)buff2; + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); + buffOut.dst = dst; + buffOut.size = dstCapacity; + buffOut.pos = 0; + buffIn.src = src; + buffIn.size = srcSize; + buffIn.pos = 0; + while (ZSTD_compress_generic(g_cstream, &buffOut, &buffIn, ZSTD_e_end)) {} + return buffOut.pos; +} + +static size_t local_ZSTD_compress_generic_T2_continue(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) +{ + ZSTD_outBuffer buffOut; + ZSTD_inBuffer buffIn; + (void)buff2; + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); + ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); + buffOut.dst = dst; + buffOut.size = dstCapacity; + buffOut.pos = 0; + buffIn.src = src; + buffIn.size = srcSize; + buffIn.pos = 0; + ZSTD_compress_generic(g_cstream, &buffOut, &buffIn, ZSTD_e_continue); + while(ZSTD_compress_generic(g_cstream, &buffOut, &buffIn, ZSTD_e_end)) {} + return buffOut.pos; +} + static ZSTD_DStream* g_dstream= NULL; static size_t local_ZSTD_decompressStream(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) { @@ -269,39 +298,45 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) switch(benchNb) { case 1: - benchFunction = local_ZSTD_compress; benchName = "ZSTD_compress(1)"; + benchFunction = local_ZSTD_compress; benchName = "compress(1)"; break; case 2: - benchFunction = local_ZSTD_decompress; benchName = "ZSTD_decompress"; + benchFunction = local_ZSTD_decompress; benchName = "decompress"; break; #ifndef ZSTD_DLL_IMPORT case 11: - benchFunction = local_ZSTD_compressContinue; benchName = "ZSTD_compressContinue(1)"; + benchFunction = local_ZSTD_compressContinue; benchName = "compressContinue(1)"; break; case 12: - benchFunction = local_ZSTD_compressContinue_extDict; benchName = "ZSTD_compressContinue_extDict"; + benchFunction = local_ZSTD_compressContinue_extDict; benchName = "compressContinue_extDict"; break; case 13: - benchFunction = local_ZSTD_decompressContinue; benchName = "ZSTD_decompressContinue"; + benchFunction = local_ZSTD_decompressContinue; benchName = "decompressContinue"; break; case 31: - benchFunction = local_ZSTD_decodeLiteralsBlock; benchName = "ZSTD_decodeLiteralsBlock"; + benchFunction = local_ZSTD_decodeLiteralsBlock; benchName = "decodeLiteralsBlock"; break; case 32: - benchFunction = local_ZSTD_decodeSeqHeaders; benchName = "ZSTD_decodeSeqHeaders"; + benchFunction = local_ZSTD_decodeSeqHeaders; benchName = "decodeSeqHeaders"; break; #endif case 41: - benchFunction = local_ZSTD_compressStream; benchName = "ZSTD_compressStream(1)"; + benchFunction = local_ZSTD_compressStream; benchName = "compressStream(1)"; break; case 42: - benchFunction = local_ZSTD_decompressStream; benchName = "ZSTD_decompressStream"; + benchFunction = local_ZSTD_decompressStream; benchName = "decompressStream"; break; case 51: - benchFunction = local_ZSTD_compress_generic_continue; benchName = "ZSTD_compress_generic, continue"; + benchFunction = local_ZSTD_compress_generic_continue; benchName = "compress_generic, continue"; break; case 52: - benchFunction = local_ZSTD_compress_generic_end; benchName = "ZSTD_compress_generic, end"; + benchFunction = local_ZSTD_compress_generic_end; benchName = "compress_generic, end"; + break; + case 61: + benchFunction = local_ZSTD_compress_generic_T2_continue; benchName = "compress_generic, -T2, continue"; + break; + case 62: + benchFunction = local_ZSTD_compress_generic_T2_end; benchName = "compress_generic, -T2, end"; break; default : return 0; @@ -389,22 +424,25 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) { size_t i; for (i=0; i %s !! \n", benchName, ZSTD_getErrorName(benchResult)); exit(1); } } - { clock_t const clockTotal = BMK_clockSpan(clockStart); - double const averageTime = (double)clockTotal / CLOCKS_PER_SEC / nbRounds; + { U64 const clockSpanMicro = UTIL_clockSpanMicro(clockStart, ticksPerSecond); + double const averageTime = (double)clockSpanMicro / TIME_SEC_MICROSEC / nbRounds; if (averageTime < bestTime) bestTime = averageTime; DISPLAY("%2i- %-30.30s : %7.1f MB/s (%9u)\r", loopNb, benchName, (double)srcSize / (1 MB) / bestTime, (U32)benchResult); } } } From b5bb7c6d952d4cf1890afde07d55ca286ad2f7dd Mon Sep 17 00:00:00 2001 From: cyan4973 Date: Thu, 29 Jun 2017 19:59:37 -0700 Subject: [PATCH 06/16] fixed Visual compilation of fullbench-dll --- tests/fullbench.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/fullbench.c b/tests/fullbench.c index 176e5a62..0f287348 100644 --- a/tests/fullbench.c +++ b/tests/fullbench.c @@ -18,13 +18,13 @@ #include "mem.h" /* U32 */ #ifndef ZSTD_DLL_IMPORT #include "zstd_internal.h" /* ZSTD_blockHeaderSize, blockType_e, KB, MB */ - #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressBegin, ZSTD_compressContinue, etc. */ #else #define KB *(1 <<10) #define MB *(1 <<20) #define GB *(1U<<30) typedef enum { bt_raw, bt_rle, bt_compressed, bt_reserved } blockType_e; #endif +#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressBegin, ZSTD_compressContinue, etc. */ #include "zstd.h" /* ZSTD_versionString */ #include "util.h" /* time functions */ #include "datagen.h" @@ -110,8 +110,9 @@ size_t local_ZSTD_decompress(void* dst, size_t dstSize, void* buff2, const void* return ZSTD_decompress(dst, dstSize, buff2, g_cSize); } -#ifndef ZSTD_DLL_IMPORT static ZSTD_DCtx* g_zdc = NULL; + +#ifndef ZSTD_DLL_IMPORT extern size_t ZSTD_decodeLiteralsBlock(ZSTD_DCtx* ctx, const void* src, size_t srcSize); size_t local_ZSTD_decodeLiteralsBlock(void* dst, size_t dstSize, void* buff2, const void* src, size_t srcSize) { @@ -232,8 +233,9 @@ static size_t local_ZSTD_decompressStream(void* dst, size_t dstCapacity, void* b return buffOut.pos; } -#ifndef ZSTD_DLL_IMPORT static ZSTD_CCtx* g_zcc = NULL; + +#ifndef ZSTD_DLL_IMPORT size_t local_ZSTD_compressContinue(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize) { (void)buff2; From d5c046c609c77bd2ab8035d7b0e11d2084272a36 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 30 Jun 2017 14:51:01 -0700 Subject: [PATCH 07/16] implemented shortcut for zstd_compress_generic() in MT mode added ZSTDMT_compress_advanced() API --- lib/common/zstd_internal.h | 6 ++ lib/compress/zstd_compress.c | 8 +- lib/compress/zstdmt_compress.c | 146 ++++++++++++++++++++++++++------- lib/compress/zstdmt_compress.h | 8 ++ 4 files changed, 133 insertions(+), 35 deletions(-) diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h index 59761b13..a0cc938a 100644 --- a/lib/common/zstd_internal.h +++ b/lib/common/zstd_internal.h @@ -333,6 +333,12 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize); +/*! ZSTD_initCStream_internal() : + * Private use only. To be called from zstdmt_compress.c in single-thread mode. */ +size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective const flushMode); /*! ZSTD_getParamsFromCDict() : * as the name implies */ diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d4c029e9..f2afe625 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3771,10 +3771,10 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, /** ZSTD_compressStream_generic(): * internal function for all *compressStream*() variants and *compress_generic() * @return : hint size for next input */ -static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, - ZSTD_outBuffer* output, - ZSTD_inBuffer* input, - ZSTD_EndDirective const flushMode) +size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective const flushMode) { const char* const istart = (const char*)input->src; const char* const iend = istart + input->size; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index d5f08c76..f7ee7502 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -261,7 +261,7 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; const void* const src = (const char*)job->srcStart + job->dictSize; buffer_t const dstBuff = job->dstBuff; - DEBUGLOG(4, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", + DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); if (job->cdict) { /* should only happen for first segment */ size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize); @@ -285,7 +285,7 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = (job->lastChunk) ? ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); - DEBUGLOG(4, "compressed %u bytes into %u bytes (first:%u) (last:%u)", + DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); @@ -374,10 +374,11 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads) } /* ZSTDMT_releaseAllJobResources() : - * Ensure all workers are killed first. */ + * note : ensure all workers are killed first ! */ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) { unsigned jobID; + DEBUGLOG(4, "ZSTDMT_releaseAllJobResources"); for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; @@ -439,14 +440,14 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, +size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, - int compressionLevel) + const ZSTD_CDict* cdict, + ZSTD_parameters const params, + unsigned overlapRLog) { - ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); - U32 const overlapLog = (compressionLevel >= ZSTD_maxCLevel()) ? 0 : 3; - size_t const overlapSize = (size_t)1 << (params.cParams.windowLog - overlapLog); + size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1; unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads); @@ -459,11 +460,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); - params.fParams.contentSizeFlag = 1; if (nbChunks==1) { /* fallback to single-thread mode */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; - return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel); + return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params); } { unsigned u; @@ -485,8 +485,11 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].dictSize = dictSize; mtctx->jobs[u].srcSize = chunkSize; + mtctx->jobs[u].cdict = mtctx->nextJobID==0 ? cdict : NULL; mtctx->jobs[u].fullFrameSize = srcSize; mtctx->jobs[u].params = params; + /* do not calculate checksum within sections, but write it in header for first section */ + if (mtctx->nextJobID) mtctx->jobs[u].params.fParams.checksumFlag = 0; mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctx = cctx; mtctx->jobs[u].firstChunk = (u==0); @@ -495,8 +498,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; - DEBUGLOG(4, "posting job %u (%u bytes)", u, (U32)chunkSize); - DEBUG_PRINTHEX(5, mtctx->jobs[u].srcStart, 12); + DEBUGLOG(5, "posting job %u (%u bytes)", u, (U32)chunkSize); + DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12); POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); frameStartPos += chunkSize; @@ -526,8 +529,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, if (chunkID) { /* note : chunk 0 is already written directly into dst */ if (!error) memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap if chunk decompressed within dst */ - if (chunkID >= compressWithinDst) /* otherwise, it decompresses within dst */ + if (chunkID >= compressWithinDst) { /* otherwise, it decompresses within dst */ + DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); + } mtctx->jobs[chunkID].dstBuff = g_nullBuffer; } dstPos += cSize ; @@ -536,7 +541,18 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); return error ? error : dstPos; } +} + +size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + int compressionLevel) +{ + U32 const overlapRLog = (compressionLevel >= ZSTD_maxCLevel()) ? 0 : 3; + ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); + params.fParams.contentSizeFlag = 1; + return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapRLog); } @@ -546,6 +562,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { + DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); while (zcs->doneJobID < zcs->nextJobID) { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); @@ -559,17 +576,19 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) } +/** ZSTDMT_initCStream_internal() : + * internal usage only */ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize) { - DEBUGLOG(5, "ZSTDMT_initCStream_internal"); + DEBUGLOG(4, "ZSTDMT_initCStream_internal"); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ if (zcs->nbThreads==1) { - DEBUGLOG(5, "single thread mode"); + DEBUGLOG(4, "single thread mode"); return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], dict, dictSize, cdict, params, pledgedSrcSize); @@ -584,6 +603,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->params = params; zcs->frameContentSize = pledgedSrcSize; if (dict) { + DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal); zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, 0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */ @@ -591,18 +611,19 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->cdict = zcs->cdictLocal; if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); } else { + DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal); zcs->cdictLocal = NULL; zcs->cdict = cdict; } zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog); - DEBUGLOG(5, "overlapRLog : %u ", zcs->overlapRLog); - DEBUGLOG(5, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); + DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog); + DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); - DEBUGLOG(5, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); + DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); zcs->marginSize = zcs->targetSectionSize >> 2; zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize; zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); @@ -797,6 +818,75 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi } } +/** ZSTDMT_compressStream_generic() : + * internal use only + * assumption : output and input are valid (pos <= size) + * @return : minimum amount of data remaining to flush, 0 if none */ +size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective endOp) +{ + size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize + mtctx->marginSize; + assert(output->pos <= output->size); + assert(input->pos <= input->size); + if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) { + /* current frame being ended. Only flush/end are allowed. Or start new job with init */ + return ERROR(stage_wrong); + } + if (mtctx->nbThreads==1) { + return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp); + } + if ( (mtctx->nextJobID==0) /* just started */ + && (mtctx->inBuff.filled==0) /* nothing buffered yet */ + && (endOp==ZSTD_e_end) /* end order, immediately at beginning */ + && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) /* enough room */ + && (mtctx->cdict==NULL) ) { /* no dictionary */ + size_t const cSize = ZSTDMT_compress_advanced(mtctx, + (char*)output->dst + output->pos, output->size - output->pos, + (const char*)input->src + input->pos, input->size - input->pos, + mtctx->cdict, mtctx->params, mtctx->overlapRLog); + if (ZSTD_isError(cSize)) return cSize; + input->pos = input->size; + output->pos += cSize; + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); /* was allocated in initStream */ + mtctx->allJobsCompleted = 1; + mtctx->frameEnded = 1; + return 0; + } + + /* fill input buffer */ + { size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); + memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); + input->pos += toLoad; + mtctx->inBuff.filled += toLoad; + } + + if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ + && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { /* avoid overwriting job round buffer */ + CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) ); + } + + /* check for potential compressed data ready to be flushed */ + CHECK_F( ZSTDMT_flushNextJob(mtctx, output, (mtctx->inBuff.filled == mtctx->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */ + + if (input->pos < input->size) /* input not consumed : do not flush yet */ + endOp = ZSTD_e_continue; + + switch(endOp) + { + case ZSTD_e_flush: + return ZSTDMT_flushStream(mtctx, output); + case ZSTD_e_end: + return ZSTDMT_endStream(mtctx, output); + case ZSTD_e_continue: + return 1; + default: + return ERROR(GENERIC); /* invalid endDirective */ + } +} + + size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; @@ -833,21 +923,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp { size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; - if (srcSize) - DEBUGLOG(5, "ZSTDMT_flushStream_internal : %u bytes left to compress", - (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { - DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize); - DEBUGLOG(5, "end order : %u", endFrame); CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); - DEBUGLOG(5, "resulting zcs->frameEnded : %u", zcs->frameEnded); } /* check if there is any data available to flush */ - DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u", - zcs->doneJobID, zcs->nextJobID); - return ZSTDMT_flushNextJob(zcs, output, 1 /*blockToFlush */); + return ZSTDMT_flushNextJob(zcs, output, 1 /* blockToFlush */); } @@ -861,22 +943,24 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { - DEBUGLOG(5, "ZSTDMT_endStream"); + DEBUGLOG(4, "ZSTDMT_endStream"); if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */); } -size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, +size_t ZSTDMT_compressStream_generic2(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { + DEBUGLOG(5, "ZSTDMT_compressStream_generic"); DEBUGLOG(5, "in: pos:%u / size:%u ; endOp=%u", (U32)input->pos, (U32)input->size, (U32)endOp); - if (input->pos < input->size) /* exclude final flushes */ + if (input->pos < input->size) /* some input to consume */ CHECK_F(ZSTDMT_compressStream(mtctx, output, input)); - if (input->pos < input->size) endOp = ZSTD_e_continue; + if (input->pos < input->size) /* input not consumed : do not flush yet */ + endOp = ZSTD_e_continue; switch(endOp) { case ZSTD_e_flush: diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index d36914de..fad63b6d 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -44,6 +44,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, int compressionLevel); + /* === Streaming functions === */ ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); @@ -61,6 +62,13 @@ ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); # define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */ #endif +ZSTDLIB_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + const ZSTD_CDict* cdict, + ZSTD_parameters const params, + unsigned overlapRLog); + ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /* dict can be released after init, a local copy is preserved within zcs */ ZSTD_parameters params, From d8b33a598d9b707732969bd3da02b0fea1b1985c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 30 Jun 2017 15:44:57 -0700 Subject: [PATCH 08/16] Optimized ZSTDMT single-pass mode speed on large sources by ensuring job sizes remain "not too large" --- lib/compress/zstdmt_compress.c | 55 +++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index f7ee7502..c96ef482 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -331,14 +331,20 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; +static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) +{ + U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; + U32 const nbJobs = 1 << nbJobsLog2; + *nbJobsPtr = nbJobs; + return (ZSTDMT_jobDescription*) ZSTD_calloc( + nbJobs * sizeof(ZSTDMT_jobDescription), cMem); +} + ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) { ZSTDMT_CCtx* mtctx; - U32 const minNbJobs = nbThreads + 2; - U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; - U32 const nbJobs = 1 << nbJobsLog2; - DEBUGLOG(5, "nbThreads: %u ; minNbJobs: %u ; nbJobsLog2: %u ; nbJobs: %u", - nbThreads, minNbJobs, nbJobsLog2, nbJobs); + U32 nbJobs = nbThreads + 2; + DEBUGLOG(3, "ZSTDMT_createCCtx_advanced"); if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL)) @@ -349,13 +355,12 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) if (!mtctx) return NULL; mtctx->cMem = cMem; mtctx->nbThreads = nbThreads; - mtctx->jobIDMask = nbJobs - 1; mtctx->allJobsCompleted = 1; mtctx->sectionSize = 0; mtctx->overlapRLog = 3; mtctx->factory = POOL_create(nbThreads, 1); - mtctx->jobs = (ZSTDMT_jobDescription*)ZSTD_calloc( - nbJobs * sizeof(*mtctx->jobs), cMem); + mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); + mtctx->jobIDMask = nbJobs - 1; mtctx->buffPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); if (!mtctx->factory | !mtctx->jobs | !mtctx->buffPool | !mtctx->cctxPool) { @@ -448,24 +453,39 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, unsigned overlapRLog) { size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); - unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1; - unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads); + size_t const chunkSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); + size_t const chunkMaxSize = chunkSizeTarget << 2; + size_t const passSizeMax = chunkMaxSize * mtctx->nbThreads; + unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; + unsigned nbChunksLarge = multiplier * mtctx->nbThreads; + unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; + unsigned nbChunksSmall = MIN(nbChunksMax, mtctx->nbThreads); + unsigned nbChunks = (multiplier>1) ? nbChunksLarge : nbChunksSmall; size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; - size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ - size_t remainingSrcSize = srcSize; + size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; + size_t remainingSrcSize = srcSize; unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; - DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); + DEBUGLOG(4, "windowLog : %2u => chunkSizeTarget : %u bytes ", params.cParams.windowLog, (U32)chunkSizeTarget); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); + assert(avgChunkSize >= 256 KB); /* required for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B) */ if (nbChunks==1) { /* fallback to single-thread mode */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params); } + if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */ + U32 nbJobs = nbChunks; + ZSTD_free(mtctx->jobs, mtctx->cMem); + mtctx->jobIDMask = 0; + mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem); + if (mtctx->jobs==NULL) return ERROR(memory_allocation); + mtctx->jobIDMask = nbJobs - 1; + } + { unsigned u; for (u=0; ujobs[u].fullFrameSize = srcSize; mtctx->jobs[u].params = params; /* do not calculate checksum within sections, but write it in header for first section */ - if (mtctx->nextJobID) mtctx->jobs[u].params.fParams.checksumFlag = 0; + if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0; mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctx = cctx; mtctx->jobs[u].firstChunk = (u==0); @@ -837,11 +857,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, if (mtctx->nbThreads==1) { return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp); } + + /* single-pass shortcut (note : this is blocking-mode) */ if ( (mtctx->nextJobID==0) /* just started */ && (mtctx->inBuff.filled==0) /* nothing buffered yet */ && (endOp==ZSTD_e_end) /* end order, immediately at beginning */ - && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) /* enough room */ - && (mtctx->cdict==NULL) ) { /* no dictionary */ + && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */ size_t const cSize = ZSTDMT_compress_advanced(mtctx, (char*)output->dst + output->pos, output->size - output->pos, (const char*)input->src + input->pos, input->size - input->pos, From 58bd0e70fc83b1e519ea41b264fe2bfab8e43348 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 30 Jun 2017 16:01:02 -0700 Subject: [PATCH 09/16] fixed : dictionary compression with new advanced API in Multi-threading mode --- lib/compress/zstdmt_compress.c | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index c96ef482..38f8af2e 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -474,6 +474,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, if (nbChunks==1) { /* fallback to single-thread mode */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; + if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, params.fParams); return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params); } From 1bafe393e4e185404a1b29155b6bbaf66371c2d7 Mon Sep 17 00:00:00 2001 From: cyan4973 Date: Sat, 1 Jul 2017 06:59:24 -0700 Subject: [PATCH 10/16] fix : ZSTDMT_compressStream_generic() can accept NULL input also : converge implementations towards new version of ZSTDMT_compressStream_generic() --- lib/compress/zstdmt_compress.c | 56 +++------------------------------- 1 file changed, 4 insertions(+), 52 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 38f8af2e..c203bf33 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -50,7 +50,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void) return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); } } -#define MUTEX_WAIT_TIME_DLEVEL 5 +#define MUTEX_WAIT_TIME_DLEVEL 6 #define PTHREAD_MUTEX_LOCK(mutex) { \ if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \ unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \ @@ -878,7 +878,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* fill input buffer */ - { size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); + if (input->src) { /* support NULL input */ + size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); input->pos += toLoad; mtctx->inBuff.filled += toLoad; @@ -911,30 +912,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { - size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; - if (zcs->frameEnded) { - /* current frame being ended. Only flush is allowed. Or start new job with init */ - DEBUGLOG(5, "ZSTDMT_compressStream: zcs::frameEnded==1"); - return ERROR(stage_wrong); - } - if (zcs->nbThreads==1) { - return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input); - } - - /* fill input buffer */ - { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); - memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, (const char*)input->src + input->pos, toLoad); - input->pos += toLoad; - zcs->inBuff.filled += toLoad; - } - - if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ - && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ - CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* endFrame */) ); - } - - /* check for data to flush */ - CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */ + CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) ); /* recommended next input size : fill current input buffer */ return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ @@ -970,29 +948,3 @@ size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) return ZSTD_endStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */); } - -size_t ZSTDMT_compressStream_generic2(ZSTDMT_CCtx* mtctx, - ZSTD_outBuffer* output, - ZSTD_inBuffer* input, - ZSTD_EndDirective endOp) -{ - DEBUGLOG(5, "ZSTDMT_compressStream_generic"); - DEBUGLOG(5, "in: pos:%u / size:%u ; endOp=%u", - (U32)input->pos, (U32)input->size, (U32)endOp); - if (input->pos < input->size) /* some input to consume */ - CHECK_F(ZSTDMT_compressStream(mtctx, output, input)); - if (input->pos < input->size) /* input not consumed : do not flush yet */ - endOp = ZSTD_e_continue; - switch(endOp) - { - case ZSTD_e_flush: - return ZSTDMT_flushStream(mtctx, output); - case ZSTD_e_end: - DEBUGLOG(5, "endOp:%u; calling ZSTDMT_endStream", (U32)endOp); - return ZSTDMT_endStream(mtctx, output); - case ZSTD_e_continue: - return 1; - default: - return ERROR(GENERIC); /* invalid endDirective */ - } -} From c07e43c2b512e7633dc524129967aa0ac21b4c01 Mon Sep 17 00:00:00 2001 From: cyan4973 Date: Sat, 1 Jul 2017 07:05:11 -0700 Subject: [PATCH 11/16] added --show-leak-kind=all to valgrind tests --- tests/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Makefile b/tests/Makefile index 0b8deec2..e6ddce06 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -224,7 +224,7 @@ clean: ifneq (,$(filter $(shell uname),Linux Darwin GNU/kFreeBSD GNU OpenBSD FreeBSD NetBSD DragonFly SunOS)) HOST_OS = POSIX -valgrindTest: VALGRIND = valgrind --leak-check=full --error-exitcode=1 +valgrindTest: VALGRIND = valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 valgrindTest: zstd datagen fuzzer fullbench @echo "\n ---- valgrind tests : memory analyzer ----" $(VALGRIND) ./datagen -g50M > $(VOID) From 4b26306cb82d7ec4d3b96001f3342224c51aa3e0 Mon Sep 17 00:00:00 2001 From: cyan4973 Date: Sat, 1 Jul 2017 08:03:59 -0700 Subject: [PATCH 12/16] blindfix : fullbench's one-time leak, detected by valgrind --- tests/fullbench.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/fullbench.c b/tests/fullbench.c index 0f287348..81de5157 100644 --- a/tests/fullbench.c +++ b/tests/fullbench.c @@ -453,6 +453,10 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb) _cleanOut: free(dstBuff); free(buff2); + ZSTD_freeCCtx(g_zcc); g_zcc=NULL; + ZSTD_freeDCtx(g_zdc); g_zdc=NULL; + ZSTD_freeCStream(g_cstream); g_cstream=NULL; + ZSTD_freeDStream(g_dstream); g_dstream=NULL; return 0; } From 2485f88bf8104e52a2ef9b50d00db35960daa270 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sat, 1 Jul 2017 09:09:34 -0700 Subject: [PATCH 13/16] fixed legacy version init bug --- lib/decompress/zstd_decompress.c | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c index ff791969..152d16a5 100644 --- a/lib/decompress/zstd_decompress.c +++ b/lib/decompress/zstd_decompress.c @@ -198,6 +198,7 @@ ZSTD_DCtx* ZSTD_createDCtx_advanced(ZSTD_customMem customMem) if (!dctx) return NULL; dctx->customMem = customMem; dctx->legacyContext = NULL; + dctx->previousLegacyVersion = 0; ZSTD_initDCtx_internal(dctx); return dctx; } From 5a773615950616c2ec2d2357cdaabcb4a7119c8c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Mon, 3 Jul 2017 15:21:24 -0700 Subject: [PATCH 14/16] fixed wrong function name in comment --- lib/common/zstd_internal.h | 16 +--------------- lib/compress/zstd_compress.c | 1 + 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h index a0cc938a..f2c4e624 100644 --- a/lib/common/zstd_internal.h +++ b/lib/common/zstd_internal.h @@ -309,20 +309,6 @@ MEM_STATIC U32 ZSTD_highbit32(U32 val) void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx); -typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e; -#if 0 -/*! ZSTD_compressBegin_internal() : - * innermost initialization function. Private use only. - * expects params to be valid. - * must receive dict, or cdict, or none, but not both. - * @return : 0, or an error code */ -size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, - const void* dict, size_t dictSize, - const ZSTD_CDict* cdict, - ZSTD_parameters params, U64 pledgedSrcSize, - ZSTD_buffered_policy_e zbuff); -#endif - /*! ZSTD_initCStream_internal() : * Private use only. Init streaming operation. * expects params to be valid. @@ -333,7 +319,7 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize); -/*! ZSTD_initCStream_internal() : +/*! ZSTD_compressStream_generic() : * Private use only. To be called from zstdmt_compress.c in single-thread mode. */ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, ZSTD_outBuffer* output, diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index f2afe625..2d57bc2d 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -627,6 +627,7 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_parameters params, U64 ple } typedef enum { ZSTDcrp_continue, ZSTDcrp_noMemset } ZSTD_compResetPolicy_e; +typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e; /*! ZSTD_resetCCtx_internal() : note : `params` are assumed fully validated at this stage */ From 2084b041f4a508e7fa668cd67ca2750a352c2621 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Mon, 3 Jul 2017 15:52:19 -0700 Subject: [PATCH 15/16] fixed comments --- lib/compress/zstd_compress.c | 8 ++++---- lib/compress/zstdmt_compress.c | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 2d57bc2d..c1e7cb5c 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3215,7 +3215,7 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, ZSTD_buffered_policy_e zbuff) { DEBUGLOG(4, "ZSTD_compressBegin_internal"); - DEBUGLOG(4, "dict ? %s", dict ? "dict" : cdict ? "cdict" : "none"); + DEBUGLOG(4, "dict ? %s", dict ? "dict" : (cdict ? "cdict" : "none")); DEBUGLOG(4, "dictMode : %u", (U32)dictMode); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); @@ -3803,7 +3803,7 @@ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, case zcss_load: if ( (flushMode == ZSTD_e_end) - && ((size_t)(oend-op) >= ZSTD_compressBound(iend-ip)) + && ((size_t)(oend-op) >= ZSTD_compressBound(iend-ip)) /* enough dstCapacity */ && (zcs->inBuffPos == 0) ) { /* shortcut to compression pass directly into output buffer */ size_t const cSize = ZSTD_compressEnd(zcs, @@ -3815,8 +3815,8 @@ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, zcs->frameEnded = 1; ZSTD_startNewCompression(zcs); someMoreWork = 0; break; - } - /* complete inBuffer */ + } + /* complete loading into inBuffer */ { size_t const toLoad = zcs->inBuffTarget - zcs->inBuffPos; size_t const loaded = ZSTD_limitCopy( zcs->inBuff + zcs->inBuffPos, toLoad, diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index c203bf33..9e44da8b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -527,8 +527,8 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, dstBufferPos += dstBufferCapacity; remainingSrcSize -= chunkSize; } } - /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */ + /* collect result */ { unsigned chunkID; size_t error = 0, dstPos = 0; for (chunkID=0; chunkIDjobs[chunkID].cSize; if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); - if (chunkID) { /* note : chunk 0 is already written directly into dst */ + if (chunkID) { /* note : chunk 0 is written directly at dst, which is correct position */ if (!error) - memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap if chunk decompressed within dst */ - if (chunkID >= compressWithinDst) { /* otherwise, it decompresses within dst */ + memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */ + if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */ DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); } From 2de2396a36a7412916563f8a96ccf4572dc2b561 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Mon, 3 Jul 2017 16:23:36 -0700 Subject: [PATCH 16/16] refactor ZSTDMT_compress() --- lib/compress/zstdmt_compress.c | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 9e44da8b..17f3b37a 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -445,6 +445,18 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ +static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) { + size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2); + size_t const chunkMaxSize = chunkSizeTarget << 2; + size_t const passSizeMax = chunkMaxSize * nbThreads; + unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; + unsigned const nbChunksLarge = multiplier * nbThreads; + unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; + unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads); + return (multiplier>1) ? nbChunksLarge : nbChunksSmall; +} + + size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, @@ -453,14 +465,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, unsigned overlapRLog) { size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - size_t const chunkSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); - size_t const chunkMaxSize = chunkSizeTarget << 2; - size_t const passSizeMax = chunkMaxSize * mtctx->nbThreads; - unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned nbChunksLarge = multiplier * mtctx->nbThreads; - unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; - unsigned nbChunksSmall = MIN(nbChunksMax, mtctx->nbThreads); - unsigned nbChunks = (multiplier>1) ? nbChunksLarge : nbChunksSmall; + unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, mtctx->nbThreads); size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; @@ -468,7 +473,6 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; - DEBUGLOG(4, "windowLog : %2u => chunkSizeTarget : %u bytes ", params.cParams.windowLog, (U32)chunkSizeTarget); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); assert(avgChunkSize >= 256 KB); /* required for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B) */ @@ -499,7 +503,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, if ((cctx==NULL) || (dstBuffer.start==NULL)) { mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */ mtctx->jobs[u].jobCompleted = 1; - nbChunks = u+1; + nbChunks = u+1; /* only wait and free u jobs, instead of initially expected nbChunks ones */ break; /* let's wait for previous jobs to complete, but don't start new ones */ }