# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. #pylint: disable-msg=C0111 import os import logging import time from autotest_lib.client.common_lib import global_config from autotest_lib.client.common_lib.cros.graphite import autotest_stats from autotest_lib.frontend.afe import models from autotest_lib.scheduler import email_manager from autotest_lib.scheduler import scheduler_config, scheduler_models # Override default parser with our site parser. def parser_path(install_dir): """Return site implementation of parser. @param install_dir: installation directory. """ return os.path.join(install_dir, 'tko', 'site_parse') class SiteAgentTask(object): """ SiteAgentTask subclasses BaseAgentTask in monitor_db. """ def _archive_results(self, queue_entries): """ Set the status of queue_entries to ARCHIVING. This method sets the status of the queue_entries to ARCHIVING if the enable_archiving flag is true in global_config.ini. Otherwise, it bypasses the archiving step and sets the queue entries to the final status of current step. """ enable_archiving = global_config.global_config.get_config_value( scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool) # Set the status of the queue entries to archiving or self final status if enable_archiving: status = models.HostQueueEntry.Status.ARCHIVING else: status = self._final_status() for queue_entry in self.queue_entries: queue_entry.set_status(status) def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, allowed_host_statuses=None): """ Forked from monitor_db.py """ class_name = self.__class__.__name__ for entry in queue_entries: if entry.status not in allowed_hqe_statuses: # In the orignal code, here we raise an exception. In an # effort to prevent downtime we will instead abort the job and # send out an email notifying us this has occured. error_message = ('%s attempting to start entry with invalid ' 'status %s: %s. Aborting Job: %s.' % (class_name, entry.status, entry, entry.job)) logging.error(error_message) email_manager.manager.enqueue_notify_email( 'Job Aborted - Invalid Host Queue Entry Status', error_message) entry.job.request_abort() invalid_host_status = ( allowed_host_statuses is not None and entry.host.status not in allowed_host_statuses) if invalid_host_status: # In the orignal code, here we raise an exception. In an # effort to prevent downtime we will instead abort the job and # send out an email notifying us this has occured. error_message = ('%s attempting to start on queue entry with ' 'invalid host status %s: %s. Aborting Job: %s' % (class_name, entry.host.status, entry, entry.job)) logging.error(error_message) email_manager.manager.enqueue_notify_email( 'Job Aborted - Invalid Host Status', error_message) entry.job.request_abort() class SiteDispatcher(object): """ SiteDispatcher subclasses BaseDispatcher in monitor_db. """ DEFAULT_REQUESTED_BY_USER_ID = 1 _timer = autotest_stats.Timer('scheduler') _gauge = autotest_stats.Gauge('scheduler_rel') _tick_start = None @_timer.decorate def tick(self): self._tick_start = time.time() super(SiteDispatcher, self).tick() self._gauge.send('tick', time.time() - self._tick_start) @_timer.decorate def _garbage_collection(self): super(SiteDispatcher, self)._garbage_collection() if self._tick_start: self._gauge.send('_garbage_collection', time.time() - self._tick_start) @_timer.decorate def _run_cleanup(self): super(SiteDispatcher, self)._run_cleanup() if self._tick_start: self._gauge.send('_run_cleanup', time.time() - self._tick_start) @_timer.decorate def _find_aborting(self): super(SiteDispatcher, self)._find_aborting() if self._tick_start: self._gauge.send('_find_aborting', time.time() - self._tick_start) @_timer.decorate def _process_recurring_runs(self): super(SiteDispatcher, self)._process_recurring_runs() if self._tick_start: self._gauge.send('_process_recurring_runs', time.time() - self._tick_start) @_timer.decorate def _schedule_delay_tasks(self): super(SiteDispatcher, self)._schedule_delay_tasks() if self._tick_start: self._gauge.send('_schedule_delay_tasks', time.time() - self._tick_start) @_timer.decorate def _schedule_running_host_queue_entries(self): super(SiteDispatcher, self)._schedule_running_host_queue_entries() if self._tick_start: self._gauge.send('_schedule_running_host_queue_entries', time.time() - self._tick_start) @_timer.decorate def _schedule_special_tasks(self): super(SiteDispatcher, self)._schedule_special_tasks() if self._tick_start: self._gauge.send('_schedule_special_tasks', time.time() - self._tick_start) @_timer.decorate def _schedule_new_jobs(self): super(SiteDispatcher, self)._schedule_new_jobs() if self._tick_start: self._gauge.send('_schedule_new_jobs', time.time() - self._tick_start) @_timer.decorate def _handle_agents(self): super(SiteDispatcher, self)._handle_agents() if self._tick_start: self._gauge.send('_handle_agents', time.time() - self._tick_start) def _reverify_hosts_where(self, where, print_message='Reverifying host %s'): """ This is an altered version of _reverify_hosts_where the class to models.SpecialTask.objects.create passes in an argument for requested_by, in order to allow the Reset task to be created properly. """ full_where='locked = 0 AND invalid = 0 AND ' + where for host in scheduler_models.Host.fetch(where=full_where): if self.host_has_agent(host): # host has already been recovered in some way continue if self._host_has_scheduled_special_task(host): # host will have a special task scheduled on the next cycle continue if print_message: logging.error(print_message, host.hostname) try: user = models.User.objects.get(login='autotest_system') except models.User.DoesNotExist: user = models.User.objects.get( id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID) models.SpecialTask.objects.create( task=models.SpecialTask.Task.RESET, host=models.Host.objects.get(id=host.id), requested_by=user) def _check_for_unrecovered_verifying_entries(self): # Verify is replaced by Reset. queue_entries = scheduler_models.HostQueueEntry.fetch( where='status = "%s"' % models.HostQueueEntry.Status.RESETTING) for queue_entry in queue_entries: special_tasks = models.SpecialTask.objects.filter( task__in=(models.SpecialTask.Task.CLEANUP, models.SpecialTask.Task.VERIFY, models.SpecialTask.Task.RESET), queue_entry__id=queue_entry.id, is_complete=False) if special_tasks.count() == 0: logging.error('Unrecovered Resetting host queue entry: %s. ' 'Setting status to Queued.', str(queue_entry)) # Essentially this host queue entry was set to be Verifying # however no special task exists for entry. This occurs if the # scheduler dies between changing the status and creating the # special task. By setting it to queued, the job can restart # from the beginning and proceed correctly. This is much more # preferable than having monitor_db not launching. queue_entry.set_status('Queued')