// Copyright 2017 the V8 project authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_ #define V8_HEAP_ITEM_PARALLEL_JOB_H_ #include <memory> #include <vector> #include "src/base/atomic-utils.h" #include "src/base/logging.h" #include "src/base/macros.h" #include "src/base/optional.h" #include "src/cancelable-task.h" #include "src/counters.h" #include "src/globals.h" namespace v8 { namespace base { class Semaphore; } namespace internal { class Counters; class Isolate; // This class manages background tasks that process a set of items in parallel. // The first task added is executed on the same thread as |job.Run()| is called. // All other tasks are scheduled in the background. // // - Items need to inherit from ItemParallelJob::Item. // - Tasks need to inherit from ItemParallelJob::Task. // // Items need to be marked as finished after processing them. Task and Item // ownership is transferred to the job. // // Each parallel (non-main thread) task will report the time between the job // being created and it being scheduled to |gc_parallel_task_latency_histogram|. class V8_EXPORT_PRIVATE ItemParallelJob { public: class Task; class V8_EXPORT_PRIVATE Item { public: Item() = default; virtual ~Item() = default; // Marks an item as being finished. void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); } private: enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished }; bool TryMarkingAsProcessing() { ProcessingState available = kAvailable; return state_.compare_exchange_strong(available, kProcessing); } bool IsFinished() { return state_ == kFinished; } std::atomic<ProcessingState> state_{kAvailable}; friend class ItemParallelJob; friend class ItemParallelJob::Task; DISALLOW_COPY_AND_ASSIGN(Item); }; class V8_EXPORT_PRIVATE Task : public CancelableTask { public: explicit Task(Isolate* isolate); virtual ~Task(); virtual void RunInParallel() = 0; protected: // Retrieves a new item that needs to be processed. Returns |nullptr| if // all items are processed. Upon returning an item, the task is required // to process the item and mark the item as finished after doing so. template <class ItemType> ItemType* GetItem() { while (items_considered_++ != items_->size()) { // Wrap around. if (cur_index_ == items_->size()) { cur_index_ = 0; } Item* item = (*items_)[cur_index_++]; if (item->TryMarkingAsProcessing()) { return static_cast<ItemType*>(item); } } return nullptr; } private: friend class ItemParallelJob; friend class Item; // Sets up state required before invoking Run(). If // |start_index is >= items_.size()|, this task will not process work items // (some jobs have more tasks than work items in order to parallelize post- // processing, e.g. scavenging). If |gc_parallel_task_latency_histogram| is // provided, it will be used to report histograms on the latency between // posting the task and it being scheduled. void SetupInternal( base::Semaphore* on_finish, std::vector<Item*>* items, size_t start_index, base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram); // We don't allow overriding this method any further. void RunInternal() final; std::vector<Item*>* items_ = nullptr; size_t cur_index_ = 0; size_t items_considered_ = 0; base::Semaphore* on_finish_ = nullptr; base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram_; DISALLOW_COPY_AND_ASSIGN(Task); }; ItemParallelJob(CancelableTaskManager* cancelable_task_manager, base::Semaphore* pending_tasks); ~ItemParallelJob(); // Adds a task to the job. Transfers ownership to the job. void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); } // Adds an item to the job. Transfers ownership to the job. void AddItem(Item* item) { items_.push_back(item); } int NumberOfItems() const { return static_cast<int>(items_.size()); } int NumberOfTasks() const { return static_cast<int>(tasks_.size()); } // Runs this job. Reporting metrics in a thread-safe manner to // |async_counters|. void Run(std::shared_ptr<Counters> async_counters); private: std::vector<Item*> items_; std::vector<std::unique_ptr<Task>> tasks_; CancelableTaskManager* cancelable_task_manager_; base::Semaphore* pending_tasks_; DISALLOW_COPY_AND_ASSIGN(ItemParallelJob); }; } // namespace internal } // namespace v8 #endif // V8_HEAP_ITEM_PARALLEL_JOB_H_