#! /usr/bin/python
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Manage swarming bots.
* Launch bots, e.g. 200 bots:
$ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'
* Kill bot 1-200:
$ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'
* Check bot 1-200, start if not running:
$ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'
* The hierachy of working dir is like
WORKING_DIR
|-- bot_0
| |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
| |-- bot_config.log
| |-- swarming_bot.log
| |-- swarming_bot.zip
| |-- swarming_bot.pid
|-- bot_1
|-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
|-- bot_config.log
|-- swarming_bot.log
|-- swarming_bot.zip
|-- pid
Note bot_config.py:get_dimensions() will rely on the the bot number
in the path to generate bot id.
* TODO (fdeng):
** Restart a bot given a bot id.
"""
import argparse
import logging
import logging.handlers
import os
import re
import shutil
import signal
import subprocess
import sys
import threading
import time
import urllib
import common
from autotest_lib.client.common_lib import global_config
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_FILE_SIZE = 1024 * 5000 # 5000 KB
LOG_FILE_BACKUPCOUNT = 5
DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
'CROS', "swarming_proxy", default=None)
ID_RANGE_FMT = r'(\d+)-(\d+)'
KILL_PROC_TIMEOUT_SECS = 3600 * 3 # 3 hours
MAX_KILL_PROC_SLEEP_SECS = 60
class BotManagementError(Exception):
"""Raised for any bot management related error."""
class PidMisMatchError(BotManagementError):
"""Raised if pid file doesn't match what's found by pgrep."""
def __init__(self, known_pid, new_pid):
"""Initialize.
@param known_pid: pid in the pid file.
@param new_pid: new pid found by pgrep.
"""
self.known_pid = known_pid
self.new_pid = new_pid
msg = 'pid does not match, pid: %s, found %s' % (
self.known_pid, self.new_pid)
super(PidMisMatchError, self).__init__(msg)
class DuplicateBotError(BotManagementError):
"""Raised when multiple processes are detected for the same bot id."""
class SwarmingBot(object):
"""Class represent a swarming bot."""
PID_FILE = 'swarming_bot.pid'
BOT_DIR_FORMAT = 'bot_%s'
BOT_FILENAME = 'swarming_bot.zip'
# Used to search for bot process
# The process may bootstrap itself into swarming_bot.1.zip and swarming_bot.2.zip
BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'
def __init__(self, bot_id, parent_dir, swarming_proxy):
"""Initialize.
@param bot_id: An integer.
@param bot_dir: The working directory for the bot.
The directory is used to store bot code,
log file, and any file generated by the bot
at run time.
@param swarming_proxy: URL to the swarming instance.
"""
self.bot_id = bot_id
self.swarming_proxy = swarming_proxy
self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
self.bot_dir = os.path.join(self.parent_dir,
self.BOT_DIR_FORMAT % self.bot_id)
self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
self.pid = None
self._refresh_pid()
if self.pid is None:
logging.debug('[Bot %s] Initialize: bot is not running',
self.bot_id)
else:
logging.debug('[Bot %s] Initialize: bot is running '
'as process %s', self.bot_id, self.pid)
def _write_pid(self):
"""Write pid to file"""
with open(self.pid_file, 'w') as f:
f.write(str(self.pid))
def _cleanup_pid(self):
"""Cleanup self.pid and pid file."""
self.pid = None
if os.path.exists(self.pid_file):
os.remove(self.pid_file)
def _is_process_running(self):
"""Check if the process is running."""
pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
pattern = '%s %s' % (sys.executable, pattern)
cmd = ['pgrep', '-f', pattern]
logging.debug('[Bot %s] check process running: %s',
self.bot_id, str(cmd))
try:
output = subprocess.check_output(cmd)
pids = output.splitlines()
if len(pids) > 1:
raise DuplicateBotError('Multiple processes (pid: %s) detected for Bot %s'
% (str(pids), self.bot_id))
pid = int(pids[0])
if pid != self.pid:
raise PidMisMatchError(self.pid, pid)
return True
except subprocess.CalledProcessError as e:
if e.returncode == 1:
return False
else:
raise
def _refresh_pid(self):
"""Check process status and update self.pid accordingly."""
# Reload pid from pid file.
if os.path.exists(self.pid_file):
with open(self.pid_file) as f:
try:
pid = f.readline().strip()
self.pid = int(pid)
except ValueError as e:
self.pid = None
try:
if not self._is_process_running():
self._cleanup_pid()
except PidMisMatchError as e:
logging.error('[Bot %s] %s, updating pid file',
self.bot_id, str(e))
self.pid = e.new_pid
self._write_pid()
def is_running(self):
"""Return if the bot is running."""
self._refresh_pid()
return bool(self.pid)
def ensure_running(self):
"""Start a swarming bot."""
if self.is_running():
logging.info(
'[Bot %s] Skip start, bot is already running (pid %s).',
self.bot_id, self.pid)
return
logging.debug('[Bot %s] Bootstrap bot in %s', self.bot_id, self.bot_dir)
if os.path.exists(self.bot_dir):
shutil.rmtree(self.bot_dir)
os.makedirs(self.bot_dir)
dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
self.bot_id, self.swarming_proxy)
urllib.urlretrieve('%s/bot_code' % self.swarming_proxy, dest)
cmd = [sys.executable, self.BOT_FILENAME]
logging.debug('[Bot %s] Calling command: %s', self. bot_id, cmd)
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
cwd=self.bot_dir)
self.pid = process.pid
self._write_pid()
logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)
def kill(self):
"""Kill the bot."""
if not self.is_running():
logging.info('[Bot %s] Skip killing bot, Bot is not running',
self.bot_id)
return
try:
logging.info('[Bot %s] killing bot (pid: %d)',
self.bot_id, self.pid)
os.kill(self.pid, signal.SIGTERM)
start = time.time()
sleep = 1
while(time.time() - start < KILL_PROC_TIMEOUT_SECS):
if not self.is_running():
return
sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
logging.debug('[Bot %s] Waiting %d secs for bot to finish'
' any running task and exist.',
self.bot_id, sleep)
time.sleep(sleep)
else:
logging.error(
'[Bot %s] Failed to kill pid %s within %d secs, '
'the bot may be running a long running task, you '
'can retry the script. SIGKILL the process is not '
'recommended, it might lead to unexpected error.',
self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
except Exception as e:
raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))
class BotManager(object):
"""Class that manages swarming bots."""
CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'
def __init__(self, bot_ids, working_dir, swarming_proxy):
"""Initialize.
@param bot_ids: a set of integers.
@param working_dir: Working directory of the bots.
Store temporary files.
@param swarming_proxy: The swarming instance to talk to.
"""
self.bot_ids = bot_ids
self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy)
for bid in bot_ids]
def launch(self):
"""Launch bots."""
for bot in self.bots:
try:
bot.ensure_running()
except BotManagementError as e:
logging.error('[BotManager] Failed to start Bot %s: %s',
bot.bot_id, str(e))
# If we let the process exit immediately, the last process won't
# be launched sometimes. So sleep for 3 secs.
# The right way is to query the server until all bots are seen
# by the server by visiting
# https://SWARMING_PROXY/swarming/api/v1/client/bots
# However, this would require oauth authentication (install
# oauth library and install credentials).
logging.info('Wait 3 seconds for process creation to complete.')
time.sleep(3)
def kill(self):
"""Kill running bots."""
# Start threads to kill bots.
threads = []
for bot in self.bots:
t = threading.Thread(target=bot.kill)
threads.append(t)
t.setDaemon(True)
t.start()
# Wait on threads.
try:
while threading.active_count() > 1:
time.sleep(0.1)
except KeyboardInterrupt:
msg = 'Ctrl-c recieved! Bots status not confirmed. Exit.'
logging.error(msg)
print msg
def check(self):
"""Check running bots, start it if not running."""
pattern = self.CHECK_BOTS_PATTERN.format(
executable=sys.executable, working_dir=self.working_dir,
bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
cmd = ['pgrep', '-f', pattern]
logging.debug('[BotManager] Check bot counts: %s', str(cmd))
try:
output = subprocess.check_output(cmd)
bot_count = len(output.splitlines())
except subprocess.CalledProcessError as e:
if e.returncode == 1:
bot_count = 0
else:
raise
missing_count = len(self.bot_ids) - bot_count
logging.info(
'[BotManager] Check bot counts: %d bots running, missing: %d',
bot_count, missing_count)
if missing_count > 0:
logging.info('[BotManager] Checking all bots')
self.launch()
def _parse_range(id_range):
"""Convert an id range to a set of bot ids.
@param id_range: A range of integer, e.g "1-200".
@returns a set of bot ids set([1,2,...200])
"""
m = re.match(ID_RANGE_FMT, id_range)
if not m:
raise ValueError('Could not parse %s' % id_range)
min, max = int(m.group(1)), int(m.group(2))
return set(bid for bid in range(min, max+1))
def _parse_args(args):
"""Parse args.
@param args: Argument list passed from main.
@return: A tuple with the parsed args, as returned by parser.parse_args.
"""
parser = argparse.ArgumentParser(
description='Launch swarming bots on a autotest server')
action_help = ('launch: launch bots. '
'kill: kill bots. '
'check: check if bots are running, if not, starting bots.')
parser.add_argument(
'action', choices=('launch', 'kill', 'check'), help=action_help)
parser.add_argument(
'-r', '--id_range', type=str, dest='id_range', required=True,
help='A range of integer, each bot created will be labeled '
'with an id from this range. E.g. "1-200"')
parser.add_argument(
'-d', '--working_dir', type=str, dest='working_dir', required=True,
help='A working directory where bots will store files '
'generated at runtime')
parser.add_argument(
'-p', '--swarming_proxy', type=str, dest='swarming_proxy',
default=DEFAULT_SWARMING_PROXY,
help='The URL of the swarming instance to talk to, '
'Default to the one specified in global config')
parser.add_argument(
'-f', '--log_file', dest='log_file', required=False,
help='Path to the log file.')
parser.add_argument(
'-v', '--verbose', dest='verbose', action='store_true',
help='Verbose mode')
return parser.parse_args(args)
def _setup_logging(verbose, log_file):
"""Setup logging.
@param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
@param log_file; path to log file.
"""
log_formatter = logging.Formatter(LOGGING_FORMAT)
if not log_file:
handler = logging.StreamHandler()
else:
handler = logging.handlers.RotatingFileHandler(
filename=log_file, maxBytes=LOG_FILE_SIZE,
backupCount=LOG_FILE_BACKUPCOUNT)
handler.setFormatter(log_formatter)
logger = logging.getLogger()
log_level = logging.DEBUG if verbose else logging.INFO
logger.setLevel(log_level)
logger.addHandler(handler)
def main(args):
"""Main.
@args: A list of system arguments.
"""
args = _parse_args(args)
_setup_logging(args.verbose, args.log_file)
if not args.swarming_proxy:
logging.error(
'No swarming proxy instance specified. '
'Specify swarming_proxy in [CROS] in shadow_config, '
'or use --swarming_proxy')
return 1
if not args.swarming_proxy.startswith('https://'):
swarming_proxy = 'https://' + args.swarming_proxy
else:
swarming_proxy = args.swarming_proxy
logging.info('Connecting to %s', swarming_proxy)
m = BotManager(_parse_range(args.id_range),
args.working_dir, args.swarming_proxy)
if args.action == 'launch':
m.launch()
elif args.action == 'kill':
m.kill()
elif args.action == 'check':
m.check()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))