#!/usr/bin/env python
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Standalone service to monitor AFE servers and report to ts_mon"""
import sys
import time
import logging
import multiprocessing
import urllib2
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
# import needed to setup host_attributes
# pylint: disable=unused-import
from autotest_lib.server import site_host_attributes
from autotest_lib.site_utils import server_manager_utils
from chromite.lib import commandline
from chromite.lib import metrics
from chromite.lib import ts_mon_config
# ts_mon metric names. Every metric emitted by this module lives under
# METRIC_ROOT; the individual names are derived from it.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Timer metric recorded around each rpc call.
METRIC_RPC_CALL_DURATIONS = '%s/rpc_call_durations' % METRIC_ROOT
# Counter incremented once per completed polling sweep.
METRIC_TICK = '%s/tick' % METRIC_ROOT
# Counter incremented when monitoring a host raises an exception.
METRIC_MONITOR_ERROR = '%s/afe_monitor_error' % METRIC_ROOT
# Maps an exception type to the 'failure_reason' label reported for it.
# AfeMonitor.run_cmd looks exceptions up by their exact type: a type listed
# here is recorded under its label and swallowed, any other type is recorded
# as 'Unknown' and re-raised.
FAILURE_REASONS = {
    proxy.JSONRPCException: 'JSONRPCException',
}
def afe_rpc_call(hostname):
    """Perform one rpc call set on server

    Exceptions raised while building or running the monitor are caught here:
    the METRIC_MONITOR_ERROR counter is incremented for the host and the
    traceback is logged.

    @param hostname: server's hostname to poll
    """
    try:
        # The monitor is constructed inside the try block on purpose: this
        # function is the target of pool.map(), and an exception escaping
        # from it would be re-raised in the polling loop and stop the polling
        # of every other host.
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Keeps the shared list of shards in sync with the known shards

    Blocking function: once per period the shards reported by
    server_manager_utils.get_shards() are compared with the shared list,
    which is then updated in place. Returns once stop_event is set; loops
    forever when no stop_event is given.

    @param shards: list of shards to be updated (modified in place)
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        new_shards = set(server_manager_utils.get_shards())

        # Only the read-modify-write of the shared list needs the lock; the
        # lookup above and the logging below are done without holding it.
        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards

            for s in rm_shards:
                shards.remove(s)

            if add_shards:
                shards.extend(add_shards)

        if rm_shards:
            logging.info('Servers left production: %s', str(rm_shards))

        if add_shards:
            logging.info('Servers entered production: %s',
                         str(add_shards))

        # Sleep for whatever is left of this period.
        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            if stop_event:
                # Waiting on the event instead of sleeping lets a stop
                # request end the pause immediately rather than after up to
                # a full period.
                stop_event.wait(wait_time)
            else:
                time.sleep(wait_time)
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards

    Once per period, afe_rpc_call is run for every distinct hostname in
    servers and shards using a pool of worker processes, and the METRIC_TICK
    counter is incremented. Returns once stop_event is set; loops forever
    when no stop_event is given.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll, or None to poll only servers
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)
    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()

            # Snapshot the hostnames under the lock; the polling itself is
            # done on the snapshot, without holding the lock.
            with servers_lock:
                all_servers = set(servers)
                # shards defaults to None, which cannot be iterated.
                if shards is not None:
                    all_servers.update(shards)

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            # Sleep for whatever is left of this period.
            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                if stop_event:
                    # Wake up as soon as a stop is requested instead of
                    # sleeping out the rest of the period.
                    stop_event.wait(wait_time)
                else:
                    time.sleep(wait_time)
    finally:
        # Release the worker processes whether the loop ended normally or
        # with an exception. No work is outstanding on a normal exit, as
        # pool.map() has already returned.
        pool.terminate()
        pool.join()
class RpcFlightRecorder(object):
    """Monitors a list of AFE

    Polling (and, optionally, keeping the list of shards up to date) is done
    in separate processes, which share their state through a
    multiprocessing.Manager.
    """

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        # Lists shared with the sub processes; one lock guards both.
        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder

        Starts the shard updating process (only when with_shards was set)
        and then the polling process.
        """
        if self._with_shards:
            shard_args = [self._shards, self._servers_lock]
            shard_kwargs = {'stop_event': self._stop_event}
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=shard_args,
                    kwargs=shard_kwargs)

            self._update_shards_ps.start()

        poll_args = [self._servers, self._servers_lock]
        poll_kwargs = {'shards': self._shards,
                       'period': self._poll_period,
                       'stop_event': self._stop_event}
        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=poll_args,
                kwargs=poll_kwargs)

        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def terminate(self):
        """Terminate processes

        Sets the stop event, terminates whichever sub processes were
        created, and shuts the manager down.
        """
        self.close()
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.terminate()

        if self._update_shards_ps:
            self._update_shards_ps.terminate()

        if self._manager:
            self._manager.shutdown()

    # The method was originally published under this misspelled name; it is
    # kept as an alias so that existing callers keep working.
    termitate = terminate

    def join(self, timeout=None):
        """Blocking call until closed and processes complete

        @param timeout: passed to each process in turn, so the total time
                        spent waiting could be >timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
def _failed(fields, msg_str, reason, err=None):
"""Mark current run failed
@param fields, ts_mon fields to mark as failed
@param msg_str, message string to be filled
@param reason: why it failed
@param err: optional error to log more debug info
"""
fields['success'] = False
fields['failure_reason'] = reason
logging.warning("%s failed - %s", msg_str, reason)
if err:
logging.debug("%s fail_err - %s", msg_str, str(err))
class AfeMonitor(object):
    """Runs rpc calls against a single afe frontend and records metrics"""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=hostname)
        self._metric_fields = {'target_hostname': hostname}

    def run_cmd(self, cmd, expected=None):
        """Runs one rpc command under a timer metric and logs the outcome

        The call is marked as failed when the result differs from expected,
        or when the rpc raises. Exceptions whose exact type is not a key of
        FAILURE_REASONS (other than urllib2.HTTPError) are re-raised after
        being recorded.

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc; not checked when None
        """
        # Start from the per-host fields and assume success until the call
        # proves otherwise.
        timer_fields = dict(self._metric_fields,
                            command=cmd,
                            success=True,
                            failure_reason='')

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=timer_fields, scale=0.001) as f:
            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                mismatch = expected is not None and expected != result
                if mismatch:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib2.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                known = type(e) in FAILURE_REASONS
                reason = FAILURE_REASONS[type(e)] if known else 'Unknown'
                _failed(f, msg_str, reason, err=e)

                # Only the failures listed in FAILURE_REASONS are swallowed.
                if not known:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Runs the set of rpc checks against the server

        Sends 'get_server_time' without checking the result, then 'ping_db'
        expecting [True]. Nothing is returned; outcomes are reported by
        run_cmd.
        """
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])
def get_parser():
    """Builds the command line parser for this tool

    @return: commandline.ArgumentParser that accepts -a/--afe (repeatable),
             -p/--poll-period and --no-shards
    """
    # (flags, add_argument keyword arguments) for every supported option.
    arg_specs = (
        (('-a', '--afe'),
         {'action': 'append',
          'default': [],
          'help': 'Autotest FrontEnd server to monitor'}),
        (('-p', '--poll-period'),
         {'type': int,
          'default': 60,
          'help': 'Frequency to poll AFE servers'}),
        (('--no-shards',),
         {'action': 'store_false',
          'dest': 'with_shards',
          'help': 'Disable shard updating'}),
    )

    parser = commandline.ArgumentParser(description=__doc__)
    for flags, options in arg_specs:
        parser.add_argument(*flags, **options)
    return parser
def main(argv):
    """Main function

    Parses the command line, falls back to the 'global_afe_hostname' value
    of the 'SERVER' config section (default 'cautotest') when no AFE was
    given, then runs a RpcFlightRecorder until its processes exit.

    @param argv: commandline arguments passed
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)

        flight_recorder.start()
        try:
            flight_recorder.join()
        finally:
            # join() only returns once the sub processes are gone, but it
            # can also be interrupted (e.g. KeyboardInterrupt). Tear
            # everything down in both cases so that no sub process or
            # manager is left behind.
            flight_recorder.termitate()
# Script entry point: run main() with the full argument vector only when this
# file is executed directly, not when it is imported.
if __name__ == '__main__':
    main(sys.argv)