Add multithread support to COVER

This commit is contained in:
Nick Terrell 2017-01-27 11:56:02 -08:00
parent cafdd31a38
commit 2fe9126591
3 changed files with 22 additions and 22 deletions

View File

@ -14,11 +14,10 @@
#include <stdlib.h> /* malloc, free, qsort */
#include <string.h> /* memset */
#include <time.h> /* clock */
#ifdef ZSTD_PTHREAD
#include "threading.h"
#endif
#include "mem.h" /* read */
#include "mem.h" /* read */
#include "pool.h"
#include "threading.h"
#include "zstd_internal.h" /* includes zstd.h */
#ifndef ZDICT_STATIC_LINKING_ONLY
#define ZDICT_STATIC_LINKING_ONLY
@ -690,11 +689,9 @@ ZDICTLIB_API size_t COVER_trainFromBuffer(
* compiled with multithreaded support.
*/
typedef struct COVER_best_s {
#ifdef ZSTD_PTHREAD
pthread_mutex_t mutex;
pthread_cond_t cond;
size_t liveJobs;
#endif
void *dict;
size_t dictSize;
COVER_params_t parameters;
@ -708,11 +705,9 @@ static void COVER_best_init(COVER_best_t *best) {
if (!best) {
return;
}
#ifdef ZSTD_PTHREAD
pthread_mutex_init(&best->mutex, NULL);
pthread_cond_init(&best->cond, NULL);
best->liveJobs = 0;
#endif
best->dict = NULL;
best->dictSize = 0;
best->compressedSize = (size_t)-1;
@ -726,13 +721,11 @@ static void COVER_best_wait(COVER_best_t *best) {
if (!best) {
return;
}
#ifdef ZSTD_PTHREAD
pthread_mutex_lock(&best->mutex);
while (best->liveJobs != 0) {
pthread_cond_wait(&best->cond, &best->mutex);
}
pthread_mutex_unlock(&best->mutex);
#endif
}
/**
@ -746,10 +739,8 @@ static void COVER_best_destroy(COVER_best_t *best) {
if (best->dict) {
free(best->dict);
}
#ifdef ZSTD_PTHREAD
pthread_mutex_destroy(&best->mutex);
pthread_cond_destroy(&best->cond);
#endif
}
/**
@ -760,11 +751,9 @@ static void COVER_best_start(COVER_best_t *best) {
if (!best) {
return;
}
#ifdef ZSTD_PTHREAD
pthread_mutex_lock(&best->mutex);
++best->liveJobs;
pthread_mutex_unlock(&best->mutex);
#endif
}
/**
@ -779,12 +768,10 @@ static void COVER_best_finish(COVER_best_t *best, size_t compressedSize,
return;
}
{
#ifdef ZSTD_PTHREAD
size_t liveJobs;
pthread_mutex_lock(&best->mutex);
--best->liveJobs;
liveJobs = best->liveJobs;
#endif
/* If the new dictionary is better */
if (compressedSize < best->compressedSize) {
/* Allocate space if necessary */
@ -805,12 +792,10 @@ static void COVER_best_finish(COVER_best_t *best, size_t compressedSize,
best->parameters = parameters;
best->compressedSize = compressedSize;
}
#ifdef ZSTD_PTHREAD
pthread_mutex_unlock(&best->mutex);
if (liveJobs == 0) {
pthread_cond_broadcast(&best->cond);
}
#endif
}
}
@ -928,11 +913,12 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer,
unsigned nbSamples,
COVER_params_t *parameters) {
/* constants */
const unsigned nbThreads = parameters->nbThreads;
const unsigned kMinD = parameters->d == 0 ? 6 : parameters->d;
const unsigned kMaxD = parameters->d == 0 ? 16 : parameters->d;
const unsigned kMinK = parameters->k == 0 ? kMaxD : parameters->k;
const unsigned kMaxK = parameters->k == 0 ? 2048 : parameters->k;
const unsigned kSteps = parameters->steps == 0 ? 256 : parameters->steps;
const unsigned kSteps = parameters->steps == 0 ? 32 : parameters->steps;
const unsigned kStepSize = MAX((kMaxK - kMinK) / kSteps, 1);
const unsigned kIterations =
(1 + (kMaxD - kMinD) / 2) * (1 + (kMaxK - kMinK) / kStepSize);
@ -942,6 +928,7 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer,
unsigned d;
unsigned k;
COVER_best_t best;
POOL_ctx *pool = NULL;
/* Checks */
if (kMinK < kMaxD || kMaxK < kMinK) {
LOCALDISPLAYLEVEL(displayLevel, 1, "Incorrect parameters\n");
@ -956,6 +943,12 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer,
ZDICT_DICTSIZE_MIN);
return ERROR(dstSize_tooSmall);
}
if (nbThreads > 1) {
pool = POOL_create(nbThreads, 1);
if (!pool) {
return ERROR(memory_allocation);
}
}
/* Initialization */
COVER_best_init(&best);
/* Turn down global display level to clean up display at level 2 and below */
@ -998,7 +991,11 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer,
}
/* Call the function and pass ownership of data to it */
COVER_best_start(&best);
COVER_tryParameters(data);
if (pool) {
POOL_add(pool, &COVER_tryParameters, data);
} else {
COVER_tryParameters(data);
}
/* Print status */
LOCALDISPLAYUPDATE(displayLevel, 2, "\r%u%% ",
(U32)((iteration * 100) / kIterations));
@ -1018,6 +1015,7 @@ ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void *dictBuffer,
*parameters = best.parameters;
memcpy(dictBuffer, best.dict, dictSize);
COVER_best_destroy(&best);
POOL_free(pool);
return dictSize;
}
}

View File

@ -93,8 +93,9 @@ ZDICTLIB_API size_t ZDICT_trainFromBuffer_advanced(void* dictBuffer, size_t dict
typedef struct {
unsigned k; /* Segment size : constraint: 0 < k : Reasonable range [16, 2048+] */
unsigned d; /* dmer size : constraint: 0 < d <= k : Reasonable range [6, 16] */
unsigned steps; /* Number of steps : Only used for optimization : 0 means default (256) : Higher means more parameters checked */
unsigned steps; /* Number of steps : Only used for optimization : 0 means default (32) : Higher means more parameters checked */
unsigned nbThreads; /* Number of threads : constraint: 0 < nbThreads : 1 means single-threaded : Only used for optimization : Ignored if ZSTD_MULTITHREAD is not defined */
unsigned notificationLevel; /* Write to stderr; 0 = none (default); 1 = errors; 2 = progression; 3 = details; 4 = debug; */
unsigned dictID; /* 0 means auto mode (32-bits random value); other : force dictID value */
int compressionLevel; /* 0 means default; target a specific zstd compression level */
@ -132,7 +133,7 @@ ZDICTLIB_API size_t COVER_trainFromBuffer(void* dictBuffer, size_t dictBufferCap
@return : size of dictionary stored into `dictBuffer` (<= `dictBufferCapacity`)
or an error code, which can be tested with ZDICT_isError().
On success `*parameters` contains the parameters selected.
Note : COVER_optimizeTrainFromBuffer() requires about 9 bytes of memory for each input byte.
Note : COVER_optimizeTrainFromBuffer() requires about 4 bytes of memory for each input byte and additionally another 4 bytes of memory for each byte of memory for each tread.
*/
ZDICTLIB_API size_t COVER_optimizeTrainFromBuffer(void* dictBuffer, size_t dictBufferCapacity,
const void* samplesBuffer, const size_t *samplesSizes, unsigned nbSamples,

View File

@ -577,6 +577,7 @@ int main(int argCount, const char* argv[])
if (operation==zom_train) {
#ifndef ZSTD_NODICT
if (cover) {
coverParams.nbThreads = nbThreads;
coverParams.compressionLevel = dictCLevel;
coverParams.notificationLevel = displayLevel;
coverParams.dictID = dictID;