/** * Copyright (c) 2016-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. */ #pragma once #include "ErrorHolder.h" #include "Logging.h" #include "Options.h" #include "utils/Buffer.h" #include "utils/Range.h" #include "utils/ResourcePool.h" #include "utils/ThreadPool.h" #include "utils/WorkQueue.h" #define ZSTD_STATIC_LINKING_ONLY #include "zstd.h" #undef ZSTD_STATIC_LINKING_ONLY #include #include #include namespace pzstd { /** * Runs pzstd with `options` and returns the number of bytes written. * An error occurred if `errorHandler.hasError()`. * * @param options The pzstd options to use for (de)compression * @returns 0 upon success and non-zero on failure. */ int pzstdMain(const Options& options); class SharedState { public: SharedState(const Options& options) : log(options.verbosity) { if (!options.decompress) { auto parameters = options.determineParameters(); cStreamPool.reset(new ResourcePool{ [this, parameters]() -> ZSTD_CStream* { this->log(VERBOSE, "Creating new ZSTD_CStream\n"); auto zcs = ZSTD_createCStream(); if (zcs) { auto err = ZSTD_initCStream_advanced( zcs, nullptr, 0, parameters, 0); if (ZSTD_isError(err)) { ZSTD_freeCStream(zcs); return nullptr; } } return zcs; }, [](ZSTD_CStream *zcs) { ZSTD_freeCStream(zcs); }}); } else { dStreamPool.reset(new ResourcePool{ [this]() -> ZSTD_DStream* { this->log(VERBOSE, "Creating new ZSTD_DStream\n"); auto zds = ZSTD_createDStream(); if (zds) { auto err = ZSTD_initDStream(zds); if (ZSTD_isError(err)) { ZSTD_freeDStream(zds); return nullptr; } } return zds; }, [](ZSTD_DStream *zds) { ZSTD_freeDStream(zds); }}); } } ~SharedState() { // The resource pools have references to this, so destroy them first. cStreamPool.reset(); dStreamPool.reset(); } Logger log; ErrorHolder errorHolder; std::unique_ptr> cStreamPool; std::unique_ptr> dStreamPool; }; /** * Streams input from `fd`, breaks input up into chunks, and compresses each * chunk independently. Output of each chunk gets streamed to a queue, and * the output queues get put into `chunks` in order. * * @param state The shared state * @param chunks Each compression jobs output queue gets `pushed()` here * as soon as it is available * @param executor The thread pool to run compression jobs in * @param fd The input file descriptor * @param size The size of the input file if known, 0 otherwise * @param numThreads The number of threads in the thread pool * @param parameters The zstd parameters to use for compression * @returns The number of bytes read from the file */ std::uint64_t asyncCompressChunks( SharedState& state, WorkQueue>& chunks, ThreadPool& executor, FILE* fd, std::uintmax_t size, std::size_t numThreads, ZSTD_parameters parameters); /** * Streams input from `fd`. If pzstd headers are available it breaks the input * up into independent frames. It sends each frame to an independent * decompression job. Output of each frame gets streamed to a queue, and * the output queues get put into `frames` in order. * * @param state The shared state * @param frames Each decompression jobs output queue gets `pushed()` here * as soon as it is available * @param executor The thread pool to run compression jobs in * @param fd The input file descriptor * @returns The number of bytes read from the file */ std::uint64_t asyncDecompressFrames( SharedState& state, WorkQueue>& frames, ThreadPool& executor, FILE* fd); /** * Streams input in from each queue in `outs` in order, and writes the data to * `outputFd`. * * @param state The shared state * @param outs A queue of output queues, one for each * (de)compression job. * @param outputFd The file descriptor to write to * @param decompress Are we decompressing? * @returns The number of bytes written */ std::uint64_t writeFile( SharedState& state, WorkQueue>& outs, FILE* outputFd, bool decompress); }