普通文本  |  949行  |  35.58 KB

import collections
import heapq
import os
import Queue
import time
import threading
import traceback
import logging

import common
from autotest_lib.client.common_lib import error, global_config, utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager, drone_utility, drones
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib


# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

WORKING_DIRECTORY = object() # see execute_command()


AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)


class DroneManagerError(Exception):
    pass


class CustomEquals(object):
    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._id() == other._id()


    def __ne__(self, other):
        return not self == other


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    def __init__(self, hostname, pid, ppid=None):
        self.hostname = hostname
        self.pid = pid
        self.ppid = ppid

    def _id(self):
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        return super(Process, self).__repr__() + '<%s>' % self


class PidfileId(CustomEquals):
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    age = 0
    num_processes = None


class PidfileContents(object):
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        return self.process and not self.exit_status


class InvalidPidfile(object):
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


class BaseDroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returns by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'
    _timer = autotest_stats.Timer(_STATS_KEY)


    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # holds the list of all processes running on all drones
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # map drone hostname to time stamp of email that
        # has been sent about the drone hitting process limit.
        self._notify_record = {}
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        self._call_all_drones('initialize', self._results_dir)


    def shutdown(self):
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            self._drones[drone.hostname] = drone
            drone.call('initialize', self.absolute_path(''))


    def _remove_drone(self, hostname):
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        # Import server_manager_utils is delayed rather than at the beginning of
        # this module. The reason is that test_that imports drone_manager when
        # importing autoserv_utils. The import is done before test_that setup
        # django (test_that only setup django in setup_local_afe, since it's
        # not needed when test_that runs the test in a lab duts through :lab:
        # option. Therefore, if server_manager_utils is imported at the
        # beginning of this module, test_that will fail since django is not
        # setup yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = dict([(a.attribute, a.value)
                                   for a in server.attributes.all()])
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)
            logging.info('Drone %s.support_ssp: %s', hostname,
                         drone.support_ssp)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        return self._drones.itervalues()


    def cleanup_orphaned_containers(self):
        """Queue cleanup_orphaned_containers call at each drone.
        """
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        # use items() since the dict is modified in unregister_pidfile()
        for pidfile_id, info in self._registered_pidfile_info.items():
            if info.age > self._get_max_pidfile_refreshes():
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        all_results = {}
        for drone in self.get_drones():
            with self._timer.get_client(
                    '%s.%s' % (drone.hostname.replace('.', '_'), method)):
                all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, eg:
            "pid\nexit_staus\nnum_tests_failed\n".
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError, exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        for pidfile_path, contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            contents = self._parse_pidfile(drone, contents)
            store_in_dict[pidfile_id] = contents


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                info = self._registered_pidfile_info[pidfile_id]
                if info.num_processes is not None:
                    drone.active_processes += info.num_processes
        autotest_stats.Gauge(self._STATS_KEY).send(
                '%s.%s' % (drone.hostname.replace('.', '_'),
                           'active_processes'), drone.active_processes)


    def _check_drone_process_limit(self, drone):
        """
        Notify if the number of processes on |drone| is approaching limit.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            percent = 100
        max_percent = scheduler_config.config.max_processes_warning_threshold
        if percent >= max_percent:
            message = ('Drone %s is hitting %s of process limit.' %
                       (drone.hostname, format(percent, '.2%')))
            logging.warning(message)
            last_notified = self._notify_record.get(drone.hostname, 0)
            now = time.time()
            if last_notified + BaseDroneManager.NOTIFY_INTERVAL < now:
                body = ('Active processes/Process limit: %d/%d (%s)' %
                        (drone.active_processes, drone.max_processes,
                         format(percent, '.2%')))
                email_manager.manager.enqueue_notify_email(message, body)
                self._notify_record[drone.hostname] = now


    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls.
            Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        drones = list(self.get_drones())
        for drone in drones:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with self._timer.get_client('trigger_refresh'):
            self._refresh_task_queue.execute(drones, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process infor from ps),
        #           'pidfile_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # places in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with self._timer.get_client('sync_refresh'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will always contain < 3
        # lines of the following: pid, exit code, number of tests. Each pidfile
        # is identified by a PidfileId object, which contains a unique pidfile
        # path (unique because it contains the job id) making it hashable.
        # All pidfiles are stored in the drone managers _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone_managers._registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            results = results_list[0]
            drone_hostname = drone.hostname.replace('.', '_')

            with self._timer.get_client('%s.results' % drone_hostname):
                for process_info in results['all_processes']:
                    if process_info['comm'] == 'autoserv':
                        self._add_autoserv_process(drone, process_info)
                    drone_pid = drone.hostname, int(process_info['pid'])
                    self._all_processes[drone_pid] = process_info

                for process_info in results['parse_processes']:
                    self._add_process(drone, process_info)

            with self._timer.get_client('%s.pidfiles' % drone_hostname):
                self._process_pidfiles(drone, results['pidfiles'],
                                       self._pidfiles)
            with self._timer.get_client('%s.pidfiles_second' % drone_hostname):
                self._process_pidfiles(drone, results['pidfiles_second_read'],
                                       self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with self._timer.get_client('refresh'):
            self.trigger_refresh()
            self.sync_refresh()


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            email_manager.manager.enqueue_notify_email(
                'Results repository error', warning)
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                          wrapper.drone.hostname in
                                                  drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        drone_to_use = drones[0]
        for drone in drones[1:]:
            if drone.used_capacity() < drone_to_use.used_capacity():
                drone_to_use = drone
        return drone_to_use


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed,
                                    require_ssp=False):
        """Choose a drone to execute command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.
        @param require_ssp: Require server-side packaging to execute the,
                            command, default to False.

        @return: A drone object to be used for execution.
        """
        # cycle through drones is order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        # Drones do not support server-side packaging, used as backup if no
        # drone is found to run command requires server-side packaging.
        no_ssp_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue
            if require_ssp and not drone.support_ssp:
                logging.debug('Drone %s does not support server-side '
                              'packaging.', drone.hostname)
                no_ssp_drones.append(drone)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all over loaded, pick the one with least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)
        elif not drone_to_use and require_ssp and no_ssp_drones:
            # No drone supports server-side packaging, choose the least loaded.
            drone_to_use = self._least_loaded_drone(no_ssp_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging.
            if require_ssp and not drone.support_ssp:
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                    'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


SiteDroneManager = utils.import_site_class(
   __file__, 'autotest_lib.scheduler.site_drone_manager',
   'SiteDroneManager', BaseDroneManager)


class DroneManager(SiteDroneManager):
    pass


_the_instance = None

def instance():
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    global _the_instance
    _the_instance = instance