// Copyright 2015 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/edk/system/wait_set_dispatcher.h" #include <stdint.h> #include <algorithm> #include <utility> #include "base/logging.h" #include "mojo/edk/system/awakable.h" namespace mojo { namespace edk { class WaitSetDispatcher::Waiter final : public Awakable { public: explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {} ~Waiter() {} // |Awakable| implementation. bool Awake(MojoResult result, uintptr_t context) override { // Note: This is called with various Mojo locks held. dispatcher_->WakeDispatcher(result, context); // Removes |this| from the dispatcher's list of waiters. return false; } private: WaitSetDispatcher* const dispatcher_; }; WaitSetDispatcher::WaitState::WaitState() {} WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default; WaitSetDispatcher::WaitState::~WaitState() {} WaitSetDispatcher::WaitSetDispatcher() : waiter_(new WaitSetDispatcher::Waiter(this)) {} Dispatcher::Type WaitSetDispatcher::GetType() const { return Type::WAIT_SET; } MojoResult WaitSetDispatcher::Close() { base::AutoLock lock(lock_); if (is_closed_) return MOJO_RESULT_INVALID_ARGUMENT; is_closed_ = true; { base::AutoLock locker(awakable_lock_); awakable_list_.CancelAll(); } for (const auto& entry : waiting_dispatchers_) entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr); waiting_dispatchers_.clear(); base::AutoLock locker(awoken_lock_); awoken_queue_.clear(); processed_dispatchers_.clear(); return MOJO_RESULT_OK; } MojoResult WaitSetDispatcher::AddWaitingDispatcher( const scoped_refptr<Dispatcher>& dispatcher, MojoHandleSignals signals, uintptr_t context) { if (dispatcher == this) return MOJO_RESULT_INVALID_ARGUMENT; base::AutoLock lock(lock_); if (is_closed_) return MOJO_RESULT_INVALID_ARGUMENT; uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); auto it = waiting_dispatchers_.find(dispatcher_handle); if (it != waiting_dispatchers_.end()) { return MOJO_RESULT_ALREADY_EXISTS; } const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals, dispatcher_handle, nullptr); if (result == MOJO_RESULT_INVALID_ARGUMENT) { // Dispatcher is closed. return result; } else if (result != MOJO_RESULT_OK) { WakeDispatcher(result, dispatcher_handle); } WaitState state; state.dispatcher = dispatcher; state.context = context; state.signals = signals; bool inserted = waiting_dispatchers_.insert( std::make_pair(dispatcher_handle, state)).second; DCHECK(inserted); return MOJO_RESULT_OK; } MojoResult WaitSetDispatcher::RemoveWaitingDispatcher( const scoped_refptr<Dispatcher>& dispatcher) { uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); base::AutoLock lock(lock_); if (is_closed_) return MOJO_RESULT_INVALID_ARGUMENT; auto it = waiting_dispatchers_.find(dispatcher_handle); if (it == waiting_dispatchers_.end()) return MOJO_RESULT_NOT_FOUND; dispatcher->RemoveAwakable(waiter_.get(), nullptr); // At this point, it should not be possible for |waiter_| to be woken with // |dispatcher|. waiting_dispatchers_.erase(it); base::AutoLock locker(awoken_lock_); int num_erased = 0; for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) { if (it->first == dispatcher_handle) { it = awoken_queue_.erase(it); num_erased++; } else { ++it; } } // The dispatcher should only exist in the queue once. DCHECK_LE(num_erased, 1); processed_dispatchers_.erase( std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(), dispatcher_handle), processed_dispatchers_.end()); return MOJO_RESULT_OK; } MojoResult WaitSetDispatcher::GetReadyDispatchers( uint32_t* count, DispatcherVector* dispatchers, MojoResult* results, uintptr_t* contexts) { base::AutoLock lock(lock_); if (is_closed_) return MOJO_RESULT_INVALID_ARGUMENT; dispatchers->clear(); // Re-queue any already retrieved dispatchers. These should be the dispatchers // that were returned on the last call to this function. This loop is // necessary to preserve the logically level-triggering behaviour of waiting // in Mojo. In particular, if no action is taken on a signal, that signal // continues to be satisfied, and therefore a |MojoWait()| on that // handle/signal continues to return immediately. std::deque<uintptr_t> pending; { base::AutoLock locker(awoken_lock_); pending.swap(processed_dispatchers_); } for (uintptr_t d : pending) { auto it = waiting_dispatchers_.find(d); // Anything in |processed_dispatchers_| should also be in // |waiting_dispatchers_| since dispatchers are removed from both in // |RemoveWaitingDispatcherImplNoLock()|. DCHECK(it != waiting_dispatchers_.end()); // |awoken_mutex_| cannot be held here because // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This // mutex is held while running |WakeDispatcher()| below, which needs to // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in // a deadlock. const MojoResult result = it->second.dispatcher->AddAwakable( waiter_.get(), it->second.signals, d, nullptr); if (result == MOJO_RESULT_INVALID_ARGUMENT) { // Dispatcher is closed. Implicitly remove it from the wait set since // it may be impossible to remove using |MojoRemoveHandle()|. waiting_dispatchers_.erase(it); } else if (result != MOJO_RESULT_OK) { WakeDispatcher(result, d); } } const uint32_t max_woken = *count; uint32_t num_woken = 0; base::AutoLock locker(awoken_lock_); while (!awoken_queue_.empty() && num_woken < max_woken) { uintptr_t d = awoken_queue_.front().first; MojoResult result = awoken_queue_.front().second; awoken_queue_.pop_front(); auto it = waiting_dispatchers_.find(d); DCHECK(it != waiting_dispatchers_.end()); results[num_woken] = result; dispatchers->push_back(it->second.dispatcher); if (contexts) contexts[num_woken] = it->second.context; if (result != MOJO_RESULT_CANCELLED) { processed_dispatchers_.push_back(d); } else { // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed. // Return it, but also implcitly remove it from the wait set. waiting_dispatchers_.erase(it); } num_woken++; } *count = num_woken; if (!num_woken) return MOJO_RESULT_SHOULD_WAIT; return MOJO_RESULT_OK; } HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const { base::AutoLock lock(lock_); return GetHandleSignalsStateNoLock(); } HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const { lock_.AssertAcquired(); if (is_closed_) return HandleSignalsState(); HandleSignalsState rv; rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; base::AutoLock locker(awoken_lock_); if (!awoken_queue_.empty() || !processed_dispatchers_.empty()) rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; return rv; } MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable, MojoHandleSignals signals, uintptr_t context, HandleSignalsState* signals_state) { base::AutoLock lock(lock_); // |awakable_lock_| is acquired here instead of immediately before adding to // |awakable_list_| because we need to check the signals state and add to // |awakable_list_| as an atomic operation. If the pair isn't atomic, it is // possible for the signals state to change after it is checked, but before // the awakable is added. In that case, the added awakable won't be signalled. base::AutoLock awakable_locker(awakable_lock_); HandleSignalsState state(GetHandleSignalsStateNoLock()); if (state.satisfies(signals)) { if (signals_state) *signals_state = state; return MOJO_RESULT_ALREADY_EXISTS; } if (!state.can_satisfy(signals)) { if (signals_state) *signals_state = state; return MOJO_RESULT_FAILED_PRECONDITION; } awakable_list_.Add(awakable, signals, context); return MOJO_RESULT_OK; } void WaitSetDispatcher::RemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state) { { base::AutoLock locker(awakable_lock_); awakable_list_.Remove(awakable); } if (signals_state) *signals_state = GetHandleSignalsState(); } bool WaitSetDispatcher::BeginTransit() { // You can't transfer wait sets! return false; } WaitSetDispatcher::~WaitSetDispatcher() { DCHECK(waiting_dispatchers_.empty()); DCHECK(awoken_queue_.empty()); DCHECK(processed_dispatchers_.empty()); } void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) { { base::AutoLock locker(awoken_lock_); if (result == MOJO_RESULT_ALREADY_EXISTS) result = MOJO_RESULT_OK; awoken_queue_.push_back(std::make_pair(context, result)); } base::AutoLock locker(awakable_lock_); HandleSignalsState signals_state; signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; awakable_list_.AwakeForStateChange(signals_state); } } // namespace edk } // namespace mojo