changed completed variables to compressed for clarity

This commit is contained in:
Paul Cruz 2017-07-07 13:18:55 -07:00
parent 8c0eb62920
commit 09d7c6a994

View File

@ -33,11 +33,11 @@ typedef struct {
} buffer_t;
typedef struct {
unsigned waitCompleted;
unsigned waitCompressed;
unsigned waitReady;
unsigned waitWrite;
unsigned readyCounter;
unsigned completedCounter;
unsigned compressedCounter;
unsigned writeCounter;
} stat_t;
@ -57,12 +57,12 @@ typedef struct {
unsigned nextJobID;
unsigned threadError;
unsigned jobReadyID;
unsigned jobCompletedID;
unsigned jobCompressedID;
unsigned jobWriteID;
unsigned allJobsCompleted;
unsigned adaptParam;
pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond;
pthread_mutex_t jobCompressed_mutex;
pthread_cond_t jobCompressed_cond;
pthread_mutex_t jobReady_mutex;
pthread_cond_t jobReady_cond;
pthread_mutex_t allJobsCompleted_mutex;
@ -87,8 +87,8 @@ static void freeCompressionJobs(adaptCCtx* ctx)
static int freeCCtx(adaptCCtx* ctx)
{
{
int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex);
int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond);
int const compressedMutexError = pthread_mutex_destroy(&ctx->jobCompressed_mutex);
int const compressedCondError = pthread_cond_destroy(&ctx->jobCompressed_cond);
int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex);
int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
@ -100,7 +100,7 @@ static int freeCCtx(adaptCCtx* ctx)
freeCompressionJobs(ctx);
free(ctx->jobs);
}
return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError | jobWriteMutexError | jobWriteCondError;
return compressedMutexError | compressedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError | jobWriteMutexError | jobWriteCondError;
}
}
@ -114,8 +114,8 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
}
memset(ctx, 0, sizeof(adaptCCtx));
ctx->compressionLevel = g_compressionLevel;
pthread_mutex_init(&ctx->jobCompleted_mutex, NULL); /* TODO: add checks for errors on each mutex */
pthread_cond_init(&ctx->jobCompleted_cond, NULL);
pthread_mutex_init(&ctx->jobCompressed_mutex, NULL); /* TODO: add checks for errors on each mutex */
pthread_cond_init(&ctx->jobCompressed_cond, NULL);
pthread_mutex_init(&ctx->jobReady_mutex, NULL);
pthread_cond_init(&ctx->jobReady_cond, NULL);
pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
@ -124,7 +124,7 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
pthread_cond_init(&ctx->jobWrite_cond, NULL);
ctx->numJobs = numJobs;
ctx->jobReadyID = 0;
ctx->jobCompletedID = 0;
ctx->jobCompressedID = 0;
ctx->jobWriteID = 0;
ctx->lastJobID = -1; /* intentional underflow */
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
@ -164,14 +164,14 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
static unsigned adaptCompressionLevel(adaptCCtx* ctx)
{
unsigned reset = 0;
unsigned const allSlow = ctx->adaptParam < ctx->stats.completedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
unsigned const allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
unsigned const writeWaiting = ctx->adaptParam < ctx->stats.completedCounter ? 1 : 0;
unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter ? 1 : 0;
unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter ? 1 : 0;
unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)) ? 1 : 0;
unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)) ? 1 : 0;
unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)) ? 1 : 0;
DEBUGLOG(2, "ready: %u completed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.completedCounter, ctx->stats.writeCounter);
DEBUGLOG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
if (allSlow) {
reset = 1;
}
@ -188,7 +188,7 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
if (reset) {
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.completedCounter = 0;
ctx->stats.compressedCounter = 0;
}
return ctx->compressionLevel;
}
@ -222,11 +222,11 @@ static void* compressionThread(void* arg)
}
job->compressedSize = compressedSize;
}
pthread_mutex_lock(&ctx->jobCompleted_mutex);
ctx->jobCompletedID++;
pthread_mutex_lock(&ctx->jobCompressed_mutex);
ctx->jobCompressedID++;
DEBUGLOG(2, "signaling for job %u\n", currJob);
pthread_cond_signal(&ctx->jobCompleted_cond);
pthread_mutex_unlock(&ctx->jobCompleted_mutex);
pthread_cond_signal(&ctx->jobCompressed_cond);
pthread_mutex_unlock(&ctx->jobCompressed_mutex);
DEBUGLOG(2, "finished job compression %u\n", currJob);
currJob++;
if (currJob >= ctx->lastJobID || ctx->threadError) {
@ -266,14 +266,14 @@ static void* outputThread(void* arg)
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUGLOG(2, "outputThread(): waiting on job completed\n");
pthread_mutex_lock(&ctx->jobCompleted_mutex);
while (currJob + 1 > ctx->jobCompletedID) {
ctx->stats.waitCompleted++;
ctx->stats.completedCounter++;
pthread_mutex_lock(&ctx->jobCompressed_mutex);
while (currJob + 1 > ctx->jobCompressedID) {
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
DEBUGLOG(2, "waiting on job completed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex);
pthread_cond_wait(&ctx->jobCompressed_cond, &ctx->jobCompressed_mutex);
}
pthread_mutex_unlock(&ctx->jobCompleted_mutex);
pthread_mutex_unlock(&ctx->jobCompressed_mutex);
DEBUGLOG(2, "outputThread(): continuing after job completed\n");
{
size_t const compressedSize = job->compressedSize;
@ -320,7 +320,7 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
jobDescription* job = &ctx->jobs[nextJobIndex];
DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
pthread_mutex_lock(&ctx->jobWrite_mutex);
DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWriteID, ctx->numJobs);
DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
@ -358,7 +358,7 @@ static void printStats(stat_t stats)
{
DISPLAY("========STATISTICS========\n");
DISPLAY("# times waited on job ready: %u\n", stats.waitReady);
DISPLAY("# times waited on job completed: %u\n", stats.waitCompleted);
DISPLAY("# times waited on job compressed: %u\n", stats.waitCompressed);
DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite);
}