#!/usr/bin/env python
"""
This script prints out a csv file of `suite,test,path/to/control.file` where
each row is a test that has failed every time that it ran for the past N days,
where N is the `_DAYS_NOT_RUNNING_CUTOFF` constant defined below.
You run it like this:
./always_failing_tests.py | tee output
But please note that since we're using the models to do queries, you'll probably
need to move your local shadow config out of the way before you run this script
so that you point at prod.
"""
import time
import hashlib
import re
import datetime
import sys
import common
from autotest_lib.frontend import setup_django_readonly_environment
# Django and the models are only set up after
# the setup_django_readonly_environment module is imported.
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.afe import models as afe_models
from autotest_lib.server.cros.dynamic_suite import suite
# Look-back window, in days: main() only considers test runs whose
# started_time falls within this many days of today.
_DAYS_NOT_RUNNING_CUTOFF = 30
def md5(s):
    """Return the hexadecimal MD5 digest of `s`.

    @param s: Byte string or unicode text to hash.
    @return: The digest as a string of 32 hexadecimal digits.
    """
    # hashlib works on bytes. Text is encoded explicitly as UTF-8 so that
    # non-ASCII input does not blow up in the implicit ASCII conversion
    # (Python 2) or get rejected outright (Python 3).
    if isinstance(s, type(u'')):
        s = s.encode('utf-8')
    return hashlib.md5(s).hexdigest()
def _report_progress(label, start, count):
    """Write one profiling line to stderr.

    @param label: Short tag naming the stage that just finished.
    @param start: time.time() value captured when main() started working.
    @param count: Size of the collection that stage produced.
    """
    # stderr keeps these diagnostics out of the CSV written to stdout.
    sys.stderr.write("%s: %d -- len=%d\n"
                     % (label, time.time() - start, count))


def main():
    """Print a `suite,test,control file` CSV row per always-failing test.

    Loads the TKO test rows started within the last
    _DAYS_NOT_RUNNING_CUTOFF days, keeps the test names that never had a
    GOOD or WARN result (TEST_NA results are ignored when deciding), and
    groups their rows by (suite, test name). The suite comes from the name
    of the AFE parent job, or of the job itself when it has no parent. The
    control file path is found by matching the MD5 of the AFE job's control
    file against the control files on disk; `?` is printed when none of the
    rows in a group matches.

    CSV rows go to stdout. Stage timings and the names of jobs that do not
    look like suite jobs go to stderr.
    """
    cutoff_delta = datetime.timedelta(_DAYS_NOT_RUNNING_CUTOFF)
    cutoff_date = datetime.datetime.today() - cutoff_delta
    statuses = {s.status_idx: s.word for s in tko_models.Status.objects.all()}
    start = time.time()

    tests = list(
        tko_models.Test.objects.select_related('job')
        .filter(started_time__gte=cutoff_date)
        .exclude(test__contains='/')
        .exclude(test__contains='_JOB')
        .exclude(test='provision')
        .exclude(test__contains='try_new_image'))
    # The progress reports are vague profiling work. We're handling a lot of
    # data, so it is useful to see that things chug along at a decent speed.
    _report_progress("DB", start, len(tests))

    # Map each test name to whether any of its runs passed.
    ever_passed = {}
    for t in tests:
        word = statuses[t.status_id]
        if word == 'TEST_NA':
            continue
        passed = word in ('GOOD', 'WARN')
        ever_passed[t.test] = ever_passed.get(t.test, False) or passed
    _report_progress("OF", start, len(ever_passed))

    all_fail = [t for t in tests
                if t.test in ever_passed and not ever_passed[t.test]]
    _report_progress("AF", start, len(all_fail))

    # Map the MD5 of every control file on disk to its path relative to the
    # autotest directory.
    hash_to_file = {}
    fs_getter = suite.Suite.create_fs_getter(common.autotest_dir)
    for control_file in fs_getter.get_control_file_list():
        with open(control_file, 'rb') as f:
            digest = md5(f.read())
        relative_path = control_file.replace(common.autotest_dir, '')
        hash_to_file[digest] = relative_path.lstrip('/')
    _report_progress("HF", start, len(hash_to_file))

    afe_job_ids = set(t.job.afe_job_id for t in all_fail)
    afe_jobs = list(afe_models.Job.objects.select_related('parent_job')
                    .filter(id__in=afe_job_ids))
    _report_progress("AJ", start, len(afe_jobs))

    job_to_hash = {job.id: md5(job.control_file) for job in afe_jobs}
    _report_progress("JH", start, len(job_to_hash))

    job_to_suite = {}
    suite_rgx = re.compile(r"test_suites/control\.(\w+)")
    for job in afe_jobs:
        # A job without a parent is matched against its own name.
        suite_job = job.parent_job or job
        match = suite_rgx.search(suite_job.name)
        if not match:
            sys.stderr.write("%s\n" % suite_job.name)
            continue
        job_to_suite[job.id] = match.group(1)

    by_name = {}
    for t in all_fail:
        suite_name = job_to_suite.get(t.job.afe_job_id, None)
        by_name.setdefault((suite_name, t.test), []).append(t)
    _report_progress("BN", start, len(by_name))

    for (suite_name, test_name), failures in by_name.items():
        control_path = '?'
        for failure in failures:
            # job_to_hash only has entries for jobs the AFE query returned,
            # so a TKO row whose AFE job is missing must not be fatal.
            digest = job_to_hash.get(failure.job.afe_job_id)
            if digest in hash_to_file:
                control_path = hash_to_file[digest]
                break
        print("%s,%s,%s" % (suite_name, test_name, control_path))
# Run the report only when executed as a script; whatever main() returns is
# handed to sys.exit() as the exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)