added mechanism for measuring how much of a job has been created

This commit is contained in:
Paul Cruz 2017-07-19 10:10:47 -07:00
parent 559ea4ff25
commit e11bf55d0b

View File

@ -89,8 +89,10 @@ typedef struct {
unsigned adaptParam; unsigned adaptParam;
unsigned compressionCompletionMeasured; unsigned compressionCompletionMeasured;
unsigned writeCompletionMeasured; unsigned writeCompletionMeasured;
unsigned createCompletionMeasured;
double compressionCompletion; double compressionCompletion;
double writeCompletion; double writeCompletion;
double createCompletion;
mutex_t jobCompressed_mutex; mutex_t jobCompressed_mutex;
cond_t jobCompressed_cond; cond_t jobCompressed_cond;
mutex_t jobReady_mutex; mutex_t jobReady_mutex;
@ -344,7 +346,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
unsigned const writeSlow = (compressWaiting && createWaiting); unsigned const writeSlow = (compressWaiting && createWaiting);
unsigned const compressSlow = (writeWaiting && createWaiting); unsigned const compressSlow = (writeWaiting && createWaiting);
unsigned const createSlow = (compressWaiting && writeWaiting); unsigned const createSlow = (compressWaiting && writeWaiting);
DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
if (allSlow) { if (allSlow) {
reset = 1; reset = 1;
} }
@ -352,13 +354,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel); DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
double completion; double completion;
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
completion = ctx->writeCompletion; completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion;
DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{ {
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change); DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
DEBUG(2, "write completion: %f\n", completion); DEBUG(3, "write completion: %f\n", completion);
ctx->compressionLevel += change; ctx->compressionLevel += change;
reset = 1; reset = 1;
} }
@ -418,6 +421,9 @@ static void* compressionThread(void* arg)
ctx->stats.readyCounter++; ctx->stats.readyCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex); pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx); reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletionMeasured = 1;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
adaptCompressionLevel(ctx); adaptCompressionLevel(ctx);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
@ -695,6 +701,12 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
} }
pos += ret; pos += ret;
remaining -= ret; remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->createCompletionMeasured) {
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
}
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
if (remaining != 0 && !feof(srcFile)) { if (remaining != 0 && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n"); DISPLAY("Error: problem occurred during read from src file\n");