#!/usr/bin/python

import re
import unittest

# NOTE: import order matters below -- `common` and the Django/test
# environment setup modules are imported before anything that touches the
# models.
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend import setup_test_environment
from autotest_lib.client.common_lib.test_utils import mock
from django.db import connection
from autotest_lib.frontend.tko import models, rpc_interface

# this will need to be updated when the view changes for the test to be
# consistent with reality
_CREATE_TEST_VIEW = """
CREATE VIEW tko_test_view_2 AS
SELECT  tko_tests.test_idx AS test_idx,
        tko_tests.job_idx AS job_idx,
        tko_tests.test AS test_name,
        tko_tests.subdir AS subdir,
        tko_tests.kernel_idx AS kernel_idx,
        tko_tests.status AS status_idx,
        tko_tests.reason AS reason,
        tko_tests.machine_idx AS machine_idx,
        tko_tests.invalid AS invalid,
        tko_tests.invalidates_test_idx AS invalidates_test_idx,
        tko_tests.started_time AS test_started_time,
        tko_tests.finished_time AS test_finished_time,
        tko_jobs.tag AS job_tag,
        tko_jobs.label AS job_name,
        tko_jobs.username AS job_owner,
        tko_jobs.queued_time AS job_queued_time,
        tko_jobs.started_time AS job_started_time,
        tko_jobs.finished_time AS job_finished_time,
        tko_jobs.afe_job_id AS afe_job_id,
        tko_machines.hostname AS hostname,
        tko_machines.machine_group AS platform,
        tko_machines.owner AS machine_owner,
        tko_kernels.kernel_hash AS kernel_hash,
        tko_kernels.base AS kernel_base,
        tko_kernels.printable AS kernel,
        tko_status.word AS status
FROM tko_tests
INNER JOIN tko_jobs ON tko_jobs.job_idx = tko_tests.job_idx
INNER JOIN tko_machines ON tko_machines.machine_idx = tko_jobs.machine_idx
INNER JOIN tko_kernels ON tko_kernels.kernel_idx = tko_tests.kernel_idx
INNER JOIN tko_status ON tko_status.status_idx = tko_tests.status;
"""

# this will need to be updated if the table schemas change (or removed if we
# add proper primary keys)
_CREATE_ITERATION_ATTRIBUTES = """
CREATE TABLE "tko_iteration_attributes" (
    "test_idx" integer NOT NULL REFERENCES "tko_tests" ("test_idx"),
    "iteration" integer NOT NULL,
    "attribute" varchar(90) NOT NULL,
    "value" varchar(300) NOT NULL
);
"""

_CREATE_ITERATION_RESULTS = """
CREATE TABLE "tko_iteration_result" (
    "test_idx" integer NOT NULL REFERENCES "tko_tests" ("test_idx"),
    "iteration" integer NOT NULL,
    "attribute" varchar(90) NOT NULL,
    "value" numeric(12, 31) NULL
);
"""


def setup_test_view():
    """
    Django has no way to actually represent a view; we simply create a model
    for TestView.  This means when we syncdb, Django will create a table for
    it.  So manually remove that table and replace it with a view.
    """
    cursor = connection.cursor()
    cursor.execute('DROP TABLE tko_test_view_2')
    cursor.execute(_CREATE_TEST_VIEW)


def fix_iteration_tables():
    """
    Since iteration tables don't have any real primary key, we "fake" one in
    the Django models.  So fix up the generated schema to match the real
    schema.
    """
    cursor = connection.cursor()
    cursor.execute('DROP TABLE tko_iteration_attributes')
    cursor.execute(_CREATE_ITERATION_ATTRIBUTES)
    cursor.execute('DROP TABLE tko_iteration_result')
    cursor.execute(_CREATE_ITERATION_RESULTS)


class TkoTestMixin(object):
    """
    Shared helpers for TKO tests that run against SQLite.

    The host class must provide self.god (a mock god object) before calling
    _patch_sqlite_stuff().
    """

    def _patch_sqlite_stuff(self):
        """
        Stub the TempManager cursor helpers with SQLite-friendly versions,
        register SQL functions used by the queries, and rebuild the
        iteration tables.
        """
        self.god.stub_with(models.TempManager, '_get_column_names',
                           self._get_column_names_for_sqlite3)
        self.god.stub_with(models.TempManager, '_cursor_rowcount',
                           self._cursor_rowcount_for_sqlite3)

        # add some functions to SQLite for MySQL compatibility
        connection.cursor()  # ensure connection is alive
        connection.connection.create_function('if', 3, self._sqlite_if)
        connection.connection.create_function('find_in_set', 2,
                                              self._sqlite_find_in_set)

        fix_iteration_tables()

    def _cursor_rowcount_for_sqlite3(self, cursor):
        """
        Return the number of rows remaining in cursor.

        Note that this consumes the cursor's remaining results.
        """
        return len(cursor.fetchall())

    def _sqlite_find_in_set(self, needle, haystack):
        """
        SQLite stand-in for MySQL's FIND_IN_SET().

        @param needle: value to look for.
        @param haystack: comma-separated string of values.

        @returns True/False depending on whether needle is one of the
                comma-separated items, or None (SQL NULL) when either
                argument is NULL, as MySQL does.
        """
        # a NULL haystack shows up when the joined column has no row;
        # calling .split() on it would raise inside the SQL function
        if needle is None or haystack is None:
            return None
        return needle in haystack.split(',')

    def _sqlite_if(self, condition, true_result, false_result):
        """
        SQLite stand-in for MySQL's IF(): pick a result based on condition.
        """
        if condition:
            return true_result
        return false_result

    # sqlite takes any columns that don't have aliases and names them
    # "table_name"."column_name".  we map these to just column_name.
    _SQLITE_AUTO_COLUMN_ALIAS_RE = re.compile(r'".+"\."(.+)"')

    def _get_column_names_for_sqlite3(self, cursor):
        """
        Return the column names from cursor.description, with any
        "table_name"."column_name" construct reduced to column_name.
        """
        names = [column_info[0] for column_info in cursor.description]

        # replace all "table_name"."column_name" constructs with just
        # column_name
        for i, name in enumerate(names):
            match = self._SQLITE_AUTO_COLUMN_ALIAS_RE.match(name)
            if match:
                names[i] = match.group(1)

        return names

    def _create_initial_data(self):
        """
        Populate the database with the fixture used by the tests: one
        machine, two kernels, two statuses, two jobs and three tests, plus
        attributes, iteration keyvals and labels on the first test (which
        is also saved as self.first_test).
        """
        machine = models.Machine.objects.create(hostname='myhost')

        # create basic objects
        kernel_name = 'mykernel1'
        kernel1 = models.Kernel.objects.create(kernel_hash=kernel_name,
                                               base=kernel_name,
                                               printable=kernel_name)

        kernel_name = 'mykernel2'
        kernel2 = models.Kernel.objects.create(kernel_hash=kernel_name,
                                               base=kernel_name,
                                               printable=kernel_name)

        good_status = models.Status.objects.create(word='GOOD')
        failed_status = models.Status.objects.create(word='FAILED')

        job1 = models.Job.objects.create(tag='1-myjobtag1', label='myjob1',
                                         username='myuser', machine=machine,
                                         afe_job_id=1)
        job2 = models.Job.objects.create(tag='2-myjobtag2', label='myjob2',
                                         username='myuser', machine=machine,
                                         afe_job_id=2)

        # the tests are created in a fixed order; several test cases assert
        # on the resulting test_idx values and result ordering
        job1_test1 = models.Test.objects.create(job=job1, test='mytest1',
                                                kernel=kernel1,
                                                status=good_status,
                                                machine=machine)
        self.first_test = job1_test1
        models.Test.objects.create(job=job1, test='mytest2',
                                   kernel=kernel1,
                                   status=failed_status,
                                   machine=machine)
        models.Test.objects.create(job=job2, test='kernbench',
                                   kernel=kernel2,
                                   status=good_status,
                                   machine=machine)

        job1.jobkeyval_set.create(key='keyval_key', value='keyval_value')

        # create test attributes, test labels, and iterations
        # like Noah's Ark, include two of each...just in case there's a bug
        # with multiple related items
        models.TestAttribute.objects.create(test=job1_test1,
                                            attribute='myattr',
                                            value='myval')
        models.TestAttribute.objects.create(test=job1_test1,
                                            attribute='myattr2',
                                            value='myval2')

        self._add_iteration_keyval('tko_iteration_attributes',
                                   test=job1_test1, iteration=1,
                                   attribute='iattr', value='ival')
        self._add_iteration_keyval('tko_iteration_attributes',
                                   test=job1_test1, iteration=1,
                                   attribute='iattr2', value='ival2')
        self._add_iteration_keyval('tko_iteration_result',
                                   test=job1_test1, iteration=1,
                                   attribute='iresult', value=1)
        self._add_iteration_keyval('tko_iteration_result',
                                   test=job1_test1, iteration=1,
                                   attribute='iresult2', value=2)
        self._add_iteration_keyval('tko_iteration_result',
                                   test=job1_test1, iteration=2,
                                   attribute='iresult', value=3)
        self._add_iteration_keyval('tko_iteration_result',
                                   test=job1_test1, iteration=2,
                                   attribute='iresult2', value=4)

        label1 = models.TestLabel.objects.create(name='testlabel1')
        label2 = models.TestLabel.objects.create(name='testlabel2')

        label1.tests.add(job1_test1)
        label2.tests.add(job1_test1)

    def _add_iteration_keyval(self, table, test, iteration, attribute, value):
        """
        Insert one iteration keyval row directly via SQL.

        @param table: name of the iteration table to insert into; it is
                interpolated into the statement, so pass only trusted
                table names.
        @param test: Test model instance whose test_idx is stored.
        @param iteration: iteration number.
        @param attribute: keyval name.
        @param value: keyval value.
        """
        cursor = connection.cursor()
        cursor.execute('INSERT INTO %s ' 'VALUES (%%s, %%s, %%s, %%s)' % table,
                       (test.test_idx, iteration, attribute, value))


class RpcInterfaceTest(unittest.TestCase, TkoTestMixin):
    """Tests for the TKO rpc_interface against the fixture from setUp()."""

    def setUp(self):
        self.god = mock.mock_god()

        setup_test_environment.set_up()
        self._patch_sqlite_stuff()
        setup_test_view()
        self._create_initial_data()

    def tearDown(self):
        # always remove the stubs, even if tearing down the environment
        # fails, so they cannot leak into later tests
        try:
            setup_test_environment.tear_down()
        finally:
            self.god.unstub_all()

    def _check_for_get_test_views(self, test):
        """Assert that test is the view row for job1's 'mytest1'."""
        self.assertEqual(test['test_name'], 'mytest1')
        self.assertEqual(test['job_tag'], '1-myjobtag1')
        self.assertEqual(test['job_name'], 'myjob1')
        self.assertEqual(test['job_owner'], 'myuser')
        self.assertEqual(test['status'], 'GOOD')
        self.assertEqual(test['hostname'], 'myhost')
        self.assertEqual(test['kernel'], 'mykernel1')

    def test_get_detailed_test_views(self):
        test = rpc_interface.get_detailed_test_views()[0]

        self._check_for_get_test_views(test)

        self.assertEqual(test['attributes'], {'myattr': 'myval',
                                              'myattr2': 'myval2'})
        self.assertEqual(test['iterations'], [{'attr': {'iattr': 'ival',
                                                        'iattr2': 'ival2'},
                                               'perf': {'iresult': 1,
                                                        'iresult2': 2}},
                                              {'attr': {},
                                               'perf': {'iresult': 3,
                                                        'iresult2': 4}}])
        self.assertEqual(test['labels'], ['testlabel1', 'testlabel2'])
        self.assertEqual(test['job_keyvals'], {'keyval_key': 'keyval_value'})

    def test_test_attributes(self):
        # add a new attribute
        rpc_interface.set_test_attribute('foo', 'bar', test_name='mytest1')
        test = rpc_interface.get_detailed_test_views()[0]
        self.assertEqual(test['attributes'], {'foo': 'bar',
                                              'myattr': 'myval',
                                              'myattr2': 'myval2'})

        # overwrite it
        rpc_interface.set_test_attribute('foo', 'goo', test_name='mytest1')
        test = rpc_interface.get_detailed_test_views()[0]
        self.assertEqual(test['attributes'], {'foo': 'goo',
                                              'myattr': 'myval',
                                              'myattr2': 'myval2'})

        # setting it to None removes it
        rpc_interface.set_test_attribute('foo', None, test_name='mytest1')
        test = rpc_interface.get_detailed_test_views()[0]
        self.assertEqual(test['attributes'], {'myattr': 'myval',
                                              'myattr2': 'myval2'})

    def test_immutable_attributes(self):
        self.assertRaises(ValueError, rpc_interface.set_test_attribute,
                          'myattr', 'foo', test_name='mytest1')

    def test_get_test_views(self):
        tests = rpc_interface.get_test_views()

        self.assertEqual(len(tests), 3)
        test = rpc_interface.get_test_views(
                job_name='myjob1', test_name='mytest1')[0]
        self.assertEqual(tests[0], test)

        self._check_for_get_test_views(test)

        self.assertEqual(
                [], rpc_interface.get_test_views(hostname='fakehost'))

    def _check_test_names(self, tests, expected_names):
        """Assert the set of test names in tests equals expected_names."""
        self.assertEqual(set(test['test_name'] for test in tests),
                         set(expected_names))

    def test_get_test_views_filter_on_labels(self):
        tests = rpc_interface.get_test_views(include_labels=['testlabel1'])
        self._check_test_names(tests, ['mytest1'])

        tests = rpc_interface.get_test_views(exclude_labels=['testlabel1'])
        self._check_test_names(tests, ['mytest2', 'kernbench'])

    def test_get_test_views_filter_on_attributes(self):
        tests = rpc_interface.get_test_views(
                include_attributes_where='attribute = "myattr" '
                                         'and value = "myval"')
        self._check_test_names(tests, ['mytest1'])

        tests = rpc_interface.get_test_views(
                exclude_attributes_where='attribute="myattr2"')
        self._check_test_names(tests, ['mytest2', 'kernbench'])

    def test_get_num_test_views(self):
        self.assertEqual(rpc_interface.get_num_test_views(), 3)
        self.assertEqual(rpc_interface.get_num_test_views(
                job_name='myjob1', test_name='mytest1'), 1)

    def test_get_group_counts(self):
        self.assertEqual(rpc_interface.get_num_groups(['job_name']), 2)

        counts = rpc_interface.get_group_counts(['job_name'])
        groups = counts['groups']
        self.assertEqual(len(groups), 2)
        group1, group2 = groups

        self.assertEqual(group1['group_count'], 2)
        self.assertEqual(group1['job_name'], 'myjob1')
        self.assertEqual(group2['group_count'], 1)
        self.assertEqual(group2['job_name'], 'myjob2')

        extra = {'extra': 'kernel_hash'}
        counts = rpc_interface.get_group_counts(['job_name'],
                                                header_groups=[('job_name',)],
                                                extra_select_fields=extra)
        groups = counts['groups']
        self.assertEqual(len(groups), 2)
        group1, group2 = groups

        self.assertEqual(group1['group_count'], 2)
        self.assertEqual(group1['header_indices'], [0])
        self.assertEqual(group1['extra'], 'mykernel1')
        self.assertEqual(group2['group_count'], 1)
        self.assertEqual(group2['header_indices'], [1])
        self.assertEqual(group2['extra'], 'mykernel2')

    def test_get_status_counts(self):
        counts = rpc_interface.get_status_counts(group_by=['job_name'])
        group1, group2 = counts['groups']
        self.assertEqual(group1['pass_count'], 1)
        self.assertEqual(group1['complete_count'], 2)
        self.assertEqual(group1['incomplete_count'], 0)
        self.assertEqual(group2['pass_count'], 1)
        self.assertEqual(group2['complete_count'], 1)
        self.assertEqual(group2['incomplete_count'], 0)

    def test_get_latest_tests(self):
        counts = rpc_interface.get_latest_tests(group_by=['job_name'])
        group1, group2 = counts['groups']
        self.assertEqual(group1['pass_count'], 0)
        self.assertEqual(group1['complete_count'], 1)
        self.assertEqual(group1['test_idx'], 2)
        self.assertEqual(group2['test_idx'], 3)

    def test_get_latest_tests_extra_info(self):
        counts = rpc_interface.get_latest_tests(group_by=['job_name'],
                                                extra_info=['job_tag'])
        group1, group2 = counts['groups']
        self.assertEqual(group1['extra_info'], ['1-myjobtag1'])
        self.assertEqual(group2['extra_info'], ['2-myjobtag2'])

    def test_get_job_ids(self):
        self.assertEqual([1, 2], rpc_interface.get_job_ids())
        self.assertEqual([1], rpc_interface.get_job_ids(test_name='mytest2'))

    def test_get_hosts_and_tests(self):
        host_info = rpc_interface.get_hosts_and_tests()
        self.assertEqual(len(host_info), 1)
        info = host_info['myhost']

        self.assertEqual(info['tests'], ['kernbench'])
        self.assertEqual(info['id'], 1)

    def _check_for_get_test_labels(self, label, label_num):
        """Assert label is the fixture label 'testlabel<label_num>'."""
        self.assertEqual(label['id'], label_num)
        self.assertEqual(label['description'], '')
        self.assertEqual(label['name'], 'testlabel%d' % label_num)

    def test_test_labels(self):
        labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
        self.assertEqual(len(labels), 2)
        label1 = labels[0]
        label2 = labels[1]

        self._check_for_get_test_labels(label1, 1)
        self._check_for_get_test_labels(label2, 2)

        # remove the first label and check only the second remains
        rpc_interface.test_label_remove_tests(label1['id'],
                                              test_name='mytest1')

        labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
        self.assertEqual(len(labels), 1)
        label = labels[0]

        self._check_for_get_test_labels(label, 2)

        # add it back and check both are present again
        rpc_interface.test_label_add_tests(label1['id'], test_name='mytest1')

        labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
        self.assertEqual(len(labels), 2)
        label1 = labels[0]
        label2 = labels[1]

        self._check_for_get_test_labels(label1, 1)
        self._check_for_get_test_labels(label2, 2)

    def test_get_test_attribute_fields(self):
        tests = rpc_interface.get_test_views(
                test_attribute_fields=['myattr', 'myattr2'])
        self.assertEqual(len(tests), 3)

        self.assertEqual(tests[0]['test_attribute_myattr'], 'myval')
        self.assertEqual(tests[0]['test_attribute_myattr2'], 'myval2')

        for index in (1, 2):
            self.assertEqual(tests[index]['test_attribute_myattr'], None)
            self.assertEqual(tests[index]['test_attribute_myattr2'], None)

    def test_filtering_on_test_attribute_fields(self):
        tests = rpc_interface.get_test_views(
                extra_where='test_attribute_myattr.value = "myval"',
                test_attribute_fields=['myattr'])
        self.assertEqual(len(tests), 1)

    def test_grouping_with_test_attribute_fields(self):
        num_groups = rpc_interface.get_num_groups(
                ['test_attribute_myattr'], test_attribute_fields=['myattr'])
        self.assertEqual(num_groups, 2)

        counts = rpc_interface.get_group_counts(
                ['test_attribute_myattr'], test_attribute_fields=['myattr'])
        groups = counts['groups']
        self.assertEqual(len(groups), num_groups)
        self.assertEqual(groups[0]['test_attribute_myattr'], None)
        self.assertEqual(groups[0]['group_count'], 2)
        self.assertEqual(groups[1]['test_attribute_myattr'], 'myval')
        self.assertEqual(groups[1]['group_count'], 1)

    def test_extra_info_test_attributes(self):
        counts = rpc_interface.get_latest_tests(
                group_by=['test_idx'], extra_info=['test_attribute_myattr'],
                test_attribute_fields=['myattr'])
        group1 = counts['groups'][0]
        self.assertEqual(group1['extra_info'], ['myval'])

    def test_get_test_label_fields(self):
        tests = rpc_interface.get_test_views(
                test_label_fields=['testlabel1', 'testlabel2'])
        self.assertEqual(len(tests), 3)

        self.assertEqual(tests[0]['test_label_testlabel1'], 'testlabel1')
        self.assertEqual(tests[0]['test_label_testlabel2'], 'testlabel2')

        for index in (1, 2):
            self.assertEqual(tests[index]['test_label_testlabel1'], None)
            self.assertEqual(tests[index]['test_label_testlabel2'], None)

    def test_filtering_on_test_label_fields(self):
        tests = rpc_interface.get_test_views(
                extra_where='test_label_testlabel1 = "testlabel1"',
                test_label_fields=['testlabel1'])
        self.assertEqual(len(tests), 1)

    def test_grouping_on_test_label_fields(self):
        num_groups = rpc_interface.get_num_groups(
                ['test_label_testlabel1'], test_label_fields=['testlabel1'])
        self.assertEqual(num_groups, 2)

        counts = rpc_interface.get_group_counts(
                ['test_label_testlabel1'], test_label_fields=['testlabel1'])
        groups = counts['groups']
        self.assertEqual(len(groups), 2)
        self.assertEqual(groups[0]['test_label_testlabel1'], None)
        self.assertEqual(groups[0]['group_count'], 2)
        self.assertEqual(groups[1]['test_label_testlabel1'], 'testlabel1')
        self.assertEqual(groups[1]['group_count'], 1)

    def test_get_iteration_result_fields(self):
        num_iterations = rpc_interface.get_num_test_views(
                iteration_result_fields=['iresult', 'iresult2'])
        self.assertEqual(num_iterations, 2)

        iterations = rpc_interface.get_test_views(
                iteration_result_fields=['iresult', 'iresult2'])
        self.assertEqual(len(iterations), 2)

        for index in (0, 1):
            self.assertEqual(iterations[index]['test_idx'], 1)

        self.assertEqual(iterations[0]['iteration_index'], 1)
        self.assertEqual(iterations[0]['iteration_result_iresult'], 1)
        self.assertEqual(iterations[0]['iteration_result_iresult2'], 2)

        self.assertEqual(iterations[1]['iteration_index'], 2)
        self.assertEqual(iterations[1]['iteration_result_iresult'], 3)
        self.assertEqual(iterations[1]['iteration_result_iresult2'], 4)

    def test_filtering_on_iteration_result_fields(self):
        iterations = rpc_interface.get_test_views(
                extra_where='iteration_result_iresult.value = 1',
                iteration_result_fields=['iresult'])
        self.assertEqual(len(iterations), 1)

    def test_grouping_with_iteration_result_fields(self):
        num_groups = rpc_interface.get_num_groups(
                ['iteration_result_iresult'],
                iteration_result_fields=['iresult'])
        self.assertEqual(num_groups, 2)

        counts = rpc_interface.get_group_counts(
                ['iteration_result_iresult'],
                iteration_result_fields=['iresult'])
        groups = counts['groups']
        self.assertEqual(len(groups), 2)
        self.assertEqual(groups[0]['iteration_result_iresult'], 1)
        self.assertEqual(groups[0]['group_count'], 1)
        self.assertEqual(groups[1]['iteration_result_iresult'], 3)
        self.assertEqual(groups[1]['group_count'], 1)

    def _setup_machine_labels(self):
        """Give the first test a 'host-labels' attribute of two labels."""
        models.TestAttribute.objects.create(test=self.first_test,
                                            attribute='host-labels',
                                            value='label1,label2')

    def test_get_machine_label_fields(self):
        self._setup_machine_labels()

        tests = rpc_interface.get_test_views(
                machine_label_fields=['label1', 'otherlabel'])
        self.assertEqual(len(tests), 3)

        self.assertEqual(tests[0]['machine_label_label1'], 'label1')
        self.assertEqual(tests[0]['machine_label_otherlabel'], None)

        for index in (1, 2):
            self.assertEqual(tests[index]['machine_label_label1'], None)
            self.assertEqual(tests[index]['machine_label_otherlabel'], None)

    def test_grouping_with_machine_label_fields(self):
        self._setup_machine_labels()

        counts = rpc_interface.get_group_counts(
                ['machine_label_label1'], machine_label_fields=['label1'])
        groups = counts['groups']
        self.assertEqual(len(groups), 2)
        self.assertEqual(groups[0]['machine_label_label1'], None)
        self.assertEqual(groups[0]['group_count'], 2)
        self.assertEqual(groups[1]['machine_label_label1'], 'label1')
        self.assertEqual(groups[1]['group_count'], 1)

    def test_filtering_on_machine_label_fields(self):
        self._setup_machine_labels()

        tests = rpc_interface.get_test_views(
                extra_where='machine_label_label1 = "label1"',
                machine_label_fields=['label1'])
        self.assertEqual(len(tests), 1)

    def test_quoting_fields(self):
        # ensure fields with special characters are properly quoted throughout
        rpc_interface.add_test_label('hyphen-label')
        rpc_interface.get_group_counts(
                ['test_attribute_hyphen-attr', 'test_label_hyphen-label',
                 'machine_label_hyphen-label',
                 'iteration_result_hyphen-result'],
                test_attribute_fields=['hyphen-attr'],
                test_label_fields=['hyphen-label'],
                machine_label_fields=['hyphen-label'],
                iteration_result_fields=['hyphen-result'])


if __name__ == '__main__':
    unittest.main()