[wasm] Reset StreamingProcessor on error

After the first decoder error, the streaming processor should not be
called again. To enforce this, reset the {processor_} field. This also
makes the {ok_} field redundant.
Note that this refactoring is also necessary for a future CL which
makes the {StreamingProcessor} keep the {AsyncCompileJob} alive. By
resetting the processor, we also remove that link.

R=ahaas@chromium.org

Bug: v8:7921
Change-Id: I42f5ed26a8f26c3dc8db5676557a0d82021e132e
Reviewed-on: https://chromium-review.googlesource.com/c/1329179
Commit-Queue: Clemens Hammacher <clemensh@chromium.org>
Reviewed-by: Andreas Haas <ahaas@chromium.org>
Cr-Commit-Position: refs/heads/master@{#57435}
This commit is contained in:
Clemens Hammacher 2018-11-12 15:02:31 +01:00 committed by Commit Bot
parent f7be7ae096
commit 3b64603da5
4 changed files with 67 additions and 67 deletions

View File

@ -136,7 +136,9 @@ class Vector {
}
// Implicit conversion from Vector<T> to Vector<const T>.
inline operator Vector<const T>() { return Vector<const T>::cast(*this); }
inline operator Vector<const T>() const {
return Vector<const T>::cast(*this);
}
// Factory method for creating empty vectors.
static Vector<T> empty() { return Vector<T>(nullptr, 0); }
@ -147,7 +149,7 @@ class Vector {
input.length() * sizeof(S) / sizeof(T));
}
bool operator==(const Vector<T>& other) const {
bool operator==(const Vector<const T> other) const {
if (length_ != other.length_) return false;
if (start_ == other.start_) return true;
for (size_t i = 0; i < length_; ++i) {

View File

@ -62,6 +62,9 @@ size_t StreamingDecoder::DecodingState::ReadBytes(StreamingDecoder* streaming,
}
void StreamingDecoder::Finish() {
TRACE_STREAMING("Finish\n");
if (!ok()) return;
if (deserializing()) {
Vector<const uint8_t> wire_bytes(wire_bytes_for_deserializing_.data(),
wire_bytes_for_deserializing_.size());
@ -75,11 +78,6 @@ void StreamingDecoder::Finish() {
// The decoder has received all wire bytes; fall through and finish.
}
TRACE_STREAMING("Finish\n");
if (!ok()) {
return;
}
if (!state_->is_finishing_allowed()) {
// The byte stream ended too early, we report an error.
Error("unexpected end of stream");
@ -105,10 +103,9 @@ void StreamingDecoder::Finish() {
void StreamingDecoder::Abort() {
TRACE_STREAMING("Abort\n");
if (ok()) {
ok_ = false;
processor_->OnAbort();
}
if (!ok()) return; // Failed already.
processor_->OnAbort();
Fail();
}
void StreamingDecoder::SetModuleCompiledCallback(

View File

@ -80,11 +80,7 @@ class V8_EXPORT_PRIVATE StreamingDecoder {
// Notify the StreamingDecoder that compilation ended and the
// StreamingProcessor should not be called anymore.
void NotifyCompilationEnded() {
// We set {ok_} to false to turn all future calls to the StreamingDecoder
// into no-ops.
ok_ = false;
}
void NotifyCompilationEnded() { Fail(); }
// Caching support.
// Sets the callback that is called after the module is fully compiled.
@ -212,8 +208,8 @@ class V8_EXPORT_PRIVATE StreamingDecoder {
Vector<const uint8_t> length_bytes);
std::unique_ptr<DecodingState> Error(VoidResult result) {
if (ok_) processor_->OnError(std::move(result));
ok_ = false;
if (ok()) processor_->OnError(std::move(result));
Fail();
return std::unique_ptr<DecodingState>(nullptr);
}
@ -222,48 +218,52 @@ class V8_EXPORT_PRIVATE StreamingDecoder {
}
void ProcessModuleHeader() {
if (!ok_) return;
if (!processor_->ProcessModuleHeader(state_->buffer(), 0)) {
ok_ = false;
}
if (!ok()) return;
if (!processor_->ProcessModuleHeader(state_->buffer(), 0)) Fail();
}
void ProcessSection(SectionBuffer* buffer) {
if (!ok_) return;
if (!ok()) return;
if (!processor_->ProcessSection(
buffer->section_code(), buffer->payload(),
buffer->module_offset() +
static_cast<uint32_t>(buffer->payload_offset()))) {
ok_ = false;
Fail();
}
}
void StartCodeSection(size_t num_functions,
std::shared_ptr<WireBytesStorage> wire_bytes_storage) {
if (!ok_) return;
if (!ok()) return;
// The offset passed to {ProcessCodeSectionHeader} is an error offset and
// not the start offset of a buffer. Therefore we need the -1 here.
if (!processor_->ProcessCodeSectionHeader(num_functions,
module_offset() - 1,
std::move(wire_bytes_storage))) {
ok_ = false;
Fail();
}
}
void ProcessFunctionBody(Vector<const uint8_t> bytes,
uint32_t module_offset) {
if (!ok_) return;
if (!processor_->ProcessFunctionBody(bytes, module_offset)) ok_ = false;
if (!ok()) return;
if (!processor_->ProcessFunctionBody(bytes, module_offset)) Fail();
}
bool ok() const { return ok_; }
void Fail() {
// We reset the {processor_} field to represent failure. This also ensures
// that we do not accidentally call further methods on the processor after
// failure.
processor_.reset();
}
bool ok() const { return processor_ != nullptr; }
uint32_t module_offset() const { return module_offset_; }
bool deserializing() const { return !compiled_module_bytes_.is_empty(); }
std::unique_ptr<StreamingProcessor> processor_;
bool ok_ = true;
std::unique_ptr<DecodingState> state_;
std::vector<std::shared_ptr<SectionBuffer>> section_buffers_;
uint32_t module_offset_ = 0;

View File

@ -17,20 +17,32 @@ namespace v8 {
namespace internal {
namespace wasm {
struct MockStreamingResult {
size_t num_sections = 0;
size_t num_functions = 0;
bool ok = true;
OwnedVector<uint8_t> received_bytes;
MockStreamingResult() = default;
};
class MockStreamingProcessor : public StreamingProcessor {
public:
explicit MockStreamingProcessor(MockStreamingResult* result)
: result_(result) {}
bool ProcessModuleHeader(Vector<const uint8_t> bytes,
uint32_t offset) override {
// TODO(ahaas): Share code with the module-decoder.
Decoder decoder(bytes.begin(), bytes.end());
uint32_t magic_word = decoder.consume_u32("wasm magic");
if (decoder.failed() || magic_word != kWasmMagic) {
ok_ = false;
result_->ok = false;
return false;
}
uint32_t magic_version = decoder.consume_u32("wasm version");
if (decoder.failed() || magic_version != kWasmVersion) {
ok_ = false;
result_->ok = false;
return false;
}
return true;
@ -38,7 +50,7 @@ class MockStreamingProcessor : public StreamingProcessor {
// Process all sections but the code section.
bool ProcessSection(SectionCode section_code, Vector<const uint8_t> bytes,
uint32_t offset) override {
++num_sections_;
++result_->num_sections;
return true;
}
@ -50,7 +62,7 @@ class MockStreamingProcessor : public StreamingProcessor {
// Process a function body.
bool ProcessFunctionBody(Vector<const uint8_t> bytes,
uint32_t offset) override {
++num_functions_;
++result_->num_functions;
return true;
}
@ -58,11 +70,11 @@ class MockStreamingProcessor : public StreamingProcessor {
// Finish the processing of the stream.
void OnFinishedStream(OwnedVector<uint8_t> bytes) override {
received_bytes_ = std::move(bytes);
result_->received_bytes = std::move(bytes);
}
// Report an error detected in the StreamingDecoder.
void OnError(DecodeResult result) override { ok_ = false; }
void OnError(DecodeResult result) override { result_->ok = false; }
void OnAbort() override {}
@ -71,18 +83,8 @@ class MockStreamingProcessor : public StreamingProcessor {
return false;
};
size_t num_sections() const { return num_sections_; }
size_t num_functions() const { return num_functions_; }
bool ok() const { return ok_; }
Vector<const uint8_t> received_bytes() const {
return received_bytes_.as_vector();
}
private:
size_t num_sections_ = 0;
size_t num_functions_ = 0;
bool ok_ = true;
OwnedVector<uint8_t> received_bytes_;
MockStreamingResult* const result_;
};
class WasmStreamingDecoderTest : public ::testing::Test {
@ -90,50 +92,49 @@ class WasmStreamingDecoderTest : public ::testing::Test {
void ExpectVerifies(Vector<const uint8_t> data, size_t expected_sections,
size_t expected_functions) {
for (int split = 0; split <= data.length(); ++split) {
// Use a unique_ptr so that the StreamingDecoder can own the processor.
std::unique_ptr<MockStreamingProcessor> p(new MockStreamingProcessor());
MockStreamingProcessor* processor = p.get();
StreamingDecoder stream(std::move(p));
MockStreamingResult result;
StreamingDecoder stream(
base::make_unique<MockStreamingProcessor>(&result));
stream.OnBytesReceived(data.SubVector(0, split));
stream.OnBytesReceived(data.SubVector(split, data.length()));
stream.Finish();
EXPECT_TRUE(processor->ok());
EXPECT_EQ(expected_sections, processor->num_sections());
EXPECT_EQ(expected_functions, processor->num_functions());
EXPECT_EQ(data, processor->received_bytes());
EXPECT_TRUE(result.ok);
EXPECT_EQ(expected_sections, result.num_sections);
EXPECT_EQ(expected_functions, result.num_functions);
EXPECT_EQ(data, result.received_bytes.as_vector());
}
}
void ExpectFailure(Vector<const uint8_t> data) {
for (int split = 0; split <= data.length(); ++split) {
std::unique_ptr<MockStreamingProcessor> p(new MockStreamingProcessor());
MockStreamingProcessor* processor = p.get();
StreamingDecoder stream(std::move(p));
MockStreamingResult result;
StreamingDecoder stream(
base::make_unique<MockStreamingProcessor>(&result));
stream.OnBytesReceived(data.SubVector(0, split));
stream.OnBytesReceived(data.SubVector(split, data.length()));
stream.Finish();
EXPECT_FALSE(processor->ok());
EXPECT_FALSE(result.ok);
}
}
MockStreamingResult result;
};
TEST_F(WasmStreamingDecoderTest, EmptyStream) {
std::unique_ptr<MockStreamingProcessor> p(new MockStreamingProcessor());
MockStreamingProcessor* processor = p.get();
StreamingDecoder stream(std::move(p));
MockStreamingResult result;
StreamingDecoder stream(base::make_unique<MockStreamingProcessor>(&result));
stream.Finish();
EXPECT_FALSE(processor->ok());
EXPECT_FALSE(result.ok);
}
TEST_F(WasmStreamingDecoderTest, IncompleteModuleHeader) {
const uint8_t data[] = {U32_LE(kWasmMagic), U32_LE(kWasmVersion)};
{
std::unique_ptr<MockStreamingProcessor> p(new MockStreamingProcessor());
MockStreamingProcessor* processor = p.get();
StreamingDecoder stream(std::move(p));
MockStreamingResult result;
StreamingDecoder stream(base::make_unique<MockStreamingProcessor>(&result));
stream.OnBytesReceived(Vector<const uint8_t>(data, 1));
stream.Finish();
EXPECT_FALSE(processor->ok());
EXPECT_FALSE(result.ok);
}
for (int length = 1; length < static_cast<int>(arraysize(data)); ++length) {
ExpectFailure(Vector<const uint8_t>(data, length));