
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

#pylint: disable-msg=C0111

import os
import logging
import time

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_config, scheduler_models


# Override default parser with our site parser.
def parser_path(install_dir):
    """Return the site implementation of the results parser.

    @param install_dir: autotest installation directory.

    @returns Path to the site parser under the tko directory.
    """
    site_parser_parts = ('tko', 'site_parse')
    return os.path.join(install_dir, *site_parser_parts)


class SiteAgentTask(object):
    """
    SiteAgentTask subclasses BaseAgentTask in monitor_db.

    NOTE(review): this class is a site mixin; its methods presumably run with
    the attributes/methods of BaseAgentTask (e.g. _final_status) available on
    self via the composed class in monitor_db -- confirm against monitor_db.
    """


    def _archive_results(self, queue_entries):
        """
        Set the status of queue_entries to ARCHIVING.

        This method sets the status of the queue_entries to ARCHIVING
        if the enable_archiving flag is true in global_config.ini.
        Otherwise, it bypasses the archiving step and sets the queue entries
        to the final status of current step.

        @param queue_entries: host queue entries whose status is updated.
        """
        enable_archiving = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
        # Set the status of the queue entries to archiving or self final status
        if enable_archiving:
            status = models.HostQueueEntry.Status.ARCHIVING
        else:
            status = self._final_status()

        # Bug fix: iterate the queue_entries argument. The original looped
        # over self.queue_entries, silently ignoring the parameter the
        # caller passed in (and which the docstring documents).
        for queue_entry in queue_entries:
            queue_entry.set_status(status)


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """
        Verify entry statuses before starting; abort instead of raising.

        Forked from monitor_db.py.

        @param queue_entries: host queue entries to validate.
        @param allowed_hqe_statuses: statuses a queue entry may be in.
        @param allowed_host_statuses: optional statuses the entry's host may
                be in; host status is not checked when None.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                # In the original code, here we raise an exception. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start entry with invalid '
                                 'status %s: %s. Aborting Job: %s.'
                                 % (class_name, entry.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                    'Job Aborted - Invalid Host Queue Entry Status',
                    error_message)
                entry.job.request_abort()
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                # In the original code, here we raise an exception. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start on queue entry with '
                                 'invalid host status %s: %s. Aborting Job: %s'
                                 % (class_name, entry.host.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                    'Job Aborted - Invalid Host Status', error_message)
                entry.job.request_abort()

class SiteDispatcher(object):
    """
    SiteDispatcher subclasses BaseDispatcher in monitor_db.

    NOTE(review): although declared on object here, the super() calls below
    presumably resolve through the composed scheduler class built in
    monitor_db (site mixin pattern) -- confirm against monitor_db.
    """
    # Fallback AFE user id used when the 'autotest_system' user is missing.
    DEFAULT_REQUESTED_BY_USER_ID = 1


    # Statsd timer (per-method latency via @_timer.decorate) and gauge
    # (elapsed wall time since the start of the current tick).
    _timer = autotest_stats.Timer('scheduler')
    _gauge = autotest_stats.Gauge('scheduler_rel')
    # Wall-clock time at which the current tick began; None until the
    # first tick, which is why the methods below guard on it.
    _tick_start = None


    @_timer.decorate
    def tick(self):
        """Run one scheduler tick and report its total duration."""
        self._tick_start = time.time()
        super(SiteDispatcher, self).tick()
        self._gauge.send('tick', time.time() - self._tick_start)

    @_timer.decorate
    def _garbage_collection(self):
        """Run garbage collection; report time elapsed since tick start."""
        super(SiteDispatcher, self)._garbage_collection()
        if self._tick_start:
            self._gauge.send('_garbage_collection',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _run_cleanup(self):
        """Run cleanup; report time elapsed since tick start."""
        super(SiteDispatcher, self)._run_cleanup()
        if self._tick_start:
            self._gauge.send('_run_cleanup', time.time() - self._tick_start)

    @_timer.decorate
    def _find_aborting(self):
        """Find aborting entries; report time elapsed since tick start."""
        super(SiteDispatcher, self)._find_aborting()
        if self._tick_start:
            self._gauge.send('_find_aborting', time.time() - self._tick_start)

    @_timer.decorate
    def _process_recurring_runs(self):
        """Process recurring runs; report time elapsed since tick start."""
        super(SiteDispatcher, self)._process_recurring_runs()
        if self._tick_start:
            self._gauge.send('_process_recurring_runs',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_delay_tasks(self):
        """Schedule delay tasks; report time elapsed since tick start."""
        super(SiteDispatcher, self)._schedule_delay_tasks()
        if self._tick_start:
            self._gauge.send('_schedule_delay_tasks',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_running_host_queue_entries(self):
        """Schedule running HQEs; report time elapsed since tick start."""
        super(SiteDispatcher, self)._schedule_running_host_queue_entries()
        if self._tick_start:
            self._gauge.send('_schedule_running_host_queue_entries',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_special_tasks(self):
        """Schedule special tasks; report time elapsed since tick start."""
        super(SiteDispatcher, self)._schedule_special_tasks()
        if self._tick_start:
            self._gauge.send('_schedule_special_tasks',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_new_jobs(self):
        """Schedule new jobs; report time elapsed since tick start."""
        super(SiteDispatcher, self)._schedule_new_jobs()
        if self._tick_start:
            self._gauge.send('_schedule_new_jobs',
                             time.time() - self._tick_start)


    @_timer.decorate
    def _handle_agents(self):
        """Handle agents; report time elapsed since tick start."""
        super(SiteDispatcher, self)._handle_agents()
        if self._tick_start:
            self._gauge.send('_handle_agents', time.time() - self._tick_start)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        Create a RESET special task for each matching host.

        Altered from the base _reverify_hosts_where so that
        models.SpecialTask.objects.create is passed a requested_by
        argument, which allows the Reset task to be created properly.

        @param where: SQL WHERE fragment selecting hosts; combined with a
                locked/invalid filter.
        @param print_message: logging format with one %s for the hostname;
                falsy to suppress logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            try:
                user = models.User.objects.get(login='autotest_system')
            except models.User.DoesNotExist:
                # Fall back to a well-known user id when the system user
                # row is absent.
                user = models.User.objects.get(
                        id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=user)


    def _check_for_unrecovered_verifying_entries(self):
        """
        Re-queue Resetting host queue entries that have no special task.

        A Resetting entry with no incomplete CLEANUP/VERIFY/RESET special
        task cannot make progress; set it back to Queued so the job can
        restart from the beginning.
        """
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. '
                              'Setting status to Queued.', str(queue_entry))
                # Essentially this host queue entry was set to be Verifying
                # however no special task exists for entry. This occurs if the
                # scheduler dies between changing the status and creating the
                # special task. By setting it to queued, the job can restart
                # from the beginning and proceed correctly. This is much more
                # preferable than having monitor_db not launching.
                queue_entry.set_status('Queued')