#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}