/** * Copyright (C) 2010 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "logging.h" #include "status.h" #include "worker.h" #include <time.h> //#define WORKER_DEBUG #ifdef WORKER_DEBUG #define DBG(...) LOGD(__VA_ARGS__) #else #define DBG(...) #endif void * WorkerThread::Work(void *param) { WorkerThread *t = (WorkerThread *)param; android_atomic_acquire_store(STATE_RUNNING, &t->state_); void * v = t->Worker(t->workerParam_); android_atomic_acquire_store(STATE_STOPPED, &t->state_); return v; } bool WorkerThread::isRunning() { DBG("WorkerThread::isRunning E"); bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING; DBG("WorkerThread::isRunning X ret_value=%d", ret_value); return ret_value; } WorkerThread::WorkerThread() { DBG("WorkerThread::WorkerThread E"); state_ = STATE_INITIALIZED; pthread_mutex_init(&mutex_, NULL); pthread_cond_init(&cond_, NULL); DBG("WorkerThread::WorkerThread X"); } WorkerThread::~WorkerThread() { DBG("WorkerThread::~WorkerThread E"); Stop(); pthread_mutex_destroy(&mutex_); DBG("WorkerThread::~WorkerThread X"); } // Return true if changed from STATE_RUNNING to STATE_STOPPING bool WorkerThread::BeginStopping() { DBG("WorkerThread::BeginStopping E"); bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0); DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value); return ret_value; } // Wait until state is not STATE_STOPPING void WorkerThread::WaitUntilStopped() { DBG("WorkerThread::WaitUntilStopped E"); pthread_cond_signal(&cond_); while(android_atomic_release_load(&state_) == STATE_STOPPING) { usleep(200000); } DBG("WorkerThread::WaitUntilStopped X"); } void WorkerThread::Stop() { DBG("WorkerThread::Stop E"); if (BeginStopping()) { WaitUntilStopped(); } DBG("WorkerThread::Stop X"); } int WorkerThread::Run(void *workerParam) { DBG("WorkerThread::Run E workerParam=%p", workerParam); int status; int ret; workerParam_ = workerParam; ret = pthread_attr_init(&attr_); if (ret != 0) { LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret)); return STATUS_ERR; } ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED); if (ret != 0) { LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s", strerror(ret)); return STATUS_ERR; } ret = pthread_create(&tid_, &attr_, (void * (*)(void *))&WorkerThread::Work, this); if (ret != 0) { LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret)); return STATUS_ERR; } // Wait until worker is running while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) { usleep(200000); } DBG("WorkerThread::Run X workerParam=%p", workerParam); return STATUS_OK; } class WorkerQueueThread : public WorkerThread { private: friend class WorkerQueue; public: WorkerQueueThread() { } virtual ~WorkerQueueThread() { Stop(); } void * Worker(void *param) { DBG("WorkerQueueThread::Worker E"); WorkerQueue *wq = (WorkerQueue *)param; // Do the work until we're told to stop while (isRunning()) { pthread_mutex_lock(&mutex_); while (isRunning() && wq->q_.size() == 0) { if (wq->delayed_q_.size() == 0) { // Both queue's are empty so wait pthread_cond_wait(&cond_, &mutex_); } else { // delayed_q_ is not empty, move any // timed out records to q_. int64_t now = android::elapsedRealtime(); while((wq->delayed_q_.size() != 0) && ((wq->delayed_q_.top()->time - now) <= 0)) { struct WorkerQueue::Record *r = wq->delayed_q_.top(); DBG("WorkerQueueThread::Worker move p=%p time=%lldms", r->p, r->time); wq->delayed_q_.pop(); wq->q_.push_back(r); } if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) { // We need to do a timed wait struct timeval tv; struct timespec ts; struct WorkerQueue::Record *r = wq->delayed_q_.top(); int64_t delay_ms = r->time - now; DBG("WorkerQueueThread::Worker wait" " p=%p time=%lldms delay_ms=%lldms", r->p, r->time, delay_ms); gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec + (delay_ms / 1000); ts.tv_nsec = (tv.tv_usec + ((delay_ms % 1000) * 1000)) * 1000; pthread_cond_timedwait(&cond_, &mutex_, &ts); } } } if (isRunning()) { struct WorkerQueue::Record *r = wq->q_.front(); wq->q_.pop_front(); void *p = r->p; wq->release_record(r); pthread_mutex_unlock(&mutex_); wq->Process(r->p); } else { pthread_mutex_unlock(&mutex_); } } DBG("WorkerQueueThread::Worker X"); return NULL; } }; WorkerQueue::WorkerQueue() { DBG("WorkerQueue::WorkerQueue E"); wqt_ = new WorkerQueueThread(); DBG("WorkerQueue::WorkerQueue X"); } WorkerQueue::~WorkerQueue() { DBG("WorkerQueue::~WorkerQueue E"); Stop(); Record *r; pthread_mutex_lock(&wqt_->mutex_); while(free_list_.size() != 0) { r = free_list_.front(); free_list_.pop_front(); DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r); delete r; } while(delayed_q_.size() != 0) { r = delayed_q_.top(); delayed_q_.pop(); DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r); delete r; } pthread_mutex_unlock(&wqt_->mutex_); delete wqt_; DBG("WorkerQueue::~WorkerQueue X"); } int WorkerQueue::Run() { return wqt_->Run(this); } void WorkerQueue::Stop() { wqt_->Stop(); } /** * Obtain a record from free_list if it is not empty, fill in the record with provided * information: *p and delay_in_ms */ struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) { struct Record *r; if (free_list_.size() == 0) { r = new Record(); DBG("WorkerQueue::obtain_record new r=%p", r); } else { r = free_list_.front(); DBG("WorkerQueue::obtain_record reuse r=%p", r); free_list_.pop_front(); } r->p = p; if (delay_in_ms != 0) { r->time = android::elapsedRealtime() + delay_in_ms; } else { r->time = 0; } return r; } /** * release a record and insert into the front of the free_list */ void WorkerQueue::release_record(struct Record *r) { DBG("WorkerQueue::release_record r=%p", r); free_list_.push_front(r); } /** * Add a record to processing queue q_ */ void WorkerQueue::Add(void *p) { DBG("WorkerQueue::Add E:"); pthread_mutex_lock(&wqt_->mutex_); struct Record *r = obtain_record(p, 0); q_.push_back(r); if (q_.size() == 1) { pthread_cond_signal(&wqt_->cond_); } pthread_mutex_unlock(&wqt_->mutex_); DBG("WorkerQueue::Add X:"); } void WorkerQueue::AddDelayed(void *p, int delay_in_ms) { DBG("WorkerQueue::AddDelayed E:"); if (delay_in_ms <= 0) { Add(p); } else { pthread_mutex_lock(&wqt_->mutex_); struct Record *r = obtain_record(p, delay_in_ms); delayed_q_.push(r); #ifdef WORKER_DEBUG int64_t now = android::elapsedRealtime(); DBG("WorkerQueue::AddDelayed" " p=%p delay_in_ms=%d now=%lldms top->p=%p" " top->time=%lldms diff=%lldms", p, delay_in_ms, now, delayed_q_.top()->p, delayed_q_.top()->time, delayed_q_.top()->time - now); #endif if ((q_.size() == 0) && (delayed_q_.top() == r)) { // q_ is empty and the new record is at delayed_q_.top // so we signal the waiting thread so it can readjust // the wait time. DBG("WorkerQueue::AddDelayed signal"); pthread_cond_signal(&wqt_->cond_); } pthread_mutex_unlock(&wqt_->mutex_); } DBG("WorkerQueue::AddDelayed X:"); } class TestWorkerQueue : public WorkerQueue { virtual void Process(void *p) { LOGD("TestWorkerQueue::Process: EX p=%p", p); } }; class TesterThread : public WorkerThread { public: void * Worker(void *param) { LOGD("TesterThread::Worker E param=%p", param); WorkerQueue *wq = (WorkerQueue *)param; // Test AddDelayed wq->AddDelayed((void *)1000, 1000); wq->Add((void *)0); wq->Add((void *)0); wq->Add((void *)0); wq->Add((void *)0); wq->AddDelayed((void *)100, 100); wq->AddDelayed((void *)2000, 2000); for (int i = 1; isRunning(); i++) { LOGD("TesterThread: looping %d", i); wq->Add((void *)i); wq->Add((void *)i); wq->Add((void *)i); wq->Add((void *)i); sleep(1); } LOGD("TesterThread::Worker X param=%p", param); return NULL; } }; void testWorker() { LOGD("testWorker E: ********"); // Test we can create a thread and delete it TesterThread *tester = new TesterThread(); delete tester; TestWorkerQueue *wq = new TestWorkerQueue(); if (wq->Run() == STATUS_OK) { LOGD("testWorker WorkerQueue %p running", wq); // Test we can run a thread, stop it then delete it tester = new TesterThread(); tester->Run(wq); LOGD("testWorker tester %p running", tester); sleep(10); LOGD("testWorker tester %p stopping", tester); tester->Stop(); LOGD("testWorker tester %p stopped", tester); wq->Stop(); LOGD("testWorker wq %p stopped", wq); } LOGD("testWorker X: ********\n"); }