普通文本  |  220行  |  7.54 KB

// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task_scheduler/scheduler_worker_pool.h"

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/threading/thread_local.h"

namespace base {
namespace internal {

namespace {

// The number of SchedulerWorkerPool that are alive in this process. This
// variable should only be incremented when the SchedulerWorkerPool instances
// are brought up (on the main thread; before any tasks are posted) and
// decremented when the same instances are brought down (i.e., only when unit
// tests tear down the task environment and never in production). This makes the
// variable const while worker threads are up and as such it doesn't need to be
// atomic. It is used to tell when a task is posted from the main thread after
// the task environment was brought down in unit tests so that
// SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting
// such callers know they should complete necessary work synchronously. Note:
// |!g_active_pools_count| is generally equivalent to
// |!TaskScheduler::GetInstance()| but has the advantage of being valid in
// task_scheduler unit tests that don't instantiate a full TaskScheduler.
int g_active_pools_count = 0;

// SchedulerWorkerPool that owns the current thread, if any.
LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
    tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;

const SchedulerWorkerPool* GetCurrentWorkerPool() {
  return tls_current_worker_pool.Get().Get();
}

}  // namespace

// A task runner that runs tasks in parallel.
class SchedulerParallelTaskRunner : public TaskRunner {
 public:
  // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
  // long as |worker_pool| is alive.
  // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
  SchedulerParallelTaskRunner(const TaskTraits& traits,
                              SchedulerWorkerPool* worker_pool)
      : traits_(traits), worker_pool_(worker_pool) {
    DCHECK(worker_pool_);
  }

  // TaskRunner:
  bool PostDelayedTask(const Location& from_here,
                       OnceClosure closure,
                       TimeDelta delay) override {
    if (!g_active_pools_count)
      return false;

    // Post the task as part of a one-off single-task Sequence.
    return worker_pool_->PostTaskWithSequence(
        Task(from_here, std::move(closure), traits_, delay),
        MakeRefCounted<Sequence>());
  }

  bool RunsTasksInCurrentSequence() const override {
    return GetCurrentWorkerPool() == worker_pool_;
  }

 private:
  ~SchedulerParallelTaskRunner() override = default;

  const TaskTraits traits_;
  SchedulerWorkerPool* const worker_pool_;

  DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
};

// A task runner that runs tasks in sequence.
class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
 public:
  // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
  // so long as |worker_pool| is alive.
  // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
  SchedulerSequencedTaskRunner(const TaskTraits& traits,
                               SchedulerWorkerPool* worker_pool)
      : traits_(traits), worker_pool_(worker_pool) {
    DCHECK(worker_pool_);
  }

  // SequencedTaskRunner:
  bool PostDelayedTask(const Location& from_here,
                       OnceClosure closure,
                       TimeDelta delay) override {
    if (!g_active_pools_count)
      return false;

    Task task(from_here, std::move(closure), traits_, delay);
    task.sequenced_task_runner_ref = this;

    // Post the task as part of |sequence_|.
    return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
  }

  bool PostNonNestableDelayedTask(const Location& from_here,
                                  OnceClosure closure,
                                  base::TimeDelta delay) override {
    // Tasks are never nested within the task scheduler.
    return PostDelayedTask(from_here, std::move(closure), delay);
  }

  bool RunsTasksInCurrentSequence() const override {
    return sequence_->token() == SequenceToken::GetForCurrentThread();
  }

 private:
  ~SchedulerSequencedTaskRunner() override = default;

  // Sequence for all Tasks posted through this TaskRunner.
  const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>();

  const TaskTraits traits_;
  SchedulerWorkerPool* const worker_pool_;

  DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};

scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits(
    const TaskTraits& traits) {
  return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this);
}

scoped_refptr<SequencedTaskRunner>
SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits(
    const TaskTraits& traits) {
  return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this);
}

bool SchedulerWorkerPool::PostTaskWithSequence(
    Task task,
    scoped_refptr<Sequence> sequence) {
  DCHECK(task.task);
  DCHECK(sequence);

  if (!task_tracker_->WillPostTask(&task))
    return false;

  if (task.delayed_run_time.is_null()) {
    PostTaskWithSequenceNow(std::move(task), std::move(sequence));
  } else {
    // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
    // for details.
    CHECK(task.task);
    delayed_task_manager_->AddDelayedTask(
        std::move(task), BindOnce(
                             [](scoped_refptr<Sequence> sequence,
                                SchedulerWorkerPool* worker_pool, Task task) {
                               worker_pool->PostTaskWithSequenceNow(
                                   std::move(task), std::move(sequence));
                             },
                             std::move(sequence), Unretained(this)));
  }

  return true;
}

SchedulerWorkerPool::SchedulerWorkerPool(
    TrackedRef<TaskTracker> task_tracker,
    DelayedTaskManager* delayed_task_manager)
    : task_tracker_(std::move(task_tracker)),
      delayed_task_manager_(delayed_task_manager) {
  DCHECK(task_tracker_);
  DCHECK(delayed_task_manager_);
  ++g_active_pools_count;
}

SchedulerWorkerPool::~SchedulerWorkerPool() {
  --g_active_pools_count;
  DCHECK_GE(g_active_pools_count, 0);
}

void SchedulerWorkerPool::BindToCurrentThread() {
  DCHECK(!GetCurrentWorkerPool());
  tls_current_worker_pool.Get().Set(this);
}

void SchedulerWorkerPool::UnbindFromCurrentThread() {
  DCHECK(GetCurrentWorkerPool());
  tls_current_worker_pool.Get().Set(nullptr);
}

void SchedulerWorkerPool::PostTaskWithSequenceNow(
    Task task,
    scoped_refptr<Sequence> sequence) {
  DCHECK(task.task);
  DCHECK(sequence);

  // Confirm that |task| is ready to run (its delayed run time is either null or
  // in the past).
  DCHECK_LE(task.delayed_run_time, TimeTicks::Now());

  const bool sequence_was_empty = sequence->PushTask(std::move(task));
  if (sequence_was_empty) {
    // Try to schedule |sequence| if it was empty before |task| was inserted
    // into it. Otherwise, one of these must be true:
    // - |sequence| is already scheduled, or,
    // - The pool is running a Task from |sequence|. The pool is expected to
    //   reschedule |sequence| once it's done running the Task.
    sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this);
    if (sequence)
      OnCanScheduleSequence(std::move(sequence));
  }
}

}  // namespace internal
}  // namespace base