普通文本  |  613行  |  24.4 KB

#!/usr/bin/python

import re, unittest
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend import setup_test_environment
from autotest_lib.client.common_lib.test_utils import mock
from django.db import connection
from autotest_lib.frontend.tko import models, rpc_interface

# this will need to be updated when the view changes for the test to be
# consistent with reality
_CREATE_TEST_VIEW = """
CREATE VIEW tko_test_view_2 AS
SELECT  tko_tests.test_idx AS test_idx,
        tko_tests.job_idx AS job_idx,
        tko_tests.test AS test_name,
        tko_tests.subdir AS subdir,
        tko_tests.kernel_idx AS kernel_idx,
        tko_tests.status AS status_idx,
        tko_tests.reason AS reason,
        tko_tests.machine_idx AS machine_idx,
        tko_tests.invalid AS invalid,
        tko_tests.invalidates_test_idx AS invalidates_test_idx,
        tko_tests.started_time AS test_started_time,
        tko_tests.finished_time AS test_finished_time,
        tko_jobs.tag AS job_tag,
        tko_jobs.label AS job_name,
        tko_jobs.username AS job_owner,
        tko_jobs.queued_time AS job_queued_time,
        tko_jobs.started_time AS job_started_time,
        tko_jobs.finished_time AS job_finished_time,
        tko_jobs.afe_job_id AS afe_job_id,
        tko_machines.hostname AS hostname,
        tko_machines.machine_group AS platform,
        tko_machines.owner AS machine_owner,
        tko_kernels.kernel_hash AS kernel_hash,
        tko_kernels.base AS kernel_base,
        tko_kernels.printable AS kernel,
        tko_status.word AS status
FROM tko_tests
INNER JOIN tko_jobs ON tko_jobs.job_idx = tko_tests.job_idx
INNER JOIN tko_machines ON tko_machines.machine_idx = tko_jobs.machine_idx
INNER JOIN tko_kernels ON tko_kernels.kernel_idx = tko_tests.kernel_idx
INNER JOIN tko_status ON tko_status.status_idx = tko_tests.status;
"""

# this will need to be updated if the table schemas change (or removed if we
# add proper primary keys)
_CREATE_ITERATION_ATTRIBUTES = """
CREATE TABLE "tko_iteration_attributes" (
    "test_idx" integer NOT NULL REFERENCES "tko_tests" ("test_idx"),
    "iteration" integer NOT NULL,
    "attribute" varchar(90) NOT NULL,
    "value" varchar(300) NOT NULL
);
"""

_CREATE_ITERATION_RESULTS = """
CREATE TABLE "tko_iteration_result" (
    "test_idx" integer NOT NULL REFERENCES "tko_tests" ("test_idx"),
    "iteration" integer NOT NULL,
    "attribute" varchar(90) NOT NULL,
    "value" numeric(12, 31) NULL
);
"""


def setup_test_view():
    """
    Django has no way to actually represent a view; we simply create a model for
    TestView.   This means when we syncdb, Django will create a table for it.
    So manually remove that table and replace it with a view.
    """
    cursor = connection.cursor()
    cursor.execute('DROP TABLE tko_test_view_2')
    cursor.execute(_CREATE_TEST_VIEW)


def fix_iteration_tables():
    """
    Since iteration tables don't have any real primary key, we "fake" one in the
    Django models.  So fix up the generated schema to match the real schema.
    """
    cursor = connection.cursor()
    cursor.execute('DROP TABLE tko_iteration_attributes')
    cursor.execute(_CREATE_ITERATION_ATTRIBUTES)
    cursor.execute('DROP TABLE tko_iteration_result')
    cursor.execute(_CREATE_ITERATION_RESULTS)


class TkoTestMixin(object):
    def _patch_sqlite_stuff(self):
        self.god.stub_with(models.TempManager, '_get_column_names',
                           self._get_column_names_for_sqlite3)
        self.god.stub_with(models.TempManager, '_cursor_rowcount',
                           self._cursor_rowcount_for_sqlite3)

        # add some functions to SQLite for MySQL compatibility
        connection.cursor() # ensure connection is alive
        connection.connection.create_function('if', 3, self._sqlite_if)
        connection.connection.create_function('find_in_set', 2,
                                              self._sqlite_find_in_set)

        fix_iteration_tables()


    def _cursor_rowcount_for_sqlite3(self, cursor):
        return len(cursor.fetchall())


    def _sqlite_find_in_set(self, needle, haystack):
        return needle in haystack.split(',')


    def _sqlite_if(self, condition, true_result, false_result):
        if condition:
            return true_result
        return false_result


    # sqlite takes any columns that don't have aliases and names them
    # "table_name"."column_name".  we map these to just column_name.
    _SQLITE_AUTO_COLUMN_ALIAS_RE = re.compile(r'".+"\."(.+)"')


    def _get_column_names_for_sqlite3(self, cursor):
        names = [column_info[0] for column_info in cursor.description]

        # replace all "table_name"."column_name" constructs with just
        # column_name
        for i, name in enumerate(names):
            match = self._SQLITE_AUTO_COLUMN_ALIAS_RE.match(name)
            if match:
                names[i] = match.group(1)

        return names


    def _create_initial_data(self):
        machine = models.Machine.objects.create(hostname='myhost')

        # create basic objects
        kernel_name = 'mykernel1'
        kernel1 = models.Kernel.objects.create(kernel_hash=kernel_name,
                                               base=kernel_name,
                                               printable=kernel_name)

        kernel_name = 'mykernel2'
        kernel2 = models.Kernel.objects.create(kernel_hash=kernel_name,
                                               base=kernel_name,
                                               printable=kernel_name)

        good_status = models.Status.objects.create(word='GOOD')
        failed_status = models.Status.objects.create(word='FAILED')

        job1 = models.Job.objects.create(tag='1-myjobtag1', label='myjob1',
                                         username='myuser', machine=machine,
                                         afe_job_id=1)
        job2 = models.Job.objects.create(tag='2-myjobtag2', label='myjob2',
                                         username='myuser', machine=machine,
                                         afe_job_id=2)

        job1_test1 = models.Test.objects.create(job=job1, test='mytest1',
                                                kernel=kernel1,
                                                status=good_status,
                                                machine=machine)
        self.first_test = job1_test1
        job1_test2 = models.Test.objects.create(job=job1, test='mytest2',
                                                kernel=kernel1,
                                                status=failed_status,
                                                machine=machine)
        job2_test1 = models.Test.objects.create(job=job2, test='kernbench',
                                                kernel=kernel2,
                                                status=good_status,
                                                machine=machine)

        job1.jobkeyval_set.create(key='keyval_key', value='keyval_value')

        # create test attributes, test labels, and iterations
        # like Noah's Ark, include two of each...just in case there's a bug with
        # multiple related items
        models.TestAttribute.objects.create(test=job1_test1, attribute='myattr',
                                            value='myval')
        models.TestAttribute.objects.create(test=job1_test1,
                                            attribute='myattr2', value='myval2')

        self._add_iteration_keyval('tko_iteration_attributes', test=job1_test1,
                                   iteration=1, attribute='iattr',
                                   value='ival')
        self._add_iteration_keyval('tko_iteration_attributes', test=job1_test1,
                                   iteration=1, attribute='iattr2',
                                   value='ival2')
        self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
                                   iteration=1, attribute='iresult', value=1)
        self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
                                   iteration=1, attribute='iresult2', value=2)
        self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
                                   iteration=2, attribute='iresult', value=3)
        self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
                                   iteration=2, attribute='iresult2', value=4)

        label1 = models.TestLabel.objects.create(name='testlabel1')
        label2 = models.TestLabel.objects.create(name='testlabel2')

        label1.tests.add(job1_test1)
        label2.tests.add(job1_test1)


    def _add_iteration_keyval(self, table, test, iteration, attribute, value):
        cursor = connection.cursor()
        cursor.execute('INSERT INTO %s ' 'VALUES (%%s, %%s, %%s, %%s)' % table,
                       (test.test_idx, iteration, attribute, value))


class RpcInterfaceTest(unittest.TestCase, TkoTestMixin):
    def setUp(self):
        self.god = mock.mock_god()

        setup_test_environment.set_up()
        self._patch_sqlite_stuff()
        setup_test_view()
        self._create_initial_data()


    def tearDown(self):
        setup_test_environment.tear_down()
        self.god.unstub_all()


    def _check_for_get_test_views(self, test):
        self.assertEquals(test['test_name'], 'mytest1')
        self.assertEquals(test['job_tag'], '1-myjobtag1')
        self.assertEquals(test['job_name'], 'myjob1')
        self.assertEquals(test['job_owner'], 'myuser')
        self.assertEquals(test['status'], 'GOOD')
        self.assertEquals(test['hostname'], 'myhost')
        self.assertEquals(test['kernel'], 'mykernel1')


    def test_get_detailed_test_views(self):
        test = rpc_interface.get_detailed_test_views()[0]

        self._check_for_get_test_views(test)

        self.assertEquals(test['attributes'], {'myattr': 'myval',
                                               'myattr2': 'myval2'})
        self.assertEquals(test['iterations'], [{'attr': {'iattr': 'ival',
                                                         'iattr2': 'ival2'},
                                                'perf': {'iresult': 1,
                                                         'iresult2': 2}},
                                               {'attr': {},
                                                'perf': {'iresult': 3,
                                                         'iresult2': 4}}])
        self.assertEquals(test['labels'], ['testlabel1', 'testlabel2'])
        self.assertEquals(test['job_keyvals'], {'keyval_key': 'keyval_value'})


    def test_test_attributes(self):
        rpc_interface.set_test_attribute('foo', 'bar', test_name='mytest1')
        test = rpc_interface.get_detailed_test_views()[0]
        self.assertEquals(test['attributes'], {'foo': 'bar',
                                               'myattr': 'myval',
                                               'myattr2': 'myval2'})

        rpc_interface.set_test_attribute('foo', 'goo', test_name='mytest1')
        test = rpc_interface.get_detailed_test_views()[0]
        self.assertEquals(test['attributes'], {'foo': 'goo',
                                               'myattr': 'myval',
                                               'myattr2': 'myval2'})

        rpc_interface.set_test_attribute('foo', None, test_name='mytest1')
        test = rpc_interface.get_detailed_test_views()[0]
        self.assertEquals(test['attributes'], {'myattr': 'myval',
                                               'myattr2': 'myval2'})


    def test_immutable_attributes(self):
        self.assertRaises(ValueError, rpc_interface.set_test_attribute,
                          'myattr', 'foo', test_name='mytest1')


    def test_get_test_views(self):
        tests = rpc_interface.get_test_views()

        self.assertEquals(len(tests), 3)
        test = rpc_interface.get_test_views(
            job_name='myjob1', test_name='mytest1')[0]
        self.assertEquals(tests[0], test)

        self._check_for_get_test_views(test)

        self.assertEquals(
            [], rpc_interface.get_test_views(hostname='fakehost'))


    def _check_test_names(self, tests, expected_names):
        self.assertEquals(set(test['test_name'] for test in tests),
                          set(expected_names))


    def test_get_test_views_filter_on_labels(self):
        tests = rpc_interface.get_test_views(include_labels=['testlabel1'])
        self._check_test_names(tests, ['mytest1'])

        tests = rpc_interface.get_test_views(exclude_labels=['testlabel1'])
        self._check_test_names(tests, ['mytest2', 'kernbench'])


    def test_get_test_views_filter_on_attributes(self):
        tests = rpc_interface.get_test_views(
                include_attributes_where='attribute = "myattr" '
                                         'and value = "myval"')
        self._check_test_names(tests, ['mytest1'])

        tests = rpc_interface.get_test_views(
                exclude_attributes_where='attribute="myattr2"')
        self._check_test_names(tests, ['mytest2', 'kernbench'])


    def test_get_num_test_views(self):
        self.assertEquals(rpc_interface.get_num_test_views(), 3)
        self.assertEquals(rpc_interface.get_num_test_views(
            job_name='myjob1', test_name='mytest1'), 1)


    def test_get_group_counts(self):
        self.assertEquals(rpc_interface.get_num_groups(['job_name']), 2)

        counts = rpc_interface.get_group_counts(['job_name'])
        groups = counts['groups']
        self.assertEquals(len(groups), 2)
        group1, group2 = groups

        self.assertEquals(group1['group_count'], 2)
        self.assertEquals(group1['job_name'], 'myjob1')
        self.assertEquals(group2['group_count'], 1)
        self.assertEquals(group2['job_name'], 'myjob2')

        extra = {'extra' : 'kernel_hash'}
        counts = rpc_interface.get_group_counts(['job_name'],
                                                header_groups=[('job_name',)],
                                                extra_select_fields=extra)
        groups = counts['groups']
        self.assertEquals(len(groups), 2)
        group1, group2 = groups

        self.assertEquals(group1['group_count'], 2)
        self.assertEquals(group1['header_indices'], [0])
        self.assertEquals(group1['extra'], 'mykernel1')
        self.assertEquals(group2['group_count'], 1)
        self.assertEquals(group2['header_indices'], [1])
        self.assertEquals(group2['extra'], 'mykernel2')


    def test_get_status_counts(self):
        counts = rpc_interface.get_status_counts(group_by=['job_name'])
        group1, group2 = counts['groups']
        self.assertEquals(group1['pass_count'], 1)
        self.assertEquals(group1['complete_count'], 2)
        self.assertEquals(group1['incomplete_count'], 0)
        self.assertEquals(group2['pass_count'], 1)
        self.assertEquals(group2['complete_count'], 1)
        self.assertEquals(group2['incomplete_count'], 0)


    def test_get_latest_tests(self):
        counts = rpc_interface.get_latest_tests(group_by=['job_name'])
        group1, group2 = counts['groups']
        self.assertEquals(group1['pass_count'], 0)
        self.assertEquals(group1['complete_count'], 1)
        self.assertEquals(group1['test_idx'], 2)
        self.assertEquals(group2['test_idx'], 3)


    def test_get_latest_tests_extra_info(self):
        counts = rpc_interface.get_latest_tests(group_by=['job_name'],
                                                extra_info=['job_tag'])
        group1, group2 = counts['groups']
        self.assertEquals(group1['extra_info'], ['1-myjobtag1'])
        self.assertEquals(group2['extra_info'], ['2-myjobtag2'])


    def test_get_job_ids(self):
        self.assertEquals([1,2], rpc_interface.get_job_ids())
        self.assertEquals([1], rpc_interface.get_job_ids(test_name='mytest2'))


    def test_get_hosts_and_tests(self):
        host_info = rpc_interface.get_hosts_and_tests()
        self.assertEquals(len(host_info), 1)
        info = host_info['myhost']

        self.assertEquals(info['tests'], ['kernbench'])
        self.assertEquals(info['id'], 1)


    def _check_for_get_test_labels(self, label, label_num):
        self.assertEquals(label['id'], label_num)
        self.assertEquals(label['description'], '')
        self.assertEquals(label['name'], 'testlabel%d' % label_num)


    def test_test_labels(self):
        labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
        self.assertEquals(len(labels), 2)
        label1 = labels[0]
        label2 = labels[1]

        self._check_for_get_test_labels(label1, 1)
        self._check_for_get_test_labels(label2, 2)

        rpc_interface.test_label_remove_tests(label1['id'], test_name='mytest1')

        labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
        self.assertEquals(len(labels), 1)
        label = labels[0]

        self._check_for_get_test_labels(label, 2)

        rpc_interface.test_label_add_tests(label1['id'], test_name='mytest1')

        labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
        self.assertEquals(len(labels), 2)
        label1 = labels[0]
        label2 = labels[1]

        self._check_for_get_test_labels(label1, 1)
        self._check_for_get_test_labels(label2, 2)


    def test_get_test_attribute_fields(self):
        tests = rpc_interface.get_test_views(
                test_attribute_fields=['myattr', 'myattr2'])
        self.assertEquals(len(tests), 3)

        self.assertEquals(tests[0]['test_attribute_myattr'], 'myval')
        self.assertEquals(tests[0]['test_attribute_myattr2'], 'myval2')

        for index in (1, 2):
            self.assertEquals(tests[index]['test_attribute_myattr'], None)
            self.assertEquals(tests[index]['test_attribute_myattr2'], None)


    def test_filtering_on_test_attribute_fields(self):
        tests = rpc_interface.get_test_views(
                extra_where='test_attribute_myattr.value = "myval"',
                test_attribute_fields=['myattr'])
        self.assertEquals(len(tests), 1)


    def test_grouping_with_test_attribute_fields(self):
        num_groups = rpc_interface.get_num_groups(
                ['test_attribute_myattr'], test_attribute_fields=['myattr'])
        self.assertEquals(num_groups, 2)

        counts = rpc_interface.get_group_counts(
                ['test_attribute_myattr'], test_attribute_fields=['myattr'])
        groups = counts['groups']
        self.assertEquals(len(groups), num_groups)
        self.assertEquals(groups[0]['test_attribute_myattr'], None)
        self.assertEquals(groups[0]['group_count'], 2)
        self.assertEquals(groups[1]['test_attribute_myattr'], 'myval')
        self.assertEquals(groups[1]['group_count'], 1)


    def test_extra_info_test_attributes(self):
        counts = rpc_interface.get_latest_tests(
                group_by=['test_idx'], extra_info=['test_attribute_myattr'],
                test_attribute_fields=['myattr'])
        group1 = counts['groups'][0]
        self.assertEquals(group1['extra_info'], ['myval'])


    def test_get_test_label_fields(self):
        tests = rpc_interface.get_test_views(
                test_label_fields=['testlabel1', 'testlabel2'])
        self.assertEquals(len(tests), 3)

        self.assertEquals(tests[0]['test_label_testlabel1'], 'testlabel1')
        self.assertEquals(tests[0]['test_label_testlabel2'], 'testlabel2')

        for index in (1, 2):
            self.assertEquals(tests[index]['test_label_testlabel1'], None)
            self.assertEquals(tests[index]['test_label_testlabel2'], None)


    def test_filtering_on_test_label_fields(self):
        tests = rpc_interface.get_test_views(
                extra_where='test_label_testlabel1 = "testlabel1"',
                test_label_fields=['testlabel1'])
        self.assertEquals(len(tests), 1)


    def test_grouping_on_test_label_fields(self):
        num_groups = rpc_interface.get_num_groups(
                ['test_label_testlabel1'], test_label_fields=['testlabel1'])
        self.assertEquals(num_groups, 2)

        counts = rpc_interface.get_group_counts(
                ['test_label_testlabel1'], test_label_fields=['testlabel1'])
        groups = counts['groups']
        self.assertEquals(len(groups), 2)
        self.assertEquals(groups[0]['test_label_testlabel1'], None)
        self.assertEquals(groups[0]['group_count'], 2)
        self.assertEquals(groups[1]['test_label_testlabel1'], 'testlabel1')
        self.assertEquals(groups[1]['group_count'], 1)


    def test_get_iteration_result_fields(self):
        num_iterations = rpc_interface.get_num_test_views(
                iteration_result_fields=['iresult', 'iresult2'])
        self.assertEquals(num_iterations, 2)

        iterations = rpc_interface.get_test_views(
                iteration_result_fields=['iresult', 'iresult2'])
        self.assertEquals(len(iterations), 2)

        for index in (0, 1):
            self.assertEquals(iterations[index]['test_idx'], 1)

        self.assertEquals(iterations[0]['iteration_index'], 1)
        self.assertEquals(iterations[0]['iteration_result_iresult'], 1)
        self.assertEquals(iterations[0]['iteration_result_iresult2'], 2)

        self.assertEquals(iterations[1]['iteration_index'], 2)
        self.assertEquals(iterations[1]['iteration_result_iresult'], 3)
        self.assertEquals(iterations[1]['iteration_result_iresult2'], 4)


    def test_filtering_on_iteration_result_fields(self):
        iterations = rpc_interface.get_test_views(
                extra_where='iteration_result_iresult.value = 1',
                iteration_result_fields=['iresult'])
        self.assertEquals(len(iterations), 1)


    def test_grouping_with_iteration_result_fields(self):
        num_groups = rpc_interface.get_num_groups(
                ['iteration_result_iresult'],
                iteration_result_fields=['iresult'])
        self.assertEquals(num_groups, 2)

        counts = rpc_interface.get_group_counts(
                ['iteration_result_iresult'],
                iteration_result_fields=['iresult'])
        groups = counts['groups']
        self.assertEquals(len(groups), 2)
        self.assertEquals(groups[0]['iteration_result_iresult'], 1)
        self.assertEquals(groups[0]['group_count'], 1)
        self.assertEquals(groups[1]['iteration_result_iresult'], 3)
        self.assertEquals(groups[1]['group_count'], 1)


    def _setup_machine_labels(self):
        models.TestAttribute.objects.create(test=self.first_test,
                                            attribute='host-labels',
                                            value='label1,label2')


    def test_get_machine_label_fields(self):
        self._setup_machine_labels()

        tests = rpc_interface.get_test_views(
                machine_label_fields=['label1', 'otherlabel'])
        self.assertEquals(len(tests), 3)

        self.assertEquals(tests[0]['machine_label_label1'], 'label1')
        self.assertEquals(tests[0]['machine_label_otherlabel'], None)

        for index in (1, 2):
            self.assertEquals(tests[index]['machine_label_label1'], None)
            self.assertEquals(tests[index]['machine_label_otherlabel'], None)


    def test_grouping_with_machine_label_fields(self):
        self._setup_machine_labels()

        counts = rpc_interface.get_group_counts(['machine_label_label1'],
                                                machine_label_fields=['label1'])
        groups = counts['groups']
        self.assertEquals(len(groups), 2)
        self.assertEquals(groups[0]['machine_label_label1'], None)
        self.assertEquals(groups[0]['group_count'], 2)
        self.assertEquals(groups[1]['machine_label_label1'], 'label1')
        self.assertEquals(groups[1]['group_count'], 1)


    def test_filtering_on_machine_label_fields(self):
        self._setup_machine_labels()

        tests = rpc_interface.get_test_views(
                extra_where='machine_label_label1 = "label1"',
                machine_label_fields=['label1'])
        self.assertEquals(len(tests), 1)


    def test_quoting_fields(self):
        # ensure fields with special characters are properly quoted throughout
        rpc_interface.add_test_label('hyphen-label')
        rpc_interface.get_group_counts(
                ['test_attribute_hyphen-attr', 'test_label_hyphen-label',
                 'machine_label_hyphen-label',
                 'iteration_result_hyphen-result'],
                test_attribute_fields=['hyphen-attr'],
                test_label_fields=['hyphen-label'],
                machine_label_fields=['hyphen-label'],
                iteration_result_fields=['hyphen-result'])


if __name__ == '__main__':
    unittest.main()