// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <map>
#include <queue>
#include <sstream>
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/rand_util.h"
#include "base/strings/string_piece.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace edk {
namespace ports {
namespace test {
namespace {
// Returns true if |message|'s payload, read as a NUL-terminated C string, has
// exactly the same contents as |s|.
//
// The payload is wrapped in a StringPiece (its length is taken from the
// terminating NUL that SendStringMessage*() writes) and the two pieces are
// compared by length and content. Unlike strcmp() on |s.data()|, this never
// reads |s| beyond |s.size()|, so |s| does not have to be NUL-terminated.
bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
  const base::StringPiece payload(
      static_cast<const char*>(message->payload_bytes()));
  return payload == s;
}
// Message implementation used by these tests. Each instance allocates one
// char buffer with new[] in its constructor, stores it in |start_|, and frees
// it with delete[] in its destructor.
class TestMessage : public Message {
 public:
  // Creates a user message sized for |num_payload_bytes| of payload and
  // |num_ports| ports, then initializes the user message header in the newly
  // allocated buffer.
  TestMessage(size_t num_payload_bytes, size_t num_ports)
      : Message(num_payload_bytes, num_ports) {
    const size_t total_bytes =
        num_header_bytes_ + num_ports_bytes_ + num_payload_bytes;
    start_ = new char[total_bytes];
    InitializeUserMessageHeader(start_);
  }

  // Creates a message whose three sections have the given sizes. No header
  // initialization is performed on the buffer.
  TestMessage(size_t num_header_bytes,
              size_t num_payload_bytes,
              size_t num_ports_bytes)
      : Message(num_header_bytes, num_payload_bytes, num_ports_bytes) {
    const size_t total_bytes =
        num_header_bytes + num_payload_bytes + num_ports_bytes;
    start_ = new char[total_bytes];
  }

  ~TestMessage() override { delete[] start_; }

  // Convenience factory: wraps a new user TestMessage in a ScopedMessage.
  static ScopedMessage NewUserMessage(size_t num_payload_bytes,
                                      size_t num_ports) {
    return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
  }
};
class TestNode;
// Interface a TestNode uses to obtain port names and to hand off messages
// destined for other nodes. In this file it is implemented by the PortsTest
// fixture.
class MessageRouter {
 public:
  virtual ~MessageRouter() = default;

  // Writes a new port name into |*name|.
  virtual void GeneratePortName(PortName* name) = 0;

  // Routes |message|, sent by |from_node|, to the node named |node_name|.
  virtual void ForwardMessage(TestNode* from_node,
                              const NodeName& node_name,
                              ScopedMessage message) = 0;

  // Routes |message|, sent by |from_node|, to the other nodes.
  virtual void BroadcastMessage(TestNode* from_node,
                                ScopedMessage message) = 0;
};
// NodeDelegate implementation used by the tests. Each TestNode owns a Node and
// a dedicated thread: messages handed to EnqueueMessage() are queued in
// |incoming_messages_| and later passed to Node::AcceptMessage() by
// ProcessMessages(), which is posted to |node_thread_| by Start().
class TestNode : public NodeDelegate {
public:
// Builds the node name from (|id|, 1) and names the thread after |id|.
// |messages_available_event_| is auto-reset and starts unsignaled;
// |idle_event_| is manual-reset and starts signaled.
explicit TestNode(uint64_t id)
: node_name_(id, 1),
node_(node_name_, this),
node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
messages_available_event_(
base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED),
idle_event_(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::SIGNALED) {
}
// Asks ProcessMessages() to return and then stops the thread.
~TestNode() override {
StopWhenIdle();
node_thread_.Stop();
}
const NodeName& name() const { return node_name_; }
// NOTE: Node is thread-safe.
Node& node() { return node_; }
// Signaled by ProcessMessages() at the end of each dispatch pass and reset by
// EnqueueMessage().
base::WaitableEvent& idle_event() { return idle_event_; }
// Returns true once ProcessMessages() has completed at least one pass
// (|started_|), no dispatch pass is in progress, and either the incoming queue
// is empty or dispatching is currently held back by BlockOnEvent().
bool IsIdle() {
base::AutoLock lock(lock_);
return started_ && !dispatching_ &&
(incoming_messages_.empty() || (block_on_event_ && blocked_));
}
// Makes ProcessMessages() stop dispatching whenever the message at the front
// of the queue carries an event header of |type|, until Unblock() is called.
void BlockOnEvent(EventType type) {
base::AutoLock lock(lock_);
blocked_event_type_ = type;
block_on_event_ = true;
}
// Clears the block installed by BlockOnEvent() and wakes the processing loop
// so that queued messages get dispatched.
void Unblock() {
base::AutoLock lock(lock_);
block_on_event_ = false;
messages_available_event_.Signal();
}
// Remembers |router|, starts the node thread and posts ProcessMessages() to
// it. |this| is passed unretained to the posted task.
void Start(MessageRouter* router) {
router_ = router;
node_thread_.Start();
node_thread_.task_runner()->PostTask(
FROM_HERE,
base::Bind(&TestNode::ProcessMessages, base::Unretained(this)));
}
// Sets |should_quit_| and wakes the processing loop; ProcessMessages() returns
// the next time it wakes up and observes the flag.
void StopWhenIdle() {
base::AutoLock lock(lock_);
should_quit_ = true;
messages_available_event_.Signal();
}
// Wakes the processing loop without enqueuing anything.
void WakeUp() { messages_available_event_.Signal(); }
// Sends |s|, including its terminating NUL, as a user message with no ports on
// |port|. Returns the result of Node::SendMessage().
int SendStringMessage(const PortRef& port, const std::string& s) {
size_t size = s.size() + 1;
ScopedMessage message = TestMessage::NewUserMessage(size, 0);
memcpy(message->mutable_payload_bytes(), s.data(), size);
return node_.SendMessage(port, std::move(message));
}
// Same as SendStringMessage(), but the message also carries the single port
// named |sent_port_name|.
int SendStringMessageWithPort(const PortRef& port,
const std::string& s,
const PortName& sent_port_name) {
size_t size = s.size() + 1;
ScopedMessage message = TestMessage::NewUserMessage(size, 1);
memcpy(message->mutable_payload_bytes(), s.data(), size);
message->mutable_ports()[0] = sent_port_name;
return node_.SendMessage(port, std::move(message));
}
// Overload taking the port to send as a PortRef; forwards its name to the
// overload above.
int SendStringMessageWithPort(const PortRef& port,
const std::string& s,
const PortRef& sent_port) {
return SendStringMessageWithPort(port, s, sent_port.name());
}
// When true, ForwardMessage() closes the ports carried by each outgoing
// message and discards the message instead of routing it.
void set_drop_messages(bool value) {
base::AutoLock lock(lock_);
drop_messages_ = value;
}
// When true, PortStatusChanged() reads all available messages from the
// notifying port and stores them for GetSavedMessage().
void set_save_messages(bool value) {
base::AutoLock lock(lock_);
save_messages_ = value;
}
// Returns true only if Node::GetMessage() returned OK and produced a non-null
// message in |*message|.
bool ReadMessage(const PortRef& port, ScopedMessage* message) {
return node_.GetMessage(port, message, nullptr) == OK && *message;
}
// Pops the oldest saved message into |*message| and returns true. If nothing
// is saved, resets |*message| and returns false.
bool GetSavedMessage(ScopedMessage* message) {
base::AutoLock lock(lock_);
if (saved_messages_.empty()) {
message->reset();
return false;
}
std::swap(*message, saved_messages_.front());
saved_messages_.pop();
return true;
}
// Queues |message| for dispatch on the node thread. |idle_event_| is reset
// first, so the node no longer appears idle, and the processing loop is woken.
void EnqueueMessage(ScopedMessage message) {
idle_event_.Reset();
// NOTE: This may be called from ForwardMessage and thus must not reenter
// |node_|.
base::AutoLock lock(lock_);
incoming_messages_.emplace(std::move(message));
messages_available_event_.Signal();
}
// NodeDelegate: delegates port name generation to the router.
void GenerateRandomPortName(PortName* port_name) override {
DCHECK(router_);
router_->GeneratePortName(port_name);
}
// NodeDelegate: allocates a TestMessage with |num_header_bytes| of header and
// no payload or ports bytes.
void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
message->reset(new TestMessage(num_header_bytes, 0, 0));
}
// NodeDelegate: routes |message| to |node_name| through the router, unless
// dropping is enabled, in which case the ports carried by the message are
// closed and the message is discarded.
void ForwardMessage(const NodeName& node_name,
ScopedMessage message) override {
{
base::AutoLock lock(lock_);
if (drop_messages_) {
DVLOG(1) << "Dropping ForwardMessage from node "
<< node_name_ << " to " << node_name;
// |lock_| is released while the carried ports are closed.
base::AutoUnlock unlock(lock_);
ClosePortsInMessage(message.get());
return;
}
}
DCHECK(router_);
DVLOG(1) << "ForwardMessage from node "
<< node_name_ << " to " << node_name;
router_->ForwardMessage(this, node_name, std::move(message));
}
// NodeDelegate: hands |message| to the router for broadcast.
void BroadcastMessage(ScopedMessage message) override {
router_->BroadcastMessage(this, std::move(message));
}
// NodeDelegate: when message saving is enabled, drains every message that can
// currently be read from |port| into |saved_messages_|. Does nothing when
// saving is disabled.
void PortStatusChanged(const PortRef& port) override {
// The port may be closed, in which case we ignore the notification.
base::AutoLock lock(lock_);
if (!save_messages_)
return;
for (;;) {
ScopedMessage message;
{
// |lock_| is released around the call into |node_| and re-acquired when
// this scope exits, including via the break below.
base::AutoUnlock unlock(lock_);
if (!ReadMessage(port, &message))
break;
}
saved_messages_.emplace(std::move(message));
}
}
// Looks up and closes every port carried by |message|. A failed lookup is a
// fatal gtest failure that returns from this function immediately.
void ClosePortsInMessage(Message* message) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef port;
ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port));
EXPECT_EQ(OK, node_.ClosePort(port));
}
}
private:
// Body of the node thread. Repeatedly waits on |messages_available_event_|,
// then dispatches queued messages to |node_| until the queue is empty or the
// front message is of the blocked event type, and finally signals
// |idle_event_|. Returns when |should_quit_| is observed after a wake-up.
void ProcessMessages() {
for (;;) {
messages_available_event_.Wait();
base::AutoLock lock(lock_);
if (should_quit_)
return;
dispatching_ = true;
while (!incoming_messages_.empty()) {
if (block_on_event_ &&
GetEventHeader(*incoming_messages_.front())->type ==
blocked_event_type_) {
blocked_ = true;
// Go idle if we hit a blocked event type.
break;
} else {
blocked_ = false;
}
ScopedMessage message = std::move(incoming_messages_.front());
incoming_messages_.pop();
// NOTE: AcceptMessage() can re-enter this object to call any of the
// NodeDelegate interface methods.
// |lock_| is therefore released for the duration of the call and
// re-acquired at the end of this loop iteration.
base::AutoUnlock unlock(lock_);
node_.AcceptMessage(std::move(message));
}
dispatching_ = false;
started_ = true;
idle_event_.Signal();
};
}
const NodeName node_name_;
Node node_;
// Set by Start(); not owned.
MessageRouter* router_ = nullptr;
base::Thread node_thread_;
// Signaled whenever the processing loop should wake up.
base::WaitableEvent messages_available_event_;
// Signaled at the end of each dispatch pass; reset by EnqueueMessage().
base::WaitableEvent idle_event_;
// Guards fields below.
base::Lock lock_;
// True once ProcessMessages() has completed at least one pass.
bool started_ = false;
// True while ProcessMessages() is inside its dispatch loop.
bool dispatching_ = false;
bool should_quit_ = false;
bool drop_messages_ = false;
bool save_messages_ = false;
// True when the last dispatch pass stopped at a blocked event type.
bool blocked_ = false;
bool block_on_event_ = false;
// Only read while |block_on_event_| is true; assigned by BlockOnEvent().
EventType blocked_event_type_;
std::queue<ScopedMessage> incoming_messages_;
std::queue<ScopedMessage> saved_messages_;
};
// Test fixture that also acts as the MessageRouter for every TestNode it
// knows about: it hands out port names and delivers forwarded and broadcast
// messages by enqueuing them on the destination TestNode.
class PortsTest : public testing::Test, public MessageRouter {
public:
// Registers |node| under its name and starts its processing thread with this
// fixture as the router.
void AddNode(TestNode* node) {
{
base::AutoLock lock(lock_);
nodes_[node->name()] = node;
}
node->Start(this);
}
// Unregisters |node| and then tells every remaining node that the connection
// to it has been lost. The notifications are issued after |lock_| has been
// released.
// NOTE(review): presumably because LostConnectionToNode() can call back into
// ForwardMessage(), which takes |lock_| -- confirm.
void RemoveNode(TestNode* node) {
{
base::AutoLock lock(lock_);
nodes_.erase(node->name());
}
for (const auto& entry : nodes_)
entry.second->node().LostConnectionToNode(node->name());
}
// Waits until all known Nodes are idle. Message forwarding and processing
// is handled in such a way that idleness is a stable state: once all nodes in
// the system are idle, they will remain idle until the test explicitly
// initiates some further event (e.g. sending a message, closing a port, or
// removing a Node).
void WaitForIdle() {
for (;;) {
base::AutoLock global_lock(global_lock_);
bool all_nodes_idle = true;
// Every node is woken on each pass, whether or not it reported idle.
for (const auto& entry : nodes_) {
if (!entry.second->IsIdle())
all_nodes_idle = false;
entry.second->WakeUp();
}
if (all_nodes_idle)
return;
// Wait for any Node to signal that it's idle.
// |global_lock_| is released for the duration of the wait.
base::AutoUnlock global_unlock(global_lock_);
std::vector<base::WaitableEvent*> events;
for (const auto& entry : nodes_)
events.push_back(&entry.second->idle_event());
base::WaitableEvent::WaitMany(events.data(), events.size());
}
}
// Creates a connected pair of ports, |*port0| on |node0| and |*port1| on
// |node1|. When both arguments are the same node a local pair is created;
// otherwise each node creates an uninitialized port which is then initialized
// with the other node's name and port name as its peer.
void CreatePortPair(TestNode* node0,
PortRef* port0,
TestNode* node1,
PortRef* port1) {
if (node0 == node1) {
EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
} else {
EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
port1->name()));
EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
port0->name()));
}
}
private:
// MessageRouter:
// Port names are generated sequentially: |v1| takes the next counter value and
// |v2| is always 0.
void GeneratePortName(PortName* name) override {
base::AutoLock lock(lock_);
name->v1 = next_port_id_++;
name->v2 = 0;
}
// Enqueues |message| on the node named |node_name|. If |from_node| is no
// longer registered, the ports carried by the message are closed and the
// message is dropped. If the destination is not registered, the message is
// dropped and only logged.
void ForwardMessage(TestNode* from_node,
const NodeName& node_name,
ScopedMessage message) override {
base::AutoLock global_lock(global_lock_);
base::AutoLock lock(lock_);
// Drop messages from nodes that have been removed.
if (nodes_.find(from_node->name()) == nodes_.end()) {
from_node->ClosePortsInMessage(message.get());
return;
}
auto it = nodes_.find(node_name);
if (it == nodes_.end()) {
DVLOG(1) << "Node not found: " << node_name;
return;
}
it->second->EnqueueMessage(std::move(message));
}
// Enqueues a header-only copy of |message| on every registered node other than
// |from_node|. Nothing is delivered if |from_node| is no longer registered.
void BroadcastMessage(TestNode* from_node, ScopedMessage message) override {
base::AutoLock global_lock(global_lock_);
base::AutoLock lock(lock_);
// Drop messages from nodes that have been removed.
if (nodes_.find(from_node->name()) == nodes_.end())
return;
for (const auto& entry : nodes_) {
TestNode* node = entry.second;
// Broadcast doesn't deliver to the local node.
if (node == from_node)
continue;
// NOTE: We only need to support broadcast of events. Events have no
// payload or ports bytes.
ScopedMessage new_message(
new TestMessage(message->num_header_bytes(), 0, 0));
memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
message->num_header_bytes());
node->EnqueueMessage(std::move(new_message));
}
}
base::MessageLoop message_loop_;
// Acquired before any operation which makes a Node busy, and before testing
// if all nodes are idle.
base::Lock global_lock_;
// Taken by AddNode(), RemoveNode(), GeneratePortName(), ForwardMessage() and
// BroadcastMessage() around their accesses to the two fields below.
base::Lock lock_;
uint64_t next_port_id_ = 1;
// Registered nodes, keyed by node name; the TestNodes are not owned.
std::map<NodeName, TestNode*> nodes_;
};
} // namespace
// Sends one end of a local port pair from node 0 to node 1 over a cross-node
// pipe, then closes every port the test still holds without reading anything.
// Once the system is idle both nodes must be able to shut down cleanly.
TEST_F(PortsTest, Basic1) {
  TestNode sender(0);
  AddNode(&sender);

  TestNode receiver(1);
  AddNode(&receiver);

  // Pipe spanning the two nodes.
  PortRef bridge_near, bridge_far;
  CreatePortPair(&sender, &bridge_near, &receiver, &bridge_far);

  // Local pair on the sending node; |shipped| travels over the bridge.
  PortRef retained, shipped;
  EXPECT_EQ(OK, sender.node().CreatePortPair(&retained, &shipped));
  EXPECT_EQ(OK,
            sender.SendStringMessageWithPort(bridge_near, "hello", shipped));

  EXPECT_EQ(OK, sender.node().ClosePort(retained));
  EXPECT_EQ(OK, sender.node().ClosePort(bridge_near));
  EXPECT_EQ(OK, receiver.node().ClosePort(bridge_far));

  WaitForIdle();

  EXPECT_TRUE(sender.node().CanShutdownCleanly());
  EXPECT_TRUE(receiver.node().CanShutdownCleanly());
}
// Like Basic1, but a message is also written to the retained end of the local
// pair after its other end has been sent to node 1. All ports the test holds
// are then closed and both nodes must shut down cleanly once idle.
TEST_F(PortsTest, Basic2) {
  TestNode sender(0);
  AddNode(&sender);

  TestNode receiver(1);
  AddNode(&receiver);

  // Pipe spanning the two nodes.
  PortRef bridge_near, bridge_far;
  CreatePortPair(&sender, &bridge_near, &receiver, &bridge_far);

  // Local pair on the sending node; |shipped| travels over the bridge.
  PortRef retained, shipped;
  EXPECT_EQ(OK, sender.node().CreatePortPair(&retained, &shipped));
  EXPECT_EQ(OK,
            sender.SendStringMessageWithPort(bridge_near, "hello", shipped));
  EXPECT_EQ(OK, sender.SendStringMessage(retained, "hello again"));

  EXPECT_EQ(OK, sender.node().ClosePort(retained));
  EXPECT_EQ(OK, sender.node().ClosePort(bridge_near));
  EXPECT_EQ(OK, receiver.node().ClosePort(bridge_far));

  WaitForIdle();

  EXPECT_TRUE(sender.node().CanShutdownCleanly());
  EXPECT_TRUE(receiver.node().CanShutdownCleanly());
}
// Sends both ends of a local pair (a1, then a0) and one end of a second pair
// (b1) from node0 to node1 over x0, writing to a0 and b0 in between. The test
// then closes the ports it still holds (b0, x0, x1) without reading anything
// on node1 and checks that both nodes can shut down cleanly once idle.
TEST_F(PortsTest, Basic3) {
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
PortRef x0, x1;
CreatePortPair(&node0, &x0, &node1, &x1);
PortRef a0, a1;
EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
// a0 itself is sent next, so both ends of the a0-a1 pair have been
// transferred.
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
PortRef b0, b1;
EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
EXPECT_EQ(OK, node0.node().ClosePort(b0));
EXPECT_EQ(OK, node0.node().ClosePort(x0));
EXPECT_EQ(OK, node1.node().ClosePort(x1));
WaitForIdle();
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
// Transfers a port to node1 while node1 is configured to drop every message it
// would forward, then removes node1 from the router. After the test closes the
// ports it still holds, both nodes must be able to shut down cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// From here on node1's ForwardMessage() closes carried ports and discards the
// message instead of routing it.
node1.set_drop_messages(true);
PortRef x0, x1;
CreatePortPair(&node0, &x0, &node1, &x1);
// Transfer a port to node1 and simulate a lost connection to node1.
PortRef a0, a1;
EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
WaitForIdle();
RemoveNode(&node1);
WaitForIdle();
EXPECT_EQ(OK, node0.node().ClosePort(a0));
EXPECT_EQ(OK, node0.node().ClosePort(x0));
EXPECT_EQ(OK, node1.node().ClosePort(x1));
WaitForIdle();
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
// Transfers a1 to node1, lets the system go idle, and only then makes node1
// drop its outgoing messages and removes it from the router. Verifies that a0
// on node0 reports peer closure, and that the message carrying the transferred
// port can still be read from x1 on node1 and its port closed.
TEST_F(PortsTest, LostConnectionToNode2) {
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
PortRef x0, x1;
CreatePortPair(&node0, &x0, &node1, &x1);
PortRef a0, a1;
EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
WaitForIdle();
node1.set_drop_messages(true);
RemoveNode(&node1);
WaitForIdle();
// a0 should have eventually detected peer closure after node loss.
ScopedMessage message;
EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
node0.node().GetMessage(a0, &message, nullptr));
EXPECT_FALSE(message);
EXPECT_EQ(OK, node0.node().ClosePort(a0));
EXPECT_EQ(OK, node0.node().ClosePort(x0));
// The "take a1" message is still readable on x1; the port it carries is closed
// explicitly before x1 itself.
EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
EXPECT_TRUE(message);
node1.ClosePortsInMessage(message.get());
EXPECT_EQ(OK, node1.node().ClosePort(x1));
WaitForIdle();
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
// Tests that a proxy gets cleaned up when its indirect peer lives on a lost
// node.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
TestNode node2(2);
AddNode(&node2);
// Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
PortRef A, B, C, D;
CreatePortPair(&node0, &A, &node1, &B);
CreatePortPair(&node1, &C, &node2, &D);
// Create E-F and send F over A to node 1.
PortRef E, F;
EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
WaitForIdle();
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(B, &message));
ASSERT_EQ(1u, message->num_ports());
// |F| is re-bound here to the port that node 1 received in the message.
EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
// Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
// will trivially become aware of the loss, and this test verifies that the
// port A on node 0 will eventually also become aware of it.
// NOTE(review): the assertions below check the status of port E on node 0,
// not A -- confirm which port the sentence above is meant to name.
// Make sure node2 stops processing events when it encounters an ObserveProxy.
node2.BlockOnEvent(EventType::kObserveProxy);
EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
WaitForIdle();
// Simulate node 1 and 2 disconnecting.
EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
// Let node2 continue processing events and wait for everyone to go idle.
node2.Unblock();
WaitForIdle();
// Port F should be gone.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
// Port E should have detected peer closure despite the fact that there is
// no longer a continuous route from F to E over which the event could travel.
PortStatus status;
EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
EXPECT_TRUE(status.peer_closed);
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(B));
EXPECT_EQ(OK, node1.node().ClosePort(C));
EXPECT_EQ(OK, node0.node().ClosePort(E));
WaitForIdle();
// Only node0 and node1 are checked for clean shutdown; node2 is not.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
// Tests that a proxy gets cleaned up when its direct peer lives on a lost
// node and its predecessor lives on the same node.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
PortRef A, B;
CreatePortPair(&node0, &A, &node1, &B);
PortRef C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
// Send D but block node0 on an ObserveProxy event.
node0.BlockOnEvent(EventType::kObserveProxy);
EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
// node0 won't collapse the proxy but node1 will receive the message before
// going idle.
WaitForIdle();
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(B, &message));
ASSERT_EQ(1u, message->num_ports());
// E is the port node1 received in place of D.
PortRef E;
EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
// Remove node1 while node0 is still blocked, then let node0 resume.
RemoveNode(&node1);
node0.Unblock();
WaitForIdle();
// Port C should have detected peer closure.
PortStatus status;
EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
EXPECT_TRUE(status.peer_closed);
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(B));
EXPECT_EQ(OK, node0.node().ClosePort(C));
EXPECT_EQ(OK, node1.node().ClosePort(E));
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
// Reading from a port with nothing queued succeeds but yields no message;
// once the peer has been closed the same read reports
// ERROR_PORT_PEER_CLOSED.
TEST_F(PortsTest, GetMessage1) {
  TestNode test_node(0);
  AddNode(&test_node);
  Node& core = test_node.node();

  PortRef reader, writer;
  EXPECT_EQ(OK, core.CreatePortPair(&reader, &writer));

  // Nothing has been sent yet.
  ScopedMessage received;
  EXPECT_EQ(OK, core.GetMessage(reader, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, core.ClosePort(writer));
  WaitForIdle();

  // The reader now observes that its peer is gone.
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            core.GetMessage(reader, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, core.ClosePort(reader));
  WaitForIdle();

  EXPECT_TRUE(core.CanShutdownCleanly());
}
// A message written to one end of a local pair can be read straight back from
// the other end with its payload intact.
TEST_F(PortsTest, GetMessage2) {
  TestNode test_node(0);
  AddNode(&test_node);
  Node& core = test_node.node();

  PortRef reader, writer;
  EXPECT_EQ(OK, core.CreatePortPair(&reader, &writer));

  EXPECT_EQ(OK, test_node.SendStringMessage(writer, "1"));

  ScopedMessage received;
  EXPECT_EQ(OK, core.GetMessage(reader, &received, nullptr));
  ASSERT_TRUE(received);
  EXPECT_TRUE(MessageEquals(received, "1"));

  EXPECT_EQ(OK, core.ClosePort(reader));
  EXPECT_EQ(OK, core.ClosePort(writer));

  EXPECT_TRUE(core.CanShutdownCleanly());
}
// Several messages written back-to-back to one end of a local pair are read
// from the other end in the order they were sent.
TEST_F(PortsTest, GetMessage3) {
  TestNode test_node(0);
  AddNode(&test_node);
  Node& core = test_node.node();

  PortRef reader, writer;
  EXPECT_EQ(OK, core.CreatePortPair(&reader, &writer));

  const char* kStrings[] = {"1", "2", "3"};

  // Queue everything first...
  for (const char* text : kStrings)
    EXPECT_EQ(OK, test_node.SendStringMessage(writer, text));

  // ...then drain and check ordering.
  ScopedMessage received;
  for (const char* expected : kStrings) {
    EXPECT_EQ(OK, core.GetMessage(reader, &received, nullptr));
    ASSERT_TRUE(received);
    EXPECT_TRUE(MessageEquals(received, expected));
  }

  EXPECT_EQ(OK, core.ClosePort(reader));
  EXPECT_EQ(OK, core.ClosePort(writer));

  EXPECT_TRUE(core.CanShutdownCleanly());
}
// Moves a1 from node0 to node1 and straight back to node0, while a message is
// written to a0 in the meantime. The message must be readable from the port
// that finally arrives back on node0.
TEST_F(PortsTest, Delegation1) {
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
PortRef x0, x1;
CreatePortPair(&node0, &x0, &node1, &x1);
// In this test, we send a message to a port that has been moved.
PortRef a0, a1;
EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
WaitForIdle();
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(x1, &message));
ASSERT_EQ(1u, message->num_ports());
EXPECT_TRUE(MessageEquals(message, "a1"));
// This is "a1" from the point of view of node1.
PortName a2_name = message->ports()[0];
// Send the received port back to node0, and write to a0 while it is in
// flight.
EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
WaitForIdle();
ASSERT_TRUE(node0.ReadMessage(x0, &message));
ASSERT_EQ(1u, message->num_ports());
EXPECT_TRUE(MessageEquals(message, "a2"));
// This is "a2" from the point of view of node0.
PortName a3_name = message->ports()[0];
PortRef a3;
EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
// The message written to a0 must have followed the port back to node0.
ASSERT_TRUE(node0.ReadMessage(a3, &message));
EXPECT_EQ(0u, message->num_ports());
EXPECT_TRUE(MessageEquals(message, "hello"));
EXPECT_EQ(OK, node0.node().ClosePort(a0));
EXPECT_EQ(OK, node0.node().ClosePort(a3));
EXPECT_EQ(OK, node0.node().ClosePort(x0));
EXPECT_EQ(OK, node1.node().ClosePort(x1));
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
// Runs 100 rounds of a nested transfer: D is sent over A, F is sent over C
// (whose peer is D), and "hello" is written to E (whose peer is F). node1
// saves every message it can read, and "hello" must be among them in each
// round.
TEST_F(PortsTest, Delegation2) {
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
for (int i = 0; i < 100; ++i) {
// Set up pipe A<->B between node0 and node1.
PortRef A, B;
CreatePortPair(&node0, &A, &node1, &B);
PortRef C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
PortRef E, F;
EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
// node1 drains readable messages into its saved queue from
// PortStatusChanged().
node1.set_save_messages(true);
// Pass D over A to B.
EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
// Pass F over C to D.
EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
// This message should find its way to node1.
EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
WaitForIdle();
EXPECT_EQ(OK, node0.node().ClosePort(C));
EXPECT_EQ(OK, node0.node().ClosePort(E));
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(B));
bool got_hello = false;
ScopedMessage message;
// Walk the saved messages, closing any ports they carry, until "hello" is
// found.
while (node1.GetSavedMessage(&message)) {
node1.ClosePortsInMessage(message.get());
if (MessageEquals(message, "hello")) {
got_hello = true;
break;
}
}
EXPECT_TRUE(got_hello);
WaitForIdle(); // Because closing ports may have generated tasks.
}
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
// Sending on a port that was created but never initialized must fail with
// ERROR_PORT_STATE_UNEXPECTED; the port can still be closed afterwards.
TEST_F(PortsTest, SendUninitialized) {
  TestNode test_node(0);
  AddNode(&test_node);
  Node& core = test_node.node();

  PortRef uninitialized;
  EXPECT_EQ(OK, core.CreateUninitializedPort(&uninitialized));

  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
            test_node.SendStringMessage(uninitialized, "oops"));

  EXPECT_EQ(OK, core.ClosePort(uninitialized));
  EXPECT_TRUE(core.CanShutdownCleanly());
}
// Checks the two rejected transfers: a port sent over itself and a port sent
// over its own peer. No message may be delivered in either case.
TEST_F(PortsTest, SendFailure) {
TestNode node(0);
AddNode(&node);
// Any message the node can read from a port is captured, so the test can
// assert below that none was.
node.set_save_messages(true);
PortRef A, B;
EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
// Try to send A over itself.
EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
node.SendStringMessageWithPort(A, "oops", A));
// Try to send B over A.
EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
node.SendStringMessageWithPort(A, "nope", B));
// B should be closed immediately.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
WaitForIdle();
// There should have been no messages accepted.
ScopedMessage message;
EXPECT_FALSE(node.GetSavedMessage(&message));
EXPECT_EQ(OK, node.node().ClosePort(A));
WaitForIdle();
EXPECT_TRUE(node.node().CanShutdownCleanly());
}
// A port that is sent in a message which nobody ever reads must not keep the
// node from shutting down cleanly once the ports the test holds are closed.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  TestNode test_node(0);
  AddNode(&test_node);
  Node& core = test_node.node();

  PortRef carrier_near, carrier_far, kept, shipped;
  EXPECT_EQ(OK, core.CreatePortPair(&carrier_near, &carrier_far));
  EXPECT_EQ(OK, core.CreatePortPair(&kept, &shipped));

  // |shipped| travels in a message that is never read from |carrier_far|.
  EXPECT_EQ(OK,
            test_node.SendStringMessageWithPort(carrier_near, "foo", shipped));

  EXPECT_EQ(OK, core.ClosePort(kept));
  EXPECT_EQ(OK, core.ClosePort(carrier_near));
  EXPECT_EQ(OK, core.ClosePort(carrier_far));

  WaitForIdle();

  EXPECT_TRUE(core.CanShutdownCleanly());
}
// Checks CanShutdownCleanly() under both policies while ports are still open
// on a single node: the ALLOW_LOCAL_PORTS policy must report true, the default
// policy must report false until every port has been closed.
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
TestNode node(0);
AddNode(&node);
PortRef A, B, C, D;
EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
// Send D over A and receive it on B as E.
EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
ScopedMessage message;
EXPECT_TRUE(node.ReadMessage(B, &message));
ASSERT_EQ(1u, message->num_ports());
EXPECT_TRUE(MessageEquals(message, "foo"));
PortRef E;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
// The ALLOW_LOCAL_PORTS policy is expected to hold both before and after the
// node goes idle.
EXPECT_TRUE(
node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
WaitForIdle();
EXPECT_TRUE(
node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// With the default policy the still-open ports prevent a clean shutdown.
EXPECT_FALSE(node.node().CanShutdownCleanly());
EXPECT_EQ(OK, node.node().ClosePort(A));
EXPECT_EQ(OK, node.node().ClosePort(B));
EXPECT_EQ(OK, node.node().ClosePort(C));
EXPECT_EQ(OK, node.node().ClosePort(E));
WaitForIdle();
EXPECT_TRUE(node.node().CanShutdownCleanly());
}
// Sends the same logical port over X-Y three times in a row (B -> C -> D -> E)
// on a single node, then closes X, Y, A and E. The node must be able to shut
// down cleanly once it is idle.
TEST_F(PortsTest, ProxyCollapse1) {
TestNode node(0);
AddNode(&node);
PortRef A, B;
EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
PortRef X, Y;
EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
ScopedMessage message;
// Send B and receive it as C.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef C;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
// Send C and receive it as D.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef D;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
// Send D and receive it as E.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
// Only the carrier pair and the two surviving ends (A and E) are closed by
// the test.
EXPECT_EQ(OK, node.node().ClosePort(X));
EXPECT_EQ(OK, node.node().ClosePort(Y));
EXPECT_EQ(OK, node.node().ClosePort(A));
EXPECT_EQ(OK, node.node().ClosePort(E));
// The node should not idle until all proxies are collapsed.
WaitForIdle();
EXPECT_TRUE(node.node().CanShutdownCleanly());
}
// Sends both ends of a local pair (B, then A) over X without ever reading
// them from Y, then closes X and Y. The node must still be able to shut down
// cleanly once it is idle.
TEST_F(PortsTest, ProxyCollapse2) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  // Send B and A to create proxies in each direction.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse. The sent ports will be closed
  // eventually as a result of Y's closure.
  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.
  TestNode node(0);
  AddNode(&node);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(A));

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ScopedMessage message;
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // C should have received the message originally sent to B, and it should also
  // be aware of A's closure.
  ASSERT_TRUE(node.ReadMessage(C, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result of the final close like every other ClosePort() call in
  // this file, so that a failure to close C is reported directly.
  EXPECT_EQ(OK, node.node().ClosePort(C));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendWithClosedPeerSent) {
// This tests that if a port is closed while some number of proxies are still
// routing messages (directly or indirectly) to it, that the peer port is
// eventually notified of the closure, and the dead-end proxies will
// eventually be removed.
TestNode node(0);
AddNode(&node);
// X-Y is the carrier pair over which the other ports are sent.
PortRef X, Y;
EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
PortRef A, B;
EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
ScopedMessage message;
// Send A as new port C.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef C;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
// Send C as new port D.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef D;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
// Send a message to B through D, then close D.
EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
EXPECT_EQ(OK, node.node().ClosePort(D));
// Now send B as new port E.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
// X is closed before the message carrying B is read from Y.
EXPECT_EQ(OK, node.node().ClosePort(X));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
EXPECT_EQ(OK, node.node().ClosePort(Y));
WaitForIdle();
// E should receive the message originally sent to B, and it should also be
// aware of D's closure.
ASSERT_TRUE(node.ReadMessage(E, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
PortStatus status;
EXPECT_EQ(OK, node.node().GetStatus(E, &status));
EXPECT_FALSE(status.receiving_messages);
EXPECT_FALSE(status.has_messages);
EXPECT_TRUE(status.peer_closed);
EXPECT_EQ(OK, node.node().ClosePort(E));
WaitForIdle();
EXPECT_TRUE(node.node().CanShutdownCleanly());
}
// Merges port B on node0 with port C on node1 and checks that a message
// written on A beforehand is delivered to D, and that only A and D remain
// open afterwards.
TEST_F(PortsTest, MergePorts) {
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on A.
EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
// Initiate a merge between B and C.
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect all proxies to be gone once idle.
EXPECT_TRUE(
node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
EXPECT_TRUE(
node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// Expect D to have received the message sent on A.
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(D, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(D));
// No more ports should be open.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortWithClosedPeer1) {
// This tests that the right thing happens when initiating a merge on a port
// whose peer has already been closed.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on A.
EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
// Close A.
EXPECT_EQ(OK, node0.node().ClosePort(A));
// Initiate a merge between B and C.
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect all proxies to be gone once idle. node0 should have no ports since
// A was explicitly closed.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(
node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// Expect D to have received the message sent on A.
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(D, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
// D is the only port the test still has to close.
EXPECT_EQ(OK, node1.node().ClosePort(D));
// No more ports should be open.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed: the message written by the closed peer (D) must
  // still be delivered to the far end of the merged route (A).
  TestNode node0(0);
  AddNode(&node0);
  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on D and close it. D was created on node1 above, so both
  // operations go through node1 (the node that owns the port).
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
  WaitForIdle();

  // Expect all proxies to be gone once idle. node1 should have no ports since
  // D was explicitly closed; node0 may still hold local ports.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(node1.node().CanShutdownCleanly());

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(A, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));
  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // Scenario: both ports being merged (B and C) have already lost their peers
  // (A and D). After the merge settles, nothing may be left on either node.
  TestNode node0(0);
  AddNode(&node0);
  TestNode node1(1);
  AddNode(&node1);

  auto& n0 = node0.node();
  auto& n1 = node1.node();

  // Two unrelated pairs: A-B is created on node0, C-D on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, n0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, n1.CreatePortPair(&C, &D));

  // Close the outer ends (A and D) and wait for idle before merging.
  EXPECT_EQ(OK, n0.ClosePort(A));
  EXPECT_EQ(OK, n1.ClosePort(D));
  WaitForIdle();

  // Merge the two remaining ports, B and C.
  EXPECT_EQ(OK, n0.MergePorts(B, node1.name(), C.name()));
  WaitForIdle();

  // Both nodes must now be able to shut down with no ports remaining.
  EXPECT_TRUE(n0.CanShutdownCleanly());
  EXPECT_TRUE(n1.CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // Scenario: the peer of a port being merged is transferred to a new port
  // before the merge happens. Messages must still flow end to end afterwards.
  TestNode node0(0);
  AddNode(&node0);
  TestNode node1(1);
  AddNode(&node1);

  auto& n0 = node0.node();
  auto& n1 = node1.node();

  // Two unrelated pairs: A-B is created on node0, C-D on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, n0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, n1.CreatePortPair(&C, &D));

  // A helper pair X-Y on node0, used only as the channel that carries A.
  PortRef X;
  PortRef Y;
  EXPECT_EQ(OK, n0.CreatePortPair(&X, &Y));

  // One message holder is reused for every read in this test.
  ScopedMessage incoming;

  // Send A over X and pick it up from Y; the port named in the received
  // message is referred to as E from here on.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
  ASSERT_TRUE(node0.ReadMessage(Y, &incoming));
  ASSERT_EQ(1u, incoming->num_ports());
  PortRef E;
  ASSERT_EQ(OK, n0.GetPort(incoming->ports()[0], &E));

  // The helper pair has served its purpose.
  EXPECT_EQ(OK, n0.ClosePort(X));
  EXPECT_EQ(OK, n0.ClosePort(Y));

  // Queue one message in each direction: "hey" from E and "hi" from D.
  EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));

  // Merge B with C and wait for the system to go idle.
  EXPECT_EQ(OK, n0.MergePorts(B, node1.name(), C.name()));
  WaitForIdle();

  // E must receive what D sent, and D must receive what E sent.
  ASSERT_TRUE(node0.ReadMessage(E, &incoming));
  EXPECT_TRUE(MessageEquals(incoming, "hi"));
  ASSERT_TRUE(node1.ReadMessage(D, &incoming));
  EXPECT_TRUE(MessageEquals(incoming, "hey"));

  // Close both endpoints and let the closures propagate.
  EXPECT_EQ(OK, n0.ClosePort(E));
  EXPECT_EQ(OK, n1.ClosePort(D));
  WaitForIdle();

  // Both nodes must now be able to shut down with no ports remaining.
  EXPECT_TRUE(n0.CanShutdownCleanly());
  EXPECT_TRUE(n1.CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsFailsGracefully) {
// This tests that the system remains in a well-defined state if something
// goes wrong during port merge. The failure is provoked by sending the merge
// target (C) away in a message while the merge request is still pending.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
ScopedMessage message;
// X-Y is a helper pair on node1; it is only used below to carry C in a
// message.
PortRef X, Y;
EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y));
// Block the merge from proceeding until we can do something stupid with port
// C. This avoids the test logic racing with async merge logic.
node1.BlockOnEvent(EventType::kMergePort);
// Initiate the merge between B and C.
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
// Move C to a new port E. This is not a sane use of Node's public API but
// is still hypothetically possible. It allows us to force a merge failure
// because C will be in an invalid state by the time the merge is processed.
// As a result, B should be closed.
EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C));
// C has been sent on X, so node1 can be released again.
// NOTE(review): presumably this lifts the block installed by BlockOnEvent
// above -- confirm against TestNode.
node1.Unblock();
// Read the message carrying the moved port from Y and look up the port it
// names; that port is referred to as E.
ASSERT_TRUE(node1.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
// The helper pair is no longer needed.
EXPECT_EQ(OK, node1.node().ClosePort(X));
EXPECT_EQ(OK, node1.node().ClosePort(Y));
WaitForIdle();
// C goes away as a result of normal proxy removal. B should have been closed
// cleanly by the failed MergePorts. Neither name may resolve any more.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
// Close A, D, and E.
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(D));
EXPECT_EQ(OK, node1.node().ClosePort(E));
WaitForIdle();
// Expect everything to have gone away.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
} // namespace test
} // namespace ports
} // namespace edk
} // namespace mojo