普通文本  |  266行  |  9.46 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module of benchmark runs."""
from __future__ import print_function

import datetime
import threading
import time
import traceback

from cros_utils import command_executer
from cros_utils import timeline

from suite_runner import SuiteRunner
from results_cache import MockResult
from results_cache import MockResultsCache
from results_cache import Result
from results_cache import ResultsCache

STATUS_FAILED = 'FAILED'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_IMAGING = 'IMAGING'
STATUS_RUNNING = 'RUNNING'
STATUS_WAITING = 'WAITING'
STATUS_PENDING = 'PENDING'


class BenchmarkRun(threading.Thread):
  """The benchmarkrun class."""

  def __init__(self, name, benchmark, label, iteration, cache_conditions,
               machine_manager, logger_to_use, log_level, share_cache):
    threading.Thread.__init__(self)
    self.name = name
    self._logger = logger_to_use
    self.log_level = log_level
    self.benchmark = benchmark
    self.iteration = iteration
    self.label = label
    self.result = None
    self.terminated = False
    self.retval = None
    self.run_completed = False
    self.machine_manager = machine_manager
    self.suite_runner = SuiteRunner(self._logger, self.log_level)
    self.machine = None
    self.cache_conditions = cache_conditions
    self.runs_complete = 0
    self.cache_hit = False
    self.failure_reason = ''
    self.test_args = benchmark.test_args
    self.cache = None
    self.profiler_args = self.GetExtraAutotestArgs()
    self._ce = command_executer.GetCommandExecuter(
        self._logger, log_level=self.log_level)
    self.timeline = timeline.Timeline()
    self.timeline.Record(STATUS_PENDING)
    self.share_cache = share_cache
    self.cache_has_been_read = False

    # This is used by schedv2.
    self.owner_thread = None

  def ReadCache(self):
    # Just use the first machine for running the cached version,
    # without locking it.
    self.cache = ResultsCache()
    self.cache.Init(self.label.chromeos_image, self.label.chromeos_root,
                    self.benchmark.test_name, self.iteration, self.test_args,
                    self.profiler_args, self.machine_manager, self.machine,
                    self.label.board, self.cache_conditions, self._logger,
                    self.log_level, self.label, self.share_cache,
                    self.benchmark.suite, self.benchmark.show_all_results,
                    self.benchmark.run_local)

    self.result = self.cache.ReadResult()
    self.cache_hit = (self.result is not None)
    self.cache_has_been_read = True

  def run(self):
    try:
      if not self.cache_has_been_read:
        self.ReadCache()

      if self.result:
        self._logger.LogOutput('%s: Cache hit.' % self.name)
        self._logger.LogOutput(self.result.out, print_to_console=False)
        self._logger.LogError(self.result.err, print_to_console=False)

      elif self.label.cache_only:
        self._logger.LogOutput('%s: No cache hit.' % self.name)
        output = '%s: No Cache hit.' % self.name
        retval = 1
        err = 'No cache hit.'
        self.result = Result.CreateFromRun(
            self._logger, self.log_level, self.label, self.machine, output, err,
            retval, self.benchmark.test_name, self.benchmark.suite)

      else:
        self._logger.LogOutput('%s: No cache hit.' % self.name)
        self.timeline.Record(STATUS_WAITING)
        # Try to acquire a machine now.
        self.machine = self.AcquireMachine()
        self.cache.machine = self.machine
        self.result = self.RunTest(self.machine)

        self.cache.remote = self.machine.name
        self.label.chrome_version = self.machine_manager.GetChromeVersion(
            self.machine)
        self.cache.StoreResult(self.result)

      if not self.label.chrome_version:
        if self.machine:
          self.label.chrome_version = self.machine_manager.GetChromeVersion(
              self.machine)
        elif self.result.chrome_version:
          self.label.chrome_version = self.result.chrome_version

      if self.terminated:
        return

      if not self.result.retval:
        self.timeline.Record(STATUS_SUCCEEDED)
      else:
        if self.timeline.GetLastEvent() != STATUS_FAILED:
          self.failure_reason = 'Return value of test suite was non-zero.'
          self.timeline.Record(STATUS_FAILED)

    except Exception, e:
      self._logger.LogError("Benchmark run: '%s' failed: %s" % (self.name, e))
      traceback.print_exc()
      if self.timeline.GetLastEvent() != STATUS_FAILED:
        self.timeline.Record(STATUS_FAILED)
        self.failure_reason = str(e)
    finally:
      if self.owner_thread is not None:
        # In schedv2 mode, we do not lock machine locally. So noop here.
        pass
      elif self.machine:
        if not self.machine.IsReachable():
          self._logger.LogOutput(
              'Machine %s is not reachable, removing it.' % self.machine.name)
          self.machine_manager.RemoveMachine(self.machine.name)
        self._logger.LogOutput('Releasing machine: %s' % self.machine.name)
        self.machine_manager.ReleaseMachine(self.machine)
        self._logger.LogOutput('Released machine: %s' % self.machine.name)

  def Terminate(self):
    self.terminated = True
    self.suite_runner.Terminate()
    if self.timeline.GetLastEvent() != STATUS_FAILED:
      self.timeline.Record(STATUS_FAILED)
      self.failure_reason = 'Thread terminated.'

  def AcquireMachine(self):
    if self.owner_thread is not None:
      # No need to lock machine locally, DutWorker, which is a thread, is
      # responsible for running br.
      return self.owner_thread.dut()
    while True:
      machine = None
      if self.terminated:
        raise RuntimeError('Thread terminated while trying to acquire machine.')

      machine = self.machine_manager.AcquireMachine(self.label)

      if machine:
        self._logger.LogOutput('%s: Machine %s acquired at %s' %
                               (self.name, machine.name,
                                datetime.datetime.now()))
        break
      time.sleep(10)
    return machine

  def GetExtraAutotestArgs(self):
    if self.benchmark.perf_args and self.benchmark.suite == 'telemetry':
      self._logger.LogError('Telemetry does not support profiler.')
      self.benchmark.perf_args = ''

    if self.benchmark.perf_args and self.benchmark.suite == 'test_that':
      self._logger.LogError('test_that does not support profiler.')
      self.benchmark.perf_args = ''

    if self.benchmark.perf_args:
      perf_args_list = self.benchmark.perf_args.split(' ')
      perf_args_list = [perf_args_list[0]] + ['-a'] + perf_args_list[1:]
      perf_args = ' '.join(perf_args_list)
      if not perf_args_list[0] in ['record', 'stat']:
        raise SyntaxError('perf_args must start with either record or stat')
      extra_test_args = [
          '--profiler=custom_perf',
          ("--profiler_args='perf_options=\"%s\"'" % perf_args)
      ]
      return ' '.join(extra_test_args)
    else:
      return ''

  def RunTest(self, machine):
    self.timeline.Record(STATUS_IMAGING)
    if self.owner_thread is not None:
      # In schedv2 mode, do not even call ImageMachine. Machine image is
      # guarenteed.
      pass
    else:
      self.machine_manager.ImageMachine(machine, self.label)
    self.timeline.Record(STATUS_RUNNING)
    retval, out, err = self.suite_runner.Run(machine.name, self.label,
                                             self.benchmark, self.test_args,
                                             self.profiler_args)
    self.run_completed = True
    return Result.CreateFromRun(self._logger, self.log_level, self.label,
                                self.machine, out, err, retval,
                                self.benchmark.test_name, self.benchmark.suite)

  def SetCacheConditions(self, cache_conditions):
    self.cache_conditions = cache_conditions

  def logger(self):
    """Return the logger, only used by unittest.

    Returns:
      self._logger
    """

    return self._logger

  def __str__(self):
    """For better debugging."""

    return 'BenchmarkRun[name="{}"]'.format(self.name)


class MockBenchmarkRun(BenchmarkRun):
  """Inherited from BenchmarkRun."""

  def ReadCache(self):
    # Just use the first machine for running the cached version,
    # without locking it.
    self.cache = MockResultsCache()
    self.cache.Init(self.label.chromeos_image, self.label.chromeos_root,
                    self.benchmark.test_name, self.iteration, self.test_args,
                    self.profiler_args, self.machine_manager, self.machine,
                    self.label.board, self.cache_conditions, self._logger,
                    self.log_level, self.label, self.share_cache,
                    self.benchmark.suite, self.benchmark.show_all_results,
                    self.benchmark.run_local)

    self.result = self.cache.ReadResult()
    self.cache_hit = (self.result is not None)

  def RunTest(self, machine):
    """Remove Result.CreateFromRun for testing."""
    self.timeline.Record(STATUS_IMAGING)
    self.machine_manager.ImageMachine(machine, self.label)
    self.timeline.Record(STATUS_RUNNING)
    [retval, out,
     err] = self.suite_runner.Run(machine.name, self.label, self.benchmark,
                                  self.test_args, self.profiler_args)
    self.run_completed = True
    rr = MockResult('logger', self.label, self.log_level, machine)
    rr.out = out
    rr.err = err
    rr.retval = retval
    return rr