#!/usr/bin/env python3
#
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
"""A tool for running puffin tests on a corpus of deflate-compressed files."""
import argparse
import filecmp
import logging
import os
import subprocess
import sys
import tempfile
# Names of the individual tests this tool can run. The same strings are the
# values accepted by the --disabled_tests command line flag.
_PUFFHUFF = 'puffhuff'
_PUFFDIFF = 'puffdiff'
# Every known test; used as the set of allowed choices for --disabled_tests.
TESTS = (_PUFFHUFF, _PUFFDIFF)
class Error(Exception):
    """Puffin general processing error.

    Raised by this tool when the corpus directory is not usable, when a puffin
    command exits with a non-zero status, or when a generated file does not
    match the file it is compared against.
    """
def ParseArguments(argv):
    """Parses and validates command line arguments.

    Args:
      argv: list of command line argument strings to parse, without the
        program name.

    Returns:
      The argparse namespace with the attributes `corpus`, `disabled_tests`,
      `cache_size` and `debug`.

    Raises:
      Error: if the CORPUS argument is not an existing directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('corpus', metavar='CORPUS',
                        help='A corpus directory containing compressed files')
    parser.add_argument('-d', '--disabled_tests', default=(), metavar='',
                        nargs='*',
                        help=('Space separated list of tests to disable. '
                              'Allowed options include: ' + ', '.join(TESTS)),
                        choices=TESTS)
    parser.add_argument('--cache_size', type=int, metavar='SIZE',
                        help='The size (in bytes) of the cache for puffpatch '
                        'operations.')
    parser.add_argument('--debug', action='store_true',
                        help='Turns on verbosity.')

    # Parse command-line arguments.
    args = parser.parse_args(argv)

    # argparse cannot check this itself: the corpus must be a directory that
    # can be listed later on.
    if not os.path.isdir(args.corpus):
        raise Error('Corpus directory {} is non-existent or inaccessible'
                    .format(args.corpus))
    return args
def _RunPuffin(operation, src_file, dst_file, patch_file=None,
               cache_size=None):
    """Runs a single puffin operation as a subprocess.

    Args:
      operation: value passed to puffin's --operation flag.
      src_file: path passed as --src_file.
      dst_file: path passed as --dst_file.
      patch_file: optional path passed as --patch_file.
      cache_size: optional integer passed as --cache_size.

    Raises:
      Error: if the puffin command exits with a non-zero status.
    """
    cmd = ['puffin',
           '--operation={}'.format(operation),
           '--src_file={}'.format(src_file),
           '--dst_file={}'.format(dst_file)]
    if patch_file is not None:
        cmd.append('--patch_file={}'.format(patch_file))
    # Compare against None so that an explicit size of 0 is still forwarded.
    if cache_size is not None:
        cmd.append('--cache_size={}'.format(cache_size))
    if subprocess.call(cmd) != 0:
        raise Error('Puffin failed to do {} command: {}'
                    .format(operation, cmd))


def main(argv):
    """Runs the enabled puffin tests over every regular file in the corpus.

    The puffhuff test runs puffin's puffhuff operation on each file and checks
    that the output equals the input. The puffdiff test creates a patch for
    every ordered pair of corpus files (a file paired with itself included),
    applies it with puffpatch and checks that the result equals the target.

    Args:
      argv: the full command line, with the program name at index 0.

    Returns:
      0 when every enabled test passes.

    Raises:
      Error: if the corpus directory is invalid, a puffin command fails, or a
        generated file differs from the file it is compared against.
    """
    args = ParseArguments(argv[1:])
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Construct list of appropriate files. Sorting makes the run order (and
    # therefore the first reported failure) independent of os.listdir order.
    files = sorted(
        path for path in (os.path.join(args.corpus, name)
                          for name in os.listdir(args.corpus))
        if os.path.isfile(path))

    # For each file in corpus run puffhuff.
    if _PUFFHUFF not in args.disabled_tests:
        for src in files:
            with tempfile.NamedTemporaryFile() as tgt_file:
                operation = 'puffhuff'
                logging.debug('Running %s on %s', operation, src)
                _RunPuffin(operation, src, tgt_file.name)
                # shallow=False forces a byte-by-byte comparison instead of
                # trusting matching os.stat() signatures.
                if not filecmp.cmp(src, tgt_file.name, shallow=False):
                    raise Error(
                        'The generated file {} is not equivalent to the '
                        'original file {} after {} operation.'
                        .format(tgt_file.name, src, operation))

    if _PUFFDIFF not in args.disabled_tests:
        # Run puffdiff and puffpatch for each pairs of files in the corpus.
        for src in files:
            for tgt in files:
                with tempfile.NamedTemporaryFile() as patch, \
                        tempfile.NamedTemporaryFile() as new_tgt:
                    operation = 'puffdiff'
                    logging.debug('Running %s on %s (%d) and %s (%d)',
                                  operation,
                                  os.path.basename(src),
                                  os.stat(src).st_size,
                                  os.path.basename(tgt),
                                  os.stat(tgt).st_size)
                    # Running the puffdiff operation
                    _RunPuffin(operation, src, tgt, patch_file=patch.name)
                    logging.debug('Patch size is: %d',
                                  os.stat(patch.name).st_size)

                    operation = 'puffpatch'
                    logging.debug('Running %s on src file %s and patch %s',
                                  operation, os.path.basename(src),
                                  patch.name)
                    # Running the puffpatch operation
                    _RunPuffin(operation, src, new_tgt.name,
                               patch_file=patch.name,
                               cache_size=args.cache_size)
                    if not filecmp.cmp(tgt, new_tgt.name, shallow=False):
                        raise Error(
                            'The generated file {} is not equivalent to the '
                            'original file {} after puffpatch operation.'
                            .format(new_tgt.name, tgt))
    return 0
# Run the tests only when executed as a script; the value returned by main()
# becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv))