// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/node.h"
#include <string.h>
#include <algorithm>
#include <utility>
#include "base/atomicops.h"
#include "base/containers/stack_container.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/optional.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_local.h"
#include "build/build_config.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/ports/port_locker.h"
#if !defined(OS_NACL)
#include "crypto/random.h"
#else
#include "base/rand_util.h"
#endif
namespace mojo {
namespace core {
namespace ports {
namespace {
// Number of PortName entries held in RandomNameGenerator's cache, i.e. how many
// names each bulk request for random bytes produces.
constexpr size_t kRandomNameCacheSize = 256;
// Random port name generator which maintains a cache of random bytes to draw
// from. This amortizes the cost of random name generation on platforms where
// RandBytes may have significant per-call overhead.
//
// Note that the use of this cache means one has to be careful about fork()ing
// a process once any port names have been generated, as that behavior can lead
// to collisions between independently generated names in different processes.
class RandomNameGenerator {
public:
RandomNameGenerator() = default;
~RandomNameGenerator() = default;
PortName GenerateRandomPortName() {
base::AutoLock lock(lock_);
if (cache_index_ == kRandomNameCacheSize) {
#if defined(OS_NACL)
base::RandBytes(cache_, sizeof(PortName) * kRandomNameCacheSize);
#else
crypto::RandBytes(cache_, sizeof(PortName) * kRandomNameCacheSize);
#endif
cache_index_ = 0;
}
return cache_[cache_index_++];
}
private:
base::Lock lock_;
PortName cache_[kRandomNameCacheSize];
size_t cache_index_ = kRandomNameCacheSize;
DISALLOW_COPY_AND_ASSIGN(RandomNameGenerator);
};
// Shared name generator used by GenerateRandomPortName() below. It is held in
// a LazyInstance with the Leaky trait, so it is created on first Get().
// NOTE(review): Leaky presumably means it is never destroyed at shutdown --
// confirm against base/lazy_instance.h.
base::LazyInstance<RandomNameGenerator>::Leaky g_name_generator =
LAZY_INSTANCE_INITIALIZER;
// Flags an unexpected error through NOTREACHED(), streaming |message| for
// diagnostics, and hands |error_code| back unchanged so the call can be used
// directly in a return statement.
int DebugError(const char* message, int error_code) {
NOTREACHED() << "Oops: " << message;
return error_code;
}
// Wraps an error code |x| in DebugError(), passing the stringified expression
// as the diagnostic message; the expression evaluates to |x| itself.
#define OOPS(x) DebugError(#x, x)
// Reports whether |port| can still have messages delivered to it.
//
// A closed port never accepts messages. Otherwise the port only stops
// accepting once it knows which message is its last (its peer has closed, or
// it is a proxy flagged for removal after its last message) and that last
// message has already been doled out of the queue.
bool CanAcceptMoreMessages(const Port* port) {
  const uint64_t next_expected = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed)
    return false;

  const bool final_message_is_known =
      port->peer_closed || port->remove_proxy_on_last_message;
  const bool final_message_consumed =
      port->last_sequence_num_to_receive == next_expected - 1;
  return !(final_message_is_known && final_message_consumed);
}
// Stores a freshly generated random port name, drawn from the shared
// |g_name_generator|, into |*name|.
void GenerateRandomPortName(PortName* name) {
*name = g_name_generator.Get().GenerateRandomPortName();
}
} // namespace
// Creates a node identified by |name|. |delegate| is wrapped, together with
// this node, in |delegate_|, which the rest of this class uses to forward
// events and to report port status changes.
Node::Node(const NodeName& name, NodeDelegate* delegate)
: name_(name), delegate_(this, delegate) {}
// Emits a debug-log warning if the node is destroyed while ports are still
// registered with it; no cleanup of those ports is attempted here.
Node::~Node() {
if (!ports_.empty())
DLOG(WARNING) << "Unclean shutdown for node " << name_;
}
// Reports whether this node may be shut down under |policy|.
//
// With DONT_ALLOW_LOCAL_PORTS the node must have no ports at all. With
// ALLOW_LOCAL_PORTS a port only blocks shutdown when its peer lives on another
// node and the port itself is not in the receiving state. When DCHECKs are
// enabled every blocking port is logged; otherwise the scan stops at the first
// blocker. Must be called with no port locks held on the current thread.
bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  base::AutoLock ports_lock(ports_lock_);

  if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#if DCHECK_IS_ON()
    for (auto& blocker : ports_) {
      DVLOG(2) << "Port " << blocker.first << " referencing node "
               << blocker.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << blocker.second->state
               << ")";
    }
#endif
    return ports_.empty();
  }

  DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);

  // NOTE: A linear scan that locks each port in turn is not efficient, but few
  // ports should remain open at shutdown and shutdown need not be fast.
  bool found_blocking_port = false;
  for (auto& entry : ports_) {
    PortRef port_ref(entry.first, entry.second);
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // Ports with a local peer, and ports still receiving, never block.
    if (port->peer_node_name == name_ || port->state == Port::kReceiving)
      continue;

    found_blocking_port = true;
#if DCHECK_IS_ON()
    DVLOG(2) << "Port " << entry.first << " referencing node "
             << port->peer_node_name << " is blocking shutdown of "
             << "node " << name_ << " (state=" << port->state << ")";
#else
    // Exit early when not debugging.
    break;
#endif
  }
  return !found_blocking_port;
}
// Looks up the port registered under |port_name|.
//
// On success stores a reference to it in |*port_ref| and returns OK. If no
// such port is registered, returns ERROR_PORT_UNKNOWN and leaves |*port_ref|
// untouched. Must be called with no port locks held on the current thread.
int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  base::AutoLock lock(ports_lock_);

  auto found = ports_.find(port_name);
  if (found != ports_.end()) {
#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
    // Workaround for https://crbug.com/665869.
    base::subtle::MemoryBarrier();
#endif
    *port_ref = PortRef(port_name, found->second);
    return OK;
  }
  return ERROR_PORT_UNKNOWN;
}
// Creates a new port under a freshly generated random name and registers it
// with this node. The port is left for InitializePort() to set up.
//
// Returns OK and stores a reference to the new port in |*port_ref|, or returns
// the error from AddPortWithName() (leaving |*port_ref| untouched).
int Node::CreateUninitializedPort(PortRef* port_ref) {
  PortName fresh_name;
  GenerateRandomPortName(&fresh_name);

  scoped_refptr<Port> fresh_port(
      new Port(kInitialSequenceNum, kInitialSequenceNum));
  const int add_result = AddPortWithName(fresh_name, fresh_port);
  if (add_result == OK)
    *port_ref = PortRef(fresh_name, std::move(fresh_port));
  return add_result;
}
// Moves an uninitialized port into the receiving state and records its peer as
// |peer_port_name|@|peer_node_name|, then notifies the delegate that the
// port's status changed.
//
// Returns ERROR_PORT_STATE_UNEXPECTED, without notifying the delegate, if the
// port is not in the uninitialized state; returns OK otherwise.
int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name) {
  {
    SinglePortLocker locker(&port_ref);
    auto* fresh_port = locker.port();
    const bool is_uninitialized = fresh_port->state == Port::kUninitialized;
    if (!is_uninitialized)
      return ERROR_PORT_STATE_UNEXPECTED;

    fresh_port->peer_node_name = peer_node_name;
    fresh_port->peer_port_name = peer_port_name;
    fresh_port->state = Port::kReceiving;
  }

  // The delegate is only called once the port lock has been released.
  delegate_->PortStatusChanged(port_ref);
  return OK;
}
// Creates two ports on this node and initializes each with the other as its
// peer. The steps run in order and stop at the first failure, whose error code
// is returned; OK is returned only if all four steps succeed.
int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
  int rv = CreateUninitializedPort(port0_ref);
  if (rv == OK)
    rv = CreateUninitializedPort(port1_ref);
  if (rv == OK)
    rv = InitializePort(*port0_ref, name_, port1_ref->name());
  if (rv == OK)
    rv = InitializePort(*port1_ref, name_, port0_ref->name());
  return rv;
}
// Replaces the user data attached to the port with |user_data|.
// Returns ERROR_PORT_STATE_UNEXPECTED, leaving the port untouched, if the port
// is closed; returns OK otherwise.
int Node::SetUserData(const PortRef& port_ref,
                      scoped_refptr<UserData> user_data) {
  SinglePortLocker locker(&port_ref);
  auto* target = locker.port();
  if (target->state != Port::kClosed) {
    target->user_data = std::move(user_data);
    return OK;
  }
  return ERROR_PORT_STATE_UNEXPECTED;
}
// Copies the reference to the port's attached user data into |*user_data|.
// Returns ERROR_PORT_STATE_UNEXPECTED, leaving |*user_data| untouched, if the
// port is closed; returns OK otherwise.
int Node::GetUserData(const PortRef& port_ref,
                      scoped_refptr<UserData>* user_data) {
  SinglePortLocker locker(&port_ref);
  auto* source = locker.port();
  if (source->state != Port::kClosed) {
    *user_data = source->user_data;
    return OK;
  }
  return ERROR_PORT_STATE_UNEXPECTED;
}
// Closes the port referenced by |port_ref| and removes it from this node.
//
// An uninitialized port is simply erased. A receiving port is marked closed
// and erased, its peer is sent an ObserveClosure event carrying the sequence
// number of the last message this port sent, and every port attached to a
// message still queued on the closed port is closed in turn.
//
// Returns OK on success, or ERROR_PORT_STATE_UNEXPECTED (changing nothing) if
// the port is in any state other than uninitialized or receiving.
int Node::ClosePort(const PortRef& port_ref) {
std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
NodeName peer_node_name;
PortName peer_port_name;
uint64_t last_sequence_num = 0;
bool was_initialized = false;
{
// Everything needed after the lock is released is copied out here, so that
// the delegate and the recursive ClosePort() calls below run unlocked.
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
switch (port->state) {
case Port::kUninitialized:
// No peer was ever recorded, so there is nothing to notify; just erase.
break;
case Port::kReceiving:
was_initialized = true;
port->state = Port::kClosed;
// We pass along the sequence number of the last message sent from this
// port to allow the peer to have the opportunity to consume all inbound
// messages before notifying the embedder that this port is closed.
last_sequence_num = port->next_sequence_num_to_send - 1;
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
// If the port being closed still has unread messages, then we need to
// take care to close those ports so as to avoid leaking memory.
port->message_queue.TakeAllMessages(&undelivered_messages);
break;
default:
return ERROR_PORT_STATE_UNEXPECTED;
}
}
// ErasePort() asserts that no port locks are held, hence the scope above.
ErasePort(port_ref.name());
if (was_initialized) {
DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
<< name_ << " to " << peer_port_name << "@" << peer_node_name;
delegate_->ForwardEvent(peer_node_name,
std::make_unique<ObserveClosureEvent>(
peer_port_name, last_sequence_num));
// Close every port that was in transit inside an undelivered message.
// Attached ports that can no longer be found are skipped.
for (const auto& message : undelivered_messages) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef ref;
if (GetPort(message->ports()[i], &ref) == OK)
ClosePort(ref);
}
}
}
return OK;
}
// Fills |*port_status| with a snapshot of the port's state, taken under the
// port lock: whether a message is ready, whether further messages can still be
// accepted, whether the peer has closed, whether the peer lives on another
// node, and the queue's message and byte counts.
//
// Returns ERROR_PORT_STATE_UNEXPECTED, leaving |*port_status| untouched,
// unless the port is in the receiving state.
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state == Port::kReceiving) {
    auto& queue = port->message_queue;
    port_status->has_messages = queue.HasNextMessage();
    port_status->receiving_messages = CanAcceptMoreMessages(port);
    port_status->peer_closed = port->peer_closed;
    port_status->peer_remote = port->peer_node_name != name_;
    port_status->queued_message_count = queue.queued_message_count();
    port_status->queued_num_bytes = queue.queued_num_bytes();
    return OK;
  }
  return ERROR_PORT_STATE_UNEXPECTED;
}
// Retrieves the next message queued on the port, if any.
//
// |*message| is reset to null first. |filter| is passed through to the
// message queue's GetNextMessage(). On OK, |*message| holds the dequeued
// message, or stays null if the queue produced none.
//
// Returns ERROR_PORT_STATE_UNEXPECTED if the port is not receiving, and
// ERROR_PORT_PEER_CLOSED once the port can no longer accept messages (the
// embedder is allowed to drain all messages before closure is reported).
//
// For a dequeued message, every attached port is made signalable so that it
// can start triggering PortStatusChanged calls, and the message's sequence
// number is reset to 0.
int Node::GetMessage(const PortRef& port_ref,
                     std::unique_ptr<UserMessageEvent>* message,
                     MessageFilter* filter) {
  *message = nullptr;
  DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port))
      return ERROR_PORT_PEER_CLOSED;

    port->message_queue.GetNextMessage(message, filter);
  }

  // Allow referenced ports to trigger PortStatusChanged calls. This happens
  // after the receiving port's lock is released because GetPort() requires
  // that no port locks are held.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      PortRef new_port_ref;
      int rv = GetPort((*message)->ports()[i], &new_port_ref);

      // GetPort() leaves |new_port_ref| default-constructed on failure, so
      // report the name that was looked up rather than new_port_ref.name().
      DCHECK_EQ(OK, rv) << "Port " << (*message)->ports()[i] << "@" << name_
                        << " does not exist!";

      SinglePortLocker locker(&new_port_ref);
      DCHECK(locker.port()->state == Port::kReceiving);
      locker.port()->message_queue.set_signalable(true);
    }

    // The user may retransmit this message from another port. We reset the
    // sequence number so that the message will get a new one if that happens.
    (*message)->set_sequence_num(0);
  }
  return OK;
}
int Node::SendUserMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent> message) {
int rv = SendUserMessageInternal(port_ref, &message);
if (rv != OK) {
// If send failed, close all carried ports. Note that we're careful not to
// close the sending port itself if it happened to be one of the encoded
// ports (an invalid but possible condition.)
for (size_t i = 0; i < message->num_ports(); ++i) {
if (message->ports()[i] == port_ref.name())
continue;
PortRef port;
if (GetPort(message->ports()[i], &port) == OK)
ClosePort(port);
}
}
return rv;
}
// Dispatches |event| to the handler matching its type, downcasting it with
// Event::Cast, and returns that handler's result. If the type matches none of
// the cases below, the error is flagged through OOPS() and
// ERROR_NOT_IMPLEMENTED is returned.
int Node::AcceptEvent(ScopedEvent event) {
switch (event->type()) {
case Event::Type::kUserMessage:
return OnUserMessage(Event::Cast<UserMessageEvent>(&event));
case Event::Type::kPortAccepted:
return OnPortAccepted(Event::Cast<PortAcceptedEvent>(&event));
case Event::Type::kObserveProxy:
return OnObserveProxy(Event::Cast<ObserveProxyEvent>(&event));
case Event::Type::kObserveProxyAck:
return OnObserveProxyAck(Event::Cast<ObserveProxyAckEvent>(&event));
case Event::Type::kObserveClosure:
return OnObserveClosure(Event::Cast<ObserveClosureEvent>(&event));
case Event::Type::kMergePort:
return OnMergePort(Event::Cast<MergePortEvent>(&event));
}
return OOPS(ERROR_NOT_IMPLEMENTED);
}
int Node::MergePorts(const PortRef& port_ref,
const NodeName& destination_node_name,
const PortName& destination_port_name) {
PortName new_port_name;
Event::PortDescriptor new_port_descriptor;
{
SinglePortLocker locker(&port_ref);
DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
<< " to " << destination_port_name << "@" << destination_node_name;
// Send the port-to-merge over to the destination node so it can be merged
// into the port cycle atomically there.
new_port_name = port_ref.name();
ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
&new_port_descriptor);
}
if (new_port_descriptor.peer_node_name == name_ &&
destination_node_name != name_) {
// Ensure that the locally retained peer of the new proxy gets a status
// update so it notices that its peer is now remote.
PortRef local_peer;
if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK)
delegate_->PortStatusChanged(local_peer);
}
delegate_->ForwardEvent(
destination_node_name,
std::make_unique<MergePortEvent>(destination_port_name, new_port_name,
new_port_descriptor));
return OK;
}
// Merges two ports that both live on this node by delegating to
// MergePortsInternal() with allow_close_on_bad_state set, and returns its
// result.
int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
<< " and " << port1_ref.name() << "@" << name_;
return MergePortsInternal(port0_ref, port1_ref,
true /* allow_close_on_bad_state */);
}
// Handles the loss of the connection to |node_name| by calling
// DestroyAllPortsWithPeer() for that node with an invalid port name.
// NOTE(review): the invalid port name presumably acts as a wildcard matching
// every port on that node -- confirm in DestroyAllPortsWithPeer().
// Always returns OK.
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
<< node_name;
DestroyAllPortsWithPeer(node_name, kInvalidPortName);
return OK;
}
// Handles a user message addressed to one of this node's ports.
//
// First binds every port carried by the message to this node. Then, if the
// destination port exists and can still accept messages, the message is
// queued on it; a proxying destination additionally forwards its queue and
// may be removed. If the message is not accepted, its carried ports are
// closed. The delegate is notified only when a message becomes available on
// a port that is neither buffering nor proxying.
//
// Returns an error if a carried port cannot be found or accepted, or if
// forwarding from a proxy fails; otherwise returns OK, including when the
// message itself was rejected.
int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
PortName port_name = message->port_name();
#if DCHECK_IS_ON()
// Build a comma-separated list of the carried port names for the log line.
std::ostringstream ports_buf;
for (size_t i = 0; i < message->num_ports(); ++i) {
if (i > 0)
ports_buf << ",";
ports_buf << message->ports()[i];
}
DVLOG(4) << "OnUserMessage " << message->sequence_num()
<< " [ports=" << ports_buf.str() << "] at " << port_name << "@"
<< name_;
#endif
// Even if this port does not exist, cannot receive anymore messages or is
// buffering or proxying messages, we still need these ports to be bound to
// this node. When the message is forwarded, these ports will get transferred
// following the usual method. If the message cannot be accepted, then the
// newly bound ports will simply be closed.
for (size_t i = 0; i < message->num_ports(); ++i) {
Event::PortDescriptor& descriptor = message->port_descriptors()[i];
if (descriptor.referring_node_name == kInvalidNodeName) {
// If the referring node name is invalid, this descriptor can be ignored
// and the port should already exist locally.
PortRef port_ref;
if (GetPort(message->ports()[i], &port_ref) != OK)
return ERROR_PORT_UNKNOWN;
} else {
int rv = AcceptPort(message->ports()[i], descriptor);
if (rv != OK)
return rv;
// Ensure that the referring node is wiped out of this descriptor. This
// allows the event to be forwarded across multiple local hops without
// attempting to accept the port more than once.
descriptor.referring_node_name = kInvalidNodeName;
}
}
// The lookup result is deliberately ignored: a missing destination port
// leaves |port_ref| invalid, which is handled below as "not accepted".
PortRef port_ref;
GetPort(port_name, &port_ref);
bool has_next_message = false;
bool message_accepted = false;
bool should_forward_messages = false;
if (port_ref.is_valid()) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
// Reject spurious messages if we've already received the last expected
// message.
if (CanAcceptMoreMessages(port)) {
message_accepted = true;
port->message_queue.AcceptMessage(std::move(message), &has_next_message);
// Buffering and proxying ports never surface messages to the embedder,
// so suppress the status notification for them.
if (port->state == Port::kBuffering) {
has_next_message = false;
} else if (port->state == Port::kProxying) {
has_next_message = false;
should_forward_messages = true;
}
}
}
// Forwarding happens only after the port lock above has been released.
if (should_forward_messages) {
int rv = ForwardUserMessagesFromProxy(port_ref);
if (rv != OK)
return rv;
TryRemoveProxy(port_ref);
}
if (!message_accepted) {
// |message| was not moved into a queue, so it is still owned here.
DVLOG(2) << "Message not accepted!\n";
// Close all newly accepted ports as they are effectively orphaned.
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef attached_port_ref;
if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
ClosePort(attached_port_ref);
} else {
DLOG(WARNING) << "Cannot close non-existent port!\n";
}
}
} else if (has_next_message) {
delegate_->PortStatusChanged(port_ref);
}
return OK;
}
int Node::OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event) {
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK)
return ERROR_PORT_UNKNOWN;
#if DCHECK_IS_ON()
{
SinglePortLocker locker(&port_ref);
DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
<< " pointing to " << locker.port()->peer_port_name << "@"
<< locker.port()->peer_node_name;
}
#endif
return BeginProxying(port_ref);
}
// Handles an ObserveProxy event, which announces that the port
// proxy_port_name@proxy_node_name is a proxy pointing at
// proxy_target_port_name@proxy_target_node_name.
//
// - A broadcast (invalid target port name) destroys all local ports whose
//   peer is the named proxy.
// - If the addressed local port's peer is that proxy and the port is
//   receiving, the port is re-pointed at the proxy's target and an
//   ObserveProxyAck carrying this port's last sent sequence number is sent
//   to the proxy.
// - If the addressed port's peer is that proxy but the port is not
//   receiving, an ObserveProxyAck requesting a re-send is stored on the port
//   to be sent once the port is itself removed as a proxy.
// - Otherwise the event is forwarded on to the addressed port's peer.
//
// Always returns OK, including when the addressed port is unknown.
int Node::OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event) {
if (event->port_name() == kInvalidPortName) {
// An ObserveProxy with an invalid target port name is a broadcast used to
// inform ports when their peer (which was itself a proxy) has become
// defunct due to unexpected node disconnection.
//
// Receiving ports affected by this treat it as equivalent to peer closure.
// Proxies affected by this can be removed and will in turn broadcast their
// own death with a similar message.
DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
return OK;
}
// The port may have already been closed locally, in which case the
// ObserveClosure message will contain the last_sequence_num field.
// We can then silently ignore this message.
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK) {
DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
<< " not found";
return OK;
}
DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
<< ", proxy at " << event->proxy_port_name() << "@"
<< event->proxy_node_name() << " pointing to "
<< event->proxy_target_port_name() << "@"
<< event->proxy_target_node_name();
// The decisions are made under the port lock; the delegate calls they imply
// are issued after the lock is released.
bool update_status = false;
ScopedEvent event_to_forward;
NodeName event_target_node;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->peer_node_name == event->proxy_node_name() &&
port->peer_port_name == event->proxy_port_name()) {
if (port->state == Port::kReceiving) {
// Bypass the proxy by pointing directly at its target, and tell the proxy
// the sequence number of the last message this port sent.
port->peer_node_name = event->proxy_target_node_name();
port->peer_port_name = event->proxy_target_port_name();
event_target_node = event->proxy_node_name();
event_to_forward = std::make_unique<ObserveProxyAckEvent>(
event->proxy_port_name(), port->next_sequence_num_to_send - 1);
update_status = true;
DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
<< "@" << name_ << " to " << event->proxy_port_name() << "@"
<< event_target_node;
} else {
// As a proxy ourselves, we don't know how to honor the ObserveProxy
// event or to populate the last_sequence_num field of ObserveProxyAck.
// After all, another port could be sending messages to our peer now
// that we've sent out our own ObserveProxy event. Instead, we will
// send an ObserveProxyAck indicating that the ObserveProxy event
// should be re-sent (last_sequence_num set to kInvalidSequenceNum).
// However, this has to be done after we are removed as a proxy.
// Otherwise, we might just find ourselves back here again, which
// would be akin to a busy loop.
DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
<< "@" << event->proxy_node_name();
port->send_on_proxy_removal.reset(new std::pair<NodeName, ScopedEvent>(
event->proxy_node_name(),
std::make_unique<ObserveProxyAckEvent>(event->proxy_port_name(),
kInvalidSequenceNum)));
}
} else {
// Forward this event along to our peer. Eventually, it should find the
// port referring to the proxy.
event_target_node = port->peer_node_name;
event->set_port_name(port->peer_port_name);
event_to_forward = std::move(event);
}
}
// Nothing is forwarded in the delayed-ack case above.
if (event_to_forward)
delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
if (update_status)
delegate_->PortStatusChanged(port_ref);
return OK;
}
int Node::OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event) {
DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK)
return ERROR_PORT_UNKNOWN; // The port may have observed closure first.
bool try_remove_proxy_immediately;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kProxying)
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
// If the last sequence number is invalid, this is a signal that we need to
// retransmit the ObserveProxy event for this port rather than flagging the
// the proxy for removal ASAP.
try_remove_proxy_immediately =
event->last_sequence_num() != kInvalidSequenceNum;
if (try_remove_proxy_immediately) {
// We can now remove this port once we have received and forwarded the
// last message addressed to this port.
port->remove_proxy_on_last_message = true;
port->last_sequence_num_to_receive = event->last_sequence_num();
}
}
if (try_remove_proxy_immediately)
TryRemoveProxy(port_ref);
else
InitiateProxyRemoval(port_ref);
return OK;
}
// Handles an ObserveClosure event announcing that a port has closed and that
// no messages beyond event->last_sequence_num() should be expected.
//
// The addressed port records the closure and the last expected sequence
// number. A receiving port causes a delegate notification and rewrites the
// event to carry its own last sent sequence number; any other port is
// flagged for removal after its last message (with removal attempted if it
// is proxying). The event is then always forwarded on to the port's peer.
//
// Always returns OK, including when the addressed port no longer exists.
int Node::OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event) {
// OK if the port doesn't exist, as it may have been closed already.
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK)
return OK;
// This message tells the port that it should no longer expect more messages
// beyond last_sequence_num. This message is forwarded along until we reach
// the receiving end, and this message serves as an equivalent to
// ObserveProxyAck.
bool notify_delegate = false;
NodeName peer_node_name;
PortName peer_port_name;
bool try_remove_proxy = false;
{
// Peer information is copied out under the lock so that forwarding and
// notification can happen after it is released.
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
port->peer_closed = true;
port->last_sequence_num_to_receive = event->last_sequence_num();
DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
<< " (state=" << port->state << ") pointing to "
<< port->peer_port_name << "@" << port->peer_node_name
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
// We always forward ObserveClosure, even beyond the receiving port which
// cares about it. This ensures that any dead-end proxies beyond that port
// are notified to remove themselves.
if (port->state == Port::kReceiving) {
notify_delegate = true;
// When forwarding along the other half of the port cycle, this will only
// reach dead-end proxies. Tell them we've sent our last message so they
// can go away.
//
// TODO: Repurposing ObserveClosure for this has the desired result but
// may be semantically confusing since the forwarding port is not actually
// closed. Consider replacing this with a new event type.
event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
} else {
// We haven't yet reached the receiving peer of the closed port, so we'll
// forward the message along as-is.
// See about removing the port if it is a proxy as our peer won't be able
// to participate in proxy removal.
port->remove_proxy_on_last_message = true;
if (port->state == Port::kProxying)
try_remove_proxy = true;
}
DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
<< name_ << " to peer " << port->peer_port_name << "@"
<< port->peer_node_name
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
}
if (try_remove_proxy)
TryRemoveProxy(port_ref);
// Re-address the event to the peer recorded above and pass it along.
event->set_port_name(peer_port_name);
delegate_->ForwardEvent(peer_node_name, std::move(event));
if (notify_delegate)
delegate_->PortStatusChanged(port_ref);
return OK;
}
// Handles a request to merge the local port named by event->port_name() with
// a new port described by the event.
//
// The new port is always accepted first; otherwise its peer receiving port
// could be left stranded indefinitely. If acceptance fails, the local port
// (when it exists) is closed and ERROR_PORT_STATE_UNEXPECTED is returned. If
// either the local port or the newly accepted port cannot be found
// afterwards, whichever of the two does exist is closed and
// ERROR_PORT_UNKNOWN is returned. Otherwise returns the result of
// MergePortsInternal().
int Node::OnMergePort(std::unique_ptr<MergePortEvent> event) {
  // The lookup result is deliberately ignored here; validity of |port_ref| is
  // checked only after the new port has been accepted.
  PortRef port_ref;
  GetPort(event->port_name(), &port_ref);

  // The requested name is logged straight from the event, since |port_ref|
  // stays default-constructed when the lookup above fails.
  DVLOG(1) << "MergePort at " << event->port_name() << "@" << name_
           << " merging with proxy " << event->new_port_name() << "@" << name_
           << " pointing to " << event->new_port_descriptor().peer_port_name
           << "@" << event->new_port_descriptor().peer_node_name
           << " referred by "
           << event->new_port_descriptor().referring_port_name << "@"
           << event->new_port_descriptor().referring_node_name;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours. Note that we always attempt to accept the new port
  // first as otherwise its peer receiving port could be left stranded
  // indefinitely.
  if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
    if (port_ref.is_valid())
      ClosePort(port_ref);
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  PortRef new_port_ref;
  GetPort(event->new_port_name(), &new_port_ref);

  // Both ports are required for the merge. This also covers the case where
  // neither can be found, which must never reach MergePortsInternal() because
  // it would try to lock two invalid port references.
  if (!port_ref.is_valid() || !new_port_ref.is_valid()) {
    if (port_ref.is_valid())
      ClosePort(port_ref);
    if (new_port_ref.is_valid())
      ClosePort(new_port_ref);
    return ERROR_PORT_UNKNOWN;
  }

  return MergePortsInternal(port_ref, new_port_ref,
                            false /* allow_close_on_bad_state */);
}
// Registers |port| with this node under |port_name|.
//
// Returns OK on success. If a port is already registered under that name, the
// error is flagged via OOPS and ERROR_PORT_EXISTS is returned. Must be called
// with no port locks held on the current thread.
int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  base::AutoLock lock(ports_lock_);

  const bool inserted = ports_.emplace(port_name, std::move(port)).second;
  if (inserted) {
    DVLOG(2) << "Created port " << port_name << "@" << name_;
    return OK;
  }
  return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
}
void Node::ErasePort(const PortName& port_name) {
PortLocker::AssertNoPortsLockedOnCurrentThread();
scoped_refptr<Port> port;
{
base::AutoLock lock(ports_lock_);
auto it = ports_.find(port_name);
if (it == ports_.end())
return;
port = std::move(it->second);
ports_.erase(it);
}
// NOTE: We are careful not to release the port's messages while holding any
// locks, since they may run arbitrary user code upon destruction.
std::vector<std::unique_ptr<UserMessageEvent>> messages;
{
PortRef port_ref(port_name, std::move(port));
SinglePortLocker locker(&port_ref);
locker.port()->message_queue.TakeAllMessages(&messages);
}
DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}
int Node::SendUserMessageInternal(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message) {
std::unique_ptr<UserMessageEvent>& m = *message;
for (size_t i = 0; i < m->num_ports(); ++i) {
if (m->ports()[i] == port_ref.name())
return ERROR_PORT_CANNOT_SEND_SELF;
}
NodeName target_node;
int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
false /* ignore_closed_peer */, m.get(),
&target_node);
if (rv != OK)
return rv;
// Beyond this point there's no sense in returning anything but OK. Even if
// message forwarding or acceptance fails, there's nothing the embedder can
// do to recover. Assume that failure beyond this point must be treated as a
// transport failure.
DCHECK_NE(kInvalidNodeName, target_node);
if (target_node != name_) {
delegate_->ForwardEvent(target_node, std::move(m));
return OK;
}
int accept_result = AcceptEvent(std::move(m));
if (accept_result != OK) {
// See comment above for why we don't return an error in this case.
DVLOG(2) << "AcceptEvent failed: " << accept_result;
}
return OK;
}
// Merges the port cycles of |port0_ref| and |port1_ref| by swapping the two
// ports' peers and turning both ports into proxies.
//
// If the preconditions listed below are not met, ports are closed according
// to |allow_close_on_bad_state| and ERROR_PORT_STATE_UNEXPECTED is returned.
// If forwarding the queued messages of the new proxies fails, the peer swap
// is undone, both ports are closed and ERROR_PORT_STATE_UNEXPECTED is
// returned. Otherwise proxy removal is started for both ports and OK is
// returned.
int Node::MergePortsInternal(const PortRef& port0_ref,
const PortRef& port1_ref,
bool allow_close_on_bad_state) {
const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
{
// The locker lives in an Optional so that both port locks can be released
// explicitly (via reset()) before ClosePort() is called on the failure path.
base::Optional<PortLocker> locker(base::in_place, port_refs, 2);
auto* port0 = locker->GetPort(port0_ref);
auto* port1 = locker->GetPort(port1_ref);
// There are several conditions which must be met before we'll consider
// merging two ports:
//
// - They must both be in the kReceiving state
// - They must not be each other's peer
// - They must have never sent a user message
//
// If any of these criteria are not met, we fail early.
if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
(port0->peer_node_name == name_ &&
port0->peer_port_name == port1_ref.name()) ||
(port1->peer_node_name == name_ &&
port1->peer_port_name == port0_ref.name()) ||
port0->next_sequence_num_to_send != kInitialSequenceNum ||
port1->next_sequence_num_to_send != kInitialSequenceNum) {
// On failure, we only close a port if it was at least properly in the
// |kReceiving| state. This avoids getting the system in an inconsistent
// state by e.g. closing a proxy abruptly.
//
// Note that we must release the port locks before closing ports.
const bool close_port0 =
port0->state == Port::kReceiving || allow_close_on_bad_state;
const bool close_port1 =
port1->state == Port::kReceiving || allow_close_on_bad_state;
locker.reset();
if (close_port0)
ClosePort(port0_ref);
if (close_port1)
ClosePort(port1_ref);
return ERROR_PORT_STATE_UNEXPECTED;
}
// Swap the ports' peer information and switch them both to proxying mode.
std::swap(port0->peer_node_name, port1->peer_node_name);
std::swap(port0->peer_port_name, port1->peer_port_name);
port0->state = Port::kProxying;
port1->state = Port::kProxying;
// A proxy whose peer is already known to be closed can be removed as soon as
// it has handled its last expected message.
if (port0->peer_closed)
port0->remove_proxy_on_last_message = true;
if (port1->peer_closed)
port1->remove_proxy_on_last_message = true;
}
// Flush any queued messages from the new proxies and, if successful, complete
// the merge by initiating proxy removals.
if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
ForwardUserMessagesFromProxy(port1_ref) == OK) {
for (size_t i = 0; i < 2; ++i) {
bool try_remove_proxy_immediately = false;
ScopedEvent closure_event;
NodeName closure_event_target_node;
{
// Decide under the port lock; act on the decision after releasing it.
SinglePortLocker locker(port_refs[i]);
auto* port = locker.port();
DCHECK(port->state == Port::kProxying);
try_remove_proxy_immediately = port->remove_proxy_on_last_message;
if (try_remove_proxy_immediately || port->peer_closed) {
// If either end of the port cycle is closed, we propagate an
// ObserveClosure event.
closure_event_target_node = port->peer_node_name;
closure_event = std::make_unique<ObserveClosureEvent>(
port->peer_port_name, port->last_sequence_num_to_receive);
}
}
if (try_remove_proxy_immediately)
TryRemoveProxy(*port_refs[i]);
else
InitiateProxyRemoval(*port_refs[i]);
if (closure_event) {
delegate_->ForwardEvent(closure_event_target_node,
std::move(closure_event));
}
}
return OK;
}
// If we failed to forward proxied messages, we keep the system in a
// consistent state by undoing the peer swap and closing the ports.
{
PortLocker locker(port_refs, 2);
auto* port0 = locker.GetPort(port0_ref);
auto* port1 = locker.GetPort(port1_ref);
std::swap(port0->peer_node_name, port1->peer_node_name);
std::swap(port0->peer_port_name, port1->peer_port_name);
port0->remove_proxy_on_last_message = false;
port1->remove_proxy_on_last_message = false;
DCHECK_EQ(Port::kProxying, port0->state);
DCHECK_EQ(Port::kProxying, port1->state);
// Restore the receiving state so that ClosePort() below accepts the ports.
port0->state = Port::kReceiving;
port1->state = Port::kReceiving;
}
ClosePort(port0_ref);
ClosePort(port1_ref);
return ERROR_PORT_STATE_UNEXPECTED;
}
// Prepares the receiving |port| to be transferred to |to_node_name|.
//
// On entry |*port_name| holds the local name of |port|; on return it holds a
// newly generated name for the port that will replace it at the destination.
// |*port_descriptor| is filled with everything needed to recreate the port
// there: its current peer, this node and the local port name as the referrer,
// its sequence-number state and whether the peer is closed. The local port is
// switched to buffering and re-pointed at the new port. The caller must hold
// the lock on |port|.
void Node::ConvertToProxy(Port* port,
                          const NodeName& to_node_name,
                          PortName* port_name,
                          Event::PortDescriptor* port_descriptor) {
  port->AssertLockAcquired();

  PortName referring_port_name = *port_name;
  PortName transferred_port_name;
  GenerateRandomPortName(&transferred_port_name);

  // Messages must not be sent to the new peer until it is known to exist, so
  // they are buffered locally in the meantime.
  DCHECK(port->state == Port::kReceiving);
  port->state = Port::kBuffering;

  // A closed peer means this proxy can go away as soon as it has received and
  // forwarded its last expected message.
  if (port->peer_closed)
    port->remove_proxy_on_last_message = true;

  *port_name = transferred_port_name;

  // Capture the port's current peer and sequence state before the peer fields
  // are overwritten below.
  Event::PortDescriptor& descriptor = *port_descriptor;
  descriptor.peer_node_name = port->peer_node_name;
  descriptor.peer_port_name = port->peer_port_name;
  descriptor.referring_node_name = name_;
  descriptor.referring_port_name = referring_port_name;
  descriptor.next_sequence_num_to_send = port->next_sequence_num_to_send;
  descriptor.next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  descriptor.last_sequence_num_to_receive = port->last_sequence_num_to_receive;
  descriptor.peer_closed = port->peer_closed;
  memset(descriptor.padding, 0, sizeof(descriptor.padding));

  // The local port now points at the port being created at the destination.
  port->peer_node_name = to_node_name;
  port->peer_port_name = transferred_port_name;
}
// Creates a receiving port named |port_name| on this node from the state in
// |port_descriptor|, registers it, and then sends a PortAccepted event to the
// referring node so that the referring port may start forwarding messages.
//
// Returns the error from AddPortWithName() if registration fails (no event is
// sent in that case), otherwise OK.
int Node::AcceptPort(const PortName& port_name,
                     const Event::PortDescriptor& port_descriptor) {
  scoped_refptr<Port> accepted_port =
      base::MakeRefCounted<Port>(port_descriptor.next_sequence_num_to_send,
                                 port_descriptor.next_sequence_num_to_receive);
  accepted_port->state = Port::kReceiving;
  accepted_port->peer_node_name = port_descriptor.peer_node_name;
  accepted_port->peer_port_name = port_descriptor.peer_port_name;
  accepted_port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  accepted_port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name
           << " [peer_closed=" << accepted_port->peer_closed
           << "; last_sequence_num_to_receive="
           << accepted_port->last_sequence_num_to_receive << "]";

  // The port stays non-signalable until the message that references it has
  // reached its consumer (see GetMessage).
  accepted_port->message_queue.set_signalable(false);

  const int add_result = AddPortWithName(port_name, std::move(accepted_port));
  if (add_result != OK)
    return add_result;

  // Let the referring port know it may begin forwarding messages.
  delegate_->ForwardEvent(
      port_descriptor.referring_node_name,
      std::make_unique<PortAcceptedEvent>(port_descriptor.referring_port_name));
  return OK;
}
int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
Port::State expected_port_state,
bool ignore_closed_peer,
UserMessageEvent* message,
NodeName* forward_to_node) {
bool target_is_remote = false;
for (;;) {
NodeName target_node_name;
{
SinglePortLocker locker(&forwarding_port_ref);
target_node_name = locker.port()->peer_node_name;
}
// NOTE: This may call out to arbitrary user code, so it's important to call
// it only while no port locks are held on the calling thread.
if (target_node_name != name_) {
if (!message->NotifyWillBeRoutedExternally()) {
LOG(ERROR) << "NotifyWillBeRoutedExternally failed unexpectedly.";
return ERROR_PORT_STATE_UNEXPECTED;
}
}
// Simultaneously lock the forwarding port as well as all attached ports.
base::StackVector<PortRef, 4> attached_port_refs;
base::StackVector<const PortRef*, 5> ports_to_lock;
attached_port_refs.container().resize(message->num_ports());
ports_to_lock.container().resize(message->num_ports() + 1);
ports_to_lock[0] = &forwarding_port_ref;
for (size_t i = 0; i < message->num_ports(); ++i) {
GetPort(message->ports()[i], &attached_port_refs[i]);
DCHECK(attached_port_refs[i].is_valid());
ports_to_lock[i + 1] = &attached_port_refs[i];
}
PortLocker locker(ports_to_lock.container().data(),
ports_to_lock.container().size());
auto* forwarding_port = locker.GetPort(forwarding_port_ref);
if (forwarding_port->peer_node_name != target_node_name) {
// The target node has already changed since we last held the lock.
if (target_node_name == name_) {
// If the target node was previously this local node, we need to restart
// the loop, since that means we may now route the message externally.
continue;
}
target_node_name = forwarding_port->peer_node_name;
}
target_is_remote = target_node_name != name_;
if (forwarding_port->state != expected_port_state)
return ERROR_PORT_STATE_UNEXPECTED;
if (forwarding_port->peer_closed && !ignore_closed_peer)
return ERROR_PORT_PEER_CLOSED;
// Messages may already have a sequence number if they're being forwarded by
// a proxy. Otherwise, use the next outgoing sequence number.
if (message->sequence_num() == 0)
message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
#if DCHECK_IS_ON()
std::ostringstream ports_buf;
for (size_t i = 0; i < message->num_ports(); ++i) {
if (i > 0)
ports_buf << ",";
ports_buf << message->ports()[i];
}
#endif
if (message->num_ports() > 0) {
// Sanity check to make sure we can actually send all the attached ports.
// They must all be in the |kReceiving| state and must not be the sender's
// own peer.
DCHECK_EQ(message->num_ports(), attached_port_refs.container().size());
for (size_t i = 0; i < message->num_ports(); ++i) {
auto* attached_port = locker.GetPort(attached_port_refs[i]);
int error = OK;
if (attached_port->state != Port::kReceiving) {
error = ERROR_PORT_STATE_UNEXPECTED;
} else if (attached_port_refs[i].name() ==
forwarding_port->peer_port_name) {
error = ERROR_PORT_CANNOT_SEND_PEER;
}
if (error != OK) {
// Not going to send. Backpedal on the sequence number.
forwarding_port->next_sequence_num_to_send--;
return error;
}
}
if (target_is_remote) {
// We only bother to proxy and rewrite ports in the event if it's
// going to be routed to an external node. This substantially reduces
// the amount of port churn in the system, as many port-carrying
// events are routed at least 1 or 2 intra-node hops before (if ever)
// being routed externally.
Event::PortDescriptor* port_descriptors = message->port_descriptors();
for (size_t i = 0; i < message->num_ports(); ++i) {
ConvertToProxy(locker.GetPort(attached_port_refs[i]),
target_node_name, message->ports() + i,
port_descriptors + i);
}
}
}
#if DCHECK_IS_ON()
DVLOG(4) << "Sending message " << message->sequence_num()
<< " [ports=" << ports_buf.str() << "]"
<< " from " << forwarding_port_ref.name() << "@" << name_ << " to "
<< forwarding_port->peer_port_name << "@" << target_node_name;
#endif
*forward_to_node = target_node_name;
message->set_port_name(forwarding_port->peer_port_name);
break;
}
if (target_is_remote) {
for (size_t i = 0; i < message->num_ports(); ++i) {
// For any ports that were converted to proxies above, make sure their
// prior local peer (if applicable) receives a status update so it can be
// made aware of its peer's location.
const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
if (descriptor.peer_node_name == name_) {
PortRef local_peer;
if (GetPort(descriptor.peer_port_name, &local_peer) == OK)
delegate_->PortStatusChanged(local_peer);
}
}
}
return OK;
}
// Switches the port referenced by |port_ref| from |kBuffering| to |kProxying|,
// forwards every message it has queued, and then either removes the proxy
// right away (propagating closure to its peer) or starts proxy removal.
// Returns OK on success, or an error if the port is not in the expected state
// or if forwarding the queued messages fails.
int Node::BeginProxying(const PortRef& port_ref) {
  {
    SinglePortLocker locker(&port_ref);
    auto* proxy = locker.port();
    if (proxy->state != Port::kBuffering)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    proxy->state = Port::kProxying;
  }

  const int forward_result = ForwardUserMessagesFromProxy(port_ref);
  if (forward_result != OK)
    return forward_result;

  bool remove_now = false;
  ScopedEvent closure_event;
  NodeName closure_target_node;
  {
    SinglePortLocker locker(&port_ref);
    auto* proxy = locker.port();
    if (proxy->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    remove_now = proxy->remove_proxy_on_last_message;
    if (remove_now) {
      // Closure has to be propagated to whoever is currently our peer.
      closure_target_node = proxy->peer_node_name;
      closure_event = std::make_unique<ObserveClosureEvent>(
          proxy->peer_port_name, proxy->last_sequence_num_to_receive);
    }
  }

  if (!remove_now) {
    InitiateProxyRemoval(port_ref);
    return OK;
  }

  TryRemoveProxy(port_ref);
  delegate_->ForwardEvent(closure_target_node, std::move(closure_event));
  return OK;
}
int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
for (;;) {
// NOTE: We forward messages in sequential order here so that we maintain
// the message queue's notion of next sequence number. That's useful for the
// proxy removal process as we can tell when this port has seen all of the
// messages it is expected to see.
std::unique_ptr<UserMessageEvent> message;
{
SinglePortLocker locker(&port_ref);
locker.port()->message_queue.GetNextMessage(&message, nullptr);
if (!message)
break;
}
NodeName target_node;
int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
true /* ignore_closed_peer */,
message.get(), &target_node);
if (rv != OK)
return rv;
delegate_->ForwardEvent(target_node, std::move(message));
}
return OK;
}
void Node::InitiateProxyRemoval(const PortRef& port_ref) {
NodeName peer_node_name;
PortName peer_port_name;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
}
// To remove this node, we start by notifying the connected graph that we are
// a proxy. This allows whatever port is referencing this node to skip it.
// Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
// the peer was closed in the meantime).
delegate_->ForwardEvent(peer_node_name,
std::make_unique<ObserveProxyEvent>(
peer_port_name, name_, port_ref.name(),
peer_node_name, peer_port_name));
}
// Erases the proxy port referenced by |port_ref| if it has been flagged for
// removal on its last message and can no longer accept messages. If the port
// carries a pending |send_on_proxy_removal| event, that event is forwarded
// after the port has been erased. Otherwise this is a no-op.
void Node::TryRemoveProxy(const PortRef& port_ref) {
  NodeName removal_target_node;
  ScopedEvent removal_event;
  {
    SinglePortLocker locker(&port_ref);
    auto* proxy = locker.port();
    DCHECK(proxy->state == Port::kProxying);

    // The port may only go away once ObserveProxyAck has been seen.
    if (!proxy->remove_proxy_on_last_message)
      return;

    if (CanAcceptMoreMessages(proxy)) {
      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
               << " now; waiting for more messages";
      return;
    }

    // Pull out the deferred event (if any) while the port is still locked.
    if (proxy->send_on_proxy_removal) {
      removal_target_node = proxy->send_on_proxy_removal->first;
      removal_event = std::move(proxy->send_on_proxy_removal->second);
    }
  }

  ErasePort(port_ref.name());
  if (removal_event)
    delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
}
void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name) {
// Wipes out all ports whose peer node matches |node_name| and whose peer port
// matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
// node is matched.
std::vector<PortRef> ports_to_notify;
std::vector<PortName> dead_proxies_to_broadcast;
std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
{
PortLocker::AssertNoPortsLockedOnCurrentThread();
base::AutoLock ports_lock(ports_lock_);
for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
PortRef port_ref(iter->first, iter->second);
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->peer_node_name == node_name &&
(port_name == kInvalidPortName ||
port->peer_port_name == port_name)) {
if (!port->peer_closed) {
// Treat this as immediate peer closure. It's an exceptional
// condition akin to a broken pipe, so we don't care about losing
// messages.
port->peer_closed = true;
port->last_sequence_num_to_receive =
port->message_queue.next_sequence_num() - 1;
if (port->state == Port::kReceiving)
ports_to_notify.push_back(PortRef(iter->first, port));
}
// We don't expect to forward any further messages, and we don't
// expect to receive a Port{Accepted,Rejected} event. Because we're
// a proxy with no active peer, we cannot use the normal proxy removal
// procedure of forward-propagating an ObserveProxy. Instead we
// broadcast our own death so it can be back-propagated. This is
// inefficient but rare.
if (port->state != Port::kReceiving) {
dead_proxies_to_broadcast.push_back(iter->first);
std::vector<std::unique_ptr<UserMessageEvent>> messages;
iter->second->message_queue.TakeAllMessages(&messages);
for (auto& message : messages)
undelivered_messages.emplace_back(std::move(message));
}
}
}
}
}
for (const auto& proxy_name : dead_proxies_to_broadcast) {
ErasePort(proxy_name);
DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
}
// Wake up any receiving ports who have just observed simulated peer closure.
for (const auto& port : ports_to_notify)
delegate_->PortStatusChanged(port);
for (const auto& proxy_name : dead_proxies_to_broadcast) {
// Broadcast an event signifying that this proxy is no longer functioning.
delegate_->BroadcastEvent(std::make_unique<ObserveProxyEvent>(
kInvalidPortName, name_, proxy_name, kInvalidNodeName,
kInvalidPortName));
// Also process death locally since the port that points this closed one
// could be on the current node.
// Note: Although this is recursive, only a single port is involved which
// limits the expected branching to 1.
DestroyAllPortsWithPeer(name_, proxy_name);
}
// Close any ports referenced by undelivered messages.
for (const auto& message : undelivered_messages) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef ref;
if (GetPort(message->ports()[i], &ref) == OK)
ClosePort(ref);
}
}
}
Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
: node_(node), delegate_(delegate) {
DCHECK(node_);
}
// Nothing to release explicitly; the holder does not destroy what it points to.
Node::DelegateHolder::~DelegateHolder() = default;
#if DCHECK_IS_ON()
// Debug-only check, compiled in only when DCHECKs are enabled. Asserts that
// the calling thread holds no port locks, then acquires and immediately
// releases the owning node's |ports_lock_|.
// NOTE(review): presumably this exists to catch delegate calls made while
// |ports_lock_| or a port lock is held -- confirm against the call sites.
void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  base::AutoLock ports_lock(node_->ports_lock_);
}
#endif
} // namespace ports
} // namespace core
} // namespace mojo