// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/message_pump/message_pump_mojo.h" #include <stdint.h> #include <algorithm> #include <map> #include <vector> #include "base/containers/small_map.h" #include "base/debug/alias.h" #include "base/lazy_instance.h" #include "base/logging.h" #include "base/threading/thread_local.h" #include "base/threading/thread_restrictions.h" #include "base/time/time.h" #include "mojo/message_pump/message_pump_mojo_handler.h" #include "mojo/message_pump/time_helper.h" #include "mojo/public/c/system/wait_set.h" namespace mojo { namespace common { namespace { base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, base::TimeTicks now) { // The is_null() check matches that of HandleWatcher as well as how // |delayed_work_time| is used. if (time_ticks.is_null()) return MOJO_DEADLINE_INDEFINITE; const int64_t delta = (time_ticks - now).InMicroseconds(); return delta < 0 ? static_cast<MojoDeadline>(0) : static_cast<MojoDeadline>(delta); } } // namespace struct MessagePumpMojo::RunState { RunState() : should_quit(false) {} base::TimeTicks delayed_work_time; bool should_quit; }; MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0), event_(base::WaitableEvent::ResetPolicy::AUTOMATIC, base::WaitableEvent::InitialState::NOT_SIGNALED) { DCHECK(!current()) << "There is already a MessagePumpMojo instance on this thread."; g_tls_current_pump.Pointer()->Set(this); MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_); CHECK_EQ(result, MOJO_RESULT_OK); CHECK(read_handle_.is_valid()); CHECK(write_handle_.is_valid()); MojoHandle handle; result = MojoCreateWaitSet(&handle); CHECK_EQ(result, MOJO_RESULT_OK); wait_set_handle_.reset(Handle(handle)); CHECK(wait_set_handle_.is_valid()); result = MojoAddHandle(wait_set_handle_.get().value(), read_handle_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); CHECK_EQ(result, MOJO_RESULT_OK); } MessagePumpMojo::~MessagePumpMojo() { DCHECK_EQ(this, current()); g_tls_current_pump.Pointer()->Set(NULL); } // static std::unique_ptr<base::MessagePump> MessagePumpMojo::Create() { return std::unique_ptr<MessagePump>(new MessagePumpMojo()); } // static MessagePumpMojo* MessagePumpMojo::current() { return g_tls_current_pump.Pointer()->Get(); } void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, const Handle& handle, MojoHandleSignals wait_signals, base::TimeTicks deadline) { CHECK(handler); DCHECK(handle.is_valid()); // Assume it's an error if someone tries to reregister an existing handle. CHECK_EQ(0u, handlers_.count(handle)); Handler handler_data; handler_data.handler = handler; handler_data.wait_signals = wait_signals; handler_data.deadline = deadline; handler_data.id = next_handler_id_++; handlers_[handle] = handler_data; if (!deadline.is_null()) { bool inserted = deadline_handles_.insert(handle).second; DCHECK(inserted); } MojoResult result = MojoAddHandle(wait_set_handle_.get().value(), handle.value(), wait_signals); // Because stopping a HandleWatcher is now asynchronous, it's possible for the // handle to no longer be open at this point. CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT); } void MessagePumpMojo::RemoveHandler(const Handle& handle) { MojoResult result = MojoRemoveHandle(wait_set_handle_.get().value(), handle.value()); // At this point, it's possible that the handle has been closed, which would // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also // possible for the handle to have already been removed, so all of the // possible error codes are valid here. CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND || result == MOJO_RESULT_INVALID_ARGUMENT); handlers_.erase(handle); deadline_handles_.erase(handle); } void MessagePumpMojo::AddObserver(Observer* observer) { observers_.AddObserver(observer); } void MessagePumpMojo::RemoveObserver(Observer* observer) { observers_.RemoveObserver(observer); } void MessagePumpMojo::Run(Delegate* delegate) { RunState run_state; RunState* old_state = NULL; { base::AutoLock auto_lock(run_state_lock_); old_state = run_state_; run_state_ = &run_state; } DoRunLoop(&run_state, delegate); { base::AutoLock auto_lock(run_state_lock_); run_state_ = old_state; } } void MessagePumpMojo::Quit() { base::AutoLock auto_lock(run_state_lock_); if (run_state_) run_state_->should_quit = true; } void MessagePumpMojo::ScheduleWork() { SignalControlPipe(); } void MessagePumpMojo::ScheduleDelayedWork( const base::TimeTicks& delayed_work_time) { base::AutoLock auto_lock(run_state_lock_); if (!run_state_) return; run_state_->delayed_work_time = delayed_work_time; } void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { bool more_work_is_plausible = true; for (;;) { const bool block = !more_work_is_plausible; if (read_handle_.is_valid()) { more_work_is_plausible = DoInternalWork(*run_state, block); } else { more_work_is_plausible = DoNonMojoWork(*run_state, block); } if (run_state->should_quit) break; more_work_is_plausible |= delegate->DoWork(); if (run_state->should_quit) break; more_work_is_plausible |= delegate->DoDelayedWork( &run_state->delayed_work_time); if (run_state->should_quit) break; if (more_work_is_plausible) continue; more_work_is_plausible = delegate->DoIdleWork(); if (run_state->should_quit) break; } } bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { bool did_work = block; if (block) { // If the wait isn't blocking (deadline == 0), there's no point in waiting. // Wait sets do not require a wait operation to be performed in order to // retreive any ready handles. Performing a wait with deadline == 0 is // unnecessary work. did_work = WaitForReadyHandles(run_state); } did_work |= ProcessReadyHandles(); did_work |= RemoveExpiredHandles(); return did_work; } bool MessagePumpMojo::DoNonMojoWork(const RunState& run_state, bool block) { bool did_work = block; if (block) { const MojoDeadline deadline = GetDeadlineForWait(run_state); // Stolen from base/message_loop/message_pump_default.cc base::ThreadRestrictions::ScopedAllowWait allow_wait; if (deadline == MOJO_DEADLINE_INDEFINITE) { event_.Wait(); } else { if (deadline > 0) { event_.TimedWait(base::TimeDelta::FromMicroseconds(deadline)); } else { did_work = false; } } // Since event_ is auto-reset, we don't need to do anything special here // other than service each delegate method. } did_work |= RemoveExpiredHandles(); return did_work; } bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const { const MojoDeadline deadline = GetDeadlineForWait(run_state); const MojoResult wait_result = Wait( wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); if (wait_result == MOJO_RESULT_OK) { // Handles may be ready. Or not since wake-ups can be spurious in certain // circumstances. return true; } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) { return false; } base::debug::Alias(&wait_result); // Unexpected result is likely fatal, crash so we can determine cause. CHECK(false); return false; } bool MessagePumpMojo::ProcessReadyHandles() { // Maximum number of handles to retrieve and process. Experimentally, the 95th // percentile is 1 handle, and the long-term average is 1.1. However, this has // been seen to reach >10 under heavy load. 8 is a hand-wavy compromise. const uint32_t kMaxServiced = 8; uint32_t num_ready_handles = kMaxServiced; MojoHandle handles[kMaxServiced]; MojoResult handle_results[kMaxServiced]; const MojoResult get_result = MojoGetReadyHandles(wait_set_handle_.get().value(), &num_ready_handles, handles, handle_results, nullptr); CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT); if (get_result != MOJO_RESULT_OK) return false; DCHECK(num_ready_handles); DCHECK_LE(num_ready_handles, kMaxServiced); // Do this in two steps, because notifying a handler may remove/add other // handles that may have also been woken up. // First, enumerate the IDs of the ready handles. Then, iterate over the // handles and only take action if the ID hasn't changed. // Since the size of this map is bounded by |kMaxServiced|, use a SmallMap to // avoid the per-element allocation. base::SmallMap<std::map<Handle, int>, kMaxServiced> ready_handles; for (uint32_t i = 0; i < num_ready_handles; i++) { const Handle handle = Handle(handles[i]); // Skip the control handle. It's special. if (handle.value() == read_handle_.get().value()) continue; DCHECK(handle.is_valid()); const auto it = handlers_.find(handle); // Skip handles that have been removed. This is possible because // RemoveHandler() can be called with a handle that has been closed. Because // the handle is closed, the MojoRemoveHandle() call in RemoveHandler() // would have failed, but the handle is still in the wait set. Once the // handle is retrieved using MojoGetReadyHandles(), it is implicitly removed // from the set. The result is either the pending result that existed when // the handle was closed, or |MOJO_RESULT_CANCELLED| to indicate that the // handle was closed. if (it == handlers_.end()) continue; ready_handles[handle] = it->second.id; } for (uint32_t i = 0; i < num_ready_handles; i++) { const Handle handle = Handle(handles[i]); // If the handle has been removed, or it's ID has changed, skip over it. // If the handle's ID has changed, and it still satisfies its signals, // then it'll be caught in the next message pump iteration. const auto it = handlers_.find(handle); if ((handle.value() != read_handle_.get().value()) && (it == handlers_.end() || it->second.id != ready_handles[handle])) { continue; } switch (handle_results[i]) { case MOJO_RESULT_CANCELLED: case MOJO_RESULT_FAILED_PRECONDITION: DVLOG(1) << "Error: " << handle_results[i] << " handle: " << handle.value(); if (handle.value() == read_handle_.get().value()) { // The Mojo EDK is shutting down. We can't just quit the message pump // because that may cause the thread to quit, which causes the // thread's MessageLoop to be destroyed, which races with any use of // |Thread::task_runner()|. So instead, we enter a "dumb" mode which // bypasses Mojo and just acts like a trivial message pump. That way, // we can wait for the usual thread exiting mechanism to happen. // The dumb mode is indicated by releasing the control pipe's read // handle. read_handle_.reset(); } else { SignalHandleError(handle, handle_results[i]); } break; case MOJO_RESULT_OK: if (handle.value() == read_handle_.get().value()) { DVLOG(1) << "Signaled control pipe"; // Control pipe was written to. ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); } else { DVLOG(1) << "Handle ready: " << handle.value(); SignalHandleReady(handle); } break; default: base::debug::Alias(&i); base::debug::Alias(&handle_results[i]); // Unexpected result is likely fatal, crash so we can determine cause. CHECK(false); } } return true; } bool MessagePumpMojo::RemoveExpiredHandles() { bool removed = false; // Notify and remove any handlers whose time has expired. First, iterate over // the set of handles that have a deadline, and add the expired handles to a // map of <Handle, id>. Then, iterate over those expired handles and remove // them. The two-step process is because a handler can add/remove new // handlers. std::map<Handle, int> expired_handles; const base::TimeTicks now(internal::NowTicks()); for (const Handle handle : deadline_handles_) { const auto it = handlers_.find(handle); // Expect any handle in |deadline_handles_| to also be in |handlers_| since // the two are modified in lock-step. DCHECK(it != handlers_.end()); if (!it->second.deadline.is_null() && it->second.deadline < now) expired_handles[handle] = it->second.id; } for (const auto& pair : expired_handles) { auto it = handlers_.find(pair.first); // Don't need to check deadline again since it can't change if id hasn't // changed. if (it != handlers_.end() && it->second.id == pair.second) { SignalHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED); removed = true; } } return removed; } void MessagePumpMojo::SignalControlPipe() { const MojoResult result = WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); if (result == MOJO_RESULT_FAILED_PRECONDITION) { // Mojo EDK is shutting down. event_.Signal(); return; } // If we can't write we likely won't wake up the thread and there is a strong // chance we'll deadlock. CHECK_EQ(MOJO_RESULT_OK, result); } MojoDeadline MessagePumpMojo::GetDeadlineForWait( const RunState& run_state) const { const base::TimeTicks now(internal::NowTicks()); MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, now); for (const Handle handle : deadline_handles_) { auto it = handlers_.find(handle); DCHECK(it != handlers_.end()); deadline = std::min( TimeTicksToMojoDeadline(it->second.deadline, now), deadline); } return deadline; } void MessagePumpMojo::SignalHandleReady(Handle handle) { auto it = handlers_.find(handle); DCHECK(it != handlers_.end()); MessagePumpMojoHandler* handler = it->second.handler; WillSignalHandler(); handler->OnHandleReady(handle); DidSignalHandler(); } void MessagePumpMojo::SignalHandleError(Handle handle, MojoResult result) { auto it = handlers_.find(handle); DCHECK(it != handlers_.end()); MessagePumpMojoHandler* handler = it->second.handler; RemoveHandler(handle); WillSignalHandler(); handler->OnHandleError(handle, result); DidSignalHandler(); } void MessagePumpMojo::WillSignalHandler() { FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); } void MessagePumpMojo::DidSignalHandler() { FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); } } // namespace common } // namespace mojo