// Copyright 2017 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/core/channel.h" #include <lib/fdio/limits.h> #include <lib/fdio/util.h> #include <lib/zx/channel.h> #include <lib/zx/handle.h> #include <zircon/processargs.h> #include <zircon/status.h> #include <zircon/syscalls.h> #include <algorithm> #include "base/bind.h" #include "base/containers/circular_deque.h" #include "base/files/scoped_file.h" #include "base/fuchsia/fuchsia_logging.h" #include "base/location.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/message_loop/message_loop_current.h" #include "base/message_loop/message_pump_for_io.h" #include "base/stl_util.h" #include "base/synchronization/lock.h" #include "base/task_runner.h" #include "mojo/core/platform_handle_in_transit.h" namespace mojo { namespace core { namespace { const size_t kMaxBatchReadCapacity = 256 * 1024; bool UnwrapPlatformHandle(PlatformHandleInTransit handle, Channel::Message::HandleInfoEntry* info_out, std::vector<PlatformHandleInTransit>* handles_out) { DCHECK(handle.handle().is_valid()); if (!handle.handle().is_valid_fd()) { *info_out = {0u, 0u}; handles_out->emplace_back(std::move(handle)); return true; } // Each FDIO file descriptor is implemented using one or more native resources // and can be un-wrapped into a set of |handle| and |info| pairs, with |info| // consisting of an FDIO-defined type & arguments (see zircon/processargs.h). // // We try to transfer the FD, but if that fails (for example if the file has // already been dup()d into another FD) we may need to clone. zx_handle_t handles[FDIO_MAX_HANDLES] = {}; uint32_t info[FDIO_MAX_HANDLES] = {}; zx_status_t result = fdio_transfer_fd(handle.handle().GetFD().get(), 0, handles, info); if (result > 0) { // On success, the fd in |handle| has been transferred and is no longer // valid. Release from the PlatformHandle to avoid close()ing an invalid // an invalid handle. handle.CompleteTransit(); } else if (result == ZX_ERR_UNAVAILABLE) { // No luck, try cloning instead. result = fdio_clone_fd(handle.handle().GetFD().get(), 0, handles, info); } if (result <= 0) { ZX_DLOG(ERROR, result) << "fdio_transfer_fd(" << handle.handle().GetFD().get() << ")"; return false; } DCHECK_LE(result, FDIO_MAX_HANDLES); // We assume here that only the |PA_HND_TYPE| of the |info| really matters, // and that that is the same for all the underlying handles. *info_out = {PA_HND_TYPE(info[0]), result}; for (int i = 0; i < result; ++i) { DCHECK_EQ(PA_HND_TYPE(info[0]), PA_HND_TYPE(info[i])); DCHECK_EQ(0u, PA_HND_SUBTYPE(info[i])); handles_out->emplace_back( PlatformHandleInTransit(PlatformHandle(zx::handle(handles[i])))); } return true; } PlatformHandle WrapPlatformHandles(Channel::Message::HandleInfoEntry info, base::circular_deque<zx::handle>* handles) { PlatformHandle out_handle; if (!info.type) { out_handle = PlatformHandle(std::move(handles->front())); handles->pop_front(); } else { if (info.count > FDIO_MAX_HANDLES) return PlatformHandle(); // Fetch the required number of handles from |handles| and set up type info. zx_handle_t fd_handles[FDIO_MAX_HANDLES] = {}; uint32_t fd_infos[FDIO_MAX_HANDLES] = {}; for (int i = 0; i < info.count; ++i) { fd_handles[i] = (*handles)[i].get(); fd_infos[i] = PA_HND(info.type, 0); } // Try to wrap the handles into an FDIO file descriptor. base::ScopedFD out_fd; zx_status_t result = fdio_create_fd(fd_handles, fd_infos, info.count, out_fd.receive()); if (result != ZX_OK) { ZX_DLOG(ERROR, result) << "fdio_create_fd"; return PlatformHandle(); } // The handles are owned by FDIO now, so |release()| them before removing // the entries from |handles|. for (int i = 0; i < info.count; ++i) { ignore_result(handles->front().release()); handles->pop_front(); } out_handle = PlatformHandle(std::move(out_fd)); } return out_handle; } // A view over a Channel::Message object. The write queue uses these since // large messages may need to be sent in chunks. class MessageView { public: // Owns |message|. |offset| indexes the first unsent byte in the message. MessageView(Channel::MessagePtr message, size_t offset) : message_(std::move(message)), offset_(offset), handles_(message_->TakeHandlesForTransport()) { DCHECK_GT(message_->data_num_bytes(), offset_); } MessageView(MessageView&& other) { *this = std::move(other); } MessageView& operator=(MessageView&& other) { message_ = std::move(other.message_); offset_ = other.offset_; handles_ = std::move(other.handles_); return *this; } ~MessageView() {} const void* data() const { return static_cast<const char*>(message_->data()) + offset_; } size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } size_t data_offset() const { return offset_; } void advance_data_offset(size_t num_bytes) { DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); offset_ += num_bytes; } std::vector<PlatformHandleInTransit> TakeHandles() { if (handles_.empty()) return std::vector<PlatformHandleInTransit>(); // We can only pass Fuchsia handles via IPC, so unwrap any FDIO file- // descriptors in |handles_| into the underlying handles, and serialize the // metadata, if any, into the extra header. auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>( message_->mutable_extra_header()); memset(handles_info, 0, message_->extra_header_size()); std::vector<PlatformHandleInTransit> in_handles = std::move(handles_); handles_.reserve(in_handles.size()); for (size_t i = 0; i < in_handles.size(); i++) { if (!UnwrapPlatformHandle(std::move(in_handles[i]), &handles_info[i], &handles_)) return std::vector<PlatformHandleInTransit>(); } return std::move(handles_); } private: Channel::MessagePtr message_; size_t offset_; std::vector<PlatformHandleInTransit> handles_; DISALLOW_COPY_AND_ASSIGN(MessageView); }; class ChannelFuchsia : public Channel, public base::MessageLoopCurrent::DestructionObserver, public base::MessagePumpForIO::ZxHandleWatcher { public: ChannelFuchsia(Delegate* delegate, ConnectionParams connection_params, scoped_refptr<base::TaskRunner> io_task_runner) : Channel(delegate), self_(this), handle_( connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()), io_task_runner_(io_task_runner) { CHECK(handle_.is_valid()); } void Start() override { if (io_task_runner_->RunsTasksInCurrentSequence()) { StartOnIOThread(); } else { io_task_runner_->PostTask( FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this)); } } void ShutDownImpl() override { // Always shut down asynchronously when called through the public interface. io_task_runner_->PostTask( FROM_HERE, base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this)); } void Write(MessagePtr message) override { bool write_error = false; { base::AutoLock lock(write_lock_); if (reject_writes_) return; if (!WriteNoLock(MessageView(std::move(message), 0))) reject_writes_ = write_error = true; } if (write_error) { // Do not synchronously invoke OnWriteError(). Write() may have been // called by the delegate and we don't want to re-enter it. io_task_runner_->PostTask( FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this, Error::kDisconnected)); } } void LeakHandle() override { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); leak_handle_ = true; } bool GetReadPlatformHandles(const void* payload, size_t payload_size, size_t num_handles, const void* extra_header, size_t extra_header_size, std::vector<PlatformHandle>* handles, bool* deferred) override { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); if (num_handles > std::numeric_limits<uint16_t>::max()) return false; // Locate the handle info and verify there is enough of it. if (!extra_header) return false; const auto* handles_info = reinterpret_cast<const Channel::Message::HandleInfoEntry*>( extra_header); size_t handles_info_size = sizeof(handles_info[0]) * num_handles; if (handles_info_size > extra_header_size) return false; // Some caller-supplied handles may be FDIO file-descriptors, which were // un-wrapped to more than one native platform resource handle for transfer. // We may therefore need to expect more than |num_handles| handles to have // been accumulated in |incoming_handles_|, based on the handle info. size_t num_raw_handles = 0u; for (size_t i = 0; i < num_handles; ++i) num_raw_handles += handles_info[i].type ? handles_info[i].count : 1; // If there are too few handles then we're not ready yet, so return true // indicating things are OK, but leave |handles| empty. if (incoming_handles_.size() < num_raw_handles) return true; handles->reserve(num_handles); for (size_t i = 0; i < num_handles; ++i) { handles->emplace_back( WrapPlatformHandles(handles_info[i], &incoming_handles_)); } return true; } private: ~ChannelFuchsia() override { DCHECK(!read_watch_); } void StartOnIOThread() { DCHECK(!read_watch_); base::MessageLoopCurrent::Get()->AddDestructionObserver(this); read_watch_.reset( new base::MessagePumpForIO::ZxHandleWatchController(FROM_HERE)); base::MessageLoopCurrentForIO::Get()->WatchZxHandle( handle_.get(), true /* persistent */, ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this); } void ShutDownOnIOThread() { base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); read_watch_.reset(); if (leak_handle_) ignore_result(handle_.release()); handle_.reset(); // May destroy the |this| if it was the last reference. self_ = nullptr; } // base::MessageLoopCurrent::DestructionObserver: void WillDestroyCurrentMessageLoop() override { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); if (self_) ShutDownOnIOThread(); } // base::MessagePumpForIO::ZxHandleWatcher: void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); CHECK_EQ(handle, handle_.get()); DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals); // We always try to read message(s), even if ZX_CHANNEL_PEER_CLOSED, since // the peer may have closed while messages were still unread, in the pipe. bool validation_error = false; bool read_error = false; size_t next_read_size = 0; size_t buffer_capacity = 0; size_t total_bytes_read = 0; do { buffer_capacity = next_read_size; char* buffer = GetReadBuffer(&buffer_capacity); DCHECK_GT(buffer_capacity, 0u); uint32_t bytes_read = 0; uint32_t handles_read = 0; zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {}; zx_status_t read_result = handle_.read(0, buffer, buffer_capacity, &bytes_read, handles, base::size(handles), &handles_read); if (read_result == ZX_OK) { for (size_t i = 0; i < handles_read; ++i) { incoming_handles_.emplace_back(handles[i]); } total_bytes_read += bytes_read; if (!OnReadComplete(bytes_read, &next_read_size)) { read_error = true; validation_error = true; break; } } else if (read_result == ZX_ERR_BUFFER_TOO_SMALL) { DCHECK_LE(handles_read, base::size(handles)); next_read_size = bytes_read; } else if (read_result == ZX_ERR_SHOULD_WAIT) { break; } else { ZX_DLOG_IF(ERROR, read_result != ZX_ERR_PEER_CLOSED, read_result) << "zx_channel_read"; read_error = true; break; } } while (total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0); if (read_error) { // Stop receiving read notifications. read_watch_.reset(); if (validation_error) OnError(Error::kReceivedMalformedData); else OnError(Error::kDisconnected); } } // Attempts to write a message directly to the channel. If the full message // cannot be written, it's queued and a wait is initiated to write the message // ASAP on the I/O thread. bool WriteNoLock(MessageView message_view) { uint32_t write_bytes = 0; do { message_view.advance_data_offset(write_bytes); std::vector<PlatformHandleInTransit> outgoing_handles = message_view.TakeHandles(); zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {}; size_t handles_count = outgoing_handles.size(); DCHECK_LE(handles_count, base::size(handles)); for (size_t i = 0; i < handles_count; ++i) { DCHECK(outgoing_handles[i].handle().is_valid()); handles[i] = outgoing_handles[i].handle().GetHandle().get(); } write_bytes = std::min(message_view.data_num_bytes(), static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES)); zx_status_t result = handle_.write(0, message_view.data(), write_bytes, handles, handles_count); // zx_channel_write() consumes |handles| whether or not it succeeds, so // release() our copies now, to avoid them being double-closed. for (auto& outgoing_handle : outgoing_handles) outgoing_handle.CompleteTransit(); if (result != ZX_OK) { // TODO(fuchsia): Handle ZX_ERR_SHOULD_WAIT flow-control errors, once // the platform starts generating them. See https://crbug.com/754084. ZX_DLOG_IF(ERROR, result != ZX_ERR_PEER_CLOSED, result) << "WriteNoLock(zx_channel_write)"; return false; } } while (write_bytes < message_view.data_num_bytes()); return true; } void OnWriteError(Error error) { DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); DCHECK(reject_writes_); if (error == Error::kDisconnected) { // If we can't write because the pipe is disconnected then continue // reading to fetch any in-flight messages, relying on end-of-stream to // signal the actual disconnection. if (read_watch_) { // TODO: When we add flow-control for writes, we also need to reset the // write-watcher here. return; } } OnError(error); } // Keeps the Channel alive at least until explicit shutdown on the IO thread. scoped_refptr<Channel> self_; zx::channel handle_; scoped_refptr<base::TaskRunner> io_task_runner_; // These members are only used on the IO thread. std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_; base::circular_deque<zx::handle> incoming_handles_; bool leak_handle_ = false; base::Lock write_lock_; bool reject_writes_ = false; DISALLOW_COPY_AND_ASSIGN(ChannelFuchsia); }; } // namespace // static scoped_refptr<Channel> Channel::Create( Delegate* delegate, ConnectionParams connection_params, scoped_refptr<base::TaskRunner> io_task_runner) { return new ChannelFuchsia(delegate, std::move(connection_params), std::move(io_task_runner)); } } // namespace core } // namespace mojo