普通文本  |  255行  |  7.8 KB

// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <memory>
#include <vector>

#include "base/atomicops.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/perf/perf_test.h"

namespace base {

namespace {

// A thread that waits for the caller to signal an event before proceeding to
// call Action::Run().
class PostingThread {
 public:
  class Action {
   public:
    virtual ~Action() = default;

    // Called after the thread is started and |start_event_| is signalled.
    virtual void Run() = 0;

   protected:
    Action() = default;

   private:
    DISALLOW_COPY_AND_ASSIGN(Action);
  };

  // Creates a PostingThread where the thread waits on |start_event| before
  // calling action->Run(). If a thread is returned, the thread is guaranteed to
  // be allocated and running and the caller must call Join() before destroying
  // the PostingThread.
  static std::unique_ptr<PostingThread> Create(WaitableEvent* start_event,
                                               std::unique_ptr<Action> action) {
    auto posting_thread =
        WrapUnique(new PostingThread(start_event, std::move(action)));

    if (!posting_thread->Start())
      return nullptr;

    return posting_thread;
  }

  ~PostingThread() { DCHECK_EQ(!thread_handle_.is_null(), join_called_); }

  void Join() {
    PlatformThread::Join(thread_handle_);
    join_called_ = true;
  }

 private:
  class Delegate final : public PlatformThread::Delegate {
   public:
    Delegate(PostingThread* outer, std::unique_ptr<Action> action)
        : outer_(outer), action_(std::move(action)) {
      DCHECK(outer_);
      DCHECK(action_);
    }

    ~Delegate() override = default;

   private:
    void ThreadMain() override {
      outer_->thread_started_.Signal();
      outer_->start_event_->Wait();
      action_->Run();
    }

    PostingThread* const outer_;
    const std::unique_ptr<Action> action_;

    DISALLOW_COPY_AND_ASSIGN(Delegate);
  };

  PostingThread(WaitableEvent* start_event, std::unique_ptr<Action> delegate)
      : start_event_(start_event),
        thread_started_(WaitableEvent::ResetPolicy::MANUAL,
                        WaitableEvent::InitialState::NOT_SIGNALED),
        delegate_(this, std::move(delegate)) {
    DCHECK(start_event_);
  }

  bool Start() {
    bool thread_created =
        PlatformThread::Create(0, &delegate_, &thread_handle_);
    if (thread_created)
      thread_started_.Wait();

    return thread_created;
  }

  bool join_called_ = false;
  WaitableEvent* const start_event_;
  WaitableEvent thread_started_;
  Delegate delegate_;

  PlatformThreadHandle thread_handle_;

  DISALLOW_COPY_AND_ASSIGN(PostingThread);
};

class MessageLoopPerfTest : public ::testing::TestWithParam<int> {
 public:
  MessageLoopPerfTest()
      : message_loop_task_runner_(SequencedTaskRunnerHandle::Get()),
        run_posting_threads_(WaitableEvent::ResetPolicy::MANUAL,
                             WaitableEvent::InitialState::NOT_SIGNALED) {}

  static std::string ParamInfoToString(
      ::testing::TestParamInfo<int> param_info) {
    return PostingThreadCountToString(param_info.param);
  }

  static std::string PostingThreadCountToString(int posting_threads) {
    // Special case 1 thread for thread vs threads.
    if (posting_threads == 1)
      return "1_Posting_Thread";

    return StringPrintf("%d_Posting_Threads", posting_threads);
  }

 protected:
  class ContinuouslyPostTasks final : public PostingThread::Action {
   public:
    ContinuouslyPostTasks(MessageLoopPerfTest* outer) : outer_(outer) {
      DCHECK(outer_);
    }
    ~ContinuouslyPostTasks() override = default;

   private:
    void Run() override {
      RepeatingClosure task_to_run =
          BindRepeating([](size_t* num_tasks_run) { ++*num_tasks_run; },
                        &outer_->num_tasks_run_);
      while (!outer_->stop_posting_threads_.IsSet()) {
        outer_->message_loop_task_runner_->PostTask(FROM_HERE, task_to_run);
        subtle::NoBarrier_AtomicIncrement(&outer_->num_tasks_posted_, 1);
      }
    }

    MessageLoopPerfTest* const outer_;

    DISALLOW_COPY_AND_ASSIGN(ContinuouslyPostTasks);
  };

  void SetUp() override {
    // This check is here because we can't ASSERT_TRUE in the constructor.
    ASSERT_TRUE(message_loop_task_runner_);
  }

  // Runs ActionType::Run() on |num_posting_threads| and requests test
  // termination around |duration|.
  template <typename ActionType>
  void RunTest(const int num_posting_threads, TimeDelta duration) {
    std::vector<std::unique_ptr<PostingThread>> threads;
    for (int i = 0; i < num_posting_threads; ++i) {
      threads.emplace_back(PostingThread::Create(
          &run_posting_threads_, std::make_unique<ActionType>(this)));
      // Don't assert here to simplify the code that requires a Join() call for
      // every created PostingThread.
      EXPECT_TRUE(threads[i]);
    }

    RunLoop run_loop;
    message_loop_task_runner_->PostDelayedTask(
        FROM_HERE,
        BindOnce(
            [](RunLoop* run_loop, AtomicFlag* stop_posting_threads) {
              stop_posting_threads->Set();
              run_loop->Quit();
            },
            &run_loop, &stop_posting_threads_),
        duration);

    TimeTicks post_task_start = TimeTicks::Now();
    run_posting_threads_.Signal();

    TimeTicks run_loop_start = TimeTicks::Now();
    run_loop.Run();
    tasks_run_duration_ = TimeTicks::Now() - run_loop_start;

    for (auto& thread : threads)
      thread->Join();

    tasks_posted_duration_ = TimeTicks::Now() - post_task_start;
  }

  size_t num_tasks_posted() const {
    return subtle::NoBarrier_Load(&num_tasks_posted_);
  }

  TimeDelta tasks_posted_duration() const { return tasks_posted_duration_; }

  size_t num_tasks_run() const { return num_tasks_run_; }

  TimeDelta tasks_run_duration() const { return tasks_run_duration_; }

 private:
  MessageLoop message_loop_;

  // Accessed on multiple threads, thread-safe or constant:
  const scoped_refptr<SequencedTaskRunner> message_loop_task_runner_;
  WaitableEvent run_posting_threads_;
  AtomicFlag stop_posting_threads_;
  subtle::AtomicWord num_tasks_posted_ = 0;

  // Accessed only on the test case thread:
  TimeDelta tasks_posted_duration_;
  TimeDelta tasks_run_duration_;
  size_t num_tasks_run_ = 0;

  DISALLOW_COPY_AND_ASSIGN(MessageLoopPerfTest);
};

}  // namespace

TEST_P(MessageLoopPerfTest, PostTaskRate) {
  // Measures the average rate of posting tasks from different threads and the
  // average rate that the message loop is running those tasks.
  RunTest<ContinuouslyPostTasks>(GetParam(), TimeDelta::FromSeconds(3));
  perf_test::PrintResult("task_posting", "",
                         PostingThreadCountToString(GetParam()),
                         tasks_posted_duration().InMicroseconds() /
                             static_cast<double>(num_tasks_posted()),
                         "us/task", true);
  perf_test::PrintResult("task_running", "",
                         PostingThreadCountToString(GetParam()),
                         tasks_run_duration().InMicroseconds() /
                             static_cast<double>(num_tasks_run()),
                         "us/task", true);
}

INSTANTIATE_TEST_CASE_P(,
                        MessageLoopPerfTest,
                        ::testing::Values(1, 5, 10),
                        MessageLoopPerfTest::ParamInfoToString);
}  // namespace base