#include /* malloc */ #include /* posix only, to be replaced by a more portable version */ #include "zstd_internal.h" /* MIN, ERROR */ #include "zstdmt_compress.h" #if 0 # include # include # include static unsigned g_debugLevel = 2; # define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); } static unsigned long long GetCurrentClockTimeMicroseconds() { static clock_t _ticksPerSecond = 0; if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK); struct tms junk; clock_t newTicks = (clock_t) times(&junk); return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); } #define MUTEX_WAIT_TIME_DLEVEL 5 #define PTHREAD_MUTEX_LOCK(mutex) \ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \ pthread_mutex_lock(mutex); \ unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \ unsigned long long elapsedTime = (afterTime-beforeTime); \ if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \ (long int) pthread_self(), elapsedTime, #mutex); \ } \ } else pthread_mutex_lock(mutex); #else # define DEBUGLOG(l, ...) /* disabled */ # define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m) #endif #define ZSTDMT_NBTHREADS_MAX 128 #define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX) typedef struct frameToWrite_s { const void* start; size_t frameSize; unsigned frameID; unsigned isLastFrame; } frameToWrite_t; typedef struct ZSTDMT_dstBuffer_s { ZSTD_outBuffer out; unsigned frameIDToWrite; pthread_mutex_t frameTable_mutex; pthread_mutex_t allFramesWritten_mutex; frameToWrite_t stackedFrame[ZSTDMT_NBSTACKEDFRAMES_MAX]; unsigned nbStackedFrames; } ZSTDMT_dstBufferManager; static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t dstCapacity) { ZSTDMT_dstBufferManager dbm; dbm.out.dst = dst; dbm.out.size = dstCapacity; dbm.out.pos = 0; dbm.frameIDToWrite = 0; pthread_mutex_init(&dbm.frameTable_mutex, NULL); pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex; pthread_mutex_init(allFramesWritten_mutex, NULL); PTHREAD_MUTEX_LOCK(allFramesWritten_mutex); /* maybe could be merged into init ? */ dbm.nbStackedFrames = 0; return dbm; } /* note : can fail if nbStackedFrames > ZSTDMT_NBSTACKEDFRAMES_MAX. * note2 : can only be called from a section with frameTable_mutex already locked */ static void ZSTDMT_stackFrameToWrite(ZSTDMT_dstBufferManager* dstBufferManager, frameToWrite_t frame) { dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames++] = frame; } typedef struct buffer_s { void* start; size_t bufferSize; } buffer_t; static buffer_t ZSTDMT_getDstBuffer(const ZSTDMT_dstBufferManager* dstBufferManager) { ZSTD_outBuffer const out = dstBufferManager->out; buffer_t buf; buf.start = (char*)(out.dst) + out.pos; buf.bufferSize = out.size - out.pos; return buf; } /* condition : stackNumber < dstBufferManager->nbStackedFrames. * note : there can only be one write at a time, due to frameID condition */ static size_t ZSTDMT_writeFrame(ZSTDMT_dstBufferManager* dstBufferManager, unsigned stackNumber) { ZSTD_outBuffer const out = dstBufferManager->out; size_t const frameSize = dstBufferManager->stackedFrame[stackNumber].frameSize; const void* const frameStart = dstBufferManager->stackedFrame[stackNumber].start; if (out.pos + frameSize > out.size) return ERROR(dstSize_tooSmall); DEBUGLOG(3, "writing frame %u (%u bytes) ", dstBufferManager->stackedFrame[stackNumber].frameID, (U32)frameSize); memcpy((char*)out.dst + out.pos, frameStart, frameSize); dstBufferManager->out.pos += frameSize; dstBufferManager->frameIDToWrite = dstBufferManager->stackedFrame[stackNumber].frameID + 1; return 0; } static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager, const void* src, size_t srcSize, unsigned frameID, unsigned isLastFrame) { unsigned lastFrameWritten = 0; /* check if correct frame ordering; stack otherwise */ DEBUGLOG(5, "considering writing frame %u ", frameID); PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); if (frameID != dstBufferManager->frameIDToWrite) { DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite); frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame }; ZSTDMT_stackFrameToWrite(dstBufferManager, frame); pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); return 0; } pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); /* write frame * note : only one write possible due to frameID condition */ DEBUGLOG(3, "writing frame %u (%u bytes) ", frameID, (U32)srcSize); ZSTD_outBuffer const out = dstBufferManager->out; if (out.pos + srcSize > out.size) return ERROR(dstSize_tooSmall); if (frameID) /* frameID==0 compress directly in dst buffer */ memcpy((char*)out.dst + out.pos, src, srcSize); dstBufferManager->out.pos += srcSize; dstBufferManager->frameIDToWrite = frameID+1; lastFrameWritten = isLastFrame; /* check if more frames are stacked */ PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); unsigned frameWritten = dstBufferManager->nbStackedFrames>0; while (frameWritten) { unsigned u; frameID++; frameWritten = 0; for (u=0; unbStackedFrames; u++) { if (dstBufferManager->stackedFrame[u].frameID == frameID) { pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); DEBUGLOG(4, "catch up frame %u ", frameID); { size_t const writeError = ZSTDMT_writeFrame(dstBufferManager, u); if (ZSTD_isError(writeError)) return writeError; } lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame; dstBufferManager->frameIDToWrite = frameID+1; /* remove frame from stack */ PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1]; dstBufferManager->nbStackedFrames -= 1; frameWritten = dstBufferManager->nbStackedFrames>0; break; } } } pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); /* end reached : last frame written */ if (lastFrameWritten) pthread_mutex_unlock(&dstBufferManager->allFramesWritten_mutex); return 0; } typedef struct ZSTDMT_jobDescription_s { const void* src; /* NULL means : kill thread */ size_t srcSize; int compressionLevel; ZSTDMT_dstBufferManager* dstManager; unsigned frameNumber; unsigned isLastFrame; } ZSTDMT_jobDescription; typedef struct ZSTDMT_jobAgency_s { pthread_mutex_t jobAnnounce_mutex; pthread_mutex_t jobApply_mutex; ZSTDMT_jobDescription jobAnnounce; } ZSTDMT_jobAgency; /* ZSTDMT_postjob() : * This function is blocking as long as previous posted job is not taken. * It could be made non-blocking, with a storage queue. * But blocking has benefits : on top of memory savings, * the caller will be able to measure delay, allowing dynamic speed throttle (via compression level). */ static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job) { DEBUGLOG(5, "starting job posting "); PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */ DEBUGLOG(5, "job posting mutex acquired "); jobAgency->jobAnnounce = job; /* post job */ pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex); /* announce */ DEBUGLOG(5, "job available now "); } static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency) { PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex); /* should check return code */ ZSTDMT_jobDescription const job = jobAgency->jobAnnounce; pthread_mutex_unlock(&jobAgency->jobApply_mutex); return job; } #define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX typedef struct ZSTDMT_bufferPool_s { pthread_mutex_t bufferPool_mutex; buffer_t bTable[ZSTDMT_NBBUFFERSPOOLED_MAX]; unsigned nbBuffers; } ZSTDMT_bufferPool; static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) { PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex); if (pool->nbBuffers) { /* try to use an existing buffer */ pool->nbBuffers--; buffer_t const buf = pool->bTable[pool->nbBuffers]; pthread_mutex_unlock(&pool->bufferPool_mutex); size_t const availBufferSize = buf.bufferSize; if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) /* large enough, but not too much */ return buf; free(buf.start); /* size conditions not respected : create a new buffer */ } pthread_mutex_unlock(&pool->bufferPool_mutex); /* create new buffer */ buffer_t buf; buf.bufferSize = bSize; buf.start = calloc(1, bSize); return buf; } /* effectively store buffer for later re-use, up to pool capacity */ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) { PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex); if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) { pthread_mutex_unlock(&pool->bufferPool_mutex); free(buf.start); return; } pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */ pthread_mutex_unlock(&pool->bufferPool_mutex); } struct ZSTDMT_CCtx_s { pthread_t pthread[ZSTDMT_NBTHREADS_MAX]; unsigned nbThreads; ZSTDMT_jobAgency jobAgency; ZSTDMT_bufferPool bufferPool; }; static void* ZSTDMT_compressionThread(void* arg) { if (arg==NULL) return NULL; /* error : should not be possible */ ZSTDMT_CCtx* const mtctx = (ZSTDMT_CCtx*) arg; ZSTDMT_jobAgency* const jobAgency = &mtctx->jobAgency; ZSTDMT_bufferPool* const pool = &mtctx->bufferPool; ZSTD_CCtx* const cctx = ZSTD_createCCtx(); if (cctx==NULL) return NULL; /* allocation failure : thread not started */ DEBUGLOG(3, "thread %li created ", (long int)pthread_self()); for (;;) { ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency); if (job.src == NULL) { DEBUGLOG(4, "thread exit "); ZSTD_freeCCtx(cctx); return NULL; } ZSTDMT_dstBufferManager* dstBufferManager = job.dstManager; size_t const dstBufferCapacity = ZSTD_compressBound(job.srcSize); DEBUGLOG(4, "requesting a dstBuffer for frame %u", job.frameNumber); buffer_t const dstBuffer = job.frameNumber ? ZSTDMT_getBuffer(pool, dstBufferCapacity) : ZSTDMT_getDstBuffer(dstBufferManager); /* lack params */ DEBUGLOG(4, "start compressing frame %u", job.frameNumber); //size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel); size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel); if (ZSTD_isError(cSize)) return (void*)(cSize); /* error - find a better way */ size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame); /* pas clair */ if (ZSTD_isError(writeError)) return (void*)writeError; if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer); } } ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) { if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx)); if (!cctx) return NULL; /* init jobAgency */ pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL); /* check return value ? */ pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL); PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */ /* init bufferPool */ pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL); /* start all workers */ cctx->nbThreads = nbThreads; DEBUGLOG(2, "nbThreads : %u \n", nbThreads); unsigned t; for (t = 0; t < nbThreads; t++) { pthread_create(&cctx->pthread[t], NULL, ZSTDMT_compressionThread, cctx); /* check return value ? */ } return cctx; } size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx) { /* free threads */ /* free mutex (if necessary) */ /* free bufferPool */ free(cctx); /* incompleted ! */ return 0; } size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, int compressionLevel) { ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency; ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0); size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */; unsigned const nbFrames = MIN(nbFramesMax, cctx->nbThreads); size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames; size_t remainingSrcSize = srcSize; const char* const srcStart = (const char*)src; size_t frameStartPos = 0; ZSTDMT_dstBufferManager dbm = ZSTDMT_createDstBufferManager(dst, dstCapacity); DEBUGLOG(2, "windowLog : %u => frameSizeTarget : %u ", params.cParams.windowLog, (U32)frameSizeTarget); DEBUGLOG(2, "nbFrames : %u (size : %u bytes) ", nbFrames, (U32)avgFrameSize); { unsigned u; for (u=0; u