#!/usr/bin/python
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# pylint: disable-msg=W0311
from collections import namedtuple
import argparse
import glob
import json
import os
import pprint
import re
import subprocess
_EXPECTATIONS_DIR = 'expectations'
_AUTOTEST_RESULT_TEMPLATE = 'gs://chromeos-autotest-results/%s-chromeos-test/chromeos*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
# Use this template for tryjob results:
#_AUTOTEST_RESULT_TEMPLATE = 'gs://chromeos-autotest-results/%s-ihf/*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
_BOARD_REGEX = re.compile(r'ChromeOS BOARD = (.+)')
_CPU_FAMILY_REGEX = re.compile(r'ChromeOS CPU family = (.+)')
_GPU_FAMILY_REGEX = re.compile(r'ChromeOS GPU family = (.+)')
_TEST_FILTER_REGEX = re.compile(r'dEQP test filter = (.+)')
_HASTY_MODE_REGEX = re.compile(r'\'hasty\': \'True\'|Running in hasty mode.')
#04/23 07:30:21.624 INFO |graphics_d:0240| TestCase: dEQP-GLES3.functional.shaders.operator.unary_operator.bitwise_not.highp_ivec3_vertex
#04/23 07:30:21.840 INFO |graphics_d:0261| Result: Pass
_TEST_RESULT_REGEX = re.compile(r'TestCase: (.+?)$\n.+? Result: (.+?)$',
re.MULTILINE)
_HASTY_TEST_RESULT_REGEX = re.compile(
r'\[stdout\] Test case \'(.+?)\'..$\n'
r'.+?\[stdout\] (Pass|Fail|QualityWarning) \((.+)\)', re.MULTILINE)
Logfile = namedtuple('Logfile', 'job_id name gs_path')
def execute(cmd_list):
sproc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
return sproc.communicate()[0]
def get_metadata(s):
cpu = re.search(_CPU_FAMILY_REGEX, s).group(1)
gpu = re.search(_GPU_FAMILY_REGEX, s).group(1)
board = re.search(_BOARD_REGEX, s).group(1)
filter = re.search(_TEST_FILTER_REGEX, s).group(1)
hasty = False
if re.search(_HASTY_MODE_REGEX, s):
hasty = True
print('Found results from %s for GPU = %s, filter = %s and hasty = %r.' %
(board, gpu, filter, hasty))
return board, gpu, filter, hasty
def get_logs_from_gs(autotest_result_path):
logs = []
gs_paths = execute(['gsutil', 'ls', autotest_result_path]).splitlines()
for gs_path in gs_paths:
job_id = gs_path.split('/')[3].split('-')[0]
# DEBUG logs have more information than INFO logs, especially for hasty.
name = os.path.join('logs', job_id + '_graphics_dEQP.DEBUG')
logs.append(Logfile(job_id, name, gs_path))
for log in logs:
execute(['gsutil', 'cp', log.gs_path, log.name])
return logs
def get_local_logs():
logs = []
for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.INFO')):
job_id = name.split('_')[0]
logs.append(Logfile(job_id, name, name))
for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.DEBUG')):
job_id = name.split('_')[0]
logs.append(Logfile(job_id, name, name))
return logs
def get_all_tests(text):
tests = []
for test, result in re.findall(_TEST_RESULT_REGEX, text):
tests.append((test, result))
for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
tests.append((test, result))
return tests
def get_not_passing_tests(text):
not_passing = []
for test, result in re.findall(_TEST_RESULT_REGEX, text):
if not (result == 'Pass' or result == 'NotSupported'):
not_passing.append((test, result))
for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
if result != 'Pass':
not_passing.append((test, result))
return not_passing
def load_expectation_dict(json_file):
data = {}
if os.path.isfile(json_file):
print('Loading file ' + json_file)
with open(json_file, 'r') as f:
text = f.read()
data = json.loads(text)
return data
def load_expectations(json_file):
data = load_expectation_dict(json_file)
expectations = {}
# Convert from dictionary of lists to dictionary of sets.
for key in data:
expectations[key] = set(data[key])
return expectations
def expectation_list_to_dict(tests):
data = {}
tests = list(set(tests))
for test, result in tests:
if data.has_key(result):
new_list = list(set(data[result].append(test)))
data.pop(result)
data[result] = new_list
else:
data[result] = [test]
return data
def save_expectation_dict(expectation_path, expectation_dict):
# Clean up obsolete expectations.
for file_name in glob.glob(expectation_path + '.*'):
if not '.hasty.' in file_name or '.hasty' in expectation_path:
os.remove(file_name)
# Dump json for next iteration.
with open(expectation_path + '.json', 'w') as f:
json.dump(expectation_dict,
f,
sort_keys=True,
indent=4,
separators=(',', ': '))
# Dump plain text for autotest.
for key in expectation_dict:
if expectation_dict[key]:
with open(expectation_path + '.' + key, 'w') as f:
for test in expectation_dict[key]:
f.write(test)
f.write('\n')
# Figure out duplicates and move them to Flaky result set/list.
def process_flaky(status_dict):
"""Figure out duplicates and move them to Flaky result set/list."""
clean_dict = {}
flaky = set([])
if status_dict.has_key('Flaky'):
flaky = status_dict['Flaky']
# FLaky tests are tests with 2 distinct results.
for key1 in status_dict.keys():
for key2 in status_dict.keys():
if key1 != key2:
flaky |= status_dict[key1] & status_dict[key2]
# Remove Flaky tests from other status and convert to dict of list.
for key in status_dict.keys():
if key != 'Flaky':
not_flaky = list(status_dict[key] - flaky)
not_flaky.sort()
print('Number of "%s" is %d.' % (key, len(not_flaky)))
clean_dict[key] = not_flaky
# And finally process flaky list/set.
flaky_list = list(flaky)
flaky_list.sort()
clean_dict['Flaky'] = flaky_list
return clean_dict
def merge_expectation_list(expectation_path, tests):
status_dict = {}
expectation_json = expectation_path + '.json'
if os.access(expectation_json, os.R_OK):
status_dict = load_expectations(expectation_json)
else:
print 'Could not load', expectation_json
for test, result in tests:
if status_dict.has_key(result):
new_set = status_dict[result]
new_set.add(test)
status_dict.pop(result)
status_dict[result] = new_set
else:
status_dict[result] = set([test])
clean_dict = process_flaky(status_dict)
save_expectation_dict(expectation_path, clean_dict)
def load_log(name):
"""Load test log and clean it from stderr spew."""
with open(name) as f:
lines = f.read().splitlines()
text = ''
for line in lines:
if ('dEQP test filter =' in line or 'ChromeOS BOARD = ' in line or
'ChromeOS CPU family =' in line or 'ChromeOS GPU family =' in line or
'TestCase: ' in line or 'Result: ' in line or
'Test Options: ' in line or 'Running in hasty mode.' in line or
# For hasty logs we have:
' Pass (' in line or ' Fail (' in line or 'QualityWarning (' in line or
' Test case \'' in line):
text += line + '\n'
# TODO(ihf): Warn about or reject log files missing the end marker.
return text
def process_logs(logs):
for log in logs:
text = load_log(log.name)
if text:
print('================================================================')
print('Loading %s...' % log.name)
_, gpu, filter, hasty = get_metadata(text)
tests = get_all_tests(text)
print('Found %d test results.' % len(tests))
if tests:
# GPU family goes first in path to simplify adding/deleting families.
output_path = os.path.join(_EXPECTATIONS_DIR, gpu)
if not os.access(output_path, os.R_OK):
os.makedirs(output_path)
expectation_path = os.path.join(output_path, filter)
if hasty:
expectation_path = os.path.join(output_path, filter + '.hasty')
merge_expectation_list(expectation_path, tests)
argparser = argparse.ArgumentParser(
description='Download from GS and process dEQP logs into expectations.')
argparser.add_argument(
'result_ids',
metavar='result_id',
nargs='*', # Zero or more result_ids specified.
help='List of result log IDs (wildcards for gsutil like 5678* are ok).')
args = argparser.parse_args()
print pprint.pformat(args)
# This is somewhat optional. Remove existing expectations to start clean, but
# feel free to process them incrementally.
execute(['rm', '-rf', _EXPECTATIONS_DIR])
for id in args.result_ids:
gs_path = _AUTOTEST_RESULT_TEMPLATE % id
logs = get_logs_from_gs(gs_path)
# This will include the just downloaded logs from GS as well.
logs = get_local_logs()
process_logs(logs)