// Plain text  |  282 lines  |  9.22 KB

// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/system/message_pipe_dispatcher.h"

#include "base/logging.h"
#include "mojo/system/channel.h"
#include "mojo/system/constants.h"
#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/memory.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/options_validation.h"
#include "mojo/system/proxy_message_pipe_endpoint.h"

namespace mojo {
namespace system {

namespace {

// Sentinel stored in |port_| whenever the dispatcher is not bound to a message
// pipe port. All bits set, i.e. the largest |unsigned| value.
const unsigned kInvalidPort = ~0u;

// Serialized form of a message pipe dispatcher. It is written into the
// destination buffer by |EndSerializeAndCloseImplNoLock()| and read back by
// |Deserialize()|, which rejects any buffer whose size differs from
// |sizeof(SerializedMessagePipeDispatcher)|.
struct SerializedMessagePipeDispatcher {
  // Endpoint ID returned by |Channel::AttachMessagePipeEndpoint()| on the
  // serializing side. May be |MessageInTransit::kInvalidEndpointId|.
  MessageInTransit::EndpointId endpoint_id;
};

}  // namespace

// MessagePipeDispatcher -------------------------------------------------------

// static
// Options used when the caller supplies none (see |ValidateCreateOptions()|):
// the size of the options struct itself, followed by the "no flags" value.
const MojoCreateMessagePipeOptions
    MessagePipeDispatcher::kDefaultCreateOptions = {
  static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
  MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE
};

// Constructs a dispatcher that is not yet bound to a pipe: |port_| starts out
// as |kInvalidPort| and is only given a real value by |Init()|. The options
// argument is currently unused.
MessagePipeDispatcher::MessagePipeDispatcher(
    const MojoCreateMessagePipeOptions& /*validated_options*/)
    : port_(kInvalidPort) {
}

// Fills |*out_options| with the defaults and, when |in_options| is non-null,
// lets |ValidateOptionsStructPointerSizeAndFlags()| check it against the set of
// known flags. Returns |MOJO_RESULT_OK| on success, or whatever error that
// helper reports.
// static
MojoResult MessagePipeDispatcher::ValidateCreateOptions(
    const MojoCreateMessagePipeOptions* in_options,
    MojoCreateMessagePipeOptions* out_options) {
  // Always begin from the defaults; they are also the final answer when the
  // caller passed no options at all.
  *out_options = kDefaultCreateOptions;

  if (in_options) {
    const MojoCreateMessagePipeOptionsFlags kKnownFlags =
        MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;

    const MojoResult size_and_flags_result =
        ValidateOptionsStructPointerSizeAndFlags<MojoCreateMessagePipeOptions>(
            in_options, kKnownFlags, out_options);
    if (size_and_flags_result != MOJO_RESULT_OK)
      return size_and_flags_result;

    // Checks for fields beyond |flags| would go here.
    // (Nothing here yet.)
  }

  return MOJO_RESULT_OK;
}

// Binds this dispatcher to |port| of |message_pipe|. |message_pipe| must be
// non-null and |port| must be 0 or 1 (both enforced only by DCHECK).
void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
                                 unsigned port) {
  DCHECK(message_pipe);
  DCHECK(port == 0 || port == 1);

  message_pipe_ = message_pipe;
  port_ = port;
}

// Identifies this dispatcher as a message pipe dispatcher.
Dispatcher::Type MessagePipeDispatcher::GetType() const {
  return kTypeMessagePipe;
}

// Builds a message pipe whose port 0 is a local endpoint and whose port 1 is a
// proxy endpoint, together with a dispatcher (using the default options) bound
// to port 0. Returns the dispatcher and the pipe as a pair.
// static
std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
MessagePipeDispatcher::CreateRemoteMessagePipe() {
  typedef std::pair<scoped_refptr<MessagePipeDispatcher>,
                    scoped_refptr<MessagePipe> > DispatcherAndPipe;

  scoped_refptr<MessagePipeDispatcher> local_dispatcher(
      new MessagePipeDispatcher(kDefaultCreateOptions));

  scoped_refptr<MessagePipe> pipe(
      new MessagePipe(
          scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
          scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));

  // The dispatcher owns the local side (port 0) of the new pipe.
  local_dispatcher->Init(pipe, 0);

  return DispatcherAndPipe(local_dispatcher, pipe);
}

// Reconstructs a message pipe dispatcher from the |size| bytes at |source|
// (the format produced by |EndSerializeAndCloseImplNoLock()|), attaching the
// new pipe's proxy endpoint (port 1) to |channel|.
//
// Returns the new dispatcher on success, or a null |scoped_refptr| if the
// buffer has the wrong size, the serialized endpoint ID is invalid, or the
// endpoint could not be attached/run on |channel|.
// static
scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
    Channel* channel,
    const void* source,
    size_t size) {
  if (size != sizeof(SerializedMessagePipeDispatcher)) {
    LOG(ERROR) << "Invalid serialized message pipe dispatcher";
    return scoped_refptr<MessagePipeDispatcher>();
  }

  // Inspect the remote ID *before* creating the pipe/dispatcher pair. Bailing
  // out after creation would destroy a dispatcher that still holds its message
  // pipe (it was never closed), which violates the destructor's
  // |DCHECK(!message_pipe_)| and skips closing the pipe's port.
  MessageInTransit::EndpointId remote_id =
      static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
  if (remote_id == MessageInTransit::kInvalidEndpointId) {
    // This means that the other end was closed, and there were no messages
    // enqueued for us.
    // TODO(vtl): This is wrong. We should produce a "dead" message pipe
    // dispatcher.
    NOTIMPLEMENTED();
    return scoped_refptr<MessagePipeDispatcher>();
  }

  std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
      remote_message_pipe = CreateRemoteMessagePipe();

  // NOTE(review): The failure returns below still drop the freshly created
  // dispatcher without closing it. Confirm whether it should be closed on
  // those paths as well.
  MessageInTransit::EndpointId local_id =
      channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1);
  if (local_id == MessageInTransit::kInvalidEndpointId) {
    LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
                  "attach; remote ID = " << remote_id << ")";
    return scoped_refptr<MessagePipeDispatcher>();
  }
  DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = "
           << remote_id << ", new local ID = " << local_id << ")";

  if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
    // In general, this shouldn't fail, since we generated |local_id| locally.
    NOTREACHED();
    return scoped_refptr<MessagePipeDispatcher>();
  }

  // TODO(vtl): FIXME -- Need some error handling here.
  channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
  return remote_message_pipe.first;
}

// Destroying a dispatcher that still references a message pipe is a bug; every
// closing/transferring path in this file nulls |message_pipe_| first.
MessagePipeDispatcher::~MessagePipeDispatcher() {
  // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
  DCHECK(!message_pipe_);
}

// Returns the raw (non-owning) pointer to the bound message pipe, which is null
// once the dispatcher has been closed. The caller must hold |lock()|.
MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
  lock().AssertAcquired();
  return message_pipe_.get();
}

// Returns the port this dispatcher is bound to, or |kInvalidPort| when it is
// not bound. The caller must hold |lock()|.
unsigned MessagePipeDispatcher::GetPortNoLock() const {
  lock().AssertAcquired();
  return port_;
}

// Forwards to the message pipe to cancel all waiters on this dispatcher's
// port. The caller must hold |lock()|.
void MessagePipeDispatcher::CancelAllWaitersNoLock() {
  lock().AssertAcquired();
  message_pipe_->CancelAllWaiters(port_);
}

// Closes this dispatcher's port on the message pipe, then drops the pipe
// reference and resets |port_| so that the destructor's check holds. The
// caller must hold |lock()|.
void MessagePipeDispatcher::CloseImplNoLock() {
  lock().AssertAcquired();
  message_pipe_->Close(port_);
  message_pipe_ = NULL;
  port_ = kInvalidPort;
}

// Hands this dispatcher's pipe and port over to a newly created dispatcher and
// leaves |this| unbound (null pipe, |kInvalidPort|). The pipe's port itself is
// not closed. The caller must hold |lock()|.
scoped_refptr<Dispatcher>
MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
  lock().AssertAcquired();

  // TODO(vtl): Currently, there are no options, so we just use
  // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
  // too.
  scoped_refptr<MessagePipeDispatcher> replacement(
      new MessagePipeDispatcher(kDefaultCreateOptions));
  replacement->Init(message_pipe_, port_);

  // Detach |this| now that the replacement holds the pipe.
  port_ = kInvalidPort;
  message_pipe_ = NULL;

  return scoped_refptr<Dispatcher>(replacement.get());
}

// Validates the user buffer and forwards the write to the message pipe on this
// dispatcher's port. The caller must hold |lock()|.
//
// Returns |MOJO_RESULT_INVALID_ARGUMENT| if |bytes|/|num_bytes| fail pointer
// verification, |MOJO_RESULT_RESOURCE_EXHAUSTED| if |num_bytes| exceeds
// |kMaxMessageNumBytes|, and otherwise the message pipe's own result.
// |transports|, when non-null, must hold between 1 and
// |kMaxMessageNumHandles| entries (DCHECKed only).
MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
    const void* bytes,
    uint32_t num_bytes,
    std::vector<DispatcherTransport>* transports,
    MojoWriteMessageFlags flags) {
  DCHECK(!transports || (transports->size() > 0 &&
                         transports->size() <= kMaxMessageNumHandles));

  lock().AssertAcquired();

  const bool buffer_is_valid = VerifyUserPointerWithSize<1>(bytes, num_bytes);
  if (!buffer_is_valid)
    return MOJO_RESULT_INVALID_ARGUMENT;

  const bool message_too_large = num_bytes > kMaxMessageNumBytes;
  if (message_too_large)
    return MOJO_RESULT_RESOURCE_EXHAUSTED;

  const MojoResult write_result =
      message_pipe_->WriteMessage(port_, bytes, num_bytes, transports, flags);
  return write_result;
}

// Validates the user-supplied size pointer and buffer (only when |num_bytes| is
// non-null) and forwards the read to the message pipe on this dispatcher's
// port. The caller must hold |lock()|.
//
// Returns |MOJO_RESULT_INVALID_ARGUMENT| if either verification fails, and
// otherwise the message pipe's own result.
MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
    void* bytes,
    uint32_t* num_bytes,
    DispatcherVector* dispatchers,
    uint32_t* num_dispatchers,
    MojoReadMessageFlags flags) {
  lock().AssertAcquired();

  // Short-circuit evaluation keeps the original ordering: |*num_bytes| is only
  // dereferenced after |num_bytes| itself has been verified.
  if (num_bytes &&
      (!VerifyUserPointer<uint32_t>(num_bytes) ||
       !VerifyUserPointerWithSize<1>(bytes, *num_bytes))) {
    return MOJO_RESULT_INVALID_ARGUMENT;
  }

  const MojoResult read_result = message_pipe_->ReadMessage(
      port_, bytes, num_bytes, dispatchers, num_dispatchers, flags);
  return read_result;
}

// Forwards to the message pipe to add |waiter| on this dispatcher's port,
// passing |signals| and |context| through unchanged, and returns the pipe's
// result. The caller must hold |lock()|.
MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter,
                                                      MojoHandleSignals signals,
                                                      uint32_t context) {
  lock().AssertAcquired();
  return message_pipe_->AddWaiter(port_, waiter, signals, context);
}

// Forwards to the message pipe to remove |waiter| from this dispatcher's port.
// The caller must hold |lock()|.
void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
  lock().AssertAcquired();
  message_pipe_->RemoveWaiter(port_, waiter);
}

// Reports the space needed to serialize this dispatcher: |*max_size| is set to
// the size of |SerializedMessagePipeDispatcher| and |*max_platform_handles| to
// zero. The channel argument is unused.
void MessagePipeDispatcher::StartSerializeImplNoLock(
    Channel* /*channel*/,
    size_t* max_size,
    size_t* max_platform_handles) {
  DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
  *max_size = sizeof(SerializedMessagePipeDispatcher);
  *max_platform_handles = 0;
}

// Serializes this dispatcher into |destination| and detaches it from its pipe.
//
// The local endpoint on this dispatcher's port is converted to a proxy
// endpoint and attached to |channel|; the resulting endpoint ID is written to
// |destination| as a |SerializedMessagePipeDispatcher| and |*actual_size| is
// set to that struct's size. Always returns true. The platform handles
// argument is unused.
bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
    Channel* channel,
    void* destination,
    size_t* actual_size,
    embedder::PlatformHandleVector* /*platform_handles*/) {
  DCHECK(HasOneRef());  // Only one ref => no need to take the lock.

  // Convert the local endpoint to a proxy endpoint (moving the message queue).
  message_pipe_->ConvertLocalToProxy(port_);

  // Attach the new proxy endpoint to the channel.
  // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
  // possible that the other endpoint -- the one that we're not sending -- was
  // closed in the intervening time.) In that case, we need to deserialize a
  // "dead" message pipe dispatcher on the other end. (Note that this is
  // different from just producing |MOJO_HANDLE_INVALID|.)
  const MessageInTransit::EndpointId local_id =
      channel->AttachMessagePipeEndpoint(message_pipe_, port_);
  DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << local_id
           << ")";

  // We now have a local ID. Before we can run the proxy endpoint, we need to
  // get an ack back from the other side with the remote ID.
  SerializedMessagePipeDispatcher* const serialized =
      static_cast<SerializedMessagePipeDispatcher*>(destination);
  serialized->endpoint_id = local_id;
  *actual_size = sizeof(SerializedMessagePipeDispatcher);

  // This dispatcher no longer refers to the pipe.
  port_ = kInvalidPort;
  message_pipe_ = NULL;

  return true;
}

// MessagePipeDispatcherTransport ----------------------------------------------

// Wraps |transport|, which must refer to a message pipe dispatcher: the type
// reported by |message_pipe_dispatcher()| is checked by DCHECK only.
MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
    DispatcherTransport transport) : DispatcherTransport(transport) {
  DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
}

}  // namespace system
}  // namespace mojo