普通文本  |  313行  |  9.46 KB

// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/wait_set_dispatcher.h"

#include <stdint.h>

#include <algorithm>
#include <utility>

#include "base/logging.h"
#include "mojo/edk/system/awakable.h"

namespace mojo {
namespace edk {

// |Awakable| adapter that a WaitSetDispatcher registers on the dispatchers it
// watches. Every wake-up it receives is forwarded to the wait set through
// |WakeDispatcher()|.
class WaitSetDispatcher::Waiter final : public Awakable {
 public:
  explicit Waiter(WaitSetDispatcher* dispatcher) : wait_set_(dispatcher) {}
  ~Waiter() {}

  // |Awakable| implementation.
  bool Awake(MojoResult result, uintptr_t context) override {
    // Note: This is called with various Mojo locks held.
    wait_set_->WakeDispatcher(result, context);
    // Returning false removes |this| from the dispatcher's list of waiters.
    return false;
  }

 private:
  // Wait set that receives the forwarded wake-ups. Raw pointer; this class
  // never deletes it.
  WaitSetDispatcher* const wait_set_;
};

// Out-of-line definitions of |WaitState|'s special member functions. None of
// them does anything beyond the compiler-generated behaviour.
WaitSetDispatcher::WaitState::WaitState() = default;

WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default;

WaitSetDispatcher::WaitState::~WaitState() = default;

// Creates the wait set together with the single |Waiter| that it later
// registers on each dispatcher passed to |AddWaitingDispatcher()|.
WaitSetDispatcher::WaitSetDispatcher() : waiter_(new Waiter(this)) {}

// Reports this dispatcher's type, which is always |Type::WAIT_SET|.
Dispatcher::Type WaitSetDispatcher::GetType() const {
  return Type::WAIT_SET;
}

// Closes the wait set: cancels every awakable waiting on it, unregisters
// |waiter_| from all watched dispatchers and drops all bookkeeping.
//
// Returns |MOJO_RESULT_INVALID_ARGUMENT| if the wait set was already closed,
// |MOJO_RESULT_OK| otherwise.
MojoResult WaitSetDispatcher::Close() {
  base::AutoLock state_guard(lock_);

  if (is_closed_)
    return MOJO_RESULT_INVALID_ARGUMENT;
  is_closed_ = true;

  {
    // Scoped so that |awakable_lock_| is released before the watched
    // dispatchers are touched below.
    base::AutoLock awakable_guard(awakable_lock_);
    awakable_list_.CancelAll();
  }

  // Detach |waiter_| from every dispatcher that is still being watched.
  for (auto watched = waiting_dispatchers_.begin();
       watched != waiting_dispatchers_.end(); ++watched) {
    watched->second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
  }
  waiting_dispatchers_.clear();

  // Discard wake-ups that were queued or already handed out.
  base::AutoLock awoken_guard(awoken_lock_);
  awoken_queue_.clear();
  processed_dispatchers_.clear();

  return MOJO_RESULT_OK;
}

// Starts watching |dispatcher| for |signals|. |context| is stored and handed
// back by |GetReadyDispatchers()| when the dispatcher is reported.
//
// Returns:
//   |MOJO_RESULT_INVALID_ARGUMENT| if |dispatcher| is this wait set, if the
//       wait set is closed, or if |dispatcher->AddAwakable()| reports it.
//   |MOJO_RESULT_ALREADY_EXISTS| if |dispatcher| is already being watched.
//   |MOJO_RESULT_OK| otherwise.
MojoResult WaitSetDispatcher::AddWaitingDispatcher(
    const scoped_refptr<Dispatcher>& dispatcher,
    MojoHandleSignals signals,
    uintptr_t context) {
  if (dispatcher == this)
    return MOJO_RESULT_INVALID_ARGUMENT;

  base::AutoLock state_guard(lock_);

  if (is_closed_)
    return MOJO_RESULT_INVALID_ARGUMENT;

  // The dispatcher's address serves both as the key in |waiting_dispatchers_|
  // and as the context under which |waiter_| is registered.
  const uintptr_t key = reinterpret_cast<uintptr_t>(dispatcher.get());
  if (waiting_dispatchers_.find(key) != waiting_dispatchers_.end())
    return MOJO_RESULT_ALREADY_EXISTS;

  const MojoResult add_result =
      dispatcher->AddAwakable(waiter_.get(), signals, key, nullptr);

  // Dispatcher is closed.
  if (add_result == MOJO_RESULT_INVALID_ARGUMENT)
    return add_result;

  // Any other non-OK result is queued right away as a wake-up for this
  // dispatcher.
  if (add_result != MOJO_RESULT_OK)
    WakeDispatcher(add_result, key);

  WaitState entry;
  entry.dispatcher = dispatcher;
  entry.context = context;
  entry.signals = signals;
  const bool was_inserted =
      waiting_dispatchers_.insert(std::make_pair(key, entry)).second;
  DCHECK(was_inserted);

  return MOJO_RESULT_OK;
}

// Stops watching |dispatcher| and purges every trace of it from the queued and
// already-returned wake-ups.
//
// Returns |MOJO_RESULT_INVALID_ARGUMENT| if the wait set is closed,
// |MOJO_RESULT_NOT_FOUND| if |dispatcher| is not being watched, and
// |MOJO_RESULT_OK| otherwise.
MojoResult WaitSetDispatcher::RemoveWaitingDispatcher(
    const scoped_refptr<Dispatcher>& dispatcher) {
  const uintptr_t key = reinterpret_cast<uintptr_t>(dispatcher.get());

  base::AutoLock state_guard(lock_);
  if (is_closed_)
    return MOJO_RESULT_INVALID_ARGUMENT;

  auto watched = waiting_dispatchers_.find(key);
  if (watched == waiting_dispatchers_.end())
    return MOJO_RESULT_NOT_FOUND;

  dispatcher->RemoveAwakable(waiter_.get(), nullptr);
  // At this point, it should not be possible for |waiter_| to be woken with
  // |dispatcher|.
  waiting_dispatchers_.erase(watched);

  base::AutoLock awoken_guard(awoken_lock_);

  // Drop any wake-up for this dispatcher that is still queued.
  int erased_count = 0;
  auto queued = awoken_queue_.begin();
  while (queued != awoken_queue_.end()) {
    if (queued->first != key) {
      ++queued;
      continue;
    }
    queued = awoken_queue_.erase(queued);
    ++erased_count;
  }
  // The dispatcher should only exist in the queue once.
  DCHECK_LE(erased_count, 1);

  // Also forget it if it was handed out by a previous
  // |GetReadyDispatchers()| call.
  auto stale_begin = std::remove(processed_dispatchers_.begin(),
                                 processed_dispatchers_.end(), key);
  processed_dispatchers_.erase(stale_begin, processed_dispatchers_.end());

  return MOJO_RESULT_OK;
}

// Retrieves dispatchers for which a wake-up is queued in |awoken_queue_|.
//
// On entry |*count| is the maximum number of entries to return; on return it
// holds the number actually written. |dispatchers| is cleared and then
// receives one dispatcher per returned entry. For entry i, |results[i]|
// receives the queued result and, if |contexts| is non-null, |contexts[i]|
// receives the context that was passed to |AddWaitingDispatcher()|.
//
// Returns |MOJO_RESULT_INVALID_ARGUMENT| if the wait set is closed,
// |MOJO_RESULT_SHOULD_WAIT| if no entry was returned, and |MOJO_RESULT_OK|
// otherwise.
MojoResult WaitSetDispatcher::GetReadyDispatchers(
    uint32_t* count,
    DispatcherVector* dispatchers,
    MojoResult* results,
    uintptr_t* contexts) {
  base::AutoLock lock(lock_);

  if (is_closed_)
    return MOJO_RESULT_INVALID_ARGUMENT;

  dispatchers->clear();

  // Re-queue any already retrieved dispatchers. These should be the dispatchers
  // that were returned on the last call to this function. This loop is
  // necessary to preserve the logically level-triggering behaviour of waiting
  // in Mojo. In particular, if no action is taken on a signal, that signal
  // continues to be satisfied, and therefore a |MojoWait()| on that
  // handle/signal continues to return immediately.
  std::deque<uintptr_t> pending;
  {
    // Take the list out under |awoken_lock_| so that the loop below can run
    // without holding that lock.
    base::AutoLock locker(awoken_lock_);
    pending.swap(processed_dispatchers_);
  }
  for (uintptr_t d : pending) {
    auto it = waiting_dispatchers_.find(d);
    // Anything in |processed_dispatchers_| should also be in
    // |waiting_dispatchers_| since dispatchers are removed from both in
    // |RemoveWaitingDispatcher()|.
    DCHECK(it != waiting_dispatchers_.end());

    // |awoken_lock_| cannot be held here because
    // |Dispatcher::AddAwakable()| acquires the Dispatcher's lock. That
    // lock is held while running |WakeDispatcher()| below, which needs to
    // acquire |awoken_lock_|. Holding |awoken_lock_| here would result in
    // a deadlock.
    const MojoResult result = it->second.dispatcher->AddAwakable(
        waiter_.get(), it->second.signals, d, nullptr);

    if (result == MOJO_RESULT_INVALID_ARGUMENT) {
      // Dispatcher is closed. Implicitly remove it from the wait set since
      // it may be impossible to remove using |MojoRemoveHandle()|.
      waiting_dispatchers_.erase(it);
    } else if (result != MOJO_RESULT_OK) {
      // Any other non-OK result is queued immediately as a wake-up.
      WakeDispatcher(result, d);
    }
  }

  const uint32_t max_woken = *count;
  uint32_t num_woken = 0;

  // Drain up to |max_woken| queued wake-ups into the output parameters.
  base::AutoLock locker(awoken_lock_);
  while (!awoken_queue_.empty() && num_woken < max_woken) {
    uintptr_t d = awoken_queue_.front().first;
    MojoResult result = awoken_queue_.front().second;
    awoken_queue_.pop_front();

    auto it = waiting_dispatchers_.find(d);
    DCHECK(it != waiting_dispatchers_.end());

    results[num_woken] = result;
    dispatchers->push_back(it->second.dispatcher);
    if (contexts)
      contexts[num_woken] = it->second.context;

    if (result != MOJO_RESULT_CANCELLED) {
      // Remember the dispatcher so that the next call re-registers |waiter_|
      // on it (see the re-queue loop above).
      processed_dispatchers_.push_back(d);
    } else {
      // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
      // Return it, but also implicitly remove it from the wait set.
      waiting_dispatchers_.erase(it);
    }

    num_woken++;
  }

  *count = num_woken;
  if (!num_woken)
    return MOJO_RESULT_SHOULD_WAIT;

  return MOJO_RESULT_OK;
}

// Locking wrapper around |GetHandleSignalsStateNoLock()|: acquires |lock_| and
// returns the current signals state.
HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const {
  base::AutoLock state_guard(lock_);
  return GetHandleSignalsStateNoLock();
}

// Computes the signals state of the wait set. |lock_| must already be held.
//
// A closed wait set reports a default-constructed state. An open one can
// always become readable, and is readable while any wake-up is queued or has
// been handed out by |GetReadyDispatchers()| and not yet re-registered.
HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const {
  lock_.AssertAcquired();
  if (is_closed_)
    return HandleSignalsState();

  HandleSignalsState state;
  state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;

  base::AutoLock awoken_guard(awoken_lock_);
  const bool nothing_ready =
      awoken_queue_.empty() && processed_dispatchers_.empty();
  if (!nothing_ready)
    state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
  return state;
}

// Registers |awakable| to be woken when the wait set satisfies |signals|.
//
// Returns:
//   |MOJO_RESULT_ALREADY_EXISTS| if |signals| is already satisfied.
//   |MOJO_RESULT_FAILED_PRECONDITION| if |signals| can never be satisfied.
//   |MOJO_RESULT_OK| if |awakable| was added to |awakable_list_|.
// In the two non-OK cases the current state is copied to |signals_state| when
// that pointer is non-null; on success |signals_state| is left untouched.
MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable,
                                          MojoHandleSignals signals,
                                          uintptr_t context,
                                          HandleSignalsState* signals_state) {
  base::AutoLock state_guard(lock_);
  // |awakable_lock_| is taken before the state is inspected, rather than just
  // before touching |awakable_list_|, so that "check the signals state" and
  // "add to |awakable_list_|" form one atomic step. Otherwise the state could
  // change between the check and the add, and the freshly added awakable
  // would never be signalled.
  base::AutoLock awakable_guard(awakable_lock_);

  HandleSignalsState current(GetHandleSignalsStateNoLock());
  const bool already_satisfied = current.satisfies(signals);
  if (already_satisfied || !current.can_satisfy(signals)) {
    if (signals_state)
      *signals_state = current;
    return already_satisfied ? MOJO_RESULT_ALREADY_EXISTS
                             : MOJO_RESULT_FAILED_PRECONDITION;
  }

  awakable_list_.Add(awakable, signals, context);
  return MOJO_RESULT_OK;
}

// Removes |awakable| from |awakable_list_|. If |signals_state| is non-null it
// receives the signals state as observed after the removal.
void WaitSetDispatcher::RemoveAwakable(Awakable* awakable,
                                       HandleSignalsState* signals_state) {
  {
    // Released before |GetHandleSignalsState()| is called below.
    base::AutoLock awakable_guard(awakable_lock_);
    awakable_list_.Remove(awakable);
  }

  if (!signals_state)
    return;
  *signals_state = GetHandleSignalsState();
}

// Always returns false, since wait sets cannot be transferred.
bool WaitSetDispatcher::BeginTransit() {
  // You can't transfer wait sets!
  return false;
}

// Debug-checks that all bookkeeping containers are empty at destruction time.
// |Close()| clears all three of them.
WaitSetDispatcher::~WaitSetDispatcher() {
  DCHECK(waiting_dispatchers_.empty());
  DCHECK(awoken_queue_.empty());
  DCHECK(processed_dispatchers_.empty());
}

// Records a wake-up for the dispatcher identified by |context| and then
// notifies everything waiting on this wait set that it is readable.
//
// |MOJO_RESULT_ALREADY_EXISTS| is recorded as |MOJO_RESULT_OK|; every other
// |result| is recorded unchanged.
void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
  const MojoResult queued_result =
      result == MOJO_RESULT_ALREADY_EXISTS ? MOJO_RESULT_OK : result;

  {
    // |awoken_lock_| is released before |awakable_lock_| is taken below.
    base::AutoLock awoken_guard(awoken_lock_);
    awoken_queue_.push_back(std::make_pair(context, queued_result));
  }

  base::AutoLock awakable_guard(awakable_lock_);
  HandleSignalsState readable_state;
  readable_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
  readable_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
  awakable_list_.AwakeForStateChange(readable_state);
}

}  // namespace edk
}  // namespace mojo