普通文本  |  139行  |  4.93 KB

# Copyright 2010 Google Inc. All Rights Reserved.
#

import logging
import os.path
import threading

from automation.common import command as cmd
from automation.common import job
from automation.common import logger
from automation.common.command_executer import LoggingCommandExecuter
from automation.common.command_executer import CommandTerminator


class JobExecuter(threading.Thread):

  def __init__(self, job_to_execute, machines, listeners):
    threading.Thread.__init__(self)

    assert machines

    self.job = job_to_execute
    self.listeners = listeners
    self.machines = machines

    # Set Thread name.
    self.name = '%s-%s' % (self.__class__.__name__, self.job.id)

    self._logger = logging.getLogger(self.__class__.__name__)
    self._executer = LoggingCommandExecuter(self.job.dry_run)
    self._terminator = CommandTerminator()

  def _RunRemotely(self, command, fail_msg, command_timeout=1 * 60 * 60):
    exit_code = self._executer.RunCommand(command,
                                          self.job.primary_machine.hostname,
                                          self.job.primary_machine.username,
                                          command_terminator=self._terminator,
                                          command_timeout=command_timeout)
    if exit_code:
      raise job.JobFailure(fail_msg, exit_code)

  def _RunLocally(self, command, fail_msg, command_timeout=1 * 60 * 60):
    exit_code = self._executer.RunCommand(command,
                                          command_terminator=self._terminator,
                                          command_timeout=command_timeout)
    if exit_code:
      raise job.JobFailure(fail_msg, exit_code)

  def Kill(self):
    self._terminator.Terminate()

  def CleanUpWorkDir(self):
    self._logger.debug('Cleaning up %r work directory.', self.job)
    self._RunRemotely(cmd.RmTree(self.job.work_dir), 'Cleanup workdir failed.')

  def CleanUpHomeDir(self):
    self._logger.debug('Cleaning up %r home directory.', self.job)
    self._RunLocally(cmd.RmTree(self.job.home_dir), 'Cleanup homedir failed.')

  def _PrepareRuntimeEnvironment(self):
    self._RunRemotely(
        cmd.MakeDir(self.job.work_dir, self.job.logs_dir, self.job.results_dir),
        'Creating new job directory failed.')

    # The log directory is ready, so we can prepare to log command's output.
    self._executer.OpenLog(os.path.join(self.job.logs_dir,
                                        self.job.log_filename_prefix))

  def _SatisfyFolderDependencies(self):
    for dependency in self.job.folder_dependencies:
      to_folder = os.path.join(self.job.work_dir, dependency.dest)
      from_folder = os.path.join(dependency.job.work_dir, dependency.src)
      from_machine = dependency.job.primary_machine

      if from_machine == self.job.primary_machine and dependency.read_only:
        # No need to make a copy, just symlink it
        self._RunRemotely(
            cmd.MakeSymlink(from_folder, to_folder),
            'Failed to create symlink to required directory.')
      else:
        self._RunRemotely(
            cmd.RemoteCopyFrom(from_machine.hostname,
                               from_folder,
                               to_folder,
                               username=from_machine.username),
            'Failed to copy required files.')

  def _LaunchJobCommand(self):
    command = self.job.GetCommand()

    self._RunRemotely('%s; %s' % ('PS1=. TERM=linux source ~/.bashrc',
                                  cmd.Wrapper(command,
                                              cwd=self.job.work_dir)),
                      "Command failed to execute: '%s'." % command,
                      self.job.timeout)

  def _CopyJobResults(self):
    """Copy test results back to directory."""
    self._RunLocally(
        cmd.RemoteCopyFrom(self.job.primary_machine.hostname,
                           self.job.results_dir,
                           self.job.home_dir,
                           username=self.job.primary_machine.username),
        'Failed to copy results.')

  def run(self):
    self.job.status = job.STATUS_SETUP
    self.job.machines = self.machines
    self._logger.debug('Executing %r on %r in directory %s.', self.job,
                       self.job.primary_machine.hostname, self.job.work_dir)

    try:
      self.CleanUpWorkDir()

      self._PrepareRuntimeEnvironment()

      self.job.status = job.STATUS_COPYING

      self._SatisfyFolderDependencies()

      self.job.status = job.STATUS_RUNNING

      self._LaunchJobCommand()
      self._CopyJobResults()

      # If we get here, the job succeeded.
      self.job.status = job.STATUS_SUCCEEDED
    except job.JobFailure as ex:
      self._logger.error('Job failed. Exit code %s. %s', ex.exit_code, ex)
      if self._terminator.IsTerminated():
        self._logger.info('%r was killed', self.job)

      self.job.status = job.STATUS_FAILED

    self._executer.CloseLog()

    for listener in self.listeners:
      listener.NotifyJobComplete(self.job)