#!/usr/bin/python #pylint: disable-msg=C0111 import gc, time import common from autotest_lib.frontend import setup_django_environment from autotest_lib.frontend.afe import frontend_test_utils from autotest_lib.client.common_lib.test_utils import mock from autotest_lib.client.common_lib.test_utils import unittest from autotest_lib.database import database_connection from autotest_lib.frontend.afe import models from autotest_lib.scheduler import agent_task from autotest_lib.scheduler import monitor_db, drone_manager, email_manager from autotest_lib.scheduler import pidfile_monitor from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler from autotest_lib.scheduler import monitor_db_functional_test from autotest_lib.scheduler import monitor_db_unittest from autotest_lib.scheduler import scheduler_models _DEBUG = False class AtomicGroupTest(monitor_db_unittest.DispatcherSchedulingTest): def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self): # Create a job scheduled to run on label6. self._create_job(metahosts=[self.label6.id]) self._run_scheduler() # label6 only has hosts that are in atomic groups associated with it, # there should be no scheduling. self._check_for_extra_schedulings() def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self): # Create a job scheduled to run on label5. This is an atomic group # label but this job does not request atomic group scheduling. self._create_job(metahosts=[self.label5.id]) self._run_scheduler() # label6 only has hosts that are in atomic groups associated with it, # there should be no scheduling. self._check_for_extra_schedulings() def test_atomic_group_scheduling_basics(self): # Create jobs scheduled to run on an atomic group. job_a = self._create_job(synchronous=True, metahosts=[self.label4.id], atomic_group=1) job_b = self._create_job(synchronous=True, metahosts=[self.label5.id], atomic_group=1) self._run_scheduler() # atomic_group.max_number_of_machines was 2 so we should run on 2. self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2) self._assert_job_scheduled_on(job_b.id, 8) # label5 self._assert_job_scheduled_on(job_b.id, 9) # label5 self._check_for_extra_schedulings() # The three host label4 atomic group still has one host available. # That means a job with a synch_count of 1 asking to be scheduled on # the atomic group can still use the final machine. # # This may seem like a somewhat odd use case. It allows the use of an # atomic group as a set of machines to run smaller jobs within (a set # of hosts configured for use in network tests with eachother perhaps?) onehost_job = self._create_job(atomic_group=1) self._run_scheduler() self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1) self._check_for_extra_schedulings() # No more atomic groups have hosts available, no more jobs should # be scheduled. self._create_job(atomic_group=1) self._run_scheduler() self._check_for_extra_schedulings() def test_atomic_group_scheduling_obeys_acls(self): # Request scheduling on a specific atomic label but be denied by ACLs. self._do_query('DELETE FROM afe_acl_groups_hosts ' 'WHERE host_id in (8,9)') job = self._create_job(metahosts=[self.label5.id], atomic_group=1) self._run_scheduler() self._check_for_extra_schedulings() def test_atomic_group_scheduling_dependency_label_exclude(self): # A dependency label that matches no hosts in the atomic group. job_a = self._create_job(atomic_group=1) job_a.dependency_labels.add(self.label3) self._run_scheduler() self._check_for_extra_schedulings() def test_atomic_group_scheduling_metahost_dependency_label_exclude(self): # A metahost and dependency label that excludes too many hosts. job_b = self._create_job(synchronous=True, metahosts=[self.label4.id], atomic_group=1) job_b.dependency_labels.add(self.label7) self._run_scheduler() self._check_for_extra_schedulings() def test_atomic_group_scheduling_dependency_label_match(self): # A dependency label that exists on enough atomic group hosts in only # one of the two atomic group labels. job_c = self._create_job(synchronous=True, atomic_group=1) job_c.dependency_labels.add(self.label7) self._run_scheduler() self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2) self._check_for_extra_schedulings() def test_atomic_group_scheduling_no_metahost(self): # Force it to schedule on the other group for a reliable test. self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9') # An atomic job without a metahost. job = self._create_job(synchronous=True, atomic_group=1) self._run_scheduler() self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2) self._check_for_extra_schedulings() def test_atomic_group_scheduling_partial_group(self): # Make one host in labels[3] unavailable so that there are only two # hosts left in the group. self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5') job = self._create_job(synchronous=True, metahosts=[self.label4.id], atomic_group=1) self._run_scheduler() # Verify that it was scheduled on the 2 ready hosts in that group. self._assert_job_scheduled_on(job.id, 6) self._assert_job_scheduled_on(job.id, 7) self._check_for_extra_schedulings() def test_atomic_group_scheduling_not_enough_available(self): # Mark some hosts in each atomic group label as not usable. # One host running, another invalid in the first group label. self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5') self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6') # One host invalid in the second group label. self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9') # Nothing to schedule when no group label has enough (2) good hosts.. self._create_job(atomic_group=1, synchronous=True) self._run_scheduler() # There are not enough hosts in either atomic group, # No more scheduling should occur. self._check_for_extra_schedulings() # Now create an atomic job that has a synch count of 1. It should # schedule on exactly one of the hosts. onehost_job = self._create_job(atomic_group=1) self._run_scheduler() self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1) def test_atomic_group_scheduling_no_valid_hosts(self): self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)') self._create_job(synchronous=True, metahosts=[self.label5.id], atomic_group=1) self._run_scheduler() # no hosts in the selected group and label are valid. no schedulings. self._check_for_extra_schedulings() def test_atomic_group_scheduling_metahost_works(self): # Test that atomic group scheduling also obeys metahosts. self._create_job(metahosts=[0], atomic_group=1) self._run_scheduler() # There are no atomic group hosts that also have that metahost. self._check_for_extra_schedulings() job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1) self._run_scheduler() self._assert_job_scheduled_on(job_b.id, 8) self._assert_job_scheduled_on(job_b.id, 9) self._check_for_extra_schedulings() def test_atomic_group_skips_ineligible_hosts(self): # Test hosts marked ineligible for this job are not eligible. # How would this ever happen anyways? job = self._create_job(metahosts=[self.label4.id], atomic_group=1) models.IneligibleHostQueue.objects.create(job=job, host_id=5) models.IneligibleHostQueue.objects.create(job=job, host_id=6) models.IneligibleHostQueue.objects.create(job=job, host_id=7) self._run_scheduler() # No scheduling should occur as all desired hosts were ineligible. self._check_for_extra_schedulings() def test_atomic_group_scheduling_fail(self): # If synch_count is > the atomic group number of machines, the job # should be aborted immediately. model_job = self._create_job(synchronous=True, atomic_group=1) model_job.synch_count = 4 model_job.save() job = scheduler_models.Job(id=model_job.id) self._run_scheduler() self._check_for_extra_schedulings() queue_entries = job.get_host_queue_entries() self.assertEqual(1, len(queue_entries)) self.assertEqual(queue_entries[0].status, models.HostQueueEntry.Status.ABORTED) def test_atomic_group_no_labels_no_scheduling(self): # Never schedule on atomic groups marked invalid. job = self._create_job(metahosts=[self.label5.id], synchronous=True, atomic_group=1) # Deleting an atomic group via the frontend marks it invalid and # removes all label references to the group. The job now references # an invalid atomic group with no labels associated with it. self.label5.atomic_group.invalid = True self.label5.atomic_group.save() self.label5.atomic_group = None self.label5.save() self._run_scheduler() self._check_for_extra_schedulings() def test_schedule_directly_on_atomic_group_host_fail(self): # Scheduling a job directly on hosts in an atomic group must # fail to avoid users inadvertently holding up the use of an # entire atomic group by using the machines individually. job = self._create_job(hosts=[5]) self._run_scheduler() self._check_for_extra_schedulings() def test_schedule_directly_on_atomic_group_host(self): # Scheduling a job directly on one host in an atomic group will # work when the atomic group is listed on the HQE in addition # to the host (assuming the sync count is 1). job = self._create_job(hosts=[5], atomic_group=1) self._run_scheduler() self._assert_job_scheduled_on(job.id, 5) self._check_for_extra_schedulings() def test_schedule_directly_on_atomic_group_hosts_sync2(self): job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True) self._run_scheduler() self._assert_job_scheduled_on(job.id, 5) self._assert_job_scheduled_on(job.id, 8) self._check_for_extra_schedulings() def test_schedule_directly_on_atomic_group_hosts_wrong_group(self): job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True) self._run_scheduler() self._check_for_extra_schedulings() # TODO(gps): These should probably live in their own TestCase class # specific to testing HostScheduler methods directly. It was convenient # to put it here for now to share existing test environment setup code. def test_HostScheduler_check_atomic_group_labels(self): normal_job = self._create_job(metahosts=[0]) atomic_job = self._create_job(atomic_group=1) # Indirectly initialize the internal state of the host scheduler. self._dispatcher._refresh_pending_queue_entries() atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % atomic_job.id)[0] normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % normal_job.id)[0] host_scheduler = self._dispatcher._host_scheduler self.assertTrue(host_scheduler._check_atomic_group_labels( [self.label4.id], atomic_hqe)) self.assertFalse(host_scheduler._check_atomic_group_labels( [self.label4.id], normal_hqe)) self.assertFalse(host_scheduler._check_atomic_group_labels( [self.label5.id, self.label6.id, self.label7.id], normal_hqe)) self.assertTrue(host_scheduler._check_atomic_group_labels( [self.label4.id, self.label6.id], atomic_hqe)) self.assertTrue(host_scheduler._check_atomic_group_labels( [self.label4.id, self.label5.id], atomic_hqe)) class OnlyIfNeededTest(monitor_db_unittest.DispatcherSchedulingTest): def _setup_test_only_if_needed_labels(self): # apply only_if_needed label3 to host1 models.Host.smart_get('host1').labels.add(self.label3) return self._create_job_simple([1], use_metahost=True) def test_only_if_needed_labels_avoids_host(self): job = self._setup_test_only_if_needed_labels() # if the job doesn't depend on label3, there should be no scheduling self._run_scheduler() self._check_for_extra_schedulings() def test_only_if_needed_labels_schedules(self): job = self._setup_test_only_if_needed_labels() job.dependency_labels.add(self.label3) self._run_scheduler() self._assert_job_scheduled_on(1, 1) self._check_for_extra_schedulings() def test_only_if_needed_labels_via_metahost(self): job = self._setup_test_only_if_needed_labels() job.dependency_labels.add(self.label3) # should also work if the metahost is the only_if_needed label self._do_query('DELETE FROM afe_jobs_dependency_labels') self._create_job(metahosts=[3]) self._run_scheduler() self._assert_job_scheduled_on(2, 1) self._check_for_extra_schedulings() def test_metahosts_obey_blocks(self): """ Metahosts can't get scheduled on hosts already scheduled for that job. """ self._create_job(metahosts=[1], hosts=[1]) # make the nonmetahost entry complete, so the metahost can try # to get scheduled self._update_hqe(set='complete = 1', where='host_id=1') self._run_scheduler() self._check_for_extra_schedulings()