From 48294b57c359bc0cfc6560df89f34507658bdc3d Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Wed, 12 Oct 2016 15:18:16 -0700 Subject: [PATCH] [pzstd] Put ErrorHolder into SharedState --- contrib/pzstd/Pzstd.cpp | 74 +++++++++++++++++++++-------------------- contrib/pzstd/Pzstd.h | 18 ++++++---- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index db9b8c85..70c0515b 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -59,7 +59,7 @@ static std::uint64_t handleOneInput(const Options &options, FILE* inputFd, const std::string &outputFile, FILE* outputFd, - ErrorHolder &errorHolder) { + SharedState& state) { auto inputSize = fileSizeOrZero(inputFile); // WorkQueue outlives ThreadPool so in the case of error we are certain // we don't accidently try to call push() on it after it is destroyed @@ -74,10 +74,9 @@ static std::uint64_t handleOneInput(const Options &options, if (!options.decompress) { // Add a job that reads the input and starts all the compression jobs readExecutor.add( - [&errorHolder, &outs, &executor, inputFd, inputSize, &options, - &bytesRead] { + [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] { bytesRead = asyncCompressChunks( - errorHolder, + state, outs, executor, inputFd, @@ -86,19 +85,19 @@ static std::uint64_t handleOneInput(const Options &options, options.determineParameters()); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + bytesWritten = writeFile(state, outs, outputFd, options.decompress, options.verbosity); } else { // Add a job that reads the input and starts all the decompression jobs - readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { - bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); + readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] { + bytesRead = asyncDecompressFrames(state, outs, executor, inputFd); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + bytesWritten = writeFile(state, outs, outputFd, options.decompress, options.verbosity); } } - if (options.verbosity > 1 && !errorHolder.hasError()) { + if (options.verbosity > 1 && !state.errorHolder.hasError()) { std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; std::string outputFileName = outputFile == "-" ? "stdout" : outputFile; if (!options.decompress) { @@ -176,53 +175,53 @@ int pzstdMain(const Options &options) { int returnCode = 0; for (const auto& input : options.inputFiles) { // Setup the error holder - ErrorHolder errorHolder; + SharedState state; auto printErrorGuard = makeScopeGuard([&] { - if (errorHolder.hasError()) { + if (state.errorHolder.hasError()) { returnCode = 1; if (options.verbosity > 0) { std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(), - errorHolder.getError().c_str()); + state.errorHolder.getError().c_str()); } } }); // Open the input file - auto inputFd = openInputFile(input, errorHolder); + auto inputFd = openInputFile(input, state.errorHolder); if (inputFd == nullptr) { continue; } auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); }); // Open the output file auto outputFile = options.getOutputFile(input); - if (!errorHolder.check(outputFile != "", + if (!state.errorHolder.check(outputFile != "", "Input file does not have extension .zst")) { continue; } - auto outputFd = openOutputFile(options, outputFile, errorHolder); + auto outputFd = openOutputFile(options, outputFile, state.errorHolder); if (outputFd == nullptr) { continue; } auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); // (de)compress the file - handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder); - if (errorHolder.hasError()) { + handleOneInput(options, input, inputFd, outputFile, outputFd, state); + if (state.errorHolder.hasError()) { continue; } // Delete the input file if necessary if (!options.keepSource) { // Be sure that we are done and have written everything before we delete - if (!errorHolder.check(std::fclose(inputFd) == 0, + if (!state.errorHolder.check(std::fclose(inputFd) == 0, "Failed to close input file")) { continue; } closeInputGuard.dismiss(); - if (!errorHolder.check(std::fclose(outputFd) == 0, + if (!state.errorHolder.check(std::fclose(outputFd) == 0, "Failed to close output file")) { continue; } closeOutputGuard.dismiss(); if (std::remove(input.c_str()) != 0) { - errorHolder.setError("Failed to remove input file"); + state.errorHolder.setError("Failed to remove input file"); continue; } } @@ -268,18 +267,19 @@ Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) { /** * Stream chunks of input from `in`, compress it, and stream it out to `out`. * - * @param errorHolder Used to report errors and check if an error occured + * @param state The shared state * @param in Queue that we `pop()` input buffers from * @param out Queue that we `push()` compressed output buffers to * @param maxInputSize An upper bound on the size of the input * @param parameters The zstd parameters to use for compression */ static void compress( - ErrorHolder& errorHolder, + SharedState& state, std::shared_ptr in, std::shared_ptr out, size_t maxInputSize, ZSTD_parameters parameters) { + auto& errorHolder = state.errorHolder; auto guard = makeScopeGuard([&] { out->finish(); }); // Initialize the CCtx std::unique_ptr ctx( @@ -395,7 +395,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd, } std::uint64_t asyncCompressChunks( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& chunks, ThreadPool& executor, FILE* fd, @@ -409,23 +409,23 @@ std::uint64_t asyncCompressChunks( // independently. size_t step = calculateStep(size, numThreads, params); auto status = FileStatus::Continue; - while (status == FileStatus::Continue && !errorHolder.hasError()) { + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { // Make a new input queue that we will put the chunk's input data into. auto in = std::make_shared(); auto inGuard = makeScopeGuard([&] { in->finish(); }); // Make a new output queue that compress will put the compressed data into. auto out = std::make_shared(); // Start compression in the thread pool - executor.add([&errorHolder, in, out, step, params] { + executor.add([&state, in, out, step, params] { return compress( - errorHolder, std::move(in), std::move(out), step, params); + state, std::move(in), std::move(out), step, params); }); // Pass the output queue to the writer thread. chunks.push(std::move(out)); // Fill the input queue for the compression job we just started status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); } - errorHolder.check(status != FileStatus::Error, "Error reading input"); + state.errorHolder.check(status != FileStatus::Error, "Error reading input"); return bytesRead; } @@ -433,15 +433,16 @@ std::uint64_t asyncCompressChunks( * Decompress a frame, whose data is streamed into `in`, and stream the output * to `out`. * - * @param errorHolder Used to report errors and check if an error occured + * @param state The shared state * @param in Queue that we `pop()` input buffers from. It contains * exactly one compressed frame. * @param out Queue that we `push()` decompressed output buffers to */ static void decompress( - ErrorHolder& errorHolder, + SharedState& state, std::shared_ptr in, std::shared_ptr out) { + auto& errorHolder = state.errorHolder; auto guard = makeScopeGuard([&] { out->finish(); }); // Initialize the DCtx std::unique_ptr ctx( @@ -508,7 +509,7 @@ static void decompress( } std::uint64_t asyncDecompressFrames( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& frames, ThreadPool& executor, FILE* fd) { @@ -521,7 +522,7 @@ std::uint64_t asyncDecompressFrames( // Otherwise, we will decompress using only one decompression task. const size_t chunkSize = ZSTD_DStreamInSize(); auto status = FileStatus::Continue; - while (status == FileStatus::Continue && !errorHolder.hasError()) { + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { // Make a new input queue that we will put the frames's bytes into. auto in = std::make_shared(); auto inGuard = makeScopeGuard([&] { in->finish(); }); @@ -550,15 +551,15 @@ std::uint64_t asyncDecompressFrames( out->setMaxSize(64); } // Start decompression in the thread pool - executor.add([&errorHolder, in, out] { - return decompress(errorHolder, std::move(in), std::move(out)); + executor.add([&state, in, out] { + return decompress(state, std::move(in), std::move(out)); }); // Pass the output queue to the writer thread frames.push(std::move(out)); if (frameSize == 0) { // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // Pass the rest of the source to this decompression task - while (status == FileStatus::Continue && !errorHolder.hasError()) { + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); } break; @@ -566,7 +567,7 @@ std::uint64_t asyncDecompressFrames( // Fill the input queue for the decompression job we just started status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); } - errorHolder.check(status != FileStatus::Error, "Error reading input"); + state.errorHolder.check(status != FileStatus::Error, "Error reading input"); return totalBytesRead; } @@ -598,11 +599,12 @@ void updateWritten(int verbosity, std::uint64_t bytesWritten) { } std::uint64_t writeFile( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& outs, FILE* outputFd, bool decompress, int verbosity) { + auto& errorHolder = state.errorHolder; auto lineClearGuard = makeScopeGuard([verbosity] { if (verbosity > 1) { std::fprintf(stderr, "\r%79s\r", ""); diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index fe44ccfd..469c20cd 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -12,6 +12,7 @@ #include "Options.h" #include "utils/Buffer.h" #include "utils/Range.h" +#include "utils/ResourcePool.h" #include "utils/ThreadPool.h" #include "utils/WorkQueue.h" #define ZSTD_STATIC_LINKING_ONLY @@ -32,12 +33,17 @@ namespace pzstd { */ int pzstdMain(const Options& options); +class SharedState { + public: + ErrorHolder errorHolder; +}; + /** * Streams input from `fd`, breaks input up into chunks, and compresses each * chunk independently. Output of each chunk gets streamed to a queue, and * the output queues get put into `chunks` in order. * - * @param errorHolder Used to report errors and coordinate early shutdown + * @param state The shared state * @param chunks Each compression jobs output queue gets `pushed()` here * as soon as it is available * @param executor The thread pool to run compression jobs in @@ -48,7 +54,7 @@ int pzstdMain(const Options& options); * @returns The number of bytes read from the file */ std::uint64_t asyncCompressChunks( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& chunks, ThreadPool& executor, FILE* fd, @@ -62,7 +68,7 @@ std::uint64_t asyncCompressChunks( * decompression job. Output of each frame gets streamed to a queue, and * the output queues get put into `frames` in order. * - * @param errorHolder Used to report errors and coordinate early shutdown + * @param state The shared state * @param frames Each decompression jobs output queue gets `pushed()` here * as soon as it is available * @param executor The thread pool to run compression jobs in @@ -70,7 +76,7 @@ std::uint64_t asyncCompressChunks( * @returns The number of bytes read from the file */ std::uint64_t asyncDecompressFrames( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& frames, ThreadPool& executor, FILE* fd); @@ -79,7 +85,7 @@ std::uint64_t asyncDecompressFrames( * Streams input in from each queue in `outs` in order, and writes the data to * `outputFd`. * - * @param errorHolder Used to report errors and coordinate early exit + * @param state The shared state * @param outs A queue of output queues, one for each * (de)compression job. * @param outputFd The file descriptor to write to @@ -88,7 +94,7 @@ std::uint64_t asyncDecompressFrames( * @returns The number of bytes written */ std::uint64_t writeFile( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& outs, FILE* outputFd, bool decompress,