# Plain text  |  285 lines  |  11.79 KB

# Copyright (c) 2014-2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Download images from Cloud Storage."""

from __future__ import print_function

import ast
import os

import test_flag

from cros_utils import command_executer

GS_UTIL = 'chromium/tools/depot_tools/gsutil.py'


class MissingImage(Exception):
  """Error signalling that a requested image could not be found in gs://."""


class MissingFile(Exception):
  """Error signalling that a requested file could not be found in gs://."""


class RunCommandExceptionHandler(object):
  """Run an optional cleanup command when a RunCommand call raises.

  An instance bundles a logger, a log level, a command executer and a
  cleanup command. HandleException is meant to be handed to the command
  executer as its exception callback.
  """

  def __init__(self, logger_to_use, log_level, cmd_exec, command):
    """Remember the collaborators needed to perform the cleanup.

    Args:
      logger_to_use: Object whose LogOutput method receives log lines.
      log_level: Verbosity string; the cleanup command is logged here only
        when this is not 'verbose'.
      cmd_exec: Object whose RunCommand method executes the cleanup command.
      command: Cleanup command string, or None for no cleanup.
    """
    self.cleanup_command = command
    self.ce = cmd_exec
    self.log_level = log_level
    self.logger = logger_to_use

  def HandleException(self, _, e):
    """Run the cleanup command (if any), then re-raise the exception.

    Args:
      _: Ignored.
      e: The exception being handled; it is always raised again.

    Raises:
      The exception passed in as e.
    """
    cleanup = self.cleanup_command
    if cleanup is not None:
      if self.log_level != 'verbose':
        self.logger.LogOutput('CMD: %s' % cleanup)
      # The exit status of the cleanup is deliberately not inspected.
      self.ce.RunCommand(cleanup)
    # Hand the original failure back to the caller.
    raise e


class ImageDownloader(object):
  """Download images from Cloud Storage."""

  def __init__(self, logger_to_use=None, log_level='verbose', cmd_exec=None):
    self._logger = logger_to_use
    self.log_level = log_level
    self._ce = cmd_exec or command_executer.GetCommandExecuter(
        self._logger, log_level=self.log_level)

  def GetBuildID(self, chromeos_root, xbuddy_label):
    # Get the translation of the xbuddy_label into the real Google Storage
    # image name.
    command = ('cd ~/trunk/src/third_party/toolchain-utils/crosperf; '
               "python translate_xbuddy.py '%s'" % xbuddy_label)
    _, build_id_tuple_str, _ = self._ce.ChrootRunCommandWOutput(
        chromeos_root, command)
    if not build_id_tuple_str:
      raise MissingImage("Unable to find image for '%s'" % xbuddy_label)

    build_id_tuple = ast.literal_eval(build_id_tuple_str)
    build_id = build_id_tuple[0]

    return build_id

  def DownloadImage(self, chromeos_root, build_id, image_name):
    if self.log_level == 'average':
      self._logger.LogOutput('Preparing to download %s image to local '
                             'directory.' % build_id)

    # Make sure the directory for downloading the image exists.
    download_path = os.path.join(chromeos_root, 'chroot/tmp', build_id)
    image_path = os.path.join(download_path, 'chromiumos_test_image.bin')
    if not os.path.exists(download_path):
      os.makedirs(download_path)

    # Check to see if the image has already been downloaded.  If not,
    # download the image.
    if not os.path.exists(image_path):
      gsutil_cmd = os.path.join(chromeos_root, GS_UTIL)
      command = '%s cp %s %s' % (gsutil_cmd, image_name, download_path)

      if self.log_level != 'verbose':
        self._logger.LogOutput('CMD: %s' % command)
      status = self._ce.RunCommand(command)
      downloaded_image_name = os.path.join(download_path,
                                           'chromiumos_test_image.tar.xz')
      if status != 0 or not os.path.exists(downloaded_image_name):
        raise MissingImage('Cannot download image: %s.' % downloaded_image_name)

    return image_path

  def UncompressImage(self, chromeos_root, build_id):
    # Check to see if the file has already been uncompresssed, etc.
    if os.path.exists(
        os.path.join(chromeos_root, 'chroot/tmp', build_id,
                     'chromiumos_test_image.bin')):
      return

    # Uncompress and untar the downloaded image.
    download_path = os.path.join(chromeos_root, 'chroot/tmp', build_id)
    command = ('cd %s ; tar -Jxf chromiumos_test_image.tar.xz ' % download_path)
    # Cleanup command for exception handler
    clean_cmd = ('cd %s ; rm -f chromiumos_test_image.bin ' % download_path)
    exception_handler = RunCommandExceptionHandler(self._logger, self.log_level,
                                                   self._ce, clean_cmd)
    if self.log_level != 'verbose':
      self._logger.LogOutput('CMD: %s' % command)
      print('(Uncompressing and un-tarring may take a couple of minutes...'
            'please be patient.)')
    retval = self._ce.RunCommand(
        command, except_handler=exception_handler.HandleException)
    if retval != 0:
      if self.log_level != 'verbose':
        self._logger.LogOutput('CMD: %s' % clean_cmd)
        print('(Removing file chromiumos_test_image.bin.)')
      # Remove partially uncompressed file
      _ = self._ce.RunCommand(clean_cmd)
      # Raise exception for failure to uncompress
      raise MissingImage('Cannot uncompress image: %s.' % build_id)

    # Remove compressed image
    command = ('cd %s ; rm -f chromiumos_test_image.tar.xz; ' % download_path)
    if self.log_level != 'verbose':
      self._logger.LogOutput('CMD: %s' % command)
      print('(Removing file chromiumos_test_image.tar.xz.)')
    # try removing file, its ok to have an error, print if encountered
    retval = self._ce.RunCommand(command)
    if retval != 0:
      print('(Warning: Could not remove file chromiumos_test_image.tar.xz .)')

  def DownloadSingleAutotestFile(self, chromeos_root, build_id,
                                 package_file_name):
    """Fetch one autotest package file into the per-build directory.

    Outside of test mode the file's presence under
    gs://chromeos-image-archive/<build_id>/ is first checked with
    'gsutil ls'. The copy is skipped when the file already exists locally.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier; names both the gs:// directory and the
        local directory under chroot/tmp.
      package_file_name: Name of the package file to download.

    Raises:
      MissingFile: If the 'ls' check fails, the copy command fails, or the
        file is not present locally after the copy.
    """
    remote_name = ('gs://chromeos-image-archive/%s/%s' %
                   (build_id, package_file_name))
    gsutil = os.path.join(chromeos_root, GS_UTIL)

    # Verify that the package exists on the server (skipped in test mode).
    if not test_flag.GetTestMode():
      if self._ce.RunCommand('%s ls %s' % (gsutil, remote_name)) != 0:
        raise MissingFile(
            'Cannot find autotest package file: %s.' % package_file_name)

    if self.log_level == 'average':
      self._logger.LogOutput('Preparing to download %s package to local '
                             'directory.' % package_file_name)

    # Make sure the per-build download directory exists.
    target_dir = os.path.join(chromeos_root, 'chroot/tmp', build_id)
    if not os.path.exists(target_dir):
      os.makedirs(target_dir)

    # A previously downloaded copy is reused as-is.
    local_file = os.path.join(target_dir, package_file_name)
    if os.path.exists(local_file):
      return

    fetch_cmd = '%s cp %s %s' % (gsutil, remote_name, target_dir)
    if self.log_level != 'verbose':
      self._logger.LogOutput('CMD: %s' % fetch_cmd)
    if self._ce.RunCommand(fetch_cmd) != 0 or not os.path.exists(local_file):
      raise MissingFile('Cannot download package: %s .' % local_file)

  def UncompressSingleAutotestFile(self, chromeos_root, build_id,
                                   package_file_name, uncompress_cmd):
    # Uncompress file
    download_path = os.path.join(chromeos_root, 'chroot/tmp', build_id)
    command = ('cd %s ; %s %s' % (download_path, uncompress_cmd,
                                  package_file_name))

    if self.log_level != 'verbose':
      self._logger.LogOutput('CMD: %s' % command)
      print('(Uncompressing autotest file %s .)' % package_file_name)
    retval = self._ce.RunCommand(command)
    if retval != 0:
      raise MissingFile('Cannot uncompress file: %s.' % package_file_name)
    # Remove uncompressed downloaded file
    command = ('cd %s ; rm -f %s' % (download_path, package_file_name))
    if self.log_level != 'verbose':
      self._logger.LogOutput('CMD: %s' % command)
      print('(Removing processed autotest file %s .)' % package_file_name)
    # try removing file, its ok to have an error, print if encountered
    retval = self._ce.RunCommand(command)
    if retval != 0:
      print('(Warning: Could not remove file %s .)' % package_file_name)

  def VerifyAutotestFilesExist(self, chromeos_root, build_id, package_file):
    """Check with 'gsutil ls' whether a package file exists on the server.

    The check is skipped (and reported as success) in test mode.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier; names the gs:// directory.
      package_file: Name of the package file to look for.

    Returns:
      0 if the file was found (or test mode is on), 1 otherwise.
    """
    remote_name = ('gs://chromeos-image-archive/%s/%s' % (build_id,
                                                          package_file))
    gsutil = os.path.join(chromeos_root, GS_UTIL)

    if test_flag.GetTestMode():
      return 0

    ls_cmd = '%s ls %s' % (gsutil, remote_name)
    if self.log_level != 'verbose':
      self._logger.LogOutput('CMD: %s' % ls_cmd)
    if self._ce.RunCommand(ls_cmd) == 0:
      # Package exists on server.
      return 0

    print('(Warning: Could not find file %s )' % remote_name)
    return 1

  def DownloadAutotestFiles(self, chromeos_root, build_id):
    # Download autest package files (3 files)
    autotest_packages_name = ('autotest_packages.tar')
    autotest_server_package_name = ('autotest_server_package.tar.bz2')
    autotest_control_files_name = ('control_files.tar')

    download_path = os.path.join(chromeos_root, 'chroot/tmp', build_id)
    # Autotest directory relative path wrt chroot
    autotest_rel_path = os.path.join('/tmp', build_id, 'autotest_files')
    # Absolute Path to download files
    autotest_path = os.path.join(chromeos_root, 'chroot/tmp', build_id,
                                 'autotest_files')

    if not os.path.exists(autotest_path):
      # Quickly verify if the files are present on server
      # If not, just exit with warning
      status = self.VerifyAutotestFilesExist(chromeos_root, build_id,
                                             autotest_packages_name)
      if status != 0:
        default_autotest_dir = '~/trunk/src/third_party/autotest/files'
        print(
            '(Warning: Could not find autotest packages .)\n'
            '(Warning: Defaulting autotest path to %s .' % default_autotest_dir)
        return default_autotest_dir

      # Files exist on server, download and uncompress them
      self.DownloadSingleAutotestFile(chromeos_root, build_id,
                                      autotest_packages_name)
      self.DownloadSingleAutotestFile(chromeos_root, build_id,
                                      autotest_server_package_name)
      self.DownloadSingleAutotestFile(chromeos_root, build_id,
                                      autotest_control_files_name)

      self.UncompressSingleAutotestFile(chromeos_root, build_id,
                                        autotest_packages_name, 'tar -xvf ')
      self.UncompressSingleAutotestFile(
          chromeos_root, build_id, autotest_server_package_name, 'tar -jxvf ')
      self.UncompressSingleAutotestFile(
          chromeos_root, build_id, autotest_control_files_name, 'tar -xvf ')
      # Rename created autotest directory to autotest_files
      command = ('cd %s ; mv autotest autotest_files' % download_path)
      if self.log_level != 'verbose':
        self._logger.LogOutput('CMD: %s' % command)
        print('(Moving downloaded autotest files to autotest_files)')
      retval = self._ce.RunCommand(command)
      if retval != 0:
        raise MissingFile('Could not create directory autotest_files')

    return autotest_rel_path

  def Run(self, chromeos_root, xbuddy_label, autotest_path):
    """Resolve, download and unpack the image for an xbuddy label.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      xbuddy_label: Label translated into a build id via GetBuildID.
      autotest_path: Autotest path to use. Only when this is the empty
        string are the autotest files downloaded for the build.

    Returns:
      A tuple (image_path, autotest_path).

    Raises:
      MissingImage: If the image is not listed by 'gsutil ls' (outside of
        test mode). The helper methods called here may raise MissingImage
        or MissingFile as well.
    """
    build_id = self.GetBuildID(chromeos_root, xbuddy_label)
    image_name = ('gs://chromeos-image-archive/%s/chromiumos_test_image.tar.xz'
                  % build_id)

    # Verify that image exists for build_id, before attempting to
    # download it. The check is skipped in test mode.
    if not test_flag.GetTestMode():
      ls_cmd = '%s ls %s' % (os.path.join(chromeos_root, GS_UTIL), image_name)
      if self._ce.RunCommand(ls_cmd) != 0:
        raise MissingImage('Cannot find official image: %s.' % image_name)

    image_path = self.DownloadImage(chromeos_root, build_id, image_name)
    self.UncompressImage(chromeos_root, build_id)

    if self.log_level != 'quiet':
      self._logger.LogOutput('Using image from %s.' % image_path)

    if autotest_path == '':
      autotest_path = self.DownloadAutotestFiles(chromeos_root, build_id)

    return image_path, autotest_path