// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <map> #include <queue> #include <sstream> #include <utility> #include "base/bind.h" #include "base/callback.h" #include "base/logging.h" #include "base/memory/ref_counted.h" #include "base/rand_util.h" #include "base/strings/string_piece.h" #include "base/strings/stringprintf.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" #include "base/threading/thread.h" #include "mojo/edk/system/ports/event.h" #include "mojo/edk/system/ports/node.h" #include "mojo/edk/system/ports/node_delegate.h" #include "testing/gtest/include/gtest/gtest.h" namespace mojo { namespace edk { namespace ports { namespace test { namespace { bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) { return !strcmp(static_cast<const char*>(message->payload_bytes()), s.data()); } class TestMessage : public Message { public: static ScopedMessage NewUserMessage(size_t num_payload_bytes, size_t num_ports) { return ScopedMessage(new TestMessage(num_payload_bytes, num_ports)); } TestMessage(size_t num_payload_bytes, size_t num_ports) : Message(num_payload_bytes, num_ports) { start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes]; InitializeUserMessageHeader(start_); } TestMessage(size_t num_header_bytes, size_t num_payload_bytes, size_t num_ports_bytes) : Message(num_header_bytes, num_payload_bytes, num_ports_bytes) { start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes]; } ~TestMessage() override { delete[] start_; } }; class TestNode; class MessageRouter { public: virtual ~MessageRouter() {} virtual void GeneratePortName(PortName* name) = 0; virtual void ForwardMessage(TestNode* from_node, const NodeName& node_name, ScopedMessage message) = 0; virtual void BroadcastMessage(TestNode* from_node, ScopedMessage message) = 0; }; class TestNode : public NodeDelegate { public: explicit TestNode(uint64_t id) : node_name_(id, 1), node_(node_name_, this), node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)), messages_available_event_( base::WaitableEvent::ResetPolicy::AUTOMATIC, base::WaitableEvent::InitialState::NOT_SIGNALED), idle_event_( base::WaitableEvent::ResetPolicy::MANUAL, base::WaitableEvent::InitialState::SIGNALED) { } ~TestNode() override { StopWhenIdle(); node_thread_.Stop(); } const NodeName& name() const { return node_name_; } // NOTE: Node is thread-safe. Node& node() { return node_; } base::WaitableEvent& idle_event() { return idle_event_; } bool IsIdle() { base::AutoLock lock(lock_); return started_ && !dispatching_ && (incoming_messages_.empty() || (block_on_event_ && blocked_)); } void BlockOnEvent(EventType type) { base::AutoLock lock(lock_); blocked_event_type_ = type; block_on_event_ = true; } void Unblock() { base::AutoLock lock(lock_); block_on_event_ = false; messages_available_event_.Signal(); } void Start(MessageRouter* router) { router_ = router; node_thread_.Start(); node_thread_.task_runner()->PostTask( FROM_HERE, base::Bind(&TestNode::ProcessMessages, base::Unretained(this))); } void StopWhenIdle() { base::AutoLock lock(lock_); should_quit_ = true; messages_available_event_.Signal(); } void WakeUp() { messages_available_event_.Signal(); } int SendStringMessage(const PortRef& port, const std::string& s) { size_t size = s.size() + 1; ScopedMessage message = TestMessage::NewUserMessage(size, 0); memcpy(message->mutable_payload_bytes(), s.data(), size); return node_.SendMessage(port, std::move(message)); } int SendStringMessageWithPort(const PortRef& port, const std::string& s, const PortName& sent_port_name) { size_t size = s.size() + 1; ScopedMessage message = TestMessage::NewUserMessage(size, 1); memcpy(message->mutable_payload_bytes(), s.data(), size); message->mutable_ports()[0] = sent_port_name; return node_.SendMessage(port, std::move(message)); } int SendStringMessageWithPort(const PortRef& port, const std::string& s, const PortRef& sent_port) { return SendStringMessageWithPort(port, s, sent_port.name()); } void set_drop_messages(bool value) { base::AutoLock lock(lock_); drop_messages_ = value; } void set_save_messages(bool value) { base::AutoLock lock(lock_); save_messages_ = value; } bool ReadMessage(const PortRef& port, ScopedMessage* message) { return node_.GetMessage(port, message, nullptr) == OK && *message; } bool GetSavedMessage(ScopedMessage* message) { base::AutoLock lock(lock_); if (saved_messages_.empty()) { message->reset(); return false; } std::swap(*message, saved_messages_.front()); saved_messages_.pop(); return true; } void EnqueueMessage(ScopedMessage message) { idle_event_.Reset(); // NOTE: This may be called from ForwardMessage and thus must not reenter // |node_|. base::AutoLock lock(lock_); incoming_messages_.emplace(std::move(message)); messages_available_event_.Signal(); } void GenerateRandomPortName(PortName* port_name) override { DCHECK(router_); router_->GeneratePortName(port_name); } void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override { message->reset(new TestMessage(num_header_bytes, 0, 0)); } void ForwardMessage(const NodeName& node_name, ScopedMessage message) override { { base::AutoLock lock(lock_); if (drop_messages_) { DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " << node_name; base::AutoUnlock unlock(lock_); ClosePortsInMessage(message.get()); return; } } DCHECK(router_); DVLOG(1) << "ForwardMessage from node " << node_name_ << " to " << node_name; router_->ForwardMessage(this, node_name, std::move(message)); } void BroadcastMessage(ScopedMessage message) override { router_->BroadcastMessage(this, std::move(message)); } void PortStatusChanged(const PortRef& port) override { // The port may be closed, in which case we ignore the notification. base::AutoLock lock(lock_); if (!save_messages_) return; for (;;) { ScopedMessage message; { base::AutoUnlock unlock(lock_); if (!ReadMessage(port, &message)) break; } saved_messages_.emplace(std::move(message)); } } void ClosePortsInMessage(Message* message) { for (size_t i = 0; i < message->num_ports(); ++i) { PortRef port; ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port)); EXPECT_EQ(OK, node_.ClosePort(port)); } } private: void ProcessMessages() { for (;;) { messages_available_event_.Wait(); base::AutoLock lock(lock_); if (should_quit_) return; dispatching_ = true; while (!incoming_messages_.empty()) { if (block_on_event_ && GetEventHeader(*incoming_messages_.front())->type == blocked_event_type_) { blocked_ = true; // Go idle if we hit a blocked event type. break; } else { blocked_ = false; } ScopedMessage message = std::move(incoming_messages_.front()); incoming_messages_.pop(); // NOTE: AcceptMessage() can re-enter this object to call any of the // NodeDelegate interface methods. base::AutoUnlock unlock(lock_); node_.AcceptMessage(std::move(message)); } dispatching_ = false; started_ = true; idle_event_.Signal(); }; } const NodeName node_name_; Node node_; MessageRouter* router_ = nullptr; base::Thread node_thread_; base::WaitableEvent messages_available_event_; base::WaitableEvent idle_event_; // Guards fields below. base::Lock lock_; bool started_ = false; bool dispatching_ = false; bool should_quit_ = false; bool drop_messages_ = false; bool save_messages_ = false; bool blocked_ = false; bool block_on_event_ = false; EventType blocked_event_type_; std::queue<ScopedMessage> incoming_messages_; std::queue<ScopedMessage> saved_messages_; }; class PortsTest : public testing::Test, public MessageRouter { public: void AddNode(TestNode* node) { { base::AutoLock lock(lock_); nodes_[node->name()] = node; } node->Start(this); } void RemoveNode(TestNode* node) { { base::AutoLock lock(lock_); nodes_.erase(node->name()); } for (const auto& entry : nodes_) entry.second->node().LostConnectionToNode(node->name()); } // Waits until all known Nodes are idle. Message forwarding and processing // is handled in such a way that idleness is a stable state: once all nodes in // the system are idle, they will remain idle until the test explicitly // initiates some further event (e.g. sending a message, closing a port, or // removing a Node). void WaitForIdle() { for (;;) { base::AutoLock global_lock(global_lock_); bool all_nodes_idle = true; for (const auto& entry : nodes_) { if (!entry.second->IsIdle()) all_nodes_idle = false; entry.second->WakeUp(); } if (all_nodes_idle) return; // Wait for any Node to signal that it's idle. base::AutoUnlock global_unlock(global_lock_); std::vector<base::WaitableEvent*> events; for (const auto& entry : nodes_) events.push_back(&entry.second->idle_event()); base::WaitableEvent::WaitMany(events.data(), events.size()); } } void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1, PortRef* port1) { if (node0 == node1) { EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); } else { EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(), port1->name())); EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(), port0->name())); } } private: // MessageRouter: void GeneratePortName(PortName* name) override { base::AutoLock lock(lock_); name->v1 = next_port_id_++; name->v2 = 0; } void ForwardMessage(TestNode* from_node, const NodeName& node_name, ScopedMessage message) override { base::AutoLock global_lock(global_lock_); base::AutoLock lock(lock_); // Drop messages from nodes that have been removed. if (nodes_.find(from_node->name()) == nodes_.end()) { from_node->ClosePortsInMessage(message.get()); return; } auto it = nodes_.find(node_name); if (it == nodes_.end()) { DVLOG(1) << "Node not found: " << node_name; return; } it->second->EnqueueMessage(std::move(message)); } void BroadcastMessage(TestNode* from_node, ScopedMessage message) override { base::AutoLock global_lock(global_lock_); base::AutoLock lock(lock_); // Drop messages from nodes that have been removed. if (nodes_.find(from_node->name()) == nodes_.end()) return; for (const auto& entry : nodes_) { TestNode* node = entry.second; // Broadcast doesn't deliver to the local node. if (node == from_node) continue; // NOTE: We only need to support broadcast of events. Events have no // payload or ports bytes. ScopedMessage new_message( new TestMessage(message->num_header_bytes(), 0, 0)); memcpy(new_message->mutable_header_bytes(), message->header_bytes(), message->num_header_bytes()); node->EnqueueMessage(std::move(new_message)); } } base::MessageLoop message_loop_; // Acquired before any operation which makes a Node busy, and before testing // if all nodes are idle. base::Lock global_lock_; base::Lock lock_; uint64_t next_port_id_ = 1; std::map<NodeName, TestNode*> nodes_; }; } // namespace TEST_F(PortsTest, Basic1) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Basic2) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef b0, b1; EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); EXPECT_EQ(OK, node0.node().ClosePort(b0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Basic3) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); PortRef b0, b1; EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); EXPECT_EQ(OK, node0.node().ClosePort(b0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNode1) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); node1.set_drop_messages(true); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); // Transfer a port to node1 and simulate a lost connection to node1. PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); WaitForIdle(); RemoveNode(&node1); WaitForIdle(); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNode2) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); WaitForIdle(); node1.set_drop_messages(true); RemoveNode(&node1); WaitForIdle(); // a0 should have eventually detected peer closure after node loss. ScopedMessage message; EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message, nullptr)); EXPECT_FALSE(message); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr)); EXPECT_TRUE(message); node1.ClosePortsInMessage(message.get()); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { // Tests that a proxy gets cleaned up when its indirect peer lives on a lost // node. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); TestNode node2(2); AddNode(&node2); // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. PortRef A, B, C, D; CreatePortPair(&node0, &A, &node1, &B); CreatePortPair(&node1, &C, &node2, &D); // Create E-F and send F over A to node 1. PortRef E, F; EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 // will trivially become aware of the loss, and this test verifies that the // port A on node 0 will eventually also become aware of it. // Make sure node2 stops processing events when it encounters an ObserveProxy. node2.BlockOnEvent(EventType::kObserveProxy); EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); WaitForIdle(); // Simulate node 1 and 2 disconnecting. EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); // Let node2 continue processing events and wait for everyone to go idle. node2.Unblock(); WaitForIdle(); // Port F should be gone. EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); // Port E should have detected peer closure despite the fact that there is // no longer a continuous route from F to E over which the event could travel. PortStatus status; EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); EXPECT_EQ(OK, node1.node().ClosePort(C)); EXPECT_EQ(OK, node0.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { // Tests that a proxy gets cleaned up when its direct peer lives on a lost // node and it's predecessor lives on the same node. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef A, B; CreatePortPair(&node0, &A, &node1, &B); PortRef C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); // Send D but block node0 on an ObserveProxy event. node0.BlockOnEvent(EventType::kObserveProxy); EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); // node0 won't collapse the proxy but node1 will receive the message before // going idle. WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); RemoveNode(&node1); node0.Unblock(); WaitForIdle(); // Port C should have detected peer closure. PortStatus status; EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); EXPECT_EQ(OK, node0.node().ClosePort(C)); EXPECT_EQ(OK, node1.node().ClosePort(E)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage1) { TestNode node(0); AddNode(&node); PortRef a0, a1; EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); ScopedMessage message; EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); EXPECT_FALSE(message); EXPECT_EQ(OK, node.node().ClosePort(a1)); WaitForIdle(); EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message, nullptr)); EXPECT_FALSE(message); EXPECT_EQ(OK, node.node().ClosePort(a0)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage2) { TestNode node(0); AddNode(&node); PortRef a0, a1; EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); ScopedMessage message; EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); ASSERT_TRUE(message); EXPECT_TRUE(MessageEquals(message, "1")); EXPECT_EQ(OK, node.node().ClosePort(a0)); EXPECT_EQ(OK, node.node().ClosePort(a1)); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage3) { TestNode node(0); AddNode(&node); PortRef a0, a1; EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); const char* kStrings[] = { "1", "2", "3" }; for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i])); ScopedMessage message; for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) { EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); ASSERT_TRUE(message); EXPECT_TRUE(MessageEquals(message, kStrings[i])); } EXPECT_EQ(OK, node.node().ClosePort(a0)); EXPECT_EQ(OK, node.node().ClosePort(a1)); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, Delegation1) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); // In this test, we send a message to a port that has been moved. PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(x1, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "a1")); // This is "a1" from the point of view of node1. PortName a2_name = message->ports()[0]; EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); WaitForIdle(); ASSERT_TRUE(node0.ReadMessage(x0, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "a2")); // This is "a2" from the point of view of node1. PortName a3_name = message->ports()[0]; PortRef a3; EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); ASSERT_TRUE(node0.ReadMessage(a3, &message)); EXPECT_EQ(0u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "hello")); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(a3)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Delegation2) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); for (int i = 0; i < 100; ++i) { // Setup pipe a<->b between node0 and node1. PortRef A, B; CreatePortPair(&node0, &A, &node1, &B); PortRef C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); PortRef E, F; EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); node1.set_save_messages(true); // Pass D over A to B. EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); // Pass F over C to D. EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); // This message should find its way to node1. EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); WaitForIdle(); EXPECT_EQ(OK, node0.node().ClosePort(C)); EXPECT_EQ(OK, node0.node().ClosePort(E)); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); bool got_hello = false; ScopedMessage message; while (node1.GetSavedMessage(&message)) { node1.ClosePortsInMessage(message.get()); if (MessageEquals(message, "hello")) { got_hello = true; break; } } EXPECT_TRUE(got_hello); WaitForIdle(); // Because closing ports may have generated tasks. } EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendUninitialized) { TestNode node(0); AddNode(&node); PortRef x0; EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); EXPECT_EQ(OK, node.node().ClosePort(x0)); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendFailure) { TestNode node(0); AddNode(&node); node.set_save_messages(true); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); // Try to send A over itself. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, node.SendStringMessageWithPort(A, "oops", A)); // Try to send B over A. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, node.SendStringMessageWithPort(A, "nope", B)); // B should be closed immediately. EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); WaitForIdle(); // There should have been no messages accepted. ScopedMessage message; EXPECT_FALSE(node.GetSavedMessage(&message)); EXPECT_EQ(OK, node.node().ClosePort(A)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, DontLeakUnreceivedPorts) { TestNode node(0); AddNode(&node); PortRef A, B, C, D; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); EXPECT_EQ(OK, node.node().ClosePort(C)); EXPECT_EQ(OK, node.node().ClosePort(A)); EXPECT_EQ(OK, node.node().ClosePort(B)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { TestNode node(0); AddNode(&node); PortRef A, B, C, D; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); ScopedMessage message; EXPECT_TRUE(node.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "foo")); PortRef E; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); EXPECT_TRUE( node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); WaitForIdle(); EXPECT_TRUE( node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); EXPECT_FALSE(node.node().CanShutdownCleanly()); EXPECT_EQ(OK, node.node().ClosePort(A)); EXPECT_EQ(OK, node.node().ClosePort(B)); EXPECT_EQ(OK, node.node().ClosePort(C)); EXPECT_EQ(OK, node.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, ProxyCollapse1) { TestNode node(0); AddNode(&node); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and receive it as C. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); // Send C and receive it as D. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); // Send D and receive it as E. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node.node().ClosePort(X)); EXPECT_EQ(OK, node.node().ClosePort(Y)); EXPECT_EQ(OK, node.node().ClosePort(A)); EXPECT_EQ(OK, node.node().ClosePort(E)); // The node should not idle until all proxies are collapsed. WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, ProxyCollapse2) { TestNode node(0); AddNode(&node); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and A to create proxies in each direction. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); EXPECT_EQ(OK, node.node().ClosePort(X)); EXPECT_EQ(OK, node.node().ClosePort(Y)); // At this point we have a scenario with: // // D -> [B] -> C -> [A] // // Ensure that the proxies can collapse. The sent ports will be closed // eventually as a result of Y's closure. WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendWithClosedPeer) { // This tests that if a port is sent when its peer is already known to be // closed, the newly created port will be aware of that peer closure, and the // proxy will eventually collapse. TestNode node(0); AddNode(&node); // Send a message from A to B, then close A. PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); EXPECT_EQ(OK, node.node().ClosePort(A)); // Now send B over X-Y as new port C. PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); ScopedMessage message; ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); EXPECT_EQ(OK, node.node().ClosePort(X)); EXPECT_EQ(OK, node.node().ClosePort(Y)); WaitForIdle(); // C should have received the message originally sent to B, and it should also // be aware of A's closure. ASSERT_TRUE(node.ReadMessage(C, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); PortStatus status; EXPECT_EQ(OK, node.node().GetStatus(C, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); node.node().ClosePort(C); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendWithClosedPeerSent) { // This tests that if a port is closed while some number of proxies are still // routing messages (directly or indirectly) to it, that the peer port is // eventually notified of the closure, and the dead-end proxies will // eventually be removed. TestNode node(0); AddNode(&node); PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); ScopedMessage message; // Send A as new port C. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); // Send C as new port D. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); // Send a message to B through D, then close D. EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); EXPECT_EQ(OK, node.node().ClosePort(D)); // Now send B as new port E. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); EXPECT_EQ(OK, node.node().ClosePort(X)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node.node().ClosePort(Y)); WaitForIdle(); // E should receive the message originally sent to B, and it should also be // aware of D's closure. ASSERT_TRUE(node.ReadMessage(E, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); PortStatus status; EXPECT_EQ(OK, node.node().GetStatus(E, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePorts) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on A. EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect all proxies to be gone once idle. EXPECT_TRUE( node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); EXPECT_TRUE( node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); // Expect D to have received the message sent on A. ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(D, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(D)); // No more ports should be open. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortWithClosedPeer1) { // This tests that the right thing happens when initiating a merge on a port // whose peer has already been closed. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on A. EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); // Close A. EXPECT_EQ(OK, node0.node().ClosePort(A)); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect all proxies to be gone once idle. node0 should have no ports since // A was explicitly closed. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE( node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); // Expect D to have received the message sent on A. ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(D, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); EXPECT_EQ(OK, node1.node().ClosePort(D)); // No more ports should be open. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortWithClosedPeer2) { // This tests that the right thing happens when merging into a port whose peer // has already been closed. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on D and close it. EXPECT_EQ(OK, node0.SendStringMessage(D, "hey")); EXPECT_EQ(OK, node1.node().ClosePort(D)); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect all proxies to be gone once idle. node1 should have no ports since // D was explicitly closed. EXPECT_TRUE( node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); EXPECT_TRUE(node1.node().CanShutdownCleanly()); // Expect A to have received the message sent on D. ScopedMessage message; ASSERT_TRUE(node0.ReadMessage(A, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); EXPECT_EQ(OK, node0.node().ClosePort(A)); // No more ports should be open. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsWithClosedPeers) { // This tests that no residual ports are left behind if two ports are merged // when both of their peers have been closed. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Close A and D. EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(D)); WaitForIdle(); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect everything to have gone away. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsWithMovedPeers) { // This tests that ports can be merged successfully even if their peers are // moved around. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Set up another pair X-Y for moving ports on node0. PortRef X, Y; EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Move A to new port E. EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); ASSERT_TRUE(node0.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node0.node().ClosePort(X)); EXPECT_EQ(OK, node0.node().ClosePort(Y)); // Write messages on E and D. EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect to receive D's message on E and E's message on D. ASSERT_TRUE(node0.ReadMessage(E, &message)); EXPECT_TRUE(MessageEquals(message, "hi")); ASSERT_TRUE(node1.ReadMessage(D, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); // Close E and D. EXPECT_EQ(OK, node0.node().ClosePort(E)); EXPECT_EQ(OK, node1.node().ClosePort(D)); WaitForIdle(); // Expect everything to have gone away. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsFailsGracefully) { // This tests that the system remains in a well-defined state if something // goes wrong during port merge. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); ScopedMessage message; PortRef X, Y; EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y)); // Block the merge from proceeding until we can do something stupid with port // C. This avoids the test logic racing with async merge logic. node1.BlockOnEvent(EventType::kMergePort); // Initiate the merge between B and C. EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); // Move C to a new port E. This is not a sane use of Node's public API but // is still hypothetically possible. It allows us to force a merge failure // because C will be in an invalid state by the term the merge is processed. // As a result, B should be closed. EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C)); node1.Unblock(); ASSERT_TRUE(node1.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node1.node().ClosePort(X)); EXPECT_EQ(OK, node1.node().ClosePort(Y)); WaitForIdle(); // C goes away as a result of normal proxy removal. B should have been closed // cleanly by the failed MergePorts. EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); // Close A, D, and E. EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(D)); EXPECT_EQ(OK, node1.node().ClosePort(E)); WaitForIdle(); // Expect everything to have gone away. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } } // namespace test } // namespace ports } // namespace edk } // namespace mojo