Python: Add unit tests for brotli.compress and brotli.decompress (#467)

Also
  - rename `test_utils` to `_test_utils`
  - refactor shared code into `_test_utils`
This commit is contained in:
Alex Nicksay 2016-11-09 06:21:13 -05:00 committed by Eugene Kliuchnikov
parent 12750768c2
commit 1e5ea6aedd
8 changed files with 282 additions and 119 deletions

100
python/tests/_test_utils.py Normal file
View File

@ -0,0 +1,100 @@
from __future__ import print_function
import filecmp
import glob
import os
import sys
import sysconfig
import unittest
def diff_q(first_file, second_file):
"""Simulate call to POSIX diff with -q argument"""
if not filecmp.cmp(first_file, second_file, shallow=False):
print(
'Files %s and %s differ' % (first_file, second_file),
file=sys.stderr)
return 1
return 0
project_dir = os.path.abspath(os.path.join(__file__, '..', '..', '..'))
PYTHON = sys.executable or 'python'
BRO = os.path.join(project_dir, 'python', 'bro.py')
# Get the platform/version-specific build folder.
# By default, the distutils build base is in the same location as setup.py.
platform_lib_name = 'lib.{platform}-{version[0]}.{version[1]}'.format(
platform=sysconfig.get_platform(), version=sys.version_info)
build_dir = os.path.join(project_dir, 'bin', platform_lib_name)
# Prepend the build folder to sys.path and the PYTHONPATH environment variable.
if build_dir not in sys.path:
sys.path.insert(0, build_dir)
TEST_ENV = os.environ.copy()
if 'PYTHONPATH' not in TEST_ENV:
TEST_ENV['PYTHONPATH'] = build_dir
else:
TEST_ENV['PYTHONPATH'] = build_dir + os.pathsep + TEST_ENV['PYTHONPATH']
TESTDATA_DIR = os.path.join(project_dir, 'tests', 'testdata')
TESTDATA_FILES = [
'empty', # Empty file
'10x10y', # Small text
'alice29.txt', # Large text
'random_org_10k.bin', # Small data
'mapsdatazrh', # Large data
]
TESTDATA_PATHS = [os.path.join(TESTDATA_DIR, f) for f in TESTDATA_FILES]
TESTDATA_PATHS_FOR_DECOMPRESSION = glob.glob(
os.path.join(TESTDATA_DIR, '*.compressed'))
def get_temp_compressed_name(filename):
return filename + '.bro'
def get_temp_uncompressed_name(filename):
return filename + '.unbro'
def bind_method_args(method, *args):
return lambda self: method(self, *args)
def generate_test_methods(test_case_class, for_decompression=False):
# Add test methods for each test data file. This makes identifying problems
# with specific compression scenarios easier.
if for_decompression:
paths = TESTDATA_PATHS_FOR_DECOMPRESSION
else:
paths = TESTDATA_PATHS
for method in [m for m in dir(test_case_class) if m.startswith('_test')]:
for testdata in paths:
f = os.path.splitext(os.path.basename(testdata))[0]
name = 'test_{method}_{file}'.format(method=method, file=f)
func = bind_method_args(getattr(test_case_class, method), testdata)
setattr(test_case_class, name, func)
class TestCase(unittest.TestCase):
def tearDown(self):
for f in TESTDATA_PATHS:
try:
os.unlink(get_temp_compressed_name(f))
except OSError:
pass
try:
os.unlink(get_temp_uncompressed_name(f))
except OSError:
pass
def assertFilesMatch(self, first, second):
self.assertTrue(
filecmp.cmp(first, second, shallow=False),
'File {} differs from {}'.format(first, second))

2
python/tests/compatibility_test.py Executable file → Normal file
View File

@ -5,7 +5,7 @@ import sys
import os
from subprocess import check_call
from test_utils import PYTHON, BRO, TEST_ENV, diff_q
from _test_utils import PYTHON, BRO, TEST_ENV, diff_q
os.chdir(os.path.abspath("../../tests"))

View File

@ -0,0 +1,114 @@
# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import unittest
import _test_utils
import brotli
class TestCompress(_test_utils.TestCase):
def _check_decompression(self, test_data, **kwargs):
# Write decompression to temp file and verify it matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
original = test_data
with open(temp_uncompressed, 'wb') as out_file:
with open(temp_compressed, 'rb') as in_file:
out_file.write(brotli.decompress(in_file.read(), **kwargs))
self.assertFilesMatch(temp_uncompressed, original)
def _compress(self, test_data, **kwargs):
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
out_file.write(brotli.compress(in_file.read(), **kwargs))
def _test_compress_quality_1(self, test_data):
self._compress(test_data, quality=1)
self._check_decompression(test_data)
def _test_compress_quality_6(self, test_data):
self._compress(test_data, quality=6)
self._check_decompression(test_data)
def _test_compress_quality_9(self, test_data):
self._compress(test_data, quality=9)
self._check_decompression(test_data)
def _test_compress_quality_11(self, test_data):
self._compress(test_data, quality=11)
self._check_decompression(test_data)
def _test_compress_quality_1_lgwin_10(self, test_data):
self._compress(test_data, quality=1, lgwin=10)
self._check_decompression(test_data)
def _test_compress_quality_6_lgwin_15(self, test_data):
self._compress(test_data, quality=6, lgwin=15)
self._check_decompression(test_data)
def _test_compress_quality_9_lgwin_20(self, test_data):
self._compress(test_data, quality=9, lgwin=20)
self._check_decompression(test_data)
def _test_compress_quality_11_lgwin_24(self, test_data):
self._compress(test_data, quality=11, lgwin=24)
self._check_decompression(test_data)
def _test_compress_quality_1_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=1, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_6_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=6, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_9_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=9, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_11_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=11, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_1_lgwin_10_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=1, lgwin=10, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_6_lgwin_15_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=6, lgwin=15, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_9_lgwin_20_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=9, lgwin=20, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_11_lgwin_24_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=11, lgwin=24, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
_test_utils.generate_test_methods(TestCompress)
if __name__ == '__main__':
unittest.main()

View File

@ -3,135 +3,85 @@
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import filecmp
import functools
import os
import sys
import types
import unittest
import test_utils
import _test_utils
import brotli
TEST_DATA_FILES = [
'empty', # Empty file
'10x10y', # Small text
'alice29.txt', # Large text
'random_org_10k.bin', # Small data
'mapsdatazrh', # Large data
]
TEST_DATA_PATHS = [os.path.join(test_utils.TESTDATA_DIR, f)
for f in TEST_DATA_FILES]
def get_compressed_name(filename):
return filename + '.compressed'
def get_temp_compressed_name(filename):
return filename + '.bro'
def get_temp_uncompressed_name(filename):
return filename + '.unbro'
def bind_method_args(method, *args):
return lambda self: method(self, *args)
# Do not inherit from unittest.TestCase here to ensure that test methods
# Do not inherit from TestCase here to ensure that test methods
# are not run automatically and instead are run as part of a specific
# configuration below.
class _TestCompressor(object):
def _check_decompression_matches(self, test_data):
CHUNK_SIZE = 2048
def _check_decompression(self, test_data):
# Write decompression to temp file and verify it matches the original.
with open(get_temp_uncompressed_name(test_data), 'wb') as out_file:
with open(get_temp_compressed_name(test_data), 'rb') as in_file:
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
original = test_data
with open(temp_uncompressed, 'wb') as out_file:
with open(temp_compressed, 'rb') as in_file:
out_file.write(brotli.decompress(in_file.read()))
self.assertTrue(
filecmp.cmp(get_temp_uncompressed_name(test_data),
test_data,
shallow=False))
self.assertFilesMatch(temp_uncompressed, original)
def _test_single_process(self, test_data):
# Write single-shot compression to temp file.
with open(get_temp_compressed_name(test_data), 'wb') as out_file:
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
out_file.write(self.compressor.process(in_file.read()))
out_file.write(self.compressor.finish())
self._check_decompression_matches(test_data)
self._check_decompression(test_data)
def _test_multiple_process(self, test_data):
# Write chunked compression to temp file.
chunk_size = 2048
with open(get_temp_compressed_name(test_data), 'wb') as out_file:
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
read_chunk = functools.partial(in_file.read, chunk_size)
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
for data in iter(read_chunk, b''):
out_file.write(self.compressor.process(data))
out_file.write(self.compressor.finish())
self._check_decompression_matches(test_data)
self._check_decompression(test_data)
def _test_multiple_process_and_flush(self, test_data):
# Write chunked and flushed compression to temp file.
chunk_size = 2048
with open(get_temp_compressed_name(test_data), 'wb') as out_file:
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
read_chunk = functools.partial(in_file.read, chunk_size)
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
for data in iter(read_chunk, b''):
out_file.write(self.compressor.process(data))
out_file.write(self.compressor.flush())
out_file.write(self.compressor.finish())
self._check_decompression_matches(test_data)
self._check_decompression(test_data)
# Add test methods for each test data file. This makes identifying problems
# with specific compression scenarios easier.
for methodname in [m for m in dir(_TestCompressor) if m.startswith('_test')]:
for test_data in TEST_DATA_PATHS:
filename = os.path.splitext(os.path.basename(test_data))[0]
name = 'test_{method}_{file}'.format(method=methodname, file=filename)
func = bind_method_args(getattr(_TestCompressor, methodname), test_data)
setattr(_TestCompressor, name, func)
_test_utils.generate_test_methods(_TestCompressor)
class _CompressionTestCase(unittest.TestCase):
def tearDown(self):
for f in TEST_DATA_PATHS:
try:
os.unlink(get_temp_compressed_name(f))
except OSError:
pass
try:
os.unlink(get_temp_uncompressed_name(f))
except OSError:
pass
class TestCompressorQuality1(_TestCompressor, _CompressionTestCase):
class TestCompressorQuality1(_TestCompressor, _test_utils.TestCase):
def setUp(self):
self.compressor = brotli.Compressor(quality=1)
class TestCompressorQuality6(_TestCompressor, _CompressionTestCase):
class TestCompressorQuality6(_TestCompressor, _test_utils.TestCase):
def setUp(self):
self.compressor = brotli.Compressor(quality=6)
class TestCompressorQuality9(_TestCompressor, _CompressionTestCase):
class TestCompressorQuality9(_TestCompressor, _test_utils.TestCase):
def setUp(self):
self.compressor = brotli.Compressor(quality=9)
class TestCompressorQuality11(_TestCompressor, _CompressionTestCase):
class TestCompressorQuality11(_TestCompressor, _test_utils.TestCase):
def setUp(self):
self.compressor = brotli.Compressor(quality=11)

View File

@ -4,7 +4,7 @@ import sys
import os
from subprocess import check_call, Popen, PIPE
from test_utils import PYTHON, BRO, TEST_ENV, diff_q
from _test_utils import PYTHON, BRO, TEST_ENV, diff_q
INPUTS = """\

View File

@ -0,0 +1,38 @@
# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import unittest
import _test_utils
import brotli
def _get_original_name(test_data):
return test_data.split('.compressed')[0]
class TestDecompress(_test_utils.TestCase):
def _check_decompression(self, test_data):
# Verify decompression matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
original = _get_original_name(test_data)
self.assertFilesMatch(temp_uncompressed, original)
def _decompress(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
out_file.write(brotli.decompress(in_file.read()))
def _test_decompress(self, test_data):
self._decompress(test_data)
self._check_decompression(test_data)
_test_utils.generate_test_methods(TestDecompress, for_decompression=True)
if __name__ == '__main__':
unittest.main()

2
python/tests/roundtrip_test.py Executable file → Normal file
View File

@ -4,7 +4,7 @@ import sys
import os
from subprocess import check_call, Popen, PIPE
from test_utils import PYTHON, BRO, TEST_ENV, diff_q
from _test_utils import PYTHON, BRO, TEST_ENV, diff_q
INPUTS = """\

View File

@ -1,39 +0,0 @@
from __future__ import print_function
import sys
import os
import sysconfig
import filecmp
def diff_q(first_file, second_file):
"""Simulate call to POSIX diff with -q argument"""
if not filecmp.cmp(first_file, second_file, shallow=False):
print(
'Files %s and %s differ' % (first_file, second_file),
file=sys.stderr)
return 1
return 0
project_dir = os.path.abspath(os.path.join(__file__, '..', '..', '..'))
PYTHON = sys.executable or 'python'
BRO = os.path.join(project_dir, 'python', 'bro.py')
TESTDATA_DIR = os.path.join(project_dir, 'tests', 'testdata')
# Get the platform/version-specific build folder.
# By default, the distutils build base is in the same location as setup.py.
platform_lib_name = 'lib.{platform}-{version[0]}.{version[1]}'.format(
platform=sysconfig.get_platform(), version=sys.version_info)
build_dir = os.path.join(project_dir, 'bin', platform_lib_name)
# Prepend the build folder to sys.path and the PYTHONPATH environment variable.
if build_dir not in sys.path:
sys.path.insert(0, build_dir)
TEST_ENV = os.environ.copy()
if 'PYTHONPATH' not in TEST_ENV:
TEST_ENV['PYTHONPATH'] = build_dir
else:
TEST_ENV['PYTHONPATH'] = build_dir + os.pathsep + TEST_ENV['PYTHONPATH']