fix leaky abstraction regarding measuring completion

This commit is contained in:
Paul Cruz 2017-07-26 16:40:05 -07:00
parent 715f36ca81
commit ab5a78547e

View File

@ -367,20 +367,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
ctx->createWaitCompressionCompletion = 1;
ctx->writeWaitCompressionCompletion = 1;
pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion); DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
ctx->compressWaitWriteCompletion = 1;
pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
ctx->compressWaitCreateCompletion = 1;
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
@ -462,22 +458,28 @@ static void* compressionThread(void* arg)
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
if (willWaitForCreate) { if (willWaitForCreate) {
DEBUG(2, "compression will wait for create on job %u\n", currJob); DEBUG(2, "compression will wait for create on job %u\n", currJob);
pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
ctx->compressWaitCreateCompletion = ctx->createCompletion; ctx->compressWaitCreateCompletion = ctx->createCompletion;
DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
} }
else {
ctx->compressWaitCreateCompletion = 1;
}
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
if (willWaitForWrite) { if (willWaitForWrite) {
DEBUG(2, "compression will wait for write on job %u\n", currJob); DEBUG(2, "compression will wait for write on job %u\n", currJob);
pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
ctx->compressWaitWriteCompletion = ctx->writeCompletion; ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
} }
else {
ctx->compressWaitWriteCompletion = 1;
}
pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
} }
@ -610,14 +612,27 @@ static void* outputThread(void* arg)
for ( ; ; ) { for ( ; ; ) {
unsigned const currJobIndex = currJob % ctx->numJobs; unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* const job = &ctx->jobs[currJobIndex]; jobDescription* const job = &ctx->jobs[currJobIndex];
unsigned willWaitForCompress = 0;
DEBUG(2, "starting write for job %u\n", currJob); DEBUG(2, "starting write for job %u\n", currJob);
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
if (willWaitForCompress) {
/* write thread is waiting on compression thread */ /* write thread is waiting on compression thread */
ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); }
else {
ctx->writeWaitCompressionCompletion = 1;
}
pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
@ -751,17 +766,27 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
size_t const readBlockSize = 1 << 15; size_t const readBlockSize = 1 << 15;
size_t remaining = FILE_CHUNK_SIZE; size_t remaining = FILE_CHUNK_SIZE;
unsigned const nextJob = ctx->nextJobID; unsigned const nextJob = ctx->nextJobID;
unsigned willWaitForCompress = 0;
DEBUG(2, "starting creation of job %u\n", currJob); DEBUG(2, "starting creation of job %u\n", currJob);
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
if (willWaitForCompress) {
/* creation thread is waiting, take measurement of completion */
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
}
else {
ctx->createWaitCompressionCompletion = 1;
}
pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
/* wait until the job has been compressed */ /* wait until the job has been compressed */
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
/* creation thread is waiting, take measurement of completion */
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);