// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/system/channel.h" #include <algorithm> #include "base/basictypes.h" #include "base/bind.h" #include "base/compiler_specific.h" #include "base/logging.h" #include "base/strings/stringprintf.h" #include "mojo/embedder/platform_handle_vector.h" #include "mojo/system/message_pipe_endpoint.h" #include "mojo/system/transport_data.h" namespace mojo { namespace system { COMPILE_ASSERT(Channel::kBootstrapEndpointId != MessageInTransit::kInvalidEndpointId, kBootstrapEndpointId_is_invalid); STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId Channel::kBootstrapEndpointId; Channel::EndpointInfo::EndpointInfo() : state(STATE_NORMAL), port() { } Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, unsigned port) : state(STATE_NORMAL), message_pipe(message_pipe), port(port) { } Channel::EndpointInfo::~EndpointInfo() { } Channel::Channel() : is_running_(false), next_local_id_(kBootstrapEndpointId) { } bool Channel::Init(scoped_ptr<RawChannel> raw_channel) { DCHECK(creation_thread_checker_.CalledOnValidThread()); DCHECK(raw_channel); // No need to take |lock_|, since this must be called before this object // becomes thread-safe. DCHECK(!is_running_no_lock()); raw_channel_ = raw_channel.Pass(); if (!raw_channel_->Init(this)) { raw_channel_.reset(); return false; } is_running_ = true; return true; } void Channel::Shutdown() { DCHECK(creation_thread_checker_.CalledOnValidThread()); IdToEndpointInfoMap to_destroy; { base::AutoLock locker(lock_); if (!is_running_no_lock()) return; // Note: Don't reset |raw_channel_|, in case we're being called from within // |OnReadMessage()| or |OnFatalError()|. raw_channel_->Shutdown(); is_running_ = false; // We need to deal with it outside the lock. std::swap(to_destroy, local_id_to_endpoint_info_map_); } size_t num_live = 0; size_t num_zombies = 0; for (IdToEndpointInfoMap::iterator it = to_destroy.begin(); it != to_destroy.end(); ++it) { if (it->second.state == EndpointInfo::STATE_NORMAL) { it->second.message_pipe->OnRemove(it->second.port); num_live++; } else { DCHECK(!it->second.message_pipe); num_zombies++; } } DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live << " live endpoints and " << num_zombies << " zombies"; } MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( scoped_refptr<MessagePipe> message_pipe, unsigned port) { DCHECK(message_pipe); DCHECK(port == 0 || port == 1); MessageInTransit::EndpointId local_id; { base::AutoLock locker(lock_); while (next_local_id_ == MessageInTransit::kInvalidEndpointId || local_id_to_endpoint_info_map_.find(next_local_id_) != local_id_to_endpoint_info_map_.end()) next_local_id_++; local_id = next_local_id_; next_local_id_++; // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid // some expensive reference count increment/decrements.) Once this is done, // we should be able to delete |EndpointInfo|'s default constructor. local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); } // This might fail if that port got an |OnPeerClose()| before attaching. if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) return local_id; // Note: If it failed, quite possibly the endpoint info was removed from that // map (there's a race between us adding it to the map above and calling // |Attach()|). And even if an entry exists for |local_id|, we need to check // that it's the one we added (and not some other one that was added since). { base::AutoLock locker(lock_); IdToEndpointInfoMap::iterator it = local_id_to_endpoint_info_map_.find(local_id); if (it != local_id_to_endpoint_info_map_.end() && it->second.message_pipe.get() == message_pipe.get() && it->second.port == port) { DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL); // TODO(vtl): FIXME -- This is wrong. We need to specify (to // |AttachMessagePipeEndpoint()| who's going to be responsible for calling // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to // run, then we'll get messages to an "invalid" local ID (for running, for // removal). local_id_to_endpoint_info_map_.erase(it); } } return MessageInTransit::kInvalidEndpointId; } bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id) { EndpointInfo endpoint_info; { base::AutoLock locker(lock_); IdToEndpointInfoMap::const_iterator it = local_id_to_endpoint_info_map_.find(local_id); if (it == local_id_to_endpoint_info_map_.end()) return false; endpoint_info = it->second; } // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| // and ignore it. if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " "(local ID " << local_id << ", remote ID " << remote_id << ")"; return true; } // TODO(vtl): FIXME -- We need to handle the case that message pipe is already // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); return true; } void Channel::RunRemoteMessagePipeEndpoint( MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id) { #if DCHECK_IS_ON { base::AutoLock locker(lock_); DCHECK(local_id_to_endpoint_info_map_.find(local_id) != local_id_to_endpoint_info_map_.end()); } #endif if (!SendControlMessage( MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, local_id, remote_id)) { HandleLocalError(base::StringPrintf( "Failed to send message to run remote message pipe endpoint (local ID " "%u, remote ID %u)", static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); } } bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { base::AutoLock locker(lock_); if (!is_running_no_lock()) { // TODO(vtl): I think this is probably not an error condition, but I should // think about it (and the shutdown sequence) more carefully. LOG(WARNING) << "WriteMessage() after shutdown"; return false; } return raw_channel_->WriteMessage(message.Pass()); } bool Channel::IsWriteBufferEmpty() { base::AutoLock locker(lock_); if (!is_running_no_lock()) return true; return raw_channel_->IsWriteBufferEmpty(); } void Channel::DetachMessagePipeEndpoint( MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id) { DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); bool should_send_remove_message = false; { base::AutoLock locker_(lock_); if (!is_running_no_lock()) return; IdToEndpointInfoMap::iterator it = local_id_to_endpoint_info_map_.find(local_id); DCHECK(it != local_id_to_endpoint_info_map_.end()); switch (it->second.state) { case EndpointInfo::STATE_NORMAL: it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; it->second.message_pipe = NULL; should_send_remove_message = (remote_id != MessageInTransit::kInvalidEndpointId); break; case EndpointInfo::STATE_WAIT_LOCAL_DETACH: local_id_to_endpoint_info_map_.erase(it); break; case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK: NOTREACHED(); break; case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK: it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; break; } } if (!should_send_remove_message) return; if (!SendControlMessage( MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, local_id, remote_id)) { HandleLocalError(base::StringPrintf( "Failed to send message to remove remote message pipe endpoint (local " "ID %u, remote ID %u)", static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); } } size_t Channel::GetSerializedPlatformHandleSize() const { return raw_channel_->GetSerializedPlatformHandleSize(); } Channel::~Channel() { // The channel should have been shut down first. DCHECK(!is_running_no_lock()); } void Channel::OnReadMessage( const MessageInTransit::View& message_view, embedder::ScopedPlatformHandleVectorPtr platform_handles) { switch (message_view.type()) { case MessageInTransit::kTypeMessagePipeEndpoint: case MessageInTransit::kTypeMessagePipe: OnReadMessageForDownstream(message_view, platform_handles.Pass()); break; case MessageInTransit::kTypeChannel: OnReadMessageForChannel(message_view, platform_handles.Pass()); break; default: HandleRemoteError(base::StringPrintf( "Received message of invalid type %u", static_cast<unsigned>(message_view.type()))); break; } } void Channel::OnFatalError(FatalError fatal_error) { switch (fatal_error) { case FATAL_ERROR_READ: // Most read errors aren't notable: they just reflect that the other side // tore down the channel. DVLOG(1) << "RawChannel fatal error (read)"; break; case FATAL_ERROR_WRITE: // Write errors are slightly notable: they probably shouldn't happen under // normal operation (but maybe the other side crashed). LOG(WARNING) << "RawChannel fatal error (write)"; break; } Shutdown(); } void Channel::OnReadMessageForDownstream( const MessageInTransit::View& message_view, embedder::ScopedPlatformHandleVectorPtr platform_handles) { DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || message_view.type() == MessageInTransit::kTypeMessagePipe); MessageInTransit::EndpointId local_id = message_view.destination_id(); if (local_id == MessageInTransit::kInvalidEndpointId) { HandleRemoteError("Received message with no destination ID"); return; } EndpointInfo endpoint_info; { base::AutoLock locker(lock_); // Since we own |raw_channel_|, and this method and |Shutdown()| should only // be called from the creation thread, |raw_channel_| should never be null // here. DCHECK(is_running_no_lock()); IdToEndpointInfoMap::const_iterator it = local_id_to_endpoint_info_map_.find(local_id); if (it == local_id_to_endpoint_info_map_.end()) { HandleRemoteError(base::StringPrintf( "Received a message for nonexistent local destination ID %u", static_cast<unsigned>(local_id))); // This is strongly indicative of some problem. However, it's not a fatal // error, since it may indicate a bug (or hostile) remote process. Don't // die even for Debug builds, since handling this properly needs to be // tested (TODO(vtl)). DLOG(ERROR) << "This should not happen under normal operation."; return; } endpoint_info = it->second; } // Ignore messages for zombie endpoints (not an error). if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " << local_id << ", remote ID = " << message_view.source_id() << ")"; return; } // We need to duplicate the message (data), because |EnqueueMessage()| will // take ownership of it. scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); if (message_view.transport_data_buffer_size() > 0) { DCHECK(message_view.transport_data_buffer()); message->SetDispatchers( TransportData::DeserializeDispatchers( message_view.transport_data_buffer(), message_view.transport_data_buffer_size(), platform_handles.Pass(), this)); } MojoResult result = endpoint_info.message_pipe->EnqueueMessage( MessagePipe::GetPeerPort(endpoint_info.port), message.Pass()); if (result != MOJO_RESULT_OK) { // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint // has been closed (in an unavoidable race). This might also be a "remote" // error, e.g., if the remote side is sending invalid control messages (to // the message pipe). HandleLocalError(base::StringPrintf( "Failed to enqueue message to local ID %u (result %d)", static_cast<unsigned>(local_id), static_cast<int>(result))); return; } } void Channel::OnReadMessageForChannel( const MessageInTransit::View& message_view, embedder::ScopedPlatformHandleVectorPtr platform_handles) { DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel); // Currently, no channel messages take platform handles. if (platform_handles) { HandleRemoteError( "Received invalid channel message (has platform handles)"); NOTREACHED(); return; } switch (message_view.subtype()) { case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: DVLOG(2) << "Handling channel message to run message pipe (local ID " << message_view.destination_id() << ", remote ID " << message_view.source_id() << ")"; if (!RunMessagePipeEndpoint(message_view.destination_id(), message_view.source_id())) { HandleRemoteError( "Received invalid channel message to run message pipe"); } break; case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: DVLOG(2) << "Handling channel message to remove message pipe (local ID " << message_view.destination_id() << ", remote ID " << message_view.source_id() << ")"; if (!RemoveMessagePipeEndpoint(message_view.destination_id(), message_view.source_id())) { HandleRemoteError( "Received invalid channel message to remove message pipe"); } break; case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: DVLOG(2) << "Handling channel message to ack remove message pipe (local " "ID " << message_view.destination_id() << ", remote ID " << message_view.source_id() << ")"; if (!RemoveMessagePipeEndpoint(message_view.destination_id(), message_view.source_id())) { HandleRemoteError( "Received invalid channel message to ack remove message pipe"); } break; default: HandleRemoteError("Received invalid channel message"); NOTREACHED(); break; } } bool Channel::RemoveMessagePipeEndpoint( MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id) { EndpointInfo endpoint_info; { base::AutoLock locker(lock_); IdToEndpointInfoMap::iterator it = local_id_to_endpoint_info_map_.find(local_id); if (it == local_id_to_endpoint_info_map_.end()) { DVLOG(2) << "Remove message pipe error: not found"; return false; } // If it's waiting for the remove ack, just do it and return. if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) { local_id_to_endpoint_info_map_.erase(it); return true; } if (it->second.state != EndpointInfo::STATE_NORMAL) { DVLOG(2) << "Remove message pipe error: wrong state"; return false; } it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH; endpoint_info = it->second; it->second.message_pipe = NULL; } if (!SendControlMessage( MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, local_id, remote_id)) { HandleLocalError(base::StringPrintf( "Failed to send message to remove remote message pipe endpoint ack " "(local ID %u, remote ID %u)", static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); } endpoint_info.message_pipe->OnRemove(endpoint_info.port); return true; } bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id) { DVLOG(2) << "Sending channel control message: subtype " << subtype << ", local ID " << local_id << ", remote ID " << remote_id; scoped_ptr<MessageInTransit> message(new MessageInTransit( MessageInTransit::kTypeChannel, subtype, 0, NULL)); message->set_source_id(local_id); message->set_destination_id(remote_id); return WriteMessage(message.Pass()); } void Channel::HandleRemoteError(const base::StringPiece& error_message) { // TODO(vtl): Is this how we really want to handle this? Probably we want to // terminate the connection, since it's spewing invalid stuff. LOG(WARNING) << error_message; } void Channel::HandleLocalError(const base::StringPiece& error_message) { // TODO(vtl): Is this how we really want to handle this? // Sometimes we'll want to propagate the error back to the message pipe // (endpoint), and notify it that the remote is (effectively) closed. // Sometimes we'll want to kill the channel (and notify all the endpoints that // their remotes are dead. LOG(WARNING) << error_message; } } // namespace system } // namespace mojo