// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/ports/node.h"

#include <string.h>

#include <utility>

#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/edk/system/ports/node_delegate.h"

namespace mojo {
namespace edk {
namespace ports {

namespace {

int DebugError(const char* message, int error_code) {
  CHECK(false) << "Oops: " << message;
  return error_code;
}

#define OOPS(x) DebugError(#x, x)

bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed)
    return false;
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    if (port->last_sequence_num_to_receive == next_sequence_num - 1)
      return false;
  }
  return true;
}

}  // namespace

class Node::LockedPort {
 public:
  explicit LockedPort(Port* port) : port_(port) {
    port_->lock.AssertAcquired();
  }

  Port* get() const { return port_; }
  Port* operator->() const { return port_; }

 private:
  Port* const port_;
};

Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name),
      delegate_(delegate) {
}

Node::~Node() {
  if (!ports_.empty())
    DLOG(WARNING) << "Unclean shutdown for node " << name_;
}

bool Node::CanShutdownCleanly(bool allow_local_ports) {
  base::AutoLock ports_lock(ports_lock_);

  if (!allow_local_ports) {
#if DCHECK_IS_ON()
    for (auto entry : ports_) {
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
    }
#endif
    return ports_.empty();
  }

  // NOTE: This is not efficient, though it probably doesn't need to be since
  // relatively few ports should be open during shutdown and shutdown doesn't
  // need to be blazingly fast.
  bool can_shutdown = true;
  for (auto entry : ports_) {
    base::AutoLock lock(entry.second->lock);
    if (entry.second->peer_node_name != name_ &&
        entry.second->state != Port::kReceiving) {
      can_shutdown = false;
#if DCHECK_IS_ON()
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
#else
      // Exit early when not debugging.
      break;
#endif
    }
  }

  return can_shutdown;
}

int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::CreateUninitializedPort(PortRef* port_ref) {
  PortName port_name;
  delegate_->GenerateRandomPortName(&port_name);

  scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
                                                         kInitialSequenceNum));
  int rv = AddPortWithName(port_name, port);
  if (rv != OK)
    return rv;

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name) {
  Port* port = port_ref.port();

  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kUninitialized)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kReceiving;
    port->peer_node_name = peer_node_name;
    port->peer_port_name = peer_port_name;
  }

  delegate_->PortStatusChanged(port_ref);

  return OK;
}

int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
  int rv;

  rv = CreateUninitializedPort(port0_ref);
  if (rv != OK)
    return rv;

  rv = CreateUninitializedPort(port1_ref);
  if (rv != OK)
    return rv;

  rv = InitializePort(*port0_ref, name_, port1_ref->name());
  if (rv != OK)
    return rv;

  rv = InitializePort(*port1_ref, name_, port0_ref->name());
  if (rv != OK)
    return rv;

  return OK;
}

int Node::SetUserData(const PortRef& port_ref,
                      const scoped_refptr<UserData>& user_data) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);
  if (port->state == Port::kClosed)
    return ERROR_PORT_STATE_UNEXPECTED;

  port->user_data = std::move(user_data);

  return OK;
}

int Node::GetUserData(const PortRef& port_ref,
                      scoped_refptr<UserData>* user_data) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);
  if (port->state == Port::kClosed)
    return ERROR_PORT_STATE_UNEXPECTED;

  *user_data = port->user_data;

  return OK;
}

int Node::ClosePort(const PortRef& port_ref) {
  std::deque<PortName> referenced_port_names;

  ObserveClosureEventData data;

  NodeName peer_node_name;
  PortName peer_port_name;
  Port* port = port_ref.port();
  {
    // We may need to erase the port, which requires ports_lock_ to be held,
    // but ports_lock_ must be acquired before any individual port locks.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);
    if (port->state == Port::kUninitialized) {
      // If the port was not yet initialized, there's nothing interesting to do.
      ErasePort_Locked(port_ref.name());
      return OK;
    }

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kClosed;

    // We pass along the sequence number of the last message sent from this
    // port to allow the peer to have the opportunity to consume all inbound
    // messages before notifying the embedder that this port is closed.
    data.last_sequence_num = port->next_sequence_num_to_send - 1;

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    // If the port being closed still has unread messages, then we need to take
    // care to close those ports so as to avoid leaking memory.
    port->message_queue.GetReferencedPorts(&referenced_port_names);

    ErasePort_Locked(port_ref.name());
  }

  DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
           << " to " << peer_port_name << "@" << peer_node_name;

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));

  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
  return OK;
}

int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);

  if (port->state != Port::kReceiving)
    return ERROR_PORT_STATE_UNEXPECTED;

  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  return OK;
}

int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
  return GetMessageIf(port_ref, nullptr, message);
}

int Node::GetMessageIf(const PortRef& port_ref,
                       std::function<bool(const Message&)> selector,
                       ScopedMessage* message) {
  *message = nullptr;

  DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;

  Port* port = port_ref.port();
  {
    base::AutoLock lock(port->lock);

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port))
      return ERROR_PORT_PEER_CLOSED;

    port->message_queue.GetNextMessageIf(std::move(selector), message);
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      const PortName& new_port_name = (*message)->ports()[i];
      scoped_refptr<Port> new_port = GetPort(new_port_name);

      DCHECK(new_port) << "Port " << new_port_name << "@" << name_
                       << " does not exist!";

      base::AutoLock lock(new_port->lock);

      DCHECK(new_port->state == Port::kReceiving);
      new_port->message_queue.set_signalable(true);
    }
  }

  return OK;
}

int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
  int rv = SendMessageInternal(port_ref, &message);
  if (rv != OK) {
    // If send failed, close all carried ports. Note that we're careful not to
    // close the sending port itself if it happened to be one of the encoded
    // ports (an invalid but possible condition.)
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (message->ports()[i] == port_ref.name())
        continue;

      PortRef port;
      if (GetPort(message->ports()[i], &port) == OK)
        ClosePort(port);
    }
  }
  return rv;
}

int Node::AcceptMessage(ScopedMessage message) {
  const EventHeader* header = GetEventHeader(*message);
  switch (header->type) {
    case EventType::kUser:
      return OnUserMessage(std::move(message));
    case EventType::kPortAccepted:
      return OnPortAccepted(header->port_name);
    case EventType::kObserveProxy:
      return OnObserveProxy(
          header->port_name,
          *GetEventData<ObserveProxyEventData>(*message));
    case EventType::kObserveProxyAck:
      return OnObserveProxyAck(
          header->port_name,
          GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
    case EventType::kObserveClosure:
      return OnObserveClosure(
          header->port_name,
          GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
    case EventType::kMergePort:
      return OnMergePort(header->port_name,
                         *GetEventData<MergePortEventData>(*message));
  }
  return OOPS(ERROR_NOT_IMPLEMENTED);
}

int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  Port* port = port_ref.port();
  MergePortEventData data;
  {
    base::AutoLock lock(port->lock);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    data.new_port_name = port_ref.name();
    WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
                 &data.new_port_descriptor);
  }
  delegate_->ForwardMessage(
      destination_node_name,
      NewInternalMessage(destination_port_name,
                         EventType::kMergePort, data));
  return OK;
}

int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();
  int rv;
  {
    // |ports_lock_| must be held when acquiring overlapping port locks.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock port0_lock(port0->lock);
    base::AutoLock port1_lock(port1->lock);

    DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
             << " and " << port1_ref.name() << "@" << name_;

    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
      rv = ERROR_PORT_STATE_UNEXPECTED;
    else
      rv = MergePorts_Locked(port0_ref, port1_ref);
  }

  if (rv != OK) {
    ClosePort(port0_ref);
    ClosePort(port1_ref);
  }

  return rv;
}

int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_
           << " to node " << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}

int Node::OnUserMessage(ScopedMessage message) {
  PortName port_name = GetEventHeader(*message)->port_name;
  const auto* event = GetEventData<UserEventData>(*message);

#if DCHECK_IS_ON()
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }

  DVLOG(2) << "AcceptMessage " << event->sequence_num
             << " [ports=" << ports_buf.str() << "] at "
             << port_name << "@" << name_;
#endif

  scoped_refptr<Port> port = GetPort(port_name);

  // Even if this port does not exist, cannot receive anymore messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.

  for (size_t i = 0; i < message->num_ports(); ++i) {
    int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    if (rv != OK)
      return rv;
  }

  bool has_next_message = false;
  bool message_accepted = false;

  if (port) {
    // We may want to forward messages once the port lock is held, so we must
    // acquire |ports_lock_| first.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port.get())) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;

        // Forward messages. We forward messages in sequential order here so
        // that we maintain the message queue's notion of next sequence number.
        // That's useful for the proxy removal process as we can tell when this
        // port has seen all of the messages it is expected to see.
        int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
        if (rv != OK)
          return rv;

        MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
      }
    }
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) == OK) {
        ClosePort(port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

int Node::OnPortAccepted(const PortName& port_name) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
           << " pointing to "
           << port->peer_port_name << "@" << port->peer_node_name;

  return BeginProxying(PortRef(port_name, port));
}

int Node::OnObserveProxy(const PortName& port_name,
                         const ObserveProxyEventData& event) {
  if (port_name == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port) {
    DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";

    if (port_name != event.proxy_port_name &&
        port_name != event.proxy_to_port_name) {
      // The receiving port may have been removed while this message was in
      // transit.  In this case, we restart the ObserveProxy circulation from
      // the referenced proxy port to avoid leaking the proxy.
      delegate_->ForwardMessage(
          event.proxy_node_name,
          NewInternalMessage(
              event.proxy_port_name, EventType::kObserveProxy, event));
    }
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
           << event.proxy_port_name << "@"
           << event.proxy_node_name << " pointing to "
           << event.proxy_to_port_name << "@"
           << event.proxy_to_node_name;

  {
    base::AutoLock lock(port->lock);

    if (port->peer_node_name == event.proxy_node_name &&
        port->peer_port_name == event.proxy_port_name) {
      if (port->state == Port::kReceiving) {
        port->peer_node_name = event.proxy_to_node_name;
        port->peer_port_name = event.proxy_to_port_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = port->next_sequence_num_to_send - 1;

        delegate_->ForwardMessage(
            event.proxy_node_name,
            NewInternalMessage(event.proxy_port_name,
                               EventType::kObserveProxyAck,
                               ack));
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // Afterall, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event.  Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to "
                 << event.proxy_port_name << "@" << event.proxy_node_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = kInvalidSequenceNum;

        port->send_on_proxy_removal.reset(
            new std::pair<NodeName, ScopedMessage>(
                event.proxy_node_name,
                NewInternalMessage(event.proxy_port_name,
                                   EventType::kObserveProxyAck,
                                   ack)));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveProxy,
                             event));
    }
  }
  return OK;
}

int Node::OnObserveProxyAck(const PortName& port_name,
                            uint64_t last_sequence_num) {
  DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
           << " (last_sequence_num=" << last_sequence_num << ")";

  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
                                // this is not an "Oops".

  {
    base::AutoLock lock(port->lock);

    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    if (last_sequence_num == kInvalidSequenceNum) {
      // Send again.
      InitiateProxyRemoval(LockedPort(port.get()), port_name);
      return OK;
    }

    // We can now remove this port once we have received and forwarded the last
    // message addressed to this port.
    port->remove_proxy_on_last_message = true;
    port->last_sequence_num_to_receive = last_sequence_num;
  }
  TryRemoveProxy(PortRef(port_name, port));
  return OK;
}

int Node::OnObserveClosure(const PortName& port_name,
                           uint64_t last_sequence_num) {
  // OK if the port doesn't exist, as it may have been closed already.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OK;

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  ObserveClosureEventData forwarded_data;
  NodeName peer_node_name;
  PortName peer_port_name;
  bool try_remove_proxy = false;
  {
    base::AutoLock lock(port->lock);

    port->peer_closed = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << last_sequence_num << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so
      // forward the message along as-is.
      forwarded_data.last_sequence_num = last_sequence_num;

      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying)
        try_remove_proxy = true;
    }

    DVLOG(2) << "Forwarding ObserveClosure from "
             << port_name << "@" << name_ << " to peer "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << forwarded_data.last_sequence_num
             << ")";

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
  }
  if (try_remove_proxy)
    TryRemoveProxy(PortRef(port_name, port));

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure,
                         forwarded_data));

  if (notify_delegate) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }
  return OK;
}

int Node::OnMergePort(const PortName& port_name,
                      const MergePortEventData& event) {
  scoped_refptr<Port> port = GetPort(port_name);

  DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
           << (port ? port->state : -1) << ") merging with proxy "
           << event.new_port_name
           << "@" << name_ << " pointing to "
           << event.new_port_descriptor.peer_port_name << "@"
           << event.new_port_descriptor.peer_node_name << " referred by "
           << event.new_port_descriptor.referring_port_name << "@"
           << event.new_port_descriptor.referring_node_name;

  bool close_target_port = false;
  bool close_new_port = false;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours.
  int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
  if (rv != OK) {
    close_target_port = true;
  } else if (port) {
    // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    // needs to hold |ports_lock_|. We also acquire multiple port locks within.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving) {
      close_new_port = true;
    } else {
      scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
      base::AutoLock new_port_lock(new_port->lock);
      DCHECK(new_port->state == Port::kReceiving);

      // Both ports are locked. Now all we have to do is swap their peer
      // information and set them up as proxies.

      PortRef port0_ref(port_name, port);
      PortRef port1_ref(event.new_port_name, new_port);
      int rv = MergePorts_Locked(port0_ref, port1_ref);
      if (rv == OK)
        return rv;

      close_new_port = true;
      close_target_port = true;
    }
  } else {
    close_new_port = true;
  }

  if (close_target_port) {
    PortRef target_port;
    rv = GetPort(port_name, &target_port);
    DCHECK(rv == OK);

    ClosePort(target_port);
  }

  if (close_new_port) {
    PortRef new_port;
    rv = GetPort(event.new_port_name, &new_port);
    DCHECK(rv == OK);

    ClosePort(new_port);
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}

int Node::AddPortWithName(const PortName& port_name,
                          const scoped_refptr<Port>& port) {
  base::AutoLock lock(ports_lock_);

  if (!ports_.insert(std::make_pair(port_name, port)).second)
    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.

  DVLOG(2) << "Created port " << port_name << "@" << name_;
  return OK;
}

void Node::ErasePort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  ErasePort_Locked(port_name);
}

void Node::ErasePort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  ports_.erase(port_name);
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}

scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  return GetPort_Locked(port_name);
}

scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  auto iter = ports_.find(port_name);
  if (iter == ports_.end())
    return nullptr;

  return iter->second;
}

int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
  ScopedMessage& m = *message;
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name())
      return ERROR_PORT_CANNOT_SEND_SELF;
  }

  Port* port = port_ref.port();
  NodeName peer_node_name;
  {
    // We must acquire |ports_lock_| before grabbing any port locks, because
    // WillSendMessage_Locked may need to lock multiple ports out of order.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    if (port->peer_closed)
      return ERROR_PORT_PEER_CLOSED;

    int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
    if (rv != OK)
      return rv;

    // Beyond this point there's no sense in returning anything but OK. Even if
    // message forwarding or acceptance fails, there's nothing the embedder can
    // do to recover. Assume that failure beyond this point must be treated as a
    // transport failure.

    peer_node_name = port->peer_node_name;
  }

  if (peer_node_name != name_) {
    delegate_->ForwardMessage(peer_node_name, std::move(m));
    return OK;
  }

  int rv = AcceptMessage(std::move(m));
  if (rv != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptMessage failed: " << rv;
  }

  return OK;
}

int Node::MergePorts_Locked(const PortRef& port0_ref,
                            const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();

  ports_lock_.AssertAcquired();
  port0->lock.AssertAcquired();
  port1->lock.AssertAcquired();

  CHECK(port0->state == Port::kReceiving);
  CHECK(port1->state == Port::kReceiving);

  // Ports cannot be merged with their own receiving peer!
  if (port0->peer_node_name == name_ &&
      port0->peer_port_name == port1_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  if (port1->peer_node_name == name_ &&
      port1->peer_port_name == port0_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  // Only merge if both ports have never sent a message.
  if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
      port1->next_sequence_num_to_send == kInitialSequenceNum) {
    // Swap the ports' peer information and switch them both into buffering
    // (eventually proxying) mode.

    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);

    port0->state = Port::kBuffering;
    if (port0->peer_closed)
      port0->remove_proxy_on_last_message = true;

    port1->state = Port::kBuffering;
    if (port1->peer_closed)
      port1->remove_proxy_on_last_message = true;

    int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());

    if (rv1 == OK && rv2 == OK) {
      // If either merged port had a closed peer, its new peer needs to be
      // informed of this.
      if (port1->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port0->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port0->peer_node_name,
            NewInternalMessage(port0->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      if (port0->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port1->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port1->peer_node_name,
            NewInternalMessage(port1->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      return OK;
    }

    // If either proxy failed to initialize (e.g. had undeliverable messages
    // or ended up in a bad state somehow), we keep the system in a consistent
    // state by undoing the peer swap.
    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}

void Node::WillSendPort(const LockedPort& port,
                        const NodeName& to_node_name,
                        PortName* port_name,
                        PortDescriptor* port_descriptor) {
  port->lock.AssertAcquired();

  PortName local_port_name = *port_name;

  PortName new_port_name;
  delegate_->GenerateRandomPortName(&new_port_name);

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK(port->state == Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed)
    port->remove_proxy_on_last_message = true;

  *port_name = new_port_name;

  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;
  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));

  // Configure the local port to point to the new port.
  port->peer_node_name = to_node_name;
  port->peer_port_name = new_port_name;
}

int Node::AcceptPort(const PortName& port_name,
                     const PortDescriptor& port_descriptor) {
  scoped_refptr<Port> port = make_scoped_refptr(
      new Port(port_descriptor.next_sequence_num_to_send,
               port_descriptor.next_sequence_num_to_receive));
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
           << port->peer_closed << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessageIf).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, port);
  if (rv != OK)
    return rv;

  // Allow referring port to forward messages.
  delegate_->ForwardMessage(
      port_descriptor.referring_node_name,
      NewInternalMessage(port_descriptor.referring_port_name,
                         EventType::kPortAccepted));
  return OK;
}

int Node::WillSendMessage_Locked(const LockedPort& port,
                                 const PortName& port_name,
                                 Message* message) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  DCHECK(message);

  // Messages may already have a sequence number if they're being forwarded
  // by a proxy. Otherwise, use the next outgoing sequence number.
  uint64_t* sequence_num =
      &GetMutableEventData<UserEventData>(message)->sequence_num;
  if (*sequence_num == 0)
    *sequence_num = port->next_sequence_num_to_send++;

#if DCHECK_IS_ON()
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }
#endif

  if (message->num_ports() > 0) {
    // Note: Another thread could be trying to send the same ports, so we need
    // to ensure that they are ours to send before we mutate their state.

    std::vector<scoped_refptr<Port>> ports;
    ports.resize(message->num_ports());

    {
      for (size_t i = 0; i < message->num_ports(); ++i) {
        ports[i] = GetPort_Locked(message->ports()[i]);
        DCHECK(ports[i]);

        ports[i]->lock.Acquire();
        int error = OK;
        if (ports[i]->state != Port::kReceiving)
          error = ERROR_PORT_STATE_UNEXPECTED;
        else if (message->ports()[i] == port->peer_port_name)
          error = ERROR_PORT_CANNOT_SEND_PEER;

        if (error != OK) {
          // Oops, we cannot send this port.
          for (size_t j = 0; j <= i; ++j)
            ports[i]->lock.Release();
          // Backpedal on the sequence number.
          port->next_sequence_num_to_send--;
          return error;
        }
      }
    }

    PortDescriptor* port_descriptors =
        GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));

    for (size_t i = 0; i < message->num_ports(); ++i) {
      WillSendPort(LockedPort(ports[i].get()),
                   port->peer_node_name,
                   message->mutable_ports() + i,
                   port_descriptors + i);
    }

    for (size_t i = 0; i < message->num_ports(); ++i)
      ports[i]->lock.Release();
  }

#if DCHECK_IS_ON()
  DVLOG(2) << "Sending message "
           << GetEventData<UserEventData>(*message)->sequence_num
           << " [ports=" << ports_buf.str() << "]"
           << " from " << port_name << "@" << name_
           << " to " << port->peer_port_name << "@" << port->peer_node_name;
#endif

  GetMutableEventHeader(message)->port_name = port->peer_port_name;
  return OK;
}

int Node::BeginProxying_Locked(const LockedPort& port,
                               const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  if (port->state != Port::kBuffering)
    return OOPS(ERROR_PORT_STATE_UNEXPECTED);

  port->state = Port::kProxying;

  int rv = ForwardMessages_Locked(LockedPort(port), port_name);
  if (rv != OK)
    return rv;

  // We may have observed closure while buffering. In that case, we can advance
  // to removing the proxy without sending out an ObserveProxy message. We
  // already know the last expected message, etc.

  if (port->remove_proxy_on_last_message) {
    MaybeRemoveProxy_Locked(LockedPort(port), port_name);

    // Make sure we propagate closure to our current peer.
    ObserveClosureEventData data;
    data.last_sequence_num = port->last_sequence_num_to_receive;
    delegate_->ForwardMessage(
        port->peer_node_name,
        NewInternalMessage(port->peer_port_name,
                           EventType::kObserveClosure, data));
  } else {
    InitiateProxyRemoval(LockedPort(port), port_name);
  }

  return OK;
}

int Node::BeginProxying(PortRef port_ref) {
  Port* port = port_ref.port();
  {
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kBuffering)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    port->state = Port::kProxying;

    int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
    if (rv != OK)
      return rv;
  }

  bool should_remove;
  NodeName peer_node_name;
  ScopedMessage closure_message;
  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    should_remove = port->remove_proxy_on_last_message;
    if (should_remove) {
      // Make sure we propagate closure to our current peer.
      ObserveClosureEventData data;
      data.last_sequence_num = port->last_sequence_num_to_receive;
      peer_node_name = port->peer_node_name;
      closure_message = NewInternalMessage(port->peer_port_name,
                                           EventType::kObserveClosure, data);
    } else {
      InitiateProxyRemoval(LockedPort(port), port_ref.name());
    }
  }

  if (should_remove) {
    TryRemoveProxy(port_ref);
    delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
  }

  return OK;
}

int Node::ForwardMessages_Locked(const LockedPort& port,
                                 const PortName &port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  for (;;) {
    ScopedMessage message;
    port->message_queue.GetNextMessageIf(nullptr, &message);
    if (!message)
      break;

    int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
    if (rv != OK)
      return rv;

    delegate_->ForwardMessage(port->peer_node_name, std::move(message));
  }
  return OK;
}

void Node::InitiateProxyRemoval(const LockedPort& port,
                                const PortName& port_name) {
  port->lock.AssertAcquired();

  // To remove this node, we start by notifying the connected graph that we are
  // a proxy. This allows whatever port is referencing this node to skip it.
  // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
  // the peer was closed in the meantime).

  ObserveProxyEventData data;
  data.proxy_node_name = name_;
  data.proxy_port_name = port_name;
  data.proxy_to_node_name = port->peer_node_name;
  data.proxy_to_port_name = port->peer_port_name;

  delegate_->ForwardMessage(
      port->peer_node_name,
      NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
}

void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
                                   const PortName& port_name) {
  // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  DCHECK(port->state == Port::kProxying);

  // Make sure we have seen ObserveProxyAck before removing the port.
  if (!port->remove_proxy_on_last_message)
    return;

  if (!CanAcceptMoreMessages(port.get())) {
    // This proxy port is done. We can now remove it!
    ErasePort_Locked(port_name);

    if (port->send_on_proxy_removal) {
      NodeName to_node = port->send_on_proxy_removal->first;
      ScopedMessage& message = port->send_on_proxy_removal->second;

      delegate_->ForwardMessage(to_node, std::move(message));
      port->send_on_proxy_removal.reset();
    }
  } else {
    DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
             << " now; waiting for more messages";
  }
}

void Node::TryRemoveProxy(PortRef port_ref) {
  Port* port = port_ref.port();
  bool should_erase = false;
  ScopedMessage msg;
  NodeName to_node;
  {
    base::AutoLock lock(port->lock);

    // Port already removed. Nothing to do.
    if (port->state == Port::kClosed)
      return;

    DCHECK(port->state == Port::kProxying);

    // Make sure we have seen ObserveProxyAck before removing the port.
    if (!port->remove_proxy_on_last_message)
      return;

    if (!CanAcceptMoreMessages(port)) {
      // This proxy port is done. We can now remove it!
      should_erase = true;

      if (port->send_on_proxy_removal) {
        to_node = port->send_on_proxy_removal->first;
        msg = std::move(port->send_on_proxy_removal->second);
        port->send_on_proxy_removal.reset();
      }
    } else {
      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
               << " now; waiting for more messages";
    }
  }

  if (should_erase)
    ErasePort(port_ref.name());

  if (msg)
    delegate_->ForwardMessage(to_node, std::move(msg));
}

void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
                                   const PortName& port_name) {
  // Wipes out all ports whose peer node matches |node_name| and whose peer port
  // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
  // node is matched.

  std::vector<PortRef> ports_to_notify;
  std::vector<PortName> dead_proxies_to_broadcast;
  std::deque<PortName> referenced_port_names;

  {
    base::AutoLock ports_lock(ports_lock_);

    for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
      Port* port = iter->second.get();
      {
        base::AutoLock port_lock(port->lock);

        if (port->peer_node_name == node_name &&
              (port_name == kInvalidPortName ||
                    port->peer_port_name == port_name)) {
          if (!port->peer_closed) {
            // Treat this as immediate peer closure. It's an exceptional
            // condition akin to a broken pipe, so we don't care about losing
            // messages.

            port->peer_closed = true;
            port->last_sequence_num_to_receive =
                port->message_queue.next_sequence_num() - 1;

            if (port->state == Port::kReceiving)
              ports_to_notify.push_back(PortRef(iter->first, port));
          }

          // We don't expect to forward any further messages, and we don't
          // expect to receive a Port{Accepted,Rejected} event. Because we're
          // a proxy with no active peer, we cannot use the normal proxy removal
          // procedure of forward-propagating an ObserveProxy. Instead we
          // broadcast our own death so it can be back-propagated. This is
          // inefficient but rare.
          if (port->state != Port::kReceiving) {
            dead_proxies_to_broadcast.push_back(iter->first);
            iter->second->message_queue.GetReferencedPorts(
                &referenced_port_names);
          }
        }
      }
    }

    for (const auto& proxy_name : dead_proxies_to_broadcast) {
      ports_.erase(proxy_name);
      DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
    }
  }

  // Wake up any receiving ports who have just observed simulated peer closure.
  for (const auto& port : ports_to_notify)
    delegate_->PortStatusChanged(port);

  for (const auto& proxy_name : dead_proxies_to_broadcast) {
    // Broadcast an event signifying that this proxy is no longer functioning.
    ObserveProxyEventData event;
    event.proxy_node_name = name_;
    event.proxy_port_name = proxy_name;
    event.proxy_to_node_name = kInvalidNodeName;
    event.proxy_to_port_name = kInvalidPortName;
    delegate_->BroadcastMessage(NewInternalMessage(
        kInvalidPortName, EventType::kObserveProxy, event));

    // Also process death locally since the port that points this closed one
    // could be on the current node.
    // Note: Although this is recursive, only a single port is involved which
    // limits the expected branching to 1.
    DestroyAllPortsWithPeer(name_, proxy_name);
  }

  // Close any ports referenced by the closed proxies.
  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
}

ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
                                              const EventType& type,
                                              const void* data,
                                              size_t num_data_bytes) {
  ScopedMessage message;
  delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);

  EventHeader* header = GetMutableEventHeader(message.get());
  header->port_name = port_name;
  header->type = type;
  header->padding = 0;

  if (num_data_bytes)
    memcpy(header + 1, data, num_data_bytes);

  return message;
}

}  // namespace ports
}  // namespace edk
}  // namespace mojo