use pthread_cond to send signals between threads

This commit is contained in:
Yann Collet 2017-01-01 17:31:33 +01:00
parent e4f70cdbbc
commit 2ec635a162

View File

@ -115,6 +115,7 @@ typedef struct {
size_t cSize;
unsigned jobCompleted;
pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond;
} ZSTDMT_jobDescription;
/* ZSTDMT_compressFrame() : POOL_function type */
@ -126,7 +127,9 @@ void ZSTDMT_compressFrame(void* jobDescription)
job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel);
DEBUGLOG(5, "compressed to %u bytes ", (unsigned)job->cSize);
job->jobCompleted = 1;
DEBUGLOG(5, "unlocking mutex jobCompleted_mutex");
DEBUGLOG(5, "sending jobCompleted signal");
pthread_mutex_lock(job->jobCompleted_mutex);
pthread_cond_signal(job->jobCompleted_cond);
pthread_mutex_unlock(job->jobCompleted_mutex);
DEBUGLOG(5, "ZSTDMT_compressFrame completed");
}
@ -188,6 +191,7 @@ struct ZSTDMT_CCtx_s {
ZSTDMT_CCtxPool* cctxPool;
unsigned nbThreads;
pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond;
ZSTDMT_jobDescription jobs[1]; /* variable size */
};
@ -201,6 +205,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);
pthread_cond_init(&cctx->jobCompleted_cond, NULL);
return cctx;
}
@ -248,6 +253,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
mtctx->jobs[u].frameID = u;
mtctx->jobs[u].jobCompleted = 0;
mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)frameSize);
POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]);
@ -261,10 +267,14 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
size_t dstPos = 0;
for (frameID=0; frameID<nbFrames; frameID++) {
DEBUGLOG(3, "ready to write frame %u ", frameID);
pthread_mutex_lock(&mtctx->jobCompleted_mutex);
while (mtctx->jobs[frameID].jobCompleted==0) {
DEBUGLOG(4, "waiting for signal jobCompleted_mutex")
pthread_mutex_lock(&mtctx->jobCompleted_mutex);
DEBUGLOG(4, "waiting for jobCompleted signal for frame %u", frameID);
pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
}
pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
{ size_t const cSize = mtctx->jobs[frameID].cSize;
if (ZSTD_isError(cSize)) return cSize;
if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);