#!/usr/bin/python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.scheduler import rdb
from autotest_lib.scheduler import rdb_cache_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_testing_utils as test_utils
from autotest_lib.scheduler import rdb_utils
def get_line_with_labels(required_labels, cache_lines):
"""Get the cache line with the hosts that match given labels.
Confirm that all hosts have matching labels within a line,
then return the lines with the requested labels. There can
be more than one, since we use acls in the cache key.
@param labels: A list of label names.
@cache_lines: A list of cache lines to look through.
@return: A list of the cache lines with the requested labels.
"""
label_lines = []
for line in cache_lines:
if not line:
continue
labels = list(line)[0].labels.get_label_names()
if any(host.labels.get_label_names() != labels for host in line):
raise AssertionError('Mismatch in deps within a cache line')
if required_labels == labels:
label_lines.append(line)
return label_lines
def get_hosts_for_request(
response_map, deps=test_utils.DEFAULT_DEPS,
acls=test_utils.DEFAULT_ACLS, priority=0, parent_job_id=0, **kwargs):
"""Get the hosts for a request matching kwargs from the response map.
@param response_map: A response map from an rdb request_handler.
"""
return response_map[
test_utils.AbstractBaseRDBTester.get_request(
deps, acls, priority, parent_job_id)]
class RDBCacheTest(test_utils.AbstractBaseRDBTester, unittest.TestCase):
"""Unittests for RDBHost objects."""
def testCachingBasic(self):
"""Test that different requests will hit the database."""
# r1 should cache h2 and use h1; r2 should cach [] and use h2
# at the end the cache should contain one stale line, with
# h2 in it, and one empty line since r2 acquired h2.
default_params = test_utils.get_default_job_params()
self.create_job(**default_params)
default_params['deps'] = default_params['deps'][0]
self.create_job(**default_params)
for i in range(0, 2):
self.db_helper.create_host(
'h%s'%i, **test_utils.get_default_host_params())
queue_entries = self._dispatcher._refresh_pending_queue_entries()
def local_get_response(self):
""" Local rdb.get_response handler."""
requests = self.response_map.keys()
if not (self.cache.hits == 0 and self.cache.misses == 2):
raise AssertionError('Neither request should have hit the '
'cache, but both should have inserted into it.')
lines = get_line_with_labels(
test_utils.DEFAULT_DEPS,
self.cache._cache_backend._cache.values())
if len(lines) > 1:
raise AssertionError('Caching was too agressive, '
'the second request should not have cached anything '
'because it used the one free host.')
cached_host = lines[0].pop()
default_params = test_utils.get_default_job_params()
job1_host = get_hosts_for_request(
self.response_map, **default_params)[0]
default_params['deps'] = default_params['deps'][0]
job2_host = get_hosts_for_request(
self.response_map, **default_params)[0]
if (job2_host.hostname == job1_host.hostname or
cached_host.hostname not in
[job2_host.hostname, job1_host.hostname]):
raise AssertionError('Wrong host cached %s. The first job '
'should have cached the host used by the second.' %
cached_host.hostname)
# Shouldn't be able to lease this host since r2 used it.
try:
cached_host.lease()
except rdb_utils.RDBException:
pass
else:
raise AssertionError('Was able to lease a stale host. The '
'second request should have leased it.')
return test_utils.wire_format_response_map(self.response_map)
self.god.stub_with(rdb.AvailableHostRequestHandler,
'get_response', local_get_response)
self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
def testCachingPriority(self):
"""Test requests with the same deps but different priorities."""
# All 3 jobs should find hosts, and there should be one host left
# behind in the cache. The first job will take one host and cache 3,
# the second will take one and cache 2, while the last will take one.
# The remaining host in the cache should not be stale.
default_job_params = test_utils.get_default_job_params()
for i in range(0, 3):
default_job_params['priority'] = i
job = self.create_job(**default_job_params)
default_host_params = test_utils.get_default_host_params()
for i in range(0, 4):
self.db_helper.create_host('h%s'%i, **default_host_params)
queue_entries = self._dispatcher._refresh_pending_queue_entries()
def local_get_response(self):
""" Local rdb.get_response handler."""
if not (self.cache.hits == 2 and self.cache.misses ==1):
raise AssertionError('The first request should have populated '
'the cache for the others.')
default_job_params = test_utils.get_default_job_params()
lines = get_line_with_labels(
default_job_params['deps'],
self.cache._cache_backend._cache.values())
if len(lines) > 1:
raise AssertionError('Should only be one cache line left.')
# Make sure that all the jobs got different hosts, and that
# the host cached isn't being used by a job.
cached_host = lines[0].pop()
cached_host.lease()
job_hosts = []
default_job_params = test_utils.get_default_job_params()
for i in range(0, 3):
default_job_params['priority'] = i
hosts = get_hosts_for_request(self.response_map,
**default_job_params)
assert(len(hosts) == 1)
host = hosts[0]
assert(host.id not in job_hosts and cached_host.id != host.id)
return test_utils.wire_format_response_map(self.response_map)
self.god.stub_with(rdb.AvailableHostRequestHandler,
'get_response', local_get_response)
self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
def testCachingEmptyList(self):
"""Test that the 'no available hosts' condition isn't a cache miss."""
default_params = test_utils.get_default_job_params()
for i in range(0 ,3):
default_params['parent_job_id'] = i
self.create_job(**default_params)
default_host_params = test_utils.get_default_host_params()
self.db_helper.create_host('h1', **default_host_params)
def local_get_response(self):
""" Local rdb.get_response handler."""
if not (self.cache.misses == 1 and self.cache.hits == 2):
raise AssertionError('The first request should have taken h1 '
'while the other 2 should have hit the cache.')
request = test_utils.AbstractBaseRDBTester.get_request(
test_utils.DEFAULT_DEPS, test_utils.DEFAULT_ACLS)
key = self.cache.get_key(deps=request.deps, acls=request.acls)
if self.cache._cache_backend.get(key) != []:
raise AssertionError('A request with no hosts does not get '
'cached corrrectly.')
return test_utils.wire_format_response_map(self.response_map)
queue_entries = self._dispatcher._refresh_pending_queue_entries()
self.god.stub_with(rdb.AvailableHostRequestHandler,
'get_response', local_get_response)
self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
def testStaleCacheLine(self):
"""Test that a stale cache line doesn't satisfy a request."""
# Create 3 jobs, all of which can use the same hosts. The first
# will cache the only remaining host after taking one, the second
# will also take a host, but not cache anything, while the third
# will try to use the host cached by the first job but fail because
# it is already leased.
default_params = test_utils.get_default_job_params()
default_params['priority'] = 2
self.create_job(**default_params)
default_params['priority'] = 1
default_params['deps'] = default_params['deps'][0]
self.create_job(**default_params)
default_params['priority'] = 0
default_params['deps'] = test_utils.DEFAULT_DEPS
self.create_job(**default_params)
host_1 = self.db_helper.create_host(
'h1', **test_utils.get_default_host_params())
host_2 = self.db_helper.create_host(
'h2', **test_utils.get_default_host_params())
queue_entries = self._dispatcher._refresh_pending_queue_entries()
def local_get_response(self):
""" Local rdb.get_response handler."""
default_job_params = test_utils.get_default_job_params()
# Confirm that even though the third job hit the cache, it wasn't
# able to use the cached host because it was already leased, and
# that it doesn't add it back to the cache.
assert(self.cache.misses == 2 and self.cache.hits == 1)
lines = get_line_with_labels(
default_job_params['deps'],
self.cache._cache_backend._cache.values())
assert(len(lines) == 0)
assert(int(self.cache.mean_staleness()) == 100)
return test_utils.wire_format_response_map(self.response_map)
self.god.stub_with(rdb.AvailableHostRequestHandler,
'get_response', local_get_response)
acquired_hosts = list(rdb_lib.acquire_hosts(queue_entries))
self.assertTrue(acquired_hosts[0].id == host_1.id and
acquired_hosts[1].id == host_2.id and
acquired_hosts[2] is None)
def testCacheAPI(self):
"""Test the cache managers api."""
cache = rdb_cache_manager.RDBHostCacheManager()
key = cache.get_key(
deps=test_utils.DEFAULT_DEPS, acls=test_utils.DEFAULT_ACLS)
# Cannot set None, it's reserved for cache expiration.
self.assertRaises(rdb_utils.RDBException, cache.set_line, *(key, None))
# Setting an empty list indicates a query with no results.
cache.set_line(key, [])
self.assertTrue(cache.get_line(key) == [])
# Getting a value will delete the key, leading to a miss on subsequent
# gets before a set.
self.assertRaises(rdb_utils.CacheMiss, cache.get_line, *(key,))
# Caching a leased host is just a waste of cache space.
host = test_utils.FakeHost(
'h1', 1, labels=test_utils.DEFAULT_DEPS,
acls=test_utils.DEFAULT_ACLS, leased=1)
cache.set_line(key, [host])
self.assertRaises(
rdb_utils.CacheMiss, cache.get_line, *(key,))
# Retrieving an unleased cached host shouldn't mutate it, even if the
# key is reconstructed.
host.leased=0
cache.set_line(cache.get_key(host.labels, host.acls), [host])
self.assertTrue(
cache.get_line(cache.get_key(host.labels, host.acls)) == [host])
# Caching different hosts under the same key isn't allowed.
different_host = test_utils.FakeHost(
'h2', 2, labels=[test_utils.DEFAULT_DEPS[0]],
acls=test_utils.DEFAULT_ACLS, leased=0)
cache.set_line(key, [host, different_host])
self.assertRaises(
rdb_utils.CacheMiss, cache.get_line, *(key,))
# Caching hosts with the same deps but different acls under the
# same key is allowed, as long as the acls match the key.
different_host = test_utils.FakeHost(
'h2', 2, labels=test_utils.DEFAULT_DEPS,
acls=[test_utils.DEFAULT_ACLS[1]], leased=0)
cache.set_line(key, [host, different_host])
self.assertTrue(set(cache.get_line(key)) == set([host, different_host]))
# Make sure we don't divide by zero while calculating hit ratio
cache.misses = 0
cache.hits = 0
self.assertTrue(cache.hit_ratio() == 0)
cache.hits = 1
hit_ratio = cache.hit_ratio()
self.assertTrue(type(hit_ratio) == float and hit_ratio == 100)
def testDummyCache(self):
"""Test that the dummy cache doesn't save hosts."""
# Create 2 jobs and 3 hosts. Both the jobs should not hit the cache,
# nor should they cache anything, but both jobs should acquire hosts.
default_params = test_utils.get_default_job_params()
default_host_params = test_utils.get_default_host_params()
for i in range(0, 2):
default_params['parent_job_id'] = i
self.create_job(**default_params)
self.db_helper.create_host('h%s'%i, **default_host_params)
self.db_helper.create_host('h2', **default_host_params)
queue_entries = self._dispatcher._refresh_pending_queue_entries()
self.god.stub_with(
rdb_cache_manager.RDBHostCacheManager, 'use_cache', False)
def local_get_response(self):
""" Local rdb.get_response handler."""
requests = self.response_map.keys()
if not (self.cache.hits == 0 and self.cache.misses == 2):
raise AssertionError('Neither request should have hit the '
'cache, but both should have inserted into it.')
# Make sure both requests actually found a host
default_params = test_utils.get_default_job_params()
job1_host = get_hosts_for_request(
self.response_map, **default_params)[0]
default_params['parent_job_id'] = 1
job2_host = get_hosts_for_request(
self.response_map, **default_params)[0]
if (not job1_host or not job2_host or
job2_host.hostname == job1_host.hostname):
raise AssertionError('Excected acquisitions did not occur.')
assert(hasattr(self.cache._cache_backend, '_cache') == False)
return test_utils.wire_format_response_map(self.response_map)
self.god.stub_with(rdb.AvailableHostRequestHandler,
'get_response', local_get_response)
self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
if __name__ == '__main__':
unittest.main()