普通文本  |  1520行  |  42.9 KB

// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <map>
#include <queue>
#include <sstream>

#include "base/logging.h"
#include "base/rand_util.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace edk {
namespace ports {
namespace test {

namespace {

// Emits a verbose-level log line describing |message|: its payload, printed as
// a C string, followed by the comma-separated names of the ports it carries.
void LogMessage(const Message* message) {
  std::stringstream port_list;
  const size_t port_count = message->num_ports();
  for (size_t index = 0; index < port_count; ++index) {
    // Separator goes before every entry except the first.
    port_list << (index == 0 ? "" : ",") << message->ports()[index];
  }
  DVLOG(1) << "message: \""
           << static_cast<const char*>(message->payload_bytes())
           << "\" ports=[" << port_list.str() << "]";
}

// Looks up every port named in |message| on |node| and closes it. A failed
// lookup aborts the helper (ASSERT); a failed close is only recorded (EXPECT).
void ClosePortsInMessage(Node* node, Message* message) {
  const size_t port_count = message->num_ports();
  for (size_t index = 0; index != port_count; ++index) {
    PortRef attached;
    ASSERT_EQ(OK, node->GetPort(message->ports()[index], &attached));
    EXPECT_EQ(OK, node->ClosePort(attached));
  }
}

// Message implementation for tests. The backing storage is a heap-allocated
// char array that is released in the destructor.
class TestMessage : public Message {
 public:
  // Creates a user message with room for |num_payload_bytes| of payload and
  // |num_ports| ports, and initializes its user-message header.
  TestMessage(size_t num_payload_bytes, size_t num_ports)
      : Message(num_payload_bytes, num_ports) {
    // |num_header_bytes_| and |num_ports_bytes_| are inherited members;
    // presumably they are filled in by the Message constructor above.
    const size_t total_bytes =
        num_header_bytes_ + num_ports_bytes_ + num_payload_bytes;
    start_ = new char[total_bytes];
    InitializeUserMessageHeader(start_);
  }

  // Creates a message from explicit section sizes. The buffer contents are
  // left uninitialized.
  TestMessage(size_t num_header_bytes,
              size_t num_payload_bytes,
              size_t num_ports_bytes)
      : Message(num_header_bytes, num_payload_bytes, num_ports_bytes) {
    const size_t total_bytes =
        num_header_bytes + num_payload_bytes + num_ports_bytes;
    start_ = new char[total_bytes];
  }

  ~TestMessage() override { delete[] start_; }

  // Convenience factory that wraps a new user message in a ScopedMessage.
  static ScopedMessage NewUserMessage(size_t num_payload_bytes,
                                      size_t num_ports) {
    return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
  }
};

struct Task {
  Task(NodeName node_name, ScopedMessage message)
      : node_name(node_name),
        message(std::move(message)),
        priority(base::RandUint64()) {
  }

  NodeName node_name;
  ScopedMessage message;
  uint64_t priority;
};

// Strict weak ordering over tasks by their |priority| field, for use as the
// comparison object of a std::priority_queue of Task pointers.
struct TaskComparator {
  // Returns true when |a| has lower priority than |b|. Const-qualified so the
  // comparator can be invoked through a const object, as comparison function
  // objects are expected to be.
  bool operator()(const Task* a, const Task* b) const {
    return a->priority < b->priority;
  }
};

// Number of slots in |node_map|, i.e. the maximum number of nodes a test can
// register at once.
const size_t kMaxNodes = 3;

// Pending message deliveries. Tasks come out ordered by TaskComparator (their
// randomly assigned priority), not by insertion order. The queue stores raw
// pointers; whoever pops a task is responsible for deleting it.
std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
// Registered nodes, indexed by the |v1| component of their NodeName. A null
// entry means no node is registered in that slot.
Node* node_map[kMaxNodes];

// Returns the node registered under |name| (keyed by |name.v1|), or nullptr if
// no node is registered there. Names whose |v1| falls outside |node_map| are
// treated as unregistered rather than indexing past the end of the array.
Node* GetNode(const NodeName& name) {
  if (name.v1 >= kMaxNodes)
    return nullptr;
  return node_map[name.v1];
}

void SetNode(const NodeName& name, Node* node) {
  node_map[name.v1] = node;
}

void PumpTasks() {
  while (!task_queue.empty()) {
    Task* task = task_queue.top();
    task_queue.pop();

    Node* node = GetNode(task->node_name);
    if (node)
      node->AcceptMessage(std::move(task->message));

    delete task;
  }
}

// Like PumpTasks(), but stops as soon as the task at the head of the queue
// carries an event of |type|. That task is left in the queue, undelivered.
void PumpUntilTask(EventType type) {
  for (;;) {
    if (task_queue.empty())
      return;

    Task* next = task_queue.top();

    // Peek at the event type before committing to delivering this task.
    if (GetEventHeader(*next->message)->type == type)
      return;

    task_queue.pop();

    if (Node* target = GetNode(next->node_name))
      target->AcceptMessage(std::move(next->message));

    delete next;
  }
}

void DiscardPendingTasks() {
  while (!task_queue.empty()) {
    Task* task = task_queue.top();
    task_queue.pop();
    delete task;
  }
}

// Sends |s| (including its terminating NUL) as the payload of a port-less user
// message on |port| of |node|. Returns the result of Node::SendMessage().
int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
  // One extra byte so the receiver can read the payload as a C string.
  const size_t payload_size = s.size() + 1;
  ScopedMessage outgoing = TestMessage::NewUserMessage(payload_size, 0);
  memcpy(outgoing->mutable_payload_bytes(), s.c_str(), payload_size);
  return node->SendMessage(port, std::move(outgoing));
}

// Sends |s| (including its terminating NUL) on |port| of |node| in a user
// message that also carries the single port named |sent_port_name|. Returns
// the result of Node::SendMessage().
int SendStringMessageWithPort(Node* node,
                              const PortRef& port,
                              const std::string& s,
                              const PortName& sent_port_name) {
  const size_t payload_size = s.size() + 1;
  ScopedMessage outgoing = TestMessage::NewUserMessage(payload_size, 1);
  memcpy(outgoing->mutable_payload_bytes(), s.c_str(), payload_size);
  // The message has exactly one port slot; fill it with the port to transfer.
  outgoing->mutable_ports()[0] = sent_port_name;
  return node->SendMessage(port, std::move(outgoing));
}

// Overload that takes the port to transfer as a PortRef and forwards to the
// PortName overload using the ref's name.
int SendStringMessageWithPort(Node* node,
                              const PortRef& port,
                              const std::string& s,
                              const PortRef& sent_port) {
  const PortName& sent_name = sent_port.name();
  return SendStringMessageWithPort(node, port, s, sent_name);
}

// Returns |message|'s payload viewed as a C string. The pointer refers to the
// message's own payload storage; no copy is made.
const char* ToString(const ScopedMessage& message) {
  const void* payload = message->payload_bytes();
  return static_cast<const char*>(payload);
}

class TestNodeDelegate : public NodeDelegate {
 public:
  explicit TestNodeDelegate(const NodeName& node_name)
      : node_name_(node_name),
        drop_messages_(false),
        read_messages_(true),
        save_messages_(false) {
  }

  void set_drop_messages(bool value) { drop_messages_ = value; }
  void set_read_messages(bool value) { read_messages_ = value; }
  void set_save_messages(bool value) { save_messages_ = value; }

  bool GetSavedMessage(ScopedMessage* message) {
    if (saved_messages_.empty()) {
      message->reset();
      return false;
    }
    *message = std::move(saved_messages_.front());
    saved_messages_.pop();
    return true;
  }

  void GenerateRandomPortName(PortName* port_name) override {
    static uint64_t next_port_name = 1;
    port_name->v1 = next_port_name++;
    port_name->v2 = 0;
  }

  void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
    message->reset(new TestMessage(num_header_bytes, 0, 0));
  }

  void ForwardMessage(const NodeName& node_name,
                      ScopedMessage message) override {
    if (drop_messages_) {
      DVLOG(1) << "Dropping ForwardMessage from node "
               << node_name_ << " to " << node_name;
      ClosePortsInMessage(GetNode(node_name), message.get());
      return;
    }
    DVLOG(1) << "ForwardMessage from node "
             << node_name_ << " to " << node_name;
    task_queue.push(new Task(node_name, std::move(message)));
  }

  void BroadcastMessage(ScopedMessage message) override {
    for (size_t i = 0; i < kMaxNodes; ++i) {
      Node* node = node_map[i];
      // Broadcast doesn't deliver to the local node.
      if (node && node != GetNode(node_name_)) {
        // NOTE: We only need to support broadcast of events, which have no
        // payload or ports bytes.
        ScopedMessage new_message(
            new TestMessage(message->num_header_bytes(), 0, 0));
        memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
               message->num_header_bytes());
        node->AcceptMessage(std::move(new_message));
      }
    }
  }

  void PortStatusChanged(const PortRef& port) override {
    DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
    if (!read_messages_)
      return;
    Node* node = GetNode(node_name_);
    for (;;) {
      ScopedMessage message;
      int rv = node->GetMessage(port, &message);
      EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
      if (rv == ERROR_PORT_PEER_CLOSED || !message)
        break;
      if (save_messages_) {
        SaveMessage(std::move(message));
      } else {
        LogMessage(message.get());
        for (size_t i = 0; i < message->num_ports(); ++i) {
          std::stringstream buf;
          buf << "got port: " << message->ports()[i];

          PortRef received_port;
          node->GetPort(message->ports()[i], &received_port);

          SendStringMessage(node, received_port, buf.str());

          // Avoid leaking these ports.
          node->ClosePort(received_port);
        }
      }
    }
  }

 private:
  void SaveMessage(ScopedMessage message) {
    saved_messages_.emplace(std::move(message));
  }

  std::queue<ScopedMessage> saved_messages_;
  NodeName node_name_;
  bool drop_messages_;
  bool read_messages_;
  bool save_messages_;
};

// Fixture that gives every test a clean slate: no queued tasks left over from
// a previous test and no nodes registered in any slot.
class PortsTest : public testing::Test {
 public:
  void SetUp() override {
    DiscardPendingTasks();
    for (size_t slot = 0; slot < kMaxNodes; ++slot)
      SetNode(NodeName(slot, 1), nullptr);
  }
};

}  // namespace

// Sends one end (a1) of a local port pair from node0 to node1 over the x0-x1
// pipe, closes the ports the test still holds, and verifies that both nodes
// can shut down cleanly once every queued task has been delivered. node1's
// delegate is left in its default mode, in which ports received in messages
// are closed as the messages are read.
TEST_F(PortsTest, Basic1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Transfer a port from node0 to node1.
  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));

  // Close the end of the pair that stayed behind on node0.
  EXPECT_EQ(OK, node0.ClosePort(a0));

  // Tear down the pipe itself.
  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  // Deliver everything that was queued by the operations above.
  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Like Basic1, but additionally sends a message on the retained end (b0) of
// the port pair after the other end (b1) has been sent to node1, and only
// then closes b0. Both nodes must still be able to shut down cleanly.
TEST_F(PortsTest, Basic2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Send b1 to node1, then send a message towards it from b0.
  PortRef b0, b1;
  EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
  EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));

  // This may cause a SendMessage(b1) failure.
  EXPECT_EQ(OK, node0.ClosePort(b0));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  // Deliver everything that was queued by the operations above.
  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Combines the Basic1 and Basic2 scenarios: both ends of one port pair (a1,
// then a0) are sent to node1 with a message sent on a0 in between, and a
// second pair (b0/b1) is exercised the same way as in Basic2. Both nodes must
// be able to shut down cleanly afterwards.
TEST_F(PortsTest, Basic3) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Transfer a port from node0 to node1.
  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
  EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));

  // Transfer a0 as well.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));

  // Second pair: send b1 away, then send a message on b0 and close it.
  PortRef b0, b1;
  EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
  EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));

  // This may cause a SendMessage(b1) failure.
  EXPECT_EQ(OK, node0.ClosePort(b0));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  // Deliver everything that was queued by the operations above.
  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Sends a port to node1 while node1's delegate drops every message node1
// tries to forward, then tells node0 that the connection to node1 was lost.
// After the remaining ports are closed, both nodes must be able to shut down
// cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Transfer port to node1 and simulate a lost connection to node1. Dropping
  // events from node1 is how we simulate the lost connection.

  node1_delegate.set_drop_messages(true);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));

  // Deliver the transfer to node1; anything node1 sends back is dropped.
  PumpTasks();

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));

  // Deliver whatever the lost-connection handling queued.
  PumpTasks();

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Sends a port (a1) to node1 while node1's delegate is not reading messages,
// then makes node1 drop its outgoing messages and tells node0 the connection
// to node1 was lost. Verifies that a0 on node0 observes peer closure, and that
// the unread message on node1 still carries a port that can be closed.
TEST_F(PortsTest, LostConnectionToNode2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Leave incoming messages sitting on node1's ports instead of consuming
  // them in the delegate.
  node1_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));

  PumpTasks();

  // From here on, nothing node1 forwards is delivered.
  node1_delegate.set_drop_messages(true);

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));

  PumpTasks();

  // a0 must now report that its peer is closed, with no message available.
  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a0));

  EXPECT_EQ(OK, node0.ClosePort(x0));

  // Read the "take a1" message manually and close the port it carries.
  EXPECT_EQ(OK, node1.GetMessage(x1, &message));
  EXPECT_TRUE(message);
  ClosePortsInMessage(&node1, message.get());

  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  NodeName node2_name(2, 1);
  TestNodeDelegate node2_delegate(node2_name);
  Node node2(node2_name, &node2_delegate);
  node_map[2] = &node2;

  // node1 keeps the messages it reads so the test can pull the transferred
  // port out of them.
  node1_delegate.set_save_messages(true);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
  EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
  EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
  EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
  EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
  EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F));

  PumpTasks();

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // Rebind F to the port node1 received in the message.
  EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port A on node 0 will eventually also become aware of it.

  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F));

  // Unregister node2 so that tasks addressed to it are no longer delivered.
  node_map[2] = nullptr;
  EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name));

  PumpTasks();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(B));
  EXPECT_EQ(OK, node1.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(E));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and its predecessor lives on the same node.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // node1 keeps the messages it reads so the test can pull the transferred
  // port out of them.
  node1_delegate.set_save_messages(true);

  // Create A-B spanning nodes 0 and 1.
  PortRef A, B;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
  EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
  EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));

  // Create C-D and send D over A to node 1.
  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D));

  // Pump tasks until the start of port collapse for port D, which should become
  // a proxy.
  PumpUntilTask(EventType::kObserveProxy);

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // E is the port node1 received in place of D.
  PortRef E;
  EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
  PumpTasks();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(B));
  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node1.ClosePort(E));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Checks what GetMessage() reports on an empty port before and after its
// peer is closed: OK with no message until the queued tasks have been pumped,
// and ERROR_PORT_PEER_CLOSED afterwards.
TEST_F(PortsTest, GetMessage1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  // Nothing has been sent yet.
  ScopedMessage message;
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a1));

  // The closure has not reached a0 yet because no tasks have been pumped.
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  PumpTasks();

  // Now a0 knows its peer is gone.
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a0));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Sends a single string between the two ends of a local port pair and checks
// that it can be read back with GetMessage() without pumping any tasks.
TEST_F(PortsTest, GetMessage2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Keep the delegate from consuming the message before the test reads it.
  node0_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));

  ScopedMessage message;
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));

  ASSERT_TRUE(message);
  EXPECT_EQ(0, strcmp("1", ToString(message)));

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Sends several strings from one end of a local port pair and checks that
// they are read back from the other end in the order they were sent.
TEST_F(PortsTest, GetMessage3) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Keep the delegate from consuming the messages before the test reads them.
  node0_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  const char* kStrings[] = {"1", "2", "3"};

  // Queue up every string on a1...
  for (const char* outgoing : kStrings)
    EXPECT_EQ(OK, SendStringMessage(&node0, a1, outgoing));

  // ...and expect them back on a0, one per read, in the same order.
  ScopedMessage message;
  for (const char* expected : kStrings) {
    EXPECT_EQ(OK, node0.GetMessage(a0, &message));
    ASSERT_TRUE(message);
    EXPECT_EQ(0, strcmp(expected, ToString(message)));
    DVLOG(1) << "got " << expected;
  }

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Moves a port from node0 to node1 and straight back again, then sends a
// message to it from its original peer (a0). Verifies that the message still
// reaches the port at its final location on node0.
TEST_F(PortsTest, Delegation1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Both delegates keep the messages they read so the test can inspect them.
  node0_delegate.set_save_messages(true);
  node1_delegate.set_save_messages(true);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));

  PumpTasks();

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));

  ASSERT_EQ(1u, message->num_ports());

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];

  // Send the port straight back to node0.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));

  PumpTasks();

  EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));

  PumpTasks();

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));

  ASSERT_EQ(1u, message->num_ports());

  // This is "a2" from the point of view of node0.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));

  EXPECT_EQ(0, strcmp("a2", ToString(message)));

  // The next saved message is the "hello" sent on a0, which carries no ports.
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));

  EXPECT_EQ(0u, message->num_ports());
  EXPECT_EQ(0, strcmp("hello", ToString(message)));

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a3));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Repeatedly builds a chain of moved ports (D sent over A, F sent over C
// towards D) and checks that a message sent on E still finds its way to
// node1. Tasks are delivered in random order, so the scenario is run ten
// times.
TEST_F(PortsTest, Delegation2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Both delegates keep the messages they read so the test can inspect them.
  node0_delegate.set_save_messages(true);
  node1_delegate.set_save_messages(true);

  for (int i = 0; i < 10; ++i) {
    // Setup pipe a<->b between node0 and node1.
    PortRef A, B;
    EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
    EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
    EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
    EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));

    PortRef C, D;
    EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));

    // Pass D over A to B.
    EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));

    PumpTasks();

    EXPECT_EQ(OK, node0.ClosePort(C));
    EXPECT_EQ(OK, node0.ClosePort(E));

    EXPECT_EQ(OK, node0.ClosePort(A));
    EXPECT_EQ(OK, node1.ClosePort(B));

    // Walk node1's saved messages, closing any ports they carry, until the
    // "hello" message turns up. Running out of messages first is a failure.
    for (;;) {
      ScopedMessage message;
      if (!node1_delegate.GetSavedMessage(&message)) {
        FAIL() << "\"hello\" message not delivered!";
      }
      ClosePortsInMessage(&node1, message.get());
      if (strcmp("hello", ToString(message)) == 0)
        break;
    }

    PumpTasks();  // Because ClosePort may have generated tasks.
  }

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

// Verifies that sending on a port that was created but never initialized
// fails with ERROR_PORT_STATE_UNEXPECTED, and that the port can still be
// closed afterwards.
TEST_F(PortsTest, SendUninitialized) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  PortRef x0;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
            SendStringMessage(&node0, x0, "oops"));
  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Verifies the two rejected ways of sending a port: over itself
// (ERROR_PORT_CANNOT_SEND_SELF) and over its own peer
// (ERROR_PORT_CANNOT_SEND_PEER). Neither attempt may result in a delivered
// message.
TEST_F(PortsTest, SendFailure) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Save anything that does get delivered so the test can assert that nothing
  // was.
  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            SendStringMessageWithPort(&node0, A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            SendStringMessageWithPort(&node0, A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));

  PumpTasks();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));

  EXPECT_EQ(OK, node0.ClosePort(A));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Sends a port (D) in a message that is never read, then closes the ports at
// both ends of the carrying pipe. The node must still be able to shut down
// cleanly, i.e. the port inside the unread message must not be leaked.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Leave the message carrying D unread on B.
  node0_delegate.set_read_messages(false);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));

  PumpTasks();

  EXPECT_EQ(OK, node0.ClosePort(C));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(B));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Checks the two modes of CanShutdownCleanly() while ports are still open on
// the node: the call with |true| succeeds, the call with |false| does not
// until every port has been closed.
// NOTE(review): the boolean presumably means "allow local ports to remain
// open" -- confirm against Node::CanShutdownCleanly().
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));

  ScopedMessage message;
  EXPECT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // E is the port that arrived in place of D.
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  // Checked both before and after the queued tasks have been delivered.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_FALSE(node0.CanShutdownCleanly(false));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(B));
  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Sends the same logical port across the X-Y pipe three times in a row
// (B -> C -> D -> E), all within one node, then closes everything and checks
// that the node can shut down cleanly after the queued tasks have run.
TEST_F(PortsTest, ProxyCollapse1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Saved messages are how the test gets hold of each newly received port.
  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send C and receive it as D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // Send D and receive it as E.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  // A and E are the two ports the test still holds at the ends of the chain.
  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

// Sends both ends of a port pair (B, then A) across the X-Y pipe within one
// node, then closes the two received ports and checks that the node can shut
// down cleanly after the queued tasks have run.
TEST_F(PortsTest, ProxyCollapse2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Saved messages are how the test gets hold of each newly received port.
  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send A and receive it as D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse.

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(D));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Keep the "hey" message unread on B until B has been moved.
  node0_delegate.set_read_messages(false);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
  EXPECT_EQ(OK, node0.ClosePort(A));

  PumpTasks();

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  // From here on the delegate reads messages again and keeps them for the
  // test to inspect.
  node0_delegate.set_read_messages(true);
  node0_delegate.set_save_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  PumpTasks();

  // C should receive the message originally sent to B, and it should also be
  // aware of A's closure.

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_EQ(0, strcmp("hey", ToString(message)));

  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result like every other ClosePort() call in this file, so a
  // failure to close C is reported instead of silently ignored.
  EXPECT_EQ(OK, node0.ClosePort(C));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

TEST_F(PortsTest, SendWithClosedPeerSent) {
  // This tests that if a port is closed while some number of proxies are still
  // routing messages (directly or indirectly) to it, that the peer port is
  // eventually notified of the closure, and the dead-end proxies will
  // eventually be removed.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Saved messages are how the test gets hold of each transferred port below.
  node0_delegate.set_save_messages(true);

  // X-Y is the carrier pair used for every port transfer in this test.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  ScopedMessage message;

  // Send A as new port C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send C as new port D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // Reading is turned off while "hey" is sent. The test relies on it still
  // being undelivered when B is transferred below (presumably this flag is
  // what keeps it queued on B).
  node0_delegate.set_read_messages(false);

  // Send a message to B through D, then close D.
  EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
  EXPECT_EQ(OK, node0.ClosePort(D));

  PumpTasks();

  // Now send B as new port E.

  node0_delegate.set_read_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));

  // The carrier pair is no longer needed.
  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  PumpTasks();

  // E should receive the message originally sent to B, and it should also be
  // aware of D's closure.

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_EQ(0, strcmp("hey", ToString(message)));

  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(E, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result of closing E, like every other ClosePort() call in this
  // test, so a failure is reported at the call site instead of surfacing only
  // through the shutdown check below.
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}

TEST_F(PortsTest, MergePorts) {
  // Basic merge scenario: a message written before the merge must reach the
  // far end afterwards, and closing the two remaining endpoints must leave
  // both nodes empty.

  NodeName name0(0, 1);
  TestNodeDelegate delegate0(name0);
  Node node0(name0, &delegate0);
  node_map[0] = &node0;

  NodeName name1(1, 1);
  TestNodeDelegate delegate1(name1);
  Node node1(name1, &delegate1);
  node_map[1] = &node1;

  // Two unrelated pairs: a-b lives on node0, c-d lives on node1.
  PortRef a, b;
  PortRef c, d;
  EXPECT_EQ(OK, node0.CreatePortPair(&a, &b));
  EXPECT_EQ(OK, node1.CreatePortPair(&c, &d));

  delegate0.set_read_messages(false);
  delegate1.set_save_messages(true);

  // Put a message in flight from a before any merge takes place.
  EXPECT_EQ(OK, SendStringMessage(&node0, a, "hey"));
  PumpTasks();

  // Ask node0 to merge its port b with node1's port c.
  const auto& merge_target = c.name();
  EXPECT_EQ(OK, node0.MergePorts(b, name1, merge_target));
  PumpTasks();

  // Once tasks are pumped, only the two receiving ports should remain.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_TRUE(node1.CanShutdownCleanly(true));

  // The message written on a must have been delivered to d.
  ScopedMessage received;
  ASSERT_TRUE(delegate1.GetSavedMessage(&received));
  EXPECT_EQ(0, strcmp(ToString(received), "hey"));

  EXPECT_EQ(OK, node0.ClosePort(a));
  EXPECT_EQ(OK, node1.ClosePort(d));

  // Neither node should have any port left open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // Covers a merge that is initiated on a port whose own peer was closed
  // before the merge started.

  NodeName name0(0, 1);
  TestNodeDelegate delegate0(name0);
  Node node0(name0, &delegate0);
  node_map[0] = &node0;

  NodeName name1(1, 1);
  TestNodeDelegate delegate1(name1);
  Node node1(name1, &delegate1);
  node_map[1] = &node1;

  // Two unrelated pairs: a-b lives on node0, c-d lives on node1.
  PortRef a, b;
  PortRef c, d;
  EXPECT_EQ(OK, node0.CreatePortPair(&a, &b));
  EXPECT_EQ(OK, node1.CreatePortPair(&c, &d));

  delegate0.set_read_messages(false);
  delegate1.set_save_messages(true);

  // Put a message in flight from a.
  EXPECT_EQ(OK, SendStringMessage(&node0, a, "hey"));
  PumpTasks();

  // a goes away before the merge is requested.
  EXPECT_EQ(OK, node0.ClosePort(a));

  // Ask node0 to merge its port b with node1's port c.
  const auto& merge_target = c.name();
  EXPECT_EQ(OK, node0.MergePorts(b, name1, merge_target));
  PumpTasks();

  // Once tasks are pumped, a single receiving port should remain, on node1.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(true));

  // The message written on a must have been delivered to d.
  ScopedMessage received;
  ASSERT_TRUE(delegate1.GetSavedMessage(&received));
  EXPECT_EQ(0, strcmp(ToString(received), "hey"));

  EXPECT_EQ(OK, node1.ClosePort(d));

  // Neither node should have any port left open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // node0 saves what it receives so the message arriving at A can be checked.
  // Reading is turned off on node1; the test relies on the message sent on D
  // not being consumed before the merge.
  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Write a message on D. D was created on node1, so it must be sent through
  // node1, the node that owns the port.
  EXPECT_EQ(OK, SendStringMessage(&node1, D, "hey"));

  PumpTasks();

  // Close D.
  EXPECT_EQ(OK, node1.ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  PumpTasks();

  // Expect only one receiving port to be left after pumping tasks.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_EQ(0, strcmp("hey", ToString(message)));

  EXPECT_EQ(OK, node0.ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // Covers a merge of two ports whose peers have both been closed already:
  // afterwards neither node may be left holding any port.

  NodeName name0(0, 1);
  TestNodeDelegate delegate0(name0);
  Node node0(name0, &delegate0);
  node_map[0] = &node0;

  NodeName name1(1, 1);
  TestNodeDelegate delegate1(name1);
  Node node1(name1, &delegate1);
  node_map[1] = &node1;

  // Two unrelated pairs: a-b lives on node0, c-d lives on node1.
  PortRef a, b;
  PortRef c, d;
  EXPECT_EQ(OK, node0.CreatePortPair(&a, &b));
  EXPECT_EQ(OK, node1.CreatePortPair(&c, &d));

  delegate0.set_save_messages(true);
  delegate1.set_read_messages(false);

  // Both outer endpoints go away before the merge is requested.
  EXPECT_EQ(OK, node0.ClosePort(a));
  EXPECT_EQ(OK, node1.ClosePort(d));
  PumpTasks();

  // Ask node0 to merge its port b with node1's port c.
  const auto& merge_target = c.name();
  EXPECT_EQ(OK, node0.MergePorts(b, name1, merge_target));
  PumpTasks();

  // Nothing should be left on either node.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // This tests that ports can be merged successfully even if their peers
  // are moved around.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // node0 saves messages so the transferred port can be fetched below; node1
  // starts with reading turned off.
  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // Set up another pair X-Y for moving ports on node0.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Move A to new port E.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  // X-Y was only needed to carry A.
  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  // Reading is turned off on node0 as well while the two messages are written
  // and the merge is requested; it is turned back on just before pumping.
  node0_delegate.set_read_messages(false);

  // Write messages on E and D.
  EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
  EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  // Enable reading on both nodes, and saving on node1 too, so that both
  // messages can be fetched with GetSavedMessage() after pumping.
  node0_delegate.set_read_messages(true);
  node1_delegate.set_read_messages(true);
  node1_delegate.set_save_messages(true);

  PumpTasks();

  // Expect to receive D's message on E and E's message on D.
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_EQ(0, strcmp("hi", ToString(message)));
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  EXPECT_EQ(0, strcmp("hey", ToString(message)));

  // Close E and D.
  EXPECT_EQ(OK, node0.ClosePort(E));
  EXPECT_EQ(OK, node1.ClosePort(D));

  PumpTasks();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

TEST_F(PortsTest, MergePortsFailsGracefully) {
  // This tests that the system remains in a well-defined state if something
  // goes wrong during port merge.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  PumpTasks();

  // Initiate a merge between B and C. Tasks are not pumped again until after C
  // has been moved below, which is what sets up the failure.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  // Move C to a new port E. This is dumb and nobody should do it, but it's
  // possible. MergePorts will fail as a result because C won't be in a
  // receiving state when the event arrives at node1, so B should be closed.
  ScopedMessage message;
  PortRef X, Y;
  EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
  // Saving is needed so the message carrying C can be fetched and E looked up.
  node1_delegate.set_save_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
  // X-Y was only needed to carry C.
  EXPECT_EQ(OK, node1.ClosePort(X));
  EXPECT_EQ(OK, node1.ClosePort(Y));

  // C goes away as a result of normal proxy removal.
  PumpTasks();

  // Looking C up by name on node1 must now fail.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));

  // B should have been closed cleanly.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));

  // Close A, D, and E.
  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(D));
  EXPECT_EQ(OK, node1.ClosePort(E));

  PumpTasks();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}

}  // namespace test
}  // namespace ports
}  // namespace edk
}  // namespace mojo