new streaming API (compression)

This commit is contained in:
Yann Collet 2016-08-12 01:20:36 +02:00
parent b3950eef20
commit 5a0c8e2439
2 changed files with 316 additions and 3 deletions

View File

@ -32,7 +32,7 @@
*/ */
/* ******************************************************* /*-*******************************************************
* Compiler specifics * Compiler specifics
*********************************************************/ *********************************************************/
#ifdef _MSC_VER /* Visual Studio */ #ifdef _MSC_VER /* Visual Studio */
@ -55,7 +55,7 @@
#include "mem.h" #include "mem.h"
#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ #define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
#include "xxhash.h" /* XXH_reset, update, digest */ #include "xxhash.h" /* XXH_reset, update, digest */
#define FSE_STATIC_LINKING_ONLY #define FSE_STATIC_LINKING_ONLY /* FSE_encodeSymbol */
#include "fse.h" #include "fse.h"
#define HUF_STATIC_LINKING_ONLY #define HUF_STATIC_LINKING_ONLY
#include "huf.h" #include "huf.h"
@ -2758,6 +2758,274 @@ ZSTDLIB_API size_t ZSTD_compress_usingCDict(ZSTD_CCtx* cctx,
/*-====== Streaming ======-*/
typedef enum { zcss_init, zcss_load, zcss_flush, zcss_final } ZSTD_cStreamStage;
struct ZSTD_CStream_s {
ZSTD_CCtx* zc;
char* inBuff;
size_t inBuffSize;
size_t inToCompress;
size_t inBuffPos;
size_t inBuffTarget;
size_t blockSize;
char* outBuff;
size_t outBuffSize;
size_t outBuffContentSize;
size_t outBuffFlushedSize;
ZSTD_cStreamStage stage;
U32 checksum;
U32 frameEnded;
ZSTD_customMem customMem;
}; /* typedef'd to ZSTD_CStream within "zstd.h" */
ZSTD_CStream* ZSTD_createCStream(void)
{
return ZSTD_createCStream_advanced(defaultCustomMem);
}
ZSTD_CStream* ZSTD_createCStream_advanced(ZSTD_customMem customMem)
{
ZSTD_CStream* zcs;
if (!customMem.customAlloc && !customMem.customFree)
customMem = defaultCustomMem;
if (!customMem.customAlloc || !customMem.customFree)
return NULL;
zcs = (ZSTD_CStream*)customMem.customAlloc(customMem.opaque, sizeof(ZSTD_CStream));
if (zcs==NULL) return NULL;
memset(zcs, 0, sizeof(ZSTD_CStream));
memcpy(&zcs->customMem, &customMem, sizeof(ZSTD_customMem));
zcs->zc = ZSTD_createCCtx_advanced(customMem);
if (zcs->zc == NULL) { ZSTD_freeCStream(zcs); return NULL; }
return zcs;
}
size_t ZSTD_freeCStream(ZSTD_CStream* zcs)
{
if (zcs==NULL) return 0; /* support free on NULL */
ZSTD_freeCCtx(zcs->zc);
if (zcs->inBuff) zcs->customMem.customFree(zcs->customMem.opaque, zcs->inBuff);
if (zcs->outBuff) zcs->customMem.customFree(zcs->customMem.opaque, zcs->outBuff);
zcs->customMem.customFree(zcs->customMem.opaque, zcs);
return 0;
}
/* Initialization */
size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
/* allocate buffers */
{ size_t const neededInBuffSize = (size_t)1 << params.cParams.windowLog;
if (zcs->inBuffSize < neededInBuffSize) {
zcs->inBuffSize = neededInBuffSize;
zcs->customMem.customFree(zcs->customMem.opaque, zcs->inBuff); /* should not be necessary */
zcs->inBuff = (char*)zcs->customMem.customAlloc(zcs->customMem.opaque, neededInBuffSize);
if (zcs->inBuff == NULL) return ERROR(memory_allocation);
}
zcs->blockSize = MIN(ZSTD_BLOCKSIZE_ABSOLUTEMAX, neededInBuffSize);
}
if (zcs->outBuffSize < ZSTD_compressBound(zcs->blockSize)+1) {
zcs->outBuffSize = ZSTD_compressBound(zcs->blockSize)+1;
zcs->customMem.customFree(zcs->customMem.opaque, zcs->outBuff); /* should not be necessary */
zcs->outBuff = (char*)zcs->customMem.customAlloc(zcs->customMem.opaque, zcs->outBuffSize);
if (zcs->outBuff == NULL) return ERROR(memory_allocation);
}
{ size_t const errorCode = ZSTD_compressBegin_advanced(zcs->zc, dict, dictSize, params, pledgedSrcSize);
if (ZSTD_isError(errorCode)) return errorCode; }
zcs->inToCompress = 0;
zcs->inBuffPos = 0;
zcs->inBuffTarget = zcs->blockSize;
zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
zcs->stage = zcss_load;
zcs->checksum = params.fParams.checksumFlag > 0;
zcs->frameEnded = 0;
return 0; /* ready to go */
}
size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel)
{
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, dictSize);
return ZSTD_initCStream_advanced(zcs, dict, dictSize, params, 0);
}
size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel)
{
return ZSTD_initCStream_usingDict(zcs, NULL, 0, compressionLevel);
}
/* Compression */
typedef enum { zsf_gather, zsf_flush, zsf_end } ZSTD_flush_e;
MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, const void* src, size_t srcSize)
{
size_t const length = MIN(dstCapacity, srcSize);
memcpy(dst, src, length);
return length;
}
static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
void* dst, size_t* dstCapacityPtr,
const void* src, size_t* srcSizePtr,
ZSTD_flush_e const flush)
{
U32 someMoreWork = 1;
const char* const istart = (const char*)src;
const char* const iend = istart + *srcSizePtr;
const char* ip = istart;
char* const ostart = (char*)dst;
char* const oend = ostart + *dstCapacityPtr;
char* op = ostart;
while (someMoreWork) {
switch(zcs->stage)
{
case zcss_init: return ERROR(init_missing); /* call ZBUFF_compressInit() first ! */
case zcss_load:
/* complete inBuffer */
{ size_t const toLoad = zcs->inBuffTarget - zcs->inBuffPos;
size_t const loaded = ZSTD_limitCopy(zcs->inBuff + zcs->inBuffPos, toLoad, ip, iend-ip);
zcs->inBuffPos += loaded;
ip += loaded;
if ( (zcs->inBuffPos==zcs->inToCompress) || (!flush && (toLoad != loaded)) ) {
someMoreWork = 0; break; /* not enough input to get a full block : stop there, wait for more */
} }
/* compress current block (note : this stage cannot be stopped in the middle) */
{ void* cDst;
size_t cSize;
size_t const iSize = zcs->inBuffPos - zcs->inToCompress;
size_t oSize = oend-op;
if (oSize >= ZSTD_compressBound(iSize))
cDst = op; /* compress directly into output buffer (avoid flush stage) */
else
cDst = zcs->outBuff, oSize = zcs->outBuffSize;
cSize = (flush == zsf_end) ?
ZSTD_compressEnd(zcs->zc, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize) :
ZSTD_compressContinue(zcs->zc, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize);
if (ZSTD_isError(cSize)) return cSize;
if (flush == zsf_end) zcs->frameEnded = 1;
/* prepare next block */
zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize;
if (zcs->inBuffTarget > zcs->inBuffSize)
zcs->inBuffPos = 0, zcs->inBuffTarget = zcs->blockSize; /* note : inBuffSize >= blockSize */
zcs->inToCompress = zcs->inBuffPos;
if (cDst == op) { op += cSize; break; } /* no need to flush */
zcs->outBuffContentSize = cSize;
zcs->outBuffFlushedSize = 0;
zcs->stage = zcss_flush; /* pass-through to flush stage */
}
case zcss_flush:
{ size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize;
size_t const flushed = ZSTD_limitCopy(op, oend-op, zcs->outBuff + zcs->outBuffFlushedSize, toFlush);
op += flushed;
zcs->outBuffFlushedSize += flushed;
if (toFlush!=flushed) { someMoreWork = 0; break; } /* dst too small to store flushed data : stop there */
zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
zcs->stage = zcss_load;
break;
}
case zcss_final:
someMoreWork = 0; /* do nothing */
break;
default:
return ERROR(GENERIC); /* impossible */
}
}
*srcSizePtr = ip - istart;
*dstCapacityPtr = op - ostart;
if (zcs->frameEnded) return 0;
{ size_t hintInSize = zcs->inBuffTarget - zcs->inBuffPos;
if (hintInSize==0) hintInSize = zcs->blockSize;
return hintInSize;
}
}
size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_wCursor* output, ZSTD_rCursor* input)
{
size_t sizeRead = input->size;
size_t sizeWritten = output->size;
size_t const result = ZSTD_compressStream_generic(zcs, output->ptr, &sizeWritten, input->ptr, &sizeRead, zsf_gather);
input->ptr = (const char*)(input->ptr) + sizeRead;
input->size -= sizeRead;
output->ptr = (char*)(output->ptr) + sizeWritten;
output->size -= sizeWritten;
output->nbBytesWritten += sizeWritten;
return result;
}
/* Finalize */
/*! ZSTD_flushStream() :
* @return : amount of data remaining to flush */
size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_wCursor* output)
{
size_t srcSize = 0;
size_t sizeWritten = output->size;
size_t const result = ZSTD_compressStream_generic(zcs, output->ptr, &sizeWritten, &srcSize, &srcSize, zsf_flush); /* use a valid src address instead of NULL */
output->ptr = (char*)(output->ptr) + sizeWritten;
output->size -= sizeWritten;
output->nbBytesWritten += sizeWritten;
if (ZSTD_isError(result)) return result;
return zcs->outBuffContentSize - zcs->outBuffFlushedSize; /* remaining to flush */
}
size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_wCursor* output)
{
BYTE* const ostart = (BYTE*)(output->ptr);
BYTE* const oend = ostart + output->size;
BYTE* op = ostart;
if (zcs->stage != zcss_final) {
/* flush whatever remains */
size_t srcSize = 0;
size_t sizeWritten = output->size;
size_t const notEnded = ZSTD_compressStream_generic(zcs, ostart, &sizeWritten, &srcSize, &srcSize, zsf_end); /* use a valid src address instead of NULL */
size_t const remainingToFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize;
op += sizeWritten;
if (remainingToFlush) {
output->ptr = op;
output->size -= sizeWritten;
output->nbBytesWritten += sizeWritten;
return remainingToFlush + ZSTD_BLOCKHEADERSIZE /* final empty block */ + (zcs->checksum * 4);
}
/* create epilogue */
zcs->stage = zcss_final;
zcs->outBuffContentSize = !notEnded ? 0 :
ZSTD_compressEnd(zcs->zc, zcs->outBuff, zcs->outBuffSize, NULL, 0); /* write epilogue, including final empty block, into outBuff */
}
/* flush epilogue */
{ size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize;
size_t const flushed = ZSTD_limitCopy(op, oend-op, zcs->outBuff + zcs->outBuffFlushedSize, toFlush);
op += flushed;
zcs->outBuffFlushedSize += flushed;
output->ptr = op;
output->size = oend-op;
output->nbBytesWritten += op-ostart;
if (toFlush==flushed) zcs->stage = zcss_init; /* end reached */
return toFlush - flushed;
}
}
/*-===== Pre-defined compression levels =====-*/ /*-===== Pre-defined compression levels =====-*/
#define ZSTD_DEFAULT_CLEVEL 1 #define ZSTD_DEFAULT_CLEVEL 1

View File

@ -323,7 +323,52 @@ ZSTDLIB_API size_t ZSTD_sizeofDCtx(const ZSTD_DCtx* dctx);
/* ****************************************************************** /* ******************************************************************
* Buffer-less streaming functions (synchronous mode) * Streaming
********************************************************************/
typedef struct ZSTD_readCursor_s {
const void* ptr; /* position of cursor - update to new position */
size_t size; /* remaining buffer size to read - update preserves end of buffer */
} ZSTD_rCursor;
typedef struct ZSTD_writeCursor_s {
void* ptr; /* position of cursor - update to new position */
size_t size; /* remaining buffer size to write - update preserves end of buffer */
size_t nbBytesWritten; /* already written bytes - update adds bytes newly written (accumulator) */
} ZSTD_wCursor;
/*====== compression ======*/
typedef struct ZSTD_CStream_s ZSTD_CStream;
ZSTD_CStream* ZSTD_createCStream(void);
size_t ZSTD_freeCStream(ZSTD_CStream* zcs);
size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel);
size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_wCursor* output, ZSTD_rCursor* input);
size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_wCursor* output);
size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_wCursor* output);
/* advanced */
ZSTD_CStream* ZSTD_createCStream_advanced(ZSTD_customMem customMem);
size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel);
size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize);
/*====== decompression ======*/
typedef struct ZSTD_DStream_s ZSTD_DStream;
ZSTD_DStream* ZSTD_createDStream(void);
size_t ZSTD_freeDStream(ZSTD_DStream* zds);
size_t ZSTD_initDStream(ZSTD_DStream* zds);
size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_wCursor* output, ZSTD_rCursor* input);
/* ******************************************************************
* Buffer-less and synchronous inner streaming functions
********************************************************************/ ********************************************************************/
/* This is an advanced API, giving full control over buffer management, for users which need direct control over memory. /* This is an advanced API, giving full control over buffer management, for users which need direct control over memory.
* But it's also a complex one, with a lot of restrictions (documented below). * But it's also a complex one, with a lot of restrictions (documented below).