From 9622fe499d0ddc867508d3724e10a0bdc6cf2dfa Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 2 Sep 2016 20:11:22 -0700 Subject: [PATCH] Fix memory usage issues. --- contrib/pzstd/Makefile | 2 ++ contrib/pzstd/Pzstd.cpp | 32 +++++++++++++++++----- contrib/pzstd/utils/WorkQueue.h | 17 ++++++++++++ contrib/pzstd/utils/test/WorkQueueTest.cpp | 21 ++++++++++++++ 4 files changed, 65 insertions(+), 7 deletions(-) diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile index c59a6d10..d71cf5b3 100644 --- a/contrib/pzstd/Makefile +++ b/contrib/pzstd/Makefile @@ -63,7 +63,9 @@ googletest: @cd googletest/build && cmake .. && make test: libzstd.a Pzstd.o Options.o SkippableFrame.o + $(MAKE) -C utils/test clean $(MAKE) -C utils/test test + $(MAKE) -C test clean $(MAKE) -C test test clean: diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 84f6a2e4..ddfa5955 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -67,11 +67,14 @@ size_t pzstdMain(const Options& options, ErrorHolder& errorHolder) { // WorkQueue outlives ThreadPool so in the case of error we are certain // we don't accidently try to call push() on it after it is destroyed. - WorkQueue> outs; + WorkQueue> outs{2 * options.numThreads}; size_t bytesWritten; { - // Initialize the thread pool with numThreads - ThreadPool executor(options.numThreads); + // Initialize the thread pool with numThreads + 1 + // We add one because the read thread spends most of its time waiting. + // This also sets the minimum number of threads to 2, so the algorithm + // doesn't deadlock. + ThreadPool executor(options.numThreads + 1); if (!options.decompress) { // Add a job that reads the input and starts all the compression jobs executor.add( @@ -229,6 +232,15 @@ calculateStep(size_t size, size_t numThreads, const ZSTD_parameters& params) { namespace { enum class FileStatus { Continue, Done, Error }; +/// Determines the status of the file descriptor `fd`. +FileStatus fileStatus(FILE* fd) { + if (std::feof(fd)) { + return FileStatus::Done; + } else if (std::ferror(fd)) { + return FileStatus::Error; + } + return FileStatus::Continue; +} } // anonymous namespace /** @@ -243,10 +255,9 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { auto bytesRead = std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); queue.push(buffer.splitAt(bytesRead)); - if (std::feof(fd)) { - return FileStatus::Done; - } else if (std::ferror(fd) || bytesRead == 0) { - return FileStatus::Error; + auto status = fileStatus(fd); + if (status != FileStatus::Continue) { + return status; } } return FileStatus::Continue; @@ -388,6 +399,7 @@ void asyncDecompressFrames( // frameSize is 0 if the frame info can't be decoded. Buffer buffer(SkippableFrame::kSize); auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); + status = fileStatus(fd); if (bytesRead == 0 && status != FileStatus::Continue) { break; } @@ -395,6 +407,12 @@ void asyncDecompressFrames( frameSize = SkippableFrame::tryRead(buffer.range()); in->push(std::move(buffer)); } + if (frameSize == 0) { + // We hit a non SkippableFrame, so this will be the last job. + // Make sure that we don't use too much memory + in->setMaxSize(64); + out->setMaxSize(64); + } // Start decompression in the thread pool executor.add([&errorHolder, in, out] { return decompress(errorHolder, std::move(in), std::move(out)); diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h index 2fa417f4..53821350 100644 --- a/contrib/pzstd/utils/WorkQueue.h +++ b/contrib/pzstd/utils/WorkQueue.h @@ -99,6 +99,19 @@ class WorkQueue { return true; } + /** + * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. + * + * @param maxSize The new maximum queue size. + */ + void setMaxSize(std::size_t maxSize) { + { + std::lock_guard lock(mutex_); + maxSize_ = maxSize; + } + writerCv_.notify_all(); + } + /** * Promise that `push()` won't be called again, so once the queue is empty * there will never any more work. @@ -149,6 +162,10 @@ class BufferWorkQueue { return result; } + void setMaxSize(std::size_t maxSize) { + queue_.setMaxSize(maxSize); + } + void finish() { queue_.finish(); } diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp index 074891fd..84d8573c 100644 --- a/contrib/pzstd/utils/test/WorkQueueTest.cpp +++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp @@ -175,6 +175,27 @@ TEST(WorkQueue, BoundedSizePushAfterFinish) { pusher.join(); } +TEST(WorkQueue, SetMaxSize) { + WorkQueue queue(2); + int result; + queue.push(5); + queue.push(6); + queue.setMaxSize(1); + std::thread pusher([&queue] { + queue.push(7); + }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(6, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + TEST(WorkQueue, BoundedSizeMPMC) { WorkQueue queue(100); std::vector results(10000, -1);