Implementation of parallel buffer compression based on std::thread

update from pr
This commit is contained in:
Erik Zenker 2015-11-09 14:50:41 +01:00
parent c90ec29f54
commit 229ac369a6
2 changed files with 98 additions and 29 deletions

View File

@ -39,6 +39,7 @@ struct BrotliParams {
quality(11),
lgwin(22),
lgblock(0),
n_threads(1),
enable_dictionary(true),
enable_transforms(false),
greedy_block_split(false),
@ -63,6 +64,8 @@ struct BrotliParams {
// Base 2 logarithm of the maximum input block size. Range is 16 to 24.
// If set to 0, the value will be set based on the quality.
int lgblock;
// Number of threads used for parallel compression.
int n_threads;
// These settings are deprecated and will be ignored.
// All speed vs. size compromises are controlled by the quality param.

View File

@ -18,6 +18,9 @@
#include <algorithm>
#include <limits>
#include <iostream>
#include <thread>
#include <utility>
#include "./backward_references.h"
#include "./bit_cost.h"
@ -216,6 +219,28 @@ bool WriteMetaBlockParallel(const BrotliParams& params,
} // namespace
// Maps worker `tid` to the half-open range [first, second) of consecutive
// element indices it should process when `nElements` elements are split
// across `nThreads` workers.
//
// Every worker gets ceil(nElements / nThreads) elements, except that the
// last non-empty range is clipped to `nElements`. Workers that have nothing
// to do receive an empty range (first == second).
//
//   tid       - zero-based index of the worker.
//   nThreads  - total number of workers; 0 yields an empty range.
//   nElements - total number of elements to distribute.
//
// Returns the pair (first index, one-past-last index).
std::pair<size_t, size_t> consecutiveMapping(const size_t tid, const size_t nThreads, const size_t nElements){
  const std::pair<size_t, size_t> kEmptyRange(0, 0);

  // No workers at all, or this worker's index is beyond the element count
  // (which also covers nElements == 0 without any unsigned wrap-around).
  if (nThreads == 0 || tid >= nElements) {
    return kEmptyRange;
  }

  // Integer ceiling division. Going through double would lose precision for
  // counts above 2^53 and silently drop trailing elements; the div/mod form
  // also avoids the overflow of (nElements + nThreads - 1).
  const size_t elementsPerThread =
      nElements / nThreads + (nElements % nThreads != 0 ? 1 : 0);

  const size_t min_i = tid * elementsPerThread;
  if (min_i > nElements) {
    return kEmptyRange;
  }

  // min_i <= nElements holds here, so the subtraction cannot wrap and the
  // upper bound is clipped without computing min_i + elementsPerThread.
  const size_t max_i = min_i + std::min(elementsPerThread, nElements - min_i);
  return std::make_pair(min_i, max_i);
}
int BrotliCompressBufferParallel(BrotliParams params,
size_t input_size,
const uint8_t* input_buffer,
@ -236,6 +261,7 @@ int BrotliCompressBufferParallel(BrotliParams params,
} else if (params.lgwin > kMaxWindowBits) {
params.lgwin = kMaxWindowBits;
}
if (params.lgblock == 0) {
params.lgblock = 16;
if (params.quality >= 9 && params.lgwin > params.lgblock) {
@ -246,43 +272,83 @@ int BrotliCompressBufferParallel(BrotliParams params,
} else if (params.lgblock > kMaxInputBlockBits) {
params.lgblock = kMaxInputBlockBits;
}
// Calc number of blocks which will be distributed to threads
size_t max_input_block_size = 1 << params.lgblock;
size_t max_num_blocks = std::max(size_t(1), (input_size / max_input_block_size));
std::vector<std::thread> threads;
// Output of compression
std::vector<size_t> out_sizes(max_num_blocks);
std::vector<std::vector<uint8_t>> outs(max_num_blocks);
std::vector<std::vector<uint8_t> > compressed_pieces;
// Debug output
// std::cout << "max_num_blocks: " << max_num_blocks << std::endl;
// std::cout << "input_size: " << input_size << std::endl;
// std::cout << "max_input_block_size: " << max_input_block_size << std::endl;
// Compress block-by-block independently.
for (size_t pos = 0; pos < input_size; ) {
size_t input_block_size = std::min(max_input_block_size, input_size - pos);
size_t out_size = input_block_size + (input_block_size >> 3) + 1024;
std::vector<uint8_t> out(out_size);
if (!WriteMetaBlockParallel(params,
input_block_size,
&input_buffer[pos],
pos,
input_buffer,
pos == 0,
pos + input_block_size == input_size,
&out_size,
&out[0])) {
return false;
}
out.resize(out_size);
compressed_pieces.push_back(out);
pos += input_block_size;
for(size_t thread_i = 0; thread_i < params.n_threads; thread_i++){
std::thread t([&, thread_i](){
size_t min_block_i = 0;
size_t max_block_i = 0;
std::tie(min_block_i, max_block_i) = consecutiveMapping(thread_i, params.n_threads, max_num_blocks);
for(size_t block_i = min_block_i; block_i < max_block_i; ++block_i){
//std::cout << "[" << thread_i << "/"<< params.n_threads << "/" << block_i <<"] Compress block " << std::endl;
size_t input_block_size = std::min(max_input_block_size, input_size - ((block_i+1) * max_input_block_size));
size_t pos = block_i * max_input_block_size;
out_sizes[block_i] = input_block_size + (input_block_size >> 3) + 1024;
outs[block_i] = std::vector<uint8_t>(out_sizes[block_i]);
WriteMetaBlockParallel(params,
input_block_size,
&input_buffer[pos],
pos,
input_buffer,
pos == 0, // is_first
pos + input_block_size == input_size, // is_last
&out_sizes[block_i],
outs[block_i].data());
outs[block_i].resize(out_sizes[block_i]);
}
});
threads.push_back(std::move(t));
}
// Wait until all threads have finished
for(auto &t : threads){
t.join();
}
// Piece together the output.
size_t out_pos = 0;
for (size_t i = 0; i < compressed_pieces.size(); ++i) {
const std::vector<uint8_t>& out = compressed_pieces[i];
if (out_pos + out.size() > *encoded_size) {
return false;
}
memcpy(&encoded_buffer[out_pos], &out[0], out.size());
out_pos += out.size();
}
*encoded_size = out_pos;
for (int i = 0; i < outs.size(); ++i) {
if (out_pos + outs[i].size() > *encoded_size) {
// Encoded buffer is bigger than provided output buffer
std::cout << "Encoded buffer is bigger than provided output buffer" << std::endl;
return false;
}
memcpy(&encoded_buffer[out_pos], outs[i].data(), outs[i].size());
out_pos += outs[i].size();
}
*encoded_size = out_pos;
return true;
}