// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
#define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
#include <memory>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "base/callback.h"
#include "base/containers/hash_tables.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/atomic_flag.h"
#include "mojo/edk/system/node_channel.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
namespace base {
class PortProvider;
}
namespace mojo {
namespace edk {
class Broker;
class Core;
class MachPortRelay;
class PortsMessage;
// The owner of ports::Node which facilitates core EDK implementation. All
// public interface methods are safe to call from any thread.
class NodeController : public ports::NodeDelegate,
public NodeChannel::Delegate {
public:
class PortObserver : public ports::UserData {
public:
virtual void OnPortStatusChanged() = 0;
protected:
~PortObserver() override {}
};
// |core| owns and out-lives us.
explicit NodeController(Core* core);
~NodeController() override;
const ports::NodeName& name() const { return name_; }
Core* core() const { return core_; }
ports::Node* node() const { return node_.get(); }
scoped_refptr<base::TaskRunner> io_task_runner() const {
return io_task_runner_;
}
#if defined(OS_MACOSX) && !defined(OS_IOS)
// Create the relay used to transfer mach ports between processes.
void CreateMachPortRelay(base::PortProvider* port_provider);
#endif
// Called exactly once, shortly after construction, and before any other
// methods are called on this object.
void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner);
// Connects this node to a child node. This node will initiate a handshake.
void ConnectToChild(base::ProcessHandle process_handle,
ConnectionParams connection_params,
const std::string& child_token,
const ProcessErrorCallback& process_error_callback);
// Closes all reserved ports which associated with the child process
// |child_token|.
void CloseChildPorts(const std::string& child_token);
// Close a connection to a peer associated with |peer_token|.
void ClosePeerConnection(const std::string& peer_token);
// Connects this node to a parent node. The parent node will initiate a
// handshake.
void ConnectToParent(ConnectionParams connection_params);
// Connects this node to a peer node. On success, |port| will be merged with
// the corresponding port in the peer node.
void ConnectToPeer(ConnectionParams connection_params,
const ports::PortRef& port,
const std::string& peer_token);
// Sets a port's observer. If |observer| is null the port's current observer
// is removed.
void SetPortObserver(const ports::PortRef& port,
scoped_refptr<PortObserver> observer);
// Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
// it ensures the port's observer has also been removed.
void ClosePort(const ports::PortRef& port);
// Sends a message on a port to its peer.
int SendMessage(const ports::PortRef& port_ref,
std::unique_ptr<PortsMessage> message);
// Reserves a local port |port| associated with |token|. A peer holding a copy
// of |token| can merge one of its own ports into this one.
void ReservePort(const std::string& token, const ports::PortRef& port,
const std::string& child_token);
// Merges a local port |port| into a port reserved by |token| in the parent.
void MergePortIntoParent(const std::string& token,
const ports::PortRef& port);
// Merges two local ports together.
int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
// Creates a new shared buffer for use in the current process.
scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
// Request that the Node be shut down cleanly. This may take an arbitrarily
// long time to complete, at which point |callback| will be called.
//
// Note that while it is safe to continue using the NodeController's public
// interface after requesting shutdown, you do so at your own risk and there
// is NO guarantee that new messages will be sent or ports will complete
// transfer.
void RequestShutdown(const base::Closure& callback);
// Notifies the NodeController that we received a bad message from the given
// node.
void NotifyBadMessageFrom(const ports::NodeName& source_node,
const std::string& error);
private:
friend Core;
using NodeMap = std::unordered_map<ports::NodeName,
scoped_refptr<NodeChannel>>;
using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
struct ReservedPort {
ports::PortRef port;
const std::string child_token;
};
struct PeerConnection {
PeerConnection();
PeerConnection(const PeerConnection& other);
PeerConnection(PeerConnection&& other);
PeerConnection(scoped_refptr<NodeChannel> channel,
const ports::PortRef& local_port,
const std::string& peer_token);
~PeerConnection();
PeerConnection& operator=(const PeerConnection& other);
PeerConnection& operator=(PeerConnection&& other);
scoped_refptr<NodeChannel> channel;
ports::PortRef local_port;
std::string peer_token;
};
void ConnectToChildOnIOThread(
base::ProcessHandle process_handle,
ConnectionParams connection_params,
ports::NodeName token,
const ProcessErrorCallback& process_error_callback);
void ConnectToParentOnIOThread(ConnectionParams connection_params);
void ConnectToPeerOnIOThread(ConnectionParams connection_params,
ports::NodeName token,
ports::PortRef port,
const std::string& peer_token);
void ClosePeerConnectionOnIOThread(const std::string& node_name);
scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
scoped_refptr<NodeChannel> GetParentChannel();
scoped_refptr<NodeChannel> GetBrokerChannel();
void AddPeer(const ports::NodeName& name,
scoped_refptr<NodeChannel> channel,
bool start_channel);
void DropPeer(const ports::NodeName& name, NodeChannel* channel);
void SendPeerMessage(const ports::NodeName& name,
ports::ScopedMessage message);
void AcceptIncomingMessages();
void ProcessIncomingMessages();
void DropAllPeers();
// ports::NodeDelegate:
void GenerateRandomPortName(ports::PortName* port_name) override;
void AllocMessage(size_t num_header_bytes,
ports::ScopedMessage* message) override;
void ForwardMessage(const ports::NodeName& node,
ports::ScopedMessage message) override;
void BroadcastMessage(ports::ScopedMessage message) override;
void PortStatusChanged(const ports::PortRef& port) override;
// NodeChannel::Delegate:
void OnAcceptChild(const ports::NodeName& from_node,
const ports::NodeName& parent_name,
const ports::NodeName& token) override;
void OnAcceptParent(const ports::NodeName& from_node,
const ports::NodeName& token,
const ports::NodeName& child_name) override;
void OnAddBrokerClient(const ports::NodeName& from_node,
const ports::NodeName& client_name,
base::ProcessHandle process_handle) override;
void OnBrokerClientAdded(const ports::NodeName& from_node,
const ports::NodeName& client_name,
ScopedPlatformHandle broker_channel) override;
void OnAcceptBrokerClient(const ports::NodeName& from_node,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) override;
void OnPortsMessage(const ports::NodeName& from_node,
Channel::MessagePtr message) override;
void OnRequestPortMerge(const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
const std::string& token) override;
void OnRequestIntroduction(const ports::NodeName& from_node,
const ports::NodeName& name) override;
void OnIntroduce(const ports::NodeName& from_node,
const ports::NodeName& name,
ScopedPlatformHandle channel_handle) override;
void OnBroadcast(const ports::NodeName& from_node,
Channel::MessagePtr message) override;
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
void OnRelayPortsMessage(const ports::NodeName& from_node,
base::ProcessHandle from_process,
const ports::NodeName& destination,
Channel::MessagePtr message) override;
void OnPortsMessageFromRelay(const ports::NodeName& from_node,
const ports::NodeName& source_node,
Channel::MessagePtr message) override;
#endif
void OnAcceptPeer(const ports::NodeName& from_node,
const ports::NodeName& token,
const ports::NodeName& peer_name,
const ports::PortName& port_name) override;
void OnChannelError(const ports::NodeName& from_node,
NodeChannel* channel) override;
#if defined(OS_MACOSX) && !defined(OS_IOS)
MachPortRelay* GetMachPortRelay() override;
#endif
// Cancels all pending port merges. These are merges which are supposed to
// be requested from the parent ASAP, and they may be cancelled if the
// connection to the parent is broken or never established.
void CancelPendingPortMerges();
// Marks this NodeController for destruction when the IO thread shuts down.
// This is used in case Core is torn down before the IO thread. Must only be
// called on the IO thread.
void DestroyOnIOThreadShutdown();
// If there is a registered shutdown callback (meaning shutdown has been
// requested, this checks the Node's status to see if clean shutdown is
// possible. If so, shutdown is performed and the shutdown callback is run.
void AttemptShutdownIfRequested();
// These are safe to access from any thread as long as the Node is alive.
Core* const core_;
const ports::NodeName name_;
const std::unique_ptr<ports::Node> node_;
scoped_refptr<base::TaskRunner> io_task_runner_;
// Guards |peers_| and |pending_peer_messages_|.
base::Lock peers_lock_;
// Channels to known peers, including parent and children, if any.
NodeMap peers_;
// Outgoing message queues for peers we've heard of but can't yet talk to.
std::unordered_map<ports::NodeName, OutgoingMessageQueue>
pending_peer_messages_;
// Guards |reserved_ports_| and |pending_child_tokens_|.
base::Lock reserved_ports_lock_;
// Ports reserved by token. Key is the port token.
base::hash_map<std::string, ReservedPort> reserved_ports_;
// TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't
// have one yet :(
std::unordered_map<ports::NodeName, std::string> pending_child_tokens_;
// Guards |pending_port_merges_| and |reject_pending_merges_|.
base::Lock pending_port_merges_lock_;
// A set of port merge requests awaiting parent connection.
std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
// Indicates that new merge requests should be rejected because the parent has
// disconnected.
bool reject_pending_merges_ = false;
// Guards |parent_name_| and |bootstrap_parent_channel_|.
base::Lock parent_lock_;
// The name of our parent node, if any.
ports::NodeName parent_name_;
// A temporary reference to the parent channel before we know their name.
scoped_refptr<NodeChannel> bootstrap_parent_channel_;
// Guards |broker_name_|, |pending_broker_clients_|, and
// |pending_relay_messages_|.
base::Lock broker_lock_;
// The name of our broker node, if any.
ports::NodeName broker_name_;
// A queue of pending child names waiting to be connected to a broker.
std::queue<ports::NodeName> pending_broker_clients_;
// Messages waiting to be relayed by the broker once it's known.
std::unordered_map<ports::NodeName, OutgoingMessageQueue>
pending_relay_messages_;
// Guards |incoming_messages_| and |incoming_messages_task_posted_|.
base::Lock messages_lock_;
std::queue<ports::ScopedMessage> incoming_messages_;
// Ensures that there is only one incoming messages task posted to the IO
// thread.
bool incoming_messages_task_posted_ = false;
// Flag to fast-path checking |incoming_messages_|.
AtomicFlag incoming_messages_flag_;
// Guards |shutdown_callback_|.
base::Lock shutdown_lock_;
// Set by RequestShutdown(). If this is non-null, the controller will
// begin polling the Node to see if clean shutdown is possible any time the
// Node's state is modified by the controller.
base::Closure shutdown_callback_;
// Flag to fast-path checking |shutdown_callback_|.
AtomicFlag shutdown_callback_flag_;
// All other fields below must only be accessed on the I/O thread, i.e., the
// thread on which core_->io_task_runner() runs tasks.
// Channels to children during handshake.
NodeMap pending_children_;
using PeerNodeMap =
std::unordered_map<ports::NodeName, PeerConnection>;
PeerNodeMap peer_connections_;
// Maps from peer token to node name, pending or not.
std::unordered_map<std::string, ports::NodeName> peers_by_token_;
// Indicates whether this object should delete itself on IO thread shutdown.
// Must only be accessed from the IO thread.
bool destroy_on_io_thread_shutdown_ = false;
#if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
// Broker for sync shared buffer creation in children.
std::unique_ptr<Broker> broker_;
#endif
#if defined(OS_MACOSX) && !defined(OS_IOS)
base::Lock mach_port_relay_lock_;
// Relay for transferring mach ports to/from children.
std::unique_ptr<MachPortRelay> mach_port_relay_;
#endif
DISALLOW_COPY_AND_ASSIGN(NodeController);
};
} // namespace edk
} // namespace mojo
#endif // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_