Python: Support streamed compression with the Compressor object (#448)

This adds `flush` and `finish` methods to the `Compressor`
object in the extension module, renames the `compress` method to
`process`, and updates that method to only process data.  Now,
one or more `process` calls followed by a `finish` call will be
equivalent to a module-level `compress` call.

Note: To maximize the compression efficiency (and match
underlying Brotli behavior), the `Compressor` object `process`
method does not guarantee all input is immediately written to
output. To ensure immediate output, call `flush` to manually
flush the compression buffer.  Extraneous flushing can increase
the size, but may be required when processing streaming data.

Progress on #191
This commit is contained in:
Alex Nicksay 2016-10-24 07:28:56 -04:00 committed by Eugene Kliuchnikov
parent 678f8627d3
commit 5632315d35
2 changed files with 136 additions and 34 deletions

View File

@ -2,7 +2,6 @@
#include <Python.h> #include <Python.h>
#include <bytesobject.h> #include <bytesobject.h>
#include <structmember.h> #include <structmember.h>
#include <cstdio>
#include <vector> #include <vector>
#include "../common/version.h" #include "../common/version.h"
#include <brotli/decode.h> #include <brotli/decode.h>
@ -88,6 +87,38 @@ static int lgblock_convertor(PyObject *o, int *lgblock) {
return 1; return 1;
} }
// Drives the encoder `enc` with operation `op` over `input_length` bytes at
// `input`, appending every chunk of compressed data the encoder hands back to
// `*output`.
//
// No caller-owned output buffer is passed to the encoder (zero capacity, NULL
// pointer); produced bytes are fetched with BrotliEncoderTakeOutput instead.
//
// Returns the result of the last BrotliEncoderCompressStream call: a false
// value as soon as a call fails, otherwise a true value once all input has
// been consumed and the encoder reports no further output.
static BROTLI_BOOL compress_stream(BrotliEncoderState* enc, BrotliEncoderOperation op,
                                   std::vector<uint8_t>* output, uint8_t* input, size_t input_length) {
  size_t remaining_in = input_length;
  const uint8_t* in_cursor = input;
  size_t out_capacity = 0;
  uint8_t* out_cursor = NULL;
  BROTLI_BOOL status = BROTLI_TRUE;

  do {
    status = BrotliEncoderCompressStream(enc, op,
                                         &remaining_in, &in_cursor,
                                         &out_capacity, &out_cursor, NULL);
    if (!status) {
      return status;
    }

    // A requested size of zero asks for all output currently available.
    size_t chunk_size = 0;
    const uint8_t* chunk = BrotliEncoderTakeOutput(enc, &chunk_size);
    if (chunk_size != 0) {
      output->insert(output->end(), chunk, chunk + chunk_size);
    }
    // Keep going while input is left over or the encoder still holds output.
  } while (remaining_in != 0 || BrotliEncoderHasMoreOutput(enc));

  return status;
}
PyDoc_STRVAR(brotli_Compressor_doc, PyDoc_STRVAR(brotli_Compressor_doc,
"An object to compress a byte string.\n" "An object to compress a byte string.\n"
"\n" "\n"
@ -177,63 +208,132 @@ static int brotli_Compressor_init(brotli_Compressor *self, PyObject *args, PyObj
return 0; return 0;
} }
PyDoc_STRVAR(brotli_Compressor_compress_doc, PyDoc_STRVAR(brotli_Compressor_process_doc,
"Compress a byte string.\n" "Process \"string\" for compression, returning a string that contains \n"
"compressed output data. This data should be concatenated to the output \n"
"produced by any preceding calls to the \"process()\" or flush()\" methods. \n"
"Some or all of the input may be kept in internal buffers for later \n"
"processing, and the compressed output data may be empty until enough input \n"
"has been accumulated.\n"
"\n" "\n"
"Signature:\n" "Signature:\n"
" compress(string)\n" " compress(string)\n"
"\n" "\n"
"Args:\n" "Args:\n"
" string (bytes): The input data.\n" " string (bytes): The input data\n"
"\n" "\n"
"Returns:\n" "Returns:\n"
" The compressed byte string.\n" " The compressed output data (bytes)\n"
"\n" "\n"
"Raises:\n" "Raises:\n"
" brotli.error: If compression fails.\n"); " brotli.error: If compression fails\n");
static PyObject* brotli_Compressor_compress(brotli_Compressor *self, PyObject *args) { static PyObject* brotli_Compressor_process(brotli_Compressor *self, PyObject *args) {
PyObject* ret = NULL; PyObject* ret = NULL;
std::vector<uint8_t> output;
uint8_t* input; uint8_t* input;
uint8_t* output = NULL;
uint8_t* next_out;
const uint8_t *next_in;
size_t input_length; size_t input_length;
size_t output_length; BROTLI_BOOL ok = BROTLI_TRUE;
size_t available_in;
size_t available_out;
int ok;
ok = PyArg_ParseTuple(args, "s#:compress", &input, &input_length); ok = (BROTLI_BOOL)PyArg_ParseTuple(args, "s#:process", &input, &input_length);
if (!ok) if (!ok)
return NULL; return NULL;
output_length = input_length + (input_length >> 2) + 10240;
if (!self->enc) { if (!self->enc) {
ok = false; ok = BROTLI_FALSE;
goto end; goto end;
} }
output = new uint8_t[output_length]; ok = compress_stream(self->enc, BROTLI_OPERATION_PROCESS,
available_out = output_length; &output, input, input_length);
next_out = output;
available_in = input_length;
next_in = input;
BrotliEncoderCompressStream(self->enc, BROTLI_OPERATION_FINISH,
&available_in, &next_in,
&available_out, &next_out, 0);
ok = BrotliEncoderIsFinished(self->enc);
end: end:
if (ok) { if (ok) {
ret = PyBytes_FromStringAndSize((char*)output, output_length - available_out); ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size());
} else { } else {
PyErr_SetString(BrotliError, "BrotliCompressBuffer failed"); PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while processing the stream");
} }
delete[] output; return ret;
}
// Docstring for the Compressor "flush" method; it is attached to the method
// through the brotli_Compressor_methods table. The text below is user-visible
// at runtime, so edit it only to change the documented contract.
PyDoc_STRVAR(brotli_Compressor_flush_doc,
"Process all pending input, returning a string containing the remaining\n"
"compressed data. This data should be concatenated to the output produced by\n"
"any preceding calls to the \"process()\" or \"flush()\" methods.\n"
"\n"
"Signature:\n"
" flush()\n"
"\n"
"Returns:\n"
" The compressed output data (bytes)\n"
"\n"
"Raises:\n"
" brotli.error: If compression fails\n");
// Compressor.flush(): runs a FLUSH operation on the object's encoder with no
// new input and returns the compressed bytes collected by compress_stream.
//
// Returns a new bytes object (possibly empty) on success. If the object has
// no encoder state or compress_stream reports failure, sets BrotliError and
// returns NULL.
static PyObject* brotli_Compressor_flush(brotli_Compressor *self) {
  std::vector<uint8_t> compressed;

  const bool succeeded =
      self->enc &&
      compress_stream(self->enc, BROTLI_OPERATION_FLUSH, &compressed, NULL, 0);
  if (!succeeded) {
    PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while flushing the stream");
    return NULL;
  }

  // Only take the address of the first element when there is one.
  const char* data =
      compressed.empty() ? NULL : reinterpret_cast<const char*>(&compressed[0]);
  return PyBytes_FromStringAndSize(data, compressed.size());
}
// Docstring for the Compressor "finish" method. The method is registered as
// METH_NOARGS and its implementation takes no input, so the documented
// signature is "finish()" with no parameters.
PyDoc_STRVAR(brotli_Compressor_finish_doc,
"Process all pending input and complete all compression, returning a string\n"
"containing the remaining compressed data. This data should be concatenated\n"
"to the output produced by any preceding calls to the \"process()\" or\n"
"\"flush()\" methods.\n"
"After calling \"finish()\", the \"process()\" and \"flush()\" methods\n"
"cannot be called again, and a new \"Compressor\" object should be created.\n"
"\n"
"Signature:\n"
" finish()\n"
"\n"
"Returns:\n"
" The compressed output data (bytes)\n"
"\n"
"Raises:\n"
" brotli.error: If compression fails\n");
// Compressor.finish(): runs a FINISH operation on the object's encoder with
// no new input and returns the compressed bytes collected by compress_stream.
//
// Returns a new bytes object on success. Sets BrotliError and returns NULL
// when the object has no encoder state, when compress_stream reports failure,
// or when the encoder does not report itself finished afterwards.
static PyObject* brotli_Compressor_finish(brotli_Compressor *self) {
PyObject *ret = NULL;
std::vector<uint8_t> output;
BROTLI_BOOL ok = BROTLI_TRUE;
// Without an encoder state there is nothing to finish; take the error path.
if (!self->enc) {
ok = BROTLI_FALSE;
goto end;
}
// NULL input with length 0: only the FINISH operation itself is requested.
ok = compress_stream(self->enc, BROTLI_OPERATION_FINISH,
&output, NULL, 0);
// A successful stream call must also leave the encoder in its finished state.
if (ok) {
ok = BrotliEncoderIsFinished(self->enc);
}
end:
if (ok) {
// &output[0] is only evaluated for a non-empty vector; otherwise NULL is passed with size 0.
ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size());
} else {
PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while finishing the stream");
}
return ret; return ret;
} }
@ -243,7 +343,9 @@ static PyMemberDef brotli_Compressor_members[] = {
}; };
static PyMethodDef brotli_Compressor_methods[] = { static PyMethodDef brotli_Compressor_methods[] = {
{"compress", (PyCFunction)brotli_Compressor_compress, METH_VARARGS, brotli_Compressor_compress_doc}, {"process", (PyCFunction)brotli_Compressor_process, METH_VARARGS, brotli_Compressor_process_doc},
{"flush", (PyCFunction)brotli_Compressor_flush, METH_NOARGS, brotli_Compressor_flush_doc},
{"finish", (PyCFunction)brotli_Compressor_finish, METH_NOARGS, brotli_Compressor_finish_doc},
{NULL} /* Sentinel */ {NULL} /* Sentinel */
}; };

View File

@ -34,7 +34,7 @@ def compress(string, mode=MODE_GENERIC, quality=11, lgwin=22, lgblock=0,
Range is 16 to 24. If set to 0, the value will be set based on the Range is 16 to 24. If set to 0, the value will be set based on the
quality. Defaults to 0. quality. Defaults to 0.
dictionary (bytes, optional): Custom dictionary. Only last sliding window dictionary (bytes, optional): Custom dictionary. Only last sliding window
size bytes will be used. size bytes will be used.
Returns: Returns:
The compressed byte string. The compressed byte string.
@ -44,7 +44,7 @@ def compress(string, mode=MODE_GENERIC, quality=11, lgwin=22, lgblock=0,
""" """
compressor = _brotli.Compressor(mode=mode, quality=quality, lgwin=lgwin, compressor = _brotli.Compressor(mode=mode, quality=quality, lgwin=lgwin,
lgblock=lgblock, dictionary=dictionary) lgblock=lgblock, dictionary=dictionary)
return compressor.compress(string) return compressor.process(string) + compressor.finish()
# Decompress a compressed byte string. # Decompress a compressed byte string.
decompress = _brotli.decompress decompress = _brotli.decompress