C++程序  |  368行  |  10.75 KB

/**
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "logging.h"
#include "status.h"
#include "worker.h"

#include <time.h>

//#define WORKER_DEBUG
#ifdef  WORKER_DEBUG

#define DBG(...) LOGD(__VA_ARGS__)

#else

#define DBG(...)

#endif

void * WorkerThread::Work(void *param) {
    WorkerThread *t = (WorkerThread *)param;
    android_atomic_acquire_store(STATE_RUNNING, &t->state_);
    void * v = t->Worker(t->workerParam_);
    android_atomic_acquire_store(STATE_STOPPED, &t->state_);
    return v;
}

bool WorkerThread::isRunning() {
    DBG("WorkerThread::isRunning E");
    bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING;
    DBG("WorkerThread::isRunning X ret_value=%d", ret_value);
    return ret_value;
}

WorkerThread::WorkerThread() {
    DBG("WorkerThread::WorkerThread E");
    state_ = STATE_INITIALIZED;
    pthread_mutex_init(&mutex_, NULL);
    pthread_cond_init(&cond_, NULL);
    DBG("WorkerThread::WorkerThread X");
}

WorkerThread::~WorkerThread() {
    DBG("WorkerThread::~WorkerThread E");
    Stop();
    pthread_mutex_destroy(&mutex_);
    DBG("WorkerThread::~WorkerThread X");
}

// Return true if changed from STATE_RUNNING to STATE_STOPPING
bool WorkerThread::BeginStopping() {
    DBG("WorkerThread::BeginStopping E");
    bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0);
    DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value);
    return ret_value;
}

// Wait until state is not STATE_STOPPING
void WorkerThread::WaitUntilStopped() {
    DBG("WorkerThread::WaitUntilStopped E");
    pthread_cond_signal(&cond_);
    while(android_atomic_release_load(&state_) == STATE_STOPPING) {
        usleep(200000);
    }
    DBG("WorkerThread::WaitUntilStopped X");
}

void WorkerThread::Stop() {
    DBG("WorkerThread::Stop E");
    if (BeginStopping()) {
        WaitUntilStopped();
    }
    DBG("WorkerThread::Stop X");
}

int WorkerThread::Run(void *workerParam) {
    DBG("WorkerThread::Run E workerParam=%p", workerParam);
    int status;
    int ret;

    workerParam_ = workerParam;

    ret = pthread_attr_init(&attr_);
    if (ret != 0) {
        LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret));
        return STATUS_ERR;
    }
    ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
    if (ret != 0) {
        LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s",
                strerror(ret));
        return STATUS_ERR;
    }
    ret = pthread_create(&tid_, &attr_,
                (void * (*)(void *))&WorkerThread::Work, this);
    if (ret != 0) {
        LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret));
        return STATUS_ERR;
    }

    // Wait until worker is running
    while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) {
        usleep(200000);
    }

    DBG("WorkerThread::Run X workerParam=%p", workerParam);
    return STATUS_OK;
}


class WorkerQueueThread : public WorkerThread {
  private:
    friend class WorkerQueue;

  public:
    WorkerQueueThread() {
    }

    virtual ~WorkerQueueThread() {
        Stop();
    }

    void * Worker(void *param) {
        DBG("WorkerQueueThread::Worker E");
        WorkerQueue *wq = (WorkerQueue *)param;

        // Do the work until we're told to stop
        while (isRunning()) {
            pthread_mutex_lock(&mutex_);
            while (isRunning() && wq->q_.size() == 0) {
                if (wq->delayed_q_.size() == 0) {
                    // Both queue's are empty so wait
                    pthread_cond_wait(&cond_, &mutex_);
                } else {
                    // delayed_q_ is not empty, move any
                    // timed out records to q_.
                    int64_t now = android::elapsedRealtime();
                    while((wq->delayed_q_.size() != 0) &&
                            ((wq->delayed_q_.top()->time - now) <= 0)) {
                        struct WorkerQueue::Record *r = wq->delayed_q_.top();
                        DBG("WorkerQueueThread::Worker move p=%p time=%lldms",
                                r->p, r->time);
                        wq->delayed_q_.pop();
                        wq->q_.push_back(r);
                    }

                    if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) {
                        // We need to do a timed wait
                        struct timeval tv;
                        struct timespec ts;
                        struct WorkerQueue::Record *r = wq->delayed_q_.top();
                        int64_t delay_ms = r->time - now;
                        DBG("WorkerQueueThread::Worker wait"
                            " p=%p time=%lldms delay_ms=%lldms",
                                r->p, r->time, delay_ms);
                        gettimeofday(&tv, NULL);
                        ts.tv_sec = tv.tv_sec + (delay_ms / 1000);
                        ts.tv_nsec = (tv.tv_usec +
                                        ((delay_ms % 1000) * 1000)) * 1000;
                        pthread_cond_timedwait(&cond_, &mutex_, &ts);
                    }
                }
            }
            if (isRunning()) {
                struct WorkerQueue::Record *r = wq->q_.front();
                wq->q_.pop_front();
                void *p = r->p;
                wq->release_record(r);
                pthread_mutex_unlock(&mutex_);
                wq->Process(r->p);
            } else {
                pthread_mutex_unlock(&mutex_);
            }
        }
        DBG("WorkerQueueThread::Worker X");
        return NULL;
    }
};

WorkerQueue::WorkerQueue() {
    DBG("WorkerQueue::WorkerQueue E");
    wqt_ = new WorkerQueueThread();
    DBG("WorkerQueue::WorkerQueue X");
}

WorkerQueue::~WorkerQueue() {
    DBG("WorkerQueue::~WorkerQueue E");
    Stop();

    Record *r;
    pthread_mutex_lock(&wqt_->mutex_);
    while(free_list_.size() != 0) {
        r = free_list_.front();
        free_list_.pop_front();
        DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r);
        delete r;
    }
    while(delayed_q_.size() != 0) {
        r = delayed_q_.top();
        delayed_q_.pop();
        DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r);
        delete r;
    }
    pthread_mutex_unlock(&wqt_->mutex_);

    delete wqt_;
    DBG("WorkerQueue::~WorkerQueue X");
}

int WorkerQueue::Run() {
    return wqt_->Run(this);
}

void WorkerQueue::Stop() {
    wqt_->Stop();
}

/**
 * Obtain a record from free_list if it is not empty, fill in the record with provided
 * information: *p and delay_in_ms
 */
struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) {
    struct Record *r;
    if (free_list_.size() == 0) {
        r = new Record();
        DBG("WorkerQueue::obtain_record new r=%p", r);
    } else {
        r = free_list_.front();
        DBG("WorkerQueue::obtain_record reuse r=%p", r);
        free_list_.pop_front();
    }
    r->p = p;
    if (delay_in_ms != 0) {
        r->time = android::elapsedRealtime() + delay_in_ms;
    } else {
        r->time = 0;
    }
    return r;
}

/**
 * release a record and insert into the front of the free_list
 */
void WorkerQueue::release_record(struct Record *r) {
    DBG("WorkerQueue::release_record r=%p", r);
    free_list_.push_front(r);
}

/**
 * Add a record to processing queue q_
 */
void WorkerQueue::Add(void *p) {
    DBG("WorkerQueue::Add E:");
    pthread_mutex_lock(&wqt_->mutex_);
    struct Record *r = obtain_record(p, 0);
    q_.push_back(r);
    if (q_.size() == 1) {
        pthread_cond_signal(&wqt_->cond_);
    }
    pthread_mutex_unlock(&wqt_->mutex_);
    DBG("WorkerQueue::Add X:");
}

void WorkerQueue::AddDelayed(void *p, int delay_in_ms) {
    DBG("WorkerQueue::AddDelayed E:");
    if (delay_in_ms <= 0) {
        Add(p);
    } else {
        pthread_mutex_lock(&wqt_->mutex_);
        struct Record *r = obtain_record(p, delay_in_ms);
        delayed_q_.push(r);
#ifdef WORKER_DEBUG
        int64_t now = android::elapsedRealtime();
        DBG("WorkerQueue::AddDelayed"
            " p=%p delay_in_ms=%d now=%lldms top->p=%p"
            " top->time=%lldms diff=%lldms",
                p, delay_in_ms, now, delayed_q_.top()->p,
                delayed_q_.top()->time, delayed_q_.top()->time - now);
#endif
        if ((q_.size() == 0) && (delayed_q_.top() == r)) {
            // q_ is empty and the new record is at delayed_q_.top
            // so we signal the waiting thread so it can readjust
            // the wait time.
            DBG("WorkerQueue::AddDelayed signal");
            pthread_cond_signal(&wqt_->cond_);
        }
        pthread_mutex_unlock(&wqt_->mutex_);
    }
    DBG("WorkerQueue::AddDelayed X:");
}


class TestWorkerQueue : public WorkerQueue {
    virtual void Process(void *p) {
        LOGD("TestWorkerQueue::Process: EX p=%p", p);
    }
};

class TesterThread : public WorkerThread {
  public:
    void * Worker(void *param)
    {
        LOGD("TesterThread::Worker E param=%p", param);
        WorkerQueue *wq = (WorkerQueue *)param;

        // Test AddDelayed
        wq->AddDelayed((void *)1000, 1000);
        wq->Add((void *)0);
        wq->Add((void *)0);
        wq->Add((void *)0);
        wq->Add((void *)0);
        wq->AddDelayed((void *)100, 100);
        wq->AddDelayed((void *)2000, 2000);

        for (int i = 1; isRunning(); i++) {
            LOGD("TesterThread: looping %d", i);
            wq->Add((void *)i);
            wq->Add((void *)i);
            wq->Add((void *)i);
            wq->Add((void *)i);
            sleep(1);
        }

        LOGD("TesterThread::Worker X param=%p", param);

        return NULL;
    }
};

void testWorker() {
    LOGD("testWorker E: ********");

    // Test we can create a thread and delete it
    TesterThread *tester = new TesterThread();
    delete tester;

    TestWorkerQueue *wq = new TestWorkerQueue();
    if (wq->Run() == STATUS_OK) {
        LOGD("testWorker WorkerQueue %p running", wq);

        // Test we can run a thread, stop it then delete it
        tester = new TesterThread();
        tester->Run(wq);
        LOGD("testWorker tester %p running", tester);
        sleep(10);
        LOGD("testWorker tester %p stopping", tester);
        tester->Stop();
        LOGD("testWorker tester %p stopped", tester);
        wq->Stop();
        LOGD("testWorker wq %p stopped", wq);
    }
    LOGD("testWorker X: ********\n");
}