introduced command --adapt

This commit is contained in:
Yann Collet 2018-08-11 20:48:06 -07:00
parent a996b1fd2d
commit e7a49c6683
6 changed files with 94 additions and 42 deletions

View File

@ -3704,6 +3704,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_CCtx_reset(cctx);
}
DEBUGLOG(5, "completed ZSTD_compress_generic delegating to ZSTDMT_compressStream_generic");
return flushMin;
} }
#endif

View File

@ -249,8 +249,8 @@ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
/* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
if (buf.start == NULL) return; /* compatible with release on NULL */
DEBUGLOG(5, "ZSTDMT_releaseBuffer");
if (buf.start == NULL) return; /* compatible with release on NULL */
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
if (bufPool->nbBuffers < bufPool->totalBuffers) {
bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */
@ -541,6 +541,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
/* Wait for our turn */
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
while (serialState->nextJobID < jobID) {
DEBUGLOG(5, "wait for serialState->cond");
ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
}
/* A future job may error and skip our job */
@ -932,7 +933,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
@ -1079,7 +1080,7 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
ZSTD_frameProgression fps;
DEBUGLOG(6, "ZSTDMT_getFrameProgression");
DEBUGLOG(5, "ZSTDMT_getFrameProgression");
fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
fps.consumed = mtctx->consumed;
fps.produced = mtctx->produced;
@ -1100,6 +1101,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}
}
DEBUGLOG(5, "ZSTDMT_getFrameProgression : completed");
return fps;
}
@ -1576,7 +1578,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
/* try to flush something */
{ size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */
size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */
size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */
size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
if (ZSTD_isError(cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
@ -1615,6 +1617,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
DEBUGLOG(5, "dstBuffer released")
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */
mtctx->consumed += srcSize;
@ -1691,6 +1694,7 @@ static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
range_t extDict;
range_t prefix;
DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
extDict.start = window.dictBase + window.lowLimit;
extDict.size = window.dictLimit - window.lowLimit;
@ -1711,12 +1715,13 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
{
if (mtctx->params.ldmParams.enableLdm) {
ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
DEBUGLOG(5, "source [0x%zx, 0x%zx)",
(size_t)buffer.start,
(size_t)buffer.start + buffer.capacity);
ZSTD_PTHREAD_MUTEX_LOCK(mutex);
while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
DEBUGLOG(6, "Waiting for LDM to finish...");
DEBUGLOG(5, "Waiting for LDM to finish...");
ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
}
DEBUGLOG(6, "Done waiting for LDM to finish");
@ -1736,6 +1741,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
size_t const target = mtctx->targetSectionSize;
buffer_t buffer;
DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
assert(mtctx->inBuff.buffer.start == NULL);
assert(mtctx->roundBuff.capacity >= target);
@ -1749,7 +1755,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
buffer.start = start;
buffer.capacity = prefixSize;
if (ZSTDMT_isOverlapped(buffer, inUse)) {
DEBUGLOG(6, "Waiting for buffer...");
DEBUGLOG(5, "Waiting for buffer...");
return 0;
}
ZSTDMT_waitForLdmComplete(mtctx, buffer);
@ -1761,7 +1767,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
buffer.capacity = target;
if (ZSTDMT_isOverlapped(buffer, inUse)) {
DEBUGLOG(6, "Waiting for buffer...");
DEBUGLOG(5, "Waiting for buffer...");
return 0;
}
assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
@ -1834,8 +1840,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* It is only possible for this operation to fail if there are
* still compression jobs ongoing.
*/
DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed")
assert(mtctx->doneJobID != mtctx->nextJobID);
}
} else
DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
}
if (mtctx->inBuff.buffer.start != NULL) {
size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
@ -1863,6 +1871,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */
DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
return remainingToFlush;
}
}

View File

@ -226,6 +226,8 @@ void FIO_setOverlapLog(unsigned overlapLog){
DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n");
g_overlapLog = overlapLog;
}
static U32 g_adaptiveMode = 0;
void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; }
static U32 g_ldmFlag = 0;
void FIO_setLdmFlag(unsigned ldmFlag) {
g_ldmFlag = (ldmFlag>0);
@ -738,12 +740,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue;
/* stats */
typedef enum { noChange, slower, faster } speedChange_e;
speedChange_e speedChange = noChange;
unsigned inputPresented = 0;
unsigned inputBlocked = 0;
unsigned lastJobID = 0;
unsigned long long lastProduced = 0;
unsigned long long lastFlushedSize = 0;
DISPLAYLEVEL(6, "compression using zstd format \n");
@ -774,6 +776,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
/* count stats */
inputPresented++;
if (oldIPos == inBuff.pos) inputBlocked++;
/* Write compressed stream */
@ -792,41 +795,74 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
/* check output speed */
if (zfp.currentJobID > 0) {
unsigned long long newlyProduced = zfp.produced - lastProduced;
if (zfp.currentJobID > 1) {
static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 };
static unsigned long long lastFlushedSize = 0;
unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
assert(zfp.produced >= lastProduced);
if (newlyProduced == 0) {
DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n")
assert(zfp.produced >= cpszfp.produced);
if ( (zfp.ingested == cpszfp.ingested)
&& (zfp.consumed == cpszfp.consumed) ) {
DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. If buffers full : output is too slow => slow down \n")
speedChange = slower;
}
if ( (newlyProduced > (newlyFlushed * 9 / 8))
&& (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) {
DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed);
DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush);
speedChange = slower;
}
lastProduced = zfp.produced;
cpszfp = zfp;
lastFlushedSize = compressedfilesize;
}
/* course correct only if there is at least one job completed */
/* course correct only if there is at least one new job completed */
if (zfp.currentJobID > lastJobID) {
DISPLAYLEVEL(6, "compression level adaptation check \n")
/* check input speed */
if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
if (inputBlocked <= 1) { /* small tolerance */
if (inputBlocked <= 0) {
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
speedChange = slower;
} else if (speedChange == noChange) {
static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 };
static unsigned long long lastFlushedSize = 0;
unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
csuzfp = zfp;
lastFlushedSize = compressedfilesize;
assert(inputPresented > 0);
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
&& (newlyFlushed * 17 / 16 > newlyProduced) /* flush everything that is produced */
&& (newlyIngested * 17 / 16 > newlyConsumed) /* can't keep up with input speed */
) {
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
speedChange = faster;
}
}
inputBlocked = 0;
inputPresented = 0;
}
if (speedChange == slower) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel ++;
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
if (g_adaptiveMode) {
if (speedChange == slower) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel ++;
compressionLevel += (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
}
if (speedChange == faster) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel --;
compressionLevel -= (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
}
speedChange = noChange;
}
lastJobID = zfp.currentJobID;

View File

@ -57,6 +57,7 @@ void FIO_setMemLimit(unsigned memLimit);
void FIO_setNbWorkers(unsigned nbWorkers);
void FIO_setBlockSize(unsigned blockSize);
void FIO_setOverlapLog(unsigned overlapLog);
void FIO_setAdaptiveMode(unsigned adapt);
void FIO_setLdmFlag(unsigned ldmFlag);
void FIO_setLdmHashLog(unsigned ldmHashLog);
void FIO_setLdmMinMatch(unsigned ldmMinMatch);

View File

@ -102,6 +102,13 @@ the last one takes effect.
* `-#`:
`#` compression level \[1-19] (default: 3)
* `--fast[=#]`:
switch to ultra-fast compression levels.
If `=#` is not present, it defaults to `1`.
The higher the value, the faster the compression speed,
at the cost of some compression ratio.
This setting overwrites compression level if one was set previously.
Similarly, if a compression level is set after `--fast`, it overrides it.
* `--ultra`:
unlocks high compression levels 20+ (maximum 22), using a lot more memory.
Note that decompression will also require more memory when using these levels.
@ -115,25 +122,23 @@ the last one takes effect.
Note: If `windowLog` is set to larger than 27, `--long=windowLog` or
`--memory=windowSize` needs to be passed to the decompressor.
* `--fast[=#]`:
switch to ultra-fast compression levels.
If `=#` is not present, it defaults to `1`.
The higher the value, the faster the compression speed,
at the cost of some compression ratio.
This setting overwrites compression level if one was set previously.
Similarly, if a compression level is set after `--fast`, it overrides it.
* `-T#`, `--threads=#`:
Compress using `#` working threads (default: 1).
If `#` is 0, attempt to detect and use the number of physical CPU cores.
In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==200.
This modifier does nothing if `zstd` is compiled without multithread support.
* `--single-thread`:
Does not spawn a thread for compression, use caller thread instead.
This is the only available mode when multithread support is disabled.
In this mode, compression is serialized with I/O.
Does not spawn a thread for compression, use a single thread for both I/O and compression.
In this mode, compression is serialized with I/O, which is slightly slower.
(This is different from `-T1`, which spawns 1 compression thread in parallel of I/O).
Single-thread mode also features lower memory usage.
This mode is the only one available when multithread support is disabled.
Single-thread mode features lower memory usage.
Final compressed result is slightly different from `-T1`.
* `--adapt` :
`zstd` will dynamically adapt compression level to perceived I/O conditions.
The current compression level can be observed live by using command `-v`.
Works with multi-threading and `--long` mode.
Does not work with `--single-thread`.
* `-D file`:
use `file` as Dictionary to compress or decompress FILE(s)
* `--no-dictID`:

View File

@ -135,6 +135,7 @@ static int usage_advanced(const char* programName)
#ifndef ZSTD_NOCOMPRESS
DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
DISPLAY( "--long[=#]: enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog);
DISPLAY( "--adapt : automatically adapt compression level to I/O conditions \n");
DISPLAY( "--fast[=#]: switch to ultra fast compression level (default: %u)\n", 1);
#ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : spawns # compression threads (default: 1, 0==# cores) \n");
@ -395,6 +396,7 @@ int main(int argCount, const char* argv[])
ldmFlag = 0,
main_pause = 0,
nbWorkers = 0,
adapt = 0,
nextArgumentIsOutFileName = 0,
nextArgumentIsMaxDict = 0,
nextArgumentIsDictID = 0,
@ -511,6 +513,7 @@ int main(int argCount, const char* argv[])
if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; }
if (!strcmp(argument, "--rm")) { FIO_setRemoveSrcFile(1); continue; }
if (!strcmp(argument, "--priority=rt")) { setRealTimePrio = 1; continue; }
if (!strcmp(argument, "--adapt")) { adapt = 1; continue; }
if (!strcmp(argument, "--single-thread")) { nbWorkers = 0; singleThread = 1; continue; }
if (!strcmp(argument, "--format=zstd")) { suffix = ZSTD_EXTENSION; FIO_setCompressionType(FIO_zstdCompression); continue; }
#ifdef ZSTD_GZCOMPRESS
@ -935,17 +938,14 @@ int main(int argCount, const char* argv[])
#ifndef ZSTD_NOCOMPRESS
FIO_setNbWorkers(nbWorkers);
FIO_setBlockSize((U32)blockSize);
if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog);
FIO_setLdmFlag(ldmFlag);
FIO_setLdmHashLog(g_ldmHashLog);
FIO_setLdmMinMatch(g_ldmMinMatch);
if (g_ldmBucketSizeLog != LDM_PARAM_DEFAULT) {
FIO_setLdmBucketSizeLog(g_ldmBucketSizeLog);
}
if (g_ldmHashEveryLog != LDM_PARAM_DEFAULT) {
FIO_setLdmHashEveryLog(g_ldmHashEveryLog);
}
if (g_ldmBucketSizeLog != LDM_PARAM_DEFAULT) FIO_setLdmBucketSizeLog(g_ldmBucketSizeLog);
if (g_ldmHashEveryLog != LDM_PARAM_DEFAULT) FIO_setLdmHashEveryLog(g_ldmHashEveryLog);
FIO_setAdaptiveMode(adapt);
if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog);
if ((filenameIdx==1) && outFileName)
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
else