reduced competition for completion mutex by separating mutex use based on which values is updated

This commit is contained in:
Paul Cruz 2017-07-23 14:09:16 -07:00
parent 880f08d104
commit 483d936b87

View File

@ -92,8 +92,9 @@ typedef struct {
cond_t allJobsCompleted_cond; cond_t allJobsCompleted_cond;
mutex_t jobWrite_mutex; mutex_t jobWrite_mutex;
cond_t jobWrite_cond; cond_t jobWrite_cond;
mutex_t completion_mutex; mutex_t compressionCompletion_mutex;
mutex_t wait_mutex; mutex_t createCompletion_mutex;
mutex_t writeCompletion_mutex;
size_t lastDictSize; size_t lastDictSize;
inBuff_t input; inBuff_t input;
jobDescription* jobs; jobDescription* jobs;
@ -152,8 +153,9 @@ static int freeCCtx(adaptCCtx* ctx)
error |= destroyCond(&ctx->allJobsCompleted_cond); error |= destroyCond(&ctx->allJobsCompleted_cond);
error |= destroyMutex(&ctx->jobWrite_mutex); error |= destroyMutex(&ctx->jobWrite_mutex);
error |= destroyCond(&ctx->jobWrite_cond); error |= destroyCond(&ctx->jobWrite_cond);
error |= destroyMutex(&ctx->completion_mutex); error |= destroyMutex(&ctx->compressionCompletion_mutex);
error |= destroyMutex(&ctx->wait_mutex); error |= destroyMutex(&ctx->createCompletion_mutex);
error |= destroyMutex(&ctx->writeCompletion_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start); free(ctx->input.buffer.start);
if (ctx->jobs){ if (ctx->jobs){
@ -192,8 +194,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
pthreadError |= initCond(&ctx->allJobsCompleted_cond); pthreadError |= initCond(&ctx->allJobsCompleted_cond);
pthreadError |= initMutex(&ctx->jobWrite_mutex); pthreadError |= initMutex(&ctx->jobWrite_mutex);
pthreadError |= initCond(&ctx->jobWrite_cond); pthreadError |= initCond(&ctx->jobWrite_cond);
pthreadError |= initMutex(&ctx->completion_mutex); pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
pthreadError |= initMutex(&ctx->wait_mutex); pthreadError |= initMutex(&ctx->createCompletion_mutex);
pthreadError |= initMutex(&ctx->writeCompletion_mutex);
if (pthreadError) return pthreadError; if (pthreadError) return pthreadError;
} }
ctx->numJobs = numJobs; ctx->numJobs = numJobs;
@ -323,28 +326,32 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel); DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel);
/* read and reset completion measurements */ /* read and reset completion measurements */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion); DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion); DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
DEBUG(2, "resetting adaptive variables\n");
ctx->createWaitCompressionCompletion = 1; ctx->createWaitCompressionCompletion = 1;
ctx->compressWaitCreateCompletion = 1;
ctx->compressWaitWriteCompletion = 1;
ctx->writeWaitCompressionCompletion = 1; ctx->writeWaitCompressionCompletion = 1;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
ctx->compressWaitWriteCompletion = 1;
pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
ctx->compressWaitCreateCompletion = 1;
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
/* adaptation logic */ /* adaptation logic */
if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) { if (1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) {
/* both create and write threads waiting on compression */ /* compression waiting on either create or write */
/* use writeWaitCompressionCompletion */ /* use whichever one waited less because it was slower */
double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
@ -404,15 +411,21 @@ static void* compressionThread(void* arg)
if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1; if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (willWaitForCreate || willWaitForWrite) { if (willWaitForCreate || willWaitForWrite) {
ctx->compressWaitCreateCompletion = ctx->createCompletion;
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "compression will wait for create or write\n"); DEBUG(2, "compression will wait for create or write\n");
pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
ctx->compressWaitCreateCompletion = ctx->createCompletion;
DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
/* wait until job is ready */ /* wait until job is ready */
@ -429,9 +442,9 @@ static void* compressionThread(void* arg)
} }
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
/* reset compression completion */ /* reset compression completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
ctx->compressionCompletion = 0; ctx->compressionCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
DEBUG(3, "compressionThread(): continuing after job ready\n"); DEBUG(3, "compressionThread(): continuing after job ready\n");
DEBUG(3, "DICTIONARY ENDED\n"); DEBUG(3, "DICTIONARY ENDED\n");
@ -502,10 +515,10 @@ static void* compressionThread(void* arg)
blockNum++; blockNum++;
/* update completion */ /* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size; ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion); DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
} }
} while (remaining != 0); } while (remaining != 0);
job->dst.size = job->compressedSize; job->dst.size = job->compressedSize;
@ -557,20 +570,20 @@ static void* outputThread(void* arg)
DEBUG(3, "outputThread(): waiting on job compressed\n"); DEBUG(3, "outputThread(): waiting on job compressed\n");
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
/* write thread is waiting on compression thread */ /* write thread is waiting on compression thread */
ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(3, "write thread waiting : writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); DEBUG(3, "write thread waiting : writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset write completion */ /* reset write completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
ctx->writeCompletion = 0; ctx->writeCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
DEBUG(3, "outputThread(): continuing after job compressed\n"); DEBUG(3, "outputThread(): continuing after job compressed\n");
{ {
@ -593,10 +606,10 @@ static void* outputThread(void* arg)
remaining -= ret; remaining -= ret;
/* update completion variable for writing */ /* update completion variable for writing */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
ctx->writeCompletion = 1 - (double)remaining/compressedSize; ctx->writeCompletion = 1 - (double)remaining/compressedSize;
DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion); DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
if (remaining == 0) break; if (remaining == 0) break;
} }
@ -642,20 +655,20 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
/* wait until the job has been compressed */ /* wait until the job has been compressed */
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
/* creation thread is waiting, take measurement of completion */ /* creation thread is waiting, take measurement of completion */
ctx->createWaitCompressionCompletion = ctx->compressionCompletion; ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion); DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset create completion */ /* reset create completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
ctx->createCompletion = 0; ctx->createCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
DEBUG(3, "createCompressionJob(): continuing after job write\n"); DEBUG(3, "createCompressionJob(): continuing after job write\n");
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize); DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
@ -735,10 +748,10 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
} }
pos += ret; pos += ret;
remaining -= ret; remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion); DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
} }
if (remaining != 0 && !feof(srcFile)) { if (remaining != 0 && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n"); DISPLAY("Error: problem occurred during read from src file\n");