// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <memory>
#include <vector>
#include "base/atomicops.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/perf/perf_test.h"
namespace base {
namespace {
// A thread that waits for the caller to signal an event before proceeding to
// call Action::Run().
class PostingThread {
public:
class Action {
public:
virtual ~Action() = default;
// Called after the thread is started and |start_event_| is signalled.
virtual void Run() = 0;
protected:
Action() = default;
private:
DISALLOW_COPY_AND_ASSIGN(Action);
};
// Creates a PostingThread where the thread waits on |start_event| before
// calling action->Run(). If a thread is returned, the thread is guaranteed to
// be allocated and running and the caller must call Join() before destroying
// the PostingThread.
static std::unique_ptr<PostingThread> Create(WaitableEvent* start_event,
std::unique_ptr<Action> action) {
auto posting_thread =
WrapUnique(new PostingThread(start_event, std::move(action)));
if (!posting_thread->Start())
return nullptr;
return posting_thread;
}
~PostingThread() { DCHECK_EQ(!thread_handle_.is_null(), join_called_); }
void Join() {
PlatformThread::Join(thread_handle_);
join_called_ = true;
}
private:
class Delegate final : public PlatformThread::Delegate {
public:
Delegate(PostingThread* outer, std::unique_ptr<Action> action)
: outer_(outer), action_(std::move(action)) {
DCHECK(outer_);
DCHECK(action_);
}
~Delegate() override = default;
private:
void ThreadMain() override {
outer_->thread_started_.Signal();
outer_->start_event_->Wait();
action_->Run();
}
PostingThread* const outer_;
const std::unique_ptr<Action> action_;
DISALLOW_COPY_AND_ASSIGN(Delegate);
};
PostingThread(WaitableEvent* start_event, std::unique_ptr<Action> delegate)
: start_event_(start_event),
thread_started_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
delegate_(this, std::move(delegate)) {
DCHECK(start_event_);
}
bool Start() {
bool thread_created =
PlatformThread::Create(0, &delegate_, &thread_handle_);
if (thread_created)
thread_started_.Wait();
return thread_created;
}
bool join_called_ = false;
WaitableEvent* const start_event_;
WaitableEvent thread_started_;
Delegate delegate_;
PlatformThreadHandle thread_handle_;
DISALLOW_COPY_AND_ASSIGN(PostingThread);
};
// Fixture that measures task posting and task running throughput of a
// MessageLoop while several PostingThreads post to it concurrently. The test
// parameter is the number of posting threads.
class MessageLoopPerfTest : public ::testing::TestWithParam<int> {
 public:
  MessageLoopPerfTest()
      : message_loop_task_runner_(SequencedTaskRunnerHandle::Get()),
        run_posting_threads_(WaitableEvent::ResetPolicy::MANUAL,
                             WaitableEvent::InitialState::NOT_SIGNALED) {}

  // Name generator for INSTANTIATE_TEST_CASE_P: maps the test parameter to a
  // readable string via PostingThreadCountToString().
  static std::string ParamInfoToString(
      ::testing::TestParamInfo<int> param_info) {
    return PostingThreadCountToString(param_info.param);
  }

  // Returns "1_Posting_Thread" for one thread and "<N>_Posting_Threads"
  // otherwise.
  static std::string PostingThreadCountToString(int posting_threads) {
    // Special case 1 thread for thread vs threads.
    if (posting_threads == 1)
      return "1_Posting_Thread";

    return StringPrintf("%d_Posting_Threads", posting_threads);
  }

 protected:
  // Action that posts a counting task to the fixture's task runner in a tight
  // loop until the fixture's |stop_posting_threads_| flag is set.
  class ContinuouslyPostTasks final : public PostingThread::Action {
   public:
    // |outer| is not owned and must be non-null.
    explicit ContinuouslyPostTasks(MessageLoopPerfTest* outer)
        : outer_(outer) {
      DCHECK(outer_);
    }
    ~ContinuouslyPostTasks() override = default;

   private:
    void Run() override {
      // The same closure is posted every iteration; when it runs it
      // increments the fixture's |num_tasks_run_|.
      RepeatingClosure task_to_run =
          BindRepeating([](size_t* num_tasks_run) { ++*num_tasks_run; },
                        &outer_->num_tasks_run_);
      while (!outer_->stop_posting_threads_.IsSet()) {
        outer_->message_loop_task_runner_->PostTask(FROM_HERE, task_to_run);
        // Several posting threads update this counter, hence the atomic
        // increment.
        subtle::NoBarrier_AtomicIncrement(&outer_->num_tasks_posted_, 1);
      }
    }

    MessageLoopPerfTest* const outer_;

    DISALLOW_COPY_AND_ASSIGN(ContinuouslyPostTasks);
  };

  void SetUp() override {
    // This check is here because we can't ASSERT_TRUE in the constructor.
    ASSERT_TRUE(message_loop_task_runner_);
  }

  // Runs ActionType::Run() on |num_posting_threads| threads and requests test
  // termination around |duration|. ActionType must be constructible from a
  // MessageLoopPerfTest*.
  //
  // On return, tasks_run_duration() holds the time spent inside the RunLoop,
  // and tasks_posted_duration() holds the time from just before the posting
  // threads were released until all of them had been joined.
  template <typename ActionType>
  void RunTest(const int num_posting_threads, TimeDelta duration) {
    std::vector<std::unique_ptr<PostingThread>> threads;
    for (int i = 0; i < num_posting_threads; ++i) {
      threads.emplace_back(PostingThread::Create(
          &run_posting_threads_, std::make_unique<ActionType>(this)));
      // Don't assert here to simplify the code that requires a Join() call for
      // every created PostingThread. A failed creation leaves a null entry
      // that the join loop below skips.
      EXPECT_TRUE(threads.back());
    }

    RunLoop run_loop;
    // After |duration|, tell the posting threads to stop and quit the loop.
    message_loop_task_runner_->PostDelayedTask(
        FROM_HERE,
        BindOnce(
            [](RunLoop* run_loop, AtomicFlag* stop_posting_threads) {
              stop_posting_threads->Set();
              run_loop->Quit();
            },
            &run_loop, &stop_posting_threads_),
        duration);

    TimeTicks post_task_start = TimeTicks::Now();
    // Releases every posting thread blocked on the shared start event.
    run_posting_threads_.Signal();

    TimeTicks run_loop_start = TimeTicks::Now();
    run_loop.Run();
    tasks_run_duration_ = TimeTicks::Now() - run_loop_start;

    for (auto& thread : threads) {
      // PostingThread::Create() returns nullptr when the platform thread could
      // not be created; that was already reported by EXPECT_TRUE above, so
      // only join threads that actually exist instead of dereferencing null.
      if (thread)
        thread->Join();
    }

    tasks_posted_duration_ = TimeTicks::Now() - post_task_start;
  }

  // Number of PostTask() calls made by the posting threads.
  size_t num_tasks_posted() const {
    return subtle::NoBarrier_Load(&num_tasks_posted_);
  }

  // Time from releasing the posting threads until all were joined.
  TimeDelta tasks_posted_duration() const { return tasks_posted_duration_; }

  // Number of posted tasks that have run.
  size_t num_tasks_run() const { return num_tasks_run_; }

  // Time spent inside RunLoop::Run() in RunTest().
  TimeDelta tasks_run_duration() const { return tasks_run_duration_; }

 private:
  MessageLoop message_loop_;

  // Accessed on multiple threads, thread-safe or constant:
  const scoped_refptr<SequencedTaskRunner> message_loop_task_runner_;
  WaitableEvent run_posting_threads_;
  AtomicFlag stop_posting_threads_;
  subtle::AtomicWord num_tasks_posted_ = 0;

  // Accessed only on the test case thread:
  TimeDelta tasks_posted_duration_;
  TimeDelta tasks_run_duration_;
  size_t num_tasks_run_ = 0;

  DISALLOW_COPY_AND_ASSIGN(MessageLoopPerfTest);
};
} // namespace
// Reports two averages, both in microseconds per task: the cost of posting a
// task from GetParam() concurrent posting threads, and the cost of the message
// loop running one of those tasks.
TEST_P(MessageLoopPerfTest, PostTaskRate) {
  RunTest<ContinuouslyPostTasks>(GetParam(), TimeDelta::FromSeconds(3));

  // Both results share a trace name derived from the posting thread count.
  const std::string trace_name = PostingThreadCountToString(GetParam());

  // Average time per posted task.
  const double posting_us_per_task =
      tasks_posted_duration().InMicroseconds() /
      static_cast<double>(num_tasks_posted());
  perf_test::PrintResult("task_posting", "", trace_name, posting_us_per_task,
                         "us/task", true);

  // Average time per task run by the message loop.
  const double running_us_per_task =
      tasks_run_duration().InMicroseconds() /
      static_cast<double>(num_tasks_run());
  perf_test::PrintResult("task_running", "", trace_name, running_us_per_task,
                         "us/task", true);
}
// Instantiates the MessageLoopPerfTest tests with parameter values 1, 5 and
// 10. Each value is passed to RunTest() as the number of posting threads, and
// MessageLoopPerfTest::ParamInfoToString supplies the per-parameter test name.
// The first macro argument (instantiation prefix) is intentionally empty.
INSTANTIATE_TEST_CASE_P(,
MessageLoopPerfTest,
::testing::Values(1, 5, 10),
MessageLoopPerfTest::ParamInfoToString);
} // namespace base