# Copyright Martin J. Bligh, Google Inc 2008 # Released under the GPL v2 """ This class allows you to communicate with the frontend to submit jobs etc It is designed for writing more sophisiticated server-side control files that can recursively add and manage other jobs. We turn the JSON dictionaries into real objects that are more idiomatic For docs, see: http://www.chromium.org/chromium-os/testing/afe-rpc-infrastructure http://docs.djangoproject.com/en/dev/ref/models/querysets/#queryset-api """ #pylint: disable=missing-docstring import getpass import os import re import common from autotest_lib.frontend.afe import rpc_client_lib from autotest_lib.client.common_lib import control_data from autotest_lib.client.common_lib import global_config from autotest_lib.client.common_lib import host_states from autotest_lib.client.common_lib import priorities from autotest_lib.client.common_lib import utils from autotest_lib.tko import db try: from chromite.lib import metrics except ImportError: metrics = utils.metrics_mock try: from autotest_lib.server.site_common import site_utils as server_utils except: from autotest_lib.server import utils as server_utils form_ntuples_from_machines = server_utils.form_ntuples_from_machines GLOBAL_CONFIG = global_config.global_config DEFAULT_SERVER = 'autotest' def dump_object(header, obj): """ Standard way to print out the frontend objects (eg job, host, acl, label) in a human-readable fashion for debugging """ result = header + '\n' for key in obj.hash: if key == 'afe' or key == 'hash': continue result += '%20s: %s\n' % (key, obj.hash[key]) return result class RpcClient(object): """ Abstract RPC class for communicating with the autotest frontend Inherited for both TKO and AFE uses. All the constructors go in the afe / tko class. Manipulating methods go in the object classes themselves """ def __init__(self, path, user, server, print_log, debug, reply_debug): """ Create a cached instance of a connection to the frontend user: username to connect as server: frontend server to connect to print_log: pring a logging message to stdout on every operation debug: print out all RPC traffic """ if not user and utils.is_in_container(): user = GLOBAL_CONFIG.get_config_value('SSP', 'user', default=None) if not user: user = getpass.getuser() if not server: if 'AUTOTEST_WEB' in os.environ: server = os.environ['AUTOTEST_WEB'] else: server = GLOBAL_CONFIG.get_config_value('SERVER', 'hostname', default=DEFAULT_SERVER) self.server = server self.user = user self.print_log = print_log self.debug = debug self.reply_debug = reply_debug headers = {'AUTHORIZATION': self.user} rpc_server = rpc_client_lib.add_protocol(server) + path if debug: print 'SERVER: %s' % rpc_server print 'HEADERS: %s' % headers self.proxy = rpc_client_lib.get_proxy(rpc_server, headers=headers) def run(self, call, **dargs): """ Make a RPC call to the AFE server """ rpc_call = getattr(self.proxy, call) if self.debug: print 'DEBUG: %s %s' % (call, dargs) try: result = utils.strip_unicode(rpc_call(**dargs)) if self.reply_debug: print result return result except Exception: raise def log(self, message): if self.print_log: print message class TKO(RpcClient): def __init__(self, user=None, server=None, print_log=True, debug=False, reply_debug=False): super(TKO, self).__init__(path='/new_tko/server/noauth/rpc/', user=user, server=server, print_log=print_log, debug=debug, reply_debug=reply_debug) self._db = None @metrics.SecondsTimerDecorator( 'chromeos/autotest/tko/get_job_status_duration') def get_job_test_statuses_from_db(self, job_id): """Get job test statuses from the database. Retrieve a set of fields from a job that reflect the status of each test run within a job. fields retrieved: status, test_name, reason, test_started_time, test_finished_time, afe_job_id, job_owner, hostname. @param job_id: The afe job id to look up. @returns a TestStatus object of the resulting information. """ if self._db is None: self._db = db.db() fields = ['status', 'test_name', 'subdir', 'reason', 'test_started_time', 'test_finished_time', 'afe_job_id', 'job_owner', 'hostname', 'job_tag'] table = 'tko_test_view_2' where = 'job_tag like "%s-%%"' % job_id test_status = [] # Run commit before we query to ensure that we are pulling the latest # results. self._db.commit() for entry in self._db.select(','.join(fields), table, (where, None)): status_dict = {} for key,value in zip(fields, entry): # All callers expect values to be a str object. status_dict[key] = str(value) # id is used by TestStatus to uniquely identify each Test Status # obj. status_dict['id'] = [status_dict['reason'], status_dict['hostname'], status_dict['test_name']] test_status.append(status_dict) return [TestStatus(self, e) for e in test_status] def get_status_counts(self, job, **data): entries = self.run('get_status_counts', group_by=['hostname', 'test_name', 'reason'], job_tag__startswith='%s-' % job, **data) return [TestStatus(self, e) for e in entries['groups']] class _StableVersionMap(object): """ A mapping from board names to strings naming software versions. The mapping is meant to allow finding a nominally "stable" version of software associated with a given board. The mapping identifies specific versions of software that should be installed during operations such as repair. Conceptually, there are multiple version maps, each handling different types of image. For instance, a single board may have both a stable OS image (e.g. for CrOS), and a separate stable firmware image. Each different type of image requires a certain amount of special handling, implemented by a subclass of `StableVersionMap`. The subclasses take care of pre-processing of arguments, delegating actual RPC calls to this superclass. @property _afe AFE object through which to make the actual RPC calls. @property _android Value of the `android` parameter to be passed when calling the `get_stable_version` RPC. """ def __init__(self, afe): self._afe = afe def get_all_versions(self): """ Get all mappings in the stable versions table. Extracts the full content of the `stable_version` table in the AFE database, and returns it as a dictionary mapping board names to version strings. @return A dictionary mapping board names to version strings. """ return self._afe.run('get_all_stable_versions') def get_version(self, board): """ Get the mapping of one board in the stable versions table. Look up and return the version mapped to the given board in the `stable_versions` table in the AFE database. @param board The board to be looked up. @return The version mapped for the given board. """ return self._afe.run('get_stable_version', board=board) def set_version(self, board, version): """ Change the mapping of one board in the stable versions table. Set the mapping in the `stable_versions` table in the AFE database for the given board to the given version. @param board The board to be updated. @param version The new version to be assigned to the board. """ self._afe.run('set_stable_version', version=version, board=board) def delete_version(self, board): """ Remove the mapping of one board in the stable versions table. Remove the mapping in the `stable_versions` table in the AFE database for the given board. @param board The board to be updated. """ self._afe.run('delete_stable_version', board=board) class _OSVersionMap(_StableVersionMap): """ Abstract stable version mapping for full OS images of various types. """ def _version_is_valid(self, version): return True def get_all_versions(self): versions = super(_OSVersionMap, self).get_all_versions() for board in versions.keys(): if ('/' in board or not self._version_is_valid(versions[board])): del versions[board] return versions def get_version(self, board): version = super(_OSVersionMap, self).get_version(board) return version if self._version_is_valid(version) else None def format_cros_image_name(board, version): """ Return an image name for a given `board` and `version`. This formats `board` and `version` into a string identifying an image file. The string represents part of a URL for access to the image. The returned image name is typically of a form like "falco-release/R55-8872.44.0". """ build_pattern = GLOBAL_CONFIG.get_config_value( 'CROS', 'stable_build_pattern') return build_pattern % (board, version) class _CrosVersionMap(_OSVersionMap): """ Stable version mapping for Chrome OS release images. This class manages a mapping of Chrome OS board names to known-good release (or canary) images. The images selected can be installed on DUTs during repair tasks, as a way of getting a DUT into a known working state. """ def _version_is_valid(self, version): return version is not None and '/' not in version def get_image_name(self, board): """ Return the full image name of the stable version for `board`. This finds the stable version for `board`, and returns a string identifying the associated image as for `format_image_name()`, above. @return A string identifying the image file for the stable image for `board`. """ return format_cros_image_name(board, self.get_version(board)) class _SuffixHackVersionMap(_StableVersionMap): """ Abstract super class for mappings using a pseudo-board name. For non-OS image type mappings, we look them up in the `stable_versions` table by constructing a "pseudo-board" from the real board name plus a suffix string that identifies the image type. So, for instance the name "lulu/firmware" is used to look up the FAFT firmware version for lulu boards. """ # _SUFFIX - The suffix used in constructing the "pseudo-board" # lookup key. Each subclass must define this value for itself. # _SUFFIX = None def get_all_versions(self): # Get all the mappings from the AFE, extract just the mappings # with our suffix, and replace the pseudo-board name keys with # the real board names. # all_versions = super( _SuffixHackVersionMap, self).get_all_versions() return { board[0 : -len(self._SUFFIX)]: all_versions[board] for board in all_versions.keys() if board.endswith(self._SUFFIX) } def get_version(self, board): board += self._SUFFIX return super(_SuffixHackVersionMap, self).get_version(board) def set_version(self, board, version): board += self._SUFFIX super(_SuffixHackVersionMap, self).set_version(board, version) def delete_version(self, board): board += self._SUFFIX super(_SuffixHackVersionMap, self).delete_version(board) class _FAFTVersionMap(_SuffixHackVersionMap): """ Stable version mapping for firmware versions used in FAFT repair. When DUTs used for FAFT fail repair, stable firmware may need to be flashed directly from original tarballs. The FAFT firmware version mapping finds the appropriate tarball for a given board. """ _SUFFIX = '/firmware' def get_version(self, board): # If there's no mapping for `board`, the lookup will return the # default CrOS version mapping. To eliminate that case, we # require a '/' character in the version, since CrOS versions # won't match that. # # TODO(jrbarnette): This is, of course, a hack. Ultimately, # the right fix is to move handling to the RPC server side. # version = super(_FAFTVersionMap, self).get_version(board) return version if '/' in version else None class _FirmwareVersionMap(_SuffixHackVersionMap): """ Stable version mapping for firmware supplied in Chrome OS images. A Chrome OS image bundles a version of the firmware that the device should update to when the OS version is installed during AU. Test images suppress the firmware update during AU. Instead, during repair and verify we check installed firmware on a DUT, compare it against the stable version mapping for the board, and update when the DUT is out-of-date. """ _SUFFIX = '/rwfw' def get_version(self, board): # If there's no mapping for `board`, the lookup will return the # default CrOS version mapping. To eliminate that case, we # require the version start with "Google_", since CrOS versions # won't match that. # # TODO(jrbarnette): This is, of course, a hack. Ultimately, # the right fix is to move handling to the RPC server side. # version = super(_FirmwareVersionMap, self).get_version(board) return version if version.startswith('Google_') else None class AFE(RpcClient): # Known image types for stable version mapping objects. # CROS_IMAGE_TYPE - Mappings for Chrome OS images. # FAFT_IMAGE_TYPE - Mappings for Firmware images for FAFT repair. # FIRMWARE_IMAGE_TYPE - Mappings for released RW Firmware images. # CROS_IMAGE_TYPE = 'cros' FAFT_IMAGE_TYPE = 'faft' FIRMWARE_IMAGE_TYPE = 'firmware' _IMAGE_MAPPING_CLASSES = { CROS_IMAGE_TYPE: _CrosVersionMap, FAFT_IMAGE_TYPE: _FAFTVersionMap, FIRMWARE_IMAGE_TYPE: _FirmwareVersionMap, } def __init__(self, user=None, server=None, print_log=True, debug=False, reply_debug=False, job=None): self.job = job super(AFE, self).__init__(path='/afe/server/noauth/rpc/', user=user, server=server, print_log=print_log, debug=debug, reply_debug=reply_debug) def get_stable_version_map(self, image_type): """ Return a stable version mapping for the given image type. @return An object mapping board names to version strings for software of the given image type. """ return self._IMAGE_MAPPING_CLASSES[image_type](self) def host_statuses(self, live=None): dead_statuses = ['Repair Failed', 'Repairing'] statuses = self.run('get_static_data')['host_statuses'] if live == True: return list(set(statuses) - set(dead_statuses)) if live == False: return dead_statuses else: return statuses @staticmethod def _dict_for_host_query(hostnames=(), status=None, label=None): query_args = {} if hostnames: query_args['hostname__in'] = hostnames if status: query_args['status'] = status if label: query_args['labels__name'] = label return query_args def get_hosts(self, hostnames=(), status=None, label=None, **dargs): query_args = dict(dargs) query_args.update(self._dict_for_host_query(hostnames=hostnames, status=status, label=label)) hosts = self.run('get_hosts', **query_args) return [Host(self, h) for h in hosts] def get_hostnames(self, status=None, label=None, **dargs): """Like get_hosts() but returns hostnames instead of Host objects.""" # This implementation can be replaced with a more efficient one # that does not query for entire host objects in the future. return [host_obj.hostname for host_obj in self.get_hosts(status=status, label=label, **dargs)] def reverify_hosts(self, hostnames=(), status=None, label=None): query_args = dict(locked=False, aclgroup__users__login=self.user) query_args.update(self._dict_for_host_query(hostnames=hostnames, status=status, label=label)) return self.run('reverify_hosts', **query_args) def repair_hosts(self, hostnames=(), status=None, label=None): query_args = dict(locked=False, aclgroup__users__login=self.user) query_args.update(self._dict_for_host_query(hostnames=hostnames, status=status, label=label)) return self.run('repair_hosts', **query_args) def create_host(self, hostname, **dargs): id = self.run('add_host', hostname=hostname, **dargs) return self.get_hosts(id=id)[0] def get_host_attribute(self, attr, **dargs): host_attrs = self.run('get_host_attribute', attribute=attr, **dargs) return [HostAttribute(self, a) for a in host_attrs] def set_host_attribute(self, attr, val, **dargs): self.run('set_host_attribute', attribute=attr, value=val, **dargs) def get_labels(self, **dargs): labels = self.run('get_labels', **dargs) return [Label(self, l) for l in labels] def create_label(self, name, **dargs): id = self.run('add_label', name=name, **dargs) return self.get_labels(id=id)[0] def get_acls(self, **dargs): acls = self.run('get_acl_groups', **dargs) return [Acl(self, a) for a in acls] def create_acl(self, name, **dargs): id = self.run('add_acl_group', name=name, **dargs) return self.get_acls(id=id)[0] def get_users(self, **dargs): users = self.run('get_users', **dargs) return [User(self, u) for u in users] def generate_control_file(self, tests, **dargs): ret = self.run('generate_control_file', tests=tests, **dargs) return ControlFile(self, ret) def get_jobs(self, summary=False, **dargs): if summary: jobs_data = self.run('get_jobs_summary', **dargs) else: jobs_data = self.run('get_jobs', **dargs) jobs = [] for j in jobs_data: job = Job(self, j) # Set up some extra information defaults job.testname = re.sub('\s.*', '', job.name) # arbitrary default job.platform_results = {} job.platform_reasons = {} jobs.append(job) return jobs def get_host_queue_entries(self, **kwargs): """Find JobStatus objects matching some constraints. @param **kwargs: Arguments to pass to the RPC """ entries = self.run('get_host_queue_entries', **kwargs) return self._entries_to_statuses(entries) def get_host_queue_entries_by_insert_time(self, **kwargs): """Like get_host_queue_entries, but using the insert index table. @param **kwargs: Arguments to pass to the RPC """ entries = self.run('get_host_queue_entries_by_insert_time', **kwargs) return self._entries_to_statuses(entries) def _entries_to_statuses(self, entries): """Converts HQEs to JobStatuses Sadly, get_host_queue_entries doesn't return platforms, we have to get those back from an explicit get_hosts queury, then patch the new host objects back into the host list. :param entries: A list of HQEs from get_host_queue_entries or get_host_queue_entries_by_insert_time. """ job_statuses = [JobStatus(self, e) for e in entries] hostnames = [s.host.hostname for s in job_statuses if s.host] hosts = {} for host in self.get_hosts(hostname__in=hostnames): hosts[host.hostname] = host for status in job_statuses: if status.host: status.host = hosts.get(status.host.hostname) # filter job statuses that have either host or meta_host return [status for status in job_statuses if (status.host or status.meta_host)] def get_special_tasks(self, **data): tasks = self.run('get_special_tasks', **data) return [SpecialTask(self, t) for t in tasks] def get_host_special_tasks(self, host_id, **data): tasks = self.run('get_host_special_tasks', host_id=host_id, **data) return [SpecialTask(self, t) for t in tasks] def get_host_status_task(self, host_id, end_time): task = self.run('get_host_status_task', host_id=host_id, end_time=end_time) return SpecialTask(self, task) if task else None def get_host_diagnosis_interval(self, host_id, end_time, success): return self.run('get_host_diagnosis_interval', host_id=host_id, end_time=end_time, success=success) def create_job(self, control_file, name=' ', priority=priorities.Priority.DEFAULT, control_type=control_data.CONTROL_TYPE_NAMES.CLIENT, **dargs): id = self.run('create_job', name=name, priority=priority, control_file=control_file, control_type=control_type, **dargs) return self.get_jobs(id=id)[0] def abort_jobs(self, jobs): """Abort a list of jobs. Already completed jobs will not be affected. @param jobs: List of job ids to abort. """ for job in jobs: self.run('abort_host_queue_entries', job_id=job) def get_hosts_by_attribute(self, attribute, value): """ Get the list of hosts that share the same host attribute value. @param attribute: String of the host attribute to check. @param value: String of the value that is shared between hosts. @returns List of hostnames that all have the same host attribute and value. """ return self.run('get_hosts_by_attribute', attribute=attribute, value=value) def lock_host(self, host, lock_reason, fail_if_locked=False): """ Lock the given host with the given lock reason. Locking a host that's already locked using the 'modify_hosts' rpc will raise an exception. That's why fail_if_locked exists so the caller can determine if the lock succeeded or failed. This will save every caller from wrapping lock_host in a try-except. @param host: hostname of host to lock. @param lock_reason: Reason for locking host. @param fail_if_locked: Return False if host is already locked. @returns Boolean, True if lock was successful, False otherwise. """ try: self.run('modify_hosts', host_filter_data={'hostname': host}, update_data={'locked': True, 'lock_reason': lock_reason}) except Exception: return not fail_if_locked return True def unlock_hosts(self, locked_hosts): """ Unlock the hosts. Unlocking a host that's already unlocked will do nothing so we don't need any special try-except clause here. @param locked_hosts: List of hostnames of hosts to unlock. """ self.run('modify_hosts', host_filter_data={'hostname__in': locked_hosts}, update_data={'locked': False, 'lock_reason': ''}) class TestResults(object): """ Container class used to hold the results of the tests for a job """ def __init__(self): self.good = [] self.fail = [] self.pending = [] def add(self, result): if result.complete_count > result.pass_count: self.fail.append(result) elif result.incomplete_count > 0: self.pending.append(result) else: self.good.append(result) class RpcObject(object): """ Generic object used to construct python objects from rpc calls """ def __init__(self, afe, hash): self.afe = afe self.hash = hash self.__dict__.update(hash) def __str__(self): return dump_object(self.__repr__(), self) class ControlFile(RpcObject): """ AFE control file object Fields: synch_count, dependencies, control_file, is_server """ def __repr__(self): return 'CONTROL FILE: %s' % self.control_file class Label(RpcObject): """ AFE label object Fields: name, invalid, platform, kernel_config, id, only_if_needed """ def __repr__(self): return 'LABEL: %s' % self.name def add_hosts(self, hosts): # We must use the label's name instead of the id because label ids are # not consistent across master-shard. return self.afe.run('label_add_hosts', id=self.name, hosts=hosts) def remove_hosts(self, hosts): # We must use the label's name instead of the id because label ids are # not consistent across master-shard. return self.afe.run('label_remove_hosts', id=self.name, hosts=hosts) class Acl(RpcObject): """ AFE acl object Fields: users, hosts, description, name, id """ def __repr__(self): return 'ACL: %s' % self.name def add_hosts(self, hosts): self.afe.log('Adding hosts %s to ACL %s' % (hosts, self.name)) return self.afe.run('acl_group_add_hosts', self.id, hosts) def remove_hosts(self, hosts): self.afe.log('Removing hosts %s from ACL %s' % (hosts, self.name)) return self.afe.run('acl_group_remove_hosts', self.id, hosts) def add_users(self, users): self.afe.log('Adding users %s to ACL %s' % (users, self.name)) return self.afe.run('acl_group_add_users', id=self.name, users=users) class Job(RpcObject): """ AFE job object Fields: name, control_file, control_type, synch_count, reboot_before, run_verify, priority, email_list, created_on, dependencies, timeout, owner, reboot_after, id """ def __repr__(self): return 'JOB: %s' % self.id class JobStatus(RpcObject): """ AFE job_status object Fields: status, complete, deleted, meta_host, host, active, execution_subdir, id """ def __init__(self, afe, hash): super(JobStatus, self).__init__(afe, hash) self.job = Job(afe, self.job) if getattr(self, 'host'): self.host = Host(afe, self.host) def __repr__(self): if self.host and self.host.hostname: hostname = self.host.hostname else: hostname = 'None' return 'JOB STATUS: %s-%s' % (self.job.id, hostname) class SpecialTask(RpcObject): """ AFE special task object """ def __init__(self, afe, hash): super(SpecialTask, self).__init__(afe, hash) self.host = Host(afe, self.host) def __repr__(self): return 'SPECIAL TASK: %s' % self.id class Host(RpcObject): """ AFE host object Fields: status, lock_time, locked_by, locked, hostname, invalid, labels, platform, protection, dirty, id """ def __repr__(self): return 'HOST OBJECT: %s' % self.hostname def show(self): labels = list(set(self.labels) - set([self.platform])) print '%-6s %-7s %-7s %-16s %s' % (self.hostname, self.status, self.locked, self.platform, ', '.join(labels)) def delete(self): return self.afe.run('delete_host', id=self.id) def modify(self, **dargs): return self.afe.run('modify_host', id=self.id, **dargs) def get_acls(self): return self.afe.get_acls(hosts__hostname=self.hostname) def add_acl(self, acl_name): self.afe.log('Adding ACL %s to host %s' % (acl_name, self.hostname)) return self.afe.run('acl_group_add_hosts', id=acl_name, hosts=[self.hostname]) def remove_acl(self, acl_name): self.afe.log('Removing ACL %s from host %s' % (acl_name, self.hostname)) return self.afe.run('acl_group_remove_hosts', id=acl_name, hosts=[self.hostname]) def get_labels(self): return self.afe.get_labels(host__hostname__in=[self.hostname]) def add_labels(self, labels): self.afe.log('Adding labels %s to host %s' % (labels, self.hostname)) return self.afe.run('host_add_labels', id=self.id, labels=labels) def remove_labels(self, labels): self.afe.log('Removing labels %s from host %s' % (labels,self.hostname)) return self.afe.run('host_remove_labels', id=self.id, labels=labels) def is_available(self): """Check whether DUT host is available. @return: bool """ return not (self.locked or self.status in host_states.UNAVAILABLE_STATES) class User(RpcObject): def __repr__(self): return 'USER: %s' % self.login class TestStatus(RpcObject): """ TKO test status object Fields: test_idx, hostname, testname, id complete_count, incomplete_count, group_count, pass_count """ def __repr__(self): return 'TEST STATUS: %s' % self.id class HostAttribute(RpcObject): """ AFE host attribute object Fields: id, host, attribute, value """ def __repr__(self): return 'HOST ATTRIBUTE %d' % self.id