#include "SkTaskGroup.h"
#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"
#if defined(SK_BUILD_FOR_WIN32)
#include <windows.h>
static inline int num_cores() {
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return (int) sysinfo.dwNumberOfProcessors;
}
#else
#include <unistd.h>
static inline int num_cores() {
    return (int) sysconf(_SC_NPROCESSORS_ONLN);
}
#endif
namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

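    // N.B. Because Wait() runs queued Work itself rather than sleeping, nested
    // use (a task that spawns and waits on its own SkTaskGroup) keeps making
    // forward progress even on a small, fixed-size pool.
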
private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

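    // add() queues a single Work, so signal() wakes just one sleeping thread;
    // batch() below queues N at once, so it wakes everyone with broadcast().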
    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;      // Queue of Work to do, guarded by fReady's lock.
    SkTDArray<SkThread*> fThreads;   // All the threads we've spawned.
    SkCondVar            fReady;     // Guards fWork and fDraining, and signals that work is ready.
    bool                 fDraining;  // Set by ~ThreadPool() to tell threads to exit.

    static ThreadPool* gGlobal;  // The global ThreadPool, managed by SkTaskGroup::Enabler.
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

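// gGlobal is written only by Enabler's constructor and destructor.  The
// (unchecked) expectation is that an Enabler is created before any other
// thread touches SkTaskGroup and outlives all of them, which is why reads of
// gGlobal above need no synchronization.
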
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

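// A minimal sketch of typical Enabler use (the surrounding main() is
// hypothetical, not part of this API).  The ctor's SkASSERT above means at
// most one Enabler should ever be live:
//
//     int main(int argc, char** argv) {
//         SkTaskGroup::Enabler enabled(-1);  // -1 -> one thread per core.
//         ...                                // Use SkTaskGroups freely here.
//     }                                      // ~Enabler() drains and joins.
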
SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}
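
// A minimal usage sketch (square, vals, and N are hypothetical):
//
//     static void square(void* arg) {
//         int* x = static_cast<int*>(arg);
//         *x *= *x;
//     }
//
//     SkTaskGroup tg;
//     tg.batch(square, vals, N, sizeof(vals[0]));  // Square vals[0..N-1] in parallel.
//     tg.wait();                                   // Block (helping out) until all N finish.
//
// With no live Enabler, add() and batch() simply run the work synchronously.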