Small Python modernization of Brotli code.

PiperOrigin-RevId: 549289787
This commit is contained in:
Thomas Fischbacher 2023-07-19 05:43:51 -07:00 committed by Copybara-Service
parent 4b827e4ce4
commit acc265655d

View File

@ -1,7 +1,11 @@
#! /usr/bin/env python #! /usr/bin/env python
"""Compression/decompression utility using the Brotli algorithm.""" """Compression/decompression utility using the Brotli algorithm."""
# Note: Python2 has been deprecated long ago, but some projects out in
# the wide world may still use it nevertheless. This should not
# deprive them from being able to run Brotli.
from __future__ import print_function from __future__ import print_function
import argparse import argparse
import os import os
import platform import platform
@ -9,8 +13,9 @@ import sys
import brotli import brotli
# default values of encoder parameters # default values of encoder parameters
DEFAULT_PARAMS = { _DEFAULT_PARAMS = {
'mode': brotli.MODE_GENERIC, 'mode': brotli.MODE_GENERIC,
'quality': 11, 'quality': 11,
'lgwin': 22, 'lgwin': 22,
@ -19,17 +24,32 @@ DEFAULT_PARAMS = {
def get_binary_stdio(stream): def get_binary_stdio(stream):
""" Return the specified standard input, output or errors stream as a """Return the specified stdin/stdout/stderr stream.
'raw' buffer object suitable for reading/writing binary data from/to it.
If the stdio stream requested (i.e. sys.(stdin|stdout|stderr))
has been replaced with a stream object that does not have a `.buffer`
attribute, this will return the original stdio stream's buffer, i.e.
`sys.__(stdin|stdout|stderr)__.buffer`.
Args:
stream: One of 'stdin', 'stdout', 'stderr'.
Returns:
The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass
instance such as io.Bufferedreader/io.BufferedWriter), suitable for
reading/writing binary data from/to it.
""" """
assert stream in ['stdin', 'stdout', 'stderr'], 'invalid stream name' if stream == 'stdin': stdio = sys.stdin
stdio = getattr(sys, stream) elif stream == 'stdout': stdio = sys.stdout
elif stream == 'stderr': stdio = sys.stderr
else:
raise ValueError('invalid stream name: %s' % (stream,))
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
if sys.platform == 'win32': if sys.platform == 'win32':
# set I/O stream binary flag on python2.x (Windows) # set I/O stream binary flag on python2.x (Windows)
runtime = platform.python_implementation() runtime = platform.python_implementation()
if runtime == 'PyPy': if runtime == 'PyPy':
# the msvcrt trick doesn't work in pypy, so I use fdopen # the msvcrt trick doesn't work in pypy, so use fdopen().
mode = 'rb' if stream == 'stdin' else 'wb' mode = 'rb' if stream == 'stdin' else 'wb'
stdio = os.fdopen(stdio.fileno(), mode, 0) stdio = os.fdopen(stdio.fileno(), mode, 0)
else: else:
@ -38,12 +58,20 @@ def get_binary_stdio(stream):
msvcrt.setmode(stdio.fileno(), os.O_BINARY) msvcrt.setmode(stdio.fileno(), os.O_BINARY)
return stdio return stdio
else: else:
# get 'buffer' attribute to read/write binary data on python3.x try:
if hasattr(stdio, 'buffer'):
return stdio.buffer return stdio.buffer
else: except AttributeError:
orig_stdio = getattr(sys, '__%s__' % stream) # The Python reference explains
return orig_stdio.buffer # (-> https://docs.python.org/3/library/sys.html#sys.stdin)
# that the `.buffer` attribute might not exist, since
# the standard streams might have been replaced by something else
# (such as an `io.StringIO()` - perhaps via
# `contextlib.redirect_stdout()`).
# We fall back to the original stdio in these cases.
if stream == 'stdin': return sys.__stdin__.buffer
if stream == 'stdout': return sys.__stdout__.buffer
if stream == 'stderr': return sys.__stderr__.buffer
assert False, 'Impossible Situation.'
def main(args=None): def main(args=None):
@ -114,46 +142,52 @@ def main(args=None):
help='Base 2 logarithm of the maximum input block size. ' help='Base 2 logarithm of the maximum input block size. '
'Range is 16 to 24. If set to 0, the value will be set based ' 'Range is 16 to 24. If set to 0, the value will be set based '
'on the quality. Defaults to 0.') 'on the quality. Defaults to 0.')
# set default values using global DEFAULT_PARAMS dictionary # set default values using global _DEFAULT_PARAMS dictionary
parser.set_defaults(**DEFAULT_PARAMS) parser.set_defaults(**_DEFAULT_PARAMS)
options = parser.parse_args(args=args) options = parser.parse_args(args=args)
if options.infile: if options.infile:
if not os.path.isfile(options.infile): try:
parser.error('file "%s" not found' % options.infile) with open(options.infile, 'rb') as infile:
with open(options.infile, 'rb') as infile: data = infile.read()
data = infile.read() except OSError:
parser.error('Could not read --infile: %s' % (infile,))
else: else:
if sys.stdin.isatty(): if sys.stdin.isatty():
# interactive console, just quit # interactive console, just quit
parser.error('no input') parser.error('No input (called from interactive terminal).')
infile = get_binary_stdio('stdin') infile = get_binary_stdio('stdin')
data = infile.read() data = infile.read()
if options.outfile: if options.outfile:
if os.path.isfile(options.outfile) and not options.force: # Caution! If `options.outfile` is a broken symlink, will try to
parser.error('output file exists') # redirect the write according to symlink.
if os.path.exists(options.outfile) and not options.force:
parser.error(('Target --outfile=%s already exists, '
'but --force was not requested.') % (outfile,))
outfile = open(options.outfile, 'wb') outfile = open(options.outfile, 'wb')
did_open_outfile = True
else: else:
outfile = get_binary_stdio('stdout') outfile = get_binary_stdio('stdout')
did_open_outfile = False
try: try:
if options.decompress: try:
data = brotli.decompress(data) if options.decompress:
else: data = brotli.decompress(data)
data = brotli.compress( else:
data, data = brotli.compress(
mode=options.mode, data,
quality=options.quality, mode=options.mode,
lgwin=options.lgwin, quality=options.quality,
lgblock=options.lgblock) lgwin=options.lgwin,
lgblock=options.lgblock)
outfile.write(data)
finally:
if did_open_outfile: outfile.close()
except brotli.error as e: except brotli.error as e:
parser.exit(1, parser.exit(1,
'bro: error: %s: %s' % (e, options.infile or 'sys.stdin')) 'bro: error: %s: %s' % (e, options.infile or '{stdin}'))
outfile.write(data)
outfile.close()
if __name__ == '__main__': if __name__ == '__main__':