From 2fe9126591ea7f436710fe0dd3a79bd473195c40 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 27 Jan 2017 11:56:02 -0800 Subject: [PATCH] Add multithread support to COVER --- lib/dictBuilder/cover.c | 38 ++++++++++++++++++-------------------- lib/dictBuilder/zdict.h | 5 +++-- programs/zstdcli.c | 1 + 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/dictBuilder/cover.c b/lib/dictBuilder/cover.c index 089f077c..c5b606db 100644 --- a/lib/dictBuilder/cover.c +++ b/lib/dictBuilder/cover.c @@ -14,11 +14,10 @@ #include /* malloc, free, qsort */ #include /* memset */ #include /* clock */ -#ifdef ZSTD_PTHREAD -#include "threading.h" -#endif -#include "mem.h" /* read */ +#include "mem.h" /* read */ +#include "pool.h" +#include "threading.h" #include "zstd_internal.h" /* includes zstd.h */ #ifndef ZDICT_STATIC_LINKING_ONLY #define ZDICT_STATIC_LINKING_ONLY @@ -690,11 +689,9 @@ ZDICTLIB_API size_t COVER_trainFromBuffer( * compiled with multithreaded support. */ typedef struct COVER_best_s { -#ifdef ZSTD_PTHREAD pthread_mutex_t mutex; pthread_cond_t cond; size_t liveJobs; -#endif void *dict; size_t dictSize; COVER_params_t parameters; @@ -708,11 +705,9 @@ static void COVER_best_init(COVER_best_t *best) { if (!best) { return; } -#ifdef ZSTD_PTHREAD pthread_mutex_init(&best->mutex, NULL); pthread_cond_init(&best->cond, NULL); best->liveJobs = 0; -#endif best->dict = NULL; best->dictSize = 0; best->compressedSize = (size_t)-1; @@ -726,13 +721,11 @@ static void COVER_best_wait(COVER_best_t *best) { if (!best) { return; } -#ifdef ZSTD_PTHREAD pthread_mutex_lock(&best->mutex); while (best->liveJobs != 0) { pthread_cond_wait(&best->cond, &best->mutex); } pthread_mutex_unlock(&best->mutex); -#endif } /** @@ -746,10 +739,8 @@ static void COVER_best_destroy(COVER_best_t *best) { if (best->dict) { free(best->dict); } -#ifdef ZSTD_PTHREAD pthread_mutex_destroy(&best->mutex); pthread_cond_destroy(&best->cond); -#endif } /** @@ -760,11 +751,9 @@ static void COVER_best_start(COVER_best_t *best) { if (!best) { return; } -#ifdef ZSTD_PTHREAD pthread_mutex_lock(&best->mutex); ++best->liveJobs; pthread_mutex_unlock(&best->mutex); -#endif } /** @@ -779,12 +768,10 @@ static void COVER_best_finish(COVER_best_t *best, size_t compressedSize, return; } { -#ifdef ZSTD_PTHREAD size_t liveJobs; pthread_mutex_lock(&best->mutex); --best->liveJobs; liveJobs = best->liveJobs; -#endif /* If the new dictionary is better */ if (compressedSize < best->compressedSize) { /* Allocate space if necessary */ @@ -805,12 +792,10 @@ static void COVER_best_finish(COVER_best_t *best, size_t compressedSize, best->parameters = parameters; best->compressedSize = compressedSize; } -#ifdef ZSTD_PTHREAD pthread_mutex_unlock(&best->mutex); if (liveJobs == 0) { pthread_cond_broadcast(&best->cond); } -#endif } } @@ -928,11 +913,12 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer, unsigned nbSamples, COVER_params_t *parameters) { /* constants */ + const unsigned nbThreads = parameters->nbThreads; const unsigned kMinD = parameters->d == 0 ? 6 : parameters->d; const unsigned kMaxD = parameters->d == 0 ? 16 : parameters->d; const unsigned kMinK = parameters->k == 0 ? kMaxD : parameters->k; const unsigned kMaxK = parameters->k == 0 ? 2048 : parameters->k; - const unsigned kSteps = parameters->steps == 0 ? 256 : parameters->steps; + const unsigned kSteps = parameters->steps == 0 ? 32 : parameters->steps; const unsigned kStepSize = MAX((kMaxK - kMinK) / kSteps, 1); const unsigned kIterations = (1 + (kMaxD - kMinD) / 2) * (1 + (kMaxK - kMinK) / kStepSize); @@ -942,6 +928,7 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer, unsigned d; unsigned k; COVER_best_t best; + POOL_ctx *pool = NULL; /* Checks */ if (kMinK < kMaxD || kMaxK < kMinK) { LOCALDISPLAYLEVEL(displayLevel, 1, "Incorrect parameters\n"); @@ -956,6 +943,12 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer, ZDICT_DICTSIZE_MIN); return ERROR(dstSize_tooSmall); } + if (nbThreads > 1) { + pool = POOL_create(nbThreads, 1); + if (!pool) { + return ERROR(memory_allocation); + } + } /* Initialization */ COVER_best_init(&best); /* Turn down global display level to clean up display at level 2 and below */ @@ -998,7 +991,11 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer, } /* Call the function and pass ownership of data to it */ COVER_best_start(&best); - COVER_tryParameters(data); + if (pool) { + POOL_add(pool, &COVER_tryParameters, data); + } else { + COVER_tryParameters(data); + } /* Print status */ LOCALDISPLAYUPDATE(displayLevel, 2, "\r%u%% ", (U32)((iteration * 100) / kIterations)); @@ -1018,6 +1015,7 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer, *parameters = best.parameters; memcpy(dictBuffer, best.dict, dictSize); COVER_best_destroy(&best); + POOL_free(pool); return dictSize; } } diff --git a/lib/dictBuilder/zdict.h b/lib/dictBuilder/zdict.h index 1b3bcb5b..aba538c4 100644 --- a/lib/dictBuilder/zdict.h +++ b/lib/dictBuilder/zdict.h @@ -93,8 +93,9 @@ ZDICTLIB_API size_t ZDICT_trainFromBuffer_advanced(void* dictBuffer, size_t dict typedef struct { unsigned k; /* Segment size : constraint: 0 < k : Reasonable range [16, 2048+] */ unsigned d; /* dmer size : constraint: 0 < d <= k : Reasonable range [6, 16] */ - unsigned steps; /* Number of steps : Only used for optimization : 0 means default (256) : Higher means more parameters checked */ + unsigned steps; /* Number of steps : Only used for optimization : 0 means default (32) : Higher means more parameters checked */ + unsigned nbThreads; /* Number of threads : constraint: 0 < nbThreads : 1 means single-threaded : Only used for optimization : Ignored if ZSTD_MULTITHREAD is not defined */ unsigned notificationLevel; /* Write to stderr; 0 = none (default); 1 = errors; 2 = progression; 3 = details; 4 = debug; */ unsigned dictID; /* 0 means auto mode (32-bits random value); other : force dictID value */ int compressionLevel; /* 0 means default; target a specific zstd compression level */ @@ -132,7 +133,7 @@ ZDICTLIB_API size_t COVER_trainFromBuffer(void* dictBuffer, size_t dictBufferCap @return : size of dictionary stored into `dictBuffer` (<= `dictBufferCapacity`) or an error code, which can be tested with ZDICT_isError(). On success `*parameters` contains the parameters selected. - Note : COVER_optimizeTrainFromBuffer() requires about 9 bytes of memory for each input byte. + Note : COVER_optimizeTrainFromBuffer() requires about 4 bytes of memory for each input byte and additionally another 4 bytes of memory for each byte of memory for each tread. */ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void* dictBuffer, size_t dictBufferCapacity, const void* samplesBuffer, const size_t *samplesSizes, unsigned nbSamples, diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 64f2c919..8b8a16b2 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -577,6 +577,7 @@ int main(int argCount, const char* argv[]) if (operation==zom_train) { #ifndef ZSTD_NODICT if (cover) { + coverParams.nbThreads = nbThreads; coverParams.compressionLevel = dictCLevel; coverParams.notificationLevel = displayLevel; coverParams.dictID = dictID;