# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import subprocess
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.server import test
from autotest_lib.server import frontend
class moblab_Setup(test.test):
""" Moblab server test that checks for a specified number of
connected DUTs and that those DUTs have specified labels. Used to
verify the setup before kicking off a long running test suite.
"""
version = 1
def run_once(self, required_duts=1, required_labels=[]):
""" Tests the moblab's connected DUTs to see if the current
configuration is valid for a specific test.
@param required_duts [int] number of _live_ DUTs required to run
the test in question. A DUT is not live if it is in a failed
repair state
@param required_labels [list<string>] list of labels that are
required to be on at least one _live_ DUT for this test.
"""
logging.info('required_duts=%d required_labels=%s' %
(required_duts, str(required_labels)))
# creating a client to connect to autotest rpc interface
# all available rpc calls are defined in
# src/third_party/autotest/files/server/frontend.py
afe = frontend.AFE(server='localhost', user='moblab')
# get autotest statuses that indicate a live host
live_statuses = afe.host_statuses(live=True)
hosts = []
# get the hosts connected to autotest, find the live ones
for host in afe.get_hosts():
if host.status in live_statuses:
logging.info('Host %s is live, status %s' %
(host.hostname, host.status))
hosts.append(host)
else:
logging.info('Host %s is not live, status %s' %
(host.hostname, host.status))
# check that we have the required number of live duts
if len(hosts) < required_duts:
raise error.TestFail(('Suite requires %d DUTs, only %d connected' %
(required_duts, len(hosts))))
required_labels_found = {}
for label in required_labels:
required_labels_found[label] = False
# check that at least one DUT has each required label
for host in hosts:
for label in host.get_labels():
if label.name in required_labels_found:
required_labels_found[label.name] = True
# note: pools are stored as specially formatted labels
# to find if a DUT is in a pool,
# check if it has the label pool:mypoolname
for key in required_labels_found:
if not required_labels_found[key]:
raise error.TestFail('No DUT with required label %s' % key)
return
# to have autotest reverify that hosts are live, use the reverify_hosts
# rpc call
# note: this schedules a background asynchronous job, and
# logic to check back in on hosts would need to be built
# reverify_hostnames = [host.hostname for host in hosts]
# afe.reverify_hosts(hostnames=reverify_hostnames)
# example of running a command on the dut and getting the output back
# def run_ssh_command_on_dut(hostname, cmd):
# """ Run a command on a DUT via ssh
#
# @return output of the command
# @raises subprocess.CalledProcessError if the ssh command fails,
# such as a connection couldn't be established
# """
# ssh_cmd = ('ssh -o ConnectTimeout=2 -o StrictHostKeyChecking=no '
# "root@%s '%s'") % (hostname, cmd)
# return subprocess.check_output(ssh_cmd, shell=True)
# for host in hosts:
# logging.info(run_ssh_command_on_dut(
# host.hostname, 'cat /etc/lsb-release'))