// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ #define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ #include "base/base_export.h" #include "base/callback.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/pending_task.h" #include "base/sequence_checker.h" #include "base/synchronization/lock.h" #include "base/time/time.h" namespace base { class BasicPostTaskPerfTest; namespace internal { // Implements a queue of tasks posted to the message loop running on the current // thread. This class takes care of synchronizing posting tasks from different // threads and together with MessageLoop ensures clean shutdown. class BASE_EXPORT IncomingTaskQueue : public RefCountedThreadSafe<IncomingTaskQueue> { public: // TODO(gab): Move this to SequencedTaskSource::Observer in // https://chromium-review.googlesource.com/c/chromium/src/+/1088762. class Observer { public: virtual ~Observer() = default; // Notifies this Observer that it is about to enqueue |task|. The Observer // may alter |task| as a result (e.g. add metadata to the PendingTask // struct). This may be called while holding a lock and shouldn't perform // logic requiring synchronization (override DidQueueTask() for that). virtual void WillQueueTask(PendingTask* task) = 0; // Notifies this Observer that a task was queued in the IncomingTaskQueue it // observes. |was_empty| is true if the task source was empty (i.e. // |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked // from any thread. virtual void DidQueueTask(bool was_empty) = 0; }; // Provides a read and remove only view into a task queue. class ReadAndRemoveOnlyQueue { public: ReadAndRemoveOnlyQueue() = default; virtual ~ReadAndRemoveOnlyQueue() = default; // Returns the next task. HasTasks() is assumed to be true. virtual const PendingTask& Peek() = 0; // Removes and returns the next task. HasTasks() is assumed to be true. virtual PendingTask Pop() = 0; // Whether this queue has tasks. virtual bool HasTasks() = 0; // Removes all tasks. virtual void Clear() = 0; private: DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue); }; // Provides a read-write task queue. class Queue : public ReadAndRemoveOnlyQueue { public: Queue() = default; ~Queue() override = default; // Adds the task to the end of the queue. virtual void Push(PendingTask pending_task) = 0; private: DISALLOW_COPY_AND_ASSIGN(Queue); }; // Constructs an IncomingTaskQueue which will invoke |task_queue_observer| // when tasks are queued. |task_queue_observer| will be bound to this // IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw // pointer since IncomingTaskQueue is ref-counted. For the same reasons, // |task_queue_observer| needs to support being invoked racily during // shutdown). explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer); // Appends a task to the incoming queue. Posting of all tasks is routed though // AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting // task is properly synchronized between different threads. // // Returns true if the task was successfully added to the queue, otherwise // returns false. In all cases, the ownership of |task| is transferred to the // called method. bool AddToIncomingQueue(const Location& from_here, OnceClosure task, TimeDelta delay, Nestable nestable); // Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be // undone. Note that the registered IncomingTaskQueue::Observer may still // racily receive a few DidQueueTask() calls while the Shutdown() signal // propagates to other threads and it needs to support that. void Shutdown(); ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; } Queue& delayed_tasks() { return delayed_tasks_; } Queue& deferred_tasks() { return deferred_tasks_; } bool HasPendingHighResolutionTasks() const { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); return delayed_tasks_.HasPendingHighResolutionTasks(); } // Reports UMA metrics about its queues before the MessageLoop goes to sleep // per being idle. void ReportMetricsOnIdle() const; private: friend class base::BasicPostTaskPerfTest; friend class RefCountedThreadSafe<IncomingTaskQueue>; // These queues below support the previous MessageLoop behavior of // maintaining three queue queues to process tasks: // // TriageQueue // The first queue to receive all tasks for the processing sequence (when // reloading from the thread-safe |incoming_queue_|). Tasks are generally // either dispatched immediately or sent to the queues below. // // DelayedQueue // The queue for holding tasks that should be run later and sorted by expected // run time. // // DeferredQueue // The queue for holding tasks that couldn't be run while the MessageLoop was // nested. These are generally processed during the idle stage. // // Many of these do not share implementations even though they look like they // could because of small quirks (reloading semantics) or differing underlying // data strucutre (TaskQueue vs DelayedTaskQueue). // The starting point for all tasks on the sequence processing the tasks. class TriageQueue : public ReadAndRemoveOnlyQueue { public: TriageQueue(IncomingTaskQueue* outer); ~TriageQueue() override; // ReadAndRemoveOnlyQueue: // The methods below will attempt to reload from the incoming queue if the // queue itself is empty (Clear() has special logic to reload only once // should destructors post more tasks). const PendingTask& Peek() override; PendingTask Pop() override; // Whether this queue has tasks after reloading from the incoming queue. bool HasTasks() override; void Clear() override; private: void ReloadFromIncomingQueueIfEmpty(); IncomingTaskQueue* const outer_; TaskQueue queue_; DISALLOW_COPY_AND_ASSIGN(TriageQueue); }; class DelayedQueue : public Queue { public: DelayedQueue(); ~DelayedQueue() override; // Queue: const PendingTask& Peek() override; PendingTask Pop() override; // Whether this queue has tasks after sweeping the cancelled ones in front. bool HasTasks() override; void Clear() override; void Push(PendingTask pending_task) override; size_t Size() const; bool HasPendingHighResolutionTasks() const { return pending_high_res_tasks_ > 0; } private: DelayedTaskQueue queue_; // Number of high resolution tasks in |queue_|. int pending_high_res_tasks_ = 0; SEQUENCE_CHECKER(sequence_checker_); DISALLOW_COPY_AND_ASSIGN(DelayedQueue); }; class DeferredQueue : public Queue { public: DeferredQueue(); ~DeferredQueue() override; // Queue: const PendingTask& Peek() override; PendingTask Pop() override; bool HasTasks() override; void Clear() override; void Push(PendingTask pending_task) override; private: TaskQueue queue_; SEQUENCE_CHECKER(sequence_checker_); DISALLOW_COPY_AND_ASSIGN(DeferredQueue); }; virtual ~IncomingTaskQueue(); // Adds a task to |incoming_queue_|. The caller retains ownership of // |pending_task|, but this function will reset the value of // |pending_task->task|. This is needed to ensure that the posting call stack // does not retain |pending_task->task| beyond this function call. bool PostPendingTask(PendingTask* pending_task); // Does the real work of posting a pending task. Returns true if // |incoming_queue_| was empty before |pending_task| was posted. bool PostPendingTaskLockRequired(PendingTask* pending_task); // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called // from the sequence processing the tasks. void ReloadWorkQueue(TaskQueue* work_queue); // Checks calls made only on the MessageLoop thread. SEQUENCE_CHECKER(sequence_checker_); const std::unique_ptr<Observer> task_queue_observer_; // Queue for initial triaging of tasks on the |sequence_checker_| sequence. TriageQueue triage_tasks_; // Queue for delayed tasks on the |sequence_checker_| sequence. DelayedQueue delayed_tasks_; // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence. DeferredQueue deferred_tasks_; // Synchronizes access to all members below this line. base::Lock incoming_queue_lock_; // An incoming queue of tasks that are acquired under a mutex for processing // on this instance's thread. These tasks have not yet been been pushed to // |triage_tasks_|. TaskQueue incoming_queue_; // True if new tasks should be accepted. bool accept_new_tasks_ = true; // The next sequence number to use for delayed tasks. int next_sequence_num_ = 0; // True if the outgoing queue (|triage_tasks_|) is empty. Toggled under // |incoming_queue_lock_| in ReloadWorkQueue() so that // PostPendingTaskLockRequired() can tell, without accessing the thread unsafe // |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a // PostTask() (and needs to inform its Observer). bool triage_queue_empty_ = true; DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue); }; } // namespace internal } // namespace base #endif // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_