brotli/python/tests/decompressor_test.py
Justin Ridgewell 5805f99a53 Ensure decompression consumes all input (#730)
* Ensure decompression consumes all input

If not, it's a corrupt stream.

* Use byte strings
2018-11-12 10:36:00 +01:00

60 lines
1.8 KiB
Python

# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import functools
import unittest
from . import _test_utils
import brotli
def _get_original_name(test_data):
return test_data.split('.compressed')[0]
class TestDecompressor(_test_utils.TestCase):
    """Tests for the streaming `brotli.Decompressor` object."""

    # Number of bytes read from the compressed file per `process()` call.
    CHUNK_SIZE = 1

    def setUp(self):
        """Create a new decompressor for the test about to run."""
        self.decompressor = brotli.Decompressor()

    def tearDown(self):
        """Drop the reference to the decompressor used by the test."""
        self.decompressor = None

    def _check_decompression(self, test_data):
        """Assert the decompressed temp file matches the original file."""
        self.assertFilesMatch(
            _test_utils.get_temp_uncompressed_name(test_data),
            _get_original_name(test_data),
        )

    def _decompress(self, test_data):
        """Stream `test_data` through the decompressor into its temp file.

        The input is fed CHUNK_SIZE bytes at a time; once it is exhausted
        the decompressor must report that the stream is finished.
        """
        output_path = _test_utils.get_temp_uncompressed_name(test_data)
        with open(output_path, 'wb') as sink, open(test_data, 'rb') as source:
            next_chunk = functools.partial(source.read, self.CHUNK_SIZE)
            # Two-argument iter() stops at the first empty read, i.e. at EOF.
            for chunk in iter(next_chunk, b''):
                sink.write(self.decompressor.process(chunk))
        self.assertTrue(self.decompressor.is_finished())

    def _test_decompress(self, test_data):
        """Decompress `test_data`, then compare the result to the original.

        NOTE(review): presumably the target of the test methods generated
        by `_test_utils.generate_test_methods` -- confirm in _test_utils.
        """
        self._decompress(test_data)
        self._check_decompression(test_data)

    def test_garbage_appended(self):
        # A valid stream followed by one extra byte must be rejected.
        with self.assertRaises(brotli.error):
            stream_with_garbage = brotli.compress(b'a') + b'a'
            self.decompressor.process(stream_with_garbage)

    def test_already_finished(self):
        # First consume one complete stream...
        self.decompressor.process(brotli.compress(b'a'))
        # ...then any further input must be rejected.
        with self.assertRaises(brotli.error):
            self.decompressor.process(b'a')
# Attach the generated test methods to TestDecompressor.
# NOTE(review): presumably one method per test-data file, each calling
# `_test_decompress` -- confirm against _test_utils.generate_test_methods.
_test_utils.generate_test_methods(
    TestDecompressor,
    for_decompression=True,
)

# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()