Merge pull request #434 from terrelln/dev

Pzstd Improvements
This commit is contained in:
Yann Collet 2016-10-28 20:27:59 -07:00 committed by GitHub
commit 03982b5ddb
16 changed files with 620 additions and 293 deletions

1
.gitignore vendored
View File

@ -32,3 +32,4 @@ _zstdbench/
*.swp
.DS_Store
googletest/
*.d

View File

@ -22,7 +22,7 @@ matrix:
packages:
- gcc-4.8
- g++-4.8
env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd pzstd && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean"
env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest && make -C contrib/pzstd all && make -C contrib/pzstd check && make -C contrib/pzstd clean"
- os: linux
sudo: false
env: PLATFORM="Ubuntu 12.04 container" CMD="make usan"
@ -55,6 +55,20 @@ matrix:
packages:
- libc6-dev-i386
- gcc-multilib
- os: linux
sudo: required
install:
- export CXX="g++-6" CC="gcc-6"
- export LDFLAGS="-fuse-ld=gold"
- export TESTFLAGS='--gtest_filter=-*ExtremelyLarge*'
env: PLATFORM="Ubuntu 12.04" CMD='cd contrib/pzstd && make googletest && make tsan && make check && make clean && make asan && make check && make clean && cd ../..'
addons:
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- gcc-6
- g++-6
# Ubuntu 14.04 LTS Server Edition 64 bit
- os: linux
dist: trusty
@ -69,7 +83,7 @@ matrix:
sudo: required
install:
- export CXX="g++-4.8" CC="gcc-4.8"
env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd pzstd32 && make -C contrib/pzstd googletest32 && make -C contrib/pzstd test32 && make -C contrib/pzstd clean"
env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd googletest32 && make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean"
addons:
apt:
packages:

View File

@ -50,10 +50,9 @@ build_script:
ECHO *** &&
ECHO *** Building pzstd for %PLATFORM% &&
ECHO *** &&
ECHO make -C contrib\pzstd pzstd &&
make -C contrib\pzstd pzstd &&
make -C contrib\pzstd googletest-mingw64 &&
make -C contrib\pzstd test &&
make -C contrib\pzstd all &&
make -C contrib\pzstd check &&
make -C contrib\pzstd clean
)
- if [%COMPILER%]==[gcc] (

72
contrib/pzstd/Logging.h Normal file
View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <cstdio>
#include <mutex>
namespace pzstd {
constexpr int ERROR = 1;
constexpr int INFO = 2;
constexpr int DEBUG = 3;
constexpr int VERBOSE = 4;
class Logger {
std::mutex mutex_;
FILE* out_;
const int level_;
using Clock = std::chrono::system_clock;
Clock::time_point lastUpdate_;
std::chrono::milliseconds refreshRate_;
public:
explicit Logger(int level, FILE* out = stderr)
: out_(out), level_(level), lastUpdate_(Clock::now()),
refreshRate_(150) {}
bool logsAt(int level) {
return level <= level_;
}
template <typename... Args>
void operator()(int level, const char *fmt, Args... args) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
std::fprintf(out_, fmt, args...);
}
template <typename... Args>
void update(int level, const char *fmt, Args... args) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
auto now = Clock::now();
if (now - lastUpdate_ > refreshRate_) {
lastUpdate_ = now;
std::fprintf(out_, "\r");
std::fprintf(out_, fmt, args...);
}
}
void clear(int level) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
std::fprintf(out_, "\r%79s\r", "");
}
};
}

View File

@ -7,20 +7,71 @@
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
# Standard variables for installation
DESTDIR ?=
PREFIX ?= /usr/local
BINDIR := $(DESTDIR)$(PREFIX)/bin
ZSTDDIR = ../../lib
PROGDIR = ../../programs
CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
# External program to use to run tests, e.g. qemu or valgrind
TESTPROG ?=
# Flags to pass to the tests
TESTFLAGS ?=
# We use gcc/clang to generate the header dependencies of files
DEPFLAGS = -MMD -MP -MF $*.Td
POSTCOMPILE = mv -f $*.Td $*.d
# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override
CFLAGS ?= -O3 -Wall -Wextra
CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11
CPPFLAGS ?=
LDFLAGS ?=
# Include flags
PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
GTEST_INC = -isystem googletest/googletest/include
PZSTD_CPPFLAGS = $(PZSTD_INC) $(GTEST_INC)
PZSTD_CCXXFLAGS =
PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS)
PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS)
PZSTD_LDFLAGS =
EXTRA_FLAGS =
ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS)
ALL_CXXFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS)
ALL_LDFLAGS = $(EXTRA_FLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS)
ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)
# gtest libraries need to go before "-lpthread" because they depend on it.
GTEST_LIB = -L googletest/build/googlemock/gtest
LIBS = $(GTEST_LIB) -lpthread
# Compilation commands
LD_COMMAND = $(CXX) $^ $(ALL_LDFLAGS) $(LIBS) -o $@
CC_COMMAND = $(CC) $(DEPFLAGS) $(ALL_CFLAGS) -c $< -o $@
CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $< -o $@
# Get a list of all zstd files so we rebuild the static library when we need to
ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \
$(wildcard $(ZSTDDIR)/common/*.h)
ZSTDCOMP_FILES := $(wildcard $(ZSTDDIR)/compress/*.c) \
$(wildcard $(ZSTDDIR)/compress/*.h)
ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \
$(wildcard $(ZSTDDIR)/decompress/*.h)
ZSTDPROG_FILES := $(wildcard $(PROGDIR)/*.c) \
$(wildcard $(PROGDIR)/*.h)
ZSTD_FILES := $(wildcard $(ZSTDDIR)/*.h) \
$(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \
$(ZSTDPROG_FILES)
# List all the pzstd source files so we can determine their dependencies
PZSTD_SRCS := $(wildcard *.cpp)
PZSTD_TESTS := $(wildcard test/*.cpp)
UTILS_TESTS := $(wildcard utils/test/*.cpp)
ALL_SRCS := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS)
# Define *.exe as extension for Windows systems
@ -30,89 +81,169 @@ else
EXT =
endif
.PHONY: default all test clean test32 googletest googletest32
# Standard targets
.PHONY: default
default: all
default: pzstd
.PHONY: check
check:
$(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS)
all: pzstd
.PHONY: install
install: PZSTD_CPPFLAGS += -DNDEBUG
install: pzstd$(EXT)
install -d -m 755 $(BINDIR)/
install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT)
.PHONY: uninstall
uninstall:
$(RM) $(BINDIR)/pzstd$(EXT)
# Targets for many different builds
.PHONY: all
all: PZSTD_CPPFLAGS += -DNDEBUG
all: pzstd$(EXT) tests roundtrip
.PHONY: debug
debug: EXTRA_FLAGS += -g
debug: pzstd$(EXT) tests roundtrip
.PHONY: tsan
tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC
tsan: PZSTD_LDFLAGS += -fsanitize=thread -pie
tsan: debug
.PHONY: asan
asan: EXTRA_FLAGS += -fsanitize=address
asan: debug
.PHONY: ubsan
ubsan: EXTRA_FLAGS += -fsanitize=undefined
ubsan: debug
.PHONY: all32
all32: EXTRA_FLAGS += -m32
all32: all
.PHONY: debug32
debug32: EXTRA_FLAGS += -m32
debug32: debug
.PHONY: asan32
asan32: EXTRA_FLAGS += -m32
asan32: asan
.PHONY: tsan32
tsan32: EXTRA_FLAGS += -m32
tsan32: tsan
.PHONY: ubsan32
ubsan32: EXTRA_FLAGS += -m32
ubsan32: ubsan
# Run long round trip tests
.PHONY: roundtripcheck
roundtripcheck: roundtrip check
$(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS)
# Build the main binary
pzstd$(EXT): main.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
$(LD_COMMAND)
# Target that depends on all the tests
.PHONY: tests
tests: EXTRA_FLAGS += -Wno-deprecated-declarations
tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS)))
# Build the round trip tests
.PHONY: roundtrip
roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations
roundtrip: test/RoundTripTest$(EXT)
# Use the static library that zstd builds for simplicity and
# so we get the compiler options correct
$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES)
$(MAKE) -C $(ZSTDDIR) libzstd CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)"
libzstd.a: $(ZSTD_FILES)
$(MAKE) -C $(ZSTDDIR) libzstd
@cp $(ZSTDDIR)/libzstd.a .
# Rules to build the tests
test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o Options.o \
Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
$(LD_COMMAND)
Pzstd.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
$(CXX) $(FLAGS) -c Pzstd.cpp -o $@
test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o Options.o Pzstd.o \
SkippableFrame.o $(ZSTDDIR)/libzstd.a
$(LD_COMMAND)
SkippableFrame.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
$(CXX) $(FLAGS) -c SkippableFrame.cpp -o $@
utils/test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
utils/test/%Test$(EXT): utils/test/%Test.o
$(LD_COMMAND)
Options.o: Options.h Options.cpp
$(CXX) $(FLAGS) -c Options.cpp -o $@
main.o: main.cpp *.h utils/*.h
$(CXX) $(FLAGS) -c main.cpp -o $@
pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a
$(CXX) $(FLAGS) $^ -o $@$(EXT) -lpthread
libzstd32.a: $(ZSTD_FILES)
$(MAKE) -C $(ZSTDDIR) libzstd MOREFLAGS="-m32"
@cp $(ZSTDDIR)/libzstd.a libzstd32.a
Pzstd32.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
$(CXX) -m32 $(FLAGS) -c Pzstd.cpp -o $@
SkippableFrame32.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
$(CXX) -m32 $(FLAGS) -c SkippableFrame.cpp -o $@
Options32.o: Options.h Options.cpp
$(CXX) -m32 $(FLAGS) -c Options.cpp -o $@
main32.o: main.cpp *.h utils/*.h
$(CXX) -m32 $(FLAGS) -c main.cpp -o $@
pzstd32: Pzstd32.o SkippableFrame32.o Options32.o main32.o libzstd32.a
$(CXX) -m32 $(FLAGS) $^ -o $@$(EXT) -lpthread
GTEST_CMAKEFLAGS =
# Install googletest
.PHONY: googletest
googletest: PZSTD_CCXXFLAGS += -fPIC
googletest:
@$(RM) -rf googletest
@git clone https://github.com/google/googletest
@mkdir -p googletest/build
@cd googletest/build && cmake .. && make
@cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. && $(MAKE)
googletest32:
@$(RM) -rf googletest
@git clone https://github.com/google/googletest
@mkdir -p googletest/build
@cd googletest/build && cmake .. -DCMAKE_CXX_FLAGS=-m32 && make
googletest-mingw64:
$(RM) -rf googletest
git clone https://github.com/google/googletest
mkdir -p googletest/build
cd googletest/build && cmake -G "MSYS Makefiles" .. && $(MAKE)
test:
$(MAKE) libzstd.a
$(MAKE) pzstd MOREFLAGS="-Wall -Wextra -pedantic -Werror"
$(MAKE) -C utils/test clean
$(MAKE) -C utils/test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
$(MAKE) -C test clean
$(MAKE) -C test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
test32:
$(MAKE) libzstd.a MOREFLAGS="-m32"
$(MAKE) pzstd MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
$(MAKE) -C utils/test clean
$(MAKE) -C utils/test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
$(MAKE) -C test clean
$(MAKE) -C test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
.PHONY: googletest32
googletest32: PZSTD_CCXXFLAGS += -m32
googletest32: googletest
.PHONY: googletest-mingw64
googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles"
googletest-mingw64: googletest
.PHONY: clean
clean:
$(RM) -f *.o pzstd$(EXT) *.Td *.d
$(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d
$(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d
$(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d
$(MAKE) -C $(ZSTDDIR) clean
$(MAKE) -C utils/test clean
$(MAKE) -C test clean
@$(RM) -rf libzstd.a *.o pzstd$(EXT) pzstd32$(EXT)
@echo Cleaning completed
# Cancel implicit rules
%.o: %.c
%.o: %.cpp
# Object file rules
%.o: %.c
$(CC_COMMAND)
$(POSTCOMPILE)
$(PROGDIR)/%.o: $(PROGDIR)/%.c
$(CC_COMMAND)
$(POSTCOMPILE)
%.o: %.cpp
$(CXX_COMMAND)
$(POSTCOMPILE)
test/%.o: test/%.cpp
$(CXX_COMMAND)
$(POSTCOMPILE)
utils/test/%.o: utils/test/%.cpp
$(CXX_COMMAND)
$(POSTCOMPILE)
# Dependency file stuff
.PRECIOUS: %.d test/%.d utils/test/%.d
# Include rules that specify header file dependencies
-include $(patsubst %,%.d,$(basename $(ALL_SRCS)))

View File

@ -303,6 +303,12 @@ Options::Status Options::parse(int argc, const char **argv) {
} // while (*options != 0);
} // for (int i = 1; i < argc; ++i);
// Set options for test mode
if (test) {
outputFile = nullOutput;
keepSource = true;
}
// Input file defaults to standard input if not provided.
if (localInputFiles.empty()) {
localInputFiles.emplace_back(kStdIn);
@ -399,11 +405,6 @@ Options::Status Options::parse(int argc, const char **argv) {
verbosity = 1;
}
// Set options for test mode
if (test) {
outputFile = nullOutput;
keepSource = true;
}
return Status::Success;
}

View File

@ -15,6 +15,7 @@
#include "utils/WorkQueue.h"
#include <chrono>
#include <cinttypes>
#include <cstddef>
#include <cstdio>
#include <memory>
@ -58,26 +59,24 @@ static std::uint64_t handleOneInput(const Options &options,
FILE* inputFd,
const std::string &outputFile,
FILE* outputFd,
ErrorHolder &errorHolder) {
SharedState& state) {
auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so in the case of error we are certain
// we don't accidently try to call push() on it after it is destroyed.
// we don't accidently try to call push() on it after it is destroyed
WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
std::uint64_t bytesRead;
std::uint64_t bytesWritten;
{
// Initialize the thread pool with numThreads + 1
// We add one because the read thread spends most of its time waiting.
// This also sets the minimum number of threads to 2, so the algorithm
// doesn't deadlock.
ThreadPool executor(options.numThreads + 1);
// Initialize the (de)compression thread pool with numThreads
ThreadPool executor(options.numThreads);
// Run the reader thread on an extra thread
ThreadPool readExecutor(1);
if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs
executor.add(
[&errorHolder, &outs, &executor, inputFd, inputSize, &options,
&bytesRead] {
readExecutor.add(
[&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
bytesRead = asyncCompressChunks(
errorHolder,
state,
outs,
executor,
inputFd,
@ -86,29 +85,28 @@ static std::uint64_t handleOneInput(const Options &options,
options.determineParameters());
});
// Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
options.verbosity);
bytesWritten = writeFile(state, outs, outputFd, options.decompress);
} else {
// Add a job that reads the input and starts all the decompression jobs
executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
});
// Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
options.verbosity);
bytesWritten = writeFile(state, outs, outputFd, options.decompress);
}
}
if (options.verbosity > 1 && !errorHolder.hasError()) {
if (!state.errorHolder.hasError()) {
std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
if (!options.decompress) {
double ratio = static_cast<double>(bytesWritten) /
static_cast<double>(bytesRead + !bytesRead);
std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n",
state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
" bytes, %s)\n",
inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
outputFileName.c_str());
} else {
std::fprintf(stderr, "%-20s: %llu bytes \n",
state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
inputFileName.c_str(),bytesWritten);
}
}
@ -138,7 +136,7 @@ static FILE *openInputFile(const std::string &inputFile,
static FILE *openOutputFile(const Options &options,
const std::string &outputFile,
ErrorHolder &errorHolder) {
SharedState& state) {
if (outputFile == "-") {
SET_BINARY_MODE(stdout);
return stdout;
@ -148,82 +146,78 @@ static FILE *openOutputFile(const Options &options,
auto outputFd = std::fopen(outputFile.c_str(), "rb");
if (outputFd != nullptr) {
std::fclose(outputFd);
if (options.verbosity <= 1) {
errorHolder.setError("Output file exists");
if (!state.log.logsAt(INFO)) {
state.errorHolder.setError("Output file exists");
return nullptr;
}
std::fprintf(
stderr,
state.log(
INFO,
"pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
outputFile.c_str());
int c = getchar();
if (c != 'y' && c != 'Y') {
errorHolder.setError("Not overwritten");
state.errorHolder.setError("Not overwritten");
return nullptr;
}
}
}
auto outputFd = std::fopen(outputFile.c_str(), "wb");
if (!errorHolder.check(
if (!state.errorHolder.check(
outputFd != nullptr, "Failed to open output file")) {
return 0;
return nullptr;
}
return outputFd;
}
int pzstdMain(const Options &options) {
int returnCode = 0;
SharedState state(options);
for (const auto& input : options.inputFiles) {
// Setup the error holder
ErrorHolder errorHolder;
// Setup the shared state
auto printErrorGuard = makeScopeGuard([&] {
if (errorHolder.hasError()) {
if (state.errorHolder.hasError()) {
returnCode = 1;
if (options.verbosity > 0) {
std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
errorHolder.getError().c_str());
}
} else {
state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
state.errorHolder.getError().c_str());
}
});
// Open the input file
auto inputFd = openInputFile(input, errorHolder);
auto inputFd = openInputFile(input, state.errorHolder);
if (inputFd == nullptr) {
continue;
}
auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
// Open the output file
auto outputFile = options.getOutputFile(input);
if (!errorHolder.check(outputFile != "",
if (!state.errorHolder.check(outputFile != "",
"Input file does not have extension .zst")) {
continue;
}
auto outputFd = openOutputFile(options, outputFile, errorHolder);
auto outputFd = openOutputFile(options, outputFile, state);
if (outputFd == nullptr) {
continue;
}
auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
// (de)compress the file
handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
if (errorHolder.hasError()) {
handleOneInput(options, input, inputFd, outputFile, outputFd, state);
if (state.errorHolder.hasError()) {
continue;
}
// Delete the input file if necessary
if (!options.keepSource) {
// Be sure that we are done and have written everything before we delete
if (!errorHolder.check(std::fclose(inputFd) == 0,
if (!state.errorHolder.check(std::fclose(inputFd) == 0,
"Failed to close input file")) {
continue;
}
closeInputGuard.dismiss();
if (!errorHolder.check(std::fclose(outputFd) == 0,
if (!state.errorHolder.check(std::fclose(outputFd) == 0,
"Failed to close output file")) {
continue;
}
closeOutputGuard.dismiss();
if (std::remove(input.c_str()) != 0) {
errorHolder.setError("Failed to remove input file");
state.errorHolder.setError("Failed to remove input file");
continue;
}
}
@ -269,27 +263,25 @@ Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
/**
* Stream chunks of input from `in`, compress it, and stream it out to `out`.
*
* @param errorHolder Used to report errors and check if an error occured
* @param state The shared state
* @param in Queue that we `pop()` input buffers from
* @param out Queue that we `push()` compressed output buffers to
* @param maxInputSize An upper bound on the size of the input
* @param parameters The zstd parameters to use for compression
*/
static void compress(
ErrorHolder& errorHolder,
SharedState& state,
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out,
size_t maxInputSize,
ZSTD_parameters parameters) {
size_t maxInputSize) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the CCtx
std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx(
ZSTD_createCStream(), ZSTD_freeCStream);
auto ctx = state.cStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
return;
}
{
auto err = ZSTD_initCStream_advanced(ctx.get(), nullptr, 0, parameters, 0);
auto err = ZSTD_resetCStream(ctx.get(), 0);
if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
return;
}
@ -396,7 +388,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
}
std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor,
FILE* fd,
@ -410,23 +402,23 @@ std::uint64_t asyncCompressChunks(
// independently.
size_t step = calculateStep(size, numThreads, params);
auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !errorHolder.hasError()) {
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the chunk's input data into.
auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); });
// Make a new output queue that compress will put the compressed data into.
auto out = std::make_shared<BufferWorkQueue>();
// Start compression in the thread pool
executor.add([&errorHolder, in, out, step, params] {
executor.add([&state, in, out, step] {
return compress(
errorHolder, std::move(in), std::move(out), step, params);
state, std::move(in), std::move(out), step);
});
// Pass the output queue to the writer thread.
chunks.push(std::move(out));
// Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
}
errorHolder.check(status != FileStatus::Error, "Error reading input");
state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return bytesRead;
}
@ -434,24 +426,24 @@ std::uint64_t asyncCompressChunks(
* Decompress a frame, whose data is streamed into `in`, and stream the output
* to `out`.
*
* @param errorHolder Used to report errors and check if an error occured
* @param state The shared state
* @param in Queue that we `pop()` input buffers from. It contains
* exactly one compressed frame.
* @param out Queue that we `push()` decompressed output buffers to
*/
static void decompress(
ErrorHolder& errorHolder,
SharedState& state,
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the DCtx
std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx(
ZSTD_createDStream(), ZSTD_freeDStream);
auto ctx = state.dStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
return;
}
{
auto err = ZSTD_initDStream(ctx.get());
auto err = ZSTD_resetDStream(ctx.get());
if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
return;
}
@ -509,7 +501,7 @@ static void decompress(
}
std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd) {
@ -522,7 +514,7 @@ std::uint64_t asyncDecompressFrames(
// Otherwise, we will decompress using only one decompression task.
const size_t chunkSize = ZSTD_DStreamInSize();
auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !errorHolder.hasError()) {
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the frames's bytes into.
auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); });
@ -551,15 +543,15 @@ std::uint64_t asyncDecompressFrames(
out->setMaxSize(64);
}
// Start decompression in the thread pool
executor.add([&errorHolder, in, out] {
return decompress(errorHolder, std::move(in), std::move(out));
executor.add([&state, in, out] {
return decompress(state, std::move(in), std::move(out));
});
// Pass the output queue to the writer thread
frames.push(std::move(out));
if (frameSize == 0) {
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task
while (status == FileStatus::Continue && !errorHolder.hasError()) {
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
}
break;
@ -567,7 +559,7 @@ std::uint64_t asyncDecompressFrames(
// Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
}
errorHolder.check(status != FileStatus::Error, "Error reading input");
state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return totalBytesRead;
}
@ -582,32 +574,14 @@ static bool writeData(ByteRange data, FILE* fd) {
return true;
}
void updateWritten(int verbosity, std::uint64_t bytesWritten) {
if (verbosity <= 1) {
return;
}
using Clock = std::chrono::system_clock;
static Clock::time_point then;
constexpr std::chrono::milliseconds refreshRate{150};
auto now = Clock::now();
if (now - then > refreshRate) {
then = now;
std::fprintf(stderr, "\rWritten: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
std::uint64_t writeFile(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress,
int verbosity) {
auto lineClearGuard = makeScopeGuard([verbosity] {
if (verbosity > 1) {
std::fprintf(stderr, "\r%79s\r", "");
}
bool decompress) {
auto& errorHolder = state.errorHolder;
auto lineClearGuard = makeScopeGuard([&state] {
state.log.clear(INFO);
});
std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out;
@ -633,7 +607,8 @@ std::uint64_t writeFile(
return bytesWritten;
}
bytesWritten += buffer.size();
updateWritten(verbosity, bytesWritten);
state.log.update(INFO, "Written: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
return bytesWritten;

View File

@ -9,9 +9,11 @@
#pragma once
#include "ErrorHolder.h"
#include "Logging.h"
#include "Options.h"
#include "utils/Buffer.h"
#include "utils/Range.h"
#include "utils/ResourcePool.h"
#include "utils/ThreadPool.h"
#include "utils/WorkQueue.h"
#define ZSTD_STATIC_LINKING_ONLY
@ -32,12 +34,58 @@ namespace pzstd {
*/
int pzstdMain(const Options& options);
class SharedState {
public:
SharedState(const Options& options) : log(options.verbosity) {
if (!options.decompress) {
auto parameters = options.determineParameters();
cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
[parameters]() -> ZSTD_CStream* {
auto zcs = ZSTD_createCStream();
if (zcs) {
auto err = ZSTD_initCStream_advanced(
zcs, nullptr, 0, parameters, 0);
if (ZSTD_isError(err)) {
ZSTD_freeCStream(zcs);
return nullptr;
}
}
return zcs;
},
[](ZSTD_CStream *zcs) {
ZSTD_freeCStream(zcs);
}});
} else {
dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
[]() -> ZSTD_DStream* {
auto zds = ZSTD_createDStream();
if (zds) {
auto err = ZSTD_initDStream(zds);
if (ZSTD_isError(err)) {
ZSTD_freeDStream(zds);
return nullptr;
}
}
return zds;
},
[](ZSTD_DStream *zds) {
ZSTD_freeDStream(zds);
}});
}
}
Logger log;
ErrorHolder errorHolder;
std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
};
/**
* Streams input from `fd`, breaks input up into chunks, and compresses each
* chunk independently. Output of each chunk gets streamed to a queue, and
* the output queues get put into `chunks` in order.
*
* @param errorHolder Used to report errors and coordinate early shutdown
* @param state The shared state
* @param chunks Each compression jobs output queue gets `pushed()` here
* as soon as it is available
* @param executor The thread pool to run compression jobs in
@ -48,7 +96,7 @@ int pzstdMain(const Options& options);
* @returns The number of bytes read from the file
*/
std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor,
FILE* fd,
@ -62,7 +110,7 @@ std::uint64_t asyncCompressChunks(
* decompression job. Output of each frame gets streamed to a queue, and
* the output queues get put into `frames` in order.
*
* @param errorHolder Used to report errors and coordinate early shutdown
* @param state The shared state
* @param frames Each decompression jobs output queue gets `pushed()` here
* as soon as it is available
* @param executor The thread pool to run compression jobs in
@ -70,7 +118,7 @@ std::uint64_t asyncCompressChunks(
* @returns The number of bytes read from the file
*/
std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd);
@ -79,18 +127,16 @@ std::uint64_t asyncDecompressFrames(
* Streams input in from each queue in `outs` in order, and writes the data to
* `outputFd`.
*
* @param errorHolder Used to report errors and coordinate early exit
* @param state The shared state
* @param outs A queue of output queues, one for each
* (de)compression job.
* @param outputFd The file descriptor to write to
* @param decompress Are we decompressing?
* @param verbosity The verbosity level to log at
* @returns The number of bytes written
*/
std::uint64_t writeFile(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress,
int verbosity);
bool decompress);
}

View File

@ -10,7 +10,7 @@ When decompressing files compressed with Zstandard, PZstandard does IO in one th
## Usage
PZstandard supports the same command line interface as Zstandard, but also provies the `-p` option to specify the number of threads.
PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads.
Dictionary mode is not currently supported.
Basic usage

View File

@ -1,48 +0,0 @@
# ##########################################################################
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
EXT =.exe
else
EXT =
endif
PZSTDDIR = ..
PROGDIR = ../../../programs
ZSTDDIR = ../../../lib
# Set GTEST_INC and GTEST_LIB to work with your install of gtest
GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
GTEST_FLAGS = $(GTEST_INC) $(GTEST_LIB)
CPPFLAGS = -I$(PZSTDDIR) -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11 -Wno-deprecated-declarations
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
datagen.o: $(PROGDIR)/datagen.*
$(CC) $(CPPFLAGS) -O3 $(MOREFLAGS) $(LDFLAGS) -Wno-long-long -Wno-variadic-macros $(PROGDIR)/datagen.c -c -o $@
%: %.cpp *.h datagen.o
$(CXX) $(FLAGS) $@.cpp datagen.o $(PZSTDDIR)/Pzstd.o $(PZSTDDIR)/SkippableFrame.o $(PZSTDDIR)/Options.o $(PZSTDDIR)/libzstd.a -o $@$(EXT) $(GTEST_FLAGS) -lgtest -lgtest_main -lpthread
.PHONY: test clean
test: OptionsTest PzstdTest
@./OptionsTest$(EXT)
@./PzstdTest$(EXT)
roundtrip: RoundTripTest
@./RoundTripTest$(EXT)
clean:
@rm -f datagen.o OptionsTest PzstdTest RoundTripTest

View File

@ -0,0 +1,96 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
namespace pzstd {
/**
* An unbounded pool of resources.
* A `ResourcePool<T>` requires a factory function that takes allocates `T*` and
* a free function that frees a `T*`.
* Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr`
* to a `T`, and when it goes out of scope the resource will be returned to the
* pool.
* The `ResourcePool<T>` *must* survive longer than any resources it hands out.
* Remember that `ResourcePool<T>` hands out mutable `T`s, so make sure to clean
* up the resource before or after every use.
*/
template <typename T>
class ResourcePool {
public:
class Deleter;
using Factory = std::function<T*()>;
using Free = std::function<void(T*)>;
using UniquePtr = std::unique_ptr<T, Deleter>;
private:
std::mutex mutex_;
Factory factory_;
Free free_;
std::vector<T*> resources_;
unsigned inUse_;
public:
/**
* Creates a `ResourcePool`.
*
* @param factory The function to use to create new resources.
* @param free The function to use to free resources created by `factory`.
*/
ResourcePool(Factory factory, Free free)
: factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {}
/**
* @returns A unique pointer to a resource. The resource is null iff
* there are no avaiable resources and `factory()` returns null.
*/
UniquePtr get() {
std::lock_guard<std::mutex> lock(mutex_);
if (!resources_.empty()) {
UniquePtr resource{resources_.back(), Deleter{*this}};
resources_.pop_back();
++inUse_;
return resource;
}
UniquePtr resource{factory_(), Deleter{*this}};
++inUse_;
return resource;
}
~ResourcePool() noexcept {
assert(inUse_ == 0);
for (const auto resource : resources_) {
free_(resource);
}
}
class Deleter {
ResourcePool *pool_;
public:
explicit Deleter(ResourcePool &pool) : pool_(&pool) {}
void operator() (T *resource) {
std::lock_guard<std::mutex> lock(pool_->mutex_);
// Make sure we don't put null resources into the pool
if (resource) {
pool_->resources_.push_back(resource);
}
assert(pool_->inUse_ > 0);
--pool_->inUse_;
}
};
};
}

View File

@ -27,7 +27,7 @@ class ThreadPool {
explicit ThreadPool(std::size_t numThreads) {
threads_.reserve(numThreads);
for (std::size_t i = 0; i < numThreads; ++i) {
threads_.emplace_back([&] {
threads_.emplace_back([this] {
std::function<void()> task;
while (tasks_.pop(task)) {
task();

View File

@ -28,6 +28,7 @@ class WorkQueue {
std::mutex mutex_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::condition_variable finishCv_;
std::queue<T> queue_;
bool done_;
@ -53,12 +54,13 @@ class WorkQueue {
/**
* Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false.
* If `push()` returns false, then `item` has not been moved from.
*
* @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true.
*/
bool push(T item) {
bool push(T&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
@ -124,19 +126,14 @@ class WorkQueue {
}
readerCv_.notify_all();
writerCv_.notify_all();
finishCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
readerCv_.wait(lock);
// If we were woken by a push, we need to wake a thread waiting on pop().
if (!done_) {
lock.unlock();
readerCv_.notify_one();
lock.lock();
}
finishCv_.wait(lock);
}
}
};

View File

@ -1,42 +0,0 @@
# ##########################################################################
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
EXT =.exe
else
EXT =
endif
PZSTDDIR = ../..
# Set GTEST_INC and GTEST_LIB to work with your install of gtest
GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
%: %.cpp
$(CXX) $(FLAGS) $^ -o $@$(EXT) -lgtest -lgtest_main -lpthread
.PHONY: test clean
test: BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
@./BufferTest$(EXT)
@./RangeTest$(EXT)
@./ScopeGuardTest$(EXT)
@./ThreadPoolTest$(EXT)
@./WorkQueueTest$(EXT)
clean:
@rm -f BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest

View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include "utils/ResourcePool.h"
#include <gtest/gtest.h>
#include <atomic>
#include <thread>
using namespace pzstd;
TEST(ResourcePool, FullTest) {
unsigned numCreated = 0;
unsigned numDeleted = 0;
{
ResourcePool<int> pool(
[&numCreated] { ++numCreated; return new int{5}; },
[&numDeleted](int *x) { ++numDeleted; delete x; });
{
auto i = pool.get();
EXPECT_EQ(5, *i);
*i = 6;
}
{
auto i = pool.get();
EXPECT_EQ(6, *i);
auto j = pool.get();
EXPECT_EQ(5, *j);
*j = 7;
}
{
auto i = pool.get();
EXPECT_EQ(6, *i);
auto j = pool.get();
EXPECT_EQ(7, *j);
}
}
EXPECT_EQ(2, numCreated);
EXPECT_EQ(numCreated, numDeleted);
}
TEST(ResourcePool, ThreadSafe) {
std::atomic<unsigned> numCreated{0};
std::atomic<unsigned> numDeleted{0};
{
ResourcePool<int> pool(
[&numCreated] { ++numCreated; return new int{0}; },
[&numDeleted](int *x) { ++numDeleted; delete x; });
auto push = [&pool] {
for (int i = 0; i < 100; ++i) {
auto x = pool.get();
++*x;
}
};
std::thread t1{push};
std::thread t2{push};
t1.join();
t2.join();
auto x = pool.get();
auto y = pool.get();
EXPECT_EQ(200, *x + *y);
}
EXPECT_GE(2, numCreated);
EXPECT_EQ(numCreated, numDeleted);
}

View File

@ -10,6 +10,7 @@
#include "utils/WorkQueue.h"
#include <gtest/gtest.h>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
@ -64,7 +65,7 @@ TEST(WorkQueue, SPSC) {
const int max = 100;
for (int i = 0; i < 10; ++i) {
queue.push(i);
queue.push(int{i});
}
std::thread thread([ &queue, max ] {
@ -80,7 +81,7 @@ TEST(WorkQueue, SPSC) {
std::this_thread::yield();
for (int i = 10; i < max; ++i) {
queue.push(i);
queue.push(int{i});
}
queue.finish();
@ -97,7 +98,7 @@ TEST(WorkQueue, SPMC) {
}
for (int i = 0; i < 50; ++i) {
queue.push(i);
queue.push(int{i});
}
queue.finish();
@ -126,7 +127,7 @@ TEST(WorkQueue, MPMC) {
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
queue.push(i);
queue.push(int{i});
}
});
}
@ -212,7 +213,7 @@ TEST(WorkQueue, BoundedSizeMPMC) {
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
queue.push(i);
queue.push(int{i});
}
});
}
@ -231,6 +232,18 @@ TEST(WorkQueue, BoundedSizeMPMC) {
}
}
TEST(WorkQueue, FailedPush) {
WorkQueue<std::unique_ptr<int>> queue;
std::unique_ptr<int> x(new int{5});
EXPECT_TRUE(queue.push(std::move(x)));
EXPECT_EQ(nullptr, x);
queue.finish();
x.reset(new int{6});
EXPECT_FALSE(queue.push(std::move(x)));
EXPECT_NE(nullptr, x);
EXPECT_EQ(6, *x);
}
TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{
BufferWorkQueue queue;