普通文本  |  259行  |  8.54 KB

#!/usr/bin/python
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# pylint: disable-msg=W0311

from collections import namedtuple
import argparse
import glob
import json
import os
import pprint
import re
import subprocess

_EXPECTATIONS_DIR = 'expectations'
_AUTOTEST_RESULT_TEMPLATE = 'gs://chromeos-autotest-results/%s-chromeos-test/chromeos*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
# Use this template for tryjob results:
#_AUTOTEST_RESULT_TEMPLATE = 'gs://chromeos-autotest-results/%s-ihf/*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
_BOARD_REGEX = re.compile(r'ChromeOS BOARD = (.+)')
_CPU_FAMILY_REGEX = re.compile(r'ChromeOS CPU family = (.+)')
_GPU_FAMILY_REGEX = re.compile(r'ChromeOS GPU family = (.+)')
_TEST_FILTER_REGEX = re.compile(r'dEQP test filter = (.+)')
_HASTY_MODE_REGEX = re.compile(r'\'hasty\': \'True\'|Running in hasty mode.')

#04/23 07:30:21.624 INFO |graphics_d:0240| TestCase: dEQP-GLES3.functional.shaders.operator.unary_operator.bitwise_not.highp_ivec3_vertex
#04/23 07:30:21.840 INFO |graphics_d:0261| Result: Pass
_TEST_RESULT_REGEX = re.compile(r'TestCase: (.+?)$\n.+? Result: (.+?)$',
                                re.MULTILINE)
_HASTY_TEST_RESULT_REGEX = re.compile(
    r'\[stdout\] Test case \'(.+?)\'..$\n'
    r'.+?\[stdout\]   (Pass|Fail|QualityWarning) \((.+)\)', re.MULTILINE)
Logfile = namedtuple('Logfile', 'job_id name gs_path')


def execute(cmd_list):
  sproc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
  return sproc.communicate()[0]


def get_metadata(s):
  cpu = re.search(_CPU_FAMILY_REGEX, s).group(1)
  gpu = re.search(_GPU_FAMILY_REGEX, s).group(1)
  board = re.search(_BOARD_REGEX, s).group(1)
  filter = re.search(_TEST_FILTER_REGEX, s).group(1)
  hasty = False
  if re.search(_HASTY_MODE_REGEX, s):
    hasty = True
  print('Found results from %s for GPU = %s, filter = %s and hasty = %r.' %
        (board, gpu, filter, hasty))
  return board, gpu, filter, hasty


def get_logs_from_gs(autotest_result_path):
  logs = []
  gs_paths = execute(['gsutil', 'ls', autotest_result_path]).splitlines()
  for gs_path in gs_paths:
    job_id = gs_path.split('/')[3].split('-')[0]
    # DEBUG logs have more information than INFO logs, especially for hasty.
    name = os.path.join('logs', job_id + '_graphics_dEQP.DEBUG')
    logs.append(Logfile(job_id, name, gs_path))
  for log in logs:
    execute(['gsutil', 'cp', log.gs_path, log.name])
  return logs


def get_local_logs():
  logs = []
  for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.INFO')):
    job_id = name.split('_')[0]
    logs.append(Logfile(job_id, name, name))
  for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.DEBUG')):
    job_id = name.split('_')[0]
    logs.append(Logfile(job_id, name, name))
  return logs


def get_all_tests(text):
  tests = []
  for test, result in re.findall(_TEST_RESULT_REGEX, text):
    tests.append((test, result))
  for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
    tests.append((test, result))
  return tests


def get_not_passing_tests(text):
  not_passing = []
  for test, result in re.findall(_TEST_RESULT_REGEX, text):
    if not (result == 'Pass' or result == 'NotSupported'):
      not_passing.append((test, result))
  for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
    if result != 'Pass':
      not_passing.append((test, result))
  return not_passing


def load_expectation_dict(json_file):
  data = {}
  if os.path.isfile(json_file):
    print('Loading file ' + json_file)
    with open(json_file, 'r') as f:
      text = f.read()
      data = json.loads(text)
  return data


def load_expectations(json_file):
  data = load_expectation_dict(json_file)
  expectations = {}
  # Convert from dictionary of lists to dictionary of sets.
  for key in data:
    expectations[key] = set(data[key])
  return expectations


def expectation_list_to_dict(tests):
  data = {}
  tests = list(set(tests))
  for test, result in tests:
    if data.has_key(result):
      new_list = list(set(data[result].append(test)))
      data.pop(result)
      data[result] = new_list
    else:
      data[result] = [test]
  return data


def save_expectation_dict(expectation_path, expectation_dict):
  # Clean up obsolete expectations.
  for file_name in glob.glob(expectation_path + '.*'):
    if not '.hasty.' in file_name or '.hasty' in expectation_path:
      os.remove(file_name)
  # Dump json for next iteration.
  with open(expectation_path + '.json', 'w') as f:
    json.dump(expectation_dict,
              f,
              sort_keys=True,
              indent=4,
              separators=(',', ': '))
  # Dump plain text for autotest.
  for key in expectation_dict:
    if expectation_dict[key]:
      with open(expectation_path + '.' + key, 'w') as f:
        for test in expectation_dict[key]:
          f.write(test)
          f.write('\n')


# Figure out duplicates and move them to Flaky result set/list.
def process_flaky(status_dict):
  """Figure out duplicates and move them to Flaky result set/list."""
  clean_dict = {}
  flaky = set([])
  if status_dict.has_key('Flaky'):
    flaky = status_dict['Flaky']

  # FLaky tests are tests with 2 distinct results.
  for key1 in status_dict.keys():
    for key2 in status_dict.keys():
      if key1 != key2:
        flaky |= status_dict[key1] & status_dict[key2]

  # Remove Flaky tests from other status and convert to dict of list.
  for key in status_dict.keys():
    if key != 'Flaky':
      not_flaky = list(status_dict[key] - flaky)
      not_flaky.sort()
      print('Number of "%s" is %d.' % (key, len(not_flaky)))
      clean_dict[key] = not_flaky

  # And finally process flaky list/set.
  flaky_list = list(flaky)
  flaky_list.sort()
  clean_dict['Flaky'] = flaky_list

  return clean_dict


def merge_expectation_list(expectation_path, tests):
  status_dict = {}
  expectation_json = expectation_path + '.json'
  if os.access(expectation_json, os.R_OK):
    status_dict = load_expectations(expectation_json)
  else:
    print 'Could not load', expectation_json
  for test, result in tests:
    if status_dict.has_key(result):
      new_set = status_dict[result]
      new_set.add(test)
      status_dict.pop(result)
      status_dict[result] = new_set
    else:
      status_dict[result] = set([test])
  clean_dict = process_flaky(status_dict)
  save_expectation_dict(expectation_path, clean_dict)


def load_log(name):
  """Load test log and clean it from stderr spew."""
  with open(name) as f:
    lines = f.read().splitlines()
  text = ''
  for line in lines:
    if ('dEQP test filter =' in line or 'ChromeOS BOARD = ' in line or
        'ChromeOS CPU family =' in line or 'ChromeOS GPU family =' in line or
        'TestCase: ' in line or 'Result: ' in line or
        'Test Options: ' in line or 'Running in hasty mode.' in line or
        # For hasty logs we have:
        ' Pass (' in line or ' Fail (' in line or 'QualityWarning (' in line or
        ' Test case \'' in line):
      text += line + '\n'
  # TODO(ihf): Warn about or reject log files missing the end marker.
  return text


def process_logs(logs):
  for log in logs:
    text = load_log(log.name)
    if text:
      print('================================================================')
      print('Loading %s...' % log.name)
      _, gpu, filter, hasty = get_metadata(text)
      tests = get_all_tests(text)
      print('Found %d test results.' % len(tests))
      if tests:
        # GPU family goes first in path to simplify adding/deleting families.
        output_path = os.path.join(_EXPECTATIONS_DIR, gpu)
        if not os.access(output_path, os.R_OK):
          os.makedirs(output_path)
        expectation_path = os.path.join(output_path, filter)
        if hasty:
          expectation_path = os.path.join(output_path, filter + '.hasty')
        merge_expectation_list(expectation_path, tests)


argparser = argparse.ArgumentParser(
    description='Download from GS and process dEQP logs into expectations.')
argparser.add_argument(
    'result_ids',
    metavar='result_id',
    nargs='*',  # Zero or more result_ids specified.
    help='List of result log IDs (wildcards for gsutil like 5678* are ok).')
args = argparser.parse_args()

print pprint.pformat(args)
# This is somewhat optional. Remove existing expectations to start clean, but
# feel free to process them incrementally.
execute(['rm', '-rf', _EXPECTATIONS_DIR])
for id in args.result_ids:
  gs_path = _AUTOTEST_RESULT_TEMPLATE % id
  logs = get_logs_from_gs(gs_path)

# This will include the just downloaded logs from GS as well.
logs = get_local_logs()
process_logs(logs)