// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task_scheduler/scheduler_worker_pool.h"
#include <memory>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/scheduler_worker_pool_impl.h"
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/task_scheduler/task_traits.h"
#include "base/task_scheduler/test_task_factory.h"
#include "base/task_scheduler/test_utils.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
#if defined(OS_WIN)
#include "base/task_scheduler/platform_native_worker_pool_win.h"
#endif
namespace base {
namespace internal {
namespace {
// First argument of the SchedulerWorkerPoolParams used to start the generic
// pool in TaskSchedulerWorkerPoolTest::StartWorkerPool().
constexpr size_t kMaxTasks = 4;
// By default, tests allow half of the pool to be used by background tasks.
constexpr size_t kMaxBackgroundTasks = kMaxTasks / 2;
// Number of ThreadPostingTasks threads started by the tests that post from
// several threads at once.
constexpr size_t kNumThreadsPostingTasks = 4;
// Number of tasks that each ThreadPostingTasks posts from its Run() method.
constexpr size_t kNumTasksPostedPerThread = 150;
// Selects which SchedulerWorkerPool implementation the test fixture
// instantiates.
enum class PoolType {
// The fixture creates a SchedulerWorkerPoolImpl.
GENERIC,
#if defined(OS_WIN)
// The fixture creates a PlatformNativeWorkerPoolWin. Only compiled on Windows.
WINDOWS,
#endif
};
// Test parameter: pairs the pool implementation under test with the execution
// mode of the task runners that the tests obtain from it.
struct PoolExecutionType {
// Which pool implementation to create.
PoolType pool_type;
// Execution mode passed to test::CreateTaskRunnerWithExecutionMode().
test::ExecutionMode execution_mode;
};
// Shorthand for the TestTaskFactory enum (values used here: YES and NO) that
// is forwarded to TestTaskFactory::PostTask().
using PostNestedTask = test::TestTaskFactory::PostNestedTask;
// A SimpleThread that, when run, posts |kNumTasksPostedPerThread| tasks to a
// worker pool through a test::TestTaskFactory.
class ThreadPostingTasks : public SimpleThread {
 public:
  // Constructs a thread that posts |kNumTasksPostedPerThread| tasks to
  // |worker_pool| through an |execution_mode| task runner. If
  // |post_nested_task| is YES, each task posted by this thread posts another
  // task when it runs. |worker_pool| must not be null.
  ThreadPostingTasks(SchedulerWorkerPool* worker_pool,
                     test::ExecutionMode execution_mode,
                     PostNestedTask post_nested_task)
      : SimpleThread("ThreadPostingTasks"),
        worker_pool_(worker_pool),
        post_nested_task_(post_nested_task),
        factory_(test::CreateTaskRunnerWithExecutionMode(worker_pool,
                                                         execution_mode),
                 execution_mode) {
    DCHECK(worker_pool_);
  }

  // Returns the factory used by this thread to post tasks, so that callers can
  // wait on the tasks it posted.
  const test::TestTaskFactory* factory() const { return &factory_; }

 private:
  // SimpleThread:
  void Run() override {
    // The posting thread itself must not be reported as running tasks in the
    // task runner's sequence.
    EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence());

    for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
      EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure()));
  }

  // Pool the tasks are posted to. Not owned.
  SchedulerWorkerPool* const worker_pool_;
  // Whether each posted task posts another task when it runs.
  const PostNestedTask post_nested_task_;
  // Creates and posts the tasks, using the task runner built in the
  // constructor.
  test::TestTaskFactory factory_;

  DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
};
// Fixture parameterized on a PoolExecutionType. SetUp() brings up a service
// thread and the delayed task manager and creates the worker pool selected by
// the parameter; tests decide when to start the pool via StartWorkerPool().
class TaskSchedulerWorkerPoolTest
    : public testing::TestWithParam<PoolExecutionType> {
 protected:
  TaskSchedulerWorkerPoolTest()
      : service_thread_("TaskSchedulerServiceThread") {}

  // testing::Test:
  void SetUp() override {
    service_thread_.Start();
    delayed_task_manager_.Start(service_thread_.task_runner());
    // The pool is created here but deliberately left unstarted.
    CreateWorkerPool();
  }

  void TearDown() override {
    service_thread_.Stop();
    // A test may already have destroyed the pool; nothing to join then.
    if (!worker_pool_)
      return;
    worker_pool_->JoinForTesting();
  }

  // Instantiates the pool implementation named by the test parameter. Must
  // only be called while no pool exists.
  void CreateWorkerPool() {
    ASSERT_FALSE(worker_pool_);

    switch (GetParam().pool_type) {
      case PoolType::GENERIC: {
        worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
            "TestWorkerPool", "A", ThreadPriority::NORMAL,
            task_tracker_.GetTrackedRef(), &delayed_task_manager_);
        break;
      }
#if defined(OS_WIN)
      case PoolType::WINDOWS: {
        worker_pool_ = std::make_unique<PlatformNativeWorkerPoolWin>(
            task_tracker_.GetTrackedRef(), &delayed_task_manager_);
        break;
      }
#endif
    }

    ASSERT_TRUE(worker_pool_);
  }

  // Starts the pool created by CreateWorkerPool(). The concrete type is
  // recovered with a static_cast keyed on the same test parameter that chose
  // the type at creation.
  void StartWorkerPool() {
    ASSERT_TRUE(worker_pool_);

    switch (GetParam().pool_type) {
      case PoolType::GENERIC:
        static_cast<SchedulerWorkerPoolImpl*>(worker_pool_.get())
            ->Start(SchedulerWorkerPoolParams(kMaxTasks, TimeDelta::Max()),
                    kMaxBackgroundTasks, service_thread_.task_runner(),
                    nullptr,
                    SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
        break;
#if defined(OS_WIN)
      case PoolType::WINDOWS:
        static_cast<PlatformNativeWorkerPoolWin*>(worker_pool_.get())->Start();
        break;
#endif
    }
  }

  // Declaration order below determines construction and destruction order; do
  // not reorder.
  Thread service_thread_;
  TaskTracker task_tracker_ = {"Test"};
  DelayedTaskManager delayed_task_manager_;
  std::unique_ptr<SchedulerWorkerPool> worker_pool_;

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolTest);
};
// Task body for posts that are expected to be rejected: if it ever executes, it
// records a non-fatal gtest failure.
void ShouldNotRun() {
ADD_FAILURE() << "Ran a task that shouldn't run.";
}
} // namespace
// Verify that tasks posted concurrently from several threads all run.
TEST_P(TaskSchedulerWorkerPoolTest, PostTasks) {
  StartWorkerPool();

  // Spin up the posting threads; each one starts posting as soon as it is
  // started.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  while (posters.size() < kNumThreadsPostingTasks) {
    posters.emplace_back(std::make_unique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam().execution_mode, PostNestedTask::NO));
    posters.back()->Start();
  }

  // For every poster: wait for it to finish posting, then for its tasks to
  // run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Flushing the tracker guarantees that no task still touches a
  // TestTaskFactory once |posters| goes out of scope and destroys them.
  task_tracker_.FlushForTesting();
}
// Verify that tasks posted concurrently from several threads all run when each
// of those tasks posts a further task as it runs.
TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) {
  StartWorkerPool();

  // Spin up the posting threads. PostNestedTask::YES makes every task they
  // post post another task when it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  while (posters.size() < kNumThreadsPostingTasks) {
    posters.emplace_back(std::make_unique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam().execution_mode, PostNestedTask::YES));
    posters.back()->Start();
  }

  // For every poster: wait for it to finish posting, then for its tasks to
  // run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Flushing the tracker guarantees that no task still touches a
  // TestTaskFactory once |posters| goes out of scope and destroys them.
  task_tracker_.FlushForTesting();
}
// Verify that posting a task is refused once the TaskTracker has been shut
// down.
TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
  StartWorkerPool();

  const auto runner = test::CreateTaskRunnerWithExecutionMode(
      worker_pool_.get(), GetParam().execution_mode);

  task_tracker_.Shutdown();

  // The post must be rejected; ShouldNotRun() flags a failure if the task
  // executes anyway.
  const bool was_posted = runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun));
  EXPECT_FALSE(was_posted);
}
// Verify that a task runner which outlives its pool rejects new tasks without
// crashing.
TEST_P(TaskSchedulerWorkerPoolTest, PostAfterDestroy) {
  StartWorkerPool();

  const auto runner = test::CreateTaskRunnerWithExecutionMode(
      worker_pool_.get(), GetParam().execution_mode);

  // While the pool is alive, posting succeeds.
  const bool posted_while_alive = runner->PostTask(FROM_HERE, DoNothing());
  EXPECT_TRUE(posted_while_alive);

  // Shut down, join and destroy the pool while |runner| is still held.
  task_tracker_.Shutdown();
  worker_pool_->JoinForTesting();
  worker_pool_.reset();

  // Posting through the surviving runner must now fail.
  const bool posted_after_destroy =
      runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun));
  EXPECT_FALSE(posted_after_destroy);
}
// Verify that a Task runs shortly after its delay expires.
TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
  StartWorkerPool();

  // AUTOMATIC reset lets the same event be reused for the warm-up task and for
  // the delayed task.
  WaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC,
                         WaitableEvent::InitialState::NOT_SIGNALED);

  auto task_runner = test::CreateTaskRunnerWithExecutionMode(
      worker_pool_.get(), GetParam().execution_mode);

  // Wait until the task runner is up and running to make sure the test below is
  // solely timing the delayed task, not bringing up a physical thread.
  // ASSERT (not EXPECT) so that a rejected post fails the test immediately
  // instead of leaving the Wait() below blocked with nothing to signal it.
  ASSERT_TRUE(task_runner->PostTask(
      FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran))));
  task_ran.Wait();
  // The successful Wait() above must have reset the event.
  ASSERT_FALSE(task_ran.IsSignaled());

  // Post a task with a short delay.
  const TimeDelta delay = TestTimeouts::tiny_timeout();
  const TimeTicks start_time = TimeTicks::Now();
  // ASSERT for the same reason as above: the Wait() below has no timeout.
  ASSERT_TRUE(task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran)),
      delay));

  // Wait until the task runs.
  task_ran.Wait();

  // Expect the task to run after its delay expires, but no more than
  // |max_lateness| after that.
  const TimeDelta max_lateness = TimeDelta::FromMilliseconds(250);
  const TimeDelta actual_delay = TimeTicks::Now() - start_time;
  EXPECT_GE(actual_delay, delay);
  EXPECT_LT(actual_delay, max_lateness + delay);
}
// Verify that RunsTasksInCurrentSequence() on a SEQUENCED TaskRunner returns
// false when it is called from a task that does not belong to that runner's
// sequence. Note: tests that use TestTaskFactory already verify that
// RunsTasksInCurrentSequence() returns true when appropriate, so this test
// complements them to get full coverage of that method.
TEST_P(TaskSchedulerWorkerPoolTest, SequencedRunsTasksInCurrentSequence) {
  StartWorkerPool();

  // |task_runner| runs the probe task; |sequenced_task_runner| is a separate
  // runner that is only queried, never posted to.
  auto task_runner = test::CreateTaskRunnerWithExecutionMode(
      worker_pool_.get(), GetParam().execution_mode);
  auto sequenced_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(TaskTraits());

  WaitableEvent task_ran;
  task_runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](scoped_refptr<TaskRunner> other_runner, WaitableEvent* done) {
            // This task was posted through a different runner, so it is not
            // part of |other_runner|'s sequence.
            EXPECT_FALSE(other_runner->RunsTasksInCurrentSequence());
            done->Signal();
          },
          sequenced_task_runner, Unretained(&task_ran)));

  // Block until the probe task has executed its expectation.
  task_ran.Wait();
}
// Verify that tasks posted to a pool that has not been started yet are held
// back and only run once the pool is started.
TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) {
  WaitableEvent first_task_ran;
  WaitableEvent second_task_ran;

  // The fixture's SetUp() created the pool but did not start it.
  scoped_refptr<TaskRunner> runner =
      worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

  runner->PostTask(
      FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&first_task_ran)));
  runner->PostTask(FROM_HERE, BindOnce(&WaitableEvent::Signal,
                                       Unretained(&second_task_ran)));

  // Before Start, no worker should exist and no task should run. Sleep a
  // little to give a wrongly-scheduled task the opportunity to run.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  EXPECT_FALSE(first_task_ran.IsSignaled());
  EXPECT_FALSE(second_task_ran.IsSignaled());

  StartWorkerPool();

  // Once the pool is started, both tasks should run shortly.
  first_task_ran.Wait();
  second_task_ran.Wait();

  task_tracker_.FlushForTesting();
}
// Run every TaskSchedulerWorkerPoolTest against the generic pool
// (SchedulerWorkerPoolImpl) with PARALLEL task runners.
INSTANTIATE_TEST_CASE_P(GenericParallel,
TaskSchedulerWorkerPoolTest,
::testing::Values(PoolExecutionType{
PoolType::GENERIC, test::ExecutionMode::PARALLEL}));
// Same generic pool, with SEQUENCED task runners.
INSTANTIATE_TEST_CASE_P(GenericSequenced,
TaskSchedulerWorkerPoolTest,
::testing::Values(PoolExecutionType{
PoolType::GENERIC,
test::ExecutionMode::SEQUENCED}));
#if defined(OS_WIN)
// On Windows only, also run the suite against the platform-native pool
// (PlatformNativeWorkerPoolWin) in both execution modes.
INSTANTIATE_TEST_CASE_P(WinParallel,
TaskSchedulerWorkerPoolTest,
::testing::Values(PoolExecutionType{
PoolType::WINDOWS, test::ExecutionMode::PARALLEL}));
INSTANTIATE_TEST_CASE_P(WinSequenced,
TaskSchedulerWorkerPoolTest,
::testing::Values(PoolExecutionType{
PoolType::WINDOWS,
test::ExecutionMode::SEQUENCED}));
#endif
} // namespace internal
} // namespace base