2017-08-18 23:52:05 +00:00
/*
2017-01-20 22:00:41 +00:00
* Copyright ( c ) 2016 - present , Yann Collet , Facebook , Inc .
* All rights reserved .
*
2017-08-18 23:52:05 +00:00
* This source code is licensed under both the BSD - style license ( found in the
* LICENSE file in the root directory of this source tree ) and the GPLv2 ( found
* in the COPYING file in the root directory of this source tree ) .
2017-09-08 07:09:23 +00:00
* You may select , at your option , one of the above - listed licenses .
2017-01-20 22:00:41 +00:00
*/
2017-01-19 23:32:07 +00:00
/* ====== Tuning parameters ====== */
2017-07-13 17:10:13 +00:00
# define ZSTDMT_NBTHREADS_MAX 256
2017-07-13 09:22:58 +00:00
# define ZSTDMT_OVERLAPLOG_DEFAULT 6
2017-01-22 06:06:49 +00:00
2017-01-20 20:23:30 +00:00
/* ====== Compiler specifics ====== */
# if defined(_MSC_VER)
2017-06-03 01:20:48 +00:00
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
2017-01-19 23:32:07 +00:00
# endif
2017-01-20 20:23:30 +00:00
/* ====== Dependencies ====== */
2017-06-03 01:20:48 +00:00
# include <string.h> /* memcpy, memset */
# include "pool.h" /* threadpool */
# include "threading.h" /* mutex */
2017-05-30 23:12:06 +00:00
# include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
2016-12-27 06:19:36 +00:00
# include "zstdmt_compress.h"
2017-01-20 20:23:30 +00:00
/* ====== Debug ====== */
2017-06-03 08:15:02 +00:00
# if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
2017-01-17 23:31:16 +00:00
2016-12-27 06:19:36 +00:00
# include <stdio.h>
2016-12-29 00:24:01 +00:00
# include <unistd.h>
# include <sys / times.h>
2017-06-03 08:15:02 +00:00
/* DEBUGLOGRAW() :
 * prints its arguments on stderr, without decoration, when level `l` is enabled.
 * Wrapped in do { } while (0) so that the expansion is exactly one statement,
 * which keeps it safe as the body of an unbraced if / else. */
#define DEBUGLOGRAW(l, ...) do {               \
        if ((l) <= ZSTD_DEBUG) {               \
            fprintf(stderr, __VA_ARGS__);      \
        }                                      \
    } while (0)

/* DEBUG_PRINTHEX() :
 * dumps the `n` first bytes starting at address `p`, in hexadecimal,
 * followed by an end-of-line sequence, at debug level `l`. */
#define DEBUG_PRINTHEX(l,p,n) do {                                             \
        unsigned debug_u;                                                      \
        for (debug_u = 0; debug_u < (n); debug_u++)                            \
            DEBUGLOGRAW(l, " %02X ", ((const unsigned char*)(p))[debug_u]);    \
        DEBUGLOGRAW(l, " \n ");                                                \
    } while (0)
2016-12-29 00:24:01 +00:00
2017-03-30 23:47:19 +00:00
/* GetCurrentClockTimeMicroseconds() :
 * returns the tick counter reported by times(), rescaled to microseconds.
 * The ticks-per-second ratio is requested from sysconf() on first use,
 * then kept in a function-local static. */
static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
    static clock_t ticksPerSecond = 0;
    struct tms unusedTimes;
    clock_t currentTicks;
    unsigned long long scaledTicks;

    if (ticksPerSecond <= 0) {
        ticksPerSecond = sysconf(_SC_CLK_TCK);
    }
    currentTicks = (clock_t)times(&unusedTimes);
    scaledTicks = ((unsigned long long)currentTicks) * (1000000);
    return scaledTicks / ticksPerSecond;
}
2017-07-01 13:59:24 +00:00
/* debug level from which time spent waiting for a mutex gets measured */
#define MUTEX_WAIT_TIME_DLEVEL 6

/* PTHREAD_MUTEX_LOCK() :
 * locks `mutex`; when ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL, also measures the
 * time needed to acquire it, and logs any wait longer than 1000 microseconds.
 * Wrapped in do { } while (0) so that the expansion is exactly one statement,
 * usable safely inside an unbraced if / else. */
#define PTHREAD_MUTEX_LOCK(mutex) do {                                                   \
        if (ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL) {                                      \
            unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds();     \
            pthread_mutex_lock(mutex);                                                   \
            {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds();  \
                unsigned long long const elapsedTime = (afterTime - beforeTime);         \
                if (elapsedTime > 1000) {  /* or whatever threshold you like; I'm using 1 millisecond here */ \
                    DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, " Thread took %llu microseconds to acquire mutex %s \n ", \
                             elapsedTime, #mutex);                                       \
            }   }                                                                        \
        } else {                                                                         \
            pthread_mutex_lock(mutex);                                                   \
        }                                                                                \
    } while (0)
2016-12-29 00:24:01 +00:00
2016-12-27 06:19:36 +00:00
# else
2016-12-29 00:24:01 +00:00
# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
2017-01-19 18:18:17 +00:00
# define DEBUG_PRINTHEX(l,p,n) {}
2016-12-29 00:24:01 +00:00
2016-12-27 06:19:36 +00:00
# endif
2016-12-29 00:24:01 +00:00
2017-01-12 00:25:46 +00:00
/* ===== Buffer Pool ===== */
2017-07-11 21:14:07 +00:00
/* a single Buffer Pool can be invoked from multiple threads in parallel */
2016-12-27 06:19:36 +00:00
/* buffer_t :
 * describes a memory region by its start address and its capacity in bytes. */
typedef struct buffer_s {
    void* start;   /* address of the region; NULL when there is no buffer */
    size_t size;   /* capacity of the region, in bytes */
} buffer_t;

/* the "no buffer" value : { NULL, 0 } */
static const buffer_t g_nullBuffer = { NULL, 0 };
2017-01-18 01:46:33 +00:00
2016-12-27 06:19:36 +00:00
typedef struct ZSTDMT_bufferPool_s {
2017-07-11 21:14:07 +00:00
pthread_mutex_t poolMutex ;
2017-07-11 22:17:25 +00:00
size_t bufferSize ;
2017-01-20 20:23:30 +00:00
unsigned totalBuffers ;
2016-12-27 06:19:36 +00:00
unsigned nbBuffers ;
2017-05-30 23:12:06 +00:00
ZSTD_customMem cMem ;
2016-12-31 13:45:33 +00:00
buffer_t bTable [ 1 ] ; /* variable size */
2016-12-27 06:19:36 +00:00
} ZSTDMT_bufferPool ;
2017-05-30 23:12:06 +00:00
/* ZSTDMT_createBufferPool() :
 * allocates an empty buffer pool able to store up to 2*nbThreads+3 buffers.
 * The initial buffer size is 64 KB.
 * @return : the new pool, or NULL if allocation or mutex initialization failed. */
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_customMem cMem)
{
    unsigned const tableCapacity = 2*nbThreads + 3;
    size_t const allocSize = sizeof(ZSTDMT_bufferPool)
                           + (tableCapacity-1) * sizeof(buffer_t);
    ZSTDMT_bufferPool* const pool = (ZSTDMT_bufferPool*)ZSTD_calloc(allocSize, cMem);

    if (!pool) return NULL;
    if (pthread_mutex_init(&pool->poolMutex, NULL) != 0) {
        ZSTD_free(pool, cMem);
        return NULL;
    }

    pool->cMem = cMem;
    pool->nbBuffers = 0;
    pool->totalBuffers = tableCapacity;
    pool->bufferSize = 64 KB;
    return pool;
}
/* ZSTDMT_freeBufferPool() :
 * frees every buffer currently stored in the pool, then the pool itself.
 * Accepts NULL (no-op).
 * Only entries [0, nbBuffers) are owned by the pool : entries above nbBuffers
 * may still contain the address of a buffer that was handed out, or already
 * freed, by ZSTDMT_getBuffer(). Freeing those would be a double free. */
static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{
    unsigned u;
    if (!bufPool) return;   /* compatibility with free on NULL */
    for (u=0; u<bufPool->nbBuffers; u++)
        ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
    pthread_mutex_destroy(&bufPool->poolMutex);
    ZSTD_free(bufPool, bufPool->cMem);
}
2017-06-02 00:56:14 +00:00
/* ZSTDMT_sizeof_bufferPool() :
 * only works at initialization, not during compression.
 * @return : size of the pool structure (including its table)
 *           + sum of the `size` field of all table entries. */
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{
    size_t const structBytes = sizeof(*bufPool)
                             + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
    size_t buffersBytes = 0;
    unsigned idx = 0;

    pthread_mutex_lock(&bufPool->poolMutex);
    while (idx < bufPool->totalBuffers) {
        buffersBytes += bufPool->bTable[idx].size;
        idx++;
    }
    pthread_mutex_unlock(&bufPool->poolMutex);

    return structBytes + buffersBytes;
}
2017-07-11 22:17:25 +00:00
/* ZSTDMT_setBufferSize() :
 * sets the size that ZSTDMT_getBuffer() will request from now on.
 * note : the field is written without taking poolMutex. */
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize)
{
    size_t* const target = &bufPool->bufferSize;
    *target = bSize;
}
2017-07-04 17:36:41 +00:00
/** ZSTDMT_getBuffer() :
 *  provides a buffer of size >= bufPool->bufferSize.
 *  Re-uses the last buffer stored in the pool when its size is within
 *  [bufferSize, 10*bufferSize]; otherwise that buffer is freed
 *  and a new one is allocated.
 *  assumption : bufPool must be valid
 * @return : a buffer; note that .start is NULL (and .size 0) when allocation fails */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
    size_t const bSize = bufPool->bufferSize;
    DEBUGLOG(5, " ZSTDMT_getBuffer ");
    pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers) {   /* try to use an existing buffer */
        buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
        size_t const availBufferSize = buf.size;
        /* the pool no longer owns this buffer : erase the entry, so that
         * no stale address remains in the table (it would otherwise be
         * freed a second time when the pool gets destroyed) */
        bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer;
        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
            /* large enough, but not too much */
            pthread_mutex_unlock(&bufPool->poolMutex);
            return buf;
        }
        /* size conditions not respected : scratch this buffer, create new one */
        DEBUGLOG(5, " existing buffer does not meet size conditions => freeing ");
        ZSTD_free(buf.start, bufPool->cMem);
    }
    pthread_mutex_unlock(&bufPool->poolMutex);
    /* create new buffer */
    DEBUGLOG(5, " create a new buffer ");
    {   buffer_t buffer;
        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
        buffer.size = (start==NULL) ? 0 : bSize;
        return buffer;
    }
}
2017-01-11 14:35:56 +00:00
/* store buffer for later re-use, up to pool capacity */
2017-07-11 21:14:07 +00:00
static void ZSTDMT_releaseBuffer ( ZSTDMT_bufferPool * bufPool , buffer_t buf )
2016-12-27 06:19:36 +00:00
{
2017-07-11 22:56:40 +00:00
if ( buf . start = = NULL ) return ; /* compatible with release on NULL */
2017-07-12 00:18:26 +00:00
DEBUGLOG ( 5 , " ZSTDMT_releaseBuffer " ) ;
2017-07-11 21:14:07 +00:00
pthread_mutex_lock ( & bufPool - > poolMutex ) ;
if ( bufPool - > nbBuffers < bufPool - > totalBuffers ) {
2017-07-12 00:18:26 +00:00
bufPool - > bTable [ bufPool - > nbBuffers + + ] = buf ; /* stored for later use */
2017-07-11 21:14:07 +00:00
pthread_mutex_unlock ( & bufPool - > poolMutex ) ;
2016-12-27 06:19:36 +00:00
return ;
}
2017-07-11 21:14:07 +00:00
pthread_mutex_unlock ( & bufPool - > poolMutex ) ;
2016-12-31 13:45:33 +00:00
/* Reached bufferPool capacity (should not happen) */
2017-07-12 00:18:26 +00:00
DEBUGLOG ( 5 , " buffer pool capacity reached => freeing " ) ;
2017-07-11 21:14:07 +00:00
ZSTD_free ( buf . start , bufPool - > cMem ) ;
2016-12-27 06:19:36 +00:00
}
2017-08-24 18:25:41 +00:00
/* Sets parameters relevant to the compression job, initializing others to
2017-08-22 01:10:44 +00:00
* default values . Notably , nbThreads should probably be zero . */
2017-08-21 22:39:37 +00:00
static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams ( ZSTD_CCtx_params const params )
2017-08-18 23:17:24 +00:00
{
2017-08-21 22:39:37 +00:00
ZSTD_CCtx_params jobParams ;
2017-08-30 21:36:54 +00:00
memset ( & jobParams , 0 , sizeof ( jobParams ) ) ;
2017-08-21 22:39:37 +00:00
jobParams . cParams = params . cParams ;
jobParams . fParams = params . fParams ;
jobParams . compressionLevel = params . compressionLevel ;
2017-08-31 22:40:16 +00:00
2017-09-01 19:24:59 +00:00
jobParams . ldmParams = params . ldmParams ;
2017-08-21 22:39:37 +00:00
return jobParams ;
2017-08-18 23:17:24 +00:00
}
2016-12-27 06:19:36 +00:00
2017-01-12 00:25:46 +00:00
/* ===== CCtx Pool ===== */
2017-07-11 21:14:07 +00:00
/* a single CCtx Pool can be invoked from multiple threads in parallel */
2017-07-10 23:30:55 +00:00
2016-12-31 05:04:25 +00:00
typedef struct {
2017-07-10 23:30:55 +00:00
pthread_mutex_t poolMutex ;
2016-12-31 05:04:25 +00:00
unsigned totalCCtx ;
unsigned availCCtx ;
2017-05-30 23:12:06 +00:00
ZSTD_customMem cMem ;
2016-12-31 05:04:25 +00:00
ZSTD_CCtx * cctx [ 1 ] ; /* variable size */
} ZSTDMT_CCtxPool ;
2017-01-11 14:44:26 +00:00
/* ZSTDMT_freeCCtxPool() :
 * frees all contexts stored in the pool, then the pool itself.
 * Accepts NULL (no-op) : ZSTDMT_freeCCtx() forwards mtctx->cctxPool as is,
 * and that field is NULL when creation of the pool failed.
 * note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{
    unsigned u;
    if (pool == NULL) return;   /* compatible with free on NULL */
    for (u=0; u<pool->totalCCtx; u++)
        ZSTD_freeCCtx(pool->cctx[u]);  /* note : compatible with free on NULL */
    pthread_mutex_destroy(&pool->poolMutex);
    ZSTD_free(pool, pool->cMem);
}
2017-01-22 06:06:49 +00:00
/* ZSTDMT_createCCtxPool() :
* implies nbThreads > = 1 , checked by caller ZSTDMT_createCCtx ( ) */
2017-05-30 23:12:06 +00:00
static ZSTDMT_CCtxPool * ZSTDMT_createCCtxPool ( unsigned nbThreads ,
2017-06-02 00:56:14 +00:00
ZSTD_customMem cMem )
2016-12-31 05:04:25 +00:00
{
2017-05-30 23:12:06 +00:00
ZSTDMT_CCtxPool * const cctxPool = ( ZSTDMT_CCtxPool * ) ZSTD_calloc (
sizeof ( ZSTDMT_CCtxPool ) + ( nbThreads - 1 ) * sizeof ( ZSTD_CCtx * ) , cMem ) ;
2016-12-31 05:04:25 +00:00
if ( ! cctxPool ) return NULL ;
2017-07-19 08:05:40 +00:00
if ( pthread_mutex_init ( & cctxPool - > poolMutex , NULL ) ) {
ZSTD_free ( cctxPool , cMem ) ;
return NULL ;
}
2017-05-30 23:12:06 +00:00
cctxPool - > cMem = cMem ;
2017-01-18 20:12:10 +00:00
cctxPool - > totalCCtx = nbThreads ;
2017-01-23 09:43:58 +00:00
cctxPool - > availCCtx = 1 ; /* at least one cctx for single-thread mode */
2017-05-30 23:12:06 +00:00
cctxPool - > cctx [ 0 ] = ZSTD_createCCtx_advanced ( cMem ) ;
2017-01-23 09:43:58 +00:00
if ( ! cctxPool - > cctx [ 0 ] ) { ZSTDMT_freeCCtxPool ( cctxPool ) ; return NULL ; }
2017-07-04 17:36:41 +00:00
DEBUGLOG ( 3 , " cctxPool created, with %u threads " , nbThreads ) ;
2016-12-31 05:04:25 +00:00
return cctxPool ;
}
2017-06-02 00:56:14 +00:00
/* ZSTDMT_sizeof_CCtxPool() :
 * only works during initialization phase, not during compression.
 * @return : size of the pool structure (including its table)
 *           + sum of ZSTD_sizeof_CCtx() over all table entries. */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{
    size_t structBytes;
    size_t contextsBytes = 0;
    unsigned capacity;
    unsigned idx = 0;

    pthread_mutex_lock(&cctxPool->poolMutex);
    capacity = cctxPool->totalCCtx;
    structBytes = sizeof(*cctxPool) + (capacity-1) * sizeof(ZSTD_CCtx*);
    while (idx < capacity) {
        contextsBytes += ZSTD_sizeof_CCtx(cctxPool->cctx[idx]);
        idx++;
    }
    pthread_mutex_unlock(&cctxPool->poolMutex);

    return structBytes + contextsBytes;
}
2017-07-10 23:30:55 +00:00
/* ZSTDMT_getCCtx() :
 * borrows a compression context from the pool when one is available,
 * otherwise creates a new one with the pool's allocator.
 * @return : a context; can be NULL when creation fails. */
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{
    ZSTD_CCtx* borrowed = NULL;
    int found = 0;

    DEBUGLOG(5, " ZSTDMT_getCCtx ");
    pthread_mutex_lock(&cctxPool->poolMutex);
    if (cctxPool->availCCtx > 0) {
        unsigned const top = cctxPool->availCCtx - 1;
        cctxPool->availCCtx = top;
        borrowed = cctxPool->cctx[top];
        found = 1;
    }
    pthread_mutex_unlock(&cctxPool->poolMutex);
    if (found) return borrowed;

    DEBUGLOG(5, " create one more CCtx ");
    return ZSTD_createCCtx_advanced(cctxPool->cMem);   /* note : can be NULL, when creation fails ! */
}
2016-12-31 05:04:25 +00:00
/* ZSTDMT_releaseCCtx() :
 * gives `cctx` back to the pool; when the pool is already full, `cctx` is freed.
 * A NULL `cctx` is ignored. */
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{
    if (!cctx) return;   /* compatibility with release on NULL */

    pthread_mutex_lock(&pool->poolMutex);
    if (pool->availCCtx >= pool->totalCCtx) {
        /* pool overflow : should not happen, since totalCCtx==nbThreads */
        DEBUGLOG(5, " CCtx pool overflow : free cctx ");
        ZSTD_freeCCtx(cctx);
    } else {
        unsigned const slot = pool->availCCtx;
        pool->cctx[slot] = cctx;
        pool->availCCtx = slot + 1;
    }
    pthread_mutex_unlock(&pool->poolMutex);
}
2017-01-12 00:25:46 +00:00
/* ===== Thread worker ===== */
typedef struct {
2017-01-17 23:31:16 +00:00
buffer_t src ;
2017-01-12 00:25:46 +00:00
const void * srcStart ;
2017-01-25 06:32:12 +00:00
size_t dictSize ;
2017-07-11 22:56:40 +00:00
size_t srcSize ;
2017-01-12 00:25:46 +00:00
buffer_t dstBuff ;
2017-01-19 18:32:55 +00:00
size_t cSize ;
size_t dstFlushed ;
2017-01-12 00:25:46 +00:00
unsigned firstChunk ;
unsigned lastChunk ;
unsigned jobCompleted ;
2017-01-24 19:48:40 +00:00
unsigned jobScanned ;
2017-01-12 00:25:46 +00:00
pthread_mutex_t * jobCompleted_mutex ;
pthread_cond_t * jobCompleted_cond ;
2017-08-18 23:17:24 +00:00
ZSTD_CCtx_params params ;
2017-06-03 08:15:02 +00:00
const ZSTD_CDict * cdict ;
2017-07-10 23:30:55 +00:00
ZSTDMT_CCtxPool * cctxPool ;
2017-07-11 21:59:10 +00:00
ZSTDMT_bufferPool * bufPool ;
2017-01-19 23:32:07 +00:00
unsigned long long fullFrameSize ;
2017-01-12 00:25:46 +00:00
} ZSTDMT_jobDescription ;
/* ZSTDMT_compressChunk() : POOL_function type */
void ZSTDMT_compressChunk ( void * jobDescription )
{
ZSTDMT_jobDescription * const job = ( ZSTDMT_jobDescription * ) jobDescription ;
2017-07-10 23:30:55 +00:00
ZSTD_CCtx * cctx = ZSTDMT_getCCtx ( job - > cctxPool ) ;
2017-01-25 06:32:12 +00:00
const void * const src = ( const char * ) job - > srcStart + job - > dictSize ;
2017-07-11 21:59:10 +00:00
buffer_t dstBuff = job - > dstBuff ;
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 5 , " job (first:%u) (last:%u) : dictSize %u, srcSize %u " ,
2017-03-30 22:51:58 +00:00
job - > firstChunk , job - > lastChunk , ( U32 ) job - > dictSize , ( U32 ) job - > srcSize ) ;
2017-07-10 23:30:55 +00:00
if ( cctx = = NULL ) {
job - > cSize = ERROR ( memory_allocation ) ;
goto _endJob ;
}
2017-07-11 21:59:10 +00:00
if ( dstBuff . start = = NULL ) {
2017-07-11 22:17:25 +00:00
dstBuff = ZSTDMT_getBuffer ( job - > bufPool ) ;
2017-07-11 21:59:10 +00:00
if ( dstBuff . start = = NULL ) {
job - > cSize = ERROR ( memory_allocation ) ;
goto _endJob ;
}
job - > dstBuff = dstBuff ;
}
2017-02-24 07:09:10 +00:00
if ( job - > cdict ) { /* should only happen for first segment */
2017-07-10 23:30:55 +00:00
size_t const initError = ZSTD_compressBegin_usingCDict_advanced ( cctx , job - > cdict , job - > params . fParams , job - > fullFrameSize ) ;
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 5 , " using CDict " ) ;
2017-01-23 00:40:06 +00:00
if ( ZSTD_isError ( initError ) ) { job - > cSize = initError ; goto _endJob ; }
2017-02-24 07:09:10 +00:00
} else { /* srcStart points at reloaded section */
2017-04-12 01:34:02 +00:00
if ( ! job - > firstChunk ) job - > params . fParams . contentSizeFlag = 0 ; /* ensure no srcSize control */
2017-08-24 02:11:05 +00:00
{ ZSTD_CCtx_params jobParams = job - > params ;
size_t const forceWindowError =
ZSTD_CCtxParam_setParameter ( & jobParams , ZSTD_p_forceMaxWindow , ! job - > firstChunk ) ;
2017-08-29 23:18:21 +00:00
/* Force loading dictionary in "content-only" mode (no header analysis) */
2017-08-29 22:10:42 +00:00
size_t const initError = ZSTD_compressBegin_advanced_internal ( cctx , job - > srcStart , job - > dictSize , ZSTD_dm_rawContent , jobParams , job - > fullFrameSize ) ;
if ( ZSTD_isError ( initError ) | | ZSTD_isError ( forceWindowError ) ) {
job - > cSize = initError ;
goto _endJob ;
}
2017-04-12 01:34:02 +00:00
} }
2017-02-24 07:09:10 +00:00
if ( ! job - > firstChunk ) { /* flush and overwrite frame header when it's not first segment */
2017-07-10 23:30:55 +00:00
size_t const hSize = ZSTD_compressContinue ( cctx , dstBuff . start , dstBuff . size , src , 0 ) ;
2017-01-19 18:18:17 +00:00
if ( ZSTD_isError ( hSize ) ) { job - > cSize = hSize ; goto _endJob ; }
2017-07-10 23:30:55 +00:00
ZSTD_invalidateRepCodes ( cctx ) ;
2017-01-19 18:18:17 +00:00
}
2017-01-12 00:25:46 +00:00
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 5 , " Compressing : " ) ;
2017-01-23 19:43:51 +00:00
DEBUG_PRINTHEX ( 4 , job - > srcStart , 12 ) ;
2017-02-24 07:09:10 +00:00
job - > cSize = ( job - > lastChunk ) ?
2017-07-10 23:30:55 +00:00
ZSTD_compressEnd ( cctx , dstBuff . start , dstBuff . size , src , job - > srcSize ) :
ZSTD_compressContinue ( cctx , dstBuff . start , dstBuff . size , src , job - > srcSize ) ;
2017-07-12 00:18:26 +00:00
DEBUGLOG ( 5 , " compressed %u bytes into %u bytes (first:%u) (last:%u) " ,
2017-03-30 22:51:58 +00:00
( unsigned ) job - > srcSize , ( unsigned ) job - > cSize , job - > firstChunk , job - > lastChunk ) ;
2017-04-01 01:27:03 +00:00
DEBUGLOG ( 5 , " dstBuff.size : %u ; => %s " , ( U32 ) dstBuff . size , ZSTD_getErrorName ( job - > cSize ) ) ;
2017-01-12 00:25:46 +00:00
_endJob :
2017-07-10 23:30:55 +00:00
ZSTDMT_releaseCCtx ( job - > cctxPool , cctx ) ;
2017-07-11 22:56:40 +00:00
ZSTDMT_releaseBuffer ( job - > bufPool , job - > src ) ;
job - > src = g_nullBuffer ; job - > srcStart = NULL ;
2017-01-12 00:25:46 +00:00
PTHREAD_MUTEX_LOCK ( job - > jobCompleted_mutex ) ;
job - > jobCompleted = 1 ;
2017-01-24 19:48:40 +00:00
job - > jobScanned = 0 ;
2017-01-12 00:25:46 +00:00
pthread_cond_signal ( job - > jobCompleted_cond ) ;
pthread_mutex_unlock ( job - > jobCompleted_mutex ) ;
}
2017-01-17 23:31:16 +00:00
/* ------------------------------------------ */
2017-01-12 00:25:46 +00:00
/* ===== Multi-threaded compression ===== */
2017-01-17 23:31:16 +00:00
/* ------------------------------------------ */
2017-01-12 00:25:46 +00:00
2017-07-11 22:56:40 +00:00
/* inBuff_t :
 * an input buffer, together with a fill counter. */
typedef struct {
    buffer_t buffer;   /* underlying memory region */
    size_t filled;     /* NOTE(review): presumably nb of bytes already loaded into buffer - confirm */
} inBuff_t;
2016-12-31 05:04:25 +00:00
struct ZSTDMT_CCtx_s {
POOL_ctx * factory ;
2017-05-27 07:21:33 +00:00
ZSTDMT_jobDescription * jobs ;
2017-07-11 21:59:10 +00:00
ZSTDMT_bufferPool * bufPool ;
2016-12-31 05:04:25 +00:00
ZSTDMT_CCtxPool * cctxPool ;
pthread_mutex_t jobCompleted_mutex ;
2017-01-01 16:31:33 +00:00
pthread_cond_t jobCompleted_cond ;
2017-01-17 23:31:16 +00:00
size_t targetSectionSize ;
size_t inBuffSize ;
2017-01-25 06:32:12 +00:00
size_t dictSize ;
size_t targetDictSize ;
2017-01-17 23:31:16 +00:00
inBuff_t inBuff ;
2017-08-18 23:17:24 +00:00
ZSTD_CCtx_params params ;
2017-01-24 19:48:40 +00:00
XXH64_state_t xxhState ;
2017-01-17 23:31:16 +00:00
unsigned jobIDMask ;
unsigned doneJobID ;
unsigned nextJobID ;
unsigned frameEnded ;
2017-01-18 23:18:17 +00:00
unsigned allJobsCompleted ;
2017-01-19 23:32:07 +00:00
unsigned long long frameContentSize ;
2017-05-30 23:12:06 +00:00
ZSTD_customMem cMem ;
2017-06-03 08:15:02 +00:00
ZSTD_CDict * cdictLocal ;
const ZSTD_CDict * cdict ;
2016-12-31 05:04:25 +00:00
} ;
2017-06-30 22:44:57 +00:00
/* ZSTDMT_allocJobsTable() :
 * allocates a zeroed table of job descriptions.
 * The nb of entries is a power of 2, strictly larger than *nbJobsPtr.
 * @nbJobsPtr : in : minimum nb of jobs wanted; out : nb of entries actually allocated
 *              (left unmodified when the requested size cannot be represented).
 * @return : the table, or NULL on failure. */
static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
    /* 2^nbJobsLog2 must fit into a U32; also, shifting by >= 32 is undefined */
    if (nbJobsLog2 > 31) return NULL;
    {   U32 const nbJobs = (U32)1 << nbJobsLog2;   /* unsigned shift : 1<<31 overflows a signed int */
        *nbJobsPtr = nbJobs;
        return (ZSTDMT_jobDescription*) ZSTD_calloc(
                                nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
    }
}
2017-08-25 20:14:51 +00:00
/* ZSTDMT_initializeCCtxParameters() :
 * Internal only.
 * Sets the multi-threading fields of `params` :
 * nbThreads as requested, default overlap log, and jobSize 0.
 * @return : 0 (always) */
size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads)
{
    params->jobSize = 0;
    params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
    params->nbThreads = nbThreads;
    return 0;
}
2017-05-30 23:12:06 +00:00
/* ZSTDMT_createCCtx_advanced() :
 * creates a multi-threaded compression context, using allocator `cMem`.
 * @nbThreads : must be >= 1; values above ZSTDMT_NBTHREADS_MAX are clamped.
 * @cMem : customAlloc and customFree must be either both set or both NULL.
 * @return : the new context, or NULL on invalid argument or resource failure. */
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
{
    ZSTDMT_CCtx* mtctx;
    U32 nbJobs;
    DEBUGLOG(3, " ZSTDMT_createCCtx_advanced ");

    if (nbThreads < 1) return NULL;
    nbThreads = MIN(nbThreads , ZSTDMT_NBTHREADS_MAX);
    /* size the jobs table from the clamped value :
     * the raw argument could make nbThreads+2 wrap around,
     * or request a table far larger than what the workers can use */
    nbJobs = nbThreads + 2;
    if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
        /* invalid custom allocator */
        return NULL;

    mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
    if (!mtctx) return NULL;
    ZSTDMT_initializeCCtxParameters(&mtctx->params, nbThreads);
    mtctx->cMem = cMem;
    mtctx->allJobsCompleted = 1;
    mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
    mtctx->jobIDMask = nbJobs - 1;
    mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
    if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    if (pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    if (pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    DEBUGLOG(3, " mt_cctx created, for %u threads ", nbThreads);
    return mtctx;
}
/* ZSTDMT_createCCtx() :
 * same as ZSTDMT_createCCtx_advanced(), using the default allocator.
 * @return : the new context, or NULL on failure. */
ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
{
    ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem);
    return mtctx;
}
2017-01-18 01:46:33 +00:00
/* ZSTDMT_releaseAllJobResources() :
2017-06-30 21:51:01 +00:00
* note : ensure all workers are killed first ! */
2017-01-18 01:46:33 +00:00
static void ZSTDMT_releaseAllJobResources ( ZSTDMT_CCtx * mtctx )
{
unsigned jobID ;
2017-07-04 17:36:41 +00:00
DEBUGLOG ( 3 , " ZSTDMT_releaseAllJobResources " ) ;
2017-01-18 01:46:33 +00:00
for ( jobID = 0 ; jobID < = mtctx - > jobIDMask ; jobID + + ) {
2017-07-11 21:59:10 +00:00
ZSTDMT_releaseBuffer ( mtctx - > bufPool , mtctx - > jobs [ jobID ] . dstBuff ) ;
2017-01-18 01:46:33 +00:00
mtctx - > jobs [ jobID ] . dstBuff = g_nullBuffer ;
2017-07-11 21:59:10 +00:00
ZSTDMT_releaseBuffer ( mtctx - > bufPool , mtctx - > jobs [ jobID ] . src ) ;
2017-01-18 01:46:33 +00:00
mtctx - > jobs [ jobID ] . src = g_nullBuffer ;
}
2017-01-19 18:18:17 +00:00
memset ( mtctx - > jobs , 0 , ( mtctx - > jobIDMask + 1 ) * sizeof ( ZSTDMT_jobDescription ) ) ;
2017-07-11 21:59:10 +00:00
ZSTDMT_releaseBuffer ( mtctx - > bufPool , mtctx - > inBuff . buffer ) ;
2017-01-18 19:57:34 +00:00
mtctx - > inBuff . buffer = g_nullBuffer ;
2017-01-19 18:18:17 +00:00
mtctx - > allJobsCompleted = 1 ;
2017-01-18 01:46:33 +00:00
}
2017-09-28 09:33:41 +00:00
/* ZSTDMT_waitForAllJobsCompleted() :
 * blocks until every job in [doneJobID, nextJobID) has its jobCompleted flag set.
 * doneJobID is advanced along the way, and equals nextJobID on return. */
static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
{
    DEBUGLOG(4, " ZSTDMT_waitForAllJobsCompleted ");
    for ( ; zcs->doneJobID < zcs->nextJobID ; zcs->doneJobID++) {
        unsigned const slot = zcs->doneJobID & zcs->jobIDMask;
        PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
        while (!zcs->jobs[slot].jobCompleted) {
            DEBUGLOG(5, " waiting for jobCompleted signal from chunk %u ", zcs->doneJobID);   /* we want to block when waiting for data to flush */
            pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
        }
        pthread_mutex_unlock(&zcs->jobCompleted_mutex);
    }
}
2017-01-11 14:58:05 +00:00
/* ZSTDMT_freeCCtx() :
 * releases the context and everything it owns : thread pool, jobs table,
 * buffer pool, cctx pool, local dictionary, mutex and condition.
 * Accepts NULL (no-op).
 * @return : 0 (always) */
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
    if (!mtctx) return 0;   /* compatible with free on NULL */

    POOL_free(mtctx->factory);   /* thread pool goes first */
    if (mtctx->allJobsCompleted == 0) {
        /* some jobs were still pending : wait for them,
         * then hand their buffers back to the pool */
        ZSTDMT_waitForAllJobsCompleted(mtctx);
        ZSTDMT_releaseAllJobResources(mtctx);
    }
    ZSTDMT_freeBufferPool(mtctx->bufPool);
    ZSTD_free(mtctx->jobs, mtctx->cMem);
    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
    ZSTD_freeCDict(mtctx->cdictLocal);
    pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
    pthread_cond_destroy(&mtctx->jobCompleted_cond);
    {   ZSTD_customMem const cMem = mtctx->cMem;
        ZSTD_free(mtctx, cMem);
    }
    return 0;
}
2017-06-02 00:56:14 +00:00
/* ZSTDMT_sizeof_CCtx() :
 * @return : sum of the sizes of the context, its thread pool, its buffer pool,
 *           its jobs table, its cctx pool and its local dictionary;
 *           0 when `mtctx` is NULL. */
size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{
    size_t total;

    if (!mtctx) return 0;   /* supports sizeof NULL */

    total  = sizeof(*mtctx);
    total += POOL_sizeof(mtctx->factory);
    total += ZSTDMT_sizeof_bufferPool(mtctx->bufPool);
    total += (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription);
    total += ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool);
    total += ZSTD_sizeof_CDict(mtctx->cdictLocal);
    return total;
}
2017-08-25 20:14:51 +00:00
/* Internal only */
size_t ZSTDMT_CCtxParam_setMTCtxParameter (
2017-08-25 23:13:40 +00:00
ZSTD_CCtx_params * params , ZSTDMT_parameter parameter , unsigned value ) {
2017-01-25 01:02:26 +00:00
switch ( parameter )
{
case ZSTDMT_p_sectionSize :
2017-08-25 20:14:51 +00:00
params - > jobSize = value ;
2017-01-25 01:02:26 +00:00
return 0 ;
2017-01-30 19:00:00 +00:00
case ZSTDMT_p_overlapSectionLog :
2017-09-01 01:25:56 +00:00
DEBUGLOG ( 4 , " ZSTDMT_p_overlapSectionLog : %u " , value ) ;
2017-08-25 20:14:51 +00:00
params - > overlapSizeLog = ( value > = 9 ) ? 9 : value ;
2017-01-26 01:01:13 +00:00
return 0 ;
2017-01-25 01:02:26 +00:00
default :
2017-07-14 00:12:16 +00:00
return ERROR ( parameter_unsupported ) ;
2017-01-25 01:02:26 +00:00
}
}
2017-08-25 23:13:40 +00:00
/* ZSTDMT_setMTCtxParameter() :
 * applies a multi-threading parameter to the parameters stored in `mtctx`.
 * @return : result of ZSTDMT_CCtxParam_setMTCtxParameter() for supported parameters,
 *           or parameter_unsupported error code otherwise. */
size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
{
    switch(parameter)
    {
    case ZSTDMT_p_sectionSize :         /* both parameters are forwarded the same way */
    case ZSTDMT_p_overlapSectionLog :
        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
    default :
        break;
    }
    return ERROR(parameter_unsupported);
}
2017-01-25 01:02:26 +00:00
/* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
2016-12-31 05:04:25 +00:00
2017-07-03 23:23:36 +00:00
/* computeNbChunks() :
 * selects into how many chunks an input of `srcSize` bytes gets divided.
 * Target chunk size is 2^(windowLog+2) bytes, max chunk size is 4x that.
 * - when the input is larger than nbThreads max-sized chunks,
 *   the result is a multiple of nbThreads;
 * - otherwise, it is the nb of target-sized chunks (rounded up by +1),
 *   capped at nbThreads.
 * @return : nb of chunks (>= 1) */
static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
    size_t const targetChunkBytes = (size_t)1 << (windowLog + 2);
    size_t const maxChunkBytes = targetChunkBytes << 2;
    size_t const maxBytesPerPass = maxChunkBytes * nbThreads;
    unsigned const nbPasses = (unsigned)(srcSize / maxBytesPerPass) + 1;

    if (nbPasses > 1) {
        /* large input : every thread receives the same nb of chunks */
        return nbPasses * nbThreads;
    }
    {   unsigned const nbTargetChunks = (unsigned)(srcSize / targetChunkBytes) + 1;
        /* small input : no more chunks than threads */
        return (nbTargetChunks < nbThreads) ? nbTargetChunks : nbThreads;
    }
}
2017-08-22 21:24:47 +00:00
static size_t ZSTDMT_compress_advanced_internal (
2017-08-18 23:17:24 +00:00
ZSTDMT_CCtx * mtctx ,
void * dst , size_t dstCapacity ,
const void * src , size_t srcSize ,
const ZSTD_CDict * cdict ,
2017-08-25 20:14:51 +00:00
ZSTD_CCtx_params const params )
2016-12-27 06:19:36 +00:00
{
2017-08-25 20:14:51 +00:00
ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams ( params ) ;
unsigned const overlapRLog = ( params . overlapSizeLog > 9 ) ? 0 : 9 - params . overlapSizeLog ;
size_t const overlapSize = ( overlapRLog > = 9 ) ? 0 : ( size_t ) 1 < < ( params . cParams . windowLog - overlapRLog ) ;
unsigned nbChunks = computeNbChunks ( srcSize , params . cParams . windowLog , params . nbThreads ) ;
2017-01-12 00:25:46 +00:00
size_t const proposedChunkSize = ( srcSize + ( nbChunks - 1 ) ) / nbChunks ;
2017-06-30 22:44:57 +00:00
size_t const avgChunkSize = ( ( proposedChunkSize & 0x1FFFF ) < 0x7FFF ) ? proposedChunkSize + 0xFFFF : proposedChunkSize ; /* avoid too small last block */
2016-12-27 06:19:36 +00:00
const char * const srcStart = ( const char * ) src ;
2017-06-30 22:44:57 +00:00
size_t remainingSrcSize = srcSize ;
2017-04-01 01:27:03 +00:00
unsigned const compressWithinDst = ( dstCapacity > = ZSTD_compressBound ( srcSize ) ) ? nbChunks : ( unsigned ) ( dstCapacity / ZSTD_compressBound ( avgChunkSize ) ) ; /* presumes avgChunkSize >= 256 KB, which should be the case */
size_t frameStartPos = 0 , dstBufferPos = 0 ;
2017-07-12 00:18:26 +00:00
XXH64_state_t xxh64 ;
2017-08-25 20:14:51 +00:00
assert ( jobParams . nbThreads = = 0 ) ;
2017-08-25 20:23:16 +00:00
assert ( mtctx - > cctxPool - > totalCCtx = = params . nbThreads ) ;
2016-12-31 05:04:25 +00:00
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 4 , " nbChunks : %2u (chunkSize : %u bytes) " , nbChunks , ( U32 ) avgChunkSize ) ;
2017-01-23 08:56:54 +00:00
if ( nbChunks = = 1 ) { /* fallback to single-thread mode */
2017-01-23 09:43:58 +00:00
ZSTD_CCtx * const cctx = mtctx - > cctxPool - > cctx [ 0 ] ;
2017-08-25 18:36:17 +00:00
if ( cdict ) return ZSTD_compress_usingCDict_advanced ( cctx , dst , dstCapacity , src , srcSize , cdict , jobParams . fParams ) ;
return ZSTD_compress_advanced_internal ( cctx , dst , dstCapacity , src , srcSize , NULL , 0 , jobParams ) ;
2017-01-23 08:56:54 +00:00
}
2017-07-13 09:22:58 +00:00
assert ( avgChunkSize > = 256 KB ) ; /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is required for compressWithinDst */
2017-07-11 22:17:25 +00:00
ZSTDMT_setBufferSize ( mtctx - > bufPool , ZSTD_compressBound ( avgChunkSize ) ) ;
2017-07-12 00:18:26 +00:00
XXH64_reset ( & xxh64 , 0 ) ;
2017-01-23 08:56:54 +00:00
2017-06-30 22:44:57 +00:00
if ( nbChunks > mtctx - > jobIDMask + 1 ) { /* enlarge job table */
U32 nbJobs = nbChunks ;
ZSTD_free ( mtctx - > jobs , mtctx - > cMem ) ;
mtctx - > jobIDMask = 0 ;
mtctx - > jobs = ZSTDMT_allocJobsTable ( & nbJobs , mtctx - > cMem ) ;
if ( mtctx - > jobs = = NULL ) return ERROR ( memory_allocation ) ;
mtctx - > jobIDMask = nbJobs - 1 ;
}
2016-12-27 06:19:36 +00:00
{ unsigned u ;
2017-01-12 00:25:46 +00:00
for ( u = 0 ; u < nbChunks ; u + + ) {
size_t const chunkSize = MIN ( remainingSrcSize , avgChunkSize ) ;
2017-04-01 01:27:03 +00:00
size_t const dstBufferCapacity = ZSTD_compressBound ( chunkSize ) ;
buffer_t const dstAsBuffer = { ( char * ) dst + dstBufferPos , dstBufferCapacity } ;
2017-07-11 21:59:10 +00:00
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer ;
2017-03-30 22:51:58 +00:00
size_t dictSize = u ? overlapSize : 0 ;
2017-01-12 01:01:28 +00:00
2017-07-11 22:56:40 +00:00
mtctx - > jobs [ u ] . src = g_nullBuffer ;
2017-03-30 22:51:58 +00:00
mtctx - > jobs [ u ] . srcStart = srcStart + frameStartPos - dictSize ;
mtctx - > jobs [ u ] . dictSize = dictSize ;
2017-01-12 00:25:46 +00:00
mtctx - > jobs [ u ] . srcSize = chunkSize ;
2017-06-30 21:51:01 +00:00
mtctx - > jobs [ u ] . cdict = mtctx - > nextJobID = = 0 ? cdict : NULL ;
2017-01-11 17:21:25 +00:00
mtctx - > jobs [ u ] . fullFrameSize = srcSize ;
2017-08-25 18:36:17 +00:00
mtctx - > jobs [ u ] . params = jobParams ;
2017-06-30 21:51:01 +00:00
/* do not calculate checksum within sections, but write it in header for first section */
2017-06-30 22:44:57 +00:00
if ( u ! = 0 ) mtctx - > jobs [ u ] . params . fParams . checksumFlag = 0 ;
2016-12-31 05:04:25 +00:00
mtctx - > jobs [ u ] . dstBuff = dstBuffer ;
2017-07-10 23:30:55 +00:00
mtctx - > jobs [ u ] . cctxPool = mtctx - > cctxPool ;
2017-07-11 21:59:10 +00:00
mtctx - > jobs [ u ] . bufPool = mtctx - > bufPool ;
2017-01-12 00:25:46 +00:00
mtctx - > jobs [ u ] . firstChunk = ( u = = 0 ) ;
mtctx - > jobs [ u ] . lastChunk = ( u = = nbChunks - 1 ) ;
2016-12-31 05:04:25 +00:00
mtctx - > jobs [ u ] . jobCompleted = 0 ;
mtctx - > jobs [ u ] . jobCompleted_mutex = & mtctx - > jobCompleted_mutex ;
2017-01-01 16:31:33 +00:00
mtctx - > jobs [ u ] . jobCompleted_cond = & mtctx - > jobCompleted_cond ;
2016-12-31 05:04:25 +00:00
2017-08-25 20:14:51 +00:00
if ( params . fParams . checksumFlag ) {
2017-07-12 00:18:26 +00:00
XXH64_update ( & xxh64 , srcStart + frameStartPos , chunkSize ) ;
}
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 5 , " posting job %u (%u bytes) " , u , ( U32 ) chunkSize ) ;
DEBUG_PRINTHEX ( 6 , mtctx - > jobs [ u ] . srcStart , 12 ) ;
2017-01-12 00:25:46 +00:00
POOL_add ( mtctx - > factory , ZSTDMT_compressChunk , & mtctx - > jobs [ u ] ) ;
2016-12-31 05:04:25 +00:00
2017-01-12 00:25:46 +00:00
frameStartPos + = chunkSize ;
2017-04-01 01:27:03 +00:00
dstBufferPos + = dstBufferCapacity ;
2017-01-12 00:25:46 +00:00
remainingSrcSize - = chunkSize ;
2016-12-27 06:19:36 +00:00
} }
2016-12-31 05:04:25 +00:00
2017-07-03 22:52:19 +00:00
/* collect result */
2017-07-12 00:18:26 +00:00
{ size_t error = 0 , dstPos = 0 ;
unsigned chunkID ;
2017-01-12 00:25:46 +00:00
for ( chunkID = 0 ; chunkID < nbChunks ; chunkID + + ) {
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 5 , " waiting for chunk %u " , chunkID ) ;
2017-01-11 17:21:25 +00:00
PTHREAD_MUTEX_LOCK ( & mtctx - > jobCompleted_mutex ) ;
2017-01-12 00:25:46 +00:00
while ( mtctx - > jobs [ chunkID ] . jobCompleted = = 0 ) {
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 5 , " waiting for jobCompleted signal from chunk %u " , chunkID ) ;
2017-01-01 16:31:33 +00:00
pthread_cond_wait ( & mtctx - > jobCompleted_cond , & mtctx - > jobCompleted_mutex ) ;
2016-12-31 05:04:25 +00:00
}
2017-01-01 16:31:33 +00:00
pthread_mutex_unlock ( & mtctx - > jobCompleted_mutex ) ;
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 5 , " ready to write chunk %u " , chunkID ) ;
2017-01-01 16:31:33 +00:00
2017-01-18 01:46:33 +00:00
mtctx - > jobs [ chunkID ] . srcStart = NULL ;
2017-01-12 00:25:46 +00:00
{ size_t const cSize = mtctx - > jobs [ chunkID ] . cSize ;
2017-01-12 02:06:35 +00:00
if ( ZSTD_isError ( cSize ) ) error = cSize ;
if ( ( ! error ) & & ( dstPos + cSize > dstCapacity ) ) error = ERROR ( dstSize_tooSmall ) ;
2017-07-03 22:52:19 +00:00
if ( chunkID ) { /* note : chunk 0 is written directly at dst, which is correct position */
2017-04-01 01:27:03 +00:00
if ( ! error )
2017-07-03 22:52:19 +00:00
memmove ( ( char * ) dst + dstPos , mtctx - > jobs [ chunkID ] . dstBuff . start , cSize ) ; /* may overlap when chunk compressed within dst */
if ( chunkID > = compressWithinDst ) { /* chunk compressed into its own buffer, which must be released */
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 5 , " releasing buffer %u>=%u " , chunkID , compressWithinDst ) ;
2017-07-11 21:59:10 +00:00
ZSTDMT_releaseBuffer ( mtctx - > bufPool , mtctx - > jobs [ chunkID ] . dstBuff ) ;
2017-06-30 21:51:01 +00:00
}
2017-01-18 01:46:33 +00:00
mtctx - > jobs [ chunkID ] . dstBuff = g_nullBuffer ;
2016-12-31 13:45:33 +00:00
}
2016-12-31 05:04:25 +00:00
dstPos + = cSize ;
}
2017-07-11 21:59:10 +00:00
} /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
2017-07-12 00:18:26 +00:00
2017-08-25 20:14:51 +00:00
DEBUGLOG ( 4 , " checksumFlag : %u " , params . fParams . checksumFlag ) ;
if ( params . fParams . checksumFlag ) {
2017-07-12 00:18:26 +00:00
U32 const checksum = ( U32 ) XXH64_digest ( & xxh64 ) ;
if ( dstPos + 4 > dstCapacity ) {
error = ERROR ( dstSize_tooSmall ) ;
} else {
DEBUGLOG ( 4 , " writing checksum : %08X \n " , checksum ) ;
MEM_writeLE32 ( ( char * ) dst + dstPos , checksum ) ;
dstPos + = 4 ;
} }
2017-06-20 01:25:35 +00:00
if ( ! error ) DEBUGLOG ( 4 , " compressed size : %u " , ( U32 ) dstPos ) ;
2017-01-12 02:06:35 +00:00
return error ? error : dstPos ;
2016-12-31 05:04:25 +00:00
}
2017-08-18 23:17:24 +00:00
}
/* ZSTDMT_compress_advanced() :
 * One-shot compression with explicit parameters.
 * The context's stored parameters are used as a base; compression parameters,
 * frame parameters and overlap log are then overridden with the values provided.
 * `cdict` is passed through unchanged.
 * @return : result of ZSTDMT_compress_advanced_internal() */
size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                                void* dst, size_t dstCapacity,
                          const void* src, size_t srcSize,
                          const ZSTD_CDict* cdict,
                                ZSTD_parameters const params,
                                unsigned overlapLog)
{
    ZSTD_CCtx_params mergedParams = mtctx->params;   /* local copy : mtctx->params is left untouched */
    mergedParams.overlapSizeLog = overlapLog;
    mergedParams.fParams = params.fParams;
    mergedParams.cParams = params.cParams;
    return ZSTDMT_compress_advanced_internal(mtctx,
                                             dst, dstCapacity,
                                             src, srcSize,
                                             cdict, mergedParams);
}
2016-12-27 06:19:36 +00:00
2017-06-30 21:51:01 +00:00
/* ZSTDMT_compressCCtx() :
 * One-shot compression at a given compression level, without dictionary.
 * Parameters are derived from `compressionLevel` and `srcSize`,
 * and the content size flag is always enabled.
 * Overlap log is ZSTDMT_OVERLAPLOG_DEFAULT, raised to 9 when
 * `compressionLevel` reaches ZSTD_maxCLevel().
 * @return : result of ZSTDMT_compress_advanced() */
size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                           void* dst, size_t dstCapacity,
                     const void* src, size_t srcSize,
                           int compressionLevel)
{
    ZSTD_parameters params;
    U32 overlapLog = ZSTDMT_OVERLAPLOG_DEFAULT;
    if (compressionLevel >= ZSTD_maxCLevel()) {
        overlapLog = 9;
    }
    params = ZSTD_getParams(compressionLevel, srcSize, 0);
    params.fParams.contentSizeFlag = 1;
    return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog);
}
2017-01-12 00:25:46 +00:00
/* ====================================== */
/* ======= Streaming API ======= */
/* ====================================== */
2017-08-22 21:24:47 +00:00
size_t ZSTDMT_initCStream_internal (
2017-08-30 21:36:54 +00:00
ZSTDMT_CCtx * zcs ,
const void * dict , size_t dictSize , ZSTD_dictMode_e dictMode ,
2017-08-25 20:14:51 +00:00
const ZSTD_CDict * cdict , ZSTD_CCtx_params params ,
2017-08-18 23:17:24 +00:00
unsigned long long pledgedSrcSize )
2017-01-23 00:40:06 +00:00
{
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 4 , " ZSTDMT_initCStream_internal " ) ;
2017-06-03 08:15:02 +00:00
/* params are supposed to be fully validated at this point */
2017-08-25 20:14:51 +00:00
assert ( ! ZSTD_isError ( ZSTD_checkCParams ( params . cParams ) ) ) ;
2017-06-03 08:15:02 +00:00
assert ( ! ( ( dict ) & & ( cdict ) ) ) ; /* either dict or cdict, not both */
2017-08-25 20:23:16 +00:00
assert ( zcs - > cctxPool - > totalCCtx = = params . nbThreads ) ;
2017-06-03 08:15:02 +00:00
2017-08-25 20:14:51 +00:00
if ( params . nbThreads = = 1 ) {
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_makeJobCCtxParams ( params ) ;
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 4 , " single thread mode " ) ;
2017-08-25 20:14:51 +00:00
assert ( singleThreadParams . nbThreads = = 0 ) ;
2017-08-22 01:10:44 +00:00
return ZSTD_initCStream_internal ( zcs - > cctxPool - > cctx [ 0 ] ,
dict , dictSize , cdict ,
2017-08-25 20:14:51 +00:00
singleThreadParams , pledgedSrcSize ) ;
2017-06-03 08:15:02 +00:00
}
2017-06-03 01:20:48 +00:00
if ( zcs - > allJobsCompleted = = 0 ) { /* previous compression not correctly finished */
2017-01-18 23:18:17 +00:00
ZSTDMT_waitForAllJobsCompleted ( zcs ) ;
ZSTDMT_releaseAllJobResources ( zcs ) ;
zcs - > allJobsCompleted = 1 ;
}
2017-06-03 08:15:02 +00:00
2017-08-25 20:14:51 +00:00
zcs - > params = params ;
2017-01-19 23:32:07 +00:00
zcs - > frameContentSize = pledgedSrcSize ;
2017-06-03 08:15:02 +00:00
if ( dict ) {
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 4 , " cdictLocal: %08X " , ( U32 ) ( size_t ) zcs - > cdictLocal ) ;
2017-06-03 08:15:02 +00:00
ZSTD_freeCDict ( zcs - > cdictLocal ) ;
2017-08-21 21:49:16 +00:00
zcs - > cdictLocal = ZSTD_createCDict_advanced ( dict , dictSize ,
2017-08-30 21:36:54 +00:00
ZSTD_dlm_byCopy , dictMode , /* note : a loadPrefix becomes an internal CDict */
2017-08-25 20:14:51 +00:00
params . cParams , zcs - > cMem ) ;
2017-06-03 08:15:02 +00:00
zcs - > cdict = zcs - > cdictLocal ;
2017-06-21 19:26:40 +00:00
if ( zcs - > cdictLocal = = NULL ) return ERROR ( memory_allocation ) ;
2017-06-03 08:15:02 +00:00
} else {
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 4 , " cdictLocal: %08X " , ( U32 ) ( size_t ) zcs - > cdictLocal ) ;
2017-06-21 19:26:40 +00:00
ZSTD_freeCDict ( zcs - > cdictLocal ) ;
zcs - > cdictLocal = NULL ;
2017-06-03 08:15:02 +00:00
zcs - > cdict = cdict ;
}
2017-08-25 20:14:51 +00:00
zcs - > targetDictSize = ( params . overlapSizeLog = = 0 ) ? 0 : ( size_t ) 1 < < ( params . cParams . windowLog - ( 9 - params . overlapSizeLog ) ) ;
DEBUGLOG ( 4 , " overlapLog : %u " , params . overlapSizeLog ) ;
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 4 , " overlap Size : %u KB " , ( U32 ) ( zcs - > targetDictSize > > 10 ) ) ;
2017-08-25 20:14:51 +00:00
zcs - > targetSectionSize = params . jobSize ? params . jobSize : ( size_t ) 1 < < ( params . cParams . windowLog + 2 ) ;
2017-01-25 01:02:26 +00:00
zcs - > targetSectionSize = MAX ( ZSTDMT_SECTION_SIZE_MIN , zcs - > targetSectionSize ) ;
2017-01-30 21:35:45 +00:00
zcs - > targetSectionSize = MAX ( zcs - > targetDictSize , zcs - > targetSectionSize ) ;
2017-06-30 21:51:01 +00:00
DEBUGLOG ( 4 , " Section Size : %u KB " , ( U32 ) ( zcs - > targetSectionSize > > 10 ) ) ;
2017-07-11 15:54:29 +00:00
zcs - > inBuffSize = zcs - > targetDictSize + zcs - > targetSectionSize ;
2017-07-11 22:17:25 +00:00
ZSTDMT_setBufferSize ( zcs - > bufPool , MAX ( zcs - > inBuffSize , ZSTD_compressBound ( zcs - > targetSectionSize ) ) ) ;
2017-07-11 00:16:41 +00:00
zcs - > inBuff . buffer = g_nullBuffer ;
2017-01-25 06:32:12 +00:00
zcs - > dictSize = 0 ;
2017-01-12 00:25:46 +00:00
zcs - > doneJobID = 0 ;
zcs - > nextJobID = 0 ;
2017-01-17 23:31:16 +00:00
zcs - > frameEnded = 0 ;
2017-01-18 23:18:17 +00:00
zcs - > allJobsCompleted = 0 ;
2017-08-25 20:14:51 +00:00
if ( params . fParams . checksumFlag ) XXH64_reset ( & zcs - > xxhState , 0 ) ;
2017-01-12 00:25:46 +00:00
return 0 ;
2017-08-18 23:17:24 +00:00
}
2017-06-03 08:15:02 +00:00
/* ZSTDMT_initCStream_advanced() :
 * Starts a streaming compression with explicit parameters and an optional raw dictionary.
 * The context's stored parameters are used as a base, with compression and frame
 * parameters overridden by `params`. The dictionary is loaded in ZSTD_dm_auto mode.
 * @return : result of ZSTDMT_initCStream_internal() */
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
                             const void* dict, size_t dictSize,
                                   ZSTD_parameters params,
                                   unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params mergedParams = mtctx->params;
    DEBUGLOG(5, " ZSTDMT_initCStream_advanced ");
    mergedParams.fParams = params.fParams;
    mergedParams.cParams = params.cParams;
    return ZSTDMT_initCStream_internal(mtctx,
                                       dict, dictSize, ZSTD_dm_auto,
                                       NULL /* cdict */,
                                       mergedParams, pledgedSrcSize);
}
/* ZSTDMT_initCStream_usingCDict() :
 * Starts a streaming compression using a digested dictionary.
 * Compression parameters are taken from `cdict`; frame parameters from `fParams`;
 * everything else from the context's stored parameters.
 * `cdict` must not be NULL.
 * @return : result of ZSTDMT_initCStream_internal(),
 *           or ERROR(dictionary_wrong) when `cdict` is NULL */
size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
                               const ZSTD_CDict* cdict,
                                     ZSTD_frameParameters fParams,
                                     unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    /* validate cdict before reading anything from it */
    if (cdict==NULL) return ERROR(dictionary_wrong);   /* method incompatible with NULL cdict */
    cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict);
    cctxParams.fParams = fParams;
    return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dm_auto, cdict,
                                       cctxParams, pledgedSrcSize);
}
2017-06-03 08:15:02 +00:00
2017-01-20 00:59:56 +00:00
/* ZSTDMT_resetCStream() :
 * Restarts a streaming compression, re-using the parameters stored in `zcs`.
 * pledgedSrcSize is optional and can be zero == unknown.
 * note : in multi-threaded mode, re-initialization is performed
 *        with neither dictionary nor cdict.
 * @return : result of the underlying reset / init function */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
    if (zcs->params.nbThreads == 1) {
        /* single-thread mode : delegate to the only context of the pool */
        return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
    } else {
        return ZSTDMT_initCStream_internal(zcs,
                                           NULL /* dict */, 0 /* dictSize */, ZSTD_dm_auto,
                                           NULL /* cdict */,
                                           zcs->params, pledgedSrcSize);
    }
}
2017-01-19 23:32:07 +00:00
/* ZSTDMT_initCStream() :
 * Starts a streaming compression at the given compression level,
 * without dictionary, and with an unknown (0) source size.
 * @return : result of ZSTDMT_initCStream_internal() */
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel)
{
    ZSTD_parameters const levelParams = ZSTD_getParams(compressionLevel, 0, 0);
    ZSTD_CCtx_params mergedParams = zcs->params;
    mergedParams.fParams = levelParams.fParams;
    mergedParams.cParams = levelParams.cParams;
    return ZSTDMT_initCStream_internal(zcs,
                                       NULL /* dict */, 0 /* dictSize */, ZSTD_dm_auto,
                                       NULL /* cdict */,
                                       mergedParams, 0 /* pledgedSrcSize */);
}
2017-01-12 00:25:46 +00:00
2017-01-25 01:41:49 +00:00
/* ZSTDMT_createCompressionJob() :
 * Turns the current content of the input buffer into a compression job and posts it.
 * The input buffer is expected to hold `zcs->dictSize` bytes of prefix
 * followed by at least `srcSize` bytes to compress.
 * - When `endFrame` is 0, a fresh input buffer is obtained from the pool,
 *   and the tail of the posted data is copied into it to become the next prefix.
 * - When `endFrame` is non-zero, the input buffer is detached and the frame is marked as ended.
 * The slot used in the job table is (nextJobID & jobIDMask) :
 * the caller is in charge of ensuring this slot is free.
 * @return : 0 on success,
 *           ERROR(memory_allocation) when no new input buffer can be obtained
 *           (all jobs are then waited for, and their resources released) */
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
{
    unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
    ZSTDMT_jobDescription* const job = &zcs->jobs[jobID];
    unsigned const isFirstJob = (zcs->nextJobID == 0);

    DEBUGLOG(4, " preparing job %u to compress %u bytes with %u preload ",
                zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);

    /* input : the job takes over the current input buffer */
    job->src = zcs->inBuff.buffer;
    job->srcStart = zcs->inBuff.buffer.start;
    job->srcSize = srcSize;
    job->dictSize = zcs->dictSize;
    assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);

    /* parameters */
    job->params = zcs->params;
    /* do not calculate checksum within sections, but write it in header for first section */
    if (!isFirstJob) job->params.fParams.checksumFlag = 0;
    job->cdict = isFirstJob ? zcs->cdict : NULL;
    job->fullFrameSize = zcs->frameContentSize;

    /* output and shared resources */
    job->dstBuff = g_nullBuffer;
    job->cctxPool = zcs->cctxPool;
    job->bufPool = zcs->bufPool;

    /* position within frame, and completion tracking */
    job->firstChunk = isFirstJob;
    job->lastChunk = endFrame;
    job->jobCompleted = 0;
    job->dstFlushed = 0;
    job->jobCompleted_mutex = &zcs->jobCompleted_mutex;
    job->jobCompleted_cond = &zcs->jobCompleted_cond;

    if (zcs->params.fParams.checksumFlag) {
        /* frame checksum covers new data only, not the prefix */
        XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize);
    }

    if (endFrame) {
        zcs->inBuff.buffer = g_nullBuffer;
        zcs->inBuff.filled = 0;
        zcs->dictSize = 0;
        zcs->frameEnded = 1;
        if (isFirstJob) {
            /* single chunk exception : checksum is calculated directly within worker thread */
            zcs->params.fParams.checksumFlag = 0;
        }
    } else {
        /* get a new buffer for next input */
        size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
            job->jobCompleted = 1;
            zcs->nextJobID++;
            ZSTDMT_waitForAllJobsCompleted(zcs);
            ZSTDMT_releaseAllJobResources(zcs);
            return ERROR(memory_allocation);
        }
        /* keep the last newDictSize bytes of consumed data as prefix, followed by unconsumed input */
        zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
        memmove(zcs->inBuff.buffer.start,
                (const char*)job->srcStart + zcs->dictSize + srcSize - newDictSize,
                zcs->inBuff.filled);
        zcs->dictSize = newDictSize;
    }

    DEBUGLOG(4, " posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u) ",
                zcs->nextJobID,
                (U32)job->srcSize,
                job->lastChunk,
                zcs->doneJobID,
                zcs->doneJobID & zcs->jobIDMask);
    POOL_add(zcs->factory, ZSTDMT_compressChunk, job);   /* this call is blocking when thread worker pool is exhausted */
    zcs->nextJobID++;
    return 0;
}
2017-01-23 19:43:51 +00:00
/* ZSTDMT_flushNextJob() :
* output : will be updated with amount of data flushed .
2017-01-24 19:48:40 +00:00
* blockToFlush : if > 0 , the function will block and wait if there is no data available to flush .
2017-01-25 01:41:49 +00:00
* @ return : amount of data remaining within internal buffer , 1 if unknown but > 0 , 0 if no more , or an error code */
2017-01-23 19:43:51 +00:00
static size_t ZSTDMT_flushNextJob ( ZSTDMT_CCtx * zcs , ZSTD_outBuffer * output , unsigned blockToFlush )
{
unsigned const wJobID = zcs - > doneJobID & zcs - > jobIDMask ;
if ( zcs - > doneJobID = = zcs - > nextJobID ) return 0 ; /* all flushed ! */
PTHREAD_MUTEX_LOCK ( & zcs - > jobCompleted_mutex ) ;
while ( zcs - > jobs [ wJobID ] . jobCompleted = = 0 ) {
2017-01-24 19:48:40 +00:00
DEBUGLOG ( 5 , " waiting for jobCompleted signal from job %u " , zcs - > doneJobID ) ;
if ( ! blockToFlush ) { pthread_mutex_unlock ( & zcs - > jobCompleted_mutex ) ; return 0 ; } /* nothing ready to be flushed => skip */
pthread_cond_wait ( & zcs - > jobCompleted_cond , & zcs - > jobCompleted_mutex ) ; /* block when nothing available to flush */
2017-01-23 19:43:51 +00:00
}
pthread_mutex_unlock ( & zcs - > jobCompleted_mutex ) ;
/* compression job completed : output can be flushed */
{ ZSTDMT_jobDescription job = zcs - > jobs [ wJobID ] ;
2017-01-24 19:48:40 +00:00
if ( ! job . jobScanned ) {
if ( ZSTD_isError ( job . cSize ) ) {
DEBUGLOG ( 5 , " compression error detected " ) ;
ZSTDMT_waitForAllJobsCompleted ( zcs ) ;
ZSTDMT_releaseAllJobResources ( zcs ) ;
return job . cSize ;
}
DEBUGLOG ( 5 , " zcs->params.fParams.checksumFlag : %u " , zcs - > params . fParams . checksumFlag ) ;
if ( zcs - > params . fParams . checksumFlag ) {
if ( zcs - > frameEnded & & ( zcs - > doneJobID + 1 = = zcs - > nextJobID ) ) { /* write checksum at end of last section */
U32 const checksum = ( U32 ) XXH64_digest ( & zcs - > xxhState ) ;
2017-06-20 01:25:35 +00:00
DEBUGLOG ( 5 , " writing checksum : %08X \n " , checksum ) ;
2017-01-24 19:48:40 +00:00
MEM_writeLE32 ( ( char * ) job . dstBuff . start + job . cSize , checksum ) ;
job . cSize + = 4 ;
zcs - > jobs [ wJobID ] . cSize + = 4 ;
} }
zcs - > jobs [ wJobID ] . jobScanned = 1 ;
}
{ size_t const toWrite = MIN ( job . cSize - job . dstFlushed , output - > size - output - > pos ) ;
2017-07-12 00:18:26 +00:00
DEBUGLOG ( 5 , " Flushing %u bytes from job %u " , ( U32 ) toWrite , zcs - > doneJobID ) ;
2017-01-24 19:48:40 +00:00
memcpy ( ( char * ) output - > dst + output - > pos , ( const char * ) job . dstBuff . start + job . dstFlushed , toWrite ) ;
output - > pos + = toWrite ;
job . dstFlushed + = toWrite ;
2017-01-23 19:43:51 +00:00
}
if ( job . dstFlushed = = job . cSize ) { /* output buffer fully flushed => move to next one */
2017-07-11 21:59:10 +00:00
ZSTDMT_releaseBuffer ( zcs - > bufPool , job . dstBuff ) ;
2017-01-23 19:43:51 +00:00
zcs - > jobs [ wJobID ] . dstBuff = g_nullBuffer ;
zcs - > jobs [ wJobID ] . jobCompleted = 0 ;
zcs - > doneJobID + + ;
} else {
zcs - > jobs [ wJobID ] . dstFlushed = job . dstFlushed ;
}
/* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
if ( job . cSize > job . dstFlushed ) return ( job . cSize - job . dstFlushed ) ;
if ( zcs - > doneJobID < zcs - > nextJobID ) return 1 ; /* still some buffer to flush */
zcs - > allJobsCompleted = zcs - > frameEnded ; /* frame completed and entirely flushed */
return 0 ; /* everything flushed */
} }
2017-06-30 21:51:01 +00:00
/** ZSTDMT_compressStream_generic() :
 *  internal use only - exposed to be invoked from zstd_compress.c
 *  assumption : output and input are valid (pos <= size)
 *  Processing steps :
 *  - nbThreads == 1 : everything is delegated to the first context of the pool (synchronous).
 *  - single-pass shortcut : when nothing was started nor buffered, endOp is ZSTD_e_end,
 *    and output is large enough for the worst case, input is compressed in one blocking call.
 *  - otherwise : input is loaded into the internal buffer, a new job is posted when the buffer
 *    holds a full section and a job slot is available, then available compressed data is flushed.
 *    The flush only blocks when no input could be consumed by this call.
 *    `endOp` is downgraded to ZSTD_e_continue as long as input is not entirely consumed.
 * @return : minimum amount of data remaining to flush, 0 if none
 *           (1 when endOp is, or is downgraded to, ZSTD_e_continue),
 *           or an error code; notably ERROR(stage_wrong) when trying to continue an ended frame */
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                     ZSTD_outBuffer* output,
                                     ZSTD_inBuffer* input,
                                     ZSTD_EndDirective endOp)
{
    size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
    unsigned forwardInputProgress = 0;
    assert(output->pos <= output->size);
    assert(input->pos  <= input->size);

    if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
        /* current frame being ended. Only flush/end are allowed */
        return ERROR(stage_wrong);
    }
    if (mtctx->params.nbThreads==1) {  /* delegate to single-thread (synchronous) */
        return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
    }

    /* single-pass shortcut (note : synchronous-mode) */
    if ( (mtctx->nextJobID == 0)      /* just started */
      && (mtctx->inBuff.filled == 0)  /* nothing buffered */
      && (endOp == ZSTD_e_end)        /* end order */
      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
        size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                (char*)output->dst + output->pos, output->size - output->pos,
                (const char*)input->src + input->pos, input->size - input->pos,
                mtctx->cdict, mtctx->params);
        if (ZSTD_isError(cSize)) return cSize;
        input->pos = input->size;
        output->pos += cSize;
        /* give back the input buffer, if any, and forget its handle :
         * leaving a released buffer referenced from the context would let
         * a later call write into it, or release it a second time */
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);
        mtctx->inBuff.buffer = g_nullBuffer;
        mtctx->allJobsCompleted = 1;
        mtctx->frameEnded = 1;
        return 0;
    }

    /* fill input buffer */
    if (input->size > input->pos) {   /* support NULL input */
        if (mtctx->inBuff.buffer.start == NULL) {
            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);  /* note : may fail, in which case, no forward input progress */
            mtctx->inBuff.filled = 0;
        }
        if (mtctx->inBuff.buffer.start) {
            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
            DEBUGLOG(5, " inBuff:%08X; inBuffSize=%u; ToCopy=%u ", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
            input->pos += toLoad;
            mtctx->inBuff.filled += toLoad;
            forwardInputProgress = toLoad>0;
    }   }

    if ( (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
      && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {   /* avoid overwriting job round buffer */
        CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
    }

    /* check for potential compressed data ready to be flushed */
    CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */

    if (input->pos < input->size)  /* input not consumed : do not flush yet */
        endOp = ZSTD_e_continue;

    switch(endOp)
    {
        case ZSTD_e_flush:
            return ZSTDMT_flushStream(mtctx, output);
        case ZSTD_e_end:
            return ZSTDMT_endStream(mtctx, output);
        case ZSTD_e_continue:
            return 1;
        default:
            return ERROR(GENERIC);   /* invalid endDirective */
    }
}
2017-01-12 00:25:46 +00:00
/* ZSTDMT_compressStream() :
 * Consumes input and produces output in ZSTD_e_continue mode
 * (see ZSTDMT_compressStream_generic()).
 * @return : a hint for next input size, which is the room left in the internal input buffer
 *           (note : could be zero when input buffer is fully filled and no more availability to create new job),
 *           or an error code */
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
    CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );

    /* recommended next input size : fill current input buffer */
    {   size_t const roomLeft = zcs->inBuffSize - zcs->inBuff.filled;
        return roomLeft;
    }
}
2017-01-18 00:15:18 +00:00
2017-01-17 23:31:16 +00:00
/* ZSTDMT_flushStream_internal() :
 * Common implementation of flush and end operations.
 * Whatever input is buffered beyond the prefix is posted as a new job
 * when a job slot is available. A job is also posted, even without any input,
 * when `endFrame` is requested and the frame is not ended yet.
 * Then waits for (blocking) and flushes data from the next job into `output`.
 * @return : result of ZSTDMT_flushNextJob(), or an error code from job creation */
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
{
    size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
    unsigned const jobNeeded = (srcSize > 0) || (endFrame && !zcs->frameEnded);
    unsigned const slotAvailable = (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask);

    if (jobNeeded && slotAvailable) {
        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
    }

    /* check if there is any data available to flush */
    return ZSTDMT_flushNextJob(zcs, output, 1 /* blockToFlush */);
}
/* ZSTDMT_flushStream() :
 * Flushes buffered input, without ending the frame.
 * Delegates to ZSTD_flushStream() on the first context of the pool when nbThreads == 1.
 * @return : result of the underlying flush function */
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
    DEBUGLOG(5, " ZSTDMT_flushStream ");
    if (zcs->params.nbThreads == 1) {
        return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
    } else {
        return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
    }
}
/* ZSTDMT_endStream() :
 * Flushes buffered input and ends the current frame.
 * Delegates to ZSTD_endStream() on the first context of the pool when nbThreads == 1.
 * @return : result of the underlying end function */
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
    DEBUGLOG(4, " ZSTDMT_endStream ");
    if (zcs->params.nbThreads == 1) {
        return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
    } else {
        return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
    }
}