# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import collections
import itertools
import traceback
import unittest
from xml.etree import ElementTree

import coverage
from six import moves

from tests import _loader


class CaseResult(
        collections.namedtuple('CaseResult', [
            'id', 'name', 'kind', 'stdout', 'stderr', 'skip_reason', 'traceback'
        ])):
    """A serializable result of a single test case.

  Instances are immutable; use `updated` to derive a modified copy.

  Attributes:
    id (object): Any serializable object used to denote the identity of this
      test case.
    name (str or None): A human-readable name of the test case.
    kind (CaseResult.Kind): The kind of test result.
    stdout (object or None): Output on stdout, or None if nothing was captured.
    stderr (object or None): Output on stderr, or None if nothing was captured.
    skip_reason (object or None): The reason the test was skipped. Must be
      something if self.kind is CaseResult.Kind.SKIP.
    traceback (object or None): The traceback of the test. Must be something if
      self.kind is CaseResult.Kind.{ERROR, FAILURE, EXPECTED_FAILURE}.
  """

    class Kind(object):
        """Namespace of the string constants accepted as a result `kind`."""
        UNTESTED = 'untested'
        RUNNING = 'running'
        ERROR = 'error'
        FAILURE = 'failure'
        SUCCESS = 'success'
        SKIP = 'skip'
        EXPECTED_FAILURE = 'expected failure'
        UNEXPECTED_SUCCESS = 'unexpected success'

    def __new__(cls,
                id=None,
                name=None,
                kind=None,
                stdout=None,
                stderr=None,
                skip_reason=None,
                traceback=None):
        """Helper keyword constructor for the namedtuple.

    See this class' attributes for information on the arguments.

    Raises:
      AssertionError: If `id` is None, `name` is neither None nor a str,
        `kind` is not one of the CaseResult.Kind constants, or a field
        required by `kind` (traceback / skip_reason) is missing.
    """
        assert id is not None
        assert name is None or isinstance(name, str)
        kinds = CaseResult.Kind
        # Kinds are plain strings, so compare by equality rather than identity:
        # an equal string that is a distinct object (e.g. one produced by
        # deserialization) must be accepted as well.
        if kind in (kinds.ERROR, kinds.FAILURE, kinds.EXPECTED_FAILURE):
            assert traceback is not None
        elif kind == kinds.SKIP:
            assert skip_reason is not None
        else:
            # Every remaining valid kind carries no mandatory extra field;
            # anything else (including None) is rejected.
            assert kind in (kinds.UNTESTED, kinds.RUNNING, kinds.SUCCESS,
                            kinds.UNEXPECTED_SUCCESS)
        # super(CaseResult, cls) -- not super(cls, CaseResult) -- so that
        # subclasses of CaseResult can be constructed too.
        return super(CaseResult, cls).__new__(cls, id, name, kind, stdout,
                                              stderr, skip_reason, traceback)

    def updated(self,
                name=None,
                kind=None,
                stdout=None,
                stderr=None,
                skip_reason=None,
                traceback=None):
        """Get a new validated CaseResult with the fields updated.

    An argument left as None keeps the current value of that field, which
    also means a field cannot be reset to None through this method. The `id`
    is always carried over unchanged.

    See this class' attributes for information on the arguments.

    Returns:
      CaseResult: A new, validated result; `self` is not modified.
    """
        name = self.name if name is None else name
        kind = self.kind if kind is None else kind
        stdout = self.stdout if stdout is None else stdout
        stderr = self.stderr if stderr is None else stderr
        skip_reason = self.skip_reason if skip_reason is None else skip_reason
        traceback = self.traceback if traceback is None else traceback
        return CaseResult(
            id=self.id,
            name=name,
            kind=kind,
            stdout=stdout,
            stderr=stderr,
            skip_reason=skip_reason,
            traceback=traceback)


class AugmentedResult(unittest.TestResult):
    """unittest.Result that keeps track of additional information.

  Uses CaseResult objects to store test-case results, providing additional
  information beyond that of the standard Python unittest library, such as
  standard output.

  Attributes:
    id_map (callable): A unary callable mapping unittest.TestCase objects to
      unique identifiers.
    cases (dict): A dictionary mapping from the identifiers returned by id_map
      to CaseResult objects corresponding to those IDs. Emptied again by
      startTestRun.
  """

    def __init__(self, id_map):
        """Initialize the object with an identifier mapping.

    Arguments:
      id_map (callable): Corresponds to the attribute `id_map`."""
        super(AugmentedResult, self).__init__()
        self.id_map = id_map
        # Start with an empty mapping rather than None so the result is usable
        # even when it is driven without a prior startTestRun() call.
        self.cases = dict()

    def startTestRun(self):
        """See unittest.TestResult.startTestRun.

    Additionally discards any previously recorded case results."""
        super(AugmentedResult, self).startTestRun()
        self.cases = dict()

    def startTest(self, test):
        """See unittest.TestResult.startTest.

    Additionally records `test` as RUNNING under its mapped identifier."""
        super(AugmentedResult, self).startTest(test)
        case_id = self.id_map(test)
        self.cases[case_id] = CaseResult(
            id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING)

    def _update_case(self, test, **fields):
        """Replace the stored CaseResult of `test` with an updated copy.

    Args:
      test (unittest.TestCase): The test whose record is updated.
      **fields: Keyword arguments forwarded to CaseResult.updated.
    """
        case_id = self.id_map(test)
        current = self.cases.get(case_id)
        if current is None:
            # unittest reports class/module level fixture errors and skips
            # through addError/addSkip without ever calling startTest, so
            # there may be no record yet; create one instead of raising
            # KeyError.
            current = CaseResult(
                id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING)
        self.cases[case_id] = current.updated(**fields)

    def addError(self, test, err):
        """See unittest.TestResult.addError."""
        super(AugmentedResult, self).addError(test, err)
        self._update_case(test, kind=CaseResult.Kind.ERROR, traceback=err)

    def addFailure(self, test, err):
        """See unittest.TestResult.addFailure."""
        super(AugmentedResult, self).addFailure(test, err)
        self._update_case(test, kind=CaseResult.Kind.FAILURE, traceback=err)

    def addSuccess(self, test):
        """See unittest.TestResult.addSuccess."""
        super(AugmentedResult, self).addSuccess(test)
        self._update_case(test, kind=CaseResult.Kind.SUCCESS)

    def addSkip(self, test, reason):
        """See unittest.TestResult.addSkip."""
        super(AugmentedResult, self).addSkip(test, reason)
        self._update_case(
            test, kind=CaseResult.Kind.SKIP, skip_reason=reason)

    def addExpectedFailure(self, test, err):
        """See unittest.TestResult.addExpectedFailure."""
        super(AugmentedResult, self).addExpectedFailure(test, err)
        self._update_case(
            test, kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=err)

    def addUnexpectedSuccess(self, test):
        """See unittest.TestResult.addUnexpectedSuccess."""
        super(AugmentedResult, self).addUnexpectedSuccess(test)
        self._update_case(test, kind=CaseResult.Kind.UNEXPECTED_SUCCESS)

    def set_output(self, test, stdout, stderr):
        """Set the output attributes for the CaseResult corresponding to a test.

    Both outputs are decoded with their `decode()` method (default arguments)
    before being stored, so they must be bytes-like rather than already
    decoded text.

    Args:
      test (unittest.TestCase): The TestCase to set the outputs of.
      stdout (bytes): Output from stdout to assign to self.id_map(test).
      stderr (bytes): Output from stderr to assign to self.id_map(test).
    """
        self._update_case(
            test, stdout=stdout.decode(), stderr=stderr.decode())

    def augmented_results(self, filter):
        """Convenience method to retrieve filtered case results.

    Args:
      filter (callable): A unary predicate to filter over CaseResult objects.

    Returns:
      generator: Lazily yields the stored CaseResult objects for which
        `filter` returns a true value.
    """
        return (case for case in self.cases.values() if filter(case))


class CoverageResult(AugmentedResult):
    """AugmentedResult extension that records coverage.py data per test.

  Attributes:
    coverage_context (coverage.Coverage): coverage.py management object of the
      test currently being run; None before the first test and between tests.
  """

    def __init__(self, id_map):
        """See AugmentedResult.__init__."""
        super(CoverageResult, self).__init__(id_map=id_map)
        self.coverage_context = None

    def startTest(self, test):
        """See unittest.TestResult.startTest.

    After the base bookkeeping, creates a fresh coverage object (with
    `data_suffix=True`) and starts measuring with it."""
        super(CoverageResult, self).startTest(test)
        tracker = coverage.Coverage(data_suffix=True)
        # Publish the tracker before starting it, so the attribute is already
        # set even if start() fails.
        self.coverage_context = tracker
        tracker.start()

    def stopTest(self, test):
        """See unittest.TestResult.stopTest.

    After the base bookkeeping, stops measuring, saves the collected data and
    drops the coverage object."""
        super(CoverageResult, self).stopTest(test)
        tracker = self.coverage_context
        tracker.stop()
        tracker.save()
        self.coverage_context = None


class _Colors(object):
    """Namespace of the terminal escape sequences used for live reporting."""
    # Terminates whatever styling is currently active.
    END = '\x1b[0m'

    # Text attributes.
    BOLD = '\x1b[1m'
    UNDERLINE = '\x1b[4m'

    # Foreground colors, named after the role they play in the report.
    FAIL = '\x1b[91m'
    OK = '\x1b[92m'
    WARN = '\x1b[93m'
    INFO = '\x1b[94m'
    HEADER = '\x1b[95m'


class TerminalResult(CoverageResult):
    """CoverageResult extension that reports live, colored results to a file."""

    def __init__(self, out, id_map):
        """Initialize the result object.

    Args:
      out (file-like): Output file to which terminal-colored live results will
        be written.
      id_map (callable): See AugmentedResult.__init__.
    """
        super(TerminalResult, self).__init__(id_map=id_map)
        self.out = out

    def _report(self, color, template, test):
        """Write one colored status line for `test` and flush the output.

    Args:
      color (str): Escape sequence emitted in front of the line.
      template (str): Format string with a single `{}` slot for the test id.
      test (unittest.TestCase): The test being reported.
    """
        line = color + template.format(test.id()) + _Colors.END
        self.out.write(line)
        self.out.flush()

    def startTestRun(self):
        """See unittest.TestResult.startTestRun.

    Additionally writes a banner line to the output."""
        super(TerminalResult, self).startTestRun()
        banner = ''.join(
            [_Colors.HEADER, 'Testing gRPC Python...\n', _Colors.END])
        self.out.write(banner)

    def stopTestRun(self):
        """See unittest.TestResult.stopTestRun.

    Additionally writes the summary of this result and flushes the output."""
        super(TerminalResult, self).stopTestRun()
        report = summary(self)
        self.out.write(report)
        self.out.flush()

    def addError(self, test, err):
        """See unittest.TestResult.addError."""
        super(TerminalResult, self).addError(test, err)
        self._report(_Colors.FAIL, 'ERROR         {}\n', test)

    def addFailure(self, test, err):
        """See unittest.TestResult.addFailure."""
        super(TerminalResult, self).addFailure(test, err)
        self._report(_Colors.FAIL, 'FAILURE       {}\n', test)

    def addSuccess(self, test):
        """See unittest.TestResult.addSuccess."""
        super(TerminalResult, self).addSuccess(test)
        self._report(_Colors.OK, 'SUCCESS       {}\n', test)

    def addSkip(self, test, reason):
        """See unittest.TestResult.addSkip."""
        super(TerminalResult, self).addSkip(test, reason)
        self._report(_Colors.INFO, 'SKIP          {}\n', test)

    def addExpectedFailure(self, test, err):
        """See unittest.TestResult.addExpectedFailure."""
        super(TerminalResult, self).addExpectedFailure(test, err)
        self._report(_Colors.INFO, 'FAILURE_OK    {}\n', test)

    def addUnexpectedSuccess(self, test):
        """See unittest.TestResult.addUnexpectedSuccess."""
        super(TerminalResult, self).addUnexpectedSuccess(test)
        self._report(_Colors.INFO, 'UNEXPECTED_OK {}\n', test)


def _traceback_string(type, value, trace):
    """Generate a descriptive string of a Python exception traceback.

  The three arguments are the members of a `sys.exc_info()` triple.

  Args:
    type (class): The type of the exception.
    value (Exception): The value of the exception.
    trace (traceback): Traceback of the exception.

  Returns:
    str: Formatted exception descriptive string.
  """
    # format_exception yields exactly the lines print_exception would write,
    # so joining them avoids the detour through a temporary file-like buffer.
    return ''.join(traceback.format_exception(type, value, trace))


def summary(result):
    """A summary string of a result object.

  The summary lists how many tests finished with each outcome (skipped tests
  are reported but not counted as finished), names the tests still marked as
  running, gives traceback, stdout and stderr for every failure and error,
  and names the unexpected successes.

  Args:
    result (AugmentedResult): The result object to get the summary of.

  Returns:
    str: The summary string.
  """
    assert isinstance(result, AugmentedResult)
    # Group the cases by kind in a single pass. Kinds are plain strings, so
    # they are matched by equality (dictionary lookup) instead of identity.
    cases_by_kind = collections.defaultdict(list)
    for case in result.augmented_results(lambda case_result: True):
        cases_by_kind[case.kind].append(case)
    running = cases_by_kind[CaseResult.Kind.RUNNING]
    failures = cases_by_kind[CaseResult.Kind.FAILURE]
    errors = cases_by_kind[CaseResult.Kind.ERROR]
    successes = cases_by_kind[CaseResult.Kind.SUCCESS]
    skips = cases_by_kind[CaseResult.Kind.SKIP]
    expected_failures = cases_by_kind[CaseResult.Kind.EXPECTED_FAILURE]
    unexpected_successes = cases_by_kind[CaseResult.Kind.UNEXPECTED_SUCCESS]

    running_names = [case.name for case in running]
    finished_count = (len(failures) + len(errors) + len(successes) +
                      len(expected_failures) + len(unexpected_successes))
    statistics = ('{finished} tests finished:\n'
                  '\t{successful} successful\n'
                  '\t{unsuccessful} unsuccessful\n'
                  '\t{skipped} skipped\n'
                  '\t{expected_fail} expected failures\n'
                  '\t{unexpected_successful} unexpected successes\n'
                  'Interrupted Tests:\n'
                  '\t{interrupted}\n'.format(
                      finished=finished_count,
                      successful=len(successes),
                      unsuccessful=(len(failures) + len(errors)),
                      skipped=len(skips),
                      expected_fail=len(expected_failures),
                      unexpected_successful=len(unexpected_successes),
                      interrupted=str(running_names)))
    case_report = (
        _Colors.FAIL + '{test_name}' + _Colors.END + '\n' + _Colors.BOLD +
        'traceback:' + _Colors.END + '\n' + '{traceback}\n' + _Colors.BOLD +
        'stdout:' + _Colors.END + '\n' + '{stdout}\n' + _Colors.BOLD +
        'stderr:' + _Colors.END + '\n' + '{stderr}\n')
    # Failures are reported before errors.
    tracebacks = '\n\n'.join([
        case_report.format(
            test_name=case.name,
            traceback=_traceback_string(*case.traceback),
            stdout=case.stdout,
            stderr=case.stderr)
        for case in itertools.chain(failures, errors)
    ])
    notes = 'Unexpected successes: {}\n'.format(
        [case.name for case in unexpected_successes])
    return statistics + '\nErrors/Failures: \n' + tracebacks + '\n' + notes


def jenkins_junit_xml(result):
    """An XML tree object that when written is recognizable by Jenkins.

  Only successful, failed and errored cases are emitted; cases of any other
  kind are left out of the tree. Failures and errors both get an `error`
  child element whose text is the captured stderr (if any) followed by the
  formatted traceback.

  Args:
    result (AugmentedResult): The result object to get the junit xml output of.

  Returns:
    ElementTree.ElementTree: The XML tree.
  """
    assert isinstance(result, AugmentedResult)
    root = ElementTree.Element('testsuites')
    suite = ElementTree.SubElement(root, 'testsuite', {
        'name': 'Python gRPC tests',
    })
    for case in result.cases.values():
        # Kinds are plain strings: compare by equality, not identity.
        if case.kind == CaseResult.Kind.SUCCESS:
            ElementTree.SubElement(suite, 'testcase', {
                'name': case.name,
            })
        elif case.kind in (CaseResult.Kind.ERROR, CaseResult.Kind.FAILURE):
            case_xml = ElementTree.SubElement(suite, 'testcase', {
                'name': case.name,
            })
            error_xml = ElementTree.SubElement(case_xml, 'error', {})
            # Previously the text was built with an empty format string and
            # was therefore always empty; include the actual diagnostics.
            details = []
            if case.stderr:
                details.append('{}'.format(case.stderr))
            if case.traceback is not None:
                details.append(_traceback_string(*case.traceback))
            error_xml.text = '\n'.join(details)
    return ElementTree.ElementTree(element=root)