#!/usr/bin/env python2
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Load generator for devserver.
Example usage:
# Find DUTs in suites pool to test with:
atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready
# Lock DUTs:
atest host mod -l -r 'quick provision testing' DUT1 DUT2
# Create config file with DUTs to test and builds to use.
cat >config.json <<EOD
{
"BOARD": {
"duts": [
"chromeosX-rowY-rackZ-hostA",
"chromeosX-rowY-rackZ-hostB"
],
"versions": [
"auron_paine-paladin/R65-10208.0.0-rc2",
"auron_paine-paladin/R65-10208.0.0-rc3",
"auron_paine-paladin/R65-10209.0.0-rc1"
]
}
}
EOD
# Do 100 total provisions, aiming to have 10 active simultaneously.
loadtest.py $DS config.json --simultaneous 10 --total 100
# Unlock DUTs:
atest host mod -u DUT1 DUT2
"""
import collections
import datetime
import json
import os
import random
import re
import signal
import subprocess
import sys
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from chromite.lib import commandline
from chromite.lib import cros_logging as logging
from chromite.lib import locking
from chromite.lib import parallel
# Payloads to stage.
PAYLOADS = ['quick_provision', 'stateful']
# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2
# Default number of successes/failures to blacklist a DUT. These are the
# defaults of the --blacklist-consecutive, --blacklist-success and
# --blacklist-total command-line options respectively.
BLACKLIST_CONSECUTIVE_FAILURE = 2
BLACKLIST_TOTAL_SUCCESS = 0
BLACKLIST_TOTAL_FAILURE = 5
def get_parser():
    """Creates the argparse parser.

    The module docstring is used as the parser description.

    @return: A commandline.ArgumentParser with the positional 'server' and
             'config' arguments and all load test options registered.
    """
    parser = commandline.ArgumentParser(description=__doc__)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    # Adjacent string literals are concatenated verbatim, so the first
    # sentence needs its own trailing space to keep the help text readable.
    parser.add_argument('config', type=str, action='store',
                        help='Path to JSON config file. '
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d).') %
                        BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    # --no-stage clears options.stage, which is True by default.
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser
def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Build an event log entry to be stored in Cloud Datastore.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event.
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event. The key is
                        left out of the entry when this is None.
    @param parent: A (Kind, id) tuple representing the parent key. The key
                   is left out of the entry when this is None.
    @param kwargs: Accepted but unused; extra keywords are not copied into
                   the entry.

    @return A dictionary representing the entry suitable for dumping via JSON.
    """
    to_epoch = time_utils.to_epoch_time
    entry = dict(id=entry_id, name=name, status=status,
                 start_time=to_epoch(start_time))
    # Optional fields only appear when the caller supplied them.
    if finish_time is not None:
        entry.update(finish_time=to_epoch(finish_time))
    if parent is not None:
        entry.update(parent=parent)
    return entry
class Job(object):
    """Tracks a single provision job.

    Unless dryrun is set, constructing a Job immediately triggers an auto
    update through the devserver; check() then polls for its completion.
    """

    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):
        """Record the job parameters and start the provision.

        @param ds: Devserver object used to trigger and poll the update.
        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision.
        @param entry_id: Numeric id; stored as the ('Job', entry_id) key.
        @param parent: A (Kind, id) tuple representing the parent key.
        @param board: Board type of the DUT.
        @param start_active: Number of active provisions when this one began.
        @param force_update: Forwarded to the devserver update calls.
        @param full_update: Forwarded to the devserver update calls.
        @param clobber_stateful: Forwarded to the devserver update calls.
        @param quick_provision: Forwarded to the devserver update calls.
        @param ping: If True, ping the DUT before triggering and after
                     finishing the provision.
        @param dryrun: If True, do not contact the devserver; the job is
                       marked finished and successful right away.
        """
        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name
        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None
        self.ping = ping
        self.pre_ping = None
        self.post_ping = None
        # Outcome fields, filled in when the job finishes. Defining them up
        # front means as_entry() on a still-running job reports 'fail'
        # instead of raising AttributeError.
        self.raised_error = None
        self.success = None
        self.pid = None
        # Keyword arguments shared by the trigger call and the status poll.
        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }
        if dryrun:
            self.finish_time = datetime.datetime.now()
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def _average_active(self):
        """Mean number of active provisions seen across check() polls.

        @return: The mean as a float, or None if check() has not polled yet.
        """
        if not self.check_active_count:
            return None
        # float() keeps the fractional part under Python 2, where dividing
        # two ints truncates.
        return float(self.check_active_sum) / self.check_active_count

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return: A dictionary describing this job; 'avg_active' is only
                 present once check() has polled at least once.
        """
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        avg_active = self._average_active()
        if avg_active is not None:
            entry['avg_active'] = avg_active
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.

        @return: True if the job has completed, False otherwise.
        """
        # Already finished (including dryrun jobs): nothing left to poll.
        if self.finish_time is not None:
            return True
        self.check_active_sum += active_count
        self.check_active_count += 1
        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
            self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)
        return finished

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return: A timedelta from the start to the finish time, or to now if
                 the job has not finished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time
class Runner(object):
    """Parallel provision load test runner."""

    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        """Set up the runner state.

        @param ds: Devserver object used for staging and provisioning.
        @param duts: Dictionary mapping DUT host name to board type.
        @param config: Dictionary indexed by board; each value provides a
                       'versions' list.
        @param simultaneous: Target number of simultaneously active
                             provisions; 0 means no limit.
        @param total: Total number of provisions to start; 0 means no limit.
        @param outputlog: Open file to append JSON entries to, or None.
        @param ping: Passed to each Job to enable ping checks.
        @param blacklist_consecutive: Consecutive failures before a DUT is
                                      blacklisted, or None to disable.
        @param blacklist_success: Total successes before a DUT is
                                  blacklisted; None or 0 disables.
        @param blacklist_total: Total failures before a DUT is blacklisted,
                                or None to disable.
        @param dryrun: Passed to each Job to skip the actual provision.
        """
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun
        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}
        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))
        # ids for the job entries.
        self.next_id = 0
        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handler to dump current status.

        @param signum: Number of the received signal.
        @param frame: Current stack frame (unused).
        """
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return: A dictionary describing the runner; status is always 'pass'.
        """
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore.

        @return: A list with the runner entry first, followed by one entry
                 per completed job.
        """
        entries = [self.as_entry()]
        entries.extend([job.as_entry() for job in self.completed])
        return entries

    def get_next_id(self):
        """Get the next Job id.

        @return: The current id; the internal counter is then incremented.
        """
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job.

        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision.
        """
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        # Remembered so find_build_for_dut() moves on to the next version.
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals.

        @return: True once the goals are met. False if a simultaneous goal
                 is set but no idle DUT is available. With no simultaneous
                 goal and no idle DUT, whether any provision is active.
        """
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warn('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions.

        Finished jobs are logged, appended to the output log if one is set,
        moved to the completed list, and their DUT is blacklisted when
        should_blacklist() says so.
        """
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.

        Walks the DUT's completed jobs in order, counting total failures,
        consecutive failures (reset by any success) and total successes.

        @param host_name: Name of the DUT to evaluate.

        @return: True if any enabled threshold has been reached.
        """
        total = 0
        consecutive = 0
        successes = 0
        for job in self.completed:
            if job.host_name != host_name:
                continue
            if not job.success:
                total += 1
                consecutive += 1
                if ((self.blacklist_total is not None and
                     total >= self.blacklist_total) or
                    (self.blacklist_consecutive is not None and
                     consecutive >= self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                consecutive = 0
                # A success threshold of 0 means "no limit", matching how
                # total and simultaneous treat 0. Comparing against 0
                # directly would blacklist every DUT after one success.
                limit = self.blacklist_success
                if limit is not None and limit > 0 and successes >= limit:
                    return True
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.

        @return: A randomly chosen DUT name that is neither being
                 provisioned nor blacklisted, or None if there is none.
        """
        # Build the exclusion set once instead of once per candidate DUT.
        unavailable = ({job.host_name for job in self.active} |
                       self.dut_blacklist)
        idle_duts = [d for d in self.duts if d not in unavailable]
        return random.choice(idle_duts) if idle_duts else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT.

        @param host_name: Name of the DUT.

        @return: The board recorded for the DUT in self.duts.
        """
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board.

        @param board: Board type to look up in the config.

        @return: The board's 'versions' list from the config.
        """
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT.

        @param host_name: Name of the DUT.

        @return: The version after the one last spawned on this DUT,
                 wrapping around; the first version if the DUT has no
                 recorded version or it is not in the list.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage artifacts for a given build.

        @param build: Name of the build whose PAYLOADS should be staged.
        """
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts.

        Stages every configured version of every board that has a DUT.
        """
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                # Bind build as a default so each lambda keeps its own value.
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.

        @return: True if the total goal was reached; False on
                 KeyboardInterrupt or when nothing could be started with no
                 provisions active.
        """
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        signal.siginterrupt(signal.SIGUSR1, False)
        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return: A timedelta from the start to the finish time, or to now if
                 the loop has not finished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time
def dump_entries_as_json(entries, output_file):
    """Write event log entries to a file as JSON, one entry per line.

    @param entries: A list of event log entries to dump.
    @param output_file: The file to write to.
    """
    entry_count = len(entries)
    logging.debug('Dumping %d entries', entry_count)
    for entry in entries:
        # Keys are sorted so each line has a stable field order.
        json.dump(entry, output_file, sort_keys=True)
        output_file.write('\n')
        output_file.flush()
def ping_dut(hostname):
    """Checks if a host is responsive to pings.

    Host names of the form chromeosN-rowN-rackN-hostN have '.cros' appended
    before pinging. One ping is sent with a two second deadline.

    @param hostname: Name of the host to ping.

    @return: True if /bin/ping exited with status 0, False otherwise.
    """
    if re.match(r'^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'
    # Send ping's output to the null device. subprocess.call() never reads
    # from a PIPE, so using one here could block the child once the pipe
    # buffer fills.
    with open(os.devnull, 'w') as devnull:
        response = subprocess.call(['/bin/ping', '-c1', '-w2', hostname],
                                   stdout=devnull)
    return response == 0
def main(argv):
    """Load generator for a devserver.

    Parses the options, builds the DUT list from the config file, stages
    builds, runs the provision loop and writes the results as JSON.

    @param argv: Command-line arguments, excluding the program name.
    """
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
            boards = (options.boards.split(',')
                      if options.boards else config.keys())
            duts_specified = (set(options.duts.split(','))
                              if options.duts else None)
            for board in boards:
                duts.update({dut: board for dut in config[board]['duts']
                             if duts_specified is None or
                             dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts', len(boards),
                     len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    try:
        runner = Runner(ds, duts, config,
                        simultaneous=options.simultaneous, total=options.total,
                        outputlog=outputlog, ping=options.ping,
                        blacklist_consecutive=options.blacklist_consecutive,
                        blacklist_success=options.blacklist_success,
                        blacklist_total=options.blacklist_total,
                        dryrun=options.dryrun)
        if options.stage:
            runner.stage_all()

        # Run all the provisions.
        with locking.FileLock(options.config, blocking=True).lock():
            completed = runner.loop()
    finally:
        # The runner only appends to the log from its constructor and from
        # loop(), so the file can be closed here, even if staging or the
        # loop raised.
        if outputlog is not None:
            outputlog.close()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())

    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    # Count job entries by status; the runner's own entry is excluded.
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warn('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warn('  %s', host_name)
# Run the load test when executed as a script; main()'s return value is passed
# to sys.exit() as the exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))