#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) {
                fn((char*)args + i*stride);
            }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
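        // A sketch of why no barrier is needed here: the new Work entries only
        // become visible to workers once fReady's lock below is released, and each
        // worker's release sk_atomic_dec() pairs with the sk_acquire_load() in Wait().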
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() { SkDELETE(ThreadPool::gGlobal); }

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}
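
// A minimal usage sketch (illustrative only; `Tile` and `rasterize_tile` are
// hypothetical names, not part of this file): fan work out over an array with
// batch(), then block in wait() until every task has run.
//
//     static void rasterize_tile(void* arg) {
//         Tile* tile = static_cast<Tile*>(arg);
//         // ... per-tile work ...
//     }
//
//     void rasterize_all(Tile* tiles, int n) {
//         SkTaskGroup tg;
//         tg.batch(rasterize_tile, tiles, n, sizeof(Tile));
//         tg.wait();  // Also lends a hand running queued work until pending hits 0.
//     }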