普通文本  |  179行  |  4.89 KB

# Copyright 2010 Google Inc. All Rights Reserved.
#
"""A module for a job in the infrastructure."""

__author__ = 'raymes@google.com (Raymes Khoury)'

import os.path

from automation.common import state_machine

STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
STATUS_SETUP = 'SETUP'
STATUS_COPYING = 'COPYING'
STATUS_RUNNING = 'RUNNING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'


class FolderDependency(object):

  def __init__(self, job, src, dest=None):
    if not dest:
      dest = src

    # TODO(kbaclawski): rename to producer
    self.job = job
    self.src = src
    self.dest = dest

  @property
  def read_only(self):
    return self.dest == self.src


class JobStateMachine(state_machine.BasicStateMachine):
  state_machine = {
      STATUS_NOT_EXECUTED: [STATUS_SETUP],
      STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
      STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
      STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]
  }

  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]


class JobFailure(Exception):

  def __init__(self, message, exit_code):
    Exception.__init__(self, message)
    self.exit_code = exit_code


class Job(object):
  """A class representing a job whose commands will be executed."""

  WORKDIR_PREFIX = '/usr/local/google/tmp/automation'

  def __init__(self, label, command, timeout=4 * 60 * 60):
    self._state = JobStateMachine(STATUS_NOT_EXECUTED)
    self.predecessors = set()
    self.successors = set()
    self.machine_dependencies = []
    self.folder_dependencies = []
    self.id = 0
    self.machines = []
    self.command = command
    self._has_primary_machine_spec = False
    self.group = None
    self.dry_run = None
    self.label = label
    self.timeout = timeout

  def _StateGet(self):
    return self._state

  def _StateSet(self, new_state):
    self._state.Change(new_state)

  status = property(_StateGet, _StateSet)

  @property
  def timeline(self):
    return self._state.timeline

  def __repr__(self):
    return '{%s: %s}' % (self.__class__.__name__, self.id)

  def __str__(self):
    res = []
    res.append('%d' % self.id)
    res.append('Predecessors:')
    res.extend(['%d' % pred.id for pred in self.predecessors])
    res.append('Successors:')
    res.extend(['%d' % succ.id for succ in self.successors])
    res.append('Machines:')
    res.extend(['%s' % machine for machine in self.machines])
    res.append(self.PrettyFormatCommand())
    res.append('%s' % self.status)
    res.append(self.timeline.GetTransitionEventReport())
    return '\n'.join(res)

  @staticmethod
  def _FormatCommand(cmd, substitutions):
    for pattern, replacement in substitutions:
      cmd = cmd.replace(pattern, replacement)

    return cmd

  def GetCommand(self):
    substitutions = [
        ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir),
        ('$JOB_HOME', self.home_dir),
        ('$PRIMARY_MACHINE', self.primary_machine.hostname)
    ]

    if len(self.machines) > 1:
      for num, machine in enumerate(self.machines[1:]):
        substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname
                             ))

    return self._FormatCommand(str(self.command), substitutions)

  def PrettyFormatCommand(self):
    # TODO(kbaclawski): This method doesn't belong here, but rather to
    # non existing Command class. If one is created then PrettyFormatCommand
    # shall become its method.
    return self._FormatCommand(self.GetCommand(), [
        ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n')
    ])

  def DependsOnFolder(self, dependency):
    self.folder_dependencies.append(dependency)
    self.DependsOn(dependency.job)

  @property
  def results_dir(self):
    return os.path.join(self.work_dir, 'results')

  @property
  def logs_dir(self):
    return os.path.join(self.home_dir, 'logs')

  @property
  def log_filename_prefix(self):
    return 'job-%d.log' % self.id

  @property
  def work_dir(self):
    return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)

  @property
  def home_dir(self):
    return os.path.join(self.group.home_dir, 'job-%d' % self.id)

  @property
  def primary_machine(self):
    return self.machines[0]

  def DependsOn(self, job):
    """Specifies Jobs to be finished before this job can be launched."""
    self.predecessors.add(job)
    job.successors.add(self)

  @property
  def is_ready(self):
    """Check that all our dependencies have been executed."""
    return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)

  def DependsOnMachine(self, machine_spec, primary=True):
    # Job will run on arbitrarily chosen machine specified by
    # MachineSpecification class instances passed to this method.
    if primary:
      if self._has_primary_machine_spec:
        raise RuntimeError('Only one primary machine specification allowed.')
      self._has_primary_machine_spec = True
      self.machine_dependencies.insert(0, machine_spec)
    else:
      self.machine_dependencies.append(machine_spec)