# Plain text  |  167 lines  |  5.3 KB

import csv
import django.http
import common
from autotest_lib.frontend.afe import rpc_utils

class CsvEncoder(object):
    """Base class for objects that encode an RPC response as a CSV download.

    Subclasses implement encode(); the helpers here collect output rows and
    turn them into an HTTP response.
    """

    def __init__(self, request, response):
        """Remember the request and response and start with no output rows.

        @param request: the RPC request being answered.
        @param response: the RPC result that is to be encoded.
        """
        self._request = request
        self._response = response
        self._output_rows = []


    def _append_output_row(self, row):
        """Queue one row (a sequence of cell values) for the CSV output."""
        self._output_rows.append(row)


    def _build_response(self):
        """Return an HTTP response containing all queued rows as CSV.

        The response is typed text/csv and carries a Content-Disposition
        header marking it as an attachment named tko_query.csv.
        """
        # Use content_type rather than the old mimetype keyword: mimetype was
        # deprecated in Django 1.5 and removed in 1.7, while content_type is
        # accepted by both old and new releases.
        response = django.http.HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = (
            'attachment; filename=tko_query.csv')
        # csv.writer only needs an object with write(), which the response
        # provides.
        writer = csv.writer(response)
        writer.writerows(self._output_rows)
        return response


    def encode(self):
        """Build and return the encoded response; subclasses must override.

        @raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError


class UnhandledMethodEncoder(CsvEncoder):
    """Fallback encoder used when the request's method has no CSV encoding."""

    def encode(self):
        """Return rpc_utils.raw_http_response() for an error message.

        The message names the unhandled method taken from the request.
        """
        message = ('Unhandled method %s (this indicates a bug)\r\n' %
                   self._request['method'])
        return rpc_utils.raw_http_response(message)


class SpreadsheetCsvEncoder(CsvEncoder):
    """Encode a two-dimensional grouped response as a CSV spreadsheet.

    The first output row holds the column headers (after an empty corner
    cell); every following row starts with its row header and then has one
    cell per column.
    """

    def _total_index(self, group, num_columns):
        """Return the row-major position of a group's cell in the flat table.

        group['header_indices'] is unpacked as (row index, column index).
        """
        row_index, column_index = group['header_indices']
        return row_index * num_columns + column_index


    def _group_string(self, group):
        """Return the cell text for one group.

        The text is 'pass / complete', followed by ' (N incomplete)' when the
        incomplete count is positive.  If the group has an 'extra_info' list,
        its entries are appended on separate lines.
        """
        result = '%s / %s' % (group['pass_count'], group['complete_count'])
        if group['incomplete_count'] > 0:
            result += ' (%s incomplete)' % group['incomplete_count']
        if 'extra_info' in group:
            result = '\n'.join([result] + group['extra_info'])
        return result


    def _build_value_table(self):
        """Return a flat, row-major list of cell strings for all groups.

        Cells with no corresponding group stay as empty strings.  Relies on
        self._num_rows and self._num_columns, which encode() sets.
        """
        value_table = [''] * self._num_rows * self._num_columns
        for group in self._response['groups']:
            total_index = self._total_index(group, self._num_columns)
            value_table[total_index] = self._group_string(group)
        return value_table


    def _header_string(self, header_value):
        """Join the parts of one header into a single '/'-separated string."""
        # NOTE(review): str.join requires every part to be a string --
        # confirm that header parts are never None or numeric.
        return '/'.join(header_value)


    def _process_value_table(self, value_table, row_headers):
        """Queue one output row per spreadsheet row.

        Each row is the row's header string followed by that row's slice of
        the flat value table.
        """
        total_index = 0
        # range() rather than xrange(): it iterates identically here and also
        # exists on Python 3, where xrange was removed.
        for row_index in range(self._num_rows):
            row_header = self._header_string(row_headers[row_index])
            row_end_index = total_index + self._num_columns
            row_values = value_table[total_index:row_end_index]
            self._append_output_row([row_header] + row_values)
            total_index = row_end_index


    def encode(self):
        """Return the CSV response for the spreadsheet.

        self._response['header_values'] must hold exactly two entries: the
        row headers and the column headers, in that order.
        """
        header_values = self._response['header_values']
        assert len(header_values) == 2
        row_headers, column_headers = header_values
        self._num_rows, self._num_columns = (len(row_headers),
                                             len(column_headers))

        value_table = self._build_value_table()

        # the leading empty cell sits above the row-header column
        first_line = [''] + [self._header_string(header_value)
                            for header_value in column_headers]
        self._append_output_row(first_line)
        self._process_value_table(value_table, row_headers)

        return self._build_response()


class TableCsvEncoder(CsvEncoder):
    """Encode a sequence of row objects as a CSV table with a header row."""

    def __init__(self, request, response):
        super(TableCsvEncoder, self).__init__(request, response)
        # each column spec is used as a (field, name) pair
        self._column_specs = request['columns']


    def _format_row(self, row_object):
        """Return the row's cell values, one per column spec, in spec order.

        A field that is missing from row_object yields None.
        """
        cells = []
        for field, _ in self._column_specs:
            cells.append(row_object.get(field))
        return cells


    def _encode_table(self, row_objects):
        """Queue the header row and one row per object, then build the CSV."""
        header_row = []
        for spec in self._column_specs:
            header_row.append(spec[1])
        self._append_output_row(header_row)

        for row_object in row_objects:
            formatted = self._format_row(row_object)
            self._append_output_row(formatted)
        return self._build_response()


    def encode(self):
        """Return the CSV response, treating the response itself as the rows."""
        rows = self._response
        return self._encode_table(rows)


class GroupedTableCsvEncoder(TableCsvEncoder):
    """Table encoder whose rows come from the response's 'groups' entry."""

    def encode(self):
        """Return the CSV response built from self._response['groups']."""
        groups = self._response['groups']
        return self._encode_table(groups)


class StatusCountTableCsvEncoder(GroupedTableCsvEncoder):
    """Grouped table encoder that renders a combined pass-rate column."""

    _PASS_RATE_FIELD = '_test_pass_rate'

    def __init__(self, request, response):
        super(StatusCountTableCsvEncoder, self).__init__(request, response)
        # Point the first column titled 'Test pass rate' at the synthetic
        # field that _format_row() fills in.  The spec is changed in place,
        # so the corresponding entry of request['columns'] changes as well.
        for spec in self._column_specs:
            _, title = spec
            if title != 'Test pass rate':
                continue
            spec[0] = self._PASS_RATE_FIELD
            break


    def _format_pass_rate(self, row_object):
        """Return 'pass / complete', plus ' (N incomplete)' if N is truthy."""
        summary = '%s / %s' % (row_object['pass_count'],
                               row_object['complete_count'])
        incomplete = row_object['incomplete_count']
        if not incomplete:
            return summary
        return summary + ' (%s incomplete)' % incomplete


    def _format_row(self, row_object):
        """Store the pass-rate string on row_object, then format it as usual.

        Note that row_object itself gains the _PASS_RATE_FIELD key.
        """
        pass_rate = self._format_pass_rate(row_object)
        row_object[self._PASS_RATE_FIELD] = pass_rate
        return super(StatusCountTableCsvEncoder, self)._format_row(row_object)


# RPC method name -> encoder class, for the methods whose encoder does not
# depend on anything else in the request.
_ENCODER_MAP = {
    'get_latest_tests': SpreadsheetCsvEncoder,
    'get_test_views': TableCsvEncoder,
    'get_group_counts': GroupedTableCsvEncoder,
}


def _get_encoder_class(request):
    """Return the encoder class to use for request['method'].

    Methods listed in _ENCODER_MAP map directly to a class.
    'get_status_counts' is encoded as a table when the request supplies
    'columns' and as a spreadsheet otherwise.  Any other method gets
    UnhandledMethodEncoder.
    """
    method = request['method']

    mapped_class = _ENCODER_MAP.get(method)
    if mapped_class is not None:
        return mapped_class

    if method != 'get_status_counts':
        return UnhandledMethodEncoder

    if 'columns' not in request:
        return SpreadsheetCsvEncoder
    return StatusCountTableCsvEncoder


def encoder(request, response):
    """Return an encoder instance chosen by the request's RPC method."""
    return _get_encoder_class(request)(request, response)