# Plain text  |  697 lines  |  25.67 KB  (file-viewer header, not part of the module)

# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Translates gRPC's client-side API into gRPC's client-side Beta API."""

import grpc
from grpc import _common
from grpc.beta import _metadata
from grpc.beta import interfaces
from grpc.framework.common import cardinality
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face

# pylint: disable=too-many-arguments,too-many-locals,unused-argument

# Maps a grpc.StatusCode to a (face.Abortion.Kind, face exception class) pair.
# Index 0 of each pair is read by _abortion and index 1 by _abortion_error;
# status codes absent from this table are handled by the fallbacks in those
# two functions.
_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
    grpc.StatusCode.CANCELLED: (face.Abortion.Kind.CANCELLED,
                                face.CancellationError),
    grpc.StatusCode.UNKNOWN: (face.Abortion.Kind.REMOTE_FAILURE,
                              face.RemoteError),
    grpc.StatusCode.DEADLINE_EXCEEDED: (face.Abortion.Kind.EXPIRED,
                                        face.ExpirationError),
    grpc.StatusCode.UNIMPLEMENTED: (face.Abortion.Kind.LOCAL_FAILURE,
                                    face.LocalError),
}


def _effective_metadata(metadata, metadata_transformer):
    non_none_metadata = () if metadata is None else metadata
    if metadata_transformer is None:
        return non_none_metadata
    else:
        return metadata_transformer(non_none_metadata)


def _credentials(grpc_call_options):
    return None if grpc_call_options is None else grpc_call_options.credentials


def _abortion(rpc_error_call):
    """Builds a face.Abortion describing rpc_error_call.

    Args:
      rpc_error_call: An object offering code(), initial_metadata(),
        trailing_metadata() and details().

    Returns:
      A face.Abortion whose kind is looked up from the call's status code,
      falling back to face.Abortion.Kind.LOCAL_FAILURE for codes that are not
      in the translation table.
    """
    status_code = rpc_error_call.code()
    # Only the first element (the abortion kind) of the table entry is needed.
    kind, _ = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(
        status_code, (face.Abortion.Kind.LOCAL_FAILURE, None))
    initial_metadata = rpc_error_call.initial_metadata()
    trailing_metadata = rpc_error_call.trailing_metadata()
    return face.Abortion(kind, initial_metadata, trailing_metadata,
                         status_code, rpc_error_call.details())


def _abortion_error(rpc_error_call):
    """Builds the face exception corresponding to rpc_error_call.

    Args:
      rpc_error_call: An object offering code(), initial_metadata(),
        trailing_metadata() and details().

    Returns:
      An instance of the exception class registered for the call's status
      code, or of face.AbortionError when the code is not in the translation
      table. The exception is returned, not raised.
    """
    status_code = rpc_error_call.code()
    # Only the second element (the exception class) of the entry is needed.
    _, error_class = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(
        status_code, (None, face.AbortionError))
    initial_metadata = rpc_error_call.initial_metadata()
    trailing_metadata = rpc_error_call.trailing_metadata()
    return error_class(initial_metadata, trailing_metadata, status_code,
                       rpc_error_call.details())


class _InvocationProtocolContext(interfaces.GRPCInvocationContext):
    """GRPCInvocationContext implementation whose only method does nothing."""

    def disable_next_request_compression(self):
        """Currently a no-op; the behavior is not yet implemented (see TODO)."""
        pass  # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.


class _Rendezvous(future.Future, face.Call):
    """Presents gRPC call objects through the Beta future and call interfaces.

    Wraps up to three objects: a response future, a response iterator and a
    call. The factory functions in this module pass None for whichever of the
    future or the iterator does not apply to the RPC, so the future-style and
    iterator-style methods are only usable when the matching object was
    supplied.
    """

    def __init__(self, response_future, response_iterator, call):
        """Stores the wrapped objects.

        Args:
          response_future: Object backing cancelled, running, done, result,
            exception, traceback and add_done_callback, or None.
          response_iterator: Iterator backing iteration over responses, or
            None.
          call: Object backing cancellation, liveness, status and metadata
            queries.
        """
        self._future = response_future
        self._iterator = response_iterator
        self._call = call

    def cancel(self):
        """Cancels via the wrapped call and returns what its cancel returns."""
        return self._call.cancel()

    def cancelled(self):
        """Returns the wrapped future's cancelled() value."""
        return self._future.cancelled()

    def running(self):
        """Returns the wrapped future's running() value."""
        return self._future.running()

    def done(self):
        """Returns the wrapped future's done() value."""
        return self._future.done()

    def result(self, timeout=None):
        """Returns the wrapped future's result.

        Args:
          timeout: Forwarded to the wrapped future's result().

        Raises:
          The exception built by _abortion_error if the wrapped future raised
            grpc.RpcError.
          future.TimeoutError: If the wrapped future raised
            grpc.FutureTimeoutError.
          future.CancelledError: If the wrapped future raised
            grpc.FutureCancelledError.
        """
        try:
            return self._future.result(timeout=timeout)
        except grpc.RpcError as rpc_error_call:
            raise _abortion_error(rpc_error_call)
        except grpc.FutureTimeoutError:
            raise future.TimeoutError()
        except grpc.FutureCancelledError:
            raise future.CancelledError()

    def exception(self, timeout=None):
        """Returns the translated exception of the wrapped future, if any.

        Args:
          timeout: Forwarded to the wrapped future's exception().

        Returns:
          None if the wrapped future reports no exception, otherwise the
          exception built by _abortion_error from the reported one.

        Raises:
          future.TimeoutError: If the wrapped future raised
            grpc.FutureTimeoutError.
          future.CancelledError: If the wrapped future raised
            grpc.FutureCancelledError.
        """
        try:
            rpc_error_call = self._future.exception(timeout=timeout)
            if rpc_error_call is None:
                return None
            else:
                return _abortion_error(rpc_error_call)
        except grpc.FutureTimeoutError:
            raise future.TimeoutError()
        except grpc.FutureCancelledError:
            raise future.CancelledError()

    def traceback(self, timeout=None):
        """Returns the wrapped future's traceback.

        Args:
          timeout: Forwarded to the wrapped future's traceback().

        Raises:
          future.TimeoutError: If the wrapped future raised
            grpc.FutureTimeoutError.
          future.CancelledError: If the wrapped future raised
            grpc.FutureCancelledError.
        """
        try:
            return self._future.traceback(timeout=timeout)
        except grpc.FutureTimeoutError:
            raise future.TimeoutError()
        except grpc.FutureCancelledError:
            raise future.CancelledError()

    def add_done_callback(self, fn):
        """Registers fn on the wrapped future.

        fn is invoked with this _Rendezvous rather than with the argument the
        wrapped future passes to its callbacks.
        """
        self._future.add_done_callback(lambda ignored_callback: fn(self))

    def __iter__(self):
        return self

    def _next(self):
        """Returns the next response, translating grpc.RpcError."""
        try:
            return next(self._iterator)
        except grpc.RpcError as rpc_error_call:
            raise _abortion_error(rpc_error_call)

    def __next__(self):
        return self._next()

    def next(self):
        # Python 2 spelling of the iterator protocol method.
        return self._next()

    def is_active(self):
        """Returns the wrapped call's is_active() value."""
        return self._call.is_active()

    def time_remaining(self):
        """Returns the wrapped call's time_remaining() value."""
        return self._call.time_remaining()

    def add_abortion_callback(self, abortion_callback):
        """Arranges for abortion_callback to be told about a non-OK outcome.

        Args:
          abortion_callback: A callable taking a face.Abortion; called only
            when the call's code is not grpc.StatusCode.OK at the time the
            check runs.
        """

        def done_callback():
            if self.code() is not grpc.StatusCode.OK:
                abortion_callback(_abortion(self._call))

        registered = self._call.add_callback(done_callback)
        # If the call would not accept the callback, run the check right away
        # (presumably the call has already terminated - confirm against the
        # wrapped call's add_callback contract).
        return None if registered else done_callback()

    def protocol_context(self):
        """Returns a fresh _InvocationProtocolContext."""
        return _InvocationProtocolContext()

    def initial_metadata(self):
        """Returns the call's initial metadata converted by _metadata.beta."""
        return _metadata.beta(self._call.initial_metadata())

    def terminal_metadata(self):
        """Returns the call's trailing metadata converted by _metadata.beta."""
        # The wrapped gRPC call names this accessor trailing_metadata(), as the
        # error-translation helpers in this module already rely on; it has no
        # terminal_metadata() method.
        return _metadata.beta(self._call.trailing_metadata())

    def code(self):
        """Returns the wrapped call's status code."""
        return self._call.code()

    def details(self):
        """Returns the wrapped call's details."""
        return self._call.details()


def _blocking_unary_unary(channel, group, method, timeout, with_call,
                          protocol_options, metadata, metadata_transformer,
                          request, request_serializer, response_deserializer):
    """Invokes a unary-unary RPC on channel and returns its response.

    Args:
      channel: Object whose unary_unary() creates the multi-callable.
      group: Passed with method to _common.fully_qualified_method.
      method: Passed with group to _common.fully_qualified_method.
      timeout: Forwarded to the invocation.
      with_call: If truthy, the call is returned alongside the response.
      protocol_options: Source of the call credentials, or None.
      metadata: Caller-supplied metadata, or None.
      metadata_transformer: Optional callable applied to the metadata.
      request: The request value.
      request_serializer: Forwarded to channel.unary_unary.
      response_deserializer: Forwarded to channel.unary_unary.

    Returns:
      The response, or a (response, _Rendezvous) pair when with_call is
      truthy.

    Raises:
      The exception built by _abortion_error when grpc.RpcError is raised.
    """
    try:
        invoke = channel.unary_unary(
            _common.fully_qualified_method(group, method),
            request_serializer=request_serializer,
            response_deserializer=response_deserializer)
        beta_metadata = _effective_metadata(metadata, metadata_transformer)
        call_kwargs = {
            'timeout': timeout,
            'metadata': _metadata.unbeta(beta_metadata),
            'credentials': _credentials(protocol_options),
        }
        if not with_call:
            return invoke(request, **call_kwargs)
        response, call = invoke.with_call(request, **call_kwargs)
        return response, _Rendezvous(None, None, call)
    except grpc.RpcError as rpc_error_call:
        raise _abortion_error(rpc_error_call)


def _future_unary_unary(channel, group, method, timeout, protocol_options,
                        metadata, metadata_transformer, request,
                        request_serializer, response_deserializer):
    """Starts a unary-unary RPC and returns a _Rendezvous for its response.

    The object returned by the multi-callable's future() serves as both the
    response future and the call of the returned _Rendezvous.
    """
    invoke = channel.unary_unary(
        _common.fully_qualified_method(group, method),
        request_serializer=request_serializer,
        response_deserializer=response_deserializer)
    beta_metadata = _effective_metadata(metadata, metadata_transformer)
    call_kwargs = {
        'timeout': timeout,
        'metadata': _metadata.unbeta(beta_metadata),
        'credentials': _credentials(protocol_options),
    }
    pending = invoke.future(request, **call_kwargs)
    return _Rendezvous(pending, None, pending)


def _unary_stream(channel, group, method, timeout, protocol_options, metadata,
                  metadata_transformer, request, request_serializer,
                  response_deserializer):
    """Starts a unary-stream RPC and returns a _Rendezvous over its responses.

    The object returned by the multi-callable serves as both the response
    iterator and the call of the returned _Rendezvous.
    """
    invoke = channel.unary_stream(
        _common.fully_qualified_method(group, method),
        request_serializer=request_serializer,
        response_deserializer=response_deserializer)
    beta_metadata = _effective_metadata(metadata, metadata_transformer)
    call_kwargs = {
        'timeout': timeout,
        'metadata': _metadata.unbeta(beta_metadata),
        'credentials': _credentials(protocol_options),
    }
    responses = invoke(request, **call_kwargs)
    return _Rendezvous(None, responses, responses)


def _blocking_stream_unary(channel, group, method, timeout, with_call,
                           protocol_options, metadata, metadata_transformer,
                           request_iterator, request_serializer,
                           response_deserializer):
    """Invokes a stream-unary RPC on channel and returns its response.

    Args:
      channel: Object whose stream_unary() creates the multi-callable.
      group: Passed with method to _common.fully_qualified_method.
      method: Passed with group to _common.fully_qualified_method.
      timeout: Forwarded to the invocation.
      with_call: If truthy, the call is returned alongside the response.
      protocol_options: Source of the call credentials, or None.
      metadata: Caller-supplied metadata, or None.
      metadata_transformer: Optional callable applied to the metadata.
      request_iterator: The iterator of request values.
      request_serializer: Forwarded to channel.stream_unary.
      response_deserializer: Forwarded to channel.stream_unary.

    Returns:
      The response, or a (response, _Rendezvous) pair when with_call is
      truthy.

    Raises:
      The exception built by _abortion_error when grpc.RpcError is raised.
    """
    try:
        invoke = channel.stream_unary(
            _common.fully_qualified_method(group, method),
            request_serializer=request_serializer,
            response_deserializer=response_deserializer)
        beta_metadata = _effective_metadata(metadata, metadata_transformer)
        call_kwargs = {
            'timeout': timeout,
            'metadata': _metadata.unbeta(beta_metadata),
            'credentials': _credentials(protocol_options),
        }
        if not with_call:
            return invoke(request_iterator, **call_kwargs)
        response, call = invoke.with_call(request_iterator, **call_kwargs)
        return response, _Rendezvous(None, None, call)
    except grpc.RpcError as rpc_error_call:
        raise _abortion_error(rpc_error_call)


def _future_stream_unary(channel, group, method, timeout, protocol_options,
                         metadata, metadata_transformer, request_iterator,
                         request_serializer, response_deserializer):
    """Starts a stream-unary RPC and returns a _Rendezvous for its response.

    The object returned by the multi-callable's future() serves as both the
    response future and the call of the returned _Rendezvous.
    """
    invoke = channel.stream_unary(
        _common.fully_qualified_method(group, method),
        request_serializer=request_serializer,
        response_deserializer=response_deserializer)
    beta_metadata = _effective_metadata(metadata, metadata_transformer)
    call_kwargs = {
        'timeout': timeout,
        'metadata': _metadata.unbeta(beta_metadata),
        'credentials': _credentials(protocol_options),
    }
    pending = invoke.future(request_iterator, **call_kwargs)
    return _Rendezvous(pending, None, pending)


def _stream_stream(channel, group, method, timeout, protocol_options, metadata,
                   metadata_transformer, request_iterator, request_serializer,
                   response_deserializer):
    """Starts a stream-stream RPC and returns a _Rendezvous over its responses.

    The object returned by the multi-callable serves as both the response
    iterator and the call of the returned _Rendezvous.
    """
    invoke = channel.stream_stream(
        _common.fully_qualified_method(group, method),
        request_serializer=request_serializer,
        response_deserializer=response_deserializer)
    beta_metadata = _effective_metadata(metadata, metadata_transformer)
    call_kwargs = {
        'timeout': timeout,
        'metadata': _metadata.unbeta(beta_metadata),
        'credentials': _credentials(protocol_options),
    }
    responses = invoke(request_iterator, **call_kwargs)
    return _Rendezvous(None, responses, responses)


class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
    """Binds a channel and one (group, method) pair for unary-unary RPCs."""

    def __init__(self, channel, group, method, metadata_transformer,
                 request_serializer, response_deserializer):
        """Records the bound channel, method identity and (de)serializers."""
        # Where the RPC goes.
        self._channel = channel
        self._group = group
        self._method = method
        # How metadata and messages are adapted.
        self._metadata_transformer = metadata_transformer
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __call__(self,
                 request,
                 timeout,
                 metadata=None,
                 with_call=False,
                 protocol_options=None):
        """Delegates to _blocking_unary_unary with the bound state."""
        return _blocking_unary_unary(
            channel=self._channel,
            group=self._group,
            method=self._method,
            timeout=timeout,
            with_call=with_call,
            protocol_options=protocol_options,
            metadata=metadata,
            metadata_transformer=self._metadata_transformer,
            request=request,
            request_serializer=self._request_serializer,
            response_deserializer=self._response_deserializer)

    def future(self, request, timeout, metadata=None, protocol_options=None):
        """Delegates to _future_unary_unary with the bound state."""
        return _future_unary_unary(
            channel=self._channel,
            group=self._group,
            method=self._method,
            timeout=timeout,
            protocol_options=protocol_options,
            metadata=metadata,
            metadata_transformer=self._metadata_transformer,
            request=request,
            request_serializer=self._request_serializer,
            response_deserializer=self._response_deserializer)

    def event(self,
              request,
              receiver,
              abortion_callback,
              timeout,
              metadata=None,
              protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()


class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
    """Binds a channel and one (group, method) pair for unary-stream RPCs."""

    def __init__(self, channel, group, method, metadata_transformer,
                 request_serializer, response_deserializer):
        """Records the bound channel, method identity and (de)serializers."""
        # Where the RPC goes.
        self._channel = channel
        self._group = group
        self._method = method
        # How metadata and messages are adapted.
        self._metadata_transformer = metadata_transformer
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __call__(self, request, timeout, metadata=None, protocol_options=None):
        """Delegates to _unary_stream with the bound state."""
        return _unary_stream(
            channel=self._channel,
            group=self._group,
            method=self._method,
            timeout=timeout,
            protocol_options=protocol_options,
            metadata=metadata,
            metadata_transformer=self._metadata_transformer,
            request=request,
            request_serializer=self._request_serializer,
            response_deserializer=self._response_deserializer)

    def event(self,
              request,
              receiver,
              abortion_callback,
              timeout,
              metadata=None,
              protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()


class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
    """Binds a channel and one (group, method) pair for stream-unary RPCs."""

    def __init__(self, channel, group, method, metadata_transformer,
                 request_serializer, response_deserializer):
        """Records the bound channel, method identity and (de)serializers."""
        # Where the RPC goes.
        self._channel = channel
        self._group = group
        self._method = method
        # How metadata and messages are adapted.
        self._metadata_transformer = metadata_transformer
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __call__(self,
                 request_iterator,
                 timeout,
                 metadata=None,
                 with_call=False,
                 protocol_options=None):
        """Delegates to _blocking_stream_unary with the bound state."""
        return _blocking_stream_unary(
            channel=self._channel,
            group=self._group,
            method=self._method,
            timeout=timeout,
            with_call=with_call,
            protocol_options=protocol_options,
            metadata=metadata,
            metadata_transformer=self._metadata_transformer,
            request_iterator=request_iterator,
            request_serializer=self._request_serializer,
            response_deserializer=self._response_deserializer)

    def future(self,
               request_iterator,
               timeout,
               metadata=None,
               protocol_options=None):
        """Delegates to _future_stream_unary with the bound state."""
        return _future_stream_unary(
            channel=self._channel,
            group=self._group,
            method=self._method,
            timeout=timeout,
            protocol_options=protocol_options,
            metadata=metadata,
            metadata_transformer=self._metadata_transformer,
            request_iterator=request_iterator,
            request_serializer=self._request_serializer,
            response_deserializer=self._response_deserializer)

    def event(self,
              receiver,
              abortion_callback,
              timeout,
              metadata=None,
              protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()


class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
    """Binds a channel and one (group, method) pair for stream-stream RPCs."""

    def __init__(self, channel, group, method, metadata_transformer,
                 request_serializer, response_deserializer):
        """Records the bound channel, method identity and (de)serializers."""
        # Where the RPC goes.
        self._channel = channel
        self._group = group
        self._method = method
        # How metadata and messages are adapted.
        self._metadata_transformer = metadata_transformer
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __call__(self,
                 request_iterator,
                 timeout,
                 metadata=None,
                 protocol_options=None):
        """Delegates to _stream_stream with the bound state."""
        return _stream_stream(
            channel=self._channel,
            group=self._group,
            method=self._method,
            timeout=timeout,
            protocol_options=protocol_options,
            metadata=metadata,
            metadata_transformer=self._metadata_transformer,
            request_iterator=request_iterator,
            request_serializer=self._request_serializer,
            response_deserializer=self._response_deserializer)

    def event(self,
              receiver,
              abortion_callback,
              timeout,
              metadata=None,
              protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()


class _GenericStub(face.GenericStub):
    """face.GenericStub implementation that invokes RPCs on a channel.

    Serializers and deserializers are looked up per (group, method) key; a
    method with no entry gets None for the missing (de)serializer.
    """

    def __init__(self, channel, metadata_transformer, request_serializers,
                 response_deserializers):
        """Stores the channel and lookup tables.

        Args:
          channel: The channel on which RPCs are invoked.
          metadata_transformer: Optional callable applied to call metadata.
          request_serializers: Mapping keyed by (group, method), or a falsy
            value to use an empty mapping.
          response_deserializers: Mapping keyed by (group, method), or a falsy
            value to use an empty mapping.
        """
        self._channel = channel
        self._metadata_transformer = metadata_transformer
        self._request_serializers = request_serializers or {}
        self._response_deserializers = response_deserializers or {}

    def _serializer_pair(self, group, method):
        """Returns (request_serializer, response_deserializer) for a method.

        Either element is None when nothing is registered under
        (group, method).
        """
        key = (group, method)
        return (self._request_serializers.get(key),
                self._response_deserializers.get(key))

    def blocking_unary_unary(self,
                             group,
                             method,
                             request,
                             timeout,
                             metadata=None,
                             with_call=None,
                             protocol_options=None):
        """Delegates to _blocking_unary_unary for the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _blocking_unary_unary(self._channel, group, method, timeout,
                                     with_call, protocol_options, metadata,
                                     self._metadata_transformer, request,
                                     request_serializer, response_deserializer)

    def future_unary_unary(self,
                           group,
                           method,
                           request,
                           timeout,
                           metadata=None,
                           protocol_options=None):
        """Delegates to _future_unary_unary for the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _future_unary_unary(self._channel, group, method, timeout,
                                   protocol_options, metadata,
                                   self._metadata_transformer, request,
                                   request_serializer, response_deserializer)

    def inline_unary_stream(self,
                            group,
                            method,
                            request,
                            timeout,
                            metadata=None,
                            protocol_options=None):
        """Delegates to _unary_stream for the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _unary_stream(self._channel, group, method, timeout,
                             protocol_options, metadata,
                             self._metadata_transformer, request,
                             request_serializer, response_deserializer)

    def blocking_stream_unary(self,
                              group,
                              method,
                              request_iterator,
                              timeout,
                              metadata=None,
                              with_call=None,
                              protocol_options=None):
        """Delegates to _blocking_stream_unary for the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _blocking_stream_unary(
            self._channel, group, method, timeout, with_call, protocol_options,
            metadata, self._metadata_transformer, request_iterator,
            request_serializer, response_deserializer)

    def future_stream_unary(self,
                            group,
                            method,
                            request_iterator,
                            timeout,
                            metadata=None,
                            protocol_options=None):
        """Delegates to _future_stream_unary for the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _future_stream_unary(
            self._channel, group, method, timeout, protocol_options, metadata,
            self._metadata_transformer, request_iterator, request_serializer,
            response_deserializer)

    def inline_stream_stream(self,
                             group,
                             method,
                             request_iterator,
                             timeout,
                             metadata=None,
                             protocol_options=None):
        """Delegates to _stream_stream for the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _stream_stream(self._channel, group, method, timeout,
                              protocol_options, metadata,
                              self._metadata_transformer, request_iterator,
                              request_serializer, response_deserializer)

    def event_unary_unary(self,
                          group,
                          method,
                          request,
                          receiver,
                          abortion_callback,
                          timeout,
                          metadata=None,
                          protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()

    def event_unary_stream(self,
                           group,
                           method,
                           request,
                           receiver,
                           abortion_callback,
                           timeout,
                           metadata=None,
                           protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()

    def event_stream_unary(self,
                           group,
                           method,
                           receiver,
                           abortion_callback,
                           timeout,
                           metadata=None,
                           protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()

    def event_stream_stream(self,
                            group,
                            method,
                            receiver,
                            abortion_callback,
                            timeout,
                            metadata=None,
                            protocol_options=None):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError()

    def unary_unary(self, group, method):
        """Returns a _UnaryUnaryMultiCallable bound to the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _UnaryUnaryMultiCallable(
            self._channel, group, method, self._metadata_transformer,
            request_serializer, response_deserializer)

    def unary_stream(self, group, method):
        """Returns a _UnaryStreamMultiCallable bound to the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _UnaryStreamMultiCallable(
            self._channel, group, method, self._metadata_transformer,
            request_serializer, response_deserializer)

    def stream_unary(self, group, method):
        """Returns a _StreamUnaryMultiCallable bound to the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _StreamUnaryMultiCallable(
            self._channel, group, method, self._metadata_transformer,
            request_serializer, response_deserializer)

    def stream_stream(self, group, method):
        """Returns a _StreamStreamMultiCallable bound to the named method."""
        request_serializer, response_deserializer = self._serializer_pair(
            group, method)
        return _StreamStreamMultiCallable(
            self._channel, group, method, self._metadata_transformer,
            request_serializer, response_deserializer)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning False lets any exception from the with-body propagate.
        return False


class _DynamicStub(face.DynamicStub):
    """Exposes the methods of one group as attributes of the stub.

    Attribute access for a name present in the cardinalities mapping returns
    the matching multi-callable obtained from the backing generic stub.
    """

    def __init__(self, backing_generic_stub, group, cardinalities):
        """Stores the backing stub, the group and the cardinalities mapping.

        Args:
          backing_generic_stub: Object whose unary_unary, unary_stream,
            stream_unary and stream_stream methods create multi-callables.
          group: The group passed to the backing stub for every method.
          cardinalities: Mapping from method name to a
            cardinality.Cardinality value.
        """
        self._generic_stub = backing_generic_stub
        self._group = group
        self._cardinalities = cardinalities

    def __getattr__(self, attr):
        """Returns the multi-callable for the method named attr.

        Raises:
          AttributeError: If attr is not a known method name.
        """
        # __getattr__ runs only after normal lookup has failed. On an instance
        # whose __init__ has not run (for example one created through __new__
        # while being copied or unpickled), reading self._cardinalities would
        # re-enter this method for '_cardinalities' and recurse without bound,
        # so the instance dictionary is consulted directly instead.
        state = self.__dict__
        if '_cardinalities' not in state:
            raise AttributeError(
                '_DynamicStub object has no attribute "%s"!' % attr)
        method_cardinality = state['_cardinalities'].get(attr)
        if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
            return self._generic_stub.unary_unary(self._group, attr)
        elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
            return self._generic_stub.unary_stream(self._group, attr)
        elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
            return self._generic_stub.stream_unary(self._group, attr)
        elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
            return self._generic_stub.stream_stream(self._group, attr)
        else:
            raise AttributeError(
                '_DynamicStub object has no attribute "%s"!' % attr)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning False lets any exception from the with-body propagate.
        return False


def generic_stub(channel, host, metadata_transformer, request_serializers,
                 response_deserializers):
    """Creates a _GenericStub over channel.

    Args:
      channel: The channel on which the stub invokes RPCs.
      host: Accepted but not used by this function.
      metadata_transformer: Optional callable applied to call metadata.
      request_serializers: Mapping keyed by (group, method), or None.
      response_deserializers: Mapping keyed by (group, method), or None.

    Returns:
      The new _GenericStub.
    """
    stub = _GenericStub(
        channel=channel,
        metadata_transformer=metadata_transformer,
        request_serializers=request_serializers,
        response_deserializers=response_deserializers)
    return stub


def dynamic_stub(channel, service, cardinalities, host, metadata_transformer,
                 request_serializers, response_deserializers):
    """Creates a _DynamicStub for service backed by a new _GenericStub.

    Args:
      channel: The channel on which the stub invokes RPCs.
      service: Used as the group of every method of the stub.
      cardinalities: Mapping from method name to cardinality.
      host: Accepted but not used by this function.
      metadata_transformer: Optional callable applied to call metadata.
      request_serializers: Mapping keyed by (group, method), or None.
      response_deserializers: Mapping keyed by (group, method), or None.

    Returns:
      The new _DynamicStub.
    """
    backing_stub = _GenericStub(channel, metadata_transformer,
                                request_serializers, response_deserializers)
    return _DynamicStub(backing_stub, service, cardinalities)