// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/wait_set_dispatcher.h"
#include <stdint.h>
#include <algorithm>
#include <utility>
#include "base/logging.h"
#include "mojo/edk/system/awakable.h"
namespace mojo {
namespace edk {
class WaitSetDispatcher::Waiter final : public Awakable {
public:
explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
~Waiter() {}
// |Awakable| implementation.
bool Awake(MojoResult result, uintptr_t context) override {
// Note: This is called with various Mojo locks held.
dispatcher_->WakeDispatcher(result, context);
// Removes |this| from the dispatcher's list of waiters.
return false;
}
private:
WaitSetDispatcher* const dispatcher_;
};
WaitSetDispatcher::WaitState::WaitState() {}
WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default;
WaitSetDispatcher::WaitState::~WaitState() {}
WaitSetDispatcher::WaitSetDispatcher()
: waiter_(new WaitSetDispatcher::Waiter(this)) {}
Dispatcher::Type WaitSetDispatcher::GetType() const {
return Type::WAIT_SET;
}
MojoResult WaitSetDispatcher::Close() {
base::AutoLock lock(lock_);
if (is_closed_)
return MOJO_RESULT_INVALID_ARGUMENT;
is_closed_ = true;
{
base::AutoLock locker(awakable_lock_);
awakable_list_.CancelAll();
}
for (const auto& entry : waiting_dispatchers_)
entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
waiting_dispatchers_.clear();
base::AutoLock locker(awoken_lock_);
awoken_queue_.clear();
processed_dispatchers_.clear();
return MOJO_RESULT_OK;
}
MojoResult WaitSetDispatcher::AddWaitingDispatcher(
const scoped_refptr<Dispatcher>& dispatcher,
MojoHandleSignals signals,
uintptr_t context) {
if (dispatcher == this)
return MOJO_RESULT_INVALID_ARGUMENT;
base::AutoLock lock(lock_);
if (is_closed_)
return MOJO_RESULT_INVALID_ARGUMENT;
uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
auto it = waiting_dispatchers_.find(dispatcher_handle);
if (it != waiting_dispatchers_.end()) {
return MOJO_RESULT_ALREADY_EXISTS;
}
const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
dispatcher_handle, nullptr);
if (result == MOJO_RESULT_INVALID_ARGUMENT) {
// Dispatcher is closed.
return result;
} else if (result != MOJO_RESULT_OK) {
WakeDispatcher(result, dispatcher_handle);
}
WaitState state;
state.dispatcher = dispatcher;
state.context = context;
state.signals = signals;
bool inserted = waiting_dispatchers_.insert(
std::make_pair(dispatcher_handle, state)).second;
DCHECK(inserted);
return MOJO_RESULT_OK;
}
MojoResult WaitSetDispatcher::RemoveWaitingDispatcher(
const scoped_refptr<Dispatcher>& dispatcher) {
uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
base::AutoLock lock(lock_);
if (is_closed_)
return MOJO_RESULT_INVALID_ARGUMENT;
auto it = waiting_dispatchers_.find(dispatcher_handle);
if (it == waiting_dispatchers_.end())
return MOJO_RESULT_NOT_FOUND;
dispatcher->RemoveAwakable(waiter_.get(), nullptr);
// At this point, it should not be possible for |waiter_| to be woken with
// |dispatcher|.
waiting_dispatchers_.erase(it);
base::AutoLock locker(awoken_lock_);
int num_erased = 0;
for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
if (it->first == dispatcher_handle) {
it = awoken_queue_.erase(it);
num_erased++;
} else {
++it;
}
}
// The dispatcher should only exist in the queue once.
DCHECK_LE(num_erased, 1);
processed_dispatchers_.erase(
std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
dispatcher_handle),
processed_dispatchers_.end());
return MOJO_RESULT_OK;
}
MojoResult WaitSetDispatcher::GetReadyDispatchers(
uint32_t* count,
DispatcherVector* dispatchers,
MojoResult* results,
uintptr_t* contexts) {
base::AutoLock lock(lock_);
if (is_closed_)
return MOJO_RESULT_INVALID_ARGUMENT;
dispatchers->clear();
// Re-queue any already retrieved dispatchers. These should be the dispatchers
// that were returned on the last call to this function. This loop is
// necessary to preserve the logically level-triggering behaviour of waiting
// in Mojo. In particular, if no action is taken on a signal, that signal
// continues to be satisfied, and therefore a |MojoWait()| on that
// handle/signal continues to return immediately.
std::deque<uintptr_t> pending;
{
base::AutoLock locker(awoken_lock_);
pending.swap(processed_dispatchers_);
}
for (uintptr_t d : pending) {
auto it = waiting_dispatchers_.find(d);
// Anything in |processed_dispatchers_| should also be in
// |waiting_dispatchers_| since dispatchers are removed from both in
// |RemoveWaitingDispatcherImplNoLock()|.
DCHECK(it != waiting_dispatchers_.end());
// |awoken_mutex_| cannot be held here because
// |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
// mutex is held while running |WakeDispatcher()| below, which needs to
// acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
// a deadlock.
const MojoResult result = it->second.dispatcher->AddAwakable(
waiter_.get(), it->second.signals, d, nullptr);
if (result == MOJO_RESULT_INVALID_ARGUMENT) {
// Dispatcher is closed. Implicitly remove it from the wait set since
// it may be impossible to remove using |MojoRemoveHandle()|.
waiting_dispatchers_.erase(it);
} else if (result != MOJO_RESULT_OK) {
WakeDispatcher(result, d);
}
}
const uint32_t max_woken = *count;
uint32_t num_woken = 0;
base::AutoLock locker(awoken_lock_);
while (!awoken_queue_.empty() && num_woken < max_woken) {
uintptr_t d = awoken_queue_.front().first;
MojoResult result = awoken_queue_.front().second;
awoken_queue_.pop_front();
auto it = waiting_dispatchers_.find(d);
DCHECK(it != waiting_dispatchers_.end());
results[num_woken] = result;
dispatchers->push_back(it->second.dispatcher);
if (contexts)
contexts[num_woken] = it->second.context;
if (result != MOJO_RESULT_CANCELLED) {
processed_dispatchers_.push_back(d);
} else {
// |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
// Return it, but also implcitly remove it from the wait set.
waiting_dispatchers_.erase(it);
}
num_woken++;
}
*count = num_woken;
if (!num_woken)
return MOJO_RESULT_SHOULD_WAIT;
return MOJO_RESULT_OK;
}
HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const {
base::AutoLock lock(lock_);
return GetHandleSignalsStateNoLock();
}
HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const {
lock_.AssertAcquired();
if (is_closed_)
return HandleSignalsState();
HandleSignalsState rv;
rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
base::AutoLock locker(awoken_lock_);
if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
return rv;
}
MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable,
MojoHandleSignals signals,
uintptr_t context,
HandleSignalsState* signals_state) {
base::AutoLock lock(lock_);
// |awakable_lock_| is acquired here instead of immediately before adding to
// |awakable_list_| because we need to check the signals state and add to
// |awakable_list_| as an atomic operation. If the pair isn't atomic, it is
// possible for the signals state to change after it is checked, but before
// the awakable is added. In that case, the added awakable won't be signalled.
base::AutoLock awakable_locker(awakable_lock_);
HandleSignalsState state(GetHandleSignalsStateNoLock());
if (state.satisfies(signals)) {
if (signals_state)
*signals_state = state;
return MOJO_RESULT_ALREADY_EXISTS;
}
if (!state.can_satisfy(signals)) {
if (signals_state)
*signals_state = state;
return MOJO_RESULT_FAILED_PRECONDITION;
}
awakable_list_.Add(awakable, signals, context);
return MOJO_RESULT_OK;
}
void WaitSetDispatcher::RemoveAwakable(Awakable* awakable,
HandleSignalsState* signals_state) {
{
base::AutoLock locker(awakable_lock_);
awakable_list_.Remove(awakable);
}
if (signals_state)
*signals_state = GetHandleSignalsState();
}
bool WaitSetDispatcher::BeginTransit() {
// You can't transfer wait sets!
return false;
}
WaitSetDispatcher::~WaitSetDispatcher() {
DCHECK(waiting_dispatchers_.empty());
DCHECK(awoken_queue_.empty());
DCHECK(processed_dispatchers_.empty());
}
void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
{
base::AutoLock locker(awoken_lock_);
if (result == MOJO_RESULT_ALREADY_EXISTS)
result = MOJO_RESULT_OK;
awoken_queue_.push_back(std::make_pair(context, result));
}
base::AutoLock locker(awakable_lock_);
HandleSignalsState signals_state;
signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
awakable_list_.AwakeForStateChange(signals_state);
}
} // namespace edk
} // namespace mojo