From 6f3ad1b22e034cf27f34d63838c6f7cdfe55b015 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Wed, 5 Jul 2017 17:24:21 -0700 Subject: [PATCH] fixed the problem with pipeline tests by changing how jobs move through the threads --- contrib/adaptive-compression/multi.c | 81 +++++++++++----------------- 1 file changed, 30 insertions(+), 51 deletions(-) diff --git a/contrib/adaptive-compression/multi.c b/contrib/adaptive-compression/multi.c index 40045381..fd04a53b 100644 --- a/contrib/adaptive-compression/multi.c +++ b/contrib/adaptive-compression/multi.c @@ -26,15 +26,6 @@ typedef struct { buffer_t dst; unsigned compressionLevel; unsigned jobID; - unsigned jobCompleted; - unsigned jobReady; - unsigned jobWritten; - pthread_mutex_t* jobCompleted_mutex; - pthread_cond_t* jobCompleted_cond; - pthread_mutex_t* jobReady_mutex; - pthread_cond_t* jobReady_cond; - pthread_mutex_t* jobWrite_mutex; - pthread_cond_t* jobWrite_cond; size_t compressedSize; } jobDescription; @@ -45,6 +36,9 @@ typedef struct { unsigned lastJobID; unsigned nextJobID; unsigned threadError; + unsigned jobReadyID; + unsigned jobCompletedID; + unsigned jobWrittenID; unsigned allJobsCompleted; pthread_mutex_t jobCompleted_mutex; pthread_cond_t jobCompleted_cond; @@ -107,20 +101,11 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) pthread_mutex_init(&ctx->jobWrite_mutex, NULL); pthread_cond_init(&ctx->jobWrite_cond, NULL); ctx->numJobs = numJobs; + ctx->jobReadyID = 0; + ctx->jobCompletedID = 0; + ctx->jobWrittenID = 0; ctx->lastJobID = -1; /* intentional underflow */ ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); - { - unsigned u; - for (u=0; ujobs[u].jobCompleted_mutex = &ctx->jobCompleted_mutex; - ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond; - ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex; - ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond; - ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex; - ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond; - ctx->jobs[u].jobWritten = 1; - } - } ctx->nextJobID = 0; ctx->threadError = 0; ctx->allJobsCompleted = 0; @@ -161,11 +146,11 @@ static void* compressionThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; // DEBUGLOG(2, "compressionThread(): waiting on job ready\n"); - pthread_mutex_lock(job->jobReady_mutex); - while(job->jobReady == 0) { - pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex); + pthread_mutex_lock(&ctx->jobReady_mutex); + while(currJob + 1 > ctx->jobReadyID) { + pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex); } - pthread_mutex_unlock(job->jobReady_mutex); + pthread_mutex_unlock(&ctx->jobReady_mutex); // DEBUGLOG(2, "compressionThread(): continuing after job ready\n"); /* compress the data */ { @@ -177,11 +162,11 @@ static void* compressionThread(void* arg) } job->compressedSize = compressedSize; } - pthread_mutex_lock(job->jobCompleted_mutex); - job->jobCompleted = 1; + pthread_mutex_lock(&ctx->jobCompleted_mutex); + ctx->jobCompletedID++; DEBUGLOG(2, "signaling for job %u\n", currJob); - pthread_cond_signal(job->jobCompleted_cond); - pthread_mutex_unlock(job->jobCompleted_mutex); + pthread_cond_signal(&ctx->jobCompleted_cond); + pthread_mutex_unlock(&ctx->jobCompleted_mutex); currJob++; if (currJob >= ctx->lastJobID || ctx->threadError) { /* finished compressing all jobs */ @@ -201,12 +186,12 @@ static void* outputThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; DEBUGLOG(2, "outputThread(): waiting on job completed\n"); - pthread_mutex_lock(job->jobCompleted_mutex); - while (job->jobCompleted == 0) { + pthread_mutex_lock(&ctx->jobCompleted_mutex); + while (currJob + 1 > ctx->jobCompletedID) { DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob); - pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex); + pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex); } - pthread_mutex_unlock(job->jobCompleted_mutex); + pthread_mutex_unlock(&ctx->jobCompleted_mutex); DEBUGLOG(2, "outputThread(): continuing after job completed\n"); { size_t const compressedSize = job->compressedSize; @@ -224,10 +209,10 @@ static void* outputThread(void* arg) } currJob++; DEBUGLOG(2, "locking job write mutex\n"); - pthread_mutex_lock(job->jobWrite_mutex); - job->jobWritten = 1; - pthread_cond_signal(job->jobWrite_cond); - pthread_mutex_unlock(job->jobWrite_mutex); + pthread_mutex_lock(&ctx->jobWrite_mutex); + ctx->jobWrittenID++; + pthread_cond_signal(&ctx->jobWrite_cond); + pthread_mutex_unlock(&ctx->jobWrite_mutex); DEBUGLOG(2, "unlocking job write mutex\n"); DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID); @@ -250,23 +235,17 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) unsigned const nextJobIndex = nextJob % ctx->numJobs; jobDescription* job = &ctx->jobs[nextJobIndex]; // DEBUGLOG(2, "createCompressionJob(): wait for job write\n"); - pthread_mutex_lock(job->jobWrite_mutex); - while (job->jobWritten == 0) { - pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex); + pthread_mutex_lock(&ctx->jobWrite_mutex); + while (nextJob - ctx->jobWrittenID >= ctx->numJobs) { + pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex); } - pthread_mutex_unlock(job->jobWrite_mutex); + pthread_mutex_unlock(&ctx->jobWrite_mutex); // DEBUGLOG(2, "createCompressionJob(): continuing after job write\n"); job->compressionLevel = ctx->compressionLevel; job->src.start = malloc(srcSize); job->src.size = srcSize; job->dst.size = ZSTD_compressBound(srcSize); job->dst.start = malloc(job->dst.size); - job->jobCompleted = 0; - job->jobWritten = 0; - job->jobCompleted_cond = &ctx->jobCompleted_cond; - job->jobCompleted_mutex = &ctx->jobCompleted_mutex; - job->jobReady_cond = &ctx->jobReady_cond; - job->jobReady_mutex = &ctx->jobReady_mutex; job->jobID = nextJob; if (!job->src.start || !job->dst.start) { /* problem occurred, free things then return */ @@ -276,10 +255,10 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) return 1; } memcpy(job->src.start, data, srcSize); - pthread_mutex_lock(job->jobReady_mutex); - job->jobReady = 1; - pthread_cond_signal(job->jobReady_cond); - pthread_mutex_unlock(job->jobReady_mutex); + pthread_mutex_lock(&ctx->jobReady_mutex); + ctx->jobReadyID++; + pthread_cond_signal(&ctx->jobReady_cond); + pthread_mutex_unlock(&ctx->jobReady_mutex); ctx->nextJobID++; return 0; }