普通文本  |  303行  |  9.88 KB

#!/usr/bin/python
#
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Updates all unlocked hosts in Autotest lab in parallel at a given rate.

Used to update all hosts, or only those of a given platform, in the Autotest
lab to a given version. Allows a configurable number of updates to be started in
parallel. Updates can also be staggered to reduce load."""

import logging
import os
import subprocess
import sys
import threading
import time
import traceback

from collections import deque
from optparse import OptionParser


# Default number of hosts to update in parallel.
DEFAULT_CONCURRENCY = 10


# By default do not stagger any of the updates.
DEFAULT_STAGGER = 0


# Default location of ChromeOS checkout.
DEFAULT_GCLIENT_ROOT = '/usr/local/google/home/${USER}/chromeos/chromeos'


# Default path for individual host logs. Each host will have it's own file. E.g.
# <default_log_path>/<host>.log
DEFAULT_LOG_PATH = '/tmp/mass_update_logs/%s/' % time.strftime('%Y-%m-%d-%H-%M',
                                                               time.gmtime())


# Location of Autotest cli executable.
AUTOTEST_LOCATION = '/home/chromeos-test/autotest/cli'


# Default time in seconds to sleep while waiting for threads to complete.
DEFAULT_SLEEP = 10


# Amount of time in seconds to wait before declaring an update as failed.
DEFAULT_TIMEOUT = 2400


class MassUpdateStatus():
  """Used to track status for all updates."""
  ssh_failures = []
  update_failures = []
  successful_updates = 0


class UpdateThread(threading.Thread):
  """Responsible for ssh-test, locking, imaging, and unlocking a host.

  Uses the atest CLI for host control and the image_to_live script to actually
  update the host. Each thread will continue to process hosts until the queue
  is empty."""

  _SUCCESS = 0            # Update was successful.
  _SSH_FAILURE = 1        # Could not SSH to host or related SSH failure.
  _UPDATE_FAILURE = 2     # Update failed for any reason other than SSH.

  def __init__(self, options, hosts, status):
    self._options = options
    self._hosts = hosts
    self._status = status
    self._logger = logging.getLogger()
    threading.Thread.__init__(self)

  def run(self):
    while self._hosts:
      host = self._hosts.popleft()
      status = UpdateThread._UPDATE_FAILURE

      self._logger.info('Updating host %s' % host)
      try:
        try:
          if not CheckSSH(host=host, options=self._options):
            status = UpdateThread._SSH_FAILURE
          elif LockHost(host) and ImageHost(host=host, options=self._options):
            status = UpdateThread._SUCCESS
        finally:
          if status == UpdateThread._SUCCESS:
            self._logger.info(
                'Completed update for host %s successfully.' % host)
            self._status.successful_updates += 1
          elif status == UpdateThread._SSH_FAILURE:
            self._logger.info('Failed to SSH to host %s.' % host)
            self._status.ssh_failures.append(host)
          else:
            self._logger.info('Failed to update host %s.' % host)
            self._status.update_failures.append(host)

          UnlockHost(host)
      except:
        traceback.print_exc()
        self._logger.warning(
            'Exception encountered during update. Skipping host %s.' % host)


def CheckSSH(host, options):
  """Uses the ssh_test script to ensure SSH access to a host is available.

  Returns true if an SSH connection to the host was successful."""
  return subprocess.Popen(
      '%s/src/scripts/ssh_test.sh --remote=%s' % (options.gclient, host),
      shell=True,
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE).wait() == 0


def ImageHost(host, options):
  """Uses the image_to_live script to update a host.

  Returns true if the imaging process was successful."""
  log_file = open(os.path.join(options.log, host + '.log'), 'w')
  log_file_err = open(os.path.join(options.log, host + '.log.err'), 'w')

  exit_code = subprocess.Popen(
      ('/usr/local/scripts/alarm %d %s/src/scripts/image_to_live.sh '
       '--update_url %s --remote %s' % (DEFAULT_TIMEOUT, options.gclient,
                                        options.url, host)),
      shell=True,
      stdout=log_file,
      stderr=log_file_err).wait()

  log_file.close()
  log_file_err.close()

  return exit_code == 0


def LockHost(host):
  """Locks a host using the atest CLI.

  Locking a host tells Autotest that the host shouldn't be scheduled for
  any other tasks. Returns true if the locking process was successful."""
  success = subprocess.Popen(
      '%s/atest host mod -l %s' % (AUTOTEST_LOCATION, host),
      shell=True,
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE).wait() == 0

  if not success:
    logging.getLogger().info('Failed to lock host %s.' % host)

  return success


def UnlockHost(host):
  """Unlocks a host using the atest CLI.

  Unlocking a host tells Autotest that the host is okay to be scheduled
  for other tasks. Returns true if the unlocking process was successful."""
  success = subprocess.Popen(
      '%s/atest host mod -u %s' % (AUTOTEST_LOCATION, host),
      shell=True,
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE).wait() == 0

  if not success:
    logging.getLogger().info('Failed to unlock host %s.' % host)

  return success


def GetHostQueue(options):
  """Returns a queue containing unlocked hosts retrieved from the atest CLI.

  If options.label has been specified only unlocked hosts with the specified
  label will be returned."""
  cmd = ('%s/atest host list --unlocked -s Ready -a acl_cros_test'
         % AUTOTEST_LOCATION)

  if options.label:
    cmd += ' -b ' + options.label

  # atest host list will return a tabular data set. Use sed to remove the first
  # line which contains column labels we don't need. Then since the first column
  # contains the host name, use awk to extract it
  cmd += " | sed '1d' | awk '{print $1}'"

  stdout = subprocess.Popen(cmd,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE).communicate()[0]

  return deque(item.strip() for item in stdout.split('\n') if item.strip())


def ParseOptions():
  usage = 'usage: %prog --url=<update url> [options]'
  parser = OptionParser(usage)
  parser.add_option('-b', '--label', dest='label',
                    help='Only update hosts with the specified label.')
  parser.add_option('-c', '--concurrent', dest='concurrent',
                    default=DEFAULT_CONCURRENCY,
                    help=('Number of hosts to be updated concurrently. '
                          'Defaults to %d hosts.') % DEFAULT_CONCURRENCY)
  parser.add_option('-g', '--gclient', dest='gclient',
                    default=DEFAULT_GCLIENT_ROOT,
                    help=('Location of ChromeOS checkout. defaults to %s'
                    % DEFAULT_GCLIENT_ROOT))
  parser.add_option('-l', '--log', dest='log',
                    default=DEFAULT_LOG_PATH,
                    help=('Where to put individual host log files. '
                          'Defaults to %s' % DEFAULT_LOG_PATH))
  parser.add_option('-s', '--stagger', dest='stagger',
                    default=DEFAULT_STAGGER,
                    help=('Attempt to stagger updates. Waits the given amount '
                          'of time in minutes before starting each updater. '
                          'Updates will still overlap if the value is set as a '
                          'multiple of the update period.'))
  parser.add_option('-u', '--url', dest='url',
                    help='Update URL. Points to build for updating hosts.')

  options = parser.parse_args()[0]

  if options.url is None:
    parser.error('An update URL must be provided.')

  return options


def InitializeLogging():
  """Configure the global logger for time/date stamping console output.

  Returns a logger object for convenience."""
  logger = logging.getLogger()
  logger.setLevel(logging.INFO)

  stream_handler = logging.StreamHandler()
  stream_handler.setLevel(logging.INFO)
  stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
  logger.addHandler(stream_handler)
  return logger


def main():
  options = ParseOptions()
  hosts = GetHostQueue(options)
  logger = InitializeLogging()
  status = MassUpdateStatus()

  # Create log folder if it doesn't exist.
  if not os.path.exists(options.log):
    os.makedirs(options.log)

  logger.info('Starting update using URL %s' % options.url)
  logger.info('Individual host logs can be found under %s' % options.log)

  try:
    # Spawn processing threads which will handle lock, update, and unlock.
    for i in range(int(options.concurrent)):
      UpdateThread(hosts=hosts, options=options, status=status).start()

      # Stagger threads if the option has been enabled.
      if options.stagger > 0:
        time.sleep(int(options.stagger) * 60)

    # Wait for all hosts to be processed and threads to complete. NOTE: Not
    # using hosts.join() here because it does not behave properly with CTRL-C
    # and KeyboardInterrupt.
    while len(threading.enumerate()) > 1:
      time.sleep(DEFAULT_SLEEP)
  except:
    traceback.print_exc()
    logger.warning(
        'Update process aborted. Some machines may be left locked or updating.')
    sys.exit(1)
  finally:
    logger.info(
        ('Mass updating complete. %d hosts updated successfully, %d failed.' %
        (status.successful_updates, len(status.ssh_failures) +
            len(status.update_failures))))

    logger.info(('-' * 25) + '[ SUMMARY ]' + ('-' * 25))

    for host in status.ssh_failures:
      logger.info('Failed to SSH to host %s.' % host)

    for host in status.update_failures:
      logger.info('Failed to update host %s.' % host)

    if len(status.ssh_failures) == 0 and len(status.update_failures) == 0:
      logger.info('All hosts updated successfully.')

    logger.info('-' * 61)


if __name__ == '__main__':
  main()