// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ipc/ipc_mojo_bootstrap.h"

#include <inttypes.h>
#include <stdint.h>

#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/no_destructor.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/trace_event/memory_allocator_dump.h"
#include "base/trace_event/memory_dump_manager.h"
#include "base/trace_event/memory_dump_provider.h"
#include "ipc/ipc_channel.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_group_controller.h"
#include "mojo/public/cpp/bindings/connector.h"
#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
#include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
#include "mojo/public/cpp/bindings/interface_id.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/message_header_validator.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
#include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"

namespace IPC {

namespace {

class ChannelAssociatedGroupController;

// Keeps a registry of live ChannelAssociatedGroupController instances so that
// memory dumps can report per-controller state. This exists to help track down
// message leaks in Channel.
//
// TODO(https://crbug.com/813045): Remove this.
class ControllerMemoryDumpProvider
    : public base::trace_event::MemoryDumpProvider {
 public:
  // Registers this provider with the MemoryDumpManager singleton under the
  // name "IPCChannel".
  ControllerMemoryDumpProvider() {
    auto* manager = base::trace_event::MemoryDumpManager::GetInstance();
    manager->RegisterDumpProvider(this, "IPCChannel", nullptr);
  }

  // Removes this provider from the MemoryDumpManager singleton.
  ~ControllerMemoryDumpProvider() override {
    auto* manager = base::trace_event::MemoryDumpManager::GetInstance();
    manager->UnregisterDumpProvider(this);
  }

  // Starts tracking |controller|. The registry is guarded by |lock_|.
  void AddController(ChannelAssociatedGroupController* controller) {
    base::AutoLock auto_lock(lock_);
    controllers_.insert(controller);
  }

  // Stops tracking |controller|. The registry is guarded by |lock_|.
  void RemoveController(ChannelAssociatedGroupController* controller) {
    base::AutoLock auto_lock(lock_);
    controllers_.erase(controller);
  }

  // base::trace_event::MemoryDumpProvider:
  bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
                    base::trace_event::ProcessMemoryDump* pmd) override;

 private:
  // Protects |controllers_|.
  base::Lock lock_;

  // Raw, non-owning pointers to the controllers currently registered.
  std::set<ChannelAssociatedGroupController*> controllers_;

  DISALLOW_COPY_AND_ASSIGN(ControllerMemoryDumpProvider);
};

// Returns the shared ControllerMemoryDumpProvider, which is a function-local
// static held in a base::NoDestructor.
ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
  static base::NoDestructor<ControllerMemoryDumpProvider> instance;
  return *instance.get();
}

class ChannelAssociatedGroupController
    : public mojo::AssociatedGroupController,
      public mojo::MessageReceiver,
      public mojo::PipeControlMessageHandlerDelegate {
 public:
  // |set_interface_id_namespace_bit| selects whether interface IDs generated
  // by this controller carry mojo::kInterfaceIdNamespaceMask.
  // |ipc_task_runner| is stored as the master endpoint's task runner and
  // |proxy_task_runner| as the runner used for messages that cannot be
  // dispatched on the master endpoint thread.
  ChannelAssociatedGroupController(
      bool set_interface_id_namespace_bit,
      const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
      const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
      : task_runner_(ipc_task_runner),
        proxy_task_runner_(proxy_task_runner),
        set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
        filters_(this),
        control_message_handler_(this),
        control_message_proxy_thunk_(this),
        control_message_proxy_(&control_message_proxy_thunk_) {
    static constexpr char kControlHandlerDescription[] =
        "IPC::mojom::Bootstrap [master] PipeControlMessageHandler";
    static constexpr char kHeaderValidatorDescription[] =
        "IPC::mojom::Bootstrap [master] MessageHeaderValidator";

    // Detach so the checker is not tied to the constructing thread.
    thread_checker_.DetachFromThread();
    control_message_handler_.SetDescription(kControlHandlerDescription);
    filters_.Append<mojo::MessageHeaderValidator>(kHeaderValidatorDescription);

    // Make this controller visible to memory dumps.
    GetMemoryDumpProvider().AddController(this);
  }

  // Returns the number of messages currently held in |outgoing_messages_|.
  // Takes |outgoing_messages_lock_| for the duration of the read.
  size_t GetQueuedMessageCount() {
    base::AutoLock auto_lock(outgoing_messages_lock_);
    const size_t queued_count = outgoing_messages_.size();
    return queued_count;
  }

  // Creates |connector_| around |handle| and wires it up so that incoming
  // messages flow through |filters_| and pipe errors reach OnPipeError(). Must
  // be called on the master endpoint thread.
  void Bind(mojo::ScopedMessagePipeHandle handle) {
    DCHECK(thread_checker_.CalledOnValidThread());
    DCHECK(task_runner_->BelongsToCurrentThread());

    connector_ = std::make_unique<mojo::Connector>(
        std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, task_runner_);

    mojo::Connector* const connector = connector_.get();
    connector->set_incoming_receiver(&filters_);
    // base::Unretained: |connector_| is owned by |this|.
    connector->set_connection_error_handler(
        base::Bind(&ChannelAssociatedGroupController::OnPipeError,
                   base::Unretained(this)));
    connector->set_enforce_errors_from_incoming_receiver(false);
    connector->SetWatcherHeapProfilerTag("IPC Channel");
  }

  // Sets |paused_|. While it is set, SendMessage() on the master endpoint
  // thread queues messages in |outgoing_messages_| instead of writing them to
  // the connector. Must not already be paused.
  void Pause() {
    DCHECK(!paused_);
    paused_ = true;
  }

  // Clears |paused_|. This does not send anything that was queued while
  // paused; FlushOutgoingMessages() does that. Must currently be paused.
  void Unpause() {
    DCHECK(paused_);
    paused_ = false;
  }

  void FlushOutgoingMessages() {
    std::vector<mojo::Message> outgoing_messages;
    {
      base::AutoLock lock(outgoing_messages_lock_);
      std::swap(outgoing_messages, outgoing_messages_);
    }
    for (auto& message : outgoing_messages)
      SendMessage(&message);
  }

  // Creates the two endpoints that use interface ID 1 (with and without the
  // namespace bit), binds |sender| to one and stores a request for the other
  // in |receiver|. Which variant of the ID is used for sending depends on
  // |set_interface_id_namespace_bit_|.
  void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
                              mojom::ChannelAssociatedRequest* receiver) {
    const mojo::InterfaceId plain_id = 1;
    const mojo::InterfaceId namespaced_id = 1 | mojo::kInterfaceIdNamespaceMask;
    const mojo::InterfaceId sender_id =
        set_interface_id_namespace_bit_ ? namespaced_id : plain_id;
    const mojo::InterfaceId receiver_id =
        set_interface_id_namespace_bit_ ? plain_id : namespaced_id;

    {
      base::AutoLock locker(lock_);
      for (const mojo::InterfaceId id : {sender_id, receiver_id}) {
        Endpoint* endpoint = new Endpoint(this, id);
        endpoints_.insert({id, endpoint});
        endpoint->set_handle_created();
      }
    }

    // Handles are created after |lock_| has been released.
    mojo::ScopedInterfaceEndpointHandle sender_handle =
        CreateScopedInterfaceEndpointHandle(sender_id);
    mojo::ScopedInterfaceEndpointHandle receiver_handle =
        CreateScopedInterfaceEndpointHandle(receiver_id);

    sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0));
    *receiver = mojom::ChannelAssociatedRequest(std::move(receiver_handle));
  }

  // Shuts the controller down on the master endpoint thread: closes the
  // message pipe (if one was ever bound), runs the pipe-error path so every
  // endpoint is marked peer-closed, drops the connector and discards any
  // messages still queued for sending. After this, SendMessage() on the master
  // endpoint thread no longer queues messages.
  void ShutDown() {
    DCHECK(thread_checker_.CalledOnValidThread());
    shut_down_ = true;
    // |connector_| is only created by Bind(). A controller that is shut down
    // without ever having been bound has no pipe to close, so guard against
    // dereferencing a null connector.
    if (connector_)
      connector_->CloseMessagePipe();
    OnPipeError();
    connector_.reset();

    base::AutoLock lock(outgoing_messages_lock_);
    outgoing_messages_.clear();
  }

  // mojo::AssociatedGroupController:
  // Allocates a fresh interface ID for |handle_to_send|, registers an endpoint
  // for it and notifies the handle of the association. Returns
  // mojo::kInvalidInterfaceId if the handle is not pending association;
  // otherwise returns the new ID, even when the association notification
  // fails.
  mojo::InterfaceId AssociateInterface(
      mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
    if (!handle_to_send.pending_association())
      return mojo::kInvalidInterfaceId;

    uint32_t new_id = 0;
    {
      base::AutoLock locker(lock_);
      // Keep generating candidates until one is found that is not in use.
      // The counter wraps back to 2 before reaching the namespace mask.
      for (;;) {
        if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
          next_interface_id_ = 2;
        new_id = next_interface_id_++;
        if (set_interface_id_namespace_bit_)
          new_id |= mojo::kInterfaceIdNamespaceMask;
        if (!ContainsKey(endpoints_, new_id))
          break;
      }

      Endpoint* new_endpoint = new Endpoint(this, new_id);
      if (encountered_error_)
        new_endpoint->set_peer_closed();
      new_endpoint->set_handle_created();
      endpoints_.insert({new_id, new_endpoint});
    }

    const bool associated = NotifyAssociation(&handle_to_send, new_id);
    if (associated)
      return new_id;

    // The peer handle of |handle_to_send|, which is supposed to join this
    // associated group, has been closed.
    {
      base::AutoLock locker(lock_);
      if (Endpoint* stale_endpoint = FindEndpoint(new_id))
        MarkClosedAndMaybeRemove(stale_endpoint);
    }

    // Called without |lock_| held.
    control_message_proxy_.NotifyPeerEndpointClosed(
        new_id, handle_to_send.disconnect_reason());
    return new_id;
  }

  // Produces a local endpoint handle for |id|, creating the endpoint record if
  // needed. Returns an empty handle when |id| is invalid, when |id| is in this
  // side's own namespace (and is not the master ID), or when a handle has
  // already been created for the endpoint.
  mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
      mojo::InterfaceId id) override {
    if (!mojo::IsValidInterfaceId(id))
      return mojo::ScopedInterfaceEndpointHandle();

    // Unless it is the master ID, |id| is from the remote side and therefore
    // its namespace bit is supposed to be different than the value that this
    // router would use.
    const bool is_master_id = mojo::IsMasterInterfaceId(id);
    const bool uses_local_namespace =
        set_interface_id_namespace_bit_ ==
        mojo::HasInterfaceIdNamespaceBitSet(id);
    if (!is_master_id && uses_local_namespace)
      return mojo::ScopedInterfaceEndpointHandle();

    base::AutoLock locker(lock_);
    bool newly_inserted = false;
    Endpoint* endpoint = FindOrInsertEndpoint(id, &newly_inserted);

    // An existing endpoint may only ever hand out one handle.
    if (!newly_inserted && endpoint->handle_created())
      return mojo::ScopedInterfaceEndpointHandle();

    if (newly_inserted) {
      DCHECK(!endpoint->handle_created());
      if (encountered_error_)
        endpoint->set_peer_closed();
    }

    endpoint->set_handle_created();
    return CreateScopedInterfaceEndpointHandle(id);
  }

  // Marks the local side of endpoint |id| as closed (removing the endpoint if
  // its peer is closed too) and then tells the peer about it. For the master
  // interface ID the peer is only notified when a |reason| is supplied.
  // Invalid IDs are ignored.
  void CloseEndpointHandle(
      mojo::InterfaceId id,
      const base::Optional<mojo::DisconnectReason>& reason) override {
    if (!mojo::IsValidInterfaceId(id))
      return;

    {
      base::AutoLock locker(lock_);
      DCHECK(ContainsKey(endpoints_, id));
      Endpoint* closing = endpoints_[id].get();
      DCHECK(!closing->client());
      DCHECK(!closing->closed());
      MarkClosedAndMaybeRemove(closing);
    }

    const bool is_master_id = mojo::IsMasterInterfaceId(id);
    if (is_master_id && !reason)
      return;

    // Called without |lock_| held.
    control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
  }

  // Attaches |client| (running on |runner|) to the endpoint identified by
  // |handle| and returns that endpoint as its controller. If the endpoint's
  // peer is already closed, an error notification is posted to the client
  // asynchronously.
  mojo::InterfaceEndpointController* AttachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle,
      mojo::InterfaceEndpointClient* client,
      scoped_refptr<base::SequencedTaskRunner> runner) override {
    const mojo::InterfaceId endpoint_id = handle.id();
    DCHECK(mojo::IsValidInterfaceId(endpoint_id));
    DCHECK(client);

    base::AutoLock locker(lock_);
    DCHECK(ContainsKey(endpoints_, endpoint_id));

    Endpoint* const target = endpoints_[endpoint_id].get();
    target->AttachClient(client, std::move(runner));

    const bool peer_already_closed = target->peer_closed();
    if (peer_already_closed)
      NotifyEndpointOfError(target, true /* force_async */);

    return target;
  }

  // Detaches whatever client is attached to the endpoint identified by
  // |handle|. The endpoint must exist.
  void DetachEndpointClient(
      const mojo::ScopedInterfaceEndpointHandle& handle) override {
    const mojo::InterfaceId endpoint_id = handle.id();
    DCHECK(mojo::IsValidInterfaceId(endpoint_id));

    base::AutoLock locker(lock_);
    DCHECK(ContainsKey(endpoints_, endpoint_id));
    endpoints_[endpoint_id]->DetachClient();
  }

  // Intentionally a no-op: errors raised on channel endpoints never close the
  // pipe. The rationale is spelled out in the body.
  void RaiseError() override {
    // We ignore errors on channel endpoints, leaving the pipe open. There are
    // good reasons for this:
    //
    //   * We should never close a channel endpoint in either process as long as
    //     the child process is still alive. The child's endpoint should only be
    //     closed implicitly by process death, and the browser's endpoint should
    //     only be closed after the child process is confirmed to be dead. Crash
    //     reporting logic in Chrome relies on this behavior in order to do the
    //     right thing.
    //
    //   * There are two interesting conditions under which RaiseError() can be
    //     implicitly reached: an incoming message fails validation, or the
    //     local endpoint drops a response callback without calling it.
    //
    //   * In the validation case, we also report the message as bad, and this
    //     will imminently trigger the common bad-IPC path in the browser,
    //     causing the browser to kill the offending renderer.
    //
    //   * In the dropped response callback case, the net result of ignoring the
    //     issue is generally innocuous. While indicative of programmer error,
    //     it's not a severe failure and is already covered by separate DCHECKs.
    //
    // See https://crbug.com/861607 for additional discussion.
  }

  // Always reports that this controller prefers serialized messages.
  bool PrefersSerializedMessages() override { return true; }

 private:
  class Endpoint;
  class ControlMessageProxyThunk;
  friend class Endpoint;
  friend class ControlMessageProxyThunk;

  // MessageWrapper objects are always destroyed under the controller's lock. On
  // destruction, if the message it wraps contains
  // ScopedInterfaceEndpointHandles (which cannot be destructed under the
  // controller's lock), the wrapper temporarily releases the lock to clean
  // them up.
  class MessageWrapper {
   public:
    // Creates a wrapper with no controller, holding a default-constructed
    // message.
    MessageWrapper() = default;

    MessageWrapper(ChannelAssociatedGroupController* controller,
                   mojo::Message message)
        : controller_(controller), value_(std::move(message)) {}

    // Move-only: the message is moved out of |other|; the controller pointer
    // is copied.
    MessageWrapper(MessageWrapper&& other)
        : controller_(other.controller_), value_(std::move(other.value_)) {}

    ~MessageWrapper() {
      const bool carries_handles =
          !value_.associated_endpoint_handles()->empty();
      if (carries_handles) {
        controller_->lock_.AssertAcquired();
        // Drop the handles with the lock released; it is re-acquired when
        // |unlocker| goes out of scope.
        base::AutoUnlock unlocker(controller_->lock_);
        value_.mutable_associated_endpoint_handles()->clear();
      }
    }

    MessageWrapper& operator=(MessageWrapper&& other) {
      controller_ = other.controller_;
      value_ = std::move(other.value_);
      return *this;
    }

    // Mutable access to the wrapped message.
    mojo::Message& value() { return value_; }

   private:
    // Not owned. Null for a default-constructed wrapper.
    ChannelAssociatedGroupController* controller_ = nullptr;
    mojo::Message value_;

    DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
  };

  // State for one associated interface endpoint multiplexed by the enclosing
  // controller. It records whether the local and remote sides have closed,
  // the attached client and its task runner, and a queue of sync messages
  // awaiting dispatch. Most accessors assert that the controller's |lock_| is
  // held.
  class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
                   public mojo::InterfaceEndpointController {
   public:
    Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
        : controller_(controller), id_(id) {}

    mojo::InterfaceId id() const { return id_; }

    // True once the local side of this endpoint has been closed.
    bool closed() const {
      controller_->lock_.AssertAcquired();
      return closed_;
    }

    void set_closed() {
      controller_->lock_.AssertAcquired();
      closed_ = true;
    }

    // True once the remote side of this endpoint is known to be closed.
    bool peer_closed() const {
      controller_->lock_.AssertAcquired();
      return peer_closed_;
    }

    void set_peer_closed() {
      controller_->lock_.AssertAcquired();
      peer_closed_ = true;
    }

    // True once a ScopedInterfaceEndpointHandle has been handed out for this
    // endpoint.
    bool handle_created() const {
      controller_->lock_.AssertAcquired();
      return handle_created_;
    }

    void set_handle_created() {
      controller_->lock_.AssertAcquired();
      handle_created_ = true;
    }

    const base::Optional<mojo::DisconnectReason>& disconnect_reason() const {
      return disconnect_reason_;
    }

    void set_disconnect_reason(
        const base::Optional<mojo::DisconnectReason>& disconnect_reason) {
      disconnect_reason_ = disconnect_reason;
    }

    // The attached client's task runner, or null when no client is attached.
    base::SequencedTaskRunner* task_runner() const {
      return task_runner_.get();
    }

    // The attached client, or null when no client is attached.
    mojo::InterfaceEndpointClient* client() const {
      controller_->lock_.AssertAcquired();
      return client_;
    }

    // Attaches |client|, which must run on |runner|. Must be called on
    // |runner|'s sequence, with no client currently attached and the endpoint
    // not closed.
    void AttachClient(mojo::InterfaceEndpointClient* client,
                      scoped_refptr<base::SequencedTaskRunner> runner) {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(!closed_);
      DCHECK(runner->RunsTasksInCurrentSequence());

      task_runner_ = std::move(runner);
      client_ = client;
    }

    // Detaches the current client, clearing the task runner and destroying
    // the sync watcher. Must be called on the client's sequence.
    void DetachClient() {
      controller_->lock_.AssertAcquired();
      DCHECK(client_);
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      DCHECK(!closed_);

      task_runner_ = nullptr;
      client_ = nullptr;
      sync_watcher_.reset();
    }

    // Appends |message| to the sync message queue, signals the sync watcher
    // (if any) and returns the ID assigned to the queued message.
    uint32_t EnqueueSyncMessage(MessageWrapper message) {
      controller_->lock_.AssertAcquired();
      uint32_t id = GenerateSyncMessageId();
      sync_messages_.emplace(id, std::move(message));
      SignalSyncMessageEvent();
      return id;
    }

    // Signals the sync watcher's event, if a watcher exists.
    void SignalSyncMessageEvent() {
      controller_->lock_.AssertAcquired();

      if (sync_watcher_)
        sync_watcher_->SignalEvent();
    }

    // Removes and returns the message at the front of the sync queue if its
    // ID is |id|. Otherwise returns a default-constructed wrapper and leaves
    // the queue untouched.
    MessageWrapper PopSyncMessage(uint32_t id) {
      controller_->lock_.AssertAcquired();
      if (sync_messages_.empty() || sync_messages_.front().first != id)
        return MessageWrapper();
      MessageWrapper message = std::move(sync_messages_.front().second);
      sync_messages_.pop();
      return message;
    }

    // mojo::InterfaceEndpointController:

    // Stamps |message| with this endpoint's interface ID and forwards it to
    // the controller.
    bool SendMessage(mojo::Message* message) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      message->set_interface_id(id_);
      return controller_->SendMessage(message);
    }

    void AllowWokenUpBySyncWatchOnSameThread() override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      EnsureSyncWatcherExists();
      sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
    }

    bool SyncWatch(const bool* should_stop) override {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // It's not legal to make sync calls from the master endpoint's thread,
      // and in fact they must only happen from the proxy task runner.
      DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
      DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());

      EnsureSyncWatcherExists();
      return sync_watcher_->SyncWatch(should_stop);
    }

   private:
    friend class base::RefCountedThreadSafe<Endpoint>;

    ~Endpoint() override {
      controller_->lock_.AssertAcquired();
      DCHECK(!client_);
      DCHECK(closed_);
      DCHECK(peer_closed_);
      DCHECK(!sync_watcher_);
    }

    // Callback run by |sync_watcher_|. Dispatches at most one queued sync
    // message to the client, then resets the watcher's event if nothing is
    // left to process.
    void OnSyncMessageEventReady() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());

      // SUBTLE: The declaration order of these two references matters. Locals
      // are destroyed in reverse order of declaration, and ~Endpoint()
      // dereferences the raw |controller_| pointer. |controller_keepalive|
      // is therefore declared first so that it outlives |keepalive| and the
      // controller is still alive if |keepalive| drops the last reference to
      // this endpoint.
      scoped_refptr<AssociatedGroupController> controller_keepalive(
          controller_);
      scoped_refptr<Endpoint> keepalive(this);
      base::AutoLock locker(controller_->lock_);
      bool more_to_process = false;
      if (!sync_messages_.empty()) {
        MessageWrapper message_wrapper =
            std::move(sync_messages_.front().second);
        sync_messages_.pop();

        bool dispatch_succeeded;
        mojo::InterfaceEndpointClient* client = client_;
        {
          // Dispatch with the controller's lock released.
          base::AutoUnlock unlocker(controller_->lock_);
          dispatch_succeeded =
              client->HandleIncomingMessage(&message_wrapper.value());
        }

        if (!sync_messages_.empty())
          more_to_process = true;

        if (!dispatch_succeeded)
          controller_->RaiseError();
      }

      if (!more_to_process)
        sync_watcher_->ResetEvent();

      // If there are no queued sync messages and the peer has closed, there
      // won't be incoming sync messages in the future. If any SyncWatch()
      // calls are on the stack for this endpoint, resetting the watcher will
      // allow them to exit as the stack unwinds.
      if (!more_to_process && peer_closed_)
        sync_watcher_.reset();
    }

    // Lazily creates |sync_watcher_|. A freshly created watcher is signalled
    // right away if the peer is already closed or sync messages are already
    // queued.
    void EnsureSyncWatcherExists() {
      DCHECK(task_runner_->RunsTasksInCurrentSequence());
      if (sync_watcher_)
        return;

      base::AutoLock locker(controller_->lock_);
      sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
          base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
                              base::Unretained(this)));
      if (peer_closed_ || !sync_messages_.empty())
        SignalSyncMessageEvent();
    }

    // Returns the next sync message ID from a wrapping 32-bit counter.
    uint32_t GenerateSyncMessageId() {
      // Overflow is fine.
      uint32_t id = next_sync_message_id_++;
      DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
      return id;
    }

    // Not owned; stored as a raw pointer.
    ChannelAssociatedGroupController* const controller_;
    const mojo::InterfaceId id_;

    bool closed_ = false;
    bool peer_closed_ = false;
    bool handle_created_ = false;
    base::Optional<mojo::DisconnectReason> disconnect_reason_;
    mojo::InterfaceEndpointClient* client_ = nullptr;
    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
    // Pending sync messages, each paired with the ID assigned on enqueue.
    base::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
    uint32_t next_sync_message_id_ = 0;

    DISALLOW_COPY_AND_ASSIGN(Endpoint);
  };

  // MessageReceiver adapter that hands every accepted message to the owning
  // controller's SendMessage(). It is the receiver given to
  // |control_message_proxy_|.
  class ControlMessageProxyThunk : public MessageReceiver {
   public:
    explicit ControlMessageProxyThunk(
        ChannelAssociatedGroupController* controller)
        : controller_(controller) {}

   private:
    // MessageReceiver:
    bool Accept(mojo::Message* message) override {
      const bool sent = controller_->SendMessage(message);
      return sent;
    }

    // Not owned; stored as a raw pointer.
    ChannelAssociatedGroupController* controller_;

    DISALLOW_COPY_AND_ASSIGN(ControlMessageProxyThunk);
  };

  // Requires that the connector has already been released. Marks every
  // remaining endpoint as closed on both sides so that all of them are removed
  // from |endpoints_|, then unregisters from the memory dump provider.
  ~ChannelAssociatedGroupController() override {
    DCHECK(!connector_);

    base::AutoLock locker(lock_);
    auto iter = endpoints_.begin();
    while (iter != endpoints_.end()) {
      // Step past the entry before touching it; the calls below may erase it.
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (endpoint->closed()) {
        MarkPeerClosedAndMaybeRemove(endpoint);
        continue;
      }

      // This happens when a NotifyPeerEndpointClosed message has been
      // received, but the interface ID hasn't been used to create a local
      // endpoint handle.
      DCHECK(!endpoint->client());
      DCHECK(endpoint->peer_closed());
      MarkClosedAndMaybeRemove(endpoint);
    }

    DCHECK(endpoints_.empty());

    GetMemoryDumpProvider().RemoveController(this);
  }

  // Sends |message| over the channel. Off the master endpoint thread the
  // message is always re-posted to that thread and true is returned. On the
  // master endpoint thread the message goes straight to the connector, unless
  // there is no connector or the controller is paused, in which case it is
  // queued (or dropped after shutdown) and true is returned.
  bool SendMessage(mojo::Message* message) {
    if (!task_runner_->BelongsToCurrentThread()) {
      // Do a message size check here so we don't lose valuable stack
      // information to the task scheduler.
      CHECK_LE(message->data_num_bytes(), Channel::kMaximumMessageSize);

      // We always post tasks to the master endpoint thread when called from
      // other threads in order to simulate IPC::ChannelProxy::Send behavior.
      task_runner_->PostTask(
          FROM_HERE,
          base::Bind(
              &ChannelAssociatedGroupController::SendMessageOnMasterThread,
              this, base::Passed(message)));
      return true;
    }

    DCHECK(thread_checker_.CalledOnValidThread());
    if (connector_ && !paused_)
      return connector_->Accept(message);

    // Not able to write right now: hold on to the message unless the
    // controller has been shut down.
    if (!shut_down_) {
      base::AutoLock lock(outgoing_messages_lock_);
      outgoing_messages_.emplace_back(std::move(*message));
    }
    return true;
  }

  // Task body posted by SendMessage() from other threads: sends |message| on
  // the master endpoint thread and calls RaiseError() if that fails.
  void SendMessageOnMasterThread(mojo::Message message) {
    DCHECK(thread_checker_.CalledOnValidThread());
    const bool accepted = SendMessage(&message);
    if (!accepted)
      RaiseError();
  }

  void OnPipeError() {
    DCHECK(thread_checker_.CalledOnValidThread());

    // We keep |this| alive here because it's possible for the notifications
    // below to release all other references.
    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);

    base::AutoLock locker(lock_);
    encountered_error_ = true;

    std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
    for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
      Endpoint* endpoint = iter->second.get();
      ++iter;

      if (endpoint->client())
        endpoints_to_notify.push_back(endpoint);

      MarkPeerClosedAndMaybeRemove(endpoint);
    }

    for (auto& endpoint : endpoints_to_notify) {
      // Because a notification may in turn detach any endpoint, we have to
      // check each client again here.
      if (endpoint->client())
        NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    }
  }

  // Delivers an error notification to |endpoint|'s client. When already on the
  // client's sequence and |force_async| is false, the client is notified
  // synchronously with |lock_| temporarily released; otherwise a task is
  // posted to the client's task runner. Requires |lock_| to be held and the
  // endpoint to have a client.
  void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
    lock_.AssertAcquired();
    DCHECK(endpoint->task_runner() && endpoint->client());

    const bool notify_inline =
        endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async;
    if (!notify_inline) {
      // |endpoint| is passed unretained; the posted task re-validates it
      // against |endpoints_| before use.
      endpoint->task_runner()->PostTask(
          FROM_HERE,
          base::Bind(&ChannelAssociatedGroupController::
                         NotifyEndpointOfErrorOnEndpointThread,
                     this, endpoint->id(), base::Unretained(endpoint)));
      return;
    }

    // Copy what is needed while still holding the lock.
    mojo::InterfaceEndpointClient* client = endpoint->client();
    base::Optional<mojo::DisconnectReason> reason(
        endpoint->disconnect_reason());

    base::AutoUnlock unlocker(lock_);
    client->NotifyError(reason);
  }

  // Posted by NotifyEndpointOfError(). Does nothing unless |id| still maps to
  // the very same |endpoint| object and that endpoint still has a client.
  // |endpoint| is not dereferenced until that has been confirmed.
  void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
                                             Endpoint* endpoint) {
    base::AutoLock locker(lock_);

    Endpoint* registered = FindEndpoint(id);
    if (!registered || registered != endpoint)
      return;
    if (!endpoint->client())
      return;

    DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    NotifyEndpointOfError(endpoint, false /* force_async */);
  }

  // Marks |endpoint| as locally closed and, if its peer is closed as well,
  // erases it from |endpoints_|. Requires |lock_| to be held.
  void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
    lock_.AssertAcquired();
    endpoint->set_closed();

    const bool fully_closed = endpoint->closed() && endpoint->peer_closed();
    if (!fully_closed)
      return;
    endpoints_.erase(endpoint->id());
  }

  // Marks |endpoint|'s peer as closed, signals its sync message event and, if
  // the endpoint is locally closed as well, erases it from |endpoints_|.
  // Requires |lock_| to be held.
  void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
    lock_.AssertAcquired();
    endpoint->set_peer_closed();
    endpoint->SignalSyncMessageEvent();

    const bool fully_closed = endpoint->closed() && endpoint->peer_closed();
    if (!fully_closed)
      return;
    endpoints_.erase(endpoint->id());
  }

  // Returns the endpoint registered for |id|, creating and registering a new
  // one if none exists. |inserted| is optional; when provided it must point
  // to false on entry and is set to true only if a new endpoint was created.
  // Requires |lock_| to be held.
  Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
    lock_.AssertAcquired();
    DCHECK(!inserted || !*inserted);

    if (Endpoint* existing = FindEndpoint(id))
      return existing;

    Endpoint* created = new Endpoint(this, id);
    endpoints_.insert({id, created});
    if (inserted)
      *inserted = true;
    return created;
  }

  // Returns the endpoint registered for |id|, or nullptr if there is none.
  // Requires |lock_| to be held.
  Endpoint* FindEndpoint(mojo::InterfaceId id) {
    lock_.AssertAcquired();
    const auto found = endpoints_.find(id);
    if (found == endpoints_.end())
      return nullptr;
    return found->second.get();
  }

  // mojo::MessageReceiver:
  // Entry point for every message read from the pipe, called on the master
  // endpoint thread. Pipe control messages go to |control_message_handler_|.
  // Other messages are routed by interface ID: dispatched directly when the
  // endpoint's client runs on the current sequence, otherwise forwarded to
  // |proxy_task_runner_|. Returns false when the message fails validation
  // (its associated endpoint handles cannot be deserialized or its interface
  // ID is invalid) and true when the message targets an unknown endpoint or
  // has been forwarded to |proxy_task_runner_|; for direct dispatch and pipe
  // control messages, the handler's result is returned.
  bool Accept(mojo::Message* message) override {
    DCHECK(thread_checker_.CalledOnValidThread());

    if (!message->DeserializeAssociatedEndpointHandles(this))
      return false;

    if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
      return control_message_handler_.Accept(message);

    // The interface ID comes straight from the remote side, so an invalid
    // value is a validation failure rather than a local programming error.
    // Reject it in all build types instead of only asserting.
    mojo::InterfaceId id = message->interface_id();
    if (!mojo::IsValidInterfaceId(id))
      return false;

    base::AutoLock locker(lock_);
    Endpoint* endpoint = FindEndpoint(id);
    if (!endpoint)
      return true;

    mojo::InterfaceEndpointClient* client = endpoint->client();
    if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
      // No client has been bound yet or the client runs tasks on another
      // thread. We assume the other thread must always be the one on which
      // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
      //
      // If the client is not yet bound, it must be bound by the time this task
      // runs or else it's programmer error.
      DCHECK(proxy_task_runner_);

      if (message->has_flag(mojo::Message::kFlagIsSync)) {
        MessageWrapper message_wrapper(this, std::move(*message));
        // Sync messages may need to be handled by the endpoint if it's blocking
        // on a sync reply. We pass ownership of the message to the endpoint's
        // sync message queue. If the endpoint was blocking, it will dequeue the
        // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
        // call will dequeue the message and dispatch it.
        uint32_t message_id =
            endpoint->EnqueueSyncMessage(std::move(message_wrapper));
        proxy_task_runner_->PostTask(
            FROM_HERE,
            base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
                       this, id, message_id));
        return true;
      }

      proxy_task_runner_->PostTask(
          FROM_HERE,
          base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
                     this, base::Passed(message)));
      return true;
    }

    // We do not expect to receive sync responses on the master endpoint thread.
    // If it's happening, it's a bug.
    DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
           !message->has_flag(mojo::Message::kFlagIsResponse));

    // Dispatch to the client with |lock_| released.
    base::AutoUnlock unlocker(lock_);
    return client->HandleIncomingMessage(message);
  }

  // Posted by Accept() for non-sync messages that have to be dispatched on the
  // proxy task runner. The message is silently dropped if its endpoint no
  // longer exists or has no client; a failed dispatch calls RaiseError().
  void AcceptOnProxyThread(mojo::Message message) {
    DCHECK(proxy_task_runner_->BelongsToCurrentThread());

    const mojo::InterfaceId id = message.interface_id();
    DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));

    base::AutoLock locker(lock_);
    Endpoint* target = FindEndpoint(id);
    mojo::InterfaceEndpointClient* client =
        target ? target->client() : nullptr;
    if (!client)
      return;

    DCHECK(target->task_runner()->RunsTasksInCurrentSequence());

    // Sync messages should never make their way to this method.
    DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));

    bool handled = false;
    {
      // Dispatch to the client with |lock_| released.
      base::AutoUnlock unlocker(lock_);
      handled = client->HandleIncomingMessage(&message);
    }

    if (!handled)
      RaiseError();
  }

  // Posted by Accept() for sync messages. Dispatches the queued message
  // identified by |message_id| on endpoint |interface_id|, unless the endpoint
  // or its client is gone or the message has already been taken off the
  // queue. A failed dispatch calls RaiseError().
  void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
    DCHECK(proxy_task_runner_->BelongsToCurrentThread());

    base::AutoLock locker(lock_);
    Endpoint* target = FindEndpoint(interface_id);

    // Careful, if the endpoint is detached its members are cleared. Check for
    // that before dereferencing.
    mojo::InterfaceEndpointClient* client =
        target ? target->client() : nullptr;
    if (!client)
      return;

    DCHECK(target->task_runner()->RunsTasksInCurrentSequence());
    MessageWrapper pending = target->PopSyncMessage(message_id);

    // The message must have already been dequeued by the endpoint waking up
    // from a sync wait. Nothing to do.
    if (pending.value().IsNull())
      return;

    bool handled = false;
    {
      // Dispatch to the client with |lock_| released.
      base::AutoUnlock unlocker(lock_);
      handled = client->HandleIncomingMessage(&pending.value());
    }

    if (!handled)
      RaiseError();
  }

  // mojo::PipeControlMessageHandlerDelegate:
  // Called when the remote side reports that endpoint |id| was closed. Creates
  // the endpoint record if it does not exist yet, stores |reason| when one is
  // given and, the first time the peer is seen as closed, notifies any
  // attached client and marks the endpoint peer-closed. Always returns true.
  bool OnPeerAssociatedEndpointClosed(
      mojo::InterfaceId id,
      const base::Optional<mojo::DisconnectReason>& reason) override {
    DCHECK(thread_checker_.CalledOnValidThread());

    // Holds a reference to |this| for the duration of the call.
    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
    base::AutoLock locker(lock_);
    scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
    if (reason)
      endpoint->set_disconnect_reason(reason);

    // Nothing more to do if the peer was already known to be closed.
    if (endpoint->peer_closed())
      return true;

    if (endpoint->client())
      NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    MarkPeerClosedAndMaybeRemove(endpoint.get());
    return true;
  }

  // Checked in places which must be run on the master endpoint's thread.
  base::ThreadChecker thread_checker_;

  // Task runner of the master endpoint thread. The connector is created on it
  // and SendMessage() re-posts to it when called from any other thread.
  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

  // Task runner that incoming messages are forwarded to when they cannot be
  // dispatched directly on the master endpoint thread.
  scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;

  // Whether interface IDs generated locally carry
  // mojo::kInterfaceIdNamespaceMask.
  const bool set_interface_id_namespace_bit_;

  // While true, SendMessage() on the master endpoint thread queues messages
  // instead of writing them to |connector_|.
  bool paused_ = false;

  // Null until Bind() is called and again after ShutDown().
  std::unique_ptr<mojo::Connector> connector_;
  mojo::FilterChain filters_;
  mojo::PipeControlMessageHandler control_message_handler_;
  ControlMessageProxyThunk control_message_proxy_thunk_;

  // NOTE: It is unsafe to call into this object while holding |lock_|.
  mojo::PipeControlMessageProxy control_message_proxy_;

  // Guards access to |outgoing_messages_| only. Used to support memory dumps
  // which may be triggered from any thread.
  base::Lock outgoing_messages_lock_;

  // Outgoing messages that were sent before this controller was bound to a
  // real message pipe.
  std::vector<mojo::Message> outgoing_messages_;

  // Guards the fields below for thread-safe access.
  // NOTE(review): |shut_down_| is the exception. In this file it is written
  // in ShutDown() and read in SendMessage() without |lock_| held, in both
  // cases on the master endpoint thread.
  base::Lock lock_;

  // Set once the pipe has reported an error; endpoints created afterwards
  // start out peer-closed.
  bool encountered_error_ = false;

  // Set by ShutDown(); once set, SendMessage() no longer queues messages.
  bool shut_down_ = false;

  // ID #1 is reserved for the mojom::Channel interface.
  uint32_t next_interface_id_ = 2;

  // All known endpoints, keyed by interface ID.
  std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;

  DISALLOW_COPY_AND_ASSIGN(ChannelAssociatedGroupController);
};

// Emits one allocator dump per registered controller, named after the
// controller's address, whose object count is the number of outgoing messages
// that controller currently has queued. |args| is unused. Always returns true.
bool ControllerMemoryDumpProvider::OnMemoryDump(
    const base::trace_event::MemoryDumpArgs& args,
    base::trace_event::ProcessMemoryDump* pmd) {
  using base::trace_event::MemoryAllocatorDump;

  base::AutoLock auto_lock(lock_);
  for (ChannelAssociatedGroupController* controller : controllers_) {
    const auto dump_name =
        base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
                           reinterpret_cast<uintptr_t>(controller));
    MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(dump_name);
    dump->AddScalar(MemoryAllocatorDump::kNameObjectCount,
                    MemoryAllocatorDump::kUnitsObjects,
                    controller->GetQueuedMessageCount());
  }

  return true;
}

class MojoBootstrapImpl : public MojoBootstrap {
 public:
  MojoBootstrapImpl(
      mojo::ScopedMessagePipeHandle handle,
      const scoped_refptr<ChannelAssociatedGroupController> controller)
      : controller_(controller),
        associated_group_(controller),
        handle_(std::move(handle)) {}

  ~MojoBootstrapImpl() override {
    controller_->ShutDown();
  }

 private:
  void Connect(mojom::ChannelAssociatedPtr* sender,
               mojom::ChannelAssociatedRequest* receiver) override {
    controller_->Bind(std::move(handle_));
    controller_->CreateChannelEndpoints(sender, receiver);
  }

  void Pause() override {
    controller_->Pause();
  }

  void Unpause() override {
    controller_->Unpause();
  }

  void Flush() override {
    controller_->FlushOutgoingMessages();
  }

  mojo::AssociatedGroup* GetAssociatedGroup() override {
    return &associated_group_;
  }

  scoped_refptr<ChannelAssociatedGroupController> controller_;
  mojo::AssociatedGroup associated_group_;

  mojo::ScopedMessagePipeHandle handle_;

  DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl);
};

}  // namespace

// static
// Builds a MojoBootstrapImpl around |handle| with a freshly created
// controller. The controller sets the interface ID namespace bit only when
// |mode| is Channel::MODE_SERVER.
std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
    mojo::ScopedMessagePipeHandle handle,
    Channel::Mode mode,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
  const bool is_server = mode == Channel::MODE_SERVER;
  scoped_refptr<ChannelAssociatedGroupController> controller =
      new ChannelAssociatedGroupController(is_server, ipc_task_runner,
                                           proxy_task_runner);
  return std::make_unique<MojoBootstrapImpl>(std::move(handle),
                                             std::move(controller));
}

}  // namespace IPC