# Copyright (c) 2014-2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Download images from Cloud Storage."""
from __future__ import print_function
import ast
import os
import test_flag
from cros_utils import command_executer
# Location of the gsutil.py script, relative to the ChromeOS checkout root.
# It is joined onto `chromeos_root` to build the gsutil commands in this file.
GS_UTIL = 'chromium/tools/depot_tools/gsutil.py'
class MissingImage(Exception):
  """Error for an image that cannot be found, downloaded or unpacked.

  Raised when the requested image does not exist in gs://.
  """
class MissingFile(Exception):
  """Error for a file that cannot be found, downloaded or unpacked.

  Raised when the requested file does not exist in gs://.
  """
class RunCommandExceptionHandler(object):
  """Run a cleanup command when a RunCommand call raises, then re-raise.

  Attributes:
    logger: Object whose LogOutput(msg) is used to echo the cleanup command.
    log_level: Verbosity string; the echo is skipped when it is 'verbose'.
    ce: Command executer whose RunCommand(cmd) runs the cleanup command.
    cleanup_command: Shell command to run, or None for no cleanup.
  """

  def __init__(self, logger_to_use, log_level, cmd_exec, command):
    """Remember the collaborators and the cleanup command."""
    self.logger = logger_to_use
    self.log_level = log_level
    self.ce = cmd_exec
    self.cleanup_command = command

  def HandleException(self, _, e):
    """Run the cleanup command (if any) and raise `e` again.

    Args:
      _: Ignored first argument supplied by the caller of the handler.
      e: The exception being handled; always re-raised.
    """
    cleanup = self.cleanup_command
    if cleanup is not None:
      # Echo the command ourselves only when not running verbosely.
      if self.log_level != 'verbose':
        self.logger.LogOutput('CMD: %s' % cleanup)
      self.ce.RunCommand(cleanup)
    # Propagate the original failure to the caller.
    raise e
class ImageDownloader(object):
  """Download images and autotest packages from Cloud Storage.

  Everything is fetched with gsutil into <chromeos_root>/chroot/tmp/<build_id>.

  Attributes:
    log_level: Verbosity string. The values this class compares against are
      'verbose', 'average' and 'quiet'.
  """

  def __init__(self, logger_to_use=None, log_level='verbose', cmd_exec=None):
    """Store the logger and verbosity and choose a command executer.

    Args:
      logger_to_use: Object providing LogOutput(msg). May be None, in which
        case this class prints its log messages to stdout instead.
      log_level: Verbosity string (see class docstring).
      cmd_exec: Command executer to use. When falsy, one is obtained from
        command_executer.GetCommandExecuter().
    """
    self._logger = logger_to_use
    self.log_level = log_level
    self._ce = cmd_exec or command_executer.GetCommandExecuter(
        self._logger, log_level=self.log_level)

  def _Log(self, message):
    """Send `message` to the logger, or to stdout when no logger was given."""
    # The constructor allows logger_to_use=None; without this fallback every
    # logging call would fail with AttributeError on None.
    if self._logger is not None:
      self._logger.LogOutput(message)
    else:
      print(message)

  def _Announce(self, command, note=None):
    """Echo `command` (and an optional note) unless log level is 'verbose'."""
    if self.log_level == 'verbose':
      return
    self._Log('CMD: %s' % command)
    if note is not None:
      print(note)

  @staticmethod
  def _DownloadDir(chromeos_root, build_id):
    """Return the directory under the chroot's tmp that holds `build_id`."""
    return os.path.join(chromeos_root, 'chroot/tmp', build_id)

  def GetBuildID(self, chromeos_root, xbuddy_label):
    """Translate an xbuddy label into the real Google Storage image name.

    Args:
      chromeos_root: Path to the ChromeOS checkout whose chroot is used.
      xbuddy_label: Label handed to translate_xbuddy.py.

    Returns:
      The first element of the tuple printed by translate_xbuddy.py.

    Raises:
      MissingImage: The translation produced no output, or output that is
        not a non-empty Python literal sequence.
    """
    command = ('cd ~/trunk/src/third_party/toolchain-utils/crosperf; '
               "python translate_xbuddy.py '%s'" % xbuddy_label)
    _, build_id_tuple_str, _ = self._ce.ChrootRunCommandWOutput(
        chromeos_root, command)
    if not build_id_tuple_str:
      raise MissingImage("Unable to find image for '%s'" % xbuddy_label)

    # Malformed or whitespace-only output makes literal_eval (or the
    # indexing) fail; report that as a missing image rather than leaking a
    # parser error to the caller.
    try:
      build_id = ast.literal_eval(build_id_tuple_str)[0]
    except (ValueError, SyntaxError, TypeError, LookupError):
      raise MissingImage("Unable to parse image name for '%s' from: %r" %
                         (xbuddy_label, build_id_tuple_str))
    return build_id

  def DownloadImage(self, chromeos_root, build_id, image_name):
    """Copy the compressed test image into the build's download directory.

    The download is skipped when chromiumos_test_image.bin is already there.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier; names the download directory.
      image_name: gs:// URL of the compressed image.

    Returns:
      The path where the uncompressed chromiumos_test_image.bin lives (or
      will live once UncompressImage has run).

    Raises:
      MissingImage: gsutil failed or the tarball is absent afterwards.
    """
    if self.log_level == 'average':
      self._Log('Preparing to download %s image to local '
                'directory.' % build_id)

    # Make sure the directory for downloading the image exists.
    download_path = self._DownloadDir(chromeos_root, build_id)
    image_path = os.path.join(download_path, 'chromiumos_test_image.bin')
    if not os.path.exists(download_path):
      os.makedirs(download_path)

    # Check to see if the image has already been downloaded. If not,
    # download the image.
    if not os.path.exists(image_path):
      gsutil_cmd = os.path.join(chromeos_root, GS_UTIL)
      command = '%s cp %s %s' % (gsutil_cmd, image_name, download_path)
      self._Announce(command)
      status = self._ce.RunCommand(command)
      downloaded_image_name = os.path.join(download_path,
                                           'chromiumos_test_image.tar.xz')
      if status != 0 or not os.path.exists(downloaded_image_name):
        raise MissingImage('Cannot download image: %s.' % downloaded_image_name)

    return image_path

  def UncompressImage(self, chromeos_root, build_id):
    """Untar the downloaded image and delete the tarball.

    Does nothing when chromiumos_test_image.bin already exists.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier; names the download directory.

    Raises:
      MissingImage: tar returned a non-zero status. The partially written
        .bin file is removed first.
    """
    download_path = self._DownloadDir(chromeos_root, build_id)

    # Check to see if the file has already been uncompressed, etc.
    if os.path.exists(os.path.join(download_path, 'chromiumos_test_image.bin')):
      return

    # Uncompress and untar the downloaded image.
    command = ('cd %s ; tar -Jxf chromiumos_test_image.tar.xz ' % download_path)
    # Cleanup command, shared by the exception handler and the failure path.
    clean_cmd = ('cd %s ; rm -f chromiumos_test_image.bin ' % download_path)
    exception_handler = RunCommandExceptionHandler(self._logger, self.log_level,
                                                   self._ce, clean_cmd)
    self._Announce(
        command, '(Uncompressing and un-tarring may take a couple of minutes...'
        'please be patient.)')
    retval = self._ce.RunCommand(
        command, except_handler=exception_handler.HandleException)
    if retval != 0:
      self._Announce(clean_cmd, '(Removing file chromiumos_test_image.bin.)')
      # Remove partially uncompressed file
      self._ce.RunCommand(clean_cmd)
      # Raise exception for failure to uncompress
      raise MissingImage('Cannot uncompress image: %s.' % build_id)

    # Remove compressed image
    command = ('cd %s ; rm -f chromiumos_test_image.tar.xz; ' % download_path)
    self._Announce(command, '(Removing file chromiumos_test_image.tar.xz.)')
    # Removal is best effort: a failure is only reported, not raised.
    retval = self._ce.RunCommand(command)
    if retval != 0:
      print('(Warning: Could not remove file chromiumos_test_image.tar.xz .)')

  def DownloadSingleAutotestFile(self, chromeos_root, build_id,
                                 package_file_name):
    """Download one autotest package file into the build's directory.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier; names both the gs:// folder and the local
        download directory.
      package_file_name: File name of the package inside the gs:// folder.

    Raises:
      MissingFile: The file is not listed on the server, gsutil cp failed,
        or the file is absent locally afterwards.
    """
    # Verify if package files exist (skipped in test mode).
    status = 0
    gs_package_name = ('gs://chromeos-image-archive/%s/%s' %
                       (build_id, package_file_name))
    gsutil_cmd = os.path.join(chromeos_root, GS_UTIL)
    if not test_flag.GetTestMode():
      cmd = '%s ls %s' % (gsutil_cmd, gs_package_name)
      status = self._ce.RunCommand(cmd)
    if status != 0:
      raise MissingFile(
          'Cannot find autotest package file: %s.' % package_file_name)

    if self.log_level == 'average':
      self._Log('Preparing to download %s package to local '
                'directory.' % package_file_name)

    # Make sure the directory for downloading the package exists.
    download_path = self._DownloadDir(chromeos_root, build_id)
    package_path = os.path.join(download_path, package_file_name)
    if not os.path.exists(download_path):
      os.makedirs(download_path)

    # Check to see if the package file has already been downloaded. If not,
    # download it.
    if not os.path.exists(package_path):
      command = '%s cp %s %s' % (gsutil_cmd, gs_package_name, download_path)
      self._Announce(command)
      status = self._ce.RunCommand(command)
      if status != 0 or not os.path.exists(package_path):
        raise MissingFile('Cannot download package: %s .' % package_path)

  def UncompressSingleAutotestFile(self, chromeos_root, build_id,
                                   package_file_name, uncompress_cmd):
    """Unpack one downloaded autotest package and delete the archive.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier; names the download directory.
      package_file_name: Archive file name inside the download directory.
      uncompress_cmd: Command prefix run with the file name appended,
        e.g. 'tar -xvf '.

    Raises:
      MissingFile: The uncompress command returned a non-zero status.
    """
    # Uncompress file
    download_path = self._DownloadDir(chromeos_root, build_id)
    command = ('cd %s ; %s %s' % (download_path, uncompress_cmd,
                                  package_file_name))
    self._Announce(command,
                   '(Uncompressing autotest file %s .)' % package_file_name)
    retval = self._ce.RunCommand(command)
    if retval != 0:
      raise MissingFile('Cannot uncompress file: %s.' % package_file_name)

    # Remove the archive now that its contents have been extracted.
    command = ('cd %s ; rm -f %s' % (download_path, package_file_name))
    self._Announce(
        command, '(Removing processed autotest file %s .)' % package_file_name)
    # Removal is best effort: a failure is only reported, not raised.
    retval = self._ce.RunCommand(command)
    if retval != 0:
      print('(Warning: Could not remove file %s .)' % package_file_name)

  def VerifyAutotestFilesExist(self, chromeos_root, build_id, package_file):
    """Check with `gsutil ls` whether a package file exists on the server.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier naming the gs:// folder.
      package_file: File name to look for.

    Returns:
      0 when the file is listed (always in test mode), 1 otherwise.
    """
    # Quickly verify if the files are there
    gs_package_name = ('gs://chromeos-image-archive/%s/%s' % (build_id,
                                                              package_file))
    gsutil_cmd = os.path.join(chromeos_root, GS_UTIL)
    if not test_flag.GetTestMode():
      cmd = '%s ls %s' % (gsutil_cmd, gs_package_name)
      self._Announce(cmd)
      if self._ce.RunCommand(cmd) != 0:
        print('(Warning: Could not find file %s )' % gs_package_name)
        return 1
    # Package exists on server
    return 0

  def DownloadAutotestFiles(self, chromeos_root, build_id):
    """Download and unpack the three autotest package files for a build.

    Nothing is downloaded when the autotest_files directory already exists.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      build_id: Build identifier.

    Returns:
      The autotest directory path relative to the chroot
      (/tmp/<build_id>/autotest_files), or the default
      '~/trunk/src/third_party/autotest/files' when the packages are not
      available on the server.

    Raises:
      MissingFile: A package could not be downloaded or unpacked, or the
        extracted directory could not be renamed.
    """
    # Download autotest package files (3 files)
    autotest_packages_name = 'autotest_packages.tar'
    autotest_server_package_name = 'autotest_server_package.tar.bz2'
    autotest_control_files_name = 'control_files.tar'

    download_path = self._DownloadDir(chromeos_root, build_id)
    # Autotest directory relative path wrt chroot
    autotest_rel_path = os.path.join('/tmp', build_id, 'autotest_files')
    # Absolute Path to download files
    autotest_path = os.path.join(download_path, 'autotest_files')

    if not os.path.exists(autotest_path):
      # Quickly verify if the files are present on server
      # If not, just exit with warning
      status = self.VerifyAutotestFilesExist(chromeos_root, build_id,
                                             autotest_packages_name)
      if status != 0:
        default_autotest_dir = '~/trunk/src/third_party/autotest/files'
        print('(Warning: Could not find autotest packages .)\n'
              '(Warning: Defaulting autotest path to %s .)' %
              default_autotest_dir)
        return default_autotest_dir

      # Files exist on server, download and uncompress them
      self.DownloadSingleAutotestFile(chromeos_root, build_id,
                                      autotest_packages_name)
      self.DownloadSingleAutotestFile(chromeos_root, build_id,
                                      autotest_server_package_name)
      self.DownloadSingleAutotestFile(chromeos_root, build_id,
                                      autotest_control_files_name)

      self.UncompressSingleAutotestFile(chromeos_root, build_id,
                                        autotest_packages_name, 'tar -xvf ')
      self.UncompressSingleAutotestFile(
          chromeos_root, build_id, autotest_server_package_name, 'tar -jxvf ')
      self.UncompressSingleAutotestFile(
          chromeos_root, build_id, autotest_control_files_name, 'tar -xvf ')

      # Rename created autotest directory to autotest_files
      command = ('cd %s ; mv autotest autotest_files' % download_path)
      self._Announce(command,
                     '(Moving downloaded autotest files to autotest_files)')
      retval = self._ce.RunCommand(command)
      if retval != 0:
        raise MissingFile('Could not create directory autotest_files')

    return autotest_rel_path

  def Run(self, chromeos_root, xbuddy_label, autotest_path):
    """Resolve, download and unpack the image for an xbuddy label.

    Args:
      chromeos_root: Path to the ChromeOS checkout.
      xbuddy_label: Label identifying the image to fetch.
      autotest_path: Autotest directory to use. When it is the empty
        string, the autotest files are downloaded for the build and their
        location is used instead.

    Returns:
      A tuple (image_path, autotest_path).

    Raises:
      MissingImage: The image cannot be resolved, found, downloaded or
        uncompressed.
      MissingFile: An autotest package cannot be downloaded or unpacked.
    """
    build_id = self.GetBuildID(chromeos_root, xbuddy_label)
    image_name = ('gs://chromeos-image-archive/%s/chromiumos_test_image.tar.xz'
                  % build_id)

    # Verify that image exists for build_id, before attempting to
    # download it (skipped in test mode).
    status = 0
    if not test_flag.GetTestMode():
      gsutil_cmd = os.path.join(chromeos_root, GS_UTIL)
      cmd = '%s ls %s' % (gsutil_cmd, image_name)
      status = self._ce.RunCommand(cmd)
    if status != 0:
      raise MissingImage('Cannot find official image: %s.' % image_name)

    image_path = self.DownloadImage(chromeos_root, build_id, image_name)
    self.UncompressImage(chromeos_root, build_id)

    if self.log_level != 'quiet':
      self._Log('Using image from %s.' % image_path)

    if autotest_path == '':
      autotest_path = self.DownloadAutotestFiles(chromeos_root, build_id)

    return image_path, autotest_path