# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import subprocess

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.server import test
from autotest_lib.server import frontend


class moblab_Setup(test.test):
    """ Moblab server test that checks for a specified number of connected DUTs
    and that those DUTs have specified labels. Used to verify the setup before
    kicking off a long running test suite.
    """
    version = 1

    def run_once(self, required_duts=1, required_labels=None):
        """ Tests the moblab's connected DUTs to see if the current
        configuration is valid for a specific test.

        @param required_duts [int] number of _live_ DUTs required to run
                the test in question. A DUT is not live if it is in a failed
                repair state
        @param required_labels [list<string>] list of labels that are
                required to be on at least one _live_ DUT for this test.
                Defaults to no required labels.

        @raises error.TestFail if fewer than required_duts live DUTs are
                connected, or if one or more required labels are not present
                on any live DUT.
        """
        # None stands in for "no labels" so the signature does not carry a
        # shared mutable default; callers that omit the argument see the same
        # behaviour (and the same log line) as with the old [] default.
        if required_labels is None:
            required_labels = []

        logging.info('required_duts=%d required_labels=%s',
                     required_duts, required_labels)

        # creating a client to connect to autotest rpc interface
        # all available rpc calls are defined in
        # src/third_party/autotest/files/server/frontend.py
        afe = frontend.AFE(server='localhost', user='moblab')

        # get autotest statuses that indicate a live host
        live_statuses = afe.host_statuses(live=True)

        # get the hosts connected to autotest, find the live ones
        hosts = []
        for host in afe.get_hosts():
            if host.status in live_statuses:
                logging.info('Host %s is live, status %s',
                             host.hostname, host.status)
                hosts.append(host)
            else:
                logging.info('Host %s is not live, status %s',
                             host.hostname, host.status)

        # check that we have the required number of live duts
        if len(hosts) < required_duts:
            raise error.TestFail(
                    'Suite requires %d DUTs, only %d connected' %
                    (required_duts, len(hosts)))

        # nothing further to verify, so avoid querying labels on every host
        if not required_labels:
            return

        # check that at least one DUT has each required label
        # note: pools are stored as specially formatted labels
        # to find if a DUT is in a pool,
        # check if it has the label pool:mypoolname
        wanted_labels = set(required_labels)
        found_labels = set()
        for host in hosts:
            for label in host.get_labels():
                if label.name in wanted_labels:
                    found_labels.add(label.name)
            # stop querying hosts once every required label has been seen
            if len(found_labels) == len(wanted_labels):
                break

        # report every missing label, in the order the caller listed them,
        # so the failure is deterministic and fixable in a single pass
        missing_labels = []
        for label in required_labels:
            if label not in found_labels and label not in missing_labels:
                missing_labels.append(label)

        if missing_labels:
            raise error.TestFail(
                    'No DUT with required label %s' %
                    ', '.join(str(label) for label in missing_labels))

        # to have autotest reverify that hosts are live, use the reverify_hosts
        # rpc call
        # note: this schedules a background asynchronous job, and
        # logic to check back in on hosts would need to be built
        # reverify_hostnames = [host.hostname for host in hosts]
        # afe.reverify_hosts(hostnames=reverify_hostnames)

        # example of running a command on the dut and getting the output back
        # def run_ssh_command_on_dut(hostname, cmd):
        #     """ Run a command on a DUT via ssh
        #
        #     @return output of the command
        #     @raises subprocess.CalledProcessError if the ssh command fails,
        #             such as a connection couldn't be established
        #     """
        #     ssh_cmd = ('ssh -o ConnectTimeout=2 -o StrictHostKeyChecking=no '
        #                "root@%s '%s'") % (hostname, cmd)
        #     return subprocess.check_output(ssh_cmd, shell=True)
        # for host in hosts:
        #     logging.info(run_ssh_command_on_dut(
        #             host.hostname, 'cat /etc/lsb-release'))