Python: Convert bro.py tests to unittest style (#478)

* Create unittest-style tests for `bro.py` decompression and compression
* Delete old tests for `bro.py`
* Update test method generation to properly create a Cartesian product
  of iterables using `itertools.product`
This commit is contained in:
Alex Nicksay 2016-12-09 07:44:05 -05:00 committed by Eugene Kliuchnikov
parent 50bc3a7145
commit 4a60128c13
6 changed files with 159 additions and 213 deletions

View File

@ -1,22 +1,13 @@
from __future__ import print_function
import filecmp
import glob
import itertools
import os
import sys
import sysconfig
import unittest
def diff_q(first_file, second_file):
"""Simulate call to POSIX diff with -q argument"""
if not filecmp.cmp(first_file, second_file, shallow=False):
print(
'Files %s and %s differ' % (first_file, second_file),
file=sys.stderr)
return 1
return 0
project_dir = os.path.abspath(os.path.join(__file__, '..', '..', '..'))
PYTHON = sys.executable or 'python'
@ -62,23 +53,39 @@ def get_temp_uncompressed_name(filename):
return filename + '.unbro'
def bind_method_args(method, *args):
return lambda self: method(self, *args)
def bind_method_args(method, *args, **kwargs):
return lambda self: method(self, *args, **kwargs)
def generate_test_methods(test_case_class, for_decompression=False):
def generate_test_methods(test_case_class,
for_decompression=False,
variants=None):
# Add test methods for each test data file. This makes identifying problems
# with specific compression scenarios easier.
if for_decompression:
paths = TESTDATA_PATHS_FOR_DECOMPRESSION
else:
paths = TESTDATA_PATHS
opts = []
if variants:
opts_list = []
for k, v in variants.items():
opts_list.append([r for r in itertools.product([k], v)])
for o in itertools.product(*opts_list):
opts_name = '_'.join([str(i) for i in itertools.chain(*o)])
opts_dict = dict(o)
opts.append([opts_name, opts_dict])
else:
opts.append(['', {}])
for method in [m for m in dir(test_case_class) if m.startswith('_test')]:
for testdata in paths:
f = os.path.splitext(os.path.basename(testdata))[0]
name = 'test_{method}_{file}'.format(method=method, file=f)
func = bind_method_args(getattr(test_case_class, method), testdata)
setattr(test_case_class, name, func)
for (opts_name, opts_dict) in opts:
f = os.path.splitext(os.path.basename(testdata))[0]
name = 'test_{method}_{options}_{file}'.format(
method=method, options=opts_name, file=f)
func = bind_method_args(
getattr(test_case_class, method), testdata, **opts_dict)
setattr(test_case_class, name, func)
class TestCase(unittest.TestCase):

120
python/tests/bro_test.py Normal file
View File

@ -0,0 +1,120 @@
# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import subprocess
import unittest
import _test_utils
import brotli
PYTHON = _test_utils.PYTHON
BRO = _test_utils.BRO
TEST_ENV = _test_utils.TEST_ENV
def _get_original_name(test_data):
return test_data.split('.compressed')[0]
class TestBroDecompress(_test_utils.TestCase):
def _check_decompression(self, test_data):
# Verify decompression matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
original = _get_original_name(test_data)
self.assertFilesMatch(temp_uncompressed, original)
def _decompress_file(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
args = [PYTHON, BRO, '-f', '-d', '-i', test_data, '-o',
temp_uncompressed]
subprocess.check_call(args, env=TEST_ENV)
def _decompress_pipe(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
args = [PYTHON, BRO, '-d']
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
subprocess.check_call(
args, stdin=in_file, stdout=out_file, env=TEST_ENV)
def _test_decompress_file(self, test_data):
self._decompress_file(test_data)
self._check_decompression(test_data)
def _test_decompress_pipe(self, test_data):
self._decompress_pipe(test_data)
self._check_decompression(test_data)
_test_utils.generate_test_methods(TestBroDecompress, for_decompression=True)
class TestBroCompress(_test_utils.TestCase):
VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)}
def _check_decompression(self, test_data, **kwargs):
# Write decompression to temp file and verify it matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
original = test_data
args = [PYTHON, BRO, '-f', '-d']
if 'dictionary' in kwargs:
args.extend(['--custom-dictionary', str(kwargs['dictionary'])])
args.extend(['-i', temp_compressed, '-o', temp_uncompressed])
subprocess.check_call(args, env=TEST_ENV)
self.assertFilesMatch(temp_uncompressed, original)
def _compress_file(self, test_data, **kwargs):
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
args = [PYTHON, BRO, '-f']
if 'quality' in kwargs:
args.extend(['-q', str(kwargs['quality'])])
if 'lgwin' in kwargs:
args.extend(['--lgwin', str(kwargs['lgwin'])])
if 'dictionary' in kwargs:
args.extend(['--custom-dictionary', str(kwargs['dictionary'])])
args.extend(['-i', test_data, '-o', temp_compressed])
subprocess.check_call(args, env=TEST_ENV)
def _compress_pipe(self, test_data, **kwargs):
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
args = [PYTHON, BRO]
if 'quality' in kwargs:
args.extend(['-q', str(kwargs['quality'])])
if 'lgwin' in kwargs:
args.extend(['--lgwin', str(kwargs['lgwin'])])
if 'dictionary' in kwargs:
args.extend(['--custom-dictionary', str(kwargs['dictionary'])])
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
subprocess.check_call(
args, stdin=in_file, stdout=out_file, env=TEST_ENV)
def _test_compress_file(self, test_data, **kwargs):
self._compress_file(test_data, **kwargs)
self._check_decompression(test_data)
def _test_compress_pipe(self, test_data, **kwargs):
self._compress_pipe(test_data, **kwargs)
self._check_decompression(test_data)
def _test_compress_file_custom_dictionary(self, test_data, **kwargs):
kwargs['dictionary'] = test_data
self._compress_file(test_data, **kwargs)
self._check_decompression(test_data, **kwargs)
def _test_compress_pipe_custom_dictionary(self, test_data, **kwargs):
kwargs['dictionary'] = test_data
self._compress_pipe(test_data, **kwargs)
self._check_decompression(test_data, **kwargs)
_test_utils.generate_test_methods(
TestBroCompress, variants=TestBroCompress.VARIANTS)
if __name__ == '__main__':
unittest.main()

View File

@ -1,30 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import glob
import sys
import os
from subprocess import check_call
from _test_utils import PYTHON, BRO, TEST_ENV, diff_q
os.chdir(os.path.abspath("../../tests"))
for filename in glob.glob("testdata/*.compressed*"):
filename = os.path.abspath(filename)
print('Testing decompression of file "%s"' % os.path.basename(filename))
expected = filename.split(".compressed")[0]
uncompressed = expected + ".uncompressed"
check_call([PYTHON, BRO, "-f", "-d", "-i", filename, "-o", uncompressed],
env=TEST_ENV)
if diff_q(uncompressed, expected) != 0:
sys.exit(1)
# Test the streaming version
with open(filename, "rb") as infile, open(uncompressed, "wb") as outfile:
check_call([PYTHON, BRO, '-d'], stdin=infile, stdout=outfile,
env=TEST_ENV)
if diff_q(uncompressed, expected) != 0:
sys.exit(1)
try:
os.unlink(uncompressed)
except OSError:
pass

View File

@ -11,7 +11,14 @@ import brotli
class TestCompress(_test_utils.TestCase):
VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)}
def _check_decompression(self, test_data, **kwargs):
# Only dictionary is supported as a kwarg to brotli.decompress.
if 'dictionary' in kwargs:
kwargs = {'dictionary': kwargs['dictionary']}
else:
kwargs = {}
# Write decompression to temp file and verify it matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
@ -27,88 +34,19 @@ class TestCompress(_test_utils.TestCase):
with open(test_data, 'rb') as in_file:
out_file.write(brotli.compress(in_file.read(), **kwargs))
def _test_compress_quality_1(self, test_data):
self._compress(test_data, quality=1)
self._check_decompression(test_data)
def _test_compress(self, test_data, **kwargs):
self._compress(test_data, **kwargs)
self._check_decompression(test_data, **kwargs)
def _test_compress_quality_6(self, test_data):
self._compress(test_data, quality=6)
self._check_decompression(test_data)
def _test_compress_quality_9(self, test_data):
self._compress(test_data, quality=9)
self._check_decompression(test_data)
def _test_compress_quality_11(self, test_data):
self._compress(test_data, quality=11)
self._check_decompression(test_data)
def _test_compress_quality_1_lgwin_10(self, test_data):
self._compress(test_data, quality=1, lgwin=10)
self._check_decompression(test_data)
def _test_compress_quality_6_lgwin_15(self, test_data):
self._compress(test_data, quality=6, lgwin=15)
self._check_decompression(test_data)
def _test_compress_quality_9_lgwin_20(self, test_data):
self._compress(test_data, quality=9, lgwin=20)
self._check_decompression(test_data)
def _test_compress_quality_11_lgwin_24(self, test_data):
self._compress(test_data, quality=11, lgwin=24)
self._check_decompression(test_data)
def _test_compress_quality_1_custom_dictionary(self, test_data):
def _test_compress_custom_dictionary(self, test_data, **kwargs):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=1, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_6_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=6, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_9_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=9, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_11_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=11, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_1_lgwin_10_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=1, lgwin=10, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_6_lgwin_15_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=6, lgwin=15, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_9_lgwin_20_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=9, lgwin=20, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
def _test_compress_quality_11_lgwin_24_custom_dictionary(self, test_data):
with open(test_data, 'rb') as in_file:
dictionary = in_file.read()
self._compress(test_data, quality=11, lgwin=24, dictionary=dictionary)
self._check_decompression(test_data, dictionary=dictionary)
kwargs['dictionary'] = dictionary
self._compress(test_data, **kwargs)
self._check_decompression(test_data, **kwargs)
_test_utils.generate_test_methods(TestCompress)
_test_utils.generate_test_methods(TestCompress, variants=TestCompress.VARIANTS)
if __name__ == '__main__':
unittest.main()

View File

@ -1,41 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
from subprocess import check_call, Popen, PIPE
from _test_utils import PYTHON, BRO, TEST_ENV, diff_q
INPUTS = """\
testdata/alice29.txt
testdata/asyoulik.txt
testdata/lcet10.txt
testdata/plrabn12.txt
../enc/encode.c
../common/dictionary.h
../dec/decode.c
%s
""" % BRO
os.chdir(os.path.abspath("../../tests"))
for filename in INPUTS.splitlines():
for quality in (1, 6, 9, 11):
for lgwin in (10, 15, 20, 24):
filename = os.path.abspath(filename)
print('Roundtrip testing file "%s" at quality %d with lg(win)=%d and auto-custom-dictionary' %
(os.path.basename(filename), quality, lgwin))
compressed = os.path.splitext(filename)[0] + ".custom_bro"
uncompressed = os.path.splitext(filename)[0] + ".custom_unbro"
check_call([PYTHON, BRO, "-f", "-q", str(quality), "-i", filename,
"-o", compressed, "--lgwin", str(lgwin),
"--custom-dictionary", filename], env=TEST_ENV)
check_call([PYTHON, BRO, "-f", "-d", "-i", compressed, "-o",
uncompressed, "--custom-dictionary", filename], env=TEST_ENV)
if diff_q(filename, uncompressed) != 0:
sys.exit(1)
try:
os.unlink(compressed)
os.unlink(uncompressed)
except OSError:
pass

View File

@ -1,48 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
from subprocess import check_call, Popen, PIPE
from _test_utils import PYTHON, BRO, TEST_ENV, diff_q
INPUTS = """\
testdata/alice29.txt
testdata/asyoulik.txt
testdata/lcet10.txt
testdata/plrabn12.txt
../enc/encode.c
../common/dictionary.h
../dec/decode.c
%s
""" % BRO
os.chdir(os.path.abspath("../../tests"))
for filename in INPUTS.splitlines():
for quality in (1, 6, 9, 11):
filename = os.path.abspath(filename)
print('Roundtrip testing file "%s" at quality %d' %
(os.path.basename(filename), quality))
compressed = os.path.splitext(filename)[0] + ".bro"
uncompressed = os.path.splitext(filename)[0] + ".unbro"
check_call([PYTHON, BRO, "-f", "-q", str(quality), "-i", filename,
"-o", compressed], env=TEST_ENV)
check_call([PYTHON, BRO, "-f", "-d", "-i", compressed, "-o",
uncompressed], env=TEST_ENV)
if diff_q(filename, uncompressed) != 0:
sys.exit(1)
# Test the streaming version
with open(filename, "rb") as infile, \
open(uncompressed, "wb") as outfile:
p = Popen([PYTHON, BRO, "-q", str(quality)], stdin=infile,
stdout=PIPE, env=TEST_ENV)
check_call([PYTHON, BRO, "-d"], stdin=p.stdout, stdout=outfile,
env=TEST_ENV)
if diff_q(filename, uncompressed) != 0:
sys.exit(1)
try:
os.unlink(compressed)
os.unlink(uncompressed)
except OSError:
pass