2017-08-31 19:20:50 +00:00
/*
2017-07-12 23:40:24 +00:00
* Copyright ( c ) 2017 - present , Facebook , Inc .
* All rights reserved .
*
2017-08-31 19:20:50 +00:00
* This source code is licensed under both the BSD - style license ( found in the
* LICENSE file in the root directory of this source tree ) and the GPLv2 ( found
* in the COPYING file in the root directory of this source tree ) .
2017-07-12 23:40:24 +00:00
*/
# include <stdio.h> /* fprintf */
# include <stdlib.h> /* malloc, free */
# include <pthread.h> /* pthread functions */
# include <string.h> /* memset */
# include "zstd_internal.h"
# include "util.h"
2017-07-03 21:18:46 +00:00
# define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
2017-07-07 01:09:10 +00:00
# define PRINT(...) fprintf(stdout, __VA_ARGS__)
2017-07-11 01:16:42 +00:00
# define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
2017-07-03 21:18:46 +00:00
# define FILE_CHUNK_SIZE 4 << 20
2017-07-11 17:23:25 +00:00
# define MAX_NUM_JOBS 2
2017-07-05 19:20:16 +00:00
# define stdinmark " / *stdin*\\"
# define stdoutmark " / *stdout*\\"
2017-07-05 21:19:56 +00:00
# define MAX_PATH 256
2017-07-05 23:54:34 +00:00
# define DEFAULT_DISPLAY_LEVEL 1
2017-07-06 18:05:51 +00:00
# define DEFAULT_COMPRESSION_LEVEL 6
2017-07-23 17:18:54 +00:00
# define MAX_COMPRESSION_LEVEL_CHANGE 2
2017-07-25 18:16:27 +00:00
# define CONVERGENCE_LOWER_BOUND 5
# define CLEVEL_DECREASE_COOLDOWN 5
2017-07-26 22:52:15 +00:00
# define CHANGE_BY_TWO_THRESHOLD 0.1
# define CHANGE_BY_ONE_THRESHOLD 0.65
2017-07-03 21:18:46 +00:00
2017-07-25 21:08:39 +00:00
# ifndef DEBUG_MODE
2017-07-05 23:54:34 +00:00
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL ;
2017-07-25 21:08:39 +00:00
# else
static int g_displayLevel = DEBUG_MODE ;
# endif
2017-07-06 18:05:51 +00:00
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL ;
2017-07-07 22:42:20 +00:00
static UTIL_time_t g_startTime ;
2017-07-07 03:40:00 +00:00
static size_t g_streamedSize = 0 ;
2017-07-24 23:19:07 +00:00
static unsigned g_useProgressBar = 1 ;
2017-07-07 22:42:20 +00:00
static UTIL_freq_t g_ticksPerSecond ;
2017-07-13 21:46:54 +00:00
static unsigned g_forceCompressionLevel = 0 ;
2017-07-28 22:30:46 +00:00
static unsigned g_minCLevel = 1 ;
2017-08-07 20:11:07 +00:00
static unsigned g_maxCLevel ;
2017-07-03 21:18:46 +00:00
2017-07-04 00:28:59 +00:00
typedef struct {
void * start ;
size_t size ;
2017-07-12 19:21:21 +00:00
size_t capacity ;
2017-07-04 00:28:59 +00:00
} buffer_t ;
2017-07-10 22:37:14 +00:00
typedef struct {
size_t filled ;
buffer_t buffer ;
} inBuff_t ;
2017-07-04 00:28:59 +00:00
typedef struct {
buffer_t src ;
buffer_t dst ;
unsigned jobID ;
2017-07-22 00:49:39 +00:00
unsigned lastJobPlusOne ;
2017-07-04 00:28:59 +00:00
size_t compressedSize ;
2017-07-12 23:02:20 +00:00
size_t dictSize ;
2017-07-04 00:28:59 +00:00
} jobDescription ;
2017-07-17 17:12:44 +00:00
typedef struct {
pthread_mutex_t pMutex ;
int noError ;
} mutex_t ;
typedef struct {
pthread_cond_t pCond ;
int noError ;
} cond_t ;
2017-07-04 00:28:59 +00:00
typedef struct {
unsigned compressionLevel ;
unsigned numJobs ;
unsigned nextJobID ;
unsigned threadError ;
2017-07-27 00:02:47 +00:00
/*
* JobIDs for the next jobs to be created , compressed , and written
*/
2017-07-06 00:24:21 +00:00
unsigned jobReadyID ;
2017-07-07 20:18:55 +00:00
unsigned jobCompressedID ;
2017-07-06 23:06:53 +00:00
unsigned jobWriteID ;
2017-07-04 02:24:22 +00:00
unsigned allJobsCompleted ;
2017-07-27 00:02:47 +00:00
/*
* counter for how many jobs in a row the compression level has not changed
* if the counter becomes > = CONVERGENCE_LOWER_BOUND , the next time the
* compression level tries to change ( by non - zero amount ) resets the counter
* to 1 and does not apply the change
*/
2017-07-25 17:01:10 +00:00
unsigned convergenceCounter ;
2017-07-27 00:02:47 +00:00
/*
* cooldown counter in order to prevent rapid successive decreases in compression level
* whenever compression level is decreased , cooldown is set to CLEVEL_DECREASE_COOLDOWN
* whenever adaptCompressionLevel ( ) is called and cooldown ! = 0 , it is decremented
* as long as cooldown ! = 0 , the compression level cannot be decreased
*/
2017-07-25 18:16:27 +00:00
unsigned cooldown ;
2017-07-27 00:02:47 +00:00
/*
* XWaitYCompletion
* Range from 0.0 to 1.0
* if the value is not 1.0 , then this implies that thread X waited on thread Y to finish
* and thread Y was XWaitYCompletion finished at the time of the wait ( i . e . compressWaitWriteCompletion = 0.5
* implies that the compression thread waited on the write thread and it was only 50 % finished writing a job )
*/
2017-07-21 20:38:24 +00:00
double createWaitCompressionCompletion ;
double compressWaitCreateCompletion ;
double compressWaitWriteCompletion ;
double writeWaitCompressionCompletion ;
2017-07-27 00:02:47 +00:00
/*
* Completion values
* Range from 0.0 to 1.0
* Jobs are divided into mini - chunks in order to measure completion
* these values are updated each time a thread finishes its operation on the
* mini - chunk ( i . e . finishes writing out , compressing , etc . this mini - chunk ) .
*/
2017-07-18 20:30:29 +00:00
double compressionCompletion ;
2017-07-18 22:23:11 +00:00
double writeCompletion ;
2017-07-19 17:10:47 +00:00
double createCompletion ;
2017-07-27 00:02:47 +00:00
2017-07-17 17:12:44 +00:00
mutex_t jobCompressed_mutex ;
cond_t jobCompressed_cond ;
mutex_t jobReady_mutex ;
cond_t jobReady_cond ;
mutex_t allJobsCompleted_mutex ;
cond_t allJobsCompleted_cond ;
mutex_t jobWrite_mutex ;
cond_t jobWrite_cond ;
2017-07-23 21:09:16 +00:00
mutex_t compressionCompletion_mutex ;
mutex_t createCompletion_mutex ;
mutex_t writeCompletion_mutex ;
2017-08-02 17:27:33 +00:00
mutex_t compressionLevel_mutex ;
2017-07-12 19:21:21 +00:00
size_t lastDictSize ;
2017-07-10 22:37:14 +00:00
inBuff_t input ;
2017-07-04 00:28:59 +00:00
jobDescription * jobs ;
2017-07-07 22:13:40 +00:00
ZSTD_CCtx * cctx ;
2017-07-04 00:28:59 +00:00
} adaptCCtx ;
2017-07-17 21:01:13 +00:00
typedef struct {
adaptCCtx * ctx ;
FILE * dstFile ;
} outputThreadArg ;
2017-07-13 23:38:20 +00:00
typedef struct {
FILE * srcFile ;
adaptCCtx * ctx ;
2017-07-17 21:01:13 +00:00
outputThreadArg * otArg ;
2017-07-13 23:38:20 +00:00
} fcResources ;
2017-07-05 17:20:56 +00:00
static void freeCompressionJobs ( adaptCCtx * ctx )
{
unsigned u ;
for ( u = 0 ; u < ctx - > numJobs ; u + + ) {
jobDescription job = ctx - > jobs [ u ] ;
free ( job . dst . start ) ;
free ( job . src . start ) ;
}
}
2017-07-17 17:12:44 +00:00
static int destroyMutex ( mutex_t * mutex )
{
if ( mutex - > noError ) {
int const ret = pthread_mutex_destroy ( & mutex - > pMutex ) ;
return ret ;
}
return 0 ;
}
static int destroyCond ( cond_t * cond )
{
if ( cond - > noError ) {
int const ret = pthread_cond_destroy ( & cond - > pCond ) ;
return ret ;
}
return 0 ;
}
2017-07-05 17:20:56 +00:00
static int freeCCtx ( adaptCCtx * ctx )
{
2017-07-10 23:03:09 +00:00
if ( ! ctx ) return 0 ;
2017-07-12 23:50:43 +00:00
{
int error = 0 ;
2017-07-17 17:12:44 +00:00
error | = destroyMutex ( & ctx - > jobCompressed_mutex ) ;
error | = destroyCond ( & ctx - > jobCompressed_cond ) ;
error | = destroyMutex ( & ctx - > jobReady_mutex ) ;
error | = destroyCond ( & ctx - > jobReady_cond ) ;
error | = destroyMutex ( & ctx - > allJobsCompleted_mutex ) ;
error | = destroyCond ( & ctx - > allJobsCompleted_cond ) ;
error | = destroyMutex ( & ctx - > jobWrite_mutex ) ;
error | = destroyCond ( & ctx - > jobWrite_cond ) ;
2017-07-23 21:09:16 +00:00
error | = destroyMutex ( & ctx - > compressionCompletion_mutex ) ;
error | = destroyMutex ( & ctx - > createCompletion_mutex ) ;
error | = destroyMutex ( & ctx - > writeCompletion_mutex ) ;
2017-08-02 17:27:33 +00:00
error | = destroyMutex ( & ctx - > compressionLevel_mutex ) ;
2017-07-12 23:50:43 +00:00
error | = ZSTD_isError ( ZSTD_freeCCtx ( ctx - > cctx ) ) ;
free ( ctx - > input . buffer . start ) ;
if ( ctx - > jobs ) {
freeCompressionJobs ( ctx ) ;
free ( ctx - > jobs ) ;
}
free ( ctx ) ;
return error ;
2017-07-05 17:20:56 +00:00
}
}
2017-07-17 17:12:44 +00:00
static int initMutex ( mutex_t * mutex )
{
int const ret = pthread_mutex_init ( & mutex - > pMutex , NULL ) ;
mutex - > noError = ! ret ;
return ret ;
}
static int initCond ( cond_t * cond )
{
int const ret = pthread_cond_init ( & cond - > pCond , NULL ) ;
cond - > noError = ! ret ;
return ret ;
}
2017-07-19 00:32:36 +00:00
static int initCCtx ( adaptCCtx * ctx , unsigned numJobs )
2017-07-04 00:28:59 +00:00
{
2017-07-06 18:05:51 +00:00
ctx - > compressionLevel = g_compressionLevel ;
2017-07-17 17:12:44 +00:00
{
int pthreadError = 0 ;
pthreadError | = initMutex ( & ctx - > jobCompressed_mutex ) ;
pthreadError | = initCond ( & ctx - > jobCompressed_cond ) ;
pthreadError | = initMutex ( & ctx - > jobReady_mutex ) ;
pthreadError | = initCond ( & ctx - > jobReady_cond ) ;
pthreadError | = initMutex ( & ctx - > allJobsCompleted_mutex ) ;
pthreadError | = initCond ( & ctx - > allJobsCompleted_cond ) ;
pthreadError | = initMutex ( & ctx - > jobWrite_mutex ) ;
pthreadError | = initCond ( & ctx - > jobWrite_cond ) ;
2017-07-23 21:09:16 +00:00
pthreadError | = initMutex ( & ctx - > compressionCompletion_mutex ) ;
pthreadError | = initMutex ( & ctx - > createCompletion_mutex ) ;
pthreadError | = initMutex ( & ctx - > writeCompletion_mutex ) ;
2017-08-02 17:27:33 +00:00
pthreadError | = initMutex ( & ctx - > compressionLevel_mutex ) ;
2017-07-19 00:32:36 +00:00
if ( pthreadError ) return pthreadError ;
2017-07-17 17:12:44 +00:00
}
2017-07-04 00:28:59 +00:00
ctx - > numJobs = numJobs ;
2017-07-06 00:24:21 +00:00
ctx - > jobReadyID = 0 ;
2017-07-07 20:18:55 +00:00
ctx - > jobCompressedID = 0 ;
2017-07-06 23:06:53 +00:00
ctx - > jobWriteID = 0 ;
2017-07-12 19:21:21 +00:00
ctx - > lastDictSize = 0 ;
2017-07-21 20:38:24 +00:00
ctx - > createWaitCompressionCompletion = 1 ;
ctx - > compressWaitCreateCompletion = 1 ;
ctx - > compressWaitWriteCompletion = 1 ;
ctx - > writeWaitCompressionCompletion = 1 ;
2017-07-23 17:18:54 +00:00
ctx - > createCompletion = 1 ;
ctx - > writeCompletion = 1 ;
ctx - > compressionCompletion = 1 ;
2017-07-25 17:01:10 +00:00
ctx - > convergenceCounter = 0 ;
2017-07-25 18:16:27 +00:00
ctx - > cooldown = 0 ;
2017-07-20 23:19:16 +00:00
2017-07-04 00:44:22 +00:00
ctx - > jobs = calloc ( 1 , numJobs * sizeof ( jobDescription ) ) ;
2017-07-19 00:32:36 +00:00
if ( ! ctx - > jobs ) {
DISPLAY ( " Error: could not allocate space for jobs during context creation \n " ) ;
return 1 ;
}
2017-07-10 23:27:58 +00:00
/* initializing jobs */
2017-07-10 23:03:09 +00:00
{
unsigned jobNum ;
for ( jobNum = 0 ; jobNum < numJobs ; jobNum + + ) {
jobDescription * job = & ctx - > jobs [ jobNum ] ;
2017-07-12 23:02:20 +00:00
job - > src . start = malloc ( 2 * FILE_CHUNK_SIZE ) ;
2017-07-10 23:10:19 +00:00
job - > dst . start = malloc ( ZSTD_compressBound ( FILE_CHUNK_SIZE ) ) ;
2017-07-22 00:49:39 +00:00
job - > lastJobPlusOne = 0 ;
2017-07-12 23:02:20 +00:00
if ( ! job - > src . start | | ! job - > dst . start ) {
2017-07-10 23:03:09 +00:00
DISPLAY ( " Could not allocate buffers for jobs \n " ) ;
2017-07-19 00:32:36 +00:00
return 1 ;
2017-07-10 23:03:09 +00:00
}
2017-07-12 19:21:21 +00:00
job - > src . capacity = FILE_CHUNK_SIZE ;
job - > dst . capacity = ZSTD_compressBound ( FILE_CHUNK_SIZE ) ;
2017-07-10 23:03:09 +00:00
}
}
2017-07-19 00:32:36 +00:00
2017-07-04 00:28:59 +00:00
ctx - > nextJobID = 0 ;
ctx - > threadError = 0 ;
2017-07-04 02:24:22 +00:00
ctx - > allJobsCompleted = 0 ;
2017-07-19 00:32:36 +00:00
2017-07-07 22:13:40 +00:00
ctx - > cctx = ZSTD_createCCtx ( ) ;
2017-07-19 00:32:36 +00:00
if ( ! ctx - > cctx ) {
DISPLAY ( " Error: could not allocate ZSTD_CCtx \n " ) ;
return 1 ;
}
2017-07-10 22:37:14 +00:00
ctx - > input . filled = 0 ;
2017-07-12 19:21:21 +00:00
ctx - > input . buffer . capacity = 2 * FILE_CHUNK_SIZE ;
2017-07-19 00:32:36 +00:00
2017-07-12 19:21:21 +00:00
ctx - > input . buffer . start = malloc ( ctx - > input . buffer . capacity ) ;
2017-07-10 22:37:14 +00:00
if ( ! ctx - > input . buffer . start ) {
DISPLAY ( " Error: could not allocate input buffer \n " ) ;
2017-07-19 00:32:36 +00:00
return 1 ;
2017-07-10 22:37:14 +00:00
}
2017-07-19 00:32:36 +00:00
return 0 ;
}
static adaptCCtx * createCCtx ( unsigned numJobs )
{
adaptCCtx * const ctx = calloc ( 1 , sizeof ( adaptCCtx ) ) ;
if ( ctx = = NULL ) {
DISPLAY ( " Error: could not allocate space for context \n " ) ;
2017-07-07 22:13:40 +00:00
return NULL ;
}
2017-07-19 00:32:36 +00:00
{
int const error = initCCtx ( ctx , numJobs ) ;
if ( error ) {
freeCCtx ( ctx ) ;
return NULL ;
}
return ctx ;
2017-07-04 00:28:59 +00:00
}
}
2017-07-17 22:34:58 +00:00
static void signalErrorToThreads ( adaptCCtx * ctx )
{
ctx - > threadError = 1 ;
pthread_mutex_lock ( & ctx - > jobReady_mutex . pMutex ) ;
pthread_cond_signal ( & ctx - > jobReady_cond . pCond ) ;
pthread_mutex_unlock ( & ctx - > jobReady_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-31 20:43:03 +00:00
pthread_cond_broadcast ( & ctx - > jobCompressed_cond . pCond ) ;
2017-07-17 22:34:58 +00:00
pthread_mutex_unlock ( & ctx - > jobReady_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
2017-07-17 22:34:58 +00:00
pthread_mutex_lock ( & ctx - > jobWrite_mutex . pMutex ) ;
pthread_cond_signal ( & ctx - > jobWrite_cond . pCond ) ;
pthread_mutex_unlock ( & ctx - > jobWrite_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > allJobsCompleted_mutex . pMutex ) ;
pthread_cond_signal ( & ctx - > allJobsCompleted_cond . pCond ) ;
pthread_mutex_unlock ( & ctx - > allJobsCompleted_mutex . pMutex ) ;
}
2017-07-05 17:20:56 +00:00
static void waitUntilAllJobsCompleted ( adaptCCtx * ctx )
2017-07-04 00:28:59 +00:00
{
2017-07-13 23:38:20 +00:00
if ( ! ctx ) return ;
2017-07-17 17:12:44 +00:00
pthread_mutex_lock ( & ctx - > allJobsCompleted_mutex . pMutex ) ;
2017-07-17 22:34:58 +00:00
while ( ctx - > allJobsCompleted = = 0 & & ! ctx - > threadError ) {
2017-07-17 17:12:44 +00:00
pthread_cond_wait ( & ctx - > allJobsCompleted_cond . pCond , & ctx - > allJobsCompleted_mutex . pMutex ) ;
2017-07-04 02:24:22 +00:00
}
2017-07-17 17:12:44 +00:00
pthread_mutex_unlock ( & ctx - > allJobsCompleted_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
}
2017-07-25 21:53:40 +00:00
/* map completion percentages to values for changing compression level */
2017-07-25 17:32:14 +00:00
static unsigned convertCompletionToChange ( double completion )
{
2017-07-26 22:52:15 +00:00
if ( completion < CHANGE_BY_TWO_THRESHOLD ) {
2017-07-25 17:32:14 +00:00
return 2 ;
}
2017-07-26 22:52:15 +00:00
else if ( completion < CHANGE_BY_ONE_THRESHOLD ) {
2017-07-25 17:32:14 +00:00
return 1 ;
}
else {
return 0 ;
}
}
2017-07-13 00:10:58 +00:00
/*
* Compression level is changed depending on which part of the compression process is lagging
* Currently , three theads exist for job creation , compression , and file writing respectively .
* adaptCompressionLevel ( ) increments or decrements compression level based on which of the threads is lagging
* job creation or file writing lag = > increased compression level
* compression thread lag = > decreased compression level
* detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
*/
2017-07-18 00:59:50 +00:00
static void adaptCompressionLevel ( adaptCCtx * ctx )
2017-07-06 23:06:53 +00:00
{
2017-07-21 20:38:24 +00:00
double createWaitCompressionCompletion ;
double compressWaitCreateCompletion ;
double compressWaitWriteCompletion ;
double writeWaitCompressionCompletion ;
2017-07-21 16:26:35 +00:00
double const threshold = 0.00001 ;
2017-08-02 17:27:33 +00:00
unsigned prevCompressionLevel ;
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
prevCompressionLevel = ctx - > compressionLevel ;
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-21 20:38:24 +00:00
2017-07-26 22:52:15 +00:00
if ( g_forceCompressionLevel ) {
2017-08-02 17:27:33 +00:00
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-26 22:52:15 +00:00
ctx - > compressionLevel = g_compressionLevel ;
2017-08-02 17:27:33 +00:00
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-26 22:52:15 +00:00
return ;
}
2017-08-02 17:27:33 +00:00
DEBUG ( 2 , " adapting compression level %u \n " , prevCompressionLevel ) ;
2017-07-23 21:09:16 +00:00
2017-07-25 21:53:40 +00:00
/* read and reset completion measurements */
2017-07-23 21:09:16 +00:00
pthread_mutex_lock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
2017-07-25 21:53:40 +00:00
DEBUG ( 2 , " createWaitCompressionCompletion %f \n " , ctx - > createWaitCompressionCompletion ) ;
DEBUG ( 2 , " writeWaitCompressionCompletion %f \n " , ctx - > writeWaitCompressionCompletion ) ;
2017-07-21 20:38:24 +00:00
createWaitCompressionCompletion = ctx - > createWaitCompressionCompletion ;
writeWaitCompressionCompletion = ctx - > writeWaitCompressionCompletion ;
2017-07-23 21:09:16 +00:00
pthread_mutex_unlock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-25 21:53:40 +00:00
DEBUG ( 2 , " compressWaitWriteCompletion %f \n " , ctx - > compressWaitWriteCompletion ) ;
2017-07-23 21:09:16 +00:00
compressWaitWriteCompletion = ctx - > compressWaitWriteCompletion ;
pthread_mutex_unlock ( & ctx - > writeCompletion_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-25 21:53:40 +00:00
DEBUG ( 2 , " compressWaitCreateCompletion %f \n " , ctx - > compressWaitCreateCompletion ) ;
2017-07-23 21:09:16 +00:00
compressWaitCreateCompletion = ctx - > compressWaitCreateCompletion ;
pthread_mutex_unlock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-25 17:01:10 +00:00
DEBUG ( 2 , " convergence counter: %u \n " , ctx - > convergenceCounter ) ;
2017-07-25 18:16:27 +00:00
2017-08-02 17:27:33 +00:00
assert ( g_minCLevel < = prevCompressionLevel & & g_maxCLevel > = prevCompressionLevel ) ;
2017-07-29 00:46:51 +00:00
2017-07-21 20:38:24 +00:00
/* adaptation logic */
2017-07-25 18:16:27 +00:00
if ( ctx - > cooldown ) ctx - > cooldown - - ;
if ( ( 1 - createWaitCompressionCompletion > threshold | | 1 - writeWaitCompressionCompletion > threshold ) & & ctx - > cooldown = = 0 ) {
2017-07-24 18:01:36 +00:00
/* create or write waiting on compression */
2017-07-23 21:09:16 +00:00
/* use whichever one waited less because it was slower */
2017-07-21 23:05:01 +00:00
double const completion = MAX ( createWaitCompressionCompletion , writeWaitCompressionCompletion ) ;
2017-07-25 17:32:14 +00:00
unsigned const change = convertCompletionToChange ( completion ) ;
2017-08-02 17:27:33 +00:00
unsigned const boundChange = MIN ( change , prevCompressionLevel - g_minCLevel ) ;
2017-07-26 17:20:29 +00:00
if ( ctx - > convergenceCounter > = CONVERGENCE_LOWER_BOUND & & boundChange ! = 0 ) {
2017-07-25 17:01:10 +00:00
/* reset convergence counter, might have been a spike */
ctx - > convergenceCounter = 0 ;
2017-07-26 17:20:29 +00:00
DEBUG ( 2 , " convergence counter reset, no change applied \n " ) ;
2017-07-25 17:01:10 +00:00
}
else if ( boundChange ! = 0 ) {
2017-08-02 17:27:33 +00:00
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-25 17:01:10 +00:00
ctx - > compressionLevel - = boundChange ;
2017-08-02 17:27:33 +00:00
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-25 18:16:27 +00:00
ctx - > cooldown = CLEVEL_DECREASE_COOLDOWN ;
2017-07-25 17:01:10 +00:00
ctx - > convergenceCounter = 1 ;
2017-07-26 17:20:29 +00:00
DEBUG ( 2 , " create or write threads waiting on compression, tried to decrease compression level by %u \n \n " , boundChange ) ;
}
2017-07-21 20:38:24 +00:00
}
2017-07-26 17:05:10 +00:00
else if ( 1 - compressWaitWriteCompletion > threshold | | 1 - compressWaitCreateCompletion > threshold ) {
2017-07-24 18:01:36 +00:00
/* compress waiting on write */
2017-07-26 17:05:10 +00:00
double const completion = MIN ( compressWaitWriteCompletion , compressWaitCreateCompletion ) ;
2017-07-25 17:32:14 +00:00
unsigned const change = convertCompletionToChange ( completion ) ;
2017-08-02 17:27:33 +00:00
unsigned const boundChange = MIN ( change , g_maxCLevel - prevCompressionLevel ) ;
2017-07-26 17:20:29 +00:00
if ( ctx - > convergenceCounter > = CONVERGENCE_LOWER_BOUND & & boundChange ! = 0 ) {
/* reset convergence counter, might have been a spike */
2017-07-25 17:01:10 +00:00
ctx - > convergenceCounter = 0 ;
2017-07-26 17:20:29 +00:00
DEBUG ( 2 , " convergence counter reset, no change applied \n " ) ;
2017-07-25 17:01:10 +00:00
}
else if ( boundChange ! = 0 ) {
2017-08-02 17:27:33 +00:00
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-25 17:01:10 +00:00
ctx - > compressionLevel + = boundChange ;
2017-08-02 17:27:33 +00:00
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-26 17:05:10 +00:00
ctx - > cooldown = 0 ;
2017-07-25 17:01:10 +00:00
ctx - > convergenceCounter = 1 ;
2017-07-26 17:20:29 +00:00
DEBUG ( 2 , " compress waiting on write or create, tried to increase compression level by %u \n \n " , boundChange ) ;
2017-07-25 17:01:10 +00:00
}
2017-07-21 16:26:35 +00:00
}
2017-07-21 01:45:33 +00:00
2017-08-02 17:27:33 +00:00
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-25 17:01:10 +00:00
if ( ctx - > compressionLevel = = prevCompressionLevel ) {
ctx - > convergenceCounter + + ;
}
2017-08-02 17:27:33 +00:00
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-06 23:06:53 +00:00
}
2017-07-13 17:15:27 +00:00
static size_t getUseableDictSize ( unsigned compressionLevel )
{
2017-07-25 23:03:43 +00:00
ZSTD_parameters const params = ZSTD_getParams ( compressionLevel , 0 , 0 ) ;
2017-07-25 21:53:40 +00:00
unsigned const overlapLog = compressionLevel > = ( unsigned ) ZSTD_maxCLevel ( ) ? 0 : 3 ;
size_t const overlapSize = 1 < < ( params . cParams . windowLog - overlapLog ) ;
2017-07-13 17:15:27 +00:00
return overlapSize ;
}
2017-07-04 00:28:59 +00:00
static void * compressionThread ( void * arg )
{
2017-07-25 23:03:43 +00:00
adaptCCtx * const ctx = ( adaptCCtx * ) arg ;
2017-07-04 00:28:59 +00:00
unsigned currJob = 0 ;
for ( ; ; ) {
2017-07-05 18:52:55 +00:00
unsigned const currJobIndex = currJob % ctx - > numJobs ;
2017-07-25 21:53:40 +00:00
jobDescription * const job = & ctx - > jobs [ currJobIndex ] ;
2017-07-24 21:40:23 +00:00
DEBUG ( 2 , " starting compression for job %u \n " , currJob ) ;
2017-07-23 17:18:54 +00:00
{
/* check if compression thread will have to wait */
unsigned willWaitForCreate = 0 ;
unsigned willWaitForWrite = 0 ;
pthread_mutex_lock ( & ctx - > jobReady_mutex . pMutex ) ;
if ( currJob + 1 > ctx - > jobReadyID ) willWaitForCreate = 1 ;
pthread_mutex_unlock ( & ctx - > jobReady_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > jobWrite_mutex . pMutex ) ;
if ( currJob - ctx - > jobWriteID > = ctx - > numJobs ) willWaitForWrite = 1 ;
pthread_mutex_unlock ( & ctx - > jobWrite_mutex . pMutex ) ;
2017-07-23 21:09:16 +00:00
2017-07-26 23:40:05 +00:00
pthread_mutex_lock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-24 22:14:58 +00:00
if ( willWaitForCreate ) {
DEBUG ( 2 , " compression will wait for create on job %u \n " , currJob ) ;
2017-07-23 21:09:16 +00:00
ctx - > compressWaitCreateCompletion = ctx - > createCompletion ;
2017-07-23 17:18:54 +00:00
DEBUG ( 2 , " create completion %f \n " , ctx - > compressWaitCreateCompletion ) ;
2017-07-23 21:09:16 +00:00
2017-07-24 22:14:58 +00:00
}
2017-07-26 23:40:05 +00:00
else {
ctx - > compressWaitCreateCompletion = 1 ;
}
pthread_mutex_unlock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-24 22:14:58 +00:00
2017-07-26 23:40:05 +00:00
pthread_mutex_lock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-24 22:14:58 +00:00
if ( willWaitForWrite ) {
DEBUG ( 2 , " compression will wait for write on job %u \n " , currJob ) ;
2017-07-23 21:09:16 +00:00
ctx - > compressWaitWriteCompletion = ctx - > writeCompletion ;
2017-07-23 17:18:54 +00:00
DEBUG ( 2 , " write completion %f \n " , ctx - > compressWaitWriteCompletion ) ;
}
2017-07-26 23:40:05 +00:00
else {
ctx - > compressWaitWriteCompletion = 1 ;
}
pthread_mutex_unlock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-23 21:09:16 +00:00
2017-07-23 17:18:54 +00:00
}
2017-07-22 00:49:39 +00:00
/* wait until job is ready */
2017-07-17 17:12:44 +00:00
pthread_mutex_lock ( & ctx - > jobReady_mutex . pMutex ) ;
2017-07-22 00:49:39 +00:00
while ( currJob + 1 > ctx - > jobReadyID & & ! ctx - > threadError ) {
2017-07-17 17:12:44 +00:00
pthread_cond_wait ( & ctx - > jobReady_cond . pCond , & ctx - > jobReady_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
}
2017-07-17 17:12:44 +00:00
pthread_mutex_unlock ( & ctx - > jobReady_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
2017-07-22 00:49:39 +00:00
/* wait until job previously in this space is written */
pthread_mutex_lock ( & ctx - > jobWrite_mutex . pMutex ) ;
while ( currJob - ctx - > jobWriteID > = ctx - > numJobs & & ! ctx - > threadError ) {
pthread_cond_wait ( & ctx - > jobWrite_cond . pCond , & ctx - > jobWrite_mutex . pMutex ) ;
}
pthread_mutex_unlock ( & ctx - > jobWrite_mutex . pMutex ) ;
2017-07-21 01:45:33 +00:00
/* reset compression completion */
2017-07-23 21:09:16 +00:00
pthread_mutex_lock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
2017-07-21 01:45:33 +00:00
ctx - > compressionCompletion = 0 ;
2017-07-23 21:09:16 +00:00
pthread_mutex_unlock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
2017-07-19 17:23:46 +00:00
/* adapt compression level */
2017-07-21 01:45:33 +00:00
if ( currJob ) adaptCompressionLevel ( ctx ) ;
2017-07-19 18:23:40 +00:00
2017-08-02 17:27:33 +00:00
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-21 23:05:01 +00:00
DEBUG ( 2 , " job %u compressed with level %u \n " , currJob , ctx - > compressionLevel ) ;
2017-08-02 17:27:33 +00:00
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
/* compress the data */
{
2017-07-26 22:52:15 +00:00
size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX ; /* 128 KB */
2017-08-02 17:27:33 +00:00
unsigned cLevel ;
2017-07-19 18:23:40 +00:00
unsigned blockNum = 0 ;
size_t remaining = job - > src . size ;
size_t srcPos = 0 ;
size_t dstPos = 0 ;
2017-08-02 17:27:33 +00:00
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
cLevel = ctx - > compressionLevel ;
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
2017-07-19 18:23:40 +00:00
/* reset compressed size */
job - > compressedSize = 0 ;
2017-07-24 22:06:11 +00:00
DEBUG ( 2 , " calling ZSTD_compressBegin() \n " ) ;
2017-07-19 23:00:54 +00:00
/* begin compression */
{
size_t const useDictSize = MIN ( getUseableDictSize ( cLevel ) , job - > dictSize ) ;
size_t const dictModeError = ZSTD_setCCtxParameter ( ctx - > cctx , ZSTD_p_forceRawDict , 1 ) ;
2017-07-26 21:29:59 +00:00
ZSTD_parameters params = ZSTD_getParams ( cLevel , 0 , useDictSize ) ;
params . cParams . windowLog = 23 ;
{
size_t const initError = ZSTD_compressBegin_advanced ( ctx - > cctx , job - > src . start + job - > dictSize - useDictSize , useDictSize , params , 0 ) ;
size_t const windowSizeError = ZSTD_setCCtxParameter ( ctx - > cctx , ZSTD_p_forceWindow , 1 ) ;
if ( ZSTD_isError ( dictModeError ) | | ZSTD_isError ( initError ) | | ZSTD_isError ( windowSizeError ) ) {
DISPLAY ( " Error: something went wrong while starting compression \n " ) ;
signalErrorToThreads ( ctx ) ;
return arg ;
}
2017-07-19 23:00:54 +00:00
}
}
2017-07-24 22:06:11 +00:00
DEBUG ( 2 , " finished with ZSTD_compressBegin() \n " ) ;
2017-07-19 23:00:54 +00:00
2017-07-19 21:54:15 +00:00
do {
2017-07-19 18:23:40 +00:00
size_t const actualBlockSize = MIN ( remaining , compressionBlockSize ) ;
2017-07-11 01:16:42 +00:00
2017-07-19 18:23:40 +00:00
/* continue compression */
if ( currJob ! = 0 | | blockNum ! = 0 ) { /* not first block of first job flush/overwrite the frame header */
size_t const hSize = ZSTD_compressContinue ( ctx - > cctx , job - > dst . start + dstPos , job - > dst . capacity - dstPos , job - > src . start + job - > dictSize + srcPos , 0 ) ;
if ( ZSTD_isError ( hSize ) ) {
DISPLAY ( " Error: something went wrong while continuing compression \n " ) ;
job - > compressedSize = hSize ;
signalErrorToThreads ( ctx ) ;
return arg ;
}
ZSTD_invalidateRepCodes ( ctx - > cctx ) ;
}
{
2017-07-22 00:49:39 +00:00
size_t const ret = ( job - > lastJobPlusOne = = currJob + 1 & & remaining = = actualBlockSize ) ?
2017-07-19 18:23:40 +00:00
ZSTD_compressEnd ( ctx - > cctx , job - > dst . start + dstPos , job - > dst . capacity - dstPos , job - > src . start + job - > dictSize + srcPos , actualBlockSize ) :
ZSTD_compressContinue ( ctx - > cctx , job - > dst . start + dstPos , job - > dst . capacity - dstPos , job - > src . start + job - > dictSize + srcPos , actualBlockSize ) ;
if ( ZSTD_isError ( ret ) ) {
DISPLAY ( " Error: something went wrong during compression: %s \n " , ZSTD_getErrorName ( ret ) ) ;
signalErrorToThreads ( ctx ) ;
return arg ;
}
job - > compressedSize + = ret ;
remaining - = actualBlockSize ;
srcPos + = actualBlockSize ;
dstPos + = ret ;
blockNum + + ;
2017-07-19 18:47:17 +00:00
/* update completion */
2017-07-23 21:09:16 +00:00
pthread_mutex_lock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
2017-07-20 17:53:51 +00:00
ctx - > compressionCompletion = 1 - ( double ) remaining / job - > src . size ;
2017-07-23 21:09:16 +00:00
pthread_mutex_unlock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
2017-07-11 01:16:42 +00:00
}
2017-07-19 21:54:15 +00:00
} while ( remaining ! = 0 ) ;
2017-07-12 19:21:21 +00:00
job - > dst . size = job - > compressedSize ;
2017-07-04 00:28:59 +00:00
}
2017-07-17 17:12:44 +00:00
pthread_mutex_lock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-07 20:18:55 +00:00
ctx - > jobCompressedID + + ;
2017-07-22 00:49:39 +00:00
pthread_cond_broadcast ( & ctx - > jobCompressed_cond . pCond ) ;
2017-07-17 17:12:44 +00:00
pthread_mutex_unlock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-22 00:49:39 +00:00
if ( job - > lastJobPlusOne = = currJob + 1 | | ctx - > threadError ) {
2017-07-04 00:28:59 +00:00
/* finished compressing all jobs */
break ;
}
2017-07-24 21:40:23 +00:00
DEBUG ( 2 , " finished compressing job %u \n " , currJob ) ;
2017-07-22 00:49:39 +00:00
currJob + + ;
2017-07-04 00:28:59 +00:00
}
return arg ;
}
2017-07-25 21:26:55 +00:00
static void displayProgress ( unsigned cLevel , unsigned last )
2017-07-07 00:48:18 +00:00
{
2017-07-28 22:55:02 +00:00
UTIL_time_t currTime ;
UTIL_getTime ( & currTime ) ;
2017-07-07 17:25:38 +00:00
if ( ! g_useProgressBar ) return ;
2017-07-28 22:30:46 +00:00
{
double const timeElapsed = ( double ) ( UTIL_getSpanTimeMicro ( g_ticksPerSecond , g_startTime , currTime ) / 1000.0 ) ;
double const sizeMB = ( double ) g_streamedSize / ( 1 < < 20 ) ;
double const avgCompRate = sizeMB * 1000 / timeElapsed ;
fprintf ( stderr , " \r | Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s | " , cLevel , timeElapsed / 1000.0 , sizeMB , avgCompRate ) ;
if ( last ) {
fprintf ( stderr , " \n " ) ;
}
else {
fflush ( stderr ) ;
}
2017-07-07 00:48:18 +00:00
}
}
2017-07-04 00:28:59 +00:00
static void * outputThread ( void * arg )
{
2017-07-17 21:01:13 +00:00
outputThreadArg * const otArg = ( outputThreadArg * ) arg ;
adaptCCtx * const ctx = otArg - > ctx ;
FILE * const dstFile = otArg - > dstFile ;
2017-07-04 02:00:55 +00:00
2017-07-04 00:28:59 +00:00
unsigned currJob = 0 ;
for ( ; ; ) {
2017-07-05 18:52:55 +00:00
unsigned const currJobIndex = currJob % ctx - > numJobs ;
2017-07-25 21:53:40 +00:00
jobDescription * const job = & ctx - > jobs [ currJobIndex ] ;
2017-07-26 23:40:05 +00:00
unsigned willWaitForCompress = 0 ;
2017-07-24 21:40:23 +00:00
DEBUG ( 2 , " starting write for job %u \n " , currJob ) ;
2017-07-26 23:40:05 +00:00
2017-07-17 17:12:44 +00:00
pthread_mutex_lock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-26 23:40:05 +00:00
if ( currJob + 1 > ctx - > jobCompressedID ) willWaitForCompress = 1 ;
pthread_mutex_unlock ( & ctx - > jobCompressed_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
if ( willWaitForCompress ) {
2017-07-22 01:02:55 +00:00
/* write thread is waiting on compression thread */
2017-07-21 20:38:24 +00:00
ctx - > writeWaitCompressionCompletion = ctx - > compressionCompletion ;
2017-07-22 01:02:55 +00:00
DEBUG ( 2 , " writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f \n " , currJob , ctx - > writeWaitCompressionCompletion ) ;
2017-07-26 23:40:05 +00:00
}
else {
ctx - > writeWaitCompressionCompletion = 1 ;
}
pthread_mutex_unlock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > jobCompressed_mutex . pMutex ) ;
while ( currJob + 1 > ctx - > jobCompressedID & & ! ctx - > threadError ) {
2017-07-17 17:12:44 +00:00
pthread_cond_wait ( & ctx - > jobCompressed_cond . pCond , & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
}
2017-07-17 17:12:44 +00:00
pthread_mutex_unlock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
2017-07-21 01:45:33 +00:00
/* reset write completion */
2017-07-23 21:09:16 +00:00
pthread_mutex_lock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-21 01:45:33 +00:00
ctx - > writeCompletion = 0 ;
2017-07-23 21:09:16 +00:00
pthread_mutex_unlock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
2017-07-04 00:28:59 +00:00
{
size_t const compressedSize = job - > compressedSize ;
2017-07-18 20:30:29 +00:00
size_t remaining = compressedSize ;
2017-07-04 00:28:59 +00:00
if ( ZSTD_isError ( compressedSize ) ) {
DISPLAY ( " Error: an error occurred during compression \n " ) ;
2017-07-17 22:34:58 +00:00
signalErrorToThreads ( ctx ) ;
2017-07-05 16:57:50 +00:00
return arg ;
2017-07-04 00:28:59 +00:00
}
{
2017-07-21 20:38:24 +00:00
size_t const blockSize = MAX ( compressedSize > > 7 , 1 < < 10 ) ;
2017-07-18 20:30:29 +00:00
size_t pos = 0 ;
for ( ; ; ) {
size_t const writeSize = MIN ( remaining , blockSize ) ;
size_t const ret = fwrite ( job - > dst . start + pos , 1 , writeSize , dstFile ) ;
if ( ret ! = writeSize ) break ;
pos + = ret ;
remaining - = ret ;
2017-07-18 22:23:11 +00:00
/* update completion variable for writing */
2017-07-23 21:09:16 +00:00
pthread_mutex_lock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-20 17:53:51 +00:00
ctx - > writeCompletion = 1 - ( double ) remaining / compressedSize ;
2017-07-23 21:09:16 +00:00
pthread_mutex_unlock ( & ctx - > writeCompletion_mutex . pMutex ) ;
2017-07-18 22:23:11 +00:00
2017-07-18 20:30:29 +00:00
if ( remaining = = 0 ) break ;
}
if ( pos ! = compressedSize ) {
2017-07-04 00:28:59 +00:00
DISPLAY ( " Error: an error occurred during file write operation \n " ) ;
2017-07-17 22:34:58 +00:00
signalErrorToThreads ( ctx ) ;
2017-07-05 16:49:27 +00:00
return arg ;
2017-07-04 00:28:59 +00:00
}
}
}
2017-08-02 17:27:33 +00:00
{
unsigned cLevel ;
pthread_mutex_lock ( & ctx - > compressionLevel_mutex . pMutex ) ;
cLevel = ctx - > compressionLevel ;
pthread_mutex_unlock ( & ctx - > compressionLevel_mutex . pMutex ) ;
displayProgress ( cLevel , job - > lastJobPlusOne = = currJob + 1 ) ;
}
2017-07-17 17:12:44 +00:00
pthread_mutex_lock ( & ctx - > jobWrite_mutex . pMutex ) ;
2017-07-06 23:06:53 +00:00
ctx - > jobWriteID + + ;
2017-07-17 17:12:44 +00:00
pthread_cond_signal ( & ctx - > jobWrite_cond . pCond ) ;
pthread_mutex_unlock ( & ctx - > jobWrite_mutex . pMutex ) ;
2017-07-05 23:54:34 +00:00
2017-07-22 00:49:39 +00:00
if ( job - > lastJobPlusOne = = currJob + 1 | | ctx - > threadError ) {
2017-07-04 00:28:59 +00:00
/* finished with all jobs */
2017-07-17 17:12:44 +00:00
pthread_mutex_lock ( & ctx - > allJobsCompleted_mutex . pMutex ) ;
2017-07-04 02:24:22 +00:00
ctx - > allJobsCompleted = 1 ;
2017-07-17 17:12:44 +00:00
pthread_cond_signal ( & ctx - > allJobsCompleted_cond . pCond ) ;
pthread_mutex_unlock ( & ctx - > allJobsCompleted_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
break ;
}
2017-07-24 21:40:23 +00:00
DEBUG ( 2 , " finished writing job %u \n " , currJob ) ;
2017-07-22 00:49:39 +00:00
currJob + + ;
2017-07-20 23:19:16 +00:00
2017-07-04 00:28:59 +00:00
}
return arg ;
}
2017-07-10 23:27:58 +00:00
static int createCompressionJob ( adaptCCtx * ctx , size_t srcSize , int last )
2017-07-04 00:28:59 +00:00
{
unsigned const nextJob = ctx - > nextJobID ;
2017-07-05 18:52:55 +00:00
unsigned const nextJobIndex = nextJob % ctx - > numJobs ;
2017-07-25 21:53:40 +00:00
jobDescription * const job = & ctx - > jobs [ nextJobIndex ] ;
2017-07-22 00:49:39 +00:00
2017-07-04 03:05:42 +00:00
job - > src . size = srcSize ;
job - > jobID = nextJob ;
2017-07-22 00:49:39 +00:00
if ( last ) job - > lastJobPlusOne = nextJob + 1 ;
2017-07-17 21:39:10 +00:00
{
/* swap buffer */
void * const copy = job - > src . start ;
job - > src . start = ctx - > input . buffer . start ;
ctx - > input . buffer . start = copy ;
}
2017-07-12 23:02:20 +00:00
job - > dictSize = ctx - > lastDictSize ;
2017-07-18 22:23:11 +00:00
2017-07-04 00:28:59 +00:00
ctx - > nextJobID + + ;
2017-07-11 01:16:42 +00:00
/* if not on the last job, reuse data as dictionary in next job */
if ( ! last ) {
2017-07-12 19:21:21 +00:00
size_t const oldDictSize = ctx - > lastDictSize ;
2017-07-17 21:39:10 +00:00
memcpy ( ctx - > input . buffer . start , job - > src . start + oldDictSize , srcSize ) ;
2017-07-13 17:15:27 +00:00
ctx - > lastDictSize = srcSize ;
ctx - > input . filled = srcSize ;
2017-07-11 01:16:42 +00:00
}
2017-07-18 22:23:11 +00:00
/* signal job ready */
pthread_mutex_lock ( & ctx - > jobReady_mutex . pMutex ) ;
ctx - > jobReadyID + + ;
pthread_cond_signal ( & ctx - > jobReady_cond . pCond ) ;
pthread_mutex_unlock ( & ctx - > jobReady_mutex . pMutex ) ;
2017-07-04 00:28:59 +00:00
return 0 ;
}
2017-07-17 21:01:13 +00:00
static int performCompression ( adaptCCtx * ctx , FILE * const srcFile , outputThreadArg * otArg )
2017-07-03 21:18:46 +00:00
{
2017-07-25 21:53:40 +00:00
/* early error check to exit */
2017-07-17 21:01:13 +00:00
if ( ! ctx | | ! srcFile | | ! otArg ) {
2017-07-13 23:38:20 +00:00
return 1 ;
2017-07-04 00:28:59 +00:00
}
/* create output thread */
{
pthread_t out ;
2017-07-17 21:01:13 +00:00
if ( pthread_create ( & out , NULL , & outputThread , otArg ) ) {
2017-07-04 00:28:59 +00:00
DISPLAY ( " Error: could not create output thread \n " ) ;
2017-07-17 22:34:58 +00:00
signalErrorToThreads ( ctx ) ;
2017-07-13 23:38:20 +00:00
return 1 ;
2017-07-04 00:28:59 +00:00
}
2017-08-02 00:36:13 +00:00
else if ( pthread_detach ( out ) ) {
DISPLAY ( " Error: could not detach output thread \n " ) ;
signalErrorToThreads ( ctx ) ;
return 1 ;
}
2017-07-04 00:28:59 +00:00
}
2017-07-04 02:24:22 +00:00
2017-07-04 00:28:59 +00:00
/* create compression thread */
{
pthread_t compression ;
if ( pthread_create ( & compression , NULL , & compressionThread , ctx ) ) {
DISPLAY ( " Error: could not create compression thread \n " ) ;
2017-07-17 22:34:58 +00:00
signalErrorToThreads ( ctx ) ;
2017-07-13 23:38:20 +00:00
return 1 ;
2017-07-04 00:28:59 +00:00
}
2017-08-02 00:36:13 +00:00
else if ( pthread_detach ( compression ) ) {
DISPLAY ( " Error: could not detach compression thread \n " ) ;
signalErrorToThreads ( ctx ) ;
return 1 ;
}
2017-07-04 00:28:59 +00:00
}
2017-07-20 23:19:16 +00:00
{
unsigned currJob = 0 ;
/* creating jobs */
for ( ; ; ) {
size_t pos = 0 ;
size_t const readBlockSize = 1 < < 15 ;
size_t remaining = FILE_CHUNK_SIZE ;
2017-07-26 17:05:10 +00:00
unsigned const nextJob = ctx - > nextJobID ;
2017-07-26 23:40:05 +00:00
unsigned willWaitForCompress = 0 ;
2017-07-24 21:40:23 +00:00
DEBUG ( 2 , " starting creation of job %u \n " , currJob ) ;
2017-07-26 17:05:10 +00:00
pthread_mutex_lock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-26 23:40:05 +00:00
if ( nextJob - ctx - > jobCompressedID > = ctx - > numJobs ) willWaitForCompress = 1 ;
pthread_mutex_unlock ( & ctx - > jobCompressed_mutex . pMutex ) ;
pthread_mutex_lock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
if ( willWaitForCompress ) {
2017-07-26 17:05:10 +00:00
/* creation thread is waiting, take measurement of completion */
ctx - > createWaitCompressionCompletion = ctx - > compressionCompletion ;
DEBUG ( 2 , " create thread waiting for nextJob: %u, createWaitCompressionCompletion %f \n " , nextJob , ctx - > createWaitCompressionCompletion ) ;
2017-07-26 23:40:05 +00:00
}
else {
ctx - > createWaitCompressionCompletion = 1 ;
}
pthread_mutex_unlock ( & ctx - > compressionCompletion_mutex . pMutex ) ;
/* wait until the job has been compressed */
pthread_mutex_lock ( & ctx - > jobCompressed_mutex . pMutex ) ;
while ( nextJob - ctx - > jobCompressedID > = ctx - > numJobs & & ! ctx - > threadError ) {
2017-07-26 17:05:10 +00:00
pthread_cond_wait ( & ctx - > jobCompressed_cond . pCond , & ctx - > jobCompressed_mutex . pMutex ) ;
}
pthread_mutex_unlock ( & ctx - > jobCompressed_mutex . pMutex ) ;
2017-07-26 17:34:48 +00:00
/* reset create completion */
pthread_mutex_lock ( & ctx - > createCompletion_mutex . pMutex ) ;
ctx - > createCompletion = 0 ;
pthread_mutex_unlock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
while ( remaining ! = 0 & & ! feof ( srcFile ) ) {
size_t const ret = fread ( ctx - > input . buffer . start + ctx - > input . filled + pos , 1 , readBlockSize , srcFile ) ;
if ( ret ! = readBlockSize & & ! feof ( srcFile ) ) {
/* error could not read correct number of bytes */
DISPLAY ( " Error: problem occurred during read from src file \n " ) ;
signalErrorToThreads ( ctx ) ;
return 1 ;
}
pos + = ret ;
remaining - = ret ;
2017-07-23 21:09:16 +00:00
pthread_mutex_lock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
ctx - > createCompletion = 1 - ( double ) remaining / ( ( size_t ) FILE_CHUNK_SIZE ) ;
2017-07-23 21:09:16 +00:00
pthread_mutex_unlock ( & ctx - > createCompletion_mutex . pMutex ) ;
2017-07-20 23:19:16 +00:00
}
if ( remaining ! = 0 & & ! feof ( srcFile ) ) {
2017-07-19 16:59:17 +00:00
DISPLAY ( " Error: problem occurred during read from src file \n " ) ;
signalErrorToThreads ( ctx ) ;
return 1 ;
}
2017-07-20 23:19:16 +00:00
g_streamedSize + = pos ;
/* reading was fine, now create the compression job */
{
int const last = feof ( srcFile ) ;
int const error = createCompressionJob ( ctx , pos , last ) ;
if ( error ! = 0 ) {
signalErrorToThreads ( ctx ) ;
return error ;
}
}
2017-07-24 21:40:23 +00:00
DEBUG ( 2 , " finished creating job %u \n " , currJob ) ;
2017-07-20 23:19:16 +00:00
currJob + + ;
if ( feof ( srcFile ) ) {
break ;
2017-07-03 21:18:46 +00:00
}
2017-07-04 03:05:42 +00:00
}
2017-07-03 21:18:46 +00:00
}
2017-07-13 23:38:20 +00:00
/* success -- created all jobs */
return 0 ;
}
static fcResources createFileCompressionResources ( const char * const srcFilename , const char * const dstFilenameOrNull )
{
fcResources fcr ;
unsigned const stdinUsed = ! strcmp ( srcFilename , stdinmark ) ;
FILE * const srcFile = stdinUsed ? stdin : fopen ( srcFilename , " rb " ) ;
const char * const outFilenameIntermediate = ( stdinUsed & & ! dstFilenameOrNull ) ? stdoutmark : dstFilenameOrNull ;
const char * outFilename = outFilenameIntermediate ;
char fileAndSuffix [ MAX_PATH ] ;
size_t const numJobs = MAX_NUM_JOBS ;
memset ( & fcr , 0 , sizeof ( fcr ) ) ;
if ( ! outFilenameIntermediate ) {
if ( snprintf ( fileAndSuffix , MAX_PATH , " %s.zst " , srcFilename ) + 1 > MAX_PATH ) {
DISPLAY ( " Error: output filename is too long \n " ) ;
return fcr ;
}
outFilename = fileAndSuffix ;
}
2017-07-17 21:01:13 +00:00
{
unsigned const stdoutUsed = ! strcmp ( outFilename , stdoutmark ) ;
FILE * const dstFile = stdoutUsed ? stdout : fopen ( outFilename , " wb " ) ;
fcr . otArg = malloc ( sizeof ( outputThreadArg ) ) ;
if ( ! fcr . otArg ) {
DISPLAY ( " Error: could not allocate space for output thread argument \n " ) ;
return fcr ;
}
fcr . otArg - > dstFile = dstFile ;
}
2017-07-13 23:38:20 +00:00
/* checking for errors */
2017-07-17 21:01:13 +00:00
if ( ! fcr . otArg - > dstFile | | ! srcFile ) {
DISPLAY ( " Error: some file(s) could not be opened \n " ) ;
2017-07-13 23:38:20 +00:00
return fcr ;
}
/* creating context */
2017-07-17 21:01:13 +00:00
fcr . ctx = createCCtx ( numJobs ) ;
fcr . otArg - > ctx = fcr . ctx ;
2017-07-13 23:38:20 +00:00
fcr . srcFile = srcFile ;
return fcr ;
}
static int freeFileCompressionResources ( fcResources * fcr )
{
int ret = 0 ;
waitUntilAllJobsCompleted ( fcr - > ctx ) ;
ret | = ( fcr - > srcFile ! = NULL ) ? fclose ( fcr - > srcFile ) : 0 ;
ret | = ( fcr - > ctx ! = NULL ) ? freeCCtx ( fcr - > ctx ) : 0 ;
2017-07-17 21:01:13 +00:00
if ( fcr - > otArg ) {
ret | = ( fcr - > otArg - > dstFile ! = stdout ) ? fclose ( fcr - > otArg - > dstFile ) : 0 ;
free ( fcr - > otArg ) ;
/* no need to freeCCtx() on otArg->ctx because it should be the same context */
}
2017-07-13 23:38:20 +00:00
return ret ;
}
static int compressFilename ( const char * const srcFilename , const char * const dstFilenameOrNull )
{
int ret = 0 ;
2017-07-28 22:30:46 +00:00
fcResources fcr = createFileCompressionResources ( srcFilename , dstFilenameOrNull ) ;
2017-07-13 23:38:20 +00:00
UTIL_getTime ( & g_startTime ) ;
g_streamedSize = 0 ;
2017-07-17 21:01:13 +00:00
ret | = performCompression ( fcr . ctx , fcr . srcFile , fcr . otArg ) ;
2017-07-13 23:38:20 +00:00
ret | = freeFileCompressionResources ( & fcr ) ;
2017-07-03 21:18:46 +00:00
return ret ;
}
2017-07-05 17:48:04 +00:00
2017-07-07 18:32:14 +00:00
static int compressFilenames ( const char * * filenameTable , unsigned numFiles , unsigned forceStdout )
2017-07-05 21:19:56 +00:00
{
int ret = 0 ;
unsigned fileNum ;
for ( fileNum = 0 ; fileNum < numFiles ; fileNum + + ) {
const char * filename = filenameTable [ fileNum ] ;
2017-07-07 18:32:14 +00:00
if ( ! forceStdout ) {
2017-07-08 00:07:05 +00:00
ret | = compressFilename ( filename , NULL ) ;
2017-07-07 18:32:14 +00:00
}
else {
ret | = compressFilename ( filename , stdoutmark ) ;
}
2017-07-05 21:19:56 +00:00
}
return ret ;
}
2017-07-06 18:05:51 +00:00
/*! readU32FromChar() :
@ return : unsigned integer value read from input in ` char ` format
allows and interprets K , KB , KiB , M , MB and MiB suffix .
Will also modify ` * stringPtr ` , advancing it to position where it stopped reading .
Note : function result can overflow if digit string > MAX_UINT */
static unsigned readU32FromChar ( const char * * stringPtr )
{
unsigned result = 0 ;
while ( ( * * stringPtr > = ' 0 ' ) & & ( * * stringPtr < = ' 9 ' ) )
result * = 10 , result + = * * stringPtr - ' 0 ' , ( * stringPtr ) + + ;
if ( ( * * stringPtr = = ' K ' ) | | ( * * stringPtr = = ' M ' ) ) {
result < < = 10 ;
if ( * * stringPtr = = ' M ' ) result < < = 10 ;
( * stringPtr ) + + ;
if ( * * stringPtr = = ' i ' ) ( * stringPtr ) + + ;
if ( * * stringPtr = = ' B ' ) ( * stringPtr ) + + ;
}
return result ;
}
2017-08-10 23:11:59 +00:00
static void help ( const char * progPath )
2017-07-07 01:09:10 +00:00
{
PRINT ( " Usage: \n " ) ;
2017-08-10 23:11:59 +00:00
PRINT ( " %s [options] [file(s)] \n " , progPath ) ;
2017-07-07 01:09:10 +00:00
PRINT ( " \n " ) ;
PRINT ( " Options: \n " ) ;
PRINT ( " -oFILE : specify the output file name \n " ) ;
2017-07-31 16:47:09 +00:00
PRINT ( " -i# : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults) \n " , DEFAULT_COMPRESSION_LEVEL ) ;
2017-07-07 01:09:10 +00:00
PRINT ( " -h : display help/information \n " ) ;
2017-07-19 16:43:17 +00:00
PRINT ( " -f : force the compression level to stay constant \n " ) ;
2017-07-26 00:47:02 +00:00
PRINT ( " -c : force write to stdout \n " ) ;
2017-07-24 23:26:20 +00:00
PRINT ( " -p : hide progress bar \n " ) ;
2017-07-24 23:19:07 +00:00
PRINT ( " -q : quiet mode -- do not show progress bar or other information \n " ) ;
2017-07-31 16:47:09 +00:00
PRINT ( " -l# : provide lower bound for compression level -- default 1 \n " ) ;
2017-08-07 20:11:07 +00:00
PRINT ( " -u# : provide upper bound for compression level -- default %u \n " , ZSTD_maxCLevel ( ) ) ;
2017-07-07 01:09:10 +00:00
}
2017-07-05 17:48:04 +00:00
/* return 0 if successful, else return error */
int main ( int argCount , const char * argv [ ] )
{
2017-07-05 21:19:56 +00:00
const char * outFilename = NULL ;
const char * * filenameTable = ( const char * * ) malloc ( argCount * sizeof ( const char * ) ) ;
unsigned filenameIdx = 0 ;
2017-07-07 18:32:14 +00:00
unsigned forceStdout = 0 ;
2017-07-29 00:46:51 +00:00
unsigned providedInitialCLevel = 0 ;
2017-07-05 21:19:56 +00:00
int ret = 0 ;
2017-07-05 19:20:16 +00:00
int argNum ;
2017-07-28 22:30:46 +00:00
filenameTable [ 0 ] = stdinmark ;
2017-08-07 20:11:07 +00:00
g_maxCLevel = ZSTD_maxCLevel ( ) ;
2017-07-05 21:19:56 +00:00
2017-07-07 22:42:20 +00:00
UTIL_initTimer ( & g_ticksPerSecond ) ;
2017-07-05 21:19:56 +00:00
if ( filenameTable = = NULL ) {
DISPLAY ( " Error: could not allocate sapce for filename table. \n " ) ;
return 1 ;
}
2017-07-05 19:20:16 +00:00
for ( argNum = 1 ; argNum < argCount ; argNum + + ) {
const char * argument = argv [ argNum ] ;
2017-07-05 21:19:56 +00:00
/* output filename designated with "-o" */
2017-07-07 17:58:43 +00:00
if ( argument [ 0 ] = = ' - ' & & strlen ( argument ) > 1 ) {
switch ( argument [ 1 ] ) {
case ' o ' :
argument + = 2 ;
outFilename = argument ;
break ;
case ' i ' :
argument + = 2 ;
g_compressionLevel = readU32FromChar ( & argument ) ;
2017-07-29 00:46:51 +00:00
providedInitialCLevel = 1 ;
2017-07-07 17:58:43 +00:00
break ;
case ' h ' :
2017-08-10 23:11:59 +00:00
help ( argv [ 0 ] ) ;
2017-07-07 17:58:43 +00:00
goto _main_exit ;
case ' p ' :
2017-07-24 23:26:20 +00:00
g_useProgressBar = 0 ;
2017-07-07 17:58:43 +00:00
break ;
2017-07-07 18:32:14 +00:00
case ' c ' :
forceStdout = 1 ;
2017-07-11 17:23:25 +00:00
outFilename = stdoutmark ;
2017-07-07 18:32:14 +00:00
break ;
2017-07-13 21:46:54 +00:00
case ' f ' :
g_forceCompressionLevel = 1 ;
break ;
2017-07-24 23:19:07 +00:00
case ' q ' :
g_useProgressBar = 0 ;
g_displayLevel = 0 ;
break ;
2017-07-28 22:30:46 +00:00
case ' l ' :
argument + = 2 ;
g_minCLevel = readU32FromChar ( & argument ) ;
break ;
case ' u ' :
argument + = 2 ;
g_maxCLevel = readU32FromChar ( & argument ) ;
break ;
2017-07-07 17:58:43 +00:00
default :
DISPLAY ( " Error: invalid argument provided \n " ) ;
ret = 1 ;
goto _main_exit ;
2017-07-05 19:20:16 +00:00
}
2017-07-07 17:58:43 +00:00
continue ;
2017-07-05 19:20:16 +00:00
}
2017-07-05 21:19:56 +00:00
/* regular files to be compressed */
filenameTable [ filenameIdx + + ] = argument ;
2017-07-05 17:48:04 +00:00
}
2017-07-05 21:19:56 +00:00
2017-07-29 00:46:51 +00:00
/* check initial, max, and min compression levels */
{
unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel ;
unsigned const initialNotInRange = g_minCLevel > g_compressionLevel | | g_maxCLevel < g_compressionLevel ;
if ( minMaxInconsistent | | ( initialNotInRange & & providedInitialCLevel ) ) {
DISPLAY ( " Error: provided compression level parameters are invalid \n " ) ;
ret = 1 ;
goto _main_exit ;
}
else if ( initialNotInRange ) {
g_compressionLevel = g_minCLevel ;
}
}
2017-07-05 21:19:56 +00:00
/* error checking with number of files */
2017-07-07 18:32:14 +00:00
if ( filenameIdx > 1 & & ( outFilename ! = NULL & & strcmp ( outFilename , stdoutmark ) ) ) {
2017-07-05 21:19:56 +00:00
DISPLAY ( " Error: multiple input files provided, cannot use specified output file \n " ) ;
ret = 1 ;
goto _main_exit ;
}
/* compress files */
if ( filenameIdx < = 1 ) {
ret | = compressFilename ( filenameTable [ 0 ] , outFilename ) ;
}
else {
2017-07-07 18:32:14 +00:00
ret | = compressFilenames ( filenameTable , filenameIdx , forceStdout ) ;
2017-07-05 21:19:56 +00:00
}
_main_exit :
free ( filenameTable ) ;
return ret ;
2017-07-05 17:48:04 +00:00
}