# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Monitor jobs and abort them as necessary.
This daemon does a number of upkeep tasks:
* When a process owning a job crashes, job_aborter will mark the job as
aborted in the database and clean up its lease files.
* When a job is marked aborted in the database, job_aborter will signal
the process owning the job to abort.
See also http://goto.google.com/monitor_db_per_job_refactor
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import sys
import time
from lucifer import autotest
from lucifer import handoffs
from lucifer import leasing
from lucifer import loglib
logger = logging.getLogger(__name__)
def main(args):
"""Main function
@param args: list of command line args
"""
parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__)
parser.add_argument('--jobdir', required=True)
loglib.add_logging_options(parser)
args = parser.parse_args(args)
loglib.configure_logging_with_args(parser, args)
logger.info('Starting with args: %r', args)
autotest.monkeypatch()
ts_mon_config = autotest.chromite_load('ts_mon_config')
with ts_mon_config.SetupTsMonGlobalState('job_aborter'):
_main_loop(jobdir=args.jobdir)
assert False # cannot exit normally
def _main_loop(jobdir):
transaction = autotest.deps_load('django.db.transaction')
@transaction.commit_manually
def flush_transaction():
"""Flush transaction https://stackoverflow.com/questions/3346124/"""
transaction.commit()
metrics = _Metrics()
metrics.send_starting()
while True:
logger.debug('Tick')
metrics.send_tick()
_main_loop_body(metrics, jobdir)
flush_transaction()
time.sleep(20)
def _main_loop_body(metrics, jobdir):
active_leases = {
lease.id: lease for lease in leasing.leases_iter(jobdir)
if not lease.expired()
}
_mark_expired_jobs_failed(metrics, active_leases)
_abort_timed_out_jobs(active_leases)
_abort_jobs_marked_aborting(active_leases)
_abort_special_tasks_marked_aborted()
_clean_up_expired_leases(jobdir)
# TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into lucifer
def _mark_expired_jobs_failed(metrics, active_leases):
"""Mark expired jobs failed.
Expired jobs are jobs that have an incomplete JobHandoff and that do
not have an active lease. These jobs have been handed off to a
job_reporter, but that job_reporter has crashed. These jobs are
marked failed in the database.
@param metrics: _Metrics instance.
@param active_leases: dict mapping job ids to Leases.
"""
logger.debug('Looking for expired jobs')
job_ids = []
for handoff in handoffs.incomplete():
logger.debug('Found handoff: %d', handoff.job_id)
if handoff.job_id not in active_leases:
logger.info('Handoff %d is missing active lease; cleaning up',
handoff.job_id)
job_ids.append(handoff.job_id)
handoffs.clean_up(job_ids)
handoffs.mark_complete(job_ids)
metrics.send_expired_jobs(len(job_ids))
def _abort_timed_out_jobs(active_leases):
"""Send abort to timed out jobs.
@param active_leases: dict mapping job ids to Leases.
"""
for job in _timed_out_jobs_queryset():
if job.id in active_leases:
logger.info('Job %d is timed out; aborting', job.id)
active_leases[job.id].maybe_abort()
def _abort_jobs_marked_aborting(active_leases):
"""Send abort to jobs marked aborting in Autotest database.
@param active_leases: dict mapping job ids to Leases.
"""
for job in _aborting_jobs_queryset():
if job.id in active_leases:
logger.info('Job %d is marked for aborting; aborting', job.id)
active_leases[job.id].maybe_abort()
def _abort_special_tasks_marked_aborted():
# TODO(crbug.com/748234): Special tasks not implemented yet. This
# would abort jobs running on the behalf of special tasks and thus
# need to check a different database table.
pass
def _clean_up_expired_leases(jobdir):
"""Clean up files for expired leases.
We only care about active leases, so we can remove the stale files
for expired leases.
"""
for lease in leasing.leases_iter(jobdir):
if lease.expired():
lease.cleanup()
def _timed_out_jobs_queryset():
"""Return a QuerySet of timed out Jobs.
@returns: Django QuerySet
"""
models = autotest.load('frontend.afe.models')
return (
models.Job.objects
.filter(hostqueueentry__complete=False)
.extra(where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
.distinct()
)
def _aborting_jobs_queryset():
"""Return a QuerySet of aborting Jobs.
@returns: Django QuerySet
"""
models = autotest.load('frontend.afe.models')
return (
models.Job.objects
.filter(hostqueueentry__aborted=True)
.filter(hostqueueentry__complete=False)
.distinct()
)
class _Metrics(object):
"""Class for sending job_aborter metrics."""
def __init__(self):
metrics = autotest.chromite_load('metrics')
prefix = 'chromeos/lucifer/job_aborter'
self._starting_m = metrics.Counter(prefix + '/start')
self._tick_m = metrics.Counter(prefix + '/tick')
self._expired_m = metrics.Counter(prefix + '/expired_jobs')
def send_starting(self):
"""Send starting metric."""
self._starting_m.increment()
def send_tick(self):
"""Send tick metric."""
self._tick_m.increment()
def send_expired_jobs(self, count):
"""Send expired_jobs metric."""
self._expired_m.increment_by(count)
if __name__ == '__main__':
main(sys.argv[1:])