#!/usr/bin/python
#pylint: disable-msg=C0111
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import common
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import rdb_model_extensions
from autotest_lib.scheduler import rdb
from autotest_lib.scheduler import rdb_hosts
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_requests
from autotest_lib.scheduler import rdb_testing_utils
from autotest_lib.server.cros import provision
class AssignmentValidator(object):
    """Utility class to check that priority inversion doesn't happen."""


    @staticmethod
    def check_acls_deps(host, request):
        """Check if a host and request match by comparing acls and deps.

        @param host: A dictionary representing attributes of the host.
        @param request: A request, as defined in rdb_requests.

        @return True if the deps/acls of the request match the host,
            False otherwise.
        """
        # Unfortunately the hosts labels are labelnames, not ids.
        request_deps = set(
                label.name for label in
                models.Label.objects.filter(id__in=request.deps))
        # Every dep of the request must be a label on the host, and the
        # host must share at least one acl with the request.
        return (request_deps.issubset(host['labels'])
                and bool(set(host['acls']).intersection(request.acls)))


    @staticmethod
    def find_matching_host_for_request(hosts, request):
        """Find a host from the given list of hosts, matching the request.

        @param hosts: A list of dictionaries representing host attributes.
        @param request: The unsatisfied request.

        @return: A host, if a matching host is found from the input list,
            None otherwise (including when hosts or request is empty).
        """
        if not hosts or not request:
            return None
        for host in hosts:
            if AssignmentValidator.check_acls_deps(host, request):
                return host
        return None


    @staticmethod
    def sort_requests(requests):
        """Sort the requests by priority.

        @param requests: Unordered requests.

        @return: A list of (request, count) tuples ordered by descending
            request priority, where count is the number of times the
            request occurs in the input.
        """
        return sorted(collections.Counter(requests).items(),
                      key=lambda request: request[0].priority, reverse=True)


    @staticmethod
    def verify_priority(request_queue, result):
        """Check a response for priority inversion.

        @param request_queue: The requests that were made.
        @param result: A mapping from request to the hosts assigned to it.

        @raises ValueError: If a host assigned to a lower priority request
            could have been used for an unsatisfied higher priority one.
        """
        requests = AssignmentValidator.sort_requests(request_queue)
        for index, (request, count) in enumerate(requests):
            hosts = result.get(request)
            # The request was completely satisfied.
            if hosts and len(hosts) == count:
                continue
            # Go through all hosts given to lower priority requests and
            # make sure we couldn't have allocated one of them for this
            # unsatisfied higher priority request.
            for lower_request, _ in requests[index + 1:]:
                if (lower_request.priority < request.priority and
                    AssignmentValidator.find_matching_host_for_request(
                            result.get(lower_request), request)):
                    raise ValueError('Priority inversion occured between '
                                     'priorities %s and %s' %
                                     (request.priority,
                                      lower_request.priority))


    @staticmethod
    def priority_checking_response_handler(request_manager):
        """Fake response handler wrapper for any request_manager.

        Check that higher priority requests get a response over lower priority
        requests, by re-validating all the hosts assigned to a lower priority
        request against the unsatisfied higher priority ones.

        @param request_manager: A request_manager as defined in rdb_lib.

        @raises ValueError: If priority inversion is detected, or if the
            api call returns no results.
        """
        # First call the rdb to make its decisions, then sort the requests
        # by priority and make sure unsatisfied requests higher up in the list
        # could not have been satisfied by hosts assigned to requests lower
        # down in the list.
        result = request_manager.api_call(request_manager.request_queue)
        if not result:
            raise ValueError('Expected results but got none.')
        AssignmentValidator.verify_priority(
                request_manager.request_queue, result)
        for hosts in result.values():
            for host in hosts:
                yield host
class BaseRDBTest(rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase):
    """Tests for acquiring and releasing hosts through rdb_lib."""

    _config_section = 'AUTOTEST_WEB'


    def testAcquireLeasedHostBasic(self):
        """Test that acquisition of a leased host doesn't happen.

        @raises AssertionError: If the one host that satisfies the request
            is acquired.
        """
        self.create_job(deps=set(['a']))
        host = self.db_helper.create_host('h1', deps=set(['a']))
        host.leased = 1
        host.save()
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(len(hosts) == 1 and hosts[0] is None)


    def testAcquireLeasedHostRace(self):
        """Test behaviour when hosts are leased just before acquisition.

        If a fraction of the hosts somehow get leased between finding and
        acquisition, the rdb should just return the remaining hosts for the
        request to use.

        @raises AssertionError: If both the requests get a host successfully,
            since one host gets leased before the final attempt to lease both.
        """
        # Two jobs competing for two hosts with the same dep.
        self.create_job(deps=set(['a']))
        self.create_job(deps=set(['a']))
        self.db_helper.create_host('h1', deps=set(['a']))
        self.db_helper.create_host('h2', deps=set(['a']))

        @rdb_hosts.return_rdb_host
        def local_find_hosts(host_query_manger, deps, acls):
            """Return a predetermined list of hosts, one of which is leased."""
            h1 = models.Host.objects.get(hostname='h1')
            h1.leased = 1
            h1.save()
            h2 = models.Host.objects.get(hostname='h2')
            return [h1, h2]

        self.god.stub_with(rdb.AvailableHostQueryManager, 'find_hosts',
                           local_find_hosts)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(len(hosts) == 2 and None in hosts)
        self.check_hosts(iter(hosts))


    def testHostReleaseStates(self):
        """Test that we will only release an unused host if it is in Ready.

        @raises AssertionError: If the host gets released in any other state.
        """
        host = self.db_helper.create_host('h1', deps=set(['x']))
        for state in rdb_model_extensions.AbstractHostModel.Status.names:
            host.status = state
            host.leased = 1
            host.save()
            self._release_unused_hosts()
            # Reload the host to pick up any change made by the release.
            host = models.Host.objects.get(hostname='h1')
            self.assertTrue(host.leased == (state != 'Ready'))


    def testHostReleseHQE(self):
        """Test that we will not release a ready host if it's being used.

        @raises AssertionError: If the host is released even though it has
            been assigned to an active hqe.
        """
        # Create a host and lease it out in Ready.
        host = self.db_helper.create_host('h1', deps=set(['x']))
        host.status = 'Ready'
        host.leased = 1
        host.save()

        # Create a job and give its hqe the leased host.
        job = self.create_job(deps=set(['x']))
        self.db_helper.add_host_to_job(host, job.id)
        hqe = models.HostQueueEntry.objects.get(job_id=job.id)

        # Activate the hqe by setting its state.
        hqe.status = host_queue_entry_states.ACTIVE_STATUSES[0]
        hqe.save()

        # Make sure the hqe's host isn't released, even if it's in Ready.
        self._release_unused_hosts()
        host = models.Host.objects.get(hostname='h1')
        self.assertTrue(host.leased == 1)


    def testBasicDepsAcls(self):
        """Test a basic deps/acls request.

        Make sure that a basic request with deps and acls, finds a host from
        the ready pool that has matching labels and is in a matching aclgroup.

        @raises AssertionError: If the request doesn't find a host, since
            we insert a matching host in the ready pool.
        """
        deps = set(['a', 'b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=deps, acls=acls)
        job = self.create_job(user='autotest_system', deps=deps, acls=acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host = next(rdb_lib.acquire_hosts(queue_entries))
        self.check_host_assignment(job.id, matching_host.id)
        self.assertTrue(matching_host.leased == 1)


    def testPreferredDeps(self):
        """Test that preferred deps are respected.

        If multiple hosts satisfy a job's deps, the one with the preferred
        label will be assigned to the job.

        @raises AssertionError: If a host without a preferred label is
                                assigned to the job instead of one with
                                a preferred label.
        """
        lumpy_deps = set(['board:lumpy'])
        stumpy_deps = set(['board:stumpy'])
        stumpy_deps_with_crosversion = set(
                ['board:stumpy', 'cros-version:lumpy-release/R41-6323.0.0'])

        acls = set(['a', 'b'])
        # Hosts lumpy1 and lumpy2 are created as a control group,
        # which ensures that if no preferred label is used, the host
        # with a smaller id will be chosen first. We need to make sure
        # stumpy2 was chosen because it has a cros-version label, but not
        # because of other randomness.
        self.db_helper.create_host('lumpy1', deps=lumpy_deps, acls=acls)
        self.db_helper.create_host('lumpy2', deps=lumpy_deps, acls=acls)
        self.db_helper.create_host('stumpy1', deps=stumpy_deps, acls=acls)
        self.db_helper.create_host(
                'stumpy2', deps=stumpy_deps_with_crosversion, acls=acls)
        job_1 = self.create_job(user='autotest_system',
                                deps=lumpy_deps, acls=acls)
        job_2 = self.create_job(user='autotest_system',
                                deps=stumpy_deps_with_crosversion, acls=acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_hosts = list(rdb_lib.acquire_hosts(queue_entries))
        assignment = {}
        # NOTE(review): the id of the queue entry is used as a job id below;
        # presumably the two coincide in this test database - confirm.
        for queue_entry, host in zip(queue_entries, matching_hosts):
            self.check_host_assignment(queue_entry.id, host.id)
            assignment[queue_entry.id] = host.hostname
        self.assertEqual(assignment[job_1.id], 'lumpy1')
        self.assertEqual(assignment[job_2.id], 'stumpy2')


    def testBadDeps(self):
        """Test that we find no hosts when only acls match.

        @raises AssertionError: If the request finds a host, since the only
            host in the ready pool will not have matching deps.
        """
        host_labels = set(['a'])
        job_deps = set(['b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=host_labels, acls=acls)
        self.create_job(user='autotest_system', deps=job_deps, acls=acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host = next(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(not matching_host)


    def testBadAcls(self):
        """Test that we find no hosts when only deps match.

        @raises AssertionError: If the request finds a host, since the only
            host in the ready pool will not have matching acls.
        """
        deps = set(['a'])
        host_acls = set(['a'])
        job_acls = set(['b'])
        self.db_helper.create_host('h1', deps=deps, acls=host_acls)

        # Create the job as a new user who is only in the 'b' and 'Everyone'
        # aclgroups. Though there are several hosts in the Everyone group, the
        # 1 host that has the 'a' dep isn't.
        self.create_job(user='new_user', deps=deps, acls=job_acls)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host = next(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(not matching_host)


    def testBasicPriority(self):
        """Test that priority inversion doesn't happen.

        Schedule 2 jobs with the same deps, acls and user, but different
        priorities, and confirm that the higher priority request gets the host.
        This confirmation happens through the AssignmentValidator.

        @raises AssertionError: If the unimportant request gets host h1
            instead of the important request.
        """
        deps = set(['a', 'b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=deps, acls=acls)
        # The important job.
        self.create_job(user='autotest_system', deps=deps, acls=acls,
                        priority=2)
        # The unimportant job.
        self.create_job(user='autotest_system', deps=deps, acls=acls,
                        priority=0)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()

        self.god.stub_with(
                rdb_requests.BaseHostRequestManager, 'response',
                AssignmentValidator.priority_checking_response_handler)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))


    def testPriorityLevels(self):
        """Test that priority inversion doesn't happen.

        Increases a job's priority and makes several requests for hosts,
        checking that priority inversion doesn't happen.

        @raises AssertionError: If the unimportant job gets h1 while it is
            still unimportant, or doesn't get h1 after it becomes the
            most important job.
        """
        deps = set(['a', 'b'])
        acls = set(['a', 'b'])
        self.db_helper.create_host('h1', deps=deps, acls=acls)

        # Create jobs that will bucket differently and confirm that jobs in an
        # earlier bucket get a host.
        self.create_job(user='autotest_system', deps=deps, acls=acls)
        self.create_job(user='autotest_system', deps=deps, acls=acls,
                        priority=2)
        # The unimportant job asks for one dep less than the other jobs.
        deps.pop()
        unimportant_job = self.create_job(user='someother_system', deps=deps,
                                          acls=acls, priority=1)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()

        self.god.stub_with(
                rdb_requests.BaseHostRequestManager, 'response',
                AssignmentValidator.priority_checking_response_handler)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))

        # Elevate the priority of the unimportant job, so we now have
        # 2 jobs at the same priority.
        self.db_helper.increment_priority(job_id=unimportant_job.id)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        self._release_unused_hosts()
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))

        # Elevate the priority of the formerly unimportant job once more and
        # re-check that the assignment still respects priorities.
        self.db_helper.increment_priority(job_id=unimportant_job.id)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        self._release_unused_hosts()
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))


    def testFrontendJobScheduling(self):
        """Test basic frontend job scheduling.

        @raises AssertionError: If the received and requested host don't match,
            or the mis-matching host is returned instead.
        """
        deps = set(['x', 'y'])
        acls = set(['a', 'b'])

        # Create 2 frontend jobs and only one matching host.
        matching_job = self.create_job(acls=acls, deps=deps)
        matching_host = self.db_helper.create_host('h1', acls=acls, deps=deps)
        mis_matching_job = self.create_job(acls=acls, deps=deps)
        # The mis-matching host only gets one of the two deps. Pass it as a
        # set, like every other create_host call, rather than a bare string.
        mis_matching_host = self.db_helper.create_host(
                'h2', acls=acls, deps=set([deps.pop()]))
        self.db_helper.add_host_to_job(matching_host, matching_job.id)
        self.db_helper.add_host_to_job(mis_matching_host, mis_matching_job.id)

        # Check that only the matching host is returned, and that we get 'None'
        # for the second request.
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        hosts = list(rdb_lib.acquire_hosts(queue_entries))
        self.assertTrue(len(hosts) == 2 and None in hosts)
        returned_host = [host for host in hosts if host].pop()
        self.assertTrue(matching_host.id == returned_host.id)


    def testFrontendJobPriority(self):
        """Test that frontend job scheduling doesn't ignore priorities.

        @raises ValueError: If the priorities of frontend jobs are ignored.
        """
        board = 'x'
        high_priority = self.create_job(priority=2, deps=set([board]))
        low_priority = self.create_job(priority=1, deps=set([board]))
        host = self.db_helper.create_host('h1', deps=set([board]))
        self.db_helper.add_host_to_job(host, low_priority.id)
        self.db_helper.add_host_to_job(host, high_priority.id)

        queue_entries = self._dispatcher._refresh_pending_queue_entries()

        def local_response_handler(request_manager):
            """Confirms that a higher priority frontend job gets a host.

            @raises ValueError: If priority inversion happens and the job
                with priority 1 gets the host instead.
            """
            result = request_manager.api_call(request_manager.request_queue)
            if not result:
                raise ValueError('Excepted the high priority request to '
                                 'get a host, but the result is empty.')
            for request, hosts in result.items():
                if request.priority == 1:
                    raise ValueError('Priority of frontend job ignored.')
                if len(hosts) > 1:
                    raise ValueError('Multiple hosts returned against one '
                                     'frontend job scheduling request.')
                yield hosts[0]

        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                           local_response_handler)
        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))


    def testSuiteOrderedHostAcquisition(self):
        """Test that older suite jobs acquire hosts first.

        Make sure older suite jobs get hosts first, but not at the expense of
        higher priority jobs.

        @raises ValueError: If unexpected acquisitions occur, eg:
            suite_job_2 acquires the last 2 hosts instead of suite_job_1.
            isolated_important_job doesn't get any hosts.
            Any job acquires more hosts than necessary.
        """
        board = 'x'

        # Create 2 suites such that the later suite has an ordering of deps
        # that places it ahead of the earlier suite, if parent_job_id is
        # ignored. The first suite has no extra dep, the second one does.
        self.create_suite(num=2, priority=0, board=board)
        suite_with_dep = self.create_suite(num=1, priority=0, board=board)
        self.db_helper.add_deps_to_job(suite_with_dep[0], dep_names=list('y'))

        # Create an important job that should be ahead of the first suite,
        # because priority trumps parent_job_id and time of creation.
        self.create_job(priority=3, deps=set([board]))

        # Create 3 hosts, all with the deps to satisfy the last suite.
        for i in range(0, 3):
            self.db_helper.create_host('h%s' % i, deps=set([board, 'y']))

        queue_entries = self._dispatcher._refresh_pending_queue_entries()

        def local_response_handler(request_manager):
            """Reorder requests and check host acquisition.

            @raises ValueError: If unexpected/no acquisitions occur.
            """
            if any(request.parent_job_id is None
                   for request in request_manager.request_queue):
                raise ValueError('Parent_job_id can never be None.')

            # This will result in the ordering:
            # [suite_2_1, suite_1_*, suite_1_*, isolated_important_job]
            # The priority scheduling order should be:
            # [isolated_important_job, suite_1_*, suite_1_*, suite_2_1]
            # Since:
            #   a. the isolated_important_job is the most important.
            #   b. suite_1 was created before suite_2, regardless of deps
            disorderly_queue = sorted(request_manager.request_queue,
                                      key=lambda r: -r.parent_job_id)
            request_manager.request_queue = disorderly_queue
            result = request_manager.api_call(request_manager.request_queue)
            if not result:
                raise ValueError('Expected results but got none.')

            # Verify that the isolated_important_job got a host, and that the
            # first suite got both remaining free hosts.
            for request, hosts in result.items():
                if request.parent_job_id == 0:
                    if len(hosts) > 1:
                        raise ValueError('First job acquired more hosts than '
                                         'necessary. Response map: %s' % result)
                    continue
                if request.parent_job_id == 1:
                    if len(hosts) < 2:
                        raise ValueError('First suite job requests were not '
                                         'satisfied. Response_map: %s' % result)
                    continue
                # The second suite job got hosts instead of one of
                # the others. Either way this is a failure.
                raise ValueError('Unexpected host acquisition '
                                 'Response map: %s' % result)
            yield None

        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',
                           local_response_handler)
        list(rdb_lib.acquire_hosts(queue_entries))


    def testConfigurations(self):
        """Test that configurations don't matter.

        @raises AssertionError: If the request doesn't find a host,
                 this will happen if configurations are not stripped out.
        """
        self.god.stub_with(provision.Cleanup,
                           '_actions',
                           {'action': 'fakeTest'})
        job_labels = set(['action', 'a'])
        host_deps = set(['a'])
        db_host = self.db_helper.create_host('h1', deps=host_deps)
        self.create_job(user='autotest_system', deps=job_labels)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_host = next(rdb_lib.acquire_hosts(queue_entries))
        self.assertEqual(matching_host.id, db_host.id)
class RDBMinDutTest(
        rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase):
    """Test AvailableHostRequestHandler"""

    _config_section = 'AUTOTEST_WEB'


    def min_dut_test_helper(self, num_hosts, suite_settings):
        """A helper function to test min_dut logic.

        @param num_hosts: Total number of hosts to create.
        @param suite_settings: A list of dictionaries, each specifying how
                               one suite is created and verified.
                               E.g.  {'priority': 10, 'num_jobs': 3,
                                      'min_duts':2, 'expected_aquired': 1}
                               With this setting, will create a suite that has
                               3 child jobs, with priority 10 and min_duts 2.
                               The suite is expected to get 1 dut.

        @raises AssertionError: If a suite acquires a number of hosts
                                different from its 'expected_aquired'.
        """
        acls = set(['fake_acl'])
        for i in range(num_hosts):
            self.db_helper.create_host(
                    'h%d' % i, deps=set(['board:lumpy']), acls=acls)

        # Both maps are keyed by the id of the suite's parent job.
        suites = {}
        suite_min_duts = {}
        for setting in suite_settings:
            suite = self.create_suite(num=setting['num_jobs'],
                                      priority=setting['priority'],
                                      board='board:lumpy', acls=acls)
            parent_job_id = suite['parent_job'].id
            # Empty list will be used to store acquired hosts.
            suites[parent_job_id] = (setting, [])
            suite_min_duts[parent_job_id] = setting['min_duts']

        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        matching_hosts = rdb_lib.acquire_hosts(queue_entries, suite_min_duts)
        for host, queue_entry in zip(matching_hosts, queue_entries):
            if host:
                suites[queue_entry.job.parent_job_id][1].append(host)

        for setting, acquired_hosts in suites.values():
            self.assertEqual(len(acquired_hosts), setting['expected_aquired'])


    def testHighPriorityTakeAll(self):
        """Min duts not satisfied."""
        num_hosts = 1
        suite1 = {'priority': 20, 'num_jobs': 3, 'min_duts': 2,
                  'expected_aquired': 1}
        suite2 = {'priority': 10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 0}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])


    def testHighPriorityMinSatisfied(self):
        """High priority min duts satisfied."""
        num_hosts = 4
        suite1 = {'priority': 20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority': 10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 2}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])


    def testAllPrioritiesMinSatisfied(self):
        """Min duts satisfied."""
        num_hosts = 7
        suite1 = {'priority': 20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority': 10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 5}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])


    def testHighPrioritySatisfied(self):
        """Min duts satisfied, high priority suite satisfied."""
        num_hosts = 10
        suite1 = {'priority': 20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 4}
        suite2 = {'priority': 10, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 6}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])


    def testEqualPriorityFirstSuiteMinSatisfied(self):
        """Equal priority, earlier suite got min duts."""
        num_hosts = 4
        suite1 = {'priority': 20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority': 20, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 2}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])


    def testEqualPriorityAllSuitesMinSatisfied(self):
        """Equal priority, all suites got min duts."""
        num_hosts = 7
        suite1 = {'priority': 20, 'num_jobs': 4, 'min_duts': 2,
                  'expected_aquired': 2}
        suite2 = {'priority': 20, 'num_jobs': 7, 'min_duts': 5,
                  'expected_aquired': 5}
        self.min_dut_test_helper(num_hosts, [suite1, suite2])
# Run every test case defined in this module when executed as a script.
if __name__ == '__main__':
    unittest.main()