// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/edk/system/data_pipe_producer_dispatcher.h" #include <stddef.h> #include <stdint.h> #include <utility> #include "base/bind.h" #include "base/logging.h" #include "base/memory/ref_counted.h" #include "base/message_loop/message_loop.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/embedder/platform_shared_buffer.h" #include "mojo/edk/system/configuration.h" #include "mojo/edk/system/core.h" #include "mojo/edk/system/data_pipe_control_message.h" #include "mojo/edk/system/node_controller.h" #include "mojo/edk/system/ports_message.h" #include "mojo/edk/system/request_context.h" #include "mojo/public/c/system/data_pipe.h" namespace mojo { namespace edk { namespace { const uint8_t kFlagPeerClosed = 0x01; #pragma pack(push, 1) struct SerializedState { MojoCreateDataPipeOptions options; uint64_t pipe_id; uint32_t write_offset; uint32_t available_capacity; uint8_t flags; char padding[7]; }; static_assert(sizeof(SerializedState) % 8 == 0, "Invalid SerializedState size."); #pragma pack(pop) } // namespace // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a // reference to the dispatcher to ensure it lives as long as the observed port. class DataPipeProducerDispatcher::PortObserverThunk : public NodeController::PortObserver { public: explicit PortObserverThunk( scoped_refptr<DataPipeProducerDispatcher> dispatcher) : dispatcher_(dispatcher) {} private: ~PortObserverThunk() override {} // NodeController::PortObserver: void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } scoped_refptr<DataPipeProducerDispatcher> dispatcher_; DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); }; DataPipeProducerDispatcher::DataPipeProducerDispatcher( NodeController* node_controller, const ports::PortRef& control_port, scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, const MojoCreateDataPipeOptions& options, bool initialized, uint64_t pipe_id) : options_(options), node_controller_(node_controller), control_port_(control_port), pipe_id_(pipe_id), shared_ring_buffer_(shared_ring_buffer), available_capacity_(options_.capacity_num_bytes) { if (initialized) { base::AutoLock lock(lock_); InitializeNoLock(); } } Dispatcher::Type DataPipeProducerDispatcher::GetType() const { return Type::DATA_PIPE_PRODUCER; } MojoResult DataPipeProducerDispatcher::Close() { base::AutoLock lock(lock_); DVLOG(1) << "Closing data pipe producer " << pipe_id_; return CloseNoLock(); } MojoResult DataPipeProducerDispatcher::Watch( MojoHandleSignals signals, const Watcher::WatchCallback& callback, uintptr_t context) { base::AutoLock lock(lock_); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; return awakable_list_.AddWatcher( signals, callback, context, GetHandleSignalsStateNoLock()); } MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { base::AutoLock lock(lock_); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; return awakable_list_.RemoveWatcher(context); } MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, uint32_t* num_bytes, MojoWriteDataFlags flags) { base::AutoLock lock(lock_); if (!shared_ring_buffer_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; if (in_two_phase_write_) return MOJO_RESULT_BUSY; if (peer_closed_) return MOJO_RESULT_FAILED_PRECONDITION; if (*num_bytes % options_.element_num_bytes != 0) return MOJO_RESULT_INVALID_ARGUMENT; if (*num_bytes == 0) return MOJO_RESULT_OK; // Nothing to do. if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) && (*num_bytes > available_capacity_)) { // Don't return "should wait" since you can't wait for a specified amount of // data. return MOJO_RESULT_OUT_OF_RANGE; } DCHECK_LE(available_capacity_, options_.capacity_num_bytes); uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); if (num_bytes_to_write == 0) return MOJO_RESULT_SHOULD_WAIT; HandleSignalsState old_state = GetHandleSignalsStateNoLock(); *num_bytes = num_bytes_to_write; CHECK(ring_buffer_mapping_); uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); CHECK(data); const uint8_t* source = static_cast<const uint8_t*>(elements); CHECK(source); DCHECK_LE(write_offset_, options_.capacity_num_bytes); uint32_t tail_bytes_to_write = std::min(options_.capacity_num_bytes - write_offset_, num_bytes_to_write); uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; DCHECK_GT(tail_bytes_to_write, 0u); memcpy(data + write_offset_, source, tail_bytes_to_write); if (head_bytes_to_write > 0) memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); DCHECK_LE(num_bytes_to_write, available_capacity_); available_capacity_ -= num_bytes_to_write; write_offset_ = (write_offset_ + num_bytes_to_write) % options_.capacity_num_bytes; HandleSignalsState new_state = GetHandleSignalsStateNoLock(); if (!new_state.equals(old_state)) awakable_list_.AwakeForStateChange(new_state); base::AutoUnlock unlock(lock_); NotifyWrite(num_bytes_to_write); return MOJO_RESULT_OK; } MojoResult DataPipeProducerDispatcher::BeginWriteData( void** buffer, uint32_t* buffer_num_bytes, MojoWriteDataFlags flags) { base::AutoLock lock(lock_); if (!shared_ring_buffer_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; if (in_two_phase_write_) return MOJO_RESULT_BUSY; if (peer_closed_) return MOJO_RESULT_FAILED_PRECONDITION; if (available_capacity_ == 0) { return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; } in_two_phase_write_ = true; *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_, available_capacity_); DCHECK_GT(*buffer_num_bytes, 0u); CHECK(ring_buffer_mapping_); uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); *buffer = data + write_offset_; return MOJO_RESULT_OK; } MojoResult DataPipeProducerDispatcher::EndWriteData( uint32_t num_bytes_written) { base::AutoLock lock(lock_); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; if (!in_two_phase_write_) return MOJO_RESULT_FAILED_PRECONDITION; DCHECK(shared_ring_buffer_); DCHECK(ring_buffer_mapping_); // Note: Allow successful completion of the two-phase write even if the other // side has been closed. MojoResult rv = MOJO_RESULT_OK; if (num_bytes_written > available_capacity_ || num_bytes_written % options_.element_num_bytes != 0 || write_offset_ + num_bytes_written > options_.capacity_num_bytes) { rv = MOJO_RESULT_INVALID_ARGUMENT; } else { DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes); available_capacity_ -= num_bytes_written; write_offset_ = (write_offset_ + num_bytes_written) % options_.capacity_num_bytes; base::AutoUnlock unlock(lock_); NotifyWrite(num_bytes_written); } in_two_phase_write_ = false; // If we're now writable, we *became* writable (since we weren't writable // during the two-phase write), so awake producer awakables. HandleSignalsState new_state = GetHandleSignalsStateNoLock(); if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) awakable_list_.AwakeForStateChange(new_state); return rv; } HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { base::AutoLock lock(lock_); return GetHandleSignalsStateNoLock(); } MojoResult DataPipeProducerDispatcher::AddAwakable( Awakable* awakable, MojoHandleSignals signals, uintptr_t context, HandleSignalsState* signals_state) { base::AutoLock lock(lock_); if (!shared_ring_buffer_ || in_transit_) { if (signals_state) *signals_state = HandleSignalsState(); return MOJO_RESULT_INVALID_ARGUMENT; } UpdateSignalsStateNoLock(); HandleSignalsState state = GetHandleSignalsStateNoLock(); if (state.satisfies(signals)) { if (signals_state) *signals_state = state; return MOJO_RESULT_ALREADY_EXISTS; } if (!state.can_satisfy(signals)) { if (signals_state) *signals_state = state; return MOJO_RESULT_FAILED_PRECONDITION; } awakable_list_.Add(awakable, signals, context); return MOJO_RESULT_OK; } void DataPipeProducerDispatcher::RemoveAwakable( Awakable* awakable, HandleSignalsState* signals_state) { base::AutoLock lock(lock_); if ((!shared_ring_buffer_ || in_transit_) && signals_state) *signals_state = HandleSignalsState(); else if (signals_state) *signals_state = GetHandleSignalsStateNoLock(); awakable_list_.Remove(awakable); } void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, uint32_t* num_ports, uint32_t* num_handles) { base::AutoLock lock(lock_); DCHECK(in_transit_); *num_bytes = sizeof(SerializedState); *num_ports = 1; *num_handles = 1; } bool DataPipeProducerDispatcher::EndSerialize( void* destination, ports::PortName* ports, PlatformHandle* platform_handles) { SerializedState* state = static_cast<SerializedState*>(destination); memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); memset(state->padding, 0, sizeof(state->padding)); base::AutoLock lock(lock_); DCHECK(in_transit_); state->pipe_id = pipe_id_; state->write_offset = write_offset_; state->available_capacity = available_capacity_; state->flags = peer_closed_ ? kFlagPeerClosed : 0; ports[0] = control_port_.name(); buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); platform_handles[0] = buffer_handle_for_transit_.get(); return true; } bool DataPipeProducerDispatcher::BeginTransit() { base::AutoLock lock(lock_); if (in_transit_) return false; in_transit_ = !in_two_phase_write_; return in_transit_; } void DataPipeProducerDispatcher::CompleteTransitAndClose() { node_controller_->SetPortObserver(control_port_, nullptr); base::AutoLock lock(lock_); DCHECK(in_transit_); transferred_ = true; in_transit_ = false; ignore_result(buffer_handle_for_transit_.release()); CloseNoLock(); } void DataPipeProducerDispatcher::CancelTransit() { base::AutoLock lock(lock_); DCHECK(in_transit_); in_transit_ = false; buffer_handle_for_transit_.reset(); awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); } // static scoped_refptr<DataPipeProducerDispatcher> DataPipeProducerDispatcher::Deserialize(const void* data, size_t num_bytes, const ports::PortName* ports, size_t num_ports, PlatformHandle* handles, size_t num_handles) { if (num_ports != 1 || num_handles != 1 || num_bytes != sizeof(SerializedState)) { return nullptr; } const SerializedState* state = static_cast<const SerializedState*>(data); NodeController* node_controller = internal::g_core->GetNodeController(); ports::PortRef port; if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) return nullptr; PlatformHandle buffer_handle; std::swap(buffer_handle, handles[0]); scoped_refptr<PlatformSharedBuffer> ring_buffer = PlatformSharedBuffer::CreateFromPlatformHandle( state->options.capacity_num_bytes, false /* read_only */, ScopedPlatformHandle(buffer_handle)); if (!ring_buffer) { DLOG(ERROR) << "Failed to deserialize shared buffer handle."; return nullptr; } scoped_refptr<DataPipeProducerDispatcher> dispatcher = new DataPipeProducerDispatcher(node_controller, port, ring_buffer, state->options, false /* initialized */, state->pipe_id); { base::AutoLock lock(dispatcher->lock_); dispatcher->write_offset_ = state->write_offset; dispatcher->available_capacity_ = state->available_capacity; dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; dispatcher->InitializeNoLock(); dispatcher->UpdateSignalsStateNoLock(); } return dispatcher; } DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ && !ring_buffer_mapping_); } void DataPipeProducerDispatcher::InitializeNoLock() { lock_.AssertAcquired(); if (shared_ring_buffer_) { ring_buffer_mapping_ = shared_ring_buffer_->Map(0, options_.capacity_num_bytes); if (!ring_buffer_mapping_) { DLOG(ERROR) << "Failed to map shared buffer."; shared_ring_buffer_ = nullptr; } } base::AutoUnlock unlock(lock_); node_controller_->SetPortObserver( control_port_, make_scoped_refptr(new PortObserverThunk(this))); } MojoResult DataPipeProducerDispatcher::CloseNoLock() { lock_.AssertAcquired(); if (is_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; is_closed_ = true; ring_buffer_mapping_.reset(); shared_ring_buffer_ = nullptr; awakable_list_.CancelAll(); if (!transferred_) { base::AutoUnlock unlock(lock_); node_controller_->ClosePort(control_port_); } return MOJO_RESULT_OK; } HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() const { lock_.AssertAcquired(); HandleSignalsState rv; if (!peer_closed_) { if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0) rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; } else { rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; } rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; return rv; } void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: " << num_bytes << " bytes written. [control_port=" << control_port_.name() << "]"; SendDataPipeControlMessage(node_controller_, control_port_, DataPipeCommand::DATA_WAS_WRITTEN, num_bytes); } void DataPipeProducerDispatcher::OnPortStatusChanged() { DCHECK(RequestContext::current()); base::AutoLock lock(lock_); // We stop observing the control port as soon it's transferred, but this can // race with events which are raised right before that happens. This is fine // to ignore. if (transferred_) return; DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; UpdateSignalsStateNoLock(); } void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { lock_.AssertAcquired(); bool was_peer_closed = peer_closed_; size_t previous_capacity = available_capacity_; ports::PortStatus port_status; int rv = node_controller_->node()->GetStatus(control_port_, &port_status); if (rv != ports::OK || !port_status.receiving_messages) { DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" << " [control_port=" << control_port_.name() << "]"; peer_closed_ = true; } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { ports::ScopedMessage message; do { int rv = node_controller_->node()->GetMessage( control_port_, &message, nullptr); if (rv != ports::OK) peer_closed_ = true; if (message) { if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) { peer_closed_ = true; break; } const DataPipeControlMessage* m = static_cast<const DataPipeControlMessage*>( message->payload_bytes()); if (m->command != DataPipeCommand::DATA_WAS_READ) { DLOG(ERROR) << "Unexpected message from consumer."; peer_closed_ = true; break; } if (static_cast<size_t>(available_capacity_) + m->num_bytes > options_.capacity_num_bytes) { DLOG(ERROR) << "Consumer claims to have read too many bytes."; break; } DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that " << m->num_bytes << " bytes were read. [control_port=" << control_port_.name() << "]"; available_capacity_ += m->num_bytes; } } while (message); } if (peer_closed_ != was_peer_closed || available_capacity_ != previous_capacity) { awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); } } } // namespace edk } // namespace mojo