// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <map>
#include <queue>
#include <sstream>
#include "base/logging.h"
#include "base/rand_util.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace edk {
namespace ports {
namespace test {
namespace {
// Writes |message|'s payload (read as a C string) and a comma-separated list
// of the ports it carries to the verbose log at level 1.
void LogMessage(const Message* message) {
  std::stringstream port_list;
  const char* separator = "";
  for (size_t index = 0; index < message->num_ports(); ++index) {
    port_list << separator << message->ports()[index];
    separator = ",";
  }

  DVLOG(1) << "message: \""
           << static_cast<const char*>(message->payload_bytes())
           << "\" ports=[" << port_list.str() << "]";
}
// Looks up every port named in |message| on |node| and closes it. A failed
// lookup is a fatal gtest failure that returns from this helper; a failed
// close is reported as a non-fatal failure and the loop continues.
void ClosePortsInMessage(Node* node, Message* message) {
  size_t i = 0;
  while (i < message->num_ports()) {
    PortRef port;
    ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
    EXPECT_EQ(OK, node->ClosePort(port));
    ++i;
  }
}
// Message subclass used by the tests. Each constructor allocates the buffer
// behind |start_| with new[], and the destructor releases it with delete[].
class TestMessage : public Message {
 public:
  // Builds a user message with room for |num_payload_bytes| of payload and
  // |num_ports| ports, and initializes its user-message header.
  TestMessage(size_t num_payload_bytes, size_t num_ports)
      : Message(num_payload_bytes, num_ports) {
    const size_t total_bytes =
        num_header_bytes_ + num_ports_bytes_ + num_payload_bytes;
    start_ = new char[total_bytes];
    InitializeUserMessageHeader(start_);
  }

  // Builds a message from explicit section sizes. The buffer contents are
  // left uninitialized for the caller to fill in.
  TestMessage(size_t num_header_bytes,
              size_t num_payload_bytes,
              size_t num_ports_bytes)
      : Message(num_header_bytes, num_payload_bytes, num_ports_bytes) {
    const size_t total_bytes =
        num_header_bytes + num_payload_bytes + num_ports_bytes;
    start_ = new char[total_bytes];
  }

  ~TestMessage() override { delete[] start_; }

  // Convenience factory wrapping the two-argument constructor in a
  // ScopedMessage.
  static ScopedMessage NewUserMessage(size_t num_payload_bytes,
                                      size_t num_ports) {
    return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
  }
};
// A pending delivery of |message| to the node named |node_name|. Every task
// draws a random |priority| when it is constructed.
struct Task {
  NodeName node_name;
  ScopedMessage message;
  uint64_t priority;

  Task(NodeName node_name, ScopedMessage message)
      : node_name(node_name),
        message(std::move(message)),
        priority(base::RandUint64()) {}
};
// Orders tasks by ascending |priority|, so a std::priority_queue using this
// comparator yields the task with the largest priority value first.
struct TaskComparator {
  // Marked const so the comparator can also be invoked through a const
  // object, as comparators are conventionally expected to allow.
  bool operator()(const Task* a, const Task* b) const {
    return a->priority < b->priority;
  }
};
// Number of slots in |node_map|.
const size_t kMaxNodes = 3;
// Pending message deliveries. The queue holds raw Task pointers that are
// deleted by whichever function pops them. Ordering comes from TaskComparator,
// i.e. from each task's random priority.
std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
// Registered nodes, indexed by NodeName::v1 (see GetNode() and SetNode()). A
// null entry means no node is registered in that slot.
Node* node_map[kMaxNodes];
// Returns the node registered under |name|, or null if that slot is empty.
// The slot index is |name.v1|.
Node* GetNode(const NodeName& name) {
  Node* const registered = node_map[name.v1];
  return registered;
}
void SetNode(const NodeName& name, Node* node) {
node_map[name.v1] = node;
}
void PumpTasks() {
while (!task_queue.empty()) {
Task* task = task_queue.top();
task_queue.pop();
Node* node = GetNode(task->node_name);
if (node)
node->AcceptMessage(std::move(task->message));
delete task;
}
}
// Runs queued tasks until the task at the top of the queue carries an event
// of the given |type| (that task is left in the queue, unprocessed) or the
// queue runs empty.
void PumpUntilTask(EventType type) {
  for (;;) {
    if (task_queue.empty())
      return;

    Task* const next = task_queue.top();
    if (GetEventHeader(*next->message)->type == type)
      return;

    // Pop before delivering: accepting a message may enqueue further tasks.
    task_queue.pop();

    if (Node* target = GetNode(next->node_name))
      target->AcceptMessage(std::move(next->message));

    delete next;
  }
}
void DiscardPendingTasks() {
while (!task_queue.empty()) {
Task* task = task_queue.top();
task_queue.pop();
delete task;
}
}
// Sends |s|, including its terminating NUL, as a port-less user message on
// |port| of |node|. Returns the result of Node::SendMessage().
int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
  const size_t payload_size = s.size() + 1;  // +1 for the trailing NUL.
  ScopedMessage outgoing = TestMessage::NewUserMessage(payload_size, 0);
  memcpy(outgoing->mutable_payload_bytes(), s.c_str(), payload_size);
  return node->SendMessage(port, std::move(outgoing));
}
// Sends |s|, including its terminating NUL, on |port| of |node| in a user
// message that also carries the single port named |sent_port_name|. Returns
// the result of Node::SendMessage().
int SendStringMessageWithPort(Node* node,
                              const PortRef& port,
                              const std::string& s,
                              const PortName& sent_port_name) {
  const size_t payload_size = s.size() + 1;  // +1 for the trailing NUL.
  ScopedMessage outgoing = TestMessage::NewUserMessage(payload_size, 1);
  outgoing->mutable_ports()[0] = sent_port_name;
  memcpy(outgoing->mutable_payload_bytes(), s.c_str(), payload_size);
  return node->SendMessage(port, std::move(outgoing));
}
// Overload taking the port to send as a PortRef; forwards to the PortName
// overload using |sent_port|'s name.
int SendStringMessageWithPort(Node* node,
                              const PortRef& port,
                              const std::string& s,
                              const PortRef& sent_port) {
  const PortName& sent_port_name = sent_port.name();
  return SendStringMessageWithPort(node, port, s, sent_port_name);
}
// Returns |message|'s payload viewed as a C string. The pointer refers to the
// message's own buffer.
const char* ToString(const ScopedMessage& message) {
  const void* payload = message->payload_bytes();
  return static_cast<const char*>(payload);
}
class TestNodeDelegate : public NodeDelegate {
public:
explicit TestNodeDelegate(const NodeName& node_name)
: node_name_(node_name),
drop_messages_(false),
read_messages_(true),
save_messages_(false) {
}
void set_drop_messages(bool value) { drop_messages_ = value; }
void set_read_messages(bool value) { read_messages_ = value; }
void set_save_messages(bool value) { save_messages_ = value; }
bool GetSavedMessage(ScopedMessage* message) {
if (saved_messages_.empty()) {
message->reset();
return false;
}
*message = std::move(saved_messages_.front());
saved_messages_.pop();
return true;
}
void GenerateRandomPortName(PortName* port_name) override {
static uint64_t next_port_name = 1;
port_name->v1 = next_port_name++;
port_name->v2 = 0;
}
void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
message->reset(new TestMessage(num_header_bytes, 0, 0));
}
void ForwardMessage(const NodeName& node_name,
ScopedMessage message) override {
if (drop_messages_) {
DVLOG(1) << "Dropping ForwardMessage from node "
<< node_name_ << " to " << node_name;
ClosePortsInMessage(GetNode(node_name), message.get());
return;
}
DVLOG(1) << "ForwardMessage from node "
<< node_name_ << " to " << node_name;
task_queue.push(new Task(node_name, std::move(message)));
}
void BroadcastMessage(ScopedMessage message) override {
for (size_t i = 0; i < kMaxNodes; ++i) {
Node* node = node_map[i];
// Broadcast doesn't deliver to the local node.
if (node && node != GetNode(node_name_)) {
// NOTE: We only need to support broadcast of events, which have no
// payload or ports bytes.
ScopedMessage new_message(
new TestMessage(message->num_header_bytes(), 0, 0));
memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
message->num_header_bytes());
node->AcceptMessage(std::move(new_message));
}
}
}
void PortStatusChanged(const PortRef& port) override {
DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
if (!read_messages_)
return;
Node* node = GetNode(node_name_);
for (;;) {
ScopedMessage message;
int rv = node->GetMessage(port, &message);
EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
if (rv == ERROR_PORT_PEER_CLOSED || !message)
break;
if (save_messages_) {
SaveMessage(std::move(message));
} else {
LogMessage(message.get());
for (size_t i = 0; i < message->num_ports(); ++i) {
std::stringstream buf;
buf << "got port: " << message->ports()[i];
PortRef received_port;
node->GetPort(message->ports()[i], &received_port);
SendStringMessage(node, received_port, buf.str());
// Avoid leaking these ports.
node->ClosePort(received_port);
}
}
}
}
private:
void SaveMessage(ScopedMessage message) {
saved_messages_.emplace(std::move(message));
}
std::queue<ScopedMessage> saved_messages_;
NodeName node_name_;
bool drop_messages_;
bool read_messages_;
bool save_messages_;
};
// Fixture shared by all tests in this file. Before each test it drops any
// tasks left in the global queue and clears every node registration, so no
// test sees pointers to nodes owned by an earlier test.
class PortsTest : public testing::Test {
 public:
  void SetUp() override {
    DiscardPendingTasks();
    // Clear every slot of |node_map| rather than a hard-coded list, so this
    // stays correct if kMaxNodes changes.
    for (size_t i = 0; i < kMaxNodes; ++i)
      SetNode(NodeName(i, 1), nullptr);
  }
};
} // namespace
// Sends port a1 from node0 to node1 over the x0<->x1 pipe, closes a0, x0 and
// x1 before PumpTasks() is called, and then expects both nodes to report that
// they can shut down cleanly.
TEST_F(PortsTest, Basic1) {
NodeName node0_name(0, 1);
TestNodeDelegate node0_delegate(node0_name);
Node node0(node0_name, &node0_delegate);
// Register the node so queued tasks addressed to it can be delivered.
SetNode(node0_name, &node0);
NodeName node1_name(1, 1);
TestNodeDelegate node1_delegate(node1_name);
Node node1(node1_name, &node1_delegate);
SetNode(node1_name, &node1);
// Setup pipe between node0 and node1.
PortRef x0, x1;
EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
// Transfer a port from node0 to node1.
PortRef a0, a1;
EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
// Close the locally held ports before the queued tasks are run.
EXPECT_EQ(OK, node0.ClosePort(a0));
EXPECT_EQ(OK, node0.ClosePort(x0));
EXPECT_EQ(OK, node1.ClosePort(x1));
PumpTasks();
EXPECT_TRUE(node0.CanShutdownCleanly(false));
EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Like Basic1, but additionally sends a message on b0 after its peer b1 has
// been sent to node1, then closes the local ports, pumps tasks, and expects
// both nodes to be able to shut down cleanly.
TEST_F(PortsTest, Basic2) {
NodeName node0_name(0, 1);
TestNodeDelegate node0_delegate(node0_name);
Node node0(node0_name, &node0_delegate);
SetNode(node0_name, &node0);
NodeName node1_name(1, 1);
TestNodeDelegate node1_delegate(node1_name);
Node node1(node1_name, &node1_delegate);
SetNode(node1_name, &node1);
// Setup pipe between node0 and node1.
PortRef x0, x1;
EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
// Send b1 over the pipe, then write to its peer b0.
PortRef b0, b1;
EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));
// This may cause a SendMessage(b1) failure.
EXPECT_EQ(OK, node0.ClosePort(b0));
EXPECT_EQ(OK, node0.ClosePort(x0));
EXPECT_EQ(OK, node1.ClosePort(x1));
PumpTasks();
EXPECT_TRUE(node0.CanShutdownCleanly(false));
EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Sends both ends of the a0<->a1 pair (a1 first, then a0 after writing to
// it) and one end of b0<->b1 from node0 to node1, then closes the remaining
// local ports, pumps tasks, and expects both nodes to be able to shut down
// cleanly.
TEST_F(PortsTest, Basic3) {
NodeName node0_name(0, 1);
TestNodeDelegate node0_delegate(node0_name);
Node node0(node0_name, &node0_delegate);
SetNode(node0_name, &node0);
NodeName node1_name(1, 1);
TestNodeDelegate node1_delegate(node1_name);
Node node1(node1_name, &node1_delegate);
SetNode(node1_name, &node1);
// Setup pipe between node0 and node1.
PortRef x0, x1;
EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
// Transfer a port from node0 to node1.
PortRef a0, a1;
EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));
// Transfer a0 as well.
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));
// Transfer b1 and then write to its peer b0.
PortRef b0, b1;
EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));
// This may cause a SendMessage(b1) failure.
EXPECT_EQ(OK, node0.ClosePort(b0));
EXPECT_EQ(OK, node0.ClosePort(x0));
EXPECT_EQ(OK, node1.ClosePort(x1));
PumpTasks();
EXPECT_TRUE(node0.CanShutdownCleanly(false));
EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Sends port a1 to node1 while node1's delegate drops everything it is asked
// to forward, tells node0 that node1 is lost, and expects that all ports can
// still be closed and both nodes can shut down cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
NodeName node0_name(0, 1);
TestNodeDelegate node0_delegate(node0_name);
Node node0(node0_name, &node0_delegate);
SetNode(node0_name, &node0);
NodeName node1_name(1, 1);
TestNodeDelegate node1_delegate(node1_name);
Node node1(node1_name, &node1_delegate);
SetNode(node1_name, &node1);
// Setup pipe between node0 and node1.
PortRef x0, x1;
EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
// Transfer port to node1 and simulate a lost connection to node1. Dropping
// events from node1 is how we simulate the lost connection.
node1_delegate.set_drop_messages(true);
PortRef a0, a1;
EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));
PumpTasks();
// Tell node0 explicitly that node1 is gone.
EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
PumpTasks();
EXPECT_EQ(OK, node0.ClosePort(a0));
EXPECT_EQ(OK, node0.ClosePort(x0));
EXPECT_EQ(OK, node1.ClosePort(x1));
PumpTasks();
EXPECT_TRUE(node0.CanShutdownCleanly(false));
EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Sends port a1 to node1 while node1's delegate is not reading messages, then
// tells node0 that node1 is lost. Expects a0 to report peer closure and the
// unread message to still be retrievable from x1 on node1.
TEST_F(PortsTest, LostConnectionToNode2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Keep node1's delegate from consuming the message so the test can read it
  // from x1 itself further down.
  node1_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));

  PumpTasks();

  node1_delegate.set_drop_messages(true);

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));

  PumpTasks();

  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(x0));

  EXPECT_EQ(OK, node1.GetMessage(x1, &message));
  // Fatal: the message is dereferenced on the next line.
  ASSERT_TRUE(message);
  ClosePortsInMessage(&node1, message.get());

  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Tests that a proxy gets cleaned up when its indirect peer lives on a lost
// node.
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  NodeName node2_name(2, 1);
  TestNodeDelegate node2_delegate(node2_name);
  Node node2(node2_name, &node2_delegate);
  SetNode(node2_name, &node2);

  node1_delegate.set_save_messages(true);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
  EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
  EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
  EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
  EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
  EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F));

  PumpTasks();

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // From here on F refers to the port as received on node1.
  EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port A on node 0 will eventually also become aware of it.

  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F));

  // Unregister node2 so tasks addressed to it are discarded by PumpTasks().
  SetNode(node2_name, nullptr);
  EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name));

  PumpTasks();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(B));
  EXPECT_EQ(OK, node1.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(E));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Tests that a proxy gets cleaned up when its direct peer lives on a lost
// node and its predecessor lives on the same node.
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  node1_delegate.set_save_messages(true);

  // Create A-B spanning nodes 0 and 1.
  PortRef A, B;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
  EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
  EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));

  // Create C-D and send D over A to node 1.
  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D));

  // Pump tasks until the start of port collapse for port D, which should become
  // a proxy.
  PumpUntilTask(EventType::kObserveProxy);

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // E is the port node1 received in place of D.
  PortRef E;
  EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
  PumpTasks();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(B));
  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node1.ClosePort(E));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Reading from a port with nothing queued is expected to return OK with no
// message. Once the peer has been closed and tasks have been pumped, the read
// is expected to return ERROR_PORT_PEER_CLOSED.
TEST_F(PortsTest, GetMessage1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  ScopedMessage message;
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  // Closing the peer is not expected to be visible on a0 until tasks have
  // been pumped.
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_EQ(OK, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  PumpTasks();

  EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a0));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// With the delegate not reading messages, a message sent on a1 is expected to
// be readable from a0 without pumping any tasks.
TEST_F(PortsTest, GetMessage2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));

  ScopedMessage message;
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));

  ASSERT_TRUE(message);
  // EXPECT_STREQ prints both strings when the comparison fails.
  EXPECT_STREQ("1", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Sends three messages on a1 and expects to read them from a0 in the order
// they were sent.
TEST_F(PortsTest, GetMessage3) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  const char* const kStrings[] = {
    "1",
    "2",
    "3"
  };

  for (const char* to_send : kStrings)
    EXPECT_EQ(OK, SendStringMessage(&node0, a1, to_send));

  ScopedMessage message;
  for (const char* expected : kStrings) {
    EXPECT_EQ(OK, node0.GetMessage(a0, &message));
    ASSERT_TRUE(message);
    // EXPECT_STREQ prints both strings when the comparison fails.
    EXPECT_STREQ(expected, ToString(message));
    DVLOG(1) << "got " << expected;
  }

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Moves one end of a port pair from node0 to node1 and back again, then sends
// a message on the other end and expects it to arrive at the moved port's
// final location on node0.
TEST_F(PortsTest, Delegation1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  node0_delegate.set_save_messages(true);
  node1_delegate.set_save_messages(true);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));

  PumpTasks();

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));

  ASSERT_EQ(1u, message->num_ports());

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];

  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));

  PumpTasks();

  EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));

  PumpTasks();

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));

  ASSERT_EQ(1u, message->num_ports());

  // This is "a2" from the point of view of node0, which is where this message
  // was received.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));

  EXPECT_STREQ("a2", ToString(message));

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));

  EXPECT_EQ(0u, message->num_ports());
  EXPECT_STREQ("hello", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a3));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Ten times over: sends D over A to node1, sends F over C (D's peer), writes
// "hello" on E (F's peer), and expects the "hello" message to be saved by
// node1's delegate.
TEST_F(PortsTest, Delegation2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  node0_delegate.set_save_messages(true);
  node1_delegate.set_save_messages(true);

  for (int i = 0; i < 10; ++i) {
    // Setup pipe a<->b between node0 and node1.
    PortRef A, B;
    EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
    EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
    EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
    EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));

    PortRef C, D;
    EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));

    // Pass D over A to B.
    EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));

    PumpTasks();

    EXPECT_EQ(OK, node0.ClosePort(C));
    EXPECT_EQ(OK, node0.ClosePort(E));

    EXPECT_EQ(OK, node0.ClosePort(A));
    EXPECT_EQ(OK, node1.ClosePort(B));

    // Drain node1's saved messages, closing any ports they carry, until the
    // "hello" message turns up. Running out of saved messages first is a
    // fatal failure.
    for (;;) {
      ScopedMessage message;
      ASSERT_TRUE(node1_delegate.GetSavedMessage(&message))
          << "\"hello\" message not delivered!";
      ClosePortsInMessage(&node1, message.get());
      if (strcmp("hello", ToString(message)) == 0)
        break;
    }

    PumpTasks();  // Because ClosePort may have generated tasks.
  }

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Sending on a port that was created but never initialized is expected to
// fail with ERROR_PORT_STATE_UNEXPECTED.
TEST_F(PortsTest, SendUninitialized) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  PortRef x0;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
            SendStringMessage(&node0, x0, "oops"));
  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Sending a port over itself or over its own peer is expected to fail with a
// dedicated error code, and no message is expected to be delivered.
TEST_F(PortsTest, SendFailure) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            SendStringMessageWithPort(&node0, A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            SendStringMessageWithPort(&node0, A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));

  PumpTasks();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));

  EXPECT_EQ(OK, node0.ClosePort(A));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Sends port D over A while the delegate is not reading messages, so the
// message is never read. After C, A and B are closed and tasks are pumped,
// the node is expected to be able to shut down cleanly.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_read_messages(false);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));

  PumpTasks();

  EXPECT_EQ(OK, node0.ClosePort(C));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(B));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// With ports still open on the node, CanShutdownCleanly(true) is expected to
// succeed while CanShutdownCleanly(false) is expected to fail. Once every
// port is closed, CanShutdownCleanly(false) is expected to succeed as well.
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));

  ScopedMessage message;
  // Fatal: the message is dereferenced on the next line.
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // E is the port received in place of D.
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  EXPECT_TRUE(node0.CanShutdownCleanly(true));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_FALSE(node0.CanShutdownCleanly(false));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(B));
  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Sends the same logical port across the X-Y pair three times in a row
// (B -> C -> D -> E), then closes everything and expects the node to be able
// to shut down cleanly.
TEST_F(PortsTest, ProxyCollapse1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send C and receive it as D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // Send D and receive it as E.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Sends both ends of the A-B pair across the X-Y pair (B received as C, A
// received as D), then closes everything and expects the node to be able to
// shut down cleanly.
TEST_F(PortsTest, ProxyCollapse2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send A and receive it as D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse.

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(D));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// This tests that if a port is sent when its peer is already known to be
// closed, the newly created port will be aware of that peer closure, and the
// proxy will eventually collapse.
TEST_F(PortsTest, SendWithClosedPeer) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_read_messages(false);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
  EXPECT_EQ(OK, node0.ClosePort(A));

  PumpTasks();

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  node0_delegate.set_read_messages(true);
  node0_delegate.set_save_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  PumpTasks();

  // C should receive the message originally sent to B, and it should also be
  // aware of A's closure.

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_STREQ("hey", ToString(message));

  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result like every other ClosePort() call in this file.
  EXPECT_EQ(OK, node0.ClosePort(C));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// This tests that if a port is closed while some number of proxies are still
// routing messages (directly or indirectly) to it, that the peer port is
// eventually notified of the closure, and the dead-end proxies will
// eventually be removed.
TEST_F(PortsTest, SendWithClosedPeerSent) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  ScopedMessage message;

  // Send A as new port C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send C as new port D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  node0_delegate.set_read_messages(false);

  // Send a message to B through D, then close D.
  EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
  EXPECT_EQ(OK, node0.ClosePort(D));

  PumpTasks();

  // Now send B as new port E.

  node0_delegate.set_read_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  PumpTasks();

  // E should receive the message originally sent to B, and it should also be
  // aware of D's closure.

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_STREQ("hey", ToString(message));

  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(E, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result like every other ClosePort() call in this file.
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
// Merges port B on node0 with port C on node1 and expects a message written
// on A before the merge to be received on D.
TEST_F(PortsTest, MergePorts) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  node0_delegate.set_read_messages(false);
  node1_delegate.set_save_messages(true);

  // Write a message on A.
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));

  PumpTasks();

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  PumpTasks();

  // Expect only two receiving ports to be left after pumping tasks.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_TRUE(node1.CanShutdownCleanly(true));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  EXPECT_STREQ("hey", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Merge initiated on a port (B) whose own peer (A) was closed beforehand:
// the message A wrote before closing must still reach D.
TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // Bring up node0 and register it with the fixture.
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Bring up node1 and register it with the fixture.
  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Two unrelated pairs: A-B lives on node0, C-D lives on node1.
  PortRef A, B;
  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  node0_delegate.set_read_messages(false);
  node1_delegate.set_save_messages(true);

  // Put a message in flight from A.
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
  PumpTasks();

  // A goes away before the merge is requested.
  EXPECT_EQ(OK, node0.ClosePort(A));

  // Ask node0 to merge B with node1's C, then let the merge run.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
  PumpTasks();

  // node0 must be completely empty; node1 may still hold a receiving port.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(true));

  // The message written on A must have been delivered to D.
  ScopedMessage delivered;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&delivered));
  EXPECT_EQ(0, strcmp("hey", ToString(delivered)));

  EXPECT_EQ(OK, node1.ClosePort(D));

  // Nothing may remain open on either node.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Merge *into* a port (C) whose peer (D) was closed beforehand: the message D
// wrote before closing must still reach A once B and C have been merged.
TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // node0 saves what it receives so the test can fetch it via
  // GetSavedMessage(); node1 does not read messages during this test.
  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Write a message on D. D was created on node1, so the send is issued
  // through node1 (the node that owns the port), not node0.
  EXPECT_EQ(OK, SendStringMessage(&node1, D, "hey"));
  PumpTasks();

  // Close D.
  EXPECT_EQ(OK, node1.ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
  PumpTasks();

  // Expect only one receiving port to be left after pumping tasks: node0 may
  // still hold a receiving port, node1 must be completely empty.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_EQ(0, strcmp("hey", ToString(message)));

  EXPECT_EQ(OK, node0.ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
// Merge of two ports whose peers are both already closed: once the merge has
// been processed neither node may be left holding any port.
TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // Bring up node0 and register it with the fixture.
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Bring up node1 and register it with the fixture.
  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Two unrelated pairs: A-B lives on node0, C-D lives on node1.
  PortRef A, B;
  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Drop the far end of each pair and let the closures propagate.
  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(D));
  PumpTasks();

  // Ask node0 to merge B with node1's C, then let the merge run.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
  PumpTasks();

  // Both nodes must now be completely empty.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
TEST_F(PortsTest, MergePortsWithMovedPeers) {
// This tests that ports can be merged successfully even if their peers are
// moved around.
NodeName node0_name(0, 1);
TestNodeDelegate node0_delegate(node0_name);
Node node0(node0_name, &node0_delegate);
node_map[0] = &node0;
NodeName node1_name(1, 1);
TestNodeDelegate node1_delegate(node1_name);
Node node1(node1_name, &node1_delegate);
node_map[1] = &node1;
// node0 saves incoming messages so they can be fetched with GetSavedMessage()
// below. node1 is told not to read messages for now (presumably leaving them
// queued on the receiving port -- confirm against TestNodeDelegate).
node0_delegate.set_save_messages(true);
node1_delegate.set_read_messages(false);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
// Set up another pair X-Y for moving ports on node0.
PortRef X, Y;
EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
ScopedMessage message;
// Move A to new port E: A is attached to a message sent on X, and the single
// port carried by the saved message is looked up on node0 as E.
EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
// X-Y were only needed to move A; close them now that E has been obtained.
EXPECT_EQ(OK, node0.ClosePort(X));
EXPECT_EQ(OK, node0.ClosePort(Y));
// Turn off reading on node0 as well while the messages below are written and
// the merge is initiated; reading is re-enabled just before PumpTasks().
node0_delegate.set_read_messages(false);
// Write messages on E and D.
EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));
// Initiate a merge between B and C.
EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
// Re-enable reading on both nodes, and have node1 save messages too, so both
// messages can be retrieved with GetSavedMessage() after the pump.
node0_delegate.set_read_messages(true);
node1_delegate.set_read_messages(true);
node1_delegate.set_save_messages(true);
PumpTasks();
// Expect to receive D's message on E and E's message on D.
ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
EXPECT_EQ(0, strcmp("hi", ToString(message)));
ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
EXPECT_EQ(0, strcmp("hey", ToString(message)));
// Close E and D.
EXPECT_EQ(OK, node0.ClosePort(E));
EXPECT_EQ(OK, node1.ClosePort(D));
PumpTasks();
// Expect everything to have gone away.
EXPECT_TRUE(node0.CanShutdownCleanly(false));
EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
TEST_F(PortsTest, MergePortsFailsGracefully) {
// This tests that the system remains in a well-defined state if something
// goes wrong during port merge.
NodeName node0_name(0, 1);
TestNodeDelegate node0_delegate(node0_name);
Node node0(node0_name, &node0_delegate);
node_map[0] = &node0;
NodeName node1_name(1, 1);
TestNodeDelegate node1_delegate(node1_name);
Node node1(node1_name, &node1_delegate);
node_map[1] = &node1;
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
PumpTasks();
// Initiate a merge between B and C. Note that PumpTasks() is deliberately not
// called again until after C has been moved below.
EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
// Move C to a new port E. This is dumb and nobody should do it, but it's
// possible. MergePorts will fail as a result because C won't be in a
// receiving state when the event arrives at node1, so B should be closed.
ScopedMessage message;
// X-Y is a helper pair on node1 used only to carry C in a message.
PortRef X, Y;
EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
// Save messages on node1 so the one carrying the moved port can be fetched
// with GetSavedMessage().
node1_delegate.set_save_messages(true);
EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
ASSERT_EQ(1u, message->num_ports());
// The single port carried by the saved message is C's new incarnation, E.
PortRef E;
ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
EXPECT_EQ(OK, node1.ClosePort(X));
EXPECT_EQ(OK, node1.ClosePort(Y));
// C goes away as a result of normal proxy removal.
PumpTasks();
// Looking C up by name on node1 must now fail.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));
// B should have been closed cleanly.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
// Close A, D, and E.
EXPECT_EQ(OK, node0.ClosePort(A));
EXPECT_EQ(OK, node1.ClosePort(D));
EXPECT_EQ(OK, node1.ClosePort(E));
PumpTasks();
// Expect everything to have gone away.
EXPECT_TRUE(node0.CanShutdownCleanly(false));
EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
} // namespace test
} // namespace ports
} // namespace edk
} // namespace mojo