"""
Autotest AFE Cleanup used by the scheduler
"""
import time, logging, random
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_config
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
class PeriodicCleanup(object):
    """Base class for cleanup work that is repeated on a fixed interval.

    Subclasses provide the actual work by overriding _cleanup().
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        """Store the settings and start the interval clock.

        @param db: Database connection object, kept for use by subclasses.
        @param clean_interval_minutes: Minimum number of minutes between two
                cleanups triggered through run_cleanup_maybe().
        @param run_at_initialize: True to also run the cleanup when
                initialize() is called. Default is False.
        """
        self._db = db
        self.clean_interval_minutes = clean_interval_minutes
        self._run_at_initialize = run_at_initialize
        # The interval is measured from construction time, so the first
        # periodic cleanup only becomes due one full interval from now.
        self._last_clean_time = time.time()

    def initialize(self):
        """Method called by scheduler at the startup.

        Runs the cleanup once if run_at_initialize was requested. The
        interval clock is not reset by this call.
        """
        if not self._run_at_initialize:
            return
        self._cleanup()

    def run_cleanup_maybe(self):
        """Run the cleanup if more than one interval has elapsed.

        The time of the last cleanup is only updated after _cleanup()
        returns.
        """
        deadline = self._last_clean_time + self.clean_interval_minutes * 60
        if time.time() > deadline:
            self._cleanup()
            self._last_clean_time = time.time()

    def _cleanup(self):
        """Abstract cleanup method; subclasses must override it."""
        raise NotImplementedError
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval_minutes in the SCHEDULER section.

    Each pass aborts overdue jobs and queue entries, removes host blocks of
    finished jobs, clears relationships that point at invalid objects,
    optionally reverifies hosts whose status is REPAIR_FAILED and empties the
    django_session table.
    """

    timer = autotest_stats.Timer('monitor_db_cleanup.user_cleanup')

    def __init__(self, db, clean_interval_minutes):
        """Set up the periodic cleanup and start the reverify clock.

        @param db: Database connection object.
        @param clean_interval_minutes: Minutes between two cleanup passes.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        self._last_reverify_time = time.time()

    @timer.decorate
    def _cleanup(self):
        """Run all periodic cleanup steps in a fixed order."""
        logging.info('Running periodic cleanup')
        steps = (
            self._abort_timed_out_jobs,
            self._abort_jobs_past_max_runtime,
            self._clear_inactive_blocks,
            self._check_for_db_inconsistencies,
            self._reverify_dead_hosts,
            self._django_session_cleanup,
        )
        for step in steps:
            step()

    @timer.decorate
    def _abort_timed_out_jobs(self):
        """Abort jobs with incomplete queue entries whose timeout has passed.

        A job is selected when created_on plus timeout_mins minutes lies in
        the past according to the database clock.
        """
        logging.info(
            'Aborting all jobs that have timed out and are not complete')
        incomplete_jobs = models.Job.objects.filter(
            hostqueueentry__complete=False)
        timed_out_jobs = incomplete_jobs.extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        # distinct() because a job with several incomplete entries would
        # otherwise be returned once per entry.
        for job in timed_out_jobs.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort()

    @timer.decorate
    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        sql = (
            '\n'
            'SELECT hqe.id\n'
            'FROM afe_host_queue_entries AS hqe\n'
            'INNER JOIN afe_jobs ON (hqe.job_id = afe_jobs.id)\n'
            'WHERE NOT hqe.complete AND NOT hqe.aborted AND\n'
            'hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE <\n'
            'NOW()'
        )
        overdue_ids = [row[0] for row in self._db.execute(sql)]
        overdue_entries = models.HostQueueEntry.objects.filter(
            id__in=overdue_ids)
        for queue_entry in overdue_entries.distinct():
            logging.warning('Aborting entry %s due to max runtime',
                            queue_entry)
            queue_entry.abort()

    @timer.decorate
    def _check_for_db_inconsistencies(self):
        """Find and clean the database inconsistencies that can be fixed."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()

    def _check_invalid_related_objects_one_way(self, first_model,
                                               relation_field, second_model):
        """Clear relations from invalid first_model objects to second_model.

        @param first_model: Model class whose invalid objects are inspected.
        @param relation_field: Name of the attribute on a first_model object
                that manages its relation to second_model.
        @param second_model: Model class on the other side of the relation.

        @return: A list with one error line per invalid object that still
                had related objects. Empty if first_model has no 'invalid'
                field.
        """
        if 'invalid' not in first_model.get_field_dict():
            return []

        invalid_objects = list(first_model.objects.filter(invalid=True))
        first_model.objects.populate_relationships(
            invalid_objects, second_model, 'related_objects')

        error_lines = []
        for candidate in invalid_objects:
            if not candidate.related_objects:
                continue
            related_list = ', '.join(
                [str(related) for related in candidate.related_objects])
            line = ('Invalid %s %s is related to %ss: %s'
                    % (first_model.__name__, candidate,
                       second_model.__name__, related_list))
            error_lines.append(line)
            # Drop the stale relation now that it has been recorded.
            getattr(candidate, relation_field).clear()
        return error_lines

    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Check a relation between two models in both directions.

        @param first_model: Model class on one side of the relation.
        @param first_field: Relation attribute name on first_model objects.
        @param second_model: Model class on the other side of the relation.
        @param second_field: Relation attribute name on second_model objects.

        @return: The error lines of both directions, first to second first.
        """
        errors = self._check_invalid_related_objects_one_way(
            first_model, first_field, second_model)
        reverse_errors = self._check_invalid_related_objects_one_way(
            second_model, second_field, first_model)
        errors += reverse_errors
        return errors

    def _check_all_invalid_related_objects(self):
        """Clean every known relation to invalid objects and report them.

        When anything was cleaned, the findings are logged as warnings and
        queued as a notification email.
        """
        model_pairs = (
            (models.Host, 'labels', models.Label, 'host_set'),
            (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
            (models.AclGroup, 'users', models.User, 'aclgroup_set'),
            (models.Test, 'dependency_labels', models.Label, 'test_set'),
        )
        errors = []
        for pair in model_pairs:
            errors.extend(self._check_invalid_related_objects(*pair))

        if not errors:
            return

        subject = ('%s relationships to invalid models, cleaned all' %
                   len(errors))
        message = '\n'.join(errors)
        logging.warning(subject)
        logging.warning(message)
        email_manager.manager.enqueue_notify_email(subject, message)

    @timer.decorate
    def _clear_inactive_blocks(self):
        """Delete ineligible-host rows of jobs with no incomplete entries."""
        logging.info('Clear out blocks for all completed jobs.')
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        sql = (
            '\n'
            'DELETE ihq FROM afe_ineligible_host_queues ihq\n'
            'LEFT JOIN (SELECT DISTINCT job_id FROM afe_host_queue_entries\n'
            'WHERE NOT complete) hqe\n'
            'USING (job_id) WHERE hqe.job_id IS NULL'
        )
        self._db.execute(sql)

    def _should_reverify_hosts_now(self):
        """Tell whether the reverify period has elapsed.

        @return: False if the configured period is 0, which disables
                reverification; otherwise True once the period has passed
                since the last reverify time.
        """
        period_minutes = scheduler_config.config.reverify_period_minutes
        period_sec = period_minutes * 60
        if period_sec == 0:
            return False
        next_due = self._last_reverify_time + period_sec
        return next_due <= time.time()

    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify.

        @param hosts: List of hosts that need verification.

        @return: A random sample of reverify_max_hosts_at_once hosts when
                that limit is positive and exceeded, otherwise all hosts in
                sorted order.
        """
        limit = scheduler_config.config.reverify_max_hosts_at_once
        over_limit = limit > 0 and len(hosts) > limit
        if not over_limit:
            return sorted(hosts)
        return random.sample(hosts, limit)

    @timer.decorate
    def _reverify_dead_hosts(self):
        """Schedule a verify task for hosts that failed repair.

        Does nothing unless the reverify period has elapsed. Locked, invalid
        and DO_NOT_VERIFY-protected hosts are left alone.
        """
        if not self._should_reverify_hosts_now():
            return

        # Restart the period even if no host turns out to need reverifying.
        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        dead_hosts = models.Host.objects.filter(
            status=models.Host.Status.REPAIR_FAILED,
            locked=False,
            invalid=False,
        ).exclude(protection=host_protections.Protection.DO_NOT_VERIFY)
        if not dead_hosts:
            return

        candidates = list(dead_hosts)
        total_hosts = len(candidates)
        chosen = self._choose_subset_of_hosts_to_reverify(candidates)
        hostnames = ', '.join(host.hostname for host in chosen)
        logging.info('Reverifying dead hosts (%d of %d) %s', len(chosen),
                     total_hosts, hostnames)
        for host in chosen:
            models.SpecialTask.schedule_special_task(
                host=host, task=models.SpecialTask.Task.VERIFY)

    @timer.decorate
    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't do it for us.

        http://www.djangoproject.com/documentation/0.96/sessions/

        NOTE(review): the statement issued is TRUNCATE TABLE, which empties
        the whole table rather than only old sessions as the log message
        suggests.
        """
        logging.info('Deleting old sessions from django_session')
        self._db.execute('TRUNCATE TABLE django_session')
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
    twenty four hours.

    It reports database inconsistencies that cannot be cleaned automatically
    and asks the drone manager to clean up orphaned containers.
    """

    timer = autotest_stats.Timer('monitor_db_cleanup.twentyfourhour_cleanup')

    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.
        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60  # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)

    @timer.decorate
    def _cleanup(self):
        """Run the daily consistency checks and the container cleanup."""
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()

    @timer.decorate
    def _check_for_uncleanable_db_inconsistencies(self):
        """Run every check for inconsistencies that are only reported."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()
        self._check_for_multiple_atomic_group_hosts()

    @timer.decorate
    def _check_for_active_and_complete_queue_entries(self):
        """Report queue entries that are both active and complete.

        Entries whose status is 'Aborted' are additionally repaired by
        setting active to False. All offending entries are reported through
        _send_inconsistency_message().
        """
        # Fetch the entries once so that the count in the subject always
        # matches the entries listed in the message, and so that the
        # database is queried a single time instead of three times.
        entries = list(
            models.HostQueueEntry.objects.filter(active=True, complete=True))
        if not entries:
            return

        subject = ('%d queue entries found with active=complete=1'
                   % len(entries))
        lines = []
        for entry in entries:
            # Record the entry as found, before any repair below.
            lines.append(str(entry.get_object_dict()))
            if entry.status == 'Aborted':
                logging.error('Aborted entry: %s is both active and '
                              'complete. Setting active value to False.',
                              str(entry))
                entry.active = False
                entry.save()
        self._send_inconsistency_message(subject, lines)

    @timer.decorate
    def _check_for_multiple_platform_hosts(self):
        """Report hosts that carry more than one platform label."""
        rows = self._db.execute(
            '\n'
            'SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,\n'
            'GROUP_CONCAT(afe_labels.name)\n'
            'FROM afe_hosts\n'
            'INNER JOIN afe_hosts_labels ON\n'
            'afe_hosts.id = afe_hosts_labels.host_id\n'
            'INNER JOIN afe_labels ON afe_hosts_labels.label_id = '
            'afe_labels.id\n'
            'WHERE afe_labels.platform\n'
            'GROUP BY afe_hosts.id\n'
            'HAVING platform_count > 1\n'
            'ORDER BY hostname')
        if rows:
            subject = '%s hosts with multiple platforms' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)

    @timer.decorate
    def _check_for_no_platform_hosts(self):
        """Log a warning listing valid hosts that have no platform label."""
        rows = self._db.execute(
            '\n'
            'SELECT hostname\n'
            'FROM afe_hosts\n'
            'LEFT JOIN afe_hosts_labels\n'
            'ON afe_hosts.id = afe_hosts_labels.host_id\n'
            'AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels\n'
            'WHERE platform)\n'
            'WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id '
            'IS NULL')
        if rows:
            logging.warning('%s hosts with no platform\n%s',
                            self._db.rowcount,
                            ', '.join(row[0] for row in rows))

    @timer.decorate
    def _check_for_multiple_atomic_group_hosts(self):
        """Report valid hosts whose labels span more than one atomic group."""
        rows = self._db.execute(
            '\n'
            'SELECT afe_hosts.id, hostname,\n'
            'COUNT(DISTINCT afe_atomic_groups.name) AS atomic_group_count,\n'
            'GROUP_CONCAT(afe_labels.name),\n'
            'GROUP_CONCAT(afe_atomic_groups.name)\n'
            'FROM afe_hosts\n'
            'INNER JOIN afe_hosts_labels ON\n'
            'afe_hosts.id = afe_hosts_labels.host_id\n'
            'INNER JOIN afe_labels ON afe_hosts_labels.label_id = '
            'afe_labels.id\n'
            'INNER JOIN afe_atomic_groups ON\n'
            'afe_labels.atomic_group_id = afe_atomic_groups.id\n'
            'WHERE NOT afe_hosts.invalid AND NOT afe_labels.invalid\n'
            'GROUP BY afe_hosts.id\n'
            'HAVING atomic_group_count > 1\n'
            'ORDER BY hostname')
        if rows:
            subject = ('%s hosts with multiple atomic groups'
                       % self._db.rowcount)
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)

    def _send_inconsistency_message(self, subject, lines):
        """Log an inconsistency and queue a notification email about it.

        @param subject: Summary line, logged as an error and used as the
                email subject.
        @param lines: List of strings that form the email body, one per line.
        """
        logging.error(subject)
        message = '\n'.join(lines)
        # Keep the email body bounded when many rows are affected.
        if len(message) > 5000:
            message = message[:5000] + '\n(truncated)\n'
        email_manager.manager.enqueue_notify_email(subject, message)

    @timer.decorate
    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        The function queues a lxc_cleanup call in each drone without waiting
        for the script to finish, as the cleanup procedure could take minutes
        and the script output is logged.
        """
        # Ask for a bool explicitly. With the default string type the raw
        # config text is returned, and a configured value of "False" would
        # be truthy, so the early return below would never be taken.
        ssp_enabled = global_config.global_config.get_config_value(
            'AUTOSERV', 'enable_ssp_container', type=bool)
        if not ssp_enabled:
            logging.info('Server-side packaging is not enabled, no need to '
                         'clean up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()