Merge branch 'dev' of github.com:facebook/zstd into dev

Yann Collet 2016-09-02 22:12:01 -07:00
commit 78b102d21d
4 changed files with 65 additions and 7 deletions


@@ -63,7 +63,9 @@ googletest:
 	@cd googletest/build && cmake .. && make
 
 test: libzstd.a Pzstd.o Options.o SkippableFrame.o
+	$(MAKE) -C utils/test clean
 	$(MAKE) -C utils/test test
+	$(MAKE) -C test clean
 	$(MAKE) -C test test
 
 clean:


@@ -67,11 +67,14 @@ size_t pzstdMain(const Options& options, ErrorHolder& errorHolder) {
   // WorkQueue outlives ThreadPool so in the case of error we are certain
   // we don't accidently try to call push() on it after it is destroyed.
-  WorkQueue<std::shared_ptr<BufferWorkQueue>> outs;
+  WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{2 * options.numThreads};
   size_t bytesWritten;
 
   {
-    // Initialize the thread pool with numThreads
-    ThreadPool executor(options.numThreads);
+    // Initialize the thread pool with numThreads + 1
+    // We add one because the read thread spends most of its time waiting.
+    // This also sets the minimum number of threads to 2, so the algorithm
+    // doesn't deadlock.
+    ThreadPool executor(options.numThreads + 1);
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
       executor.add(
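
Note on the thread-pool sizing above: the reader job spends most of its time blocked on I/O, so it gets its own thread on top of the numThreads compression workers, and the bounded output queue (2 * numThreads entries) provides back-pressure so reading never runs far ahead of compression. The following standalone sketch is a hypothetical illustration of that shape only, using plain std::thread and a toy queue rather than pzstd's ThreadPool/WorkQueue:

// Hypothetical illustration only -- a toy reader/worker pipeline, not the
// ThreadPool/WorkQueue used by pzstd.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

int main() {
  const std::size_t numThreads = 2;
  const std::size_t maxQueued = 2 * numThreads;  // mirrors outs{2 * options.numThreads}

  std::mutex m;
  std::condition_variable cv;
  std::queue<std::string> chunks;
  bool done = false;

  // Reader: mostly waits (here on queue space, in pzstd on file I/O),
  // so it runs on its own thread in addition to the numThreads workers.
  std::thread reader([&] {
    for (int i = 0; i < 10; ++i) {
      std::unique_lock<std::mutex> lock(m);
      cv.wait(lock, [&] { return chunks.size() < maxQueued; });
      chunks.push("chunk " + std::to_string(i));
      cv.notify_all();
    }
    std::lock_guard<std::mutex> lock(m);
    done = true;
    cv.notify_all();
  });

  // Workers: drain the bounded queue and "compress" each chunk.
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < numThreads; ++t) {
    workers.emplace_back([&] {
      for (;;) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return done || !chunks.empty(); });
        if (chunks.empty()) {
          return;  // finished and fully drained
        }
        std::string chunk = std::move(chunks.front());
        chunks.pop();
        cv.notify_all();  // a slot was freed: wake the reader if it is blocked
        lock.unlock();
        std::printf("compressed %s\n", chunk.c_str());
      }
    });
  }

  reader.join();
  for (auto& w : workers) {
    w.join();
  }
  return 0;
}

If the reader instead had to share a single pool slot with the only worker, it could block pushing into a full queue with nobody left to drain it; that is the deadlock the new comment refers to, and why the pool size is never allowed to drop below 2.
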
@@ -229,6 +232,15 @@ calculateStep(size_t size, size_t numThreads, const ZSTD_parameters& params) {
 namespace {
 enum class FileStatus { Continue, Done, Error };
+/// Determines the status of the file descriptor `fd`.
+FileStatus fileStatus(FILE* fd) {
+  if (std::feof(fd)) {
+    return FileStatus::Done;
+  } else if (std::ferror(fd)) {
+    return FileStatus::Error;
+  }
+  return FileStatus::Continue;
+}
 } // anonymous namespace
 
 /**
@@ -243,10 +255,9 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
     auto bytesRead =
         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
     queue.push(buffer.splitAt(bytesRead));
-    if (std::feof(fd)) {
-      return FileStatus::Done;
-    } else if (std::ferror(fd) || bytesRead == 0) {
-      return FileStatus::Error;
+    auto status = fileStatus(fd);
+    if (status != FileStatus::Continue) {
+      return status;
     }
   }
   return FileStatus::Continue;
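
For reference, the new fileStatus() helper leans on standard C stdio semantics: after a short fread(), feof() reports end-of-file and ferror() reports a read error, and neither flag is set while more data remains. A minimal standalone sketch of those semantics (hypothetical, not part of the patch):

// Hypothetical standalone example -- not part of the patch.
#include <cstdio>

int main(int argc, char** argv) {
  if (argc < 2) {
    std::fprintf(stderr, "usage: %s <file>\n", argv[0]);
    return 1;
  }
  std::FILE* fd = std::fopen(argv[1], "rb");
  if (fd == nullptr) {
    std::perror("fopen");
    return 1;
  }
  char buffer[4096];
  for (;;) {
    // A short read means either end-of-file or an error; the stream flags
    // checked below are what fileStatus() consults to tell them apart.
    std::size_t bytesRead = std::fread(buffer, 1, sizeof(buffer), fd);
    if (std::feof(fd)) {
      std::printf("done (last read: %zu bytes)\n", bytesRead);
      break;
    }
    if (std::ferror(fd)) {
      std::printf("error after reading %zu bytes\n", bytesRead);
      break;
    }
    std::printf("read %zu bytes, continuing\n", bytesRead);
  }
  std::fclose(fd);
  return 0;
}
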
@@ -388,6 +399,7 @@ void asyncDecompressFrames(
     // frameSize is 0 if the frame info can't be decoded.
     Buffer buffer(SkippableFrame::kSize);
     auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+    status = fileStatus(fd);
     if (bytesRead == 0 && status != FileStatus::Continue) {
       break;
     }
@@ -395,6 +407,12 @@ void asyncDecompressFrames(
       frameSize = SkippableFrame::tryRead(buffer.range());
       in->push(std::move(buffer));
     }
+    if (frameSize == 0) {
+      // We hit a non SkippableFrame, so this will be the last job.
+      // Make sure that we don't use too much memory
+      in->setMaxSize(64);
+      out->setMaxSize(64);
+    }
     // Start decompression in the thread pool
     executor.add([&errorHolder, in, out] {
       return decompress(errorHolder, std::move(in), std::move(out));


@@ -99,6 +99,19 @@ class WorkQueue {
     return true;
   }
 
+  /**
+   * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
+   *
+   * @param maxSize The new maximum queue size.
+   */
+  void setMaxSize(std::size_t maxSize) {
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      maxSize_ = maxSize;
+    }
+    writerCv_.notify_all();
+  }
+
   /**
    * Promise that `push()` won't be called again, so once the queue is empty
    * there will never any more work.
@@ -149,6 +162,10 @@ class BufferWorkQueue {
     return result;
   }
 
+  void setMaxSize(std::size_t maxSize) {
+    queue_.setMaxSize(maxSize);
+  }
+
   void finish() {
     queue_.finish();
   }
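
A note on setMaxSize(): a bounded push() presumably waits on writerCv_ while the queue is at capacity, so updating maxSize_ under the lock and then calling writerCv_.notify_all() is what lets already-blocked producers re-check the new limit (and 0 removes the bound entirely). The sketch below is a simplified, self-contained stand-in for illustration only, not the repository's WorkQueue:

// Hypothetical simplified bounded queue -- not the repository's WorkQueue.
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t maxSize = 0) : maxSize_(maxSize) {}

  // Blocks while the queue is full; returns false once finish() was called.
  bool push(T item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      writerCv_.wait(lock, [this] {
        return done_ || maxSize_ == 0 || queue_.size() < maxSize_;
      });
      if (done_) {
        return false;
      }
      queue_.push_back(std::move(item));
    }
    readerCv_.notify_one();
    return true;
  }

  // Blocks until an item is available, or the queue is finished and empty.
  bool pop(T& item) {
    std::unique_lock<std::mutex> lock(mutex_);
    readerCv_.wait(lock, [this] { return done_ || !queue_.empty(); });
    if (queue_.empty()) {
      return false;
    }
    item = std::move(queue_.front());
    queue_.pop_front();
    writerCv_.notify_one();  // a slot was freed
    return true;
  }

  // maxSize == 0 means unbounded. Waking every waiting producer matters:
  // raising the limit may let several blocked push() calls proceed at once.
  void setMaxSize(std::size_t maxSize) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      maxSize_ = maxSize;
    }
    writerCv_.notify_all();
  }

  void finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    readerCv_.notify_all();
    writerCv_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable readerCv_;
  std::condition_variable writerCv_;
  std::deque<T> queue_;
  std::size_t maxSize_;
  bool done_ = false;
};

With this shape, the new SetMaxSize test below reads naturally: after the limit is shrunk to 1, the blocked push(7) is only released by finish() and never lands in the queue, so only 5 and 6 are popped.
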


@@ -175,6 +175,27 @@ TEST(WorkQueue, BoundedSizePushAfterFinish) {
   pusher.join();
 }
 
+TEST(WorkQueue, SetMaxSize) {
+  WorkQueue<int> queue(2);
+  int result;
+  queue.push(5);
+  queue.push(6);
+  queue.setMaxSize(1);
+
+  std::thread pusher([&queue] {
+    queue.push(7);
+  });
+  // Dirtily try and make sure that pusher has run.
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  queue.finish();
+
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(5, result);
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(6, result);
+  EXPECT_FALSE(queue.pop(result));
+  pusher.join();
+}
 
 TEST(WorkQueue, BoundedSizeMPMC) {
   WorkQueue<int> queue(100);
   std::vector<int> results(10000, -1);