# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import mox
import time
import unittest
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import model_logic
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.scheduler.shard import shard_client
from django.core.exceptions import MultipleObjectsReturned
class ShardClientTest(mox.MoxTestBase,
                      frontend_test_utils.FrontendTestMixin):
    """Unit tests for functions in shard_client.py"""

    GLOBAL_AFE_HOSTNAME = 'foo_autotest'


    def setUp(self):
        """Point the shard at a fake global AFE and set up the frontend."""
        super(ShardClientTest, self).setUp()

        global_config.global_config.override_config_value(
                'SHARD', 'global_afe_hostname', self.GLOBAL_AFE_HOSTNAME)

        self._frontend_common_setup(fill_data=False)


    def tearDown(self):
        """Undo stubs, tear down the frontend and reset global_config.

        This used to be split over two `tearDown` definitions; the later one
        silently replaced the earlier one, so the stubs were never explicitly
        unset here.
        """
        self.mox.UnsetStubs()
        try:
            self._frontend_common_teardown()
        finally:
            # Without this global_config will keep state over test cases
            global_config.global_config.reset_config_values()


    def setup_mocks(self):
        """Replace RetryingAFE with a mock and record its construction.

        The recorded mock instance is stored in `self.afe` so that tests can
        record expected RPCs on it.
        """
        self.mox.StubOutClassWithMocks(frontend_wrappers, 'RetryingAFE')
        self.afe = frontend_wrappers.RetryingAFE(server=mox.IgnoreArg(),
                                                 delay_sec=mox.IgnoreArg(),
                                                 timeout_min=mox.IgnoreArg())


    def setup_global_config(self):
        """Configure this machine as a shard with hostname 'host1'."""
        global_config.global_config.override_config_value(
                'SHARD', 'is_slave_shard', 'True')
        global_config.global_config.override_config_value(
                'SHARD', 'shard_hostname', 'host1')


    def expect_heartbeat(self, shard_hostname='host1',
                         known_job_ids=None, known_host_ids=None,
                         known_host_statuses=None, hqes=None, jobs=None,
                         side_effect=None, return_hosts=None,
                         return_jobs=None, return_suite_keyvals=None,
                         return_incorrect_hosts=None):
        """Record one expected 'shard_heartbeat' RPC and its response.

        Every list-like argument defaults to None, which stands for a fresh
        empty list (avoids sharing mutable default lists between calls).

        @param shard_hostname: Expected shard_hostname argument of the RPC.
        @param known_job_ids: Expected known_job_ids argument of the RPC.
        @param known_host_ids: Expected known_host_ids argument of the RPC.
        @param known_host_statuses: Expected known_host_statuses argument.
        @param hqes: Expected hqes argument (may be a mox comparator).
        @param jobs: Expected jobs argument (may be a mox comparator).
        @param side_effect: Optional callable attached to the recorded call
                            via WithSideEffects.
        @param return_hosts: Value of 'hosts' in the mocked response.
        @param return_jobs: Value of 'jobs' in the mocked response.
        @param return_suite_keyvals: Value of 'suite_keyvals' in the mocked
                                     response.
        @param return_incorrect_hosts: Value of 'incorrect_host_ids' in the
                                       mocked response.
        """
        def or_empty(value):
            # Identity check on purpose: mox comparators must pass through.
            return [] if value is None else value

        call = self.afe.run(
            'shard_heartbeat', shard_hostname=shard_hostname,
            hqes=or_empty(hqes), jobs=or_empty(jobs),
            known_job_ids=or_empty(known_job_ids),
            known_host_ids=or_empty(known_host_ids),
            known_host_statuses=or_empty(known_host_statuses),
            )

        if side_effect:
            call = call.WithSideEffects(side_effect)

        call.AndReturn({
                'hosts': or_empty(return_hosts),
                'jobs': or_empty(return_jobs),
                'suite_keyvals': or_empty(return_suite_keyvals),
                'incorrect_host_ids': or_empty(return_incorrect_hosts),
            })


    def _get_sample_serialized_host(self):
        """Return a new dict describing a serialized host (id 2, 'host1')."""
        return {'aclgroup_set': [],
                'dirty': True,
                'hostattribute_set': [],
                'hostname': u'host1',
                u'id': 2,
                'invalid': False,
                'labels': [],
                'leased': True,
                'lock_time': None,
                'locked': False,
                'protection': 0,
                'shard': None,
                'status': u'Ready'}


    def _get_sample_serialized_job(self):
        """Return a new dict describing a serialized job (id 1, one HQE)."""
        return {'control_file': u'foo',
                'control_type': 2,
                'created_on': datetime.datetime(2014, 9, 23, 15, 56, 10, 0),
                'dependency_labels': [{u'id': 1,
                                       'invalid': False,
                                       'kernel_config': u'',
                                       'name': u'board:lumpy',
                                       'only_if_needed': False,
                                       'platform': False}],
                'email_list': u'',
                'hostqueueentry_set': [{'aborted': False,
                                        'active': False,
                                        'complete': False,
                                        'deleted': False,
                                        'execution_subdir': u'',
                                        'finished_on': None,
                                        u'id': 1,
                                        'meta_host': {u'id': 1,
                                                      'invalid': False,
                                                      'kernel_config': u'',
                                                      'name': u'board:lumpy',
                                                      'only_if_needed': False,
                                                      'platform': False},
                                        'started_on': None,
                                        'status': u'Queued'}],
                u'id': 1,
                'jobkeyval_set': [],
                'max_runtime_hrs': 72,
                'max_runtime_mins': 1440,
                'name': u'dummy',
                'owner': u'autotest_system',
                'parse_failed_repair': True,
                'priority': 40,
                'parent_job_id': 0,
                'reboot_after': 0,
                'reboot_before': 1,
                'run_reset': True,
                'run_verify': False,
                'shard': {'hostname': u'shard1', u'id': 1},
                'synch_count': 0,
                'test_retry': 0,
                'timeout': 24,
                'timeout_mins': 1440}


    def _get_sample_serialized_suite_keyvals(self):
        """Return a new dict describing a serialized keyval for job_id 0."""
        return {'id': 1,
                'job_id': 0,
                'key': 'test_key',
                'value': 'test_value'}


    def testHeartbeat(self):
        """Trigger heartbeat, verify RPCs and persisting of the responses."""
        self.setup_mocks()

        global_config.global_config.override_config_value(
                'SHARD', 'shard_hostname', 'host1')

        # 1st heartbeat: the response carries one host, one job and one
        # suite keyval.
        self.expect_heartbeat(
                return_hosts=[self._get_sample_serialized_host()],
                return_jobs=[self._get_sample_serialized_job()],
                return_suite_keyvals=[
                        self._get_sample_serialized_suite_keyvals()])

        # 2nd heartbeat: the response carries the same host id under a
        # different hostname; the stored host must not be overwritten.
        modified_sample_host = self._get_sample_serialized_host()
        modified_sample_host['hostname'] = 'host2'

        self.expect_heartbeat(
                return_hosts=[modified_sample_host],
                known_host_ids=[modified_sample_host['id']],
                known_host_statuses=[modified_sample_host['status']],
                known_job_ids=[1])


        def verify_upload_jobs_and_hqes(name, shard_hostname, jobs, hqes,
                                        known_host_ids, known_host_statuses,
                                        known_job_ids):
            # Runs as the side effect of the 3rd heartbeat RPC and checks
            # what the shard client uploads.
            self.assertEqual(len(jobs), 1)
            self.assertEqual(len(hqes), 1)
            hqe = hqes[0]
            self.assertEqual(hqe['status'], 'Completed')


        # 3rd heartbeat: jobs/hqes are matched loosely here and inspected by
        # the side effect instead.
        self.expect_heartbeat(
                jobs=mox.IgnoreArg(), hqes=mox.IgnoreArg(),
                known_host_ids=[modified_sample_host['id']],
                known_host_statuses=[modified_sample_host['status']],
                known_job_ids=[], side_effect=verify_upload_jobs_and_hqes)

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()

        sut.do_heartbeat()

        # Check if dummy object was saved to DB
        host = models.Host.objects.get(id=2)
        self.assertEqual(host.hostname, 'host1')

        # Check if suite keyval was saved to DB
        suite_keyval = models.JobKeyval.objects.filter(job_id=0)[0]
        self.assertEqual(suite_keyval.key, 'test_key')

        sut.do_heartbeat()

        # Ensure it wasn't overwritten
        host = models.Host.objects.get(id=2)
        self.assertEqual(host.hostname, 'host1')

        # Clear the job's shard and complete its HQE before the 3rd
        # heartbeat, which is expected to upload both.
        job = models.Job.objects.all()[0]
        job.shard = None
        job.save()
        hqe = job.hostqueueentry_set.all()[0]
        hqe.status = 'Completed'
        hqe.save()

        sut.do_heartbeat()

        self.mox.VerifyAll()


    def testRemoveInvalidHosts(self):
        """Hosts reported as incorrect are invalidated and then forgotten."""
        self.setup_mocks()
        self.setup_global_config()

        host_serialized = self._get_sample_serialized_host()
        host_id = host_serialized[u'id']

        # 1st heartbeat: return a host.
        # 2nd heartbeat: "delete" that host. Also send a spurious extra ID
        # that isn't present to ensure shard client doesn't crash. (Note: the
        # delete operation doesn't actually delete the db entry. Django model
        # logic instead simply marks it as invalid.)
        # 3rd heartbeat: host is no longer present in shard's request.

        self.expect_heartbeat(return_hosts=[host_serialized])
        self.expect_heartbeat(known_host_ids=[host_id],
                              known_host_statuses=[u'Ready'],
                              return_incorrect_hosts=[host_id, 42])
        self.expect_heartbeat()

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()

        sut.do_heartbeat()
        host = models.Host.smart_get(host_id)
        self.assertFalse(host.invalid)

        # Host should no longer "exist" after the invalidation.
        # Why don't we simply count the number of hosts in db? Because the
        # host actually remains in the db, but simply has its invalid bit set
        # to True.
        sut.do_heartbeat()
        with self.assertRaises(models.Host.DoesNotExist):
            models.Host.smart_get(host_id)

        # Subsequent heartbeat no longer passes the host id as a known host.
        sut.do_heartbeat()

        self.mox.VerifyAll()


    def testFailAndRedownloadJobs(self):
        """Jobs from a heartbeat whose processing failed are sent again."""
        self.setup_mocks()
        self.setup_global_config()

        job1_serialized = self._get_sample_serialized_job()
        job2_serialized = self._get_sample_serialized_job()
        job2_serialized['id'] = 2
        job2_serialized['hostqueueentry_set'][0]['id'] = 2

        # 1st heartbeat: response processing is forced to fail.
        # 2nd heartbeat: no job ids are expected as known; both jobs come
        # back in the response.
        # 3rd heartbeat: both jobs are expected as known.
        # 4th heartbeat: only job 2 is expected as known, after job 1's HQEs
        # were marked complete.
        self.expect_heartbeat(return_jobs=[job1_serialized])
        self.expect_heartbeat(return_jobs=[job1_serialized, job2_serialized])
        self.expect_heartbeat(known_job_ids=[job1_serialized['id'],
                                             job2_serialized['id']])
        self.expect_heartbeat(known_job_ids=[job2_serialized['id']])

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()

        original_process_heartbeat_response = sut.process_heartbeat_response
        def failing_process_heartbeat_response(*args, **kwargs):
            raise RuntimeError

        sut.process_heartbeat_response = failing_process_heartbeat_response
        self.assertRaises(RuntimeError, sut.do_heartbeat)

        sut.process_heartbeat_response = original_process_heartbeat_response
        sut.do_heartbeat()
        sut.do_heartbeat()

        job1 = models.Job.objects.get(pk=job1_serialized['id'])
        job1.hostqueueentry_set.all().update(complete=True)

        sut.do_heartbeat()

        self.mox.VerifyAll()


    def testFailAndRedownloadHosts(self):
        """Hosts from a heartbeat whose processing failed are sent again."""
        self.setup_mocks()
        self.setup_global_config()

        host1_serialized = self._get_sample_serialized_host()
        host2_serialized = self._get_sample_serialized_host()
        host2_serialized['id'] = 3
        host2_serialized['hostname'] = 'host2'

        # 1st heartbeat: response processing is forced to fail.
        # 2nd heartbeat: no host ids are expected as known; both hosts come
        # back in the response.
        # 3rd heartbeat: both hosts are expected as known.
        self.expect_heartbeat(return_hosts=[host1_serialized])
        self.expect_heartbeat(return_hosts=[host1_serialized,
                                            host2_serialized])
        self.expect_heartbeat(known_host_ids=[host1_serialized['id'],
                                              host2_serialized['id']],
                              known_host_statuses=[host1_serialized['status'],
                                                   host2_serialized['status']])

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()

        original_process_heartbeat_response = sut.process_heartbeat_response
        def failing_process_heartbeat_response(*args, **kwargs):
            raise RuntimeError

        sut.process_heartbeat_response = failing_process_heartbeat_response
        self.assertRaises(RuntimeError, sut.do_heartbeat)

        # Nothing may have been persisted by the failed heartbeat.
        self.assertEqual(models.Host.objects.count(), 0)

        sut.process_heartbeat_response = original_process_heartbeat_response
        sut.do_heartbeat()
        sut.do_heartbeat()

        self.mox.VerifyAll()


    def testHeartbeatNoShardMode(self):
        """Ensure an exception is thrown when run on a non-shard machine."""
        self.mox.ReplayAll()

        self.assertRaises(error.HeartbeatOnlyAllowedInShardModeException,
                          shard_client.get_shard_client)

        self.mox.VerifyAll()


    def testLoop(self):
        """Test looping over heartbeats and aborting that loop works."""
        self.setup_mocks()
        self.setup_global_config()

        global_config.global_config.override_config_value(
                'SHARD', 'heartbeat_pause_sec', '0.01')

        self.expect_heartbeat()

        # `sut` is rebound below, after ReplayAll; the closure looks it up
        # when the second heartbeat fires.
        sut = None

        def shutdown_sut(*args, **kwargs):
            sut.shutdown()

        self.expect_heartbeat(side_effect=shutdown_sut)

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()
        sut.loop(None)

        self.mox.VerifyAll()


    def testLoopWithDeadline(self):
        """Test looping over heartbeats with a timeout."""
        self.setup_mocks()
        self.setup_global_config()
        self.mox.StubOutWithMock(time, 'time')

        global_config.global_config.override_config_value(
                'SHARD', 'heartbeat_pause_sec', '0.01')
        time.time().AndReturn(1516894000)
        time.time().AndReturn(1516894000)
        self.expect_heartbeat()
        # Set expectation that heartbeat took 1 minute.
        time.time().MultipleTimes().AndReturn(1516894000 + 60)

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()
        # 36 seconds; shorter than the mocked minute, and only a single
        # heartbeat has been recorded above.
        sut.loop(lifetime_hours=0.01)

        self.mox.VerifyAll()


    def test_remove_incorrect_hosts(self):
        """Test _remove_incorrect_hosts with MultipleObjectsReturned."""
        self.setup_mocks()
        self.setup_global_config()
        self.mox.StubOutWithMock(model_logic.ModelWithInvalidQuerySet,
                                 'delete')

        # Record that the (stubbed) queryset delete raises.
        call = models.Host.objects.filter(id__in=[1]).delete()
        call.AndRaise(MultipleObjectsReturned('e'))

        self.mox.ReplayAll()
        sut = shard_client.get_shard_client()
        # Must not propagate the MultipleObjectsReturned raised by delete().
        sut._remove_incorrect_hosts(incorrect_host_ids=[1])

        self.mox.VerifyAll()
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()