// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <map>
#include <queue>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "base/bind.h"
#include "base/callback.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/rand_util.h"
#include "base/strings/string_piece.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace edk {
namespace ports {
namespace test {

namespace {

// Returns true iff |message|'s payload, read as a NUL-terminated C string,
// holds exactly the characters of |s|.
//
// The payload is wrapped in a base::StringPiece and compared against |s|
// directly instead of passing s.data() to strcmp(): a StringPiece is not
// guaranteed to be NUL-terminated, so strcmp() could read past the end of |s|.
// NOTE(review): still assumes the payload itself is NUL-terminated, which is
// how TestNode::SendStringMessage*() builds it.
bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
  const base::StringPiece payload(
      static_cast<const char*>(message->payload_bytes()));
  return payload == s;
}

// Message implementation used by the tests. The constructors allocate the
// backing buffer |start_| with new[] and the destructor releases it.
class TestMessage : public Message {
 public:
  // Creates a user message with |num_payload_bytes| of payload and room for
  // |num_ports| ports. The header and ports section sizes are taken from the
  // base class members once it has been constructed.
  TestMessage(size_t num_payload_bytes, size_t num_ports)
      : Message(num_payload_bytes, num_ports) {
    const size_t total_bytes =
        num_header_bytes_ + num_ports_bytes_ + num_payload_bytes;
    start_ = new char[total_bytes];
    InitializeUserMessageHeader(start_);
  }

  // Creates a message whose three section sizes are given explicitly. The
  // buffer is allocated but no header is written into it here.
  TestMessage(size_t num_header_bytes,
              size_t num_payload_bytes,
              size_t num_ports_bytes)
      : Message(num_header_bytes, num_payload_bytes, num_ports_bytes) {
    const size_t total_bytes =
        num_header_bytes + num_payload_bytes + num_ports_bytes;
    start_ = new char[total_bytes];
  }

  ~TestMessage() override { delete[] start_; }

  // Convenience factory returning a freshly allocated user message wrapped in
  // a ScopedMessage.
  static ScopedMessage NewUserMessage(size_t num_payload_bytes,
                                      size_t num_ports) {
    return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
  }
};

class TestNode;

// Interface a TestNode uses to reach the rest of the test topology: it
// supplies port names and carries messages from one node to others.
class MessageRouter {
 public:
  virtual ~MessageRouter() = default;

  // Fills |name| with a newly generated port name.
  virtual void GeneratePortName(PortName* name) = 0;

  // Routes |message|, which originated on |from_node|, towards the node named
  // |node_name|.
  virtual void ForwardMessage(TestNode* from_node,
                              const NodeName& node_name,
                              ScopedMessage message) = 0;

  // Distributes |message|, which originated on |from_node|, to other nodes.
  virtual void BroadcastMessage(TestNode* from_node, ScopedMessage message) = 0;
};

// NodeDelegate used by the tests. Each TestNode owns a Node and a dedicated
// thread (|node_thread_|) running ProcessMessages(), which takes messages off
// |incoming_messages_| and passes them to Node::AcceptMessage(). Messages the
// Node wants to send are handed to the MessageRouter supplied to Start().
class TestNode : public NodeDelegate {
 public:
  // Creates a node named (id, 1). The processing thread is only started by
  // Start(). |idle_event_| is manual-reset and starts out signaled;
  // |messages_available_event_| is auto-reset and starts out unsignaled.
  explicit TestNode(uint64_t id)
      : node_name_(id, 1),
        node_(node_name_, this),
        node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
        messages_available_event_(
            base::WaitableEvent::ResetPolicy::AUTOMATIC,
            base::WaitableEvent::InitialState::NOT_SIGNALED),
        idle_event_(
            base::WaitableEvent::ResetPolicy::MANUAL,
            base::WaitableEvent::InitialState::SIGNALED) {
  }

  // Asks ProcessMessages() to return and then stops the thread.
  ~TestNode() override {
    StopWhenIdle();
    node_thread_.Stop();
  }

  const NodeName& name() const { return node_name_; }

  // NOTE: Node is thread-safe.
  Node& node() { return node_; }

  // Signaled by ProcessMessages() at the end of each pass over the queue and
  // reset by EnqueueMessage().
  base::WaitableEvent& idle_event() { return idle_event_; }

  // Returns true once ProcessMessages() has completed at least one pass
  // (|started_|), no pass is currently dispatching, and the incoming queue is
  // either empty or parked on a blocked event type.
  bool IsIdle() {
    base::AutoLock lock(lock_);
    return started_ && !dispatching_ &&
        (incoming_messages_.empty() || (block_on_event_ && blocked_));
  }

  // Makes ProcessMessages() stop dequeuing as soon as the message at the front
  // of the queue carries an event header of |type|.
  void BlockOnEvent(EventType type) {
    base::AutoLock lock(lock_);
    blocked_event_type_ = type;
    block_on_event_ = true;
  }

  // Clears the block set by BlockOnEvent() and wakes the processing thread so
  // it can resume draining the queue.
  void Unblock() {
    base::AutoLock lock(lock_);
    block_on_event_ = false;
    messages_available_event_.Signal();
  }

  // Records |router|, starts the node thread and posts ProcessMessages() to
  // it. |this| is passed unretained to the posted task.
  void Start(MessageRouter* router) {
    router_ = router;
    node_thread_.Start();
    node_thread_.task_runner()->PostTask(
        FROM_HERE,
        base::Bind(&TestNode::ProcessMessages, base::Unretained(this)));
  }

  // Sets |should_quit_| and wakes the processing thread; ProcessMessages()
  // returns the next time it checks the flag.
  void StopWhenIdle() {
    base::AutoLock lock(lock_);
    should_quit_ = true;
    messages_available_event_.Signal();
  }

  // Wakes the processing thread without enqueuing anything.
  void WakeUp() { messages_available_event_.Signal(); }

  // Sends |s| (including its terminating NUL) as a user message on |port|.
  // Returns the result of Node::SendMessage().
  int SendStringMessage(const PortRef& port, const std::string& s) {
    size_t size = s.size() + 1;
    ScopedMessage message = TestMessage::NewUserMessage(size, 0);
    memcpy(message->mutable_payload_bytes(), s.data(), size);
    return node_.SendMessage(port, std::move(message));
  }

  // Like SendStringMessage(), but the message also carries the single port
  // named |sent_port_name|.
  int SendStringMessageWithPort(const PortRef& port,
                                const std::string& s,
                                const PortName& sent_port_name) {
    size_t size = s.size() + 1;
    ScopedMessage message = TestMessage::NewUserMessage(size, 1);
    memcpy(message->mutable_payload_bytes(), s.data(), size);
    message->mutable_ports()[0] = sent_port_name;
    return node_.SendMessage(port, std::move(message));
  }

  // Overload taking the port to send as a PortRef; forwards its name.
  int SendStringMessageWithPort(const PortRef& port,
                                const std::string& s,
                                const PortRef& sent_port) {
    return SendStringMessageWithPort(port, s, sent_port.name());
  }

  // When set, ForwardMessage() closes the ports in each outgoing message and
  // discards it instead of passing it to the router.
  void set_drop_messages(bool value) {
    base::AutoLock lock(lock_);
    drop_messages_ = value;
  }

  // When set, PortStatusChanged() reads available messages from the notified
  // port and stores them for GetSavedMessage().
  void set_save_messages(bool value) {
    base::AutoLock lock(lock_);
    save_messages_ = value;
  }

  // Returns true only if Node::GetMessage() succeeded and produced a non-null
  // message in |*message|.
  bool ReadMessage(const PortRef& port, ScopedMessage* message) {
    return node_.GetMessage(port, message, nullptr) == OK && *message;
  }

  // Pops the oldest saved message into |*message|. Returns false and resets
  // |*message| when nothing has been saved.
  bool GetSavedMessage(ScopedMessage* message) {
    base::AutoLock lock(lock_);
    if (saved_messages_.empty()) {
      message->reset();
      return false;
    }
    std::swap(*message, saved_messages_.front());
    saved_messages_.pop();
    return true;
  }

  // Appends |message| to the incoming queue and wakes the processing thread.
  // |idle_event_| is reset first, before the lock is taken.
  void EnqueueMessage(ScopedMessage message) {
    idle_event_.Reset();

    // NOTE: This may be called from ForwardMessage and thus must not reenter
    // |node_|.
    base::AutoLock lock(lock_);
    incoming_messages_.emplace(std::move(message));
    messages_available_event_.Signal();
  }

  // NodeDelegate: delegates port-name generation to the router.
  void GenerateRandomPortName(PortName* port_name) override {
    DCHECK(router_);
    router_->GeneratePortName(port_name);
  }

  // NodeDelegate: allocates a TestMessage with |num_header_bytes| of header
  // and no payload or ports bytes.
  void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
    message->reset(new TestMessage(num_header_bytes, 0, 0));
  }

  // NodeDelegate: either drops |message| (when |drop_messages_| is set) after
  // closing the ports it carries, or passes it on to the router.
  void ForwardMessage(const NodeName& node_name,
                      ScopedMessage message) override {
    {
      base::AutoLock lock(lock_);
      if (drop_messages_) {
        DVLOG(1) << "Dropping ForwardMessage from node "
                 << node_name_ << " to " << node_name;

        // |lock_| is released while the ports are being closed.
        base::AutoUnlock unlock(lock_);
        ClosePortsInMessage(message.get());
        return;
      }
    }

    DCHECK(router_);
    DVLOG(1) << "ForwardMessage from node "
             << node_name_ << " to " << node_name;
    router_->ForwardMessage(this, node_name, std::move(message));
  }

  // NodeDelegate: passes |message| to the router for broadcast.
  void BroadcastMessage(ScopedMessage message) override {
    router_->BroadcastMessage(this, std::move(message));
  }

  // NodeDelegate: when |save_messages_| is set, reads messages from |port|
  // until ReadMessage() fails and stores each one in |saved_messages_|.
  void PortStatusChanged(const PortRef& port) override {
    // The port may be closed, in which case we ignore the notification.
    base::AutoLock lock(lock_);
    if (!save_messages_)
      return;

    for (;;) {
      ScopedMessage message;
      {
        // |lock_| is released for the duration of the read.
        base::AutoUnlock unlock(lock_);
        if (!ReadMessage(port, &message))
          break;
      }

      saved_messages_.emplace(std::move(message));
    }
  }

  // Looks up and closes every port named in |message|. A failed lookup is a
  // fatal (ASSERT) failure and ends the function; a failed close is recorded
  // as a non-fatal (EXPECT) failure.
  void ClosePortsInMessage(Message* message) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port;
      ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port));
      EXPECT_EQ(OK, node_.ClosePort(port));
    }
  }

 private:
  // Body of the node thread. Each iteration waits for
  // |messages_available_event_|, returns if |should_quit_| is set, and
  // otherwise drains |incoming_messages_| into the Node, stopping early if the
  // front message matches the blocked event type. |idle_event_| is signaled at
  // the end of every pass.
  void ProcessMessages() {
    for (;;) {
      messages_available_event_.Wait();

      base::AutoLock lock(lock_);

      if (should_quit_)
        return;

      dispatching_ = true;
      while (!incoming_messages_.empty()) {
        if (block_on_event_ &&
            GetEventHeader(*incoming_messages_.front())->type ==
                blocked_event_type_) {
          blocked_ = true;
          // Go idle if we hit a blocked event type.
          break;
        } else {
          blocked_ = false;
        }
        ScopedMessage message = std::move(incoming_messages_.front());
        incoming_messages_.pop();

        // NOTE: AcceptMessage() can re-enter this object to call any of the
        // NodeDelegate interface methods.
        base::AutoUnlock unlock(lock_);
        node_.AcceptMessage(std::move(message));
      }

      dispatching_ = false;
      started_ = true;
      idle_event_.Signal();
    };
  }

  const NodeName node_name_;
  Node node_;
  // Set by Start(); not owned.
  MessageRouter* router_ = nullptr;

  base::Thread node_thread_;
  // Auto-reset; signaled whenever the processing thread should take another
  // look at its state (new message, unblock, quit request, explicit wake-up).
  base::WaitableEvent messages_available_event_;
  // Manual-reset; see idle_event().
  base::WaitableEvent idle_event_;

  // Guards fields below.
  base::Lock lock_;
  // True after ProcessMessages() has finished its first pass.
  bool started_ = false;
  // True while ProcessMessages() is draining the queue.
  bool dispatching_ = false;
  bool should_quit_ = false;
  bool drop_messages_ = false;
  bool save_messages_ = false;
  // True when the last pass stopped at a message of |blocked_event_type_|.
  bool blocked_ = false;
  bool block_on_event_ = false;
  // Only read while |block_on_event_| is true; assigned by BlockOnEvent().
  EventType blocked_event_type_;
  std::queue<ScopedMessage> incoming_messages_;
  std::queue<ScopedMessage> saved_messages_;
};

// Test fixture which also acts as the MessageRouter connecting every TestNode
// registered through AddNode().
class PortsTest : public testing::Test, public MessageRouter {
 public:
  // Registers |node| under its name and starts it with this fixture as its
  // router.
  void AddNode(TestNode* node) {
    {
      base::AutoLock scoped_lock(lock_);
      nodes_[node->name()] = node;
    }
    node->Start(this);
  }

  // Unregisters |node| and tells every node that is still registered that the
  // connection to it has been lost.
  void RemoveNode(TestNode* node) {
    {
      base::AutoLock scoped_lock(lock_);
      nodes_.erase(node->name());
    }

    for (const auto& name_and_node : nodes_) {
      TestNode* remaining = name_and_node.second;
      remaining->node().LostConnectionToNode(node->name());
    }
  }

  // Blocks until every registered node reports idle. Forwarding and
  // processing are arranged so that idleness is stable: once all nodes are
  // idle they stay idle until the test itself triggers something new (sending
  // a message, closing a port, removing a node, ...).
  void WaitForIdle() {
    while (true) {
      base::AutoLock hold_global(global_lock_);

      bool found_busy_node = false;
      for (const auto& name_and_node : nodes_) {
        TestNode* node = name_and_node.second;
        if (!node->IsIdle())
          found_busy_node = true;
        node->WakeUp();
      }
      if (!found_busy_node)
        return;

      // Let go of the global lock while waiting for any node to signal that
      // it has gone idle.
      base::AutoUnlock release_global(global_lock_);
      std::vector<base::WaitableEvent*> idle_events;
      for (const auto& name_and_node : nodes_)
        idle_events.push_back(&name_and_node.second->idle_event());
      base::WaitableEvent::WaitMany(idle_events.data(), idle_events.size());
    }
  }

  // Creates a connected pair |port0| (on |node0|) and |port1| (on |node1|).
  // When both ends live on the same node the Node's own CreatePortPair() is
  // used; otherwise each end is created uninitialized and then pointed at the
  // other.
  void CreatePortPair(TestNode* node0,
                      PortRef* port0,
                      TestNode* node1,
                      PortRef* port1) {
    if (node0 == node1) {
      EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
      return;
    }

    EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
    EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
    EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
                                               port1->name()));
    EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
                                               port0->name()));
  }

 private:
  // MessageRouter:

  // Hands out sequential port names: v1 is a running counter and v2 is 0.
  void GeneratePortName(PortName* name) override {
    base::AutoLock scoped_lock(lock_);
    name->v1 = next_port_id_++;
    name->v2 = 0;
  }

  // Queues |message| on the node called |node_name|. Messages coming from a
  // node that is no longer registered are dropped after their ports are
  // closed; messages for an unknown destination are dropped as-is.
  void ForwardMessage(TestNode* from_node,
                      const NodeName& node_name,
                      ScopedMessage message) override {
    base::AutoLock hold_global(global_lock_);
    base::AutoLock scoped_lock(lock_);

    // The sender has been removed: discard its traffic.
    if (nodes_.count(from_node->name()) == 0) {
      from_node->ClosePortsInMessage(message.get());
      return;
    }

    const auto destination = nodes_.find(node_name);
    if (destination == nodes_.end()) {
      DVLOG(1) << "Node not found: " << node_name;
      return;
    }

    destination->second->EnqueueMessage(std::move(message));
  }

  // Queues a header-only copy of |message| on every registered node other
  // than |from_node|. Nothing is delivered if |from_node| has been removed.
  void BroadcastMessage(TestNode* from_node, ScopedMessage message) override {
    base::AutoLock hold_global(global_lock_);
    base::AutoLock scoped_lock(lock_);

    // The sender has been removed: discard its traffic.
    if (nodes_.count(from_node->name()) == 0)
      return;

    const auto header_size = message->num_header_bytes();
    for (const auto& name_and_node : nodes_) {
      TestNode* recipient = name_and_node.second;

      // A broadcast is never delivered back to the node that sent it.
      if (recipient == from_node)
        continue;

      // NOTE: Only events need to be broadcast, and events carry neither
      // payload nor ports bytes, so copying the header is sufficient.
      ScopedMessage copy(new TestMessage(header_size, 0, 0));
      memcpy(copy->mutable_header_bytes(), message->header_bytes(),
             header_size);
      recipient->EnqueueMessage(std::move(copy));
    }
  }

  base::MessageLoop message_loop_;

  // Taken before any operation that makes a Node busy, and before checking
  // whether all nodes are idle.
  base::Lock global_lock_;

  base::Lock lock_;
  uint64_t next_port_id_ = 1;
  std::map<NodeName, TestNode*> nodes_;
};

}  // namespace

// Sends one end of a local pair across nodes and then closes every port the
// test still holds, without node1 ever reading the message. Both nodes must
// be able to shut down cleanly once idle.
TEST_F(PortsTest, Basic1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Cross-node pair: |link0| on node0, |link1| on node1.
  PortRef link0, link1;
  CreatePortPair(&node0, &link0, &node1, &link1);

  // Local pair on node0; |sent| rides along with "hello" over the link.
  PortRef kept, sent;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&kept, &sent));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "hello", sent));
  EXPECT_EQ(OK, node0.node().ClosePort(kept));

  EXPECT_EQ(OK, node0.node().ClosePort(link0));
  EXPECT_EQ(OK, node1.node().ClosePort(link1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// Same shape as Basic1, except that a message is also written to the kept end
// of the local pair after its other end has been sent away.
TEST_F(PortsTest, Basic2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Cross-node pair: |link0| on node0, |link1| on node1.
  PortRef link0, link1;
  CreatePortPair(&node0, &link0, &node1, &link1);

  // Send |sent| over the link, then write to its peer |kept|.
  PortRef kept, sent;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&kept, &sent));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "hello", sent));
  EXPECT_EQ(OK, node0.SendStringMessage(kept, "hello again"));

  EXPECT_EQ(OK, node0.node().ClosePort(kept));

  EXPECT_EQ(OK, node0.node().ClosePort(link0));
  EXPECT_EQ(OK, node1.node().ClosePort(link1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// Sends both ends of one local pair plus one end of a second pair over the
// same link, interleaved with ordinary messages, then closes what is left and
// expects a clean shutdown.
TEST_F(PortsTest, Basic3) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Cross-node pair: |link0| on node0, |link1| on node1.
  PortRef link0, link1;
  CreatePortPair(&node0, &link0, &node1, &link1);

  PortRef alpha0, alpha1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&alpha0, &alpha1));

  // Ship |alpha1|, write to |alpha0|, ...
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "hello", alpha1));
  EXPECT_EQ(OK, node0.SendStringMessage(alpha0, "hello again"));

  // ... and then ship |alpha0| as well.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "foo", alpha0));

  // Second pair: only |beta1| is sent; |beta0| is written to and closed.
  PortRef beta0, beta1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&beta0, &beta1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "bar", beta1));
  EXPECT_EQ(OK, node0.SendStringMessage(beta0, "baz"));

  EXPECT_EQ(OK, node0.node().ClosePort(beta0));

  EXPECT_EQ(OK, node0.node().ClosePort(link0));
  EXPECT_EQ(OK, node1.node().ClosePort(link1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// Transfers a port to a node whose outgoing messages are being dropped, then
// removes that node and checks that every port can still be closed and both
// nodes can shut down cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);
  // From now on node1 discards everything its Node asks it to forward.
  node1.set_drop_messages(true);

  PortRef link0, link1;
  CreatePortPair(&node0, &link0, &node1, &link1);

  // Hand |sent| to node1 over the link, then simulate losing node1.

  PortRef kept, sent;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&kept, &sent));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "foo", sent));

  WaitForIdle();

  RemoveNode(&node1);

  WaitForIdle();

  EXPECT_EQ(OK, node0.node().ClosePort(kept));
  EXPECT_EQ(OK, node0.node().ClosePort(link0));
  EXPECT_EQ(OK, node1.node().ClosePort(link1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// Lets a transferred port arrive on node1 first, then drops node1's traffic
// and removes it. The port left behind on node0 must observe peer closure.
TEST_F(PortsTest, LostConnectionToNode2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef link0, link1;
  CreatePortPair(&node0, &link0, &node1, &link1);

  PortRef kept, sent;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&kept, &sent));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(link0, "take a1", sent));

  WaitForIdle();

  node1.set_drop_messages(true);

  RemoveNode(&node1);

  WaitForIdle();

  // After the node loss, |kept| should eventually have noticed that its peer
  // is gone.
  ScopedMessage received;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node0.node().GetMessage(kept, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, node0.node().ClosePort(kept));

  EXPECT_EQ(OK, node0.node().ClosePort(link0));

  // The message carrying the transferred port is still readable on node1;
  // read it and close the port it contains.
  EXPECT_EQ(OK, node1.node().GetMessage(link1, &received, nullptr));
  EXPECT_TRUE(received);
  node1.ClosePortsInMessage(received.get());

  EXPECT_EQ(OK, node1.node().ClosePort(link1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  // From here on |F| refers to the port as received on node 1.
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port E on node 0 will eventually also become aware of it.

  // Make sure node2 stops processing events when it encounters an ObserveProxy.
  node2.BlockOnEvent(EventType::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node2.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  // Only node 0 and node 1 are checked; port D on node 2 is left open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and its predecessor lives on the same node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // A-B spans node0 and node1; C-D is local to node0.
  PortRef A, B;
  CreatePortPair(&node0, &A, &node1, &B);

  PortRef C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

  // Send D but block node0 on an ObserveProxy event.
  node0.BlockOnEvent(EventType::kObserveProxy);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));

  // node0 won't collapse the proxy but node1 will receive the message before
  // going idle.
  WaitForIdle();

  // E is D as received on node1.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));

  RemoveNode(&node1);

  node0.Unblock();
  WaitForIdle();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node0.node().ClosePort(C));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// GetMessage() on an empty port returns OK with no message while the peer is
// open, and ERROR_PORT_PEER_CLOSED once the peer has been closed.
TEST_F(PortsTest, GetMessage1) {
  TestNode node(0);
  AddNode(&node);

  PortRef local, peer;
  EXPECT_EQ(OK, node.node().CreatePortPair(&local, &peer));

  // Nothing has been sent yet.
  ScopedMessage received;
  EXPECT_EQ(OK, node.node().GetMessage(local, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, node.node().ClosePort(peer));

  WaitForIdle();

  // With the peer closed the read now reports the closure.
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node.node().GetMessage(local, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, node.node().ClosePort(local));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// A single message written on one end of a local pair can be read back from
// the other end.
TEST_F(PortsTest, GetMessage2) {
  TestNode node(0);
  AddNode(&node);

  PortRef receiver, sender;
  EXPECT_EQ(OK, node.node().CreatePortPair(&receiver, &sender));

  EXPECT_EQ(OK, node.SendStringMessage(sender, "1"));

  ScopedMessage received;
  EXPECT_EQ(OK, node.node().GetMessage(receiver, &received, nullptr));

  ASSERT_TRUE(received);
  EXPECT_TRUE(MessageEquals(received, "1"));

  EXPECT_EQ(OK, node.node().ClosePort(receiver));
  EXPECT_EQ(OK, node.node().ClosePort(sender));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// Several messages written on one end of a local pair are read back from the
// other end in the order they were sent.
TEST_F(PortsTest, GetMessage3) {
  TestNode node(0);
  AddNode(&node);

  PortRef receiver, sender;
  EXPECT_EQ(OK, node.node().CreatePortPair(&receiver, &sender));

  const char* kStrings[] = {"1", "2", "3"};

  // Queue everything up first ...
  for (const char* text : kStrings)
    EXPECT_EQ(OK, node.SendStringMessage(sender, text));

  // ... then drain and compare in the same order.
  ScopedMessage received;
  for (const char* expected : kStrings) {
    EXPECT_EQ(OK, node.node().GetMessage(receiver, &received, nullptr));
    ASSERT_TRUE(received);
    EXPECT_TRUE(MessageEquals(received, expected));
  }

  EXPECT_EQ(OK, node.node().ClosePort(receiver));
  EXPECT_EQ(OK, node.node().ClosePort(sender));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// Moves a port from node0 to node1 and back again, and checks that a message
// written to its peer in the meantime is delivered to the final location.
TEST_F(PortsTest, Delegation1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a1"));

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];
  // Send the port straight back to node0 and, concurrently, write to its peer.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x0, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a2"));

  // This is "a2" from the point of view of node0.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));

  // The "hello" written on a0 must be readable on the twice-moved port.
  ASSERT_TRUE(node0.ReadMessage(a3, &message));
  EXPECT_EQ(0u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "hello"));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(a3));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// Repeats a nested port transfer 100 times: D is sent over A, F is sent over C
// (to D), and a message written on E must end up among the messages node1
// saves.
TEST_F(PortsTest, Delegation2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  for (int i = 0; i < 100; ++i) {
    // Set up pipe A<->B between node0 and node1.
    PortRef A, B;
    CreatePortPair(&node0, &A, &node1, &B);

    PortRef C, D;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));

    // Have node1 read and store messages as its ports report status changes.
    node1.set_save_messages(true);

    // Pass D over A to B.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));

    WaitForIdle();

    EXPECT_EQ(OK, node0.node().ClosePort(C));
    EXPECT_EQ(OK, node0.node().ClosePort(E));

    EXPECT_EQ(OK, node0.node().ClosePort(A));
    EXPECT_EQ(OK, node1.node().ClosePort(B));

    // Walk the saved messages, closing any ports they carry, until "hello"
    // turns up.
    bool got_hello = false;
    ScopedMessage message;
    while (node1.GetSavedMessage(&message)) {
      node1.ClosePortsInMessage(message.get());
      if (MessageEquals(message, "hello")) {
        got_hello = true;
        break;
      }
    }

    EXPECT_TRUE(got_hello);

    WaitForIdle();  // Because closing ports may have generated tasks.
  }

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

// Sending on a port that was created but never initialized must fail with
// ERROR_PORT_STATE_UNEXPECTED; the port can still be closed afterwards.
TEST_F(PortsTest, SendUninitialized) {
  TestNode node(0);
  AddNode(&node);

  PortRef uninitialized_port;
  EXPECT_EQ(OK, node.node().CreateUninitializedPort(&uninitialized_port));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
            node.SendStringMessage(uninitialized_port, "oops"));
  EXPECT_EQ(OK, node.node().ClosePort(uninitialized_port));
  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// A port can be sent neither over itself nor over its own peer. The failed
// peer send closes the peer, and no message is ever delivered.
TEST_F(PortsTest, SendFailure) {
  TestNode node(0);
  AddNode(&node);

  node.set_save_messages(true);

  PortRef port, peer;
  EXPECT_EQ(OK, node.node().CreatePortPair(&port, &peer));

  // Sending |port| over itself is rejected.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            node.SendStringMessageWithPort(port, "oops", port));

  // Sending |peer| over |port| is rejected too.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            node.SendStringMessageWithPort(port, "nope", peer));

  // |peer| should have been closed right away.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(peer.name(), &peer));

  WaitForIdle();

  // Neither send should have resulted in an accepted message.
  ScopedMessage saved;
  EXPECT_FALSE(node.GetSavedMessage(&saved));

  EXPECT_EQ(OK, node.node().ClosePort(port));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// A port that was sent in a message nobody ever reads must not keep the node
// from shutting down cleanly once the other ports are closed.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  TestNode node(0);
  AddNode(&node);

  PortRef carrier, carrier_peer, sent_peer, unreceived;
  EXPECT_EQ(OK, node.node().CreatePortPair(&carrier, &carrier_peer));
  EXPECT_EQ(OK, node.node().CreatePortPair(&sent_peer, &unreceived));

  // |unreceived| is sent over |carrier| but never read from |carrier_peer|.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(carrier, "foo", unreceived));

  EXPECT_EQ(OK, node.node().ClosePort(sent_peer));
  EXPECT_EQ(OK, node.node().ClosePort(carrier));
  EXPECT_EQ(OK, node.node().ClosePort(carrier_peer));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// With only local ports open, CanShutdownCleanly() succeeds under the
// ALLOW_LOCAL_PORTS policy but not under the default one, until every port
// has been closed.
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  TestNode node(0);
  AddNode(&node);

  PortRef tx, rx, kept, sent;
  EXPECT_EQ(OK, node.node().CreatePortPair(&tx, &rx));
  EXPECT_EQ(OK, node.node().CreatePortPair(&kept, &sent));

  EXPECT_EQ(OK, node.SendStringMessageWithPort(tx, "foo", sent));

  // Receive the transferred port on the other end of the carrier pair.
  ScopedMessage received;
  EXPECT_TRUE(node.ReadMessage(rx, &received));
  ASSERT_EQ(1u, received->num_ports());
  EXPECT_TRUE(MessageEquals(received, "foo"));
  PortRef arrived;
  ASSERT_EQ(OK, node.node().GetPort(received->ports()[0], &arrived));

  const auto allow_local = Node::ShutdownPolicy::ALLOW_LOCAL_PORTS;

  EXPECT_TRUE(node.node().CanShutdownCleanly(allow_local));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly(allow_local));
  EXPECT_FALSE(node.node().CanShutdownCleanly());

  EXPECT_EQ(OK, node.node().ClosePort(tx));
  EXPECT_EQ(OK, node.node().ClosePort(rx));
  EXPECT_EQ(OK, node.node().ClosePort(kept));
  EXPECT_EQ(OK, node.node().ClosePort(arrived));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// Sends the same logical port over a local pair three times in a row
// (B -> C -> D -> E) and then closes everything; the node must be able to
// shut down cleanly once idle.
TEST_F(PortsTest, ProxyCollapse1) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  // X-Y is the pair the ports are sent over.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  // Send C and receive it as D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));

  // Send D and receive it as E.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // Close both live ends of the A ... E route.
  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(E));

  // The node should not idle until all proxies are collapsed.
  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

// Sends both ends of a local pair over another pair, so that a proxy exists
// in each direction, and checks that the node can shut down cleanly after the
// carrying pair is closed without the messages ever being read.
TEST_F(PortsTest, ProxyCollapse2) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  // Send B and A to create proxies in each direction.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse. The sent ports will be closed
  // eventually as a result of Y's closure.

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.

  TestNode node(0);
  AddNode(&node);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(A));

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ScopedMessage message;
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // C should have received the message originally sent to B, and it should also
  // be aware of A's closure.

  ASSERT_TRUE(node.ReadMessage(C, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result like every other ClosePort() call in this file, so that
  // a failure to close C is reported here rather than showing up only
  // indirectly in the shutdown check below.
  EXPECT_EQ(OK, node.node().ClosePort(C));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendWithClosedPeerSent) {
  // This tests that if a port is closed while some number of proxies are still
  // routing messages (directly or indirectly) to it, that the peer port is
  // eventually notified of the closure, and the dead-end proxies will
  // eventually be removed.

  TestNode node(0);
  AddNode(&node);

  // X-Y is the pair the other ports are sent over.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  ScopedMessage message;

  // Send A as new port C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  // Send C as new port D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));

  // Send a message to B through D, then close D.
  EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(D));

  // Now send B as new port E.

  // Note that X is closed before the message carrying B is read from Y.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.node().ClosePort(X));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // E should receive the message originally sent to B, and it should also be
  // aware of D's closure.

  ASSERT_TRUE(node.ReadMessage(E, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(E, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePorts) {
  // This tests the basic merge path: a message written on A before B and C are
  // merged must be readable from D once the nodes are idle.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on A.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. A and D are still open at this
  // point, so the check is made with ALLOW_LOCAL_PORTS.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // No more ports should be open.
  // NOTE(review): these checks run without a WaitForIdle() after the
  // ClosePort calls above - confirm that is intentional.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // This tests that the right thing happens when initiating a merge on a port
  // whose peer has already been closed.
  //
  // Here the merge is initiated on B (node0) after its peer A has been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on A.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Close A.
  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // Initiate a merge between B and C. Both the send and the close of A happen
  // before this call.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node0 should have no ports since
  // A was explicitly closed. D is still open on node1, so that node is checked
  // with ALLOW_LOCAL_PORTS.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.
  //
  // Here the merge target is C (node1), whose peer D has been written to and
  // closed before the merge is initiated from B on node0.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on D and close it. D was created on node1, so both
  // operations go through node1.
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node1 should have no ports since
  // D was explicitly closed. A is still open on node0, so that node is checked
  // with ALLOW_LOCAL_PORTS.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(node1.node().CanShutdownCleanly());

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(A, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // This tests that no residual ports are left behind if two ports are merged
  // when both of their peers have been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Close A and D.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Wait for the nodes to go idle after the closes before starting the merge.
  WaitForIdle();

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect everything to have gone away. B and C are never closed explicitly
  // by this test.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // This tests that ports can be merged successfully even if their peers are
  // moved around.
  //
  // A is sent over X-Y and received as E before B and C are merged; messages
  // are then expected to flow in both directions between E and D.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Set up another pair X-Y for moving ports on node0.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));

  // Reused for every read below.
  ScopedMessage message;

  // Move A to new port E. The message read from Y must carry exactly one port,
  // which is looked up by name to obtain E.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
  ASSERT_TRUE(node0.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));

  // X and Y are not used past this point.
  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node0.node().ClosePort(Y));

  // Write messages on E and D.
  EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect to receive D's message on E and E's message on D.
  ASSERT_TRUE(node0.ReadMessage(E, &message));
  EXPECT_TRUE(MessageEquals(message, "hi"));
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  // Close E and D.
  EXPECT_EQ(OK, node0.node().ClosePort(E));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsFailsGracefully) {
  // This tests that the system remains in a well-defined state if something
  // goes wrong during port merge.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // X-Y on node1 is used below to send C away while the merge is pending.
  ScopedMessage message;
  PortRef X, Y;
  EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y));

  // Block the merge from proceeding until we can do something stupid with port
  // C. This avoids the test logic racing with async merge logic.
  node1.BlockOnEvent(EventType::kMergePort);

  // Initiate the merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  // Move C to a new port E. This is not a sane use of Node's public API but
  // is still hypothetically possible. It allows us to force a merge failure
  // because C will be in an invalid state by the time the merge is processed.
  // As a result, B should be closed.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C));

  // C has been sent; release the block set up above.
  node1.Unblock();

  // The message read from Y must carry exactly one port, which is looked up by
  // name to obtain E.
  ASSERT_TRUE(node1.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));

  // X and Y are not used past this point.
  EXPECT_EQ(OK, node1.node().ClosePort(X));
  EXPECT_EQ(OK, node1.node().ClosePort(Y));

  WaitForIdle();

  // C goes away as a result of normal proxy removal. B should have been closed
  // cleanly by the failed MergePorts. Both lookups by name are therefore
  // expected to fail with ERROR_PORT_UNKNOWN.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));

  // Close A, D, and E.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

}  // namespace test
}  // namespace ports
}  // namespace edk
}  // namespace mojo