// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/core/channel.h"

#include <errno.h>
#include <sys/socket.h>

#include <algorithm>
#include <limits>
#include <memory>

#include "base/bind.h"
#include "base/containers/queue.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop_current.h"
#include "base/message_loop/message_pump_for_io.h"
#include "base/synchronization/lock.h"
#include "base/task_runner.h"
#include "build/build_config.h"
#include "mojo/core/core.h"
#include "mojo/public/cpp/platform/socket_utils_posix.h"

#if !defined(OS_NACL)
#include <sys/uio.h>
#endif

#if defined(OS_MACOSX) && !defined(OS_IOS)
#include "mojo/core/mach_port_relay.h"
#endif

namespace mojo {
namespace core {

namespace {

const size_t kMaxBatchReadCapacity = 256 * 1024;

// A view over a Channel::Message object. The write queue uses these since
// large messages may need to be sent in chunks.
class MessageView {
 public:
  // Owns |message|. |offset| indexes the first unsent byte in the message.
  MessageView(Channel::MessagePtr message, size_t offset)
      : message_(std::move(message)),
        offset_(offset),
        handles_(message_->TakeHandlesForTransport()) {
    DCHECK_GT(message_->data_num_bytes(), offset_);
  }

  MessageView(MessageView&& other) { *this = std::move(other); }

  MessageView& operator=(MessageView&& other) {
    message_ = std::move(other.message_);
    offset_ = other.offset_;
    handles_ = std::move(other.handles_);
    return *this;
  }

  ~MessageView() {}

  const void* data() const {
    return static_cast<const char*>(message_->data()) + offset_;
  }

  size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }

  size_t data_offset() const { return offset_; }
  void advance_data_offset(size_t num_bytes) {
    DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
    offset_ += num_bytes;
  }

  std::vector<PlatformHandleInTransit> TakeHandles() {
    return std::move(handles_);
  }
  Channel::MessagePtr TakeMessage() { return std::move(message_); }

  void SetHandles(std::vector<PlatformHandleInTransit> handles) {
    handles_ = std::move(handles);
  }

 private:
  Channel::MessagePtr message_;
  size_t offset_;
  std::vector<PlatformHandleInTransit> handles_;

  DISALLOW_COPY_AND_ASSIGN(MessageView);
};

class ChannelPosix : public Channel,
#if defined(OS_MACOSX) && !defined(OS_IOS)
                     public MachPortRelay::Observer,
#endif
                     public base::MessageLoopCurrent::DestructionObserver,
                     public base::MessagePumpForIO::FdWatcher {
 public:
  ChannelPosix(Delegate* delegate,
               ConnectionParams connection_params,
               scoped_refptr<base::TaskRunner> io_task_runner)
      : Channel(delegate), self_(this), io_task_runner_(io_task_runner) {
    if (connection_params.server_endpoint().is_valid())
      server_ = connection_params.TakeServerEndpoint();
    else
      socket_ = connection_params.TakeEndpoint().TakePlatformHandle().TakeFD();

    CHECK(server_.is_valid() || socket_.is_valid());
  }

  void Start() override {
#if defined(OS_MACOSX) && !defined(OS_IOS)
    auto* relay = Core::Get()->GetMachPortRelay();
    if (relay) {
      // We should only have a relay if we know the remote process handle,
      // because that means we're in the broker process.
      relay->AddObserver(this);
    }
#endif

    if (io_task_runner_->RunsTasksInCurrentSequence()) {
      StartOnIOThread();
    } else {
      io_task_runner_->PostTask(
          FROM_HERE, base::BindOnce(&ChannelPosix::StartOnIOThread, this));
    }
  }

  void ShutDownImpl() override {
    // Always shut down asynchronously when called through the public interface.
    io_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(&ChannelPosix::ShutDownOnIOThread, this));
  }

  void Write(MessagePtr message) override {
#if defined(OS_MACOSX) && !defined(OS_IOS)
    // If this message has Mach ports and we have a MachPortRelay, use the relay
    // to rewrite the ports as receive rights from which the send right can be
    // read. See |MachPortRelay::SendPortsToProcess()|.
    //
    // Note that if we don't have a relay, the receiving process must, and they
    // must also have the ability to extract a send right from the ports that
    // are already attached.
    MachPortRelay* relay = Core::Get()->GetMachPortRelay();
    if (relay && remote_process().is_valid() && message->has_mach_ports()) {
      if (relay->port_provider()->TaskForPid(remote_process().get()) ==
          MACH_PORT_NULL) {
        // We also need to have a task port for the remote process before we can
        // send it any other ports. If we don't have one yet, queue the message
        // until OnProcessReady() is invoked.
        base::AutoLock lock(task_port_wait_lock_);
        pending_outgoing_with_mach_ports_.emplace_back(std::move(message));
        return;
      }

      relay->SendPortsToProcess(message.get(), remote_process().get());
    }
#endif

    bool write_error = false;
    {
      base::AutoLock lock(write_lock_);
      if (reject_writes_)
        return;
      if (outgoing_messages_.empty()) {
        if (!WriteNoLock(MessageView(std::move(message), 0)))
          reject_writes_ = write_error = true;
      } else {
        outgoing_messages_.emplace_back(std::move(message), 0);
      }
    }
    if (write_error) {
      // Invoke OnWriteError() asynchronously on the IO thread, in case Write()
      // was called by the delegate, in which case we should not re-enter it.
      io_task_runner_->PostTask(
          FROM_HERE, base::BindOnce(&ChannelPosix::OnWriteError, this,
                                    Error::kDisconnected));
    }
  }

  void LeakHandle() override {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    leak_handle_ = true;
  }

  bool GetReadPlatformHandles(const void* payload,
                              size_t payload_size,
                              size_t num_handles,
                              const void* extra_header,
                              size_t extra_header_size,
                              std::vector<PlatformHandle>* handles,
                              bool* deferred) override {
    if (num_handles > std::numeric_limits<uint16_t>::max())
      return false;
#if defined(OS_MACOSX) && !defined(OS_IOS)
    // On OSX, we can have mach ports which are located in the extra header
    // section.
    using MachPortsEntry = Channel::Message::MachPortsEntry;
    using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
    if (extra_header_size <
        sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)) {
      return false;
    }
    const MachPortsExtraHeader* mach_ports_header =
        reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
    size_t num_mach_ports = mach_ports_header->num_ports;
    if (num_mach_ports > num_handles)
      return false;
    if (incoming_fds_.size() + num_mach_ports < num_handles)
      return true;

    std::vector<PlatformHandleInTransit> handles_in_transit(num_handles);
    const MachPortsEntry* mach_ports = mach_ports_header->entries;

    // If we know the remote process handle, we assume all incoming Mach ports
    // are send right references owned by the remote process. Otherwise they're
    // receive ports we can use to read a send right.
    const bool extract_send_rights = remote_process().is_valid();
    for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
      if (mach_port_index < num_mach_ports &&
          mach_ports[mach_port_index].index == i) {
        mach_port_t port_name =
            static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port);
        if (extract_send_rights) {
          handles_in_transit[i] =
              PlatformHandleInTransit::CreateForMachPortName(port_name);
        } else {
          handles_in_transit[i] = PlatformHandleInTransit(
              PlatformHandle(MachPortRelay::ReceiveSendRight(
                  base::mac::ScopedMachReceiveRight(port_name))));
        }
        mach_port_index++;
      } else {
        if (incoming_fds_.empty())
          return false;
        handles_in_transit[i] = PlatformHandleInTransit(
            PlatformHandle(std::move(incoming_fds_.front())));
        incoming_fds_.pop_front();
      }
    }
    if (extract_send_rights && num_mach_ports) {
      MachPortRelay* relay = Core::Get()->GetMachPortRelay();
      DCHECK(relay);
      // Extracting send rights requires that we have a task port for the
      // remote process, which we may not yet have.
      if (relay->port_provider()->TaskForPid(remote_process().get()) !=
          MACH_PORT_NULL) {
        // We do have a task port, so extract the send rights immediately.
        for (auto& handle : handles_in_transit) {
          if (handle.is_mach_port_name()) {
            handle = PlatformHandleInTransit(PlatformHandle(relay->ExtractPort(
                handle.mach_port_name(), remote_process().get())));
          }
        }
      } else {
        // No task port, we have to defer this message.
        *deferred = true;
        base::AutoLock lock(task_port_wait_lock_);
        std::vector<uint8_t> data(payload_size);
        memcpy(data.data(), payload, payload_size);
        pending_incoming_with_mach_ports_.emplace_back(
            std::move(data), std::move(handles_in_transit));
        return true;
      }
    }

    handles->resize(handles_in_transit.size());
    for (size_t i = 0; i < handles->size(); ++i)
      handles->at(i) = handles_in_transit[i].TakeHandle();
#else
    if (incoming_fds_.size() < num_handles)
      return true;

    handles->resize(num_handles);
    for (size_t i = 0; i < num_handles; ++i) {
      handles->at(i) = PlatformHandle(std::move(incoming_fds_.front()));
      incoming_fds_.pop_front();
    }
#endif

    return true;
  }

 private:
  ~ChannelPosix() override {
    DCHECK(!read_watcher_);
    DCHECK(!write_watcher_);
  }

  void StartOnIOThread() {
    DCHECK(!read_watcher_);
    DCHECK(!write_watcher_);
    read_watcher_.reset(
        new base::MessagePumpForIO::FdWatchController(FROM_HERE));
    base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
    if (server_.is_valid()) {
      base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
          server_.platform_handle().GetFD().get(), false /* persistent */,
          base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
    } else {
      write_watcher_.reset(
          new base::MessagePumpForIO::FdWatchController(FROM_HERE));
      base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
          socket_.get(), true /* persistent */,
          base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
      base::AutoLock lock(write_lock_);
      FlushOutgoingMessagesNoLock();
    }
  }

  void WaitForWriteOnIOThread() {
    base::AutoLock lock(write_lock_);
    WaitForWriteOnIOThreadNoLock();
  }

  void WaitForWriteOnIOThreadNoLock() {
    if (pending_write_)
      return;
    if (!write_watcher_)
      return;
    if (io_task_runner_->RunsTasksInCurrentSequence()) {
      pending_write_ = true;
      base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
          socket_.get(), false /* persistent */,
          base::MessagePumpForIO::WATCH_WRITE, write_watcher_.get(), this);
    } else {
      io_task_runner_->PostTask(
          FROM_HERE,
          base::BindOnce(&ChannelPosix::WaitForWriteOnIOThread, this));
    }
  }

  void ShutDownOnIOThread() {
    base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);

    read_watcher_.reset();
    write_watcher_.reset();
    if (leak_handle_) {
      ignore_result(socket_.release());
      server_.TakePlatformHandle().release();
    } else {
      socket_.reset();
      ignore_result(server_.TakePlatformHandle());
    }
#if defined(OS_MACOSX)
    fds_to_close_.clear();
#endif

#if defined(OS_MACOSX) && !defined(OS_IOS)
    auto* relay = Core::Get()->GetMachPortRelay();
    if (relay)
      relay->RemoveObserver(this);
#endif

    // May destroy the |this| if it was the last reference.
    self_ = nullptr;
  }

#if defined(OS_MACOSX) && !defined(OS_IOS)
  // MachPortRelay::Observer:
  void OnProcessReady(base::ProcessHandle process) override {
    if (process != remote_process().get())
      return;

    io_task_runner_->PostTask(
        FROM_HERE,
        base::BindOnce(
            &ChannelPosix::FlushPendingMessagesWithMachPortsOnIOThread, this));
  }

  void FlushPendingMessagesWithMachPortsOnIOThread() {
    // We have a task port for the remote process. Now we can send or accept
    // any pending messages with Mach ports.
    std::vector<RawIncomingMessage> incoming;
    std::vector<MessagePtr> outgoing;
    {
      base::AutoLock lock(task_port_wait_lock_);
      if (reject_incoming_messages_with_mach_ports_)
        return;
      std::swap(pending_incoming_with_mach_ports_, incoming);
      std::swap(pending_outgoing_with_mach_ports_, outgoing);
    }

    DCHECK(remote_process().is_valid());
    base::ProcessHandle process = remote_process().get();
    MachPortRelay* relay = Core::Get()->GetMachPortRelay();
    DCHECK(relay);
    for (auto& message : incoming) {
      Channel::Delegate* d = delegate();
      if (!d)
        break;
      std::vector<PlatformHandle> handles(message.handles.size());
      for (size_t i = 0; i < message.handles.size(); ++i) {
        if (message.handles[i].is_mach_port_name()) {
          handles[i] = PlatformHandle(
              relay->ExtractPort(message.handles[i].mach_port_name(), process));
        } else {
          DCHECK(!message.handles[i].owning_process().is_valid());
          handles[i] = message.handles[i].TakeHandle();
        }
      }
      d->OnChannelMessage(message.data.data(), message.data.size(),
                          std::move(handles));
    }

    for (auto& message : outgoing) {
      relay->SendPortsToProcess(message.get(), process);
      Write(std::move(message));
    }
  }
#endif

  // base::MessageLoopCurrent::DestructionObserver:
  void WillDestroyCurrentMessageLoop() override {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    if (self_)
      ShutDownOnIOThread();
  }

  // base::MessagePumpForIO::FdWatcher:
  void OnFileCanReadWithoutBlocking(int fd) override {
    if (server_.is_valid()) {
      CHECK_EQ(fd, server_.platform_handle().GetFD().get());
#if !defined(OS_NACL)
      read_watcher_.reset();
      base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);

      AcceptSocketConnection(server_.platform_handle().GetFD().get(), &socket_);
      ignore_result(server_.TakePlatformHandle());
      if (!socket_.is_valid()) {
        OnError(Error::kConnectionFailed);
        return;
      }
      StartOnIOThread();
#else
      NOTREACHED();
#endif
      return;
    }
    CHECK_EQ(fd, socket_.get());

    bool validation_error = false;
    bool read_error = false;
    size_t next_read_size = 0;
    size_t buffer_capacity = 0;
    size_t total_bytes_read = 0;
    size_t bytes_read = 0;
    do {
      buffer_capacity = next_read_size;
      char* buffer = GetReadBuffer(&buffer_capacity);
      DCHECK_GT(buffer_capacity, 0u);

      std::vector<base::ScopedFD> incoming_fds;
      ssize_t read_result =
          SocketRecvmsg(socket_.get(), buffer, buffer_capacity, &incoming_fds);
      for (auto& fd : incoming_fds)
        incoming_fds_.emplace_back(std::move(fd));

      if (read_result > 0) {
        bytes_read = static_cast<size_t>(read_result);
        total_bytes_read += bytes_read;
        if (!OnReadComplete(bytes_read, &next_read_size)) {
          read_error = true;
          validation_error = true;
          break;
        }
      } else if (read_result == 0 ||
                 (errno != EAGAIN && errno != EWOULDBLOCK)) {
        read_error = true;
        break;
      }
    } while (bytes_read == buffer_capacity &&
             total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0);
    if (read_error) {
      // Stop receiving read notifications.
      read_watcher_.reset();
      if (validation_error)
        OnError(Error::kReceivedMalformedData);
      else
        OnError(Error::kDisconnected);
    }
  }

  void OnFileCanWriteWithoutBlocking(int fd) override {
    bool write_error = false;
    {
      base::AutoLock lock(write_lock_);
      pending_write_ = false;
      if (!FlushOutgoingMessagesNoLock())
        reject_writes_ = write_error = true;
    }
    if (write_error)
      OnWriteError(Error::kDisconnected);
  }

  // Attempts to write a message directly to the channel. If the full message
  // cannot be written, it's queued and a wait is initiated to write the message
  // ASAP on the I/O thread.
  bool WriteNoLock(MessageView message_view) {
    if (server_.is_valid()) {
      outgoing_messages_.emplace_front(std::move(message_view));
      return true;
    }
    size_t bytes_written = 0;
    do {
      message_view.advance_data_offset(bytes_written);

      ssize_t result;
      std::vector<PlatformHandleInTransit> handles = message_view.TakeHandles();
      if (!handles.empty()) {
        iovec iov = {const_cast<void*>(message_view.data()),
                     message_view.data_num_bytes()};
        std::vector<base::ScopedFD> fds(handles.size());
        for (size_t i = 0; i < handles.size(); ++i)
          fds[i] = handles[i].TakeHandle().TakeFD();
        // TODO: Handle lots of handles.
        result = SendmsgWithHandles(socket_.get(), &iov, 1, fds);
        if (result >= 0) {
#if defined(OS_MACOSX)
          // There is a bug on OSX which makes it dangerous to close
          // a file descriptor while it is in transit. So instead we
          // store the file descriptor in a set and send a message to
          // the recipient, which is queued AFTER the message that
          // sent the FD. The recipient will reply to the message,
          // letting us know that it is now safe to close the file
          // descriptor. For more information, see:
          // http://crbug.com/298276
          MessagePtr fds_message(
              new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
                                   Message::MessageType::HANDLES_SENT));
          memcpy(fds_message->mutable_payload(), fds.data(),
                 sizeof(fds[0]) * fds.size());
          outgoing_messages_.emplace_back(std::move(fds_message), 0);
          {
            base::AutoLock l(fds_to_close_lock_);
            for (auto& fd : fds)
              fds_to_close_.emplace_back(std::move(fd));
          }
#endif  // defined(OS_MACOSX)
        } else {
          // Message transmission failed, so pull the FDs back into |handles|
          // so they can be held by the Message again.
          for (size_t i = 0; i < fds.size(); ++i) {
            handles[i] =
                PlatformHandleInTransit(PlatformHandle(std::move(fds[i])));
          }
        }
      } else {
        result = SocketWrite(socket_.get(), message_view.data(),
                             message_view.data_num_bytes());
      }

      if (result < 0) {
        if (errno != EAGAIN &&
            errno != EWOULDBLOCK
#if defined(OS_MACOSX)
            // On OS X if sendmsg() is trying to send fds between processes and
            // there isn't enough room in the output buffer to send the fd
            // structure over atomically then EMSGSIZE is returned.
            //
            // EMSGSIZE presents a problem since the system APIs can only call
            // us when there's room in the socket buffer and not when there is
            // "enough" room.
            //
            // The current behavior is to return to the event loop when EMSGSIZE
            // is received and hopefull service another FD.  This is however
            // still technically a busy wait since the event loop will call us
            // right back until the receiver has read enough data to allow
            // passing the FD over atomically.
            && errno != EMSGSIZE
#endif
            ) {
          return false;
        }
        message_view.SetHandles(std::move(handles));
        outgoing_messages_.emplace_front(std::move(message_view));
        WaitForWriteOnIOThreadNoLock();
        return true;
      }

      bytes_written = static_cast<size_t>(result);
    } while (bytes_written < message_view.data_num_bytes());

    return FlushOutgoingMessagesNoLock();
  }

  bool FlushOutgoingMessagesNoLock() {
    base::circular_deque<MessageView> messages;
    std::swap(outgoing_messages_, messages);

    while (!messages.empty()) {
      if (!WriteNoLock(std::move(messages.front())))
        return false;

      messages.pop_front();
      if (!outgoing_messages_.empty()) {
        // The message was requeued by WriteNoLock(), so we have to wait for
        // pipe to become writable again. Repopulate the message queue and exit.
        // If sending the message triggered any control messages, they may be
        // in |outgoing_messages_| in addition to or instead of the message
        // being sent.
        std::swap(messages, outgoing_messages_);
        while (!messages.empty()) {
          outgoing_messages_.push_front(std::move(messages.back()));
          messages.pop_back();
        }
        return true;
      }
    }

    return true;
  }

#if defined(OS_MACOSX)
  bool OnControlMessage(Message::MessageType message_type,
                        const void* payload,
                        size_t payload_size,
                        std::vector<PlatformHandle> handles) override {
    switch (message_type) {
      case Message::MessageType::HANDLES_SENT: {
        if (payload_size == 0)
          break;
        MessagePtr message(new Channel::Message(
            payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
        memcpy(message->mutable_payload(), payload, payload_size);
        Write(std::move(message));
        return true;
      }

      case Message::MessageType::HANDLES_SENT_ACK: {
        size_t num_fds = payload_size / sizeof(int);
        if (num_fds == 0 || payload_size % sizeof(int) != 0)
          break;

        const int* fds = reinterpret_cast<const int*>(payload);
        if (!CloseHandles(fds, num_fds))
          break;
        return true;
      }

      default:
        break;
    }

    return false;
  }

  // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
  // |fds| does not match a sequence of handles in |fds_to_close_|.
  bool CloseHandles(const int* fds, size_t num_fds) {
    base::AutoLock l(fds_to_close_lock_);
    if (!num_fds)
      return false;

    auto start = std::find_if(
        fds_to_close_.begin(), fds_to_close_.end(),
        [&fds](const base::ScopedFD& fd) { return fd.get() == fds[0]; });
    if (start == fds_to_close_.end())
      return false;

    auto it = start;
    size_t i = 0;
    // The FDs in the message should match a sequence of handles in
    // |fds_to_close_|.
    // TODO(wez): Consider making |fds_to_close_| a circular_deque<>
    // for greater efficiency? Or assign a unique Id to each FD-containing
    // message, and map that to a vector of FDs to close, to avoid the
    // need for this traversal? Id could even be the first FD in the message.
    for (; i < num_fds && it != fds_to_close_.end(); i++, ++it) {
      if (it->get() != fds[i])
        return false;
    }
    if (i != num_fds)
      return false;

    // Close the FDs by erase()ing their ScopedFDs.
    fds_to_close_.erase(start, it);
    return true;
  }
#endif  // defined(OS_MACOSX)

  void OnWriteError(Error error) {
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    DCHECK(reject_writes_);

    if (error == Error::kDisconnected) {
      // If we can't write because the pipe is disconnected then continue
      // reading to fetch any in-flight messages, relying on end-of-stream to
      // signal the actual disconnection.
      if (read_watcher_) {
        write_watcher_.reset();
        return;
      }
    }

    OnError(error);
  }

  // Keeps the Channel alive at least until explicit shutdown on the IO thread.
  scoped_refptr<Channel> self_;

  // We may be initialized with a server socket, in which case this will be
  // valid until it accepts an incoming connection.
  PlatformChannelServerEndpoint server_;

  // The socket over which to communicate. May be passed in at construction time
  // or accepted over |server_|.
  base::ScopedFD socket_;

  scoped_refptr<base::TaskRunner> io_task_runner_;

  // These watchers must only be accessed on the IO thread.
  std::unique_ptr<base::MessagePumpForIO::FdWatchController> read_watcher_;
  std::unique_ptr<base::MessagePumpForIO::FdWatchController> write_watcher_;

  base::circular_deque<base::ScopedFD> incoming_fds_;

  // Protects |pending_write_| and |outgoing_messages_|.
  base::Lock write_lock_;
  bool pending_write_ = false;
  bool reject_writes_ = false;
  base::circular_deque<MessageView> outgoing_messages_;

  bool leak_handle_ = false;

#if defined(OS_MACOSX)
  base::Lock fds_to_close_lock_;
  std::vector<base::ScopedFD> fds_to_close_;
#if !defined(OS_IOS)
  // Guards access to the send/receive queues below. These are messages that
  // can't be fully accepted from or dispatched to the Channel user yet because
  // we're still waiting on a task port for the remote process.
  struct RawIncomingMessage {
    RawIncomingMessage(std::vector<uint8_t> data,
                       std::vector<PlatformHandleInTransit> handles)
        : data(std::move(data)), handles(std::move(handles)) {}
    RawIncomingMessage(RawIncomingMessage&&) = default;
    ~RawIncomingMessage() = default;

    std::vector<uint8_t> data;
    std::vector<PlatformHandleInTransit> handles;
  };

  base::Lock task_port_wait_lock_;
  bool reject_incoming_messages_with_mach_ports_ = false;
  std::vector<MessagePtr> pending_outgoing_with_mach_ports_;
  std::vector<RawIncomingMessage> pending_incoming_with_mach_ports_;
#endif  // !defined(OS_IOS)
#endif  // defined(OS_MACOSX)

  DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
};

}  // namespace

// static
scoped_refptr<Channel> Channel::Create(
    Delegate* delegate,
    ConnectionParams connection_params,
    scoped_refptr<base::TaskRunner> io_task_runner) {
  return new ChannelPosix(delegate, std::move(connection_params),
                          io_task_runner);
}

}  // namespace core
}  // namespace mojo