Removed static variables so that --adapt also works correctly across multiple input files.
This commit is contained in:
parent
89bc309d90
commit
ca02ebee07
@ -1530,7 +1530,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
|
||||
mtctx->jobs[jobID].jobID = mtctx->nextJobID;
|
||||
mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
|
||||
mtctx->jobs[jobID].lastJob = endFrame;
|
||||
mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
|
||||
mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
|
||||
mtctx->jobs[jobID].dstFlushed = 0;
|
||||
|
||||
/* Update the round buffer pos and clear the input buffer to be reset */
|
||||
|
@ -807,6 +807,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
||||
ZSTD_EndDirective directive = ZSTD_e_continue;
|
||||
|
||||
/* stats */
|
||||
ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
|
||||
ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
|
||||
typedef enum { noChange, slower, faster } speedChange_e;
|
||||
speedChange_e speedChange = noChange;
|
||||
unsigned flushWaiting = 0;
|
||||
@ -820,7 +822,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
||||
if (fileSize != UTIL_FILESIZE_UNKNOWN) {
|
||||
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
|
||||
}
|
||||
(void)compressionLevel; (void)srcFileName;
|
||||
(void)srcFileName;
|
||||
|
||||
/* Main compression loop */
|
||||
do {
|
||||
@ -863,69 +865,85 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
|
||||
|
||||
/* check output speed */
|
||||
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
|
||||
static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; /* note : requires fileio to run main thread */
|
||||
|
||||
unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
|
||||
unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
|
||||
assert(zfp.produced >= cpszfp.produced);
|
||||
assert(g_nbWorkers >= 1);
|
||||
|
||||
if ( (zfp.ingested == cpszfp.ingested) /* no data read : input buffer full */
|
||||
&& (zfp.consumed == cpszfp.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
|
||||
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
|
||||
) {
|
||||
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
|
||||
speedChange = slower;
|
||||
}
|
||||
|
||||
cpszfp = zfp;
|
||||
|
||||
if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
|
||||
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
|
||||
) {
|
||||
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
|
||||
speedChange = slower;
|
||||
}
|
||||
flushWaiting = 0;
|
||||
/* display progress notifications */
|
||||
if (g_displayLevel >= 3) {
|
||||
DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
|
||||
compressionLevel,
|
||||
(U32)((zfp.ingested - zfp.consumed) >> 20),
|
||||
(U32)(zfp.consumed >> 20),
|
||||
(U32)(zfp.produced >> 20),
|
||||
cShare );
|
||||
} else { /* summarized notifications if == 2; */
|
||||
DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
|
||||
if (fileSize != UTIL_FILESIZE_UNKNOWN)
|
||||
DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
|
||||
DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
|
||||
DELAY_NEXT_UPDATE();
|
||||
}
|
||||
|
||||
/* course correct only if there is at least one new job completed */
|
||||
if (zfp.currentJobID > lastJobID) {
|
||||
DISPLAYLEVEL(6, "compression level adaptation check \n")
|
||||
/* adaptive mode : statistics measurement and speed correction */
|
||||
if (g_adaptiveMode) {
|
||||
|
||||
/* check input speed */
|
||||
if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
|
||||
if (inputBlocked <= 0) {
|
||||
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
|
||||
/* check output speed */
|
||||
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
|
||||
|
||||
unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
|
||||
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
|
||||
assert(zfp.produced >= previous_zfp_update.produced);
|
||||
assert(g_nbWorkers >= 1);
|
||||
|
||||
if ( (zfp.ingested == previous_zfp_update.ingested) /* no data read : input buffer full */
|
||||
&& (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
|
||||
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
|
||||
) {
|
||||
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
|
||||
speedChange = slower;
|
||||
} else if (speedChange == noChange) {
|
||||
static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
|
||||
unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
|
||||
unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
|
||||
unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
|
||||
unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
|
||||
csuzfp = zfp;
|
||||
assert(inputPresented > 0);
|
||||
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
|
||||
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
|
||||
(U32)newlyIngested, (U32)newlyConsumed,
|
||||
(U32)newlyFlushed, (U32)newlyProduced);
|
||||
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
|
||||
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
|
||||
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
|
||||
) {
|
||||
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
|
||||
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
|
||||
speedChange = faster;
|
||||
}
|
||||
}
|
||||
inputBlocked = 0;
|
||||
inputPresented = 0;
|
||||
|
||||
previous_zfp_update = zfp;
|
||||
|
||||
if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
|
||||
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
|
||||
) {
|
||||
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
|
||||
speedChange = slower;
|
||||
}
|
||||
flushWaiting = 0;
|
||||
}
|
||||
|
||||
if (g_adaptiveMode) {
|
||||
/* course correct only if there is at least one new job completed */
|
||||
if (zfp.currentJobID > lastJobID) {
|
||||
DISPLAYLEVEL(6, "compression level adaptation check \n")
|
||||
|
||||
/* check input speed */
|
||||
if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
|
||||
if (inputBlocked <= 0) {
|
||||
DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
|
||||
speedChange = slower;
|
||||
} else if (speedChange == noChange) {
|
||||
unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
|
||||
unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
|
||||
unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
|
||||
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
|
||||
previous_zfp_correction = zfp;
|
||||
assert(inputPresented > 0);
|
||||
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
|
||||
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
|
||||
(U32)newlyIngested, (U32)newlyConsumed,
|
||||
(U32)newlyFlushed, (U32)newlyProduced);
|
||||
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
|
||||
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
|
||||
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
|
||||
) {
|
||||
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
|
||||
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
|
||||
speedChange = faster;
|
||||
}
|
||||
}
|
||||
inputBlocked = 0;
|
||||
inputPresented = 0;
|
||||
}
|
||||
|
||||
if (speedChange == slower) {
|
||||
DISPLAYLEVEL(6, "slower speed , higher compression \n")
|
||||
compressionLevel ++;
|
||||
@ -940,27 +958,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
||||
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
|
||||
}
|
||||
speedChange = noChange;
|
||||
}
|
||||
lastJobID = zfp.currentJobID;
|
||||
} /* if (zfp.currentJobID > lastJobID) */
|
||||
|
||||
if (g_displayLevel >= 3) {
|
||||
DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
|
||||
compressionLevel,
|
||||
(U32)((zfp.ingested - zfp.consumed) >> 20),
|
||||
(U32)(zfp.consumed >> 20),
|
||||
(U32)(zfp.produced >> 20),
|
||||
cShare );
|
||||
} else {
|
||||
/* g_displayLevel <= 2; only display notifications if == 2; */
|
||||
DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
|
||||
if (fileSize != UTIL_FILESIZE_UNKNOWN)
|
||||
DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
|
||||
DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
|
||||
DELAY_NEXT_UPDATE();
|
||||
}
|
||||
}
|
||||
}
|
||||
lastJobID = zfp.currentJobID;
|
||||
} /* if (zfp.currentJobID > lastJobID) */
|
||||
} /* if (g_adaptiveMode) */
|
||||
} /* if (READY_FOR_UPDATE()) */
|
||||
} /* while ((inBuff.pos != inBuff.size) */
|
||||
} while (directive != ZSTD_e_end);
|
||||
|
||||
if (ferror(srcFile)) {
|
||||
|
Loading…
Reference in New Issue
Block a user