/*
This file is part of Valgrind, a dynamic binary instrumentation
framework.
Copyright (C) 2008-2008 Google Inc
opensource@google.com
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
02111-1307, USA.
The GNU General Public License is contained in the file COPYING.
*/
// Author: Konstantin Serebryany <opensource@google.com>
//
// Here we define a few simple classes that wrap pthread primitives.
//
// We need this to create unit tests for helgrind (or a similar tool)
// that will work with different threading frameworks.
//
// To test helgrind's support for another threading library, create a
// copy of this file and replace the pthread_ calls with the appropriate
// calls to that library.
//
// Note that some of the methods defined here are annotated with
// ANNOTATE_* macros defined in dynamic_annotations.h.
//
// DISCLAIMER: the classes defined in this header file
// are NOT intended for general use -- only for unit tests.
//
#ifndef THREAD_WRAPPERS_PTHREAD_H
#define THREAD_WRAPPERS_PTHREAD_H
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
#include <vector>   // std::vector is used by ThreadPool below.
#include <stdio.h>
#include <limits.h> // INT_MAX
#ifdef VGO_darwin
#include <libkern/OSAtomic.h>
#define NO_BARRIER
#define NO_TLS
#endif
#include <string>
using namespace std;
#include <sys/time.h>
#include <time.h>
#include "../../drd/drd.h"
#define ANNOTATE_NO_OP(arg) do { } while(0)
#define ANNOTATE_EXPECT_RACE(addr, descr) \
ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race")
static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; }
#include <assert.h>
#ifdef NDEBUG
# error "Pleeease, do not define NDEBUG"
#endif
#define CHECK assert
/// Set this to true if malloc() uses a mutex on your platform, as this may
/// introduce a happens-before arc for a pure happens-before race detector.
const bool kMallocUsesMutex = false;
/// Current time in milliseconds.
static inline int64_t GetCurrentTimeMillis() {
struct timeval now;
gettimeofday(&now, NULL);
  return (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000;  // cast first to avoid 32-bit overflow
}
/// Copy tv to ts adding offset in milliseconds.
static inline void timeval2timespec(timeval *const tv,
timespec *ts,
int64_t offset_milli) {
const int64_t ten_9 = 1000000000LL;
const int64_t ten_6 = 1000000LL;
const int64_t ten_3 = 1000LL;
int64_t now_nsec = (int64_t)tv->tv_sec * ten_9;
now_nsec += (int64_t)tv->tv_usec * ten_3;
int64_t then_nsec = now_nsec + offset_milli * ten_6;
ts->tv_sec = then_nsec / ten_9;
ts->tv_nsec = then_nsec % ten_9;
}
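/*! A minimal sketch (illustrative only) of how the two helpers above are
    typically combined to build an absolute deadline for
    pthread_cond_timedwait(), e.g. a 100 ms timeout:

      struct timeval now;
      struct timespec deadline;
      gettimeofday(&now, NULL);
      timeval2timespec(&now, &deadline, 100);  // now + 100 ms
      // pthread_cond_timedwait(&cv, &mu, &deadline);
*/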
class CondVar;
#ifndef NO_SPINLOCK
/// helgrind does not (yet) support spin locks, so we annotate them.
#ifndef VGO_darwin
class SpinLock {
public:
SpinLock() {
CHECK(0 == pthread_spin_init(&mu_, 0));
ANNOTATE_RWLOCK_CREATE((void*)&mu_);
}
~SpinLock() {
ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
CHECK(0 == pthread_spin_destroy(&mu_));
}
void Lock() {
CHECK(0 == pthread_spin_lock(&mu_));
ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
}
void Unlock() {
ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
CHECK(0 == pthread_spin_unlock(&mu_));
}
private:
pthread_spinlock_t mu_;
};
#else
class SpinLock {
public:
// Mac OS X version.
SpinLock() : mu_(OS_SPINLOCK_INIT) {
ANNOTATE_RWLOCK_CREATE((void*)&mu_);
}
~SpinLock() {
ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
}
void Lock() {
OSSpinLockLock(&mu_);
ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
}
void Unlock() {
ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
OSSpinLockUnlock(&mu_);
}
private:
OSSpinLock mu_;
};
#endif // VGO_darwin
#endif // NO_SPINLOCK
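/*! A minimal SpinLock usage sketch (illustrative only; counter and
    IncrementCounter are hypothetical names, not part of this header;
    only meaningful when NO_SPINLOCK is not defined):

      static SpinLock counter_lock;
      static int counter = 0;

      void IncrementCounter() {
        counter_lock.Lock();
        counter++;                 // protected by counter_lock
        counter_lock.Unlock();
      }
*/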
/// Just a boolean condition. Used by Mutex::LockWhen and similar.
class Condition {
public:
typedef bool (*func_t)(void*);
template <typename T>
Condition(bool (*func)(T*), T* arg)
: func_(reinterpret_cast<func_t>(func)), arg_(arg) {}
Condition(bool (*func)())
: func_(reinterpret_cast<func_t>(func)), arg_(NULL) {}
bool Eval() { return func_(arg_); }
private:
func_t func_;
void *arg_;
};
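/*! A minimal Condition usage sketch (illustrative only; the predicate and
    the flag are hypothetical). The predicate takes a pointer to the state
    it inspects and must be evaluable with the associated Mutex held:

      static bool IsTrue(bool *flag) { return *flag; }

      bool done = false;
      Condition cond(IsTrue, &done);   // cond.Eval() returns the current value of done
*/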
/// Wrapper for pthread_mutex_t.
///
/// pthread_mutex_t is *not* a reader-writer lock,
/// so methods like ReaderLock() aren't really reader locks.
/// We cannot use pthread_rwlock_t here because it
/// does not work with pthread_cond_t.
///
/// TODO: We still need to test reader locks with this class.
/// Implement a mode where pthread_rwlock_t will be used
/// instead of pthread_mutex_t (only when not used with CondVar or LockWhen).
///
class Mutex {
friend class CondVar;
public:
Mutex() {
CHECK(0 == pthread_mutex_init(&mu_, NULL));
CHECK(0 == pthread_cond_init(&cv_, NULL));
signal_at_unlock_ = true; // Always signal at Unlock to make
// Mutex more friendly to hybrid detectors.
}
~Mutex() {
CHECK(0 == pthread_cond_destroy(&cv_));
CHECK(0 == pthread_mutex_destroy(&mu_));
}
void Lock() { CHECK(0 == pthread_mutex_lock(&mu_));}
bool TryLock() { return (0 == pthread_mutex_trylock(&mu_));}
void Unlock() {
if (signal_at_unlock_) {
CHECK(0 == pthread_cond_signal(&cv_));
}
CHECK(0 == pthread_mutex_unlock(&mu_));
}
void ReaderLock() { Lock(); }
bool ReaderTryLock() { return TryLock();}
void ReaderUnlock() { Unlock(); }
void LockWhen(Condition cond) { Lock(); WaitLoop(cond); }
void ReaderLockWhen(Condition cond) { Lock(); WaitLoop(cond); }
void Await(Condition cond) { WaitLoop(cond); }
bool ReaderLockWhenWithTimeout(Condition cond, int millis)
{ Lock(); return WaitLoopWithTimeout(cond, millis); }
bool LockWhenWithTimeout(Condition cond, int millis)
{ Lock(); return WaitLoopWithTimeout(cond, millis); }
bool AwaitWithTimeout(Condition cond, int millis)
{ return WaitLoopWithTimeout(cond, millis); }
private:
void WaitLoop(Condition cond) {
signal_at_unlock_ = true;
while(cond.Eval() == false) {
pthread_cond_wait(&cv_, &mu_);
}
ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
}
bool WaitLoopWithTimeout(Condition cond, int millis) {
struct timeval now;
struct timespec timeout;
int retcode = 0;
gettimeofday(&now, NULL);
timeval2timespec(&now, &timeout, millis);
signal_at_unlock_ = true;
while (cond.Eval() == false && retcode == 0) {
retcode = pthread_cond_timedwait(&cv_, &mu_, &timeout);
}
if(retcode == 0) {
ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
}
return cond.Eval();
}
// A hack. cv_ should be the first data member so that
  // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) work.
// (See also racecheck_unittest.cc)
pthread_cond_t cv_;
pthread_mutex_t mu_;
  bool signal_at_unlock_;  // If true, Unlock() also signals cv_ (set in the constructor and in the wait loops).
};
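/*! A minimal Mutex/LockWhen sketch (illustrative only; the flag, the
    predicate and the two functions are hypothetical). LockWhen() blocks
    until the predicate becomes true, then returns with the mutex held:

      static Mutex mu;
      static bool  ready = false;
      static bool  IsReady(bool *b) { return *b; }

      void Publisher() {
        mu.Lock();
        ready = true;            // Unlock() signals the internal condvar
        mu.Unlock();
      }

      void Waiter() {
        mu.LockWhen(Condition(IsReady, &ready));
        // ready is true here and mu is held.
        mu.Unlock();
      }
*/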
class MutexLock { // Scoped Mutex Locker/Unlocker
public:
MutexLock(Mutex *mu)
: mu_(mu) {
mu_->Lock();
}
~MutexLock() {
mu_->Unlock();
}
private:
Mutex *mu_;
};
/// Wrapper for pthread_cond_t.
class CondVar {
public:
CondVar() { CHECK(0 == pthread_cond_init(&cv_, NULL)); }
~CondVar() { CHECK(0 == pthread_cond_destroy(&cv_)); }
void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); }
bool WaitWithTimeout(Mutex *mu, int millis) {
struct timeval now;
struct timespec timeout;
gettimeofday(&now, NULL);
timeval2timespec(&now, &timeout, millis);
return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout);
}
void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); }
void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); }
private:
pthread_cond_t cv_;
};
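/*! A minimal CondVar sketch (illustrative only; the names are
    hypothetical). The classic pattern: re-check the predicate in a loop
    because pthread_cond_wait() may wake up spuriously:

      static Mutex   mu;
      static CondVar cv;
      static bool    ready = false;

      void Consumer() {
        mu.Lock();
        while (!ready)
          cv.Wait(&mu);          // atomically releases and re-acquires mu
        mu.Unlock();
      }

      void Producer() {
        mu.Lock();
        ready = true;
        cv.Signal();             // wake one waiter
        mu.Unlock();
      }
*/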
// pthreads do not allow using a condvar with a rwlock, so we can't make
// the ReaderLock() method of Mutex a real reader lock.
// Hence we need a separate lock class to test reader locks.
#define NEEDS_SEPERATE_RW_LOCK
class RWLock {
public:
RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); }
~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); }
void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); }
void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); }
void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
private:
pthread_cond_t dummy; // Damn, this requires some redesign...
pthread_rwlock_t mu_;
};
class ReaderLockScoped { // Scoped RWLock Locker/Unlocker
public:
ReaderLockScoped(RWLock *mu)
: mu_(mu) {
mu_->ReaderLock();
}
~ReaderLockScoped() {
mu_->ReaderUnlock();
}
private:
RWLock *mu_;
};
class WriterLockScoped { // Scoped RWLock Locker/Unlocker
public:
WriterLockScoped(RWLock *mu)
: mu_(mu) {
mu_->Lock();
}
~WriterLockScoped() {
mu_->Unlock();
}
private:
RWLock *mu_;
};
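/*! A minimal RWLock sketch (illustrative only; the data and accessors are
    hypothetical). Multiple readers may hold the lock concurrently; a
    writer is exclusive:

      static RWLock data_lock;
      static int    data = 0;

      int ReadData() {
        ReaderLockScoped reader(&data_lock);   // shared lock
        return data;
      }

      void WriteData(int value) {
        WriterLockScoped writer(&data_lock);   // exclusive lock
        data = value;
      }
*/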
/// Wrapper for pthread_create()/pthread_join().
class MyThread {
public:
typedef void *(*worker_t)(void*);
MyThread(worker_t worker, void *arg = NULL, const char *name = NULL)
:w_(worker), arg_(arg), name_(name) {}
MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL)
:w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL)
:w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
~MyThread(){ w_ = NULL; arg_ = NULL;}
  void Start() { CHECK(0 == pthread_create(&t_, NULL, ThreadBody, this)); }
void Join() { CHECK(0 == pthread_join(t_, NULL));}
pthread_t tid() const { return t_; }
private:
  // Thread entry point with the signature pthread_create() expects.
  static void *ThreadBody(void *arg) {
    MyThread *my_thread = static_cast<MyThread*>(arg);
    if (my_thread->name_) {
      ANNOTATE_THREAD_NAME(my_thread->name_);
    }
    my_thread->w_(my_thread->arg_);
    return NULL;
  }
pthread_t t_;
worker_t w_;
void *arg_;
const char *name_;
};
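/*! A minimal MyThread sketch (illustrative only; Worker and RunOneThread
    are hypothetical). Start() launches the thread, Join() waits for it:

      void *Worker(void *arg) {
        printf("hello from thread, arg=%p\n", arg);
        return NULL;
      }

      void RunOneThread() {
        MyThread t(Worker, NULL, "worker-thread");
        t.Start();
        t.Join();
      }
*/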
/// Just a message queue.
class ProducerConsumerQueue {
public:
ProducerConsumerQueue(int unused) {
//ANNOTATE_PCQ_CREATE(this);
}
~ProducerConsumerQueue() {
CHECK(q_.empty());
//ANNOTATE_PCQ_DESTROY(this);
}
// Put.
void Put(void *item) {
mu_.Lock();
q_.push(item);
ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get()
//ANNOTATE_PCQ_PUT(this);
mu_.Unlock();
}
// Get.
// Blocks if the queue is empty.
void *Get() {
mu_.LockWhen(Condition(IsQueueNotEmpty, &q_));
void * item = NULL;
bool ok = TryGetInternal(&item);
CHECK(ok);
mu_.Unlock();
return item;
}
// If queue is not empty,
// remove an element from queue, put it into *res and return true.
// Otherwise return false.
bool TryGet(void **res) {
mu_.Lock();
bool ok = TryGetInternal(res);
mu_.Unlock();
return ok;
}
private:
Mutex mu_;
std::queue<void*> q_; // protected by mu_
  // Requires: mu_ must be held.
bool TryGetInternal(void ** item_ptr) {
if (q_.empty())
return false;
*item_ptr = q_.front();
q_.pop();
//ANNOTATE_PCQ_GET(this);
return true;
}
static bool IsQueueNotEmpty(std::queue<void*> * queue) {
return !queue->empty();
}
};
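/*! A minimal ProducerConsumerQueue sketch (illustrative only; the item
    type and the two functions are hypothetical). Get() blocks until the
    producer has Put() something:

      static ProducerConsumerQueue q(0);  // the constructor argument is unused

      void Producer() {
        q.Put(new int(42));
      }

      void Consumer() {
        int *item = static_cast<int*>(q.Get());   // blocks until Put()
        printf("got %d\n", *item);
        delete item;
      }
*/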
/// Function pointer with zero, one or two parameters.
struct Closure {
typedef void (*F0)();
typedef void (*F1)(void *arg1);
typedef void (*F2)(void *arg1, void *arg2);
int n_params;
void *f;
void *param1;
void *param2;
void Execute() {
if (n_params == 0) {
(F0(f))();
} else if (n_params == 1) {
(F1(f))(param1);
} else {
CHECK(n_params == 2);
(F2(f))(param1, param2);
}
delete this;
}
};
inline Closure *NewCallback(void (*f)()) {
Closure *res = new Closure;
res->n_params = 0;
res->f = (void*)(f);
res->param1 = NULL;
res->param2 = NULL;
return res;
}
template <class P1>
Closure *NewCallback(void (*f)(P1), P1 p1) {
CHECK(sizeof(P1) <= sizeof(void*));
Closure *res = new Closure;
res->n_params = 1;
res->f = (void*)(f);
res->param1 = (void*)p1;
res->param2 = NULL;
return res;
}
template <class P1, class P2>
Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
  CHECK(sizeof(P1) <= sizeof(void*));
  CHECK(sizeof(P2) <= sizeof(void*));
Closure *res = new Closure;
res->n_params = 2;
res->f = (void*)(f);
res->param1 = (void*)p1;
res->param2 = (void*)p2;
return res;
}
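/*! A minimal Closure/NewCallback sketch (illustrative only; PrintValue and
    RunOnce are hypothetical). Note that Execute() deletes the closure, so
    each closure may be executed exactly once:

      void PrintValue(int *p) { printf("%d\n", *p); }

      void RunOnce() {
        static int value = 7;
        Closure *c = NewCallback(PrintValue, &value);
        c->Execute();            // runs PrintValue(&value), then deletes c
      }
*/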
/*! A thread pool that uses ProducerConsumerQueue.
Usage:
{
ThreadPool pool(n_workers);
pool.StartWorkers();
pool.Add(NewCallback(func_with_no_args));
pool.Add(NewCallback(func_with_one_arg, arg));
pool.Add(NewCallback(func_with_two_args, arg1, arg2));
... // more calls to pool.Add()
  // ~ThreadPool() is called here: we wait for the workers to finish
  // and then join all threads in the pool.
}
*/
class ThreadPool {
public:
  //! Create n_threads threads, but do not start them.
explicit ThreadPool(int n_threads)
: queue_(INT_MAX) {
for (int i = 0; i < n_threads; i++) {
MyThread *thread = new MyThread(&ThreadPool::Worker, this);
workers_.push_back(thread);
}
}
//! Start all threads.
void StartWorkers() {
for (size_t i = 0; i < workers_.size(); i++) {
workers_[i]->Start();
}
}
//! Add a closure.
void Add(Closure *closure) {
queue_.Put(closure);
}
int num_threads() { return workers_.size();}
  //! Wait for the workers to finish, then join all threads.
~ThreadPool() {
for (size_t i = 0; i < workers_.size(); i++) {
Add(NULL);
}
for (size_t i = 0; i < workers_.size(); i++) {
workers_[i]->Join();
delete workers_[i];
}
}
private:
std::vector<MyThread*> workers_;
ProducerConsumerQueue queue_;
static void *Worker(void *p) {
ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
while (true) {
Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
if(closure == NULL) {
return NULL;
}
closure->Execute();
}
}
};
#ifndef NO_BARRIER
/// Wrapper for pthread_barrier_t.
class Barrier {
public:
explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));}
~Barrier() {CHECK(0 == pthread_barrier_destroy(&b_));}
void Block() {
    // helgrind 3.3.0 does not have an interceptor for barrier,
    // but our current local version does.
// ANNOTATE_CONDVAR_SIGNAL(this);
pthread_barrier_wait(&b_);
// ANNOTATE_CONDVAR_WAIT(this, this);
}
private:
pthread_barrier_t b_;
};
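/*! A minimal Barrier sketch (illustrative only; BarrierWorker is a
    hypothetical function). Each of N threads calls Block(); all of them
    proceed only once all N have arrived:

      static Barrier barrier(4);   // expect 4 threads

      void *BarrierWorker(void *unused) {
        // ... phase 1 work ...
        barrier.Block();           // wait for the other 3 threads
        // ... phase 2 work ...
        return NULL;
      }
*/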
#endif // NO_BARRIER
class BlockingCounter {
public:
explicit BlockingCounter(int initial_count) :
count_(initial_count) {}
bool DecrementCount() {
MutexLock lock(&mu_);
count_--;
return count_ == 0;
}
void Wait() {
mu_.LockWhen(Condition(&IsZero, &count_));
mu_.Unlock();
}
private:
static bool IsZero(int *arg) { return *arg == 0; }
Mutex mu_;
int count_;
};
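/*! A minimal BlockingCounter sketch (illustrative only; the names are
    hypothetical). The main thread waits until N workers have each called
    DecrementCount():

      static BlockingCounter workers_done(3);

      void *CountedWorker(void *unused) {
        // ... do work ...
        workers_done.DecrementCount();
        return NULL;
      }

      void WaitForAllWorkers() {
        workers_done.Wait();       // blocks until the count reaches 0
      }
*/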
int AtomicIncrement(volatile int *value, int increment);
#ifndef VGO_darwin
inline int AtomicIncrement(volatile int *value, int increment) {
return __sync_add_and_fetch(value, increment);
}
#else
// Mac OS X version.
inline int AtomicIncrement(volatile int *value, int increment) {
return OSAtomicAdd32(increment, value);
}
// TODO(timurrrr) this is a hack
#define memalign(A,B) malloc(B)
// TODO(timurrrr) this is a hack
int posix_memalign(void **out, size_t al, size_t size) {
*out = memalign(al, size);
return (*out == 0);
}
#endif // VGO_darwin
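/*! A minimal AtomicIncrement sketch (illustrative only; hits and CountHit
    are hypothetical). The addition is performed atomically and the
    function returns the new value:

      static volatile int hits = 0;

      void CountHit() {
        AtomicIncrement(&hits, 1);
      }
*/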
#endif // THREAD_WRAPPERS_PTHREAD_H
// vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker