Merge pull request #643 from facebook/zstdmt

Optimize ZSTDMT_compress() memory usage
This commit is contained in:
Yann Collet 2017-04-03 17:25:05 -07:00 committed by GitHub
commit 40a78a7b5a
6 changed files with 93 additions and 37 deletions

View File

@ -32,7 +32,11 @@ typedef enum { ZSTDcs_created=0, ZSTDcs_init, ZSTDcs_ongoing, ZSTDcs_ending } ZS
* Helper functions
***************************************/
#define ZSTD_STATIC_ASSERT(c) { enum { ZSTD_static_assert = 1/(int)(!!(c)) }; }
size_t ZSTD_compressBound(size_t srcSize) { return FSE_compressBound(srcSize) + 12; }
size_t ZSTD_compressBound(size_t srcSize) {
size_t const lowLimit = 256 KB;
size_t const margin = (srcSize < lowLimit) ? (lowLimit-srcSize) >> 12 : 0; /* from 64 to 0 */
return srcSize + (srcSize >> 8) + margin;
}
/*-*************************************

View File

@ -33,7 +33,7 @@
# include <stdio.h>
# include <unistd.h>
# include <sys/times.h>
static unsigned g_debugLevel = 2;
static unsigned g_debugLevel = 5;
# define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); }
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
@ -44,26 +44,26 @@
DEBUGLOGRAW(l, " \n"); \
}
static unsigned long long GetCurrentClockTimeMicroseconds()
static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
static clock_t _ticksPerSecond = 0;
if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);
struct tms junk; clock_t newTicks = (clock_t) times(&junk);
return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
{ struct tms junk; clock_t newTicks = (clock_t) times(&junk);
return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); }
}
#define MUTEX_WAIT_TIME_DLEVEL 5
#define PTHREAD_MUTEX_LOCK(mutex) \
if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \
pthread_mutex_lock(mutex); \
unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \
unsigned long long elapsedTime = (afterTime-beforeTime); \
if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
pthread_mutex_lock(mutex); \
{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
unsigned long long const elapsedTime = (afterTime-beforeTime); \
if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
elapsedTime, #mutex); \
} \
} } \
} else pthread_mutex_lock(mutex);
#else
@ -228,7 +228,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
const void* const src = (const char*)job->srcStart + job->dictSize;
buffer_t const dstBuff = job->dstBuff;
DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
if (job->cdict) { /* should only happen for first segment */
size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize);
if (job->cdict) DEBUGLOG(3, "using CDict ");
@ -250,7 +251,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
_endJob:
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
@ -388,14 +391,17 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
int compressionLevel)
{
ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
U32 const overlapLog = (compressionLevel >= ZSTD_maxCLevel()) ? 0 : 3;
size_t const overlapSize = (size_t)1 << (params.cParams.windowLog - overlapLog);
size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2);
unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + (srcSize < chunkTargetSize) /* min 1 */;
unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1;
unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads);
size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
size_t remainingSrcSize = srcSize;
const char* const srcStart = (const char*)src;
size_t frameStartPos = 0;
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0;
DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize);
DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
@ -409,10 +415,11 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
{ unsigned u;
for (u=0; u<nbChunks; u++) {
size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
size_t const dstBufferCapacity = u ? ZSTD_compressBound(chunkSize) : dstCapacity;
buffer_t const dstAsBuffer = { dst, dstCapacity };
buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : dstAsBuffer;
size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity);
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
size_t dictSize = u ? overlapSize : 0;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */
@ -421,7 +428,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
break; /* let's wait for previous jobs to complete, but don't start new ones */
}
mtctx->jobs[u].srcStart = srcStart + frameStartPos;
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].dictSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize;
mtctx->jobs[u].fullFrameSize = srcSize;
mtctx->jobs[u].params = params;
@ -438,6 +446,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
frameStartPos += chunkSize;
dstBufferPos += dstBufferCapacity;
remainingSrcSize -= chunkSize;
} }
/* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */
@ -461,8 +470,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
if (chunkID) { /* note : chunk 0 is already written directly into dst */
if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
if (!error)
memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap if chunk decompressed within dst */
if (chunkID >= compressWithinDst) /* otherwise, it decompresses within dst */
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
}
dstPos += cSize ;

View File

@ -1,5 +1,5 @@
.
.TH "ZSTD" "1" "March 2017" "zstd 1.1.5" "User Commands"
.TH "ZSTD" "1" "April 2017" "zstd 1.1.5" "User Commands"
.
.SH "NAME"
\fBzstd\fR \- zstd, unzstd, zstdcat \- Compress or decompress \.zst files
@ -97,6 +97,10 @@ Use FILEs as a training set to create a dictionary\. The training set should con
unlocks high compression levels 20+ (maximum 22), using a lot more memory\. Note that decompression will also require more memory when using these levels\.
.
.TP
\fB\-T#\fR
Compress using # threads (default: 1)\. This modifier is only available if \fBzstd\fR was compiled with multithreading support\.
.
.TP
\fB\-D file\fR
use \fBfile\fR as Dictionary to compress or decompress FILE(s)
.
@ -126,7 +130,7 @@ remove source file(s) after successful compression or decompression
.
.TP
\fB\-k\fR, \fB\-\-keep\fR
keep source file(s) after successful compression or decompression\. This is the default behaviour\.
keep source file(s) after successful compression or decompression\. This is the default behavior\.
.
.TP
\fB\-r\fR
@ -150,7 +154,7 @@ suppress warnings, interactivity, and notifications\. specify twice to suppress
.
.TP
\fB\-C\fR, \fB\-\-[no\-]check\fR
add integrety check computed from uncompressed data (default : enabled)
add integrity check computed from uncompressed data (default : enabled)
.
.TP
\fB\-\-\fR
@ -226,6 +230,9 @@ set process priority to real\-time
.
.SH "ADVANCED COMPRESSION OPTIONS"
.
.SS "\-B#:"
Select the size of each compression job\. This parameter is available only when multi\-threading is enabled\. Default value is \fB4 * windowSize\fR, which means it varies depending on compression level\. \fB\-B#\fR makes it possible to select a custom value\. Note that job size must respect a minimum value which is enforced transparently\. This minimum is either 1 MB, or \fBoverlapSize\fR, whichever is largest\.
.
.SS "\-\-zstd[=options]:"
\fBzstd\fR provides 22 predefined compression levels\. The selected or default predefined compression level can be changed with advanced compression options\. The \fIoptions\fR are provided as a comma\-separated list\. You may specify only the options you want to change and the rest will be taken from the selected or default compression level\. The list of available \fIoptions\fR:
.
@ -293,6 +300,13 @@ A larger minimum match length usually improves compression ratio but decreases c
.IP
The minimum \fItlen\fR is 4 and the maximum is 999\.
.
.TP
\fBoverlapLog\fR=\fIovlog\fR, \fBovlog\fR=\fIovlog\fR
Determine \fBoverlapSize\fR, amount of data reloaded from previous job\. This parameter is only available when multithreading is enabled\. Reloading more data improves compression ratio, but decreases speed\.
.
.IP
The minimum \fIovlog\fR is 0, and the maximum is 9\. 0 means "no overlap", hence completely independent jobs\. 9 means "full overlap", meaning up to \fBwindowSize\fR is reloaded from previous job\. Reducing \fIovlog\fR by 1 reduces the amount of reload by a factor 2\. Default \fIovlog\fR is 6, which means "reload \fBwindowSize / 8\fR"\. Exception : the maximum compression level (22) has a default \fIovlog\fR of 9\.
.
.SS "Example"
The following parameters sets advanced compression options to those of predefined level 19 for files bigger than 256 KB:
.

View File

@ -99,6 +99,9 @@ the last one takes effect.
* `--ultra`:
unlocks high compression levels 20+ (maximum 22), using a lot more memory.
Note that decompression will also require more memory when using these levels.
* `-T#`:
Compress using # threads (default: 1).
This modifier is only available if `zstd` was compiled with multithreading support.
* `-D file`:
use `file` as Dictionary to compress or decompress FILE(s)
* `--nodictID`:
@ -123,7 +126,7 @@ the last one takes effect.
remove source file(s) after successful compression or decompression
* `-k`, `--keep`:
keep source file(s) after successful compression or decompression.
This is the default behaviour.
This is the default behavior.
* `-r`:
operate recursively on dictionaries
* `-h`/`-H`, `--help`:
@ -136,10 +139,11 @@ the last one takes effect.
suppress warnings, interactivity, and notifications.
specify twice to suppress errors too.
* `-C`, `--[no-]check`:
add integrety check computed from uncompressed data (default : enabled)
add integrity check computed from uncompressed data (default : enabled)
* `--`:
All arguments after `--` are treated as files
DICTIONARY BUILDER
------------------
`zstd` offers _dictionary_ compression,
@ -218,8 +222,17 @@ BENCHMARK
* `--priority=rt`:
set process priority to real-time
ADVANCED COMPRESSION OPTIONS
----------------------------
### -B#:
Select the size of each compression job.
This parameter is available only when multi-threading is enabled.
Default value is `4 * windowSize`, which means it varies depending on compression level.
`-B#` makes it possible to select a custom value.
Note that job size must respect a minimum value which is enforced transparently.
This minimum is either 1 MB, or `overlapSize`, whichever is largest.
### --zstd[=options]:
`zstd` provides 22 predefined compression levels.
The selected or default predefined compression level can be changed with
@ -290,6 +303,19 @@ The list of available _options_:
The minimum _tlen_ is 4 and the maximum is 999.
- `overlapLog`=_ovlog_, `ovlog`=_ovlog_:
Determine `overlapSize`, amount of data reloaded from previous job.
This parameter is only available when multithreading is enabled.
Reloading more data improves compression ratio, but decreases speed.
The minimum _ovlog_ is 0, and the maximum is 9.
0 means "no overlap", hence completely independent jobs.
9 means "full overlap", meaning up to `windowSize` is reloaded from previous job.
Reducing _ovlog_ by 1 reduces the amount of reload by a factor 2.
Default _ovlog_ is 6, which means "reload `windowSize / 8`".
Exception : the maximum compression level (22) has a default _ovlog_ of 9.
### Example
The following parameters sets advanced compression options to those of
predefined level 19 for files bigger than 256 KB:

View File

@ -113,19 +113,20 @@ static int usage_advanced(const char* programName)
DISPLAY( "\n");
DISPLAY( "Advanced arguments :\n");
DISPLAY( " -V : display Version number and exit\n");
DISPLAY( " -v : verbose mode; specify multiple times to increase log level (default:%d)\n", DEFAULT_DISPLAY_LEVEL);
DISPLAY( " -v : verbose mode; specify multiple times to increase verbosity\n");
DISPLAY( " -q : suppress warnings; specify twice to suppress errors too\n");
DISPLAY( " -c : force write to standard output, even if it is the console\n");
#ifdef UTIL_HAS_CREATEFILELIST
DISPLAY( " -r : operate recursively on directories \n");
#endif
#ifndef ZSTD_NOCOMPRESS
DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
#ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : use # threads for compression (default:1) \n");
DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n");
DISPLAY( " -B# : select size of each job (default:0==automatic) \n");
#endif
DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
#endif
#ifdef UTIL_HAS_CREATEFILELIST
DISPLAY( " -r : operate recursively on directories \n");
#endif
#ifdef ZSTD_GZCOMPRESS
DISPLAY( "--format=gzip : compress files to the .gz format \n");
@ -134,7 +135,6 @@ static int usage_advanced(const char* programName)
DISPLAY( "--format=xz : compress files to the .xz format \n");
DISPLAY( "--format=lzma : compress files to the .lzma format \n");
#endif
#endif
#ifndef ZSTD_NODECOMPRESS
DISPLAY( "--test : test compressed file integrity \n");
#if ZSTD_SPARSE_DEFAULT

View File

@ -856,6 +856,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
/* some issues can only happen when reusing states */
if ((FUZ_rand(&lseed) & 0xFF) == 131) {
U32 const nbThreads = (FUZ_rand(&lseed) % 6) + 1;
DISPLAYLEVEL(5, "Creating new context with %u threads \n", nbThreads);
ZSTDMT_freeCCtx(zc);
zc = ZSTDMT_createCCtx(nbThreads);
resetAllowed=0;
@ -946,7 +947,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize);
{ size_t const flushError = ZSTDMT_flushStream(zc, &outBuff);
CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError));
CHECK (ZSTD_isError(flushError), "ZSTDMT_flushStream error : %s", ZSTD_getErrorName(flushError));
} } }
/* final frame epilogue */
@ -957,7 +958,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize);
remainingToFlush = ZSTDMT_endStream(zc, &outBuff);
CHECK (ZSTD_isError(remainingToFlush), "flush error : %s", ZSTD_getErrorName(remainingToFlush));
CHECK (ZSTD_isError(remainingToFlush), "ZSTDMT_endStream error : %s", ZSTD_getErrorName(remainingToFlush));
DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush);
} }
DISPLAYLEVEL(5, "Frame completed \n");