#!/usr/bin/python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Orchestrate virtual machines to setup a toy instance of the lab for testing.

This module is meant to help create a closed loop development flow for members
of the lab team which looks something like this:
                    ______________
                   |              |
                   |gs vm registry|<+
                   |______________| |
                          |         |
                          v         |
        New change -> puppylab -> New core_cluster box
                          |
         Vagrantfile specifies cluster settings
         _________________|____________________
        |                                      |
        |  puppet provisions core_cluster box  |
        |______________________________________|
                |          | ........... |
                v          v             v
              master     shard1       shardn
             |     |     |     |      |     |
            mysql  afe  tko heartbt   tko heartbt
             |     |     |     |      |     |
host ports  8001  8002  8001  8002    8001  8002
        [host ports liable to autocorrect as needed]

This module can work with any vm hosting service/provider as long as they
adhere to the vagrant interface. VirtualBox is the only implementation so
far, though GCE will be an ideal candidate.

Class spec:
* VagrantProvisioner: Provision boxes per a VagrantFile.
    * VirtualBoxProvisioner: Generate a Virtualbox VagrantFile.
* CoreVM: Manage individual core_cluster vms.
* ClusterManager: Spin up cluster.

Usage:
    clusterctl provision --admin-repo /usr/local/autotest/chromeos-internal
    clusterctl update --sync
"""

import argparse
import logging
import os
import sys

import common
from autotest_lib.puppylab import lab_manifest
from autotest_lib.puppylab import vm_manager
from autotest_lib.site_utils.lib import infra


# TODO: Enable multiple shards via command line args.
NUM_SHARDS = 1
SHADOW_PATH = '/usr/local/autotest/shadow_config.ini'


class ConfigError(Exception):
    """Raised if one of the vms in the cluster is misconfigured."""


class CoreVM(object):
    """Interface to create and manage a core_cluster vm image.

    A core_cluster vm image has base packages shared by all server roles.
    """
    _core_vm_name = 'chromeos_lab_core_cluster'
    _core_image_source = 'gs://vms/%s.box' % _core_vm_name
    _core_image_name = '%s.box' % _core_vm_name
    _core_image_destination = os.path.join(
            vm_manager.VAGRANT_DIR, _core_image_name)

    # TODO: Preparation is currently by hand. Use the provisioner to
    # create a box of name '_core_image_name', with the CoreClusterTemplate
    # in the VAGRANT_DIR if you wish to prepare a new vm. You can achieve
    # this by:
    # * Copying the CoreClusterTemplate to a Vagrantfile and replacing the
    #   modulepath with the path to your chromeos-admin/puppet directory.
    # * Calling `vagrant up` in the directory with this vagrant file.
    # * When it's done, calling vagrant package.
    # This should produce a package.box in the same dir.

    def __init__(self, provisioner):
        """Remember the provisioner used to (un)register the core box.

        @param provisioner: A provisioner object exposing register_box and
                            unregister_box.
        """
        self.provisioner = provisioner


    def setup_core_box(self):
        """Setup a core cluster vm.

        Download a core_cluster image if one isn't present on disk and
        register it with vagrant.
        """
        if not os.path.exists(self._core_image_destination):
            infra.execute_command(
                    'localhost',
                    'gsutil cp %s %s' %
                    (self._core_image_source, self._core_image_destination))
        self.provisioner.register_box(
                self._core_image_destination, self._core_vm_name)


    def teardown_core_box(self):
        """Teardown a core cluster vm."""
        # TODO: delete the box file.
        self.provisioner.unregister_box(self._core_vm_name)


class ClusterManager(object):
    """Interface to spin up a cluster of CoreVMs.

    This class manages all the details between creating a core_cluster image
    and running tests on a full fledged cluster.
    """

    def _register_shards(self, num_shards):
        """Register num_shards worth of shard info.

        This includes the name, port address and board of the new shard. This
        information is piped through to each vm, so the cluster manager is
        actually in control of all the shards in the cluster and can address
        them by name.

        Consider a shard, shard1, assigned to board stumpy:
            * You will be able to ssh into it with 'vagrant ssh stumpyshard'.
            * The afe for the shard will be running on a incrementally
              designated port starting from shards_base_port.
            * The afe port of the shard is piped through to the shadow_config.
              This is required for 2 reasons:
                # `cli/atest shard add` should use this name, because it is
                  the name the shard-client will use to request jobs.
                # the master afe should show links to the shard using this
                  name.

        A shard that is not assigned a board keeps its default 'shard<N>'
        vagrant name and is still tracked in vagrant_shard_names.

        @param num_shards: The number of shards we wish to add to the cluster.
        """
        self.vagrantfile_shard_args = {}
        self.shard_board_map = {}
        self.vagrant_shard_names = []
        # Work on a copy so that building a manager does not consume the
        # module level board list in lab_manifest.
        available_boards = list(lab_manifest.shards or [])
        for num in range(1, num_shards + 1):
            # The name to use for vagrant ssh
            shard_name = 'shard%s' % num
            # The port for the shard's afe
            shard_port = lab_manifest.shards_base_port + num
            # The hostname to use in the shadow_config of the shard
            shard_hostname = '%s:%s' % (lab_manifest.vm_host_name, shard_port)
            # Without a board the vm keeps the plain shard<int> name.
            vagrant_shard_name = shard_name
            self.vagrantfile_shard_args.update({
                    shard_name: shard_name,
                    '%s_shadow_config_hostname' % shard_name: shard_hostname,
                    '%s_port' % shard_name: shard_port,
            })
            if available_boards:
                board = available_boards.pop()
                # Assign a board to a shard. Use the shard_hostname as this
                # setting is not meant to be human understandable.
                self.shard_board_map[shard_hostname] = board
                vagrant_shard_name = '%sshard' % board.rsplit(':')[-1]
                # Replace the shard<int>-type-name with board_shard
                self.vagrantfile_shard_args[shard_name] = vagrant_shard_name
            self.vagrant_shard_names.append(vagrant_shard_name)


    def __init__(self, vm_provisioner, vagrant_master_name='master',
                 num_shards=1):
        """Initialize parameters for the cluster.

        @param vm_provisioner: A provisioner object, currently the only one
                               supported is VirtualBox.
        @param vagrant_master_name: The name to give the cluster master.
        @param num_shards: The number of shards in the cluster. Each shard
                           gets a name allocated based on its number
                           (eg: shard1).
        """
        self.provisioner = vm_provisioner
        self.vm_manager = CoreVM(provisioner=self.provisioner)
        self._register_shards(num_shards)
        self.vagrant_master_name = vagrant_master_name


    def start_cluster(self):
        """Start a cluster."""
        self.vm_manager.setup_core_box()

        # TODO: Add a --rebuild-cluster option.
        needs_destroy = self.provisioner.initialize_vagrant(
                master=self.vagrant_master_name,
                master_port=lab_manifest.master_afe_port,
                **self.vagrantfile_shard_args)
        self.provisioner.provision(needs_destroy)


    def shutdown_cluster(self):
        """Shutdown the current cluster."""
        # TODO: Actually destroy. Halt is useful for debugging.
        self.provisioner.vagrant_cmd('halt')


    def execute_against_vm(self, vm_name, cmd):
        """Execute cmd against vm_name.

        @param cmd: The command to execute.
        @param vm_name: The name of the vm, eg: stumpyshard.

        @return: The output of the command with trailing newlines removed.
        """
        return self.provisioner.vagrant_cmd(
                "ssh %s -- '%s'" % (vm_name, cmd)).rstrip('\n')


    def _get_shadow_config_value(self, vm_name, key):
        """Read the value of a shadow_config key from a vm.

        @param vm_name: The name of the vm to read the shadow_config from.
        @param key: The shadow_config key to look up.

        @return: The text following the first ':' on the matching line, with
                 leading spaces removed.
        """
        cmd = 'grep "^%s:" %s' % (key, SHADOW_PATH)
        shadow_line = self.execute_against_vm(vm_name, cmd)
        # Split on the first ':' only, so values that themselves contain a
        # ':' (eg: host:port shard hostnames) are returned in full.
        return shadow_line.partition(':')[2].lstrip(' ')


    def _check_shadow_config(self, vm, key, expected_value):
        """Sanity check a shadow_config value of a vm in the cluster.

        @param vm: The name of the vm to check.
        @param key: The shadow_config key to check.
        @param expected_value: The value the key must have.

        @raises ConfigError: If a shadow_config is misconfigured.
        """
        value = self._get_shadow_config_value(vm, key)
        if value != expected_value:
            raise ConfigError(
                    '%s vm has misconfigued config %s = %s, expected %s' %
                    (vm, key, value, expected_value))
        logging.info('%s has %s = %s', vm, key, value)


    def _upstart_cmd(self, vm, job_name, cmd='status'):
        """Execute an upstart command.

        @param vm: The name of the vm to execute it against.
        @param job_name: The name of the upstart job.
        @param cmd: The upstart command.

        @return: The output of the upstart command, or a 'service not found'
                 message if the vagrant command failed.
        """
        status_cmd = 'sudo %s %s' % (cmd, job_name)
        try:
            return self.execute_against_vm(vm, status_cmd)
        except vm_manager.VagrantCmdError:
            return '%s service not found on %s' % (job_name, vm)


    def check_services(self, action='start'):
        """Get the status of all core services on the vms.

        This method is designed to start services on the master/all shards
        if their shadow configs are as expected. If the shadow config option
        on a vm has an unexpected setting, services are not started on it.

        @param action: The action to perform on services. Start will start
                       all of them, stop will stop them all.

        @raises ConfigError: If a shadow_config option is unexpected.
        """
        core_services = (
                'scheduler', 'host-scheduler',
                'gs_offloader', 'gs_offloader_s', 'shard-client')
        # Shards are expected to reach the global db through the default
        # gateway as seen from the master vm.
        gateway = self.execute_against_vm(
                self.vagrant_master_name,
                "netstat -rn | grep \"^0.0.0.0 \" | cut -d \" \" -f10 | head -1"
                ).rstrip('\n')

        for vm in self.vagrant_shard_names + [self.vagrant_master_name]:
            vm_manager.format_msg('Checking services on %s' % vm)
            self._check_shadow_config(vm, 'host', 'localhost')
            global_db = ('localhost' if vm == self.vagrant_master_name
                         else gateway)
            self._check_shadow_config(vm, 'global_db_host', global_db)

            for service in core_services:
                logging.info('Checking %s on %s', service, vm)
                status = self._upstart_cmd(vm, service, action)
                logging.info(status)


def bringup_cluster(admin_repo, num_shards=NUM_SHARDS, start_safe=False):
    """Start a cluster.

    @param admin_repo: Path to the chromeos-admin repo.
    @param num_shards: Number of shards. You cannot change
        the number of shards on a running cluster, you need
        to destroy the cluster, remove the vagrant file,
        modify the ClusterTemplate to include a new section
        for the additional shard, and rerun clusterctl.
    @param start_safe: Start the cluster in safe mode. This means
        all core services will be stopped.

    @return: 0 if the cluster came up, 1 if it was shut down because of a
             misconfigured shadow_config.

    @raises ValueError: If admin_repo is missing or has no puppet directory.
    """
    if not admin_repo:
        raise ValueError('A path to the admin repo is required.')
    puppet_path = os.path.join(admin_repo, 'puppet')
    if not os.path.exists(puppet_path):
        raise ValueError('Admin repo %s does not contain puppet module' %
                         admin_repo)
    cluster_manager = ClusterManager(
            vm_provisioner=vm_manager.VirtualBox(puppet_path=puppet_path),
            vagrant_master_name='master', num_shards=num_shards)
    cluster_manager.start_cluster()
    try:
        cluster_manager.check_services(
                action='stop' if start_safe else 'start')
    except ConfigError as e:
        logging.error(
                'Shutting down cluster: %s', e)
        cluster_manager.shutdown_cluster()
        return 1
    return 0


def sync():
    """Sync autotest from the host to all vms in the cluster."""
    vm_manager.format_msg('Syncing Cluster')
    vm_manager.VagrantProvisioner.vagrant_cmd('rsync', stream_output=True)
    vm_manager.VagrantProvisioner.vagrant_cmd(
            'provision --provision-with shell', stream_output=True)
    vm_manager.format_msg('Please restart services as required')


def _parse_args(args):
    """Parse command line arguments.

    @param args: A list of command line arguments, eg sys.argv[1:]

    @return: The parsed args namespace, as returned by parser.parse_args.
    """
    if not args:
        print ('Too few arguments, try clusterctl --help')
        sys.exit(1)

    description = ('A script to orchestrate a toy test lab. Provided '
                   'with a path to the internal repo it will download a '
                   'vm image and spin up a cluster against which you can '
                   'test core autotest changes without DUTs.')
    parser = argparse.ArgumentParser(description=description)
    subparsers = parser.add_subparsers()
    provision_subparser = subparsers.add_parser(
            'provision', help='provision a cluster')
    provision_subparser.required = False
    provision_subparser.set_defaults(which='provision')
    # Provisioning cannot proceed without the puppet scripts, so let argparse
    # reject a missing path instead of crashing later on a None path.
    provision_subparser.add_argument(
            '--admin-repo', dest='admin_repo', type=str, required=True,
            help=('Path to the admin repo that has puppet scripts used for '
                  'provisioning the cluster. If you do not already have it you '
                  'can git clone the chromeos/chromeos-admin repo.'))
    provision_subparser.add_argument(
            '--safe', dest='start_safe', action='store_true',
            help='If sepcified services will not be started automatically.')

    # TODO: Automate restart of services via a --restart option.
    update_subparser = subparsers.add_parser('update', help='Update a cluster')
    update_subparser.required = False
    update_subparser.set_defaults(which='update')
    update_subparser.add_argument(
            '--sync', dest='sync', action='store_true',
            help='Sync autotest from host to all vms in cluster.')
    return parser.parse_args(args)


def main(args):
    """Main function.

    @param args: command line arguments for the script.

    @return: The exit status for the script, 0 on success.
    """
    args = _parse_args(args)
    # The sub-command may be absent on argparse versions that treat
    # sub-parsers as optional.
    command = getattr(args, 'which', None)
    if command == 'update':
        if args.sync:
            sync()
            return 0
        print ('Nothing to update, try clusterctl update --sync')
        return 1
    if command == 'provision':
        return bringup_cluster(
                admin_repo=args.admin_repo, start_safe=args.start_safe)
    print ('Too few arguments, try clusterctl --help')
    return 1


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))