// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
#define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/pending_task.h"
#include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/time/time.h"
namespace base {
class BasicPostTaskPerfTest;
namespace internal {
// Implements a queue of tasks posted to the message loop running on the current
// thread. This class takes care of synchronizing posting tasks from different
// threads and together with MessageLoop ensures clean shutdown.
class BASE_EXPORT IncomingTaskQueue
: public RefCountedThreadSafe<IncomingTaskQueue> {
public:
// TODO(gab): Move this to SequencedTaskSource::Observer in
// https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
class Observer {
public:
virtual ~Observer() = default;
// Notifies this Observer that it is about to enqueue |task|. The Observer
// may alter |task| as a result (e.g. add metadata to the PendingTask
// struct). This may be called while holding a lock and shouldn't perform
// logic requiring synchronization (override DidQueueTask() for that).
virtual void WillQueueTask(PendingTask* task) = 0;
// Notifies this Observer that a task was queued in the IncomingTaskQueue it
// observes. |was_empty| is true if the task source was empty (i.e.
// |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked
// from any thread.
virtual void DidQueueTask(bool was_empty) = 0;
};
// Provides a read and remove only view into a task queue.
class ReadAndRemoveOnlyQueue {
public:
ReadAndRemoveOnlyQueue() = default;
virtual ~ReadAndRemoveOnlyQueue() = default;
// Returns the next task. HasTasks() is assumed to be true.
virtual const PendingTask& Peek() = 0;
// Removes and returns the next task. HasTasks() is assumed to be true.
virtual PendingTask Pop() = 0;
// Whether this queue has tasks.
virtual bool HasTasks() = 0;
// Removes all tasks.
virtual void Clear() = 0;
private:
DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue);
};
// Provides a read-write task queue.
class Queue : public ReadAndRemoveOnlyQueue {
public:
Queue() = default;
~Queue() override = default;
// Adds the task to the end of the queue.
virtual void Push(PendingTask pending_task) = 0;
private:
DISALLOW_COPY_AND_ASSIGN(Queue);
};
// Constructs an IncomingTaskQueue which will invoke |task_queue_observer|
// when tasks are queued. |task_queue_observer| will be bound to this
// IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw
// pointer since IncomingTaskQueue is ref-counted. For the same reasons,
// |task_queue_observer| needs to support being invoked racily during
// shutdown).
explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer);
// Appends a task to the incoming queue. Posting of all tasks is routed though
// AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting
// task is properly synchronized between different threads.
//
// Returns true if the task was successfully added to the queue, otherwise
// returns false. In all cases, the ownership of |task| is transferred to the
// called method.
bool AddToIncomingQueue(const Location& from_here,
OnceClosure task,
TimeDelta delay,
Nestable nestable);
// Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be
// undone. Note that the registered IncomingTaskQueue::Observer may still
// racily receive a few DidQueueTask() calls while the Shutdown() signal
// propagates to other threads and it needs to support that.
void Shutdown();
ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; }
Queue& delayed_tasks() { return delayed_tasks_; }
Queue& deferred_tasks() { return deferred_tasks_; }
bool HasPendingHighResolutionTasks() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return delayed_tasks_.HasPendingHighResolutionTasks();
}
// Reports UMA metrics about its queues before the MessageLoop goes to sleep
// per being idle.
void ReportMetricsOnIdle() const;
private:
friend class base::BasicPostTaskPerfTest;
friend class RefCountedThreadSafe<IncomingTaskQueue>;
// These queues below support the previous MessageLoop behavior of
// maintaining three queue queues to process tasks:
//
// TriageQueue
// The first queue to receive all tasks for the processing sequence (when
// reloading from the thread-safe |incoming_queue_|). Tasks are generally
// either dispatched immediately or sent to the queues below.
//
// DelayedQueue
// The queue for holding tasks that should be run later and sorted by expected
// run time.
//
// DeferredQueue
// The queue for holding tasks that couldn't be run while the MessageLoop was
// nested. These are generally processed during the idle stage.
//
// Many of these do not share implementations even though they look like they
// could because of small quirks (reloading semantics) or differing underlying
// data strucutre (TaskQueue vs DelayedTaskQueue).
// The starting point for all tasks on the sequence processing the tasks.
class TriageQueue : public ReadAndRemoveOnlyQueue {
public:
TriageQueue(IncomingTaskQueue* outer);
~TriageQueue() override;
// ReadAndRemoveOnlyQueue:
// The methods below will attempt to reload from the incoming queue if the
// queue itself is empty (Clear() has special logic to reload only once
// should destructors post more tasks).
const PendingTask& Peek() override;
PendingTask Pop() override;
// Whether this queue has tasks after reloading from the incoming queue.
bool HasTasks() override;
void Clear() override;
private:
void ReloadFromIncomingQueueIfEmpty();
IncomingTaskQueue* const outer_;
TaskQueue queue_;
DISALLOW_COPY_AND_ASSIGN(TriageQueue);
};
class DelayedQueue : public Queue {
public:
DelayedQueue();
~DelayedQueue() override;
// Queue:
const PendingTask& Peek() override;
PendingTask Pop() override;
// Whether this queue has tasks after sweeping the cancelled ones in front.
bool HasTasks() override;
void Clear() override;
void Push(PendingTask pending_task) override;
size_t Size() const;
bool HasPendingHighResolutionTasks() const {
return pending_high_res_tasks_ > 0;
}
private:
DelayedTaskQueue queue_;
// Number of high resolution tasks in |queue_|.
int pending_high_res_tasks_ = 0;
SEQUENCE_CHECKER(sequence_checker_);
DISALLOW_COPY_AND_ASSIGN(DelayedQueue);
};
class DeferredQueue : public Queue {
public:
DeferredQueue();
~DeferredQueue() override;
// Queue:
const PendingTask& Peek() override;
PendingTask Pop() override;
bool HasTasks() override;
void Clear() override;
void Push(PendingTask pending_task) override;
private:
TaskQueue queue_;
SEQUENCE_CHECKER(sequence_checker_);
DISALLOW_COPY_AND_ASSIGN(DeferredQueue);
};
virtual ~IncomingTaskQueue();
// Adds a task to |incoming_queue_|. The caller retains ownership of
// |pending_task|, but this function will reset the value of
// |pending_task->task|. This is needed to ensure that the posting call stack
// does not retain |pending_task->task| beyond this function call.
bool PostPendingTask(PendingTask* pending_task);
// Does the real work of posting a pending task. Returns true if
// |incoming_queue_| was empty before |pending_task| was posted.
bool PostPendingTaskLockRequired(PendingTask* pending_task);
// Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
// from the sequence processing the tasks.
void ReloadWorkQueue(TaskQueue* work_queue);
// Checks calls made only on the MessageLoop thread.
SEQUENCE_CHECKER(sequence_checker_);
const std::unique_ptr<Observer> task_queue_observer_;
// Queue for initial triaging of tasks on the |sequence_checker_| sequence.
TriageQueue triage_tasks_;
// Queue for delayed tasks on the |sequence_checker_| sequence.
DelayedQueue delayed_tasks_;
// Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
DeferredQueue deferred_tasks_;
// Synchronizes access to all members below this line.
base::Lock incoming_queue_lock_;
// An incoming queue of tasks that are acquired under a mutex for processing
// on this instance's thread. These tasks have not yet been been pushed to
// |triage_tasks_|.
TaskQueue incoming_queue_;
// True if new tasks should be accepted.
bool accept_new_tasks_ = true;
// The next sequence number to use for delayed tasks.
int next_sequence_num_ = 0;
// True if the outgoing queue (|triage_tasks_|) is empty. Toggled under
// |incoming_queue_lock_| in ReloadWorkQueue() so that
// PostPendingTaskLockRequired() can tell, without accessing the thread unsafe
// |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a
// PostTask() (and needs to inform its Observer).
bool triage_queue_empty_ = true;
DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
};
} // namespace internal
} // namespace base
#endif // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_