C++程序  |  228行  |  4.63 KB

/* Test program that performs producer-consumer style communication through
 * a circular buffer. This test program is a slightly modified version of the
 * test program made available by Miguel Ojeda
 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
 */


#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>
#include <fcntl.h>
#include "../../config.h"


/** gcc versions 4.1.0 and later have support for atomic builtins. */

#ifndef HAVE_BUILTIN_ATOMIC
#error Sorry, but this test program can only be compiled by a compiler that \
has built-in functions for atomic memory access.
#endif


/* Number of elements in the circular buffer (size of buffer_t::buffer). */
#define BUFFER_MAX (2)
/* Base names handed to create_semaphore() / destroy_semaphore(). The name is
 * only used in the VGO_darwin code paths, which work with named semaphores. */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"


/* Type of the elements stored in the circular buffer. */
typedef int data_t;

/* Circular buffer with BUFFER_MAX slots through which the producer threads
 * hand data items to the consumer threads. */
typedef struct {
  /* Counting semaphore representing the number of data items in the buffer. */
  sem_t* data;
  /* Counting semaphore representing the number of free elements. */
  sem_t* free;
  /* Position where a new element should be written. */
  int in;
  /* Position from where an element can be removed. */
  int out;
  /* Mutex that protects 'in'. */
  pthread_mutex_t mutex_in;
  /* Mutex that protects 'out'. */
  pthread_mutex_t mutex_out;
  /* Data buffer. */
  data_t buffer[BUFFER_MAX];
} buffer_t;

/* Nonzero (set by the -q option) suppresses the progress output written to
 * stdout by buffer_send(), buffer_recv() and consumer(). */
static int quiet = 0;
/* Nonzero makes buffer_send() / buffer_recv() lock mutex_in / mutex_out around
 * their buffer access. Cleared by the -n option. */
static int use_locking = 1;

/*
 * Atomically add 'i' to the integer 'p' points at.
 *
 * Returns the value '*p' held immediately before the addition. This is a thin
 * wrapper around the __sync_fetch_and_add() compiler builtin.
 */
static __inline__
int fetch_and_add(int* p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}

/*
 * Create a counting semaphore whose initial count is 'value'.
 *
 * With VGO_darwin defined, a named semaphore called "<name>-<pid>" is opened
 * with sem_open(). Otherwise an unnamed, process-private semaphore is
 * allocated on the heap and initialized with sem_init(); 'name' is not used
 * in that case.
 *
 * Returns the semaphore, or NULL if it could not be created (a diagnostic is
 * written to stderr via perror()). Release the result with
 * destroy_semaphore().
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef VGO_darwin
  char name_and_pid[32];
  sem_t* p;

  /* pid_t is not guaranteed to be int, so convert explicitly for "%d". */
  snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, (int) getpid());
  p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
  if (p == SEM_FAILED) {
    perror("sem_open");
    return NULL;
  }
  return p;
#else
  sem_t* p = malloc(sizeof(*p));

  if (p == NULL) {
    perror("malloc");
    return NULL;
  }
  /* Never hand out a semaphore that failed to initialize. */
  if (sem_init(p, 0, value) != 0) {
    perror("sem_init");
    free(p);
    return NULL;
  }
  return p;
#endif
}

/*
 * Release a semaphore obtained from create_semaphore().
 *
 * 'name' must be the base name that was passed to create_semaphore(). With
 * VGO_darwin defined the semaphore is closed and the named semaphore
 * "<name>-<pid>" is unlinked; otherwise the semaphore is destroyed and its
 * heap storage is freed. A NULL 'p' (failed creation) is ignored.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
  if (p == NULL)
    return;

#ifdef VGO_darwin
  {
    char name_and_pid[32];

    /* Rebuild the exact name used by create_semaphore(); unlinking the bare
     * 'name' would leave the "<name>-<pid>" semaphore behind. */
    snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name,
             (int) getpid());
    sem_close(p);
    sem_unlink(name_and_pid);
  }
#else
  sem_destroy(p);
  free(p);
#endif
}

/*
 * Initialize the circular buffer 'b': no data items, BUFFER_MAX free slots,
 * and both the read and the write position at slot 0.
 *
 * The function has no way to report an error to its caller, so the program is
 * terminated with a diagnostic if a semaphore or mutex cannot be set up;
 * continuing would make later sem_wait() / pthread_mutex_lock() calls operate
 * on invalid objects.
 */
static void buffer_init(buffer_t * b)
{
  b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
  b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
  if (b->data == NULL || b->free == NULL) {
    fprintf(stderr, "buffer_init: could not create the semaphores\n");
    exit(1);
  }

  if (pthread_mutex_init(&b->mutex_in, NULL) != 0
      || pthread_mutex_init(&b->mutex_out, NULL) != 0) {
    fprintf(stderr, "buffer_init: could not initialize the mutexes\n");
    exit(1);
  }

  b->in = 0;
  b->out = 0;
}

/*
 * Remove one data item from buffer 'b' and store it in '*d'.
 *
 * Blocks until the 'data' semaphore indicates that an item is available.
 * When 'use_locking' is nonzero, the slot selection and the read of the slot
 * are performed with 'mutex_out' held. Unless 'quiet' is set, a line
 * describing the received item is printed to stdout.
 */
static void buffer_recv(buffer_t* b, data_t* d)
{
  int out;
  /* Wait until there is at least one data item in the buffer. */
  sem_wait(b->data);
  if (use_locking)
    pthread_mutex_lock(&b->mutex_out);
  /* Atomically claim the current read position and advance it by one. */
  out = fetch_and_add(&b->out, 1);
  /* The claimed position lies past the last slot: wrap both the shared
   * position and the local index around by BUFFER_MAX. */
  if (out >= BUFFER_MAX)
  {
    fetch_and_add(&b->out, -BUFFER_MAX);
    out -= BUFFER_MAX;
  }
  *d = b->buffer[out];
  if (use_locking)
    pthread_mutex_unlock(&b->mutex_out);
  if (! quiet)
  {
    printf("received %d from buffer[%d]\n", *d, out);
    fflush(stdout);
  }
  /* The slot has been read: report one more free element. */
  sem_post(b->free);
}

/*
 * Store the data item '*d' in buffer 'b'.
 *
 * Blocks until the 'free' semaphore indicates that a slot is available.
 * When 'use_locking' is nonzero, the slot selection and the write to the slot
 * are performed with 'mutex_in' held. Unless 'quiet' is set, a line
 * describing the sent item is printed to stdout.
 */
static void buffer_send(buffer_t* b, data_t* d)
{
  int in;
  /* Wait until there is at least one free element in the buffer. */
  sem_wait(b->free);
  if (use_locking)
    pthread_mutex_lock(&b->mutex_in);
  /* Atomically claim the current write position and advance it by one. */
  in = fetch_and_add(&b->in, 1);
  /* The claimed position lies past the last slot: wrap both the shared
   * position and the local index around by BUFFER_MAX. */
  if (in >= BUFFER_MAX)
  {
    fetch_and_add(&b->in, -BUFFER_MAX);
    in -= BUFFER_MAX;
  }
  b->buffer[in] = *d;
  if (use_locking)
    pthread_mutex_unlock(&b->mutex_in);
  if (! quiet)
  {
    printf("sent %d to buffer[%d]\n", *d, in);
    fflush(stdout);
  }
  /* The slot has been filled: report one more available data item. */
  sem_post(b->data);
}

/*
 * Release the resources held by buffer 'b': first the two semaphores, then
 * the two mutexes. The buffer must not be used afterwards.
 */
static void buffer_destroy(buffer_t* b)
{
  sem_t* const data_sem = b->data;
  sem_t* const free_sem = b->free;
  pthread_mutex_t* const in_lock = &b->mutex_in;
  pthread_mutex_t* const out_lock = &b->mutex_out;

  destroy_semaphore(DATA_SEMAPHORE_NAME, data_sem);
  destroy_semaphore(FREE_SEMAPHORE_NAME, free_sem);

  pthread_mutex_destroy(in_lock);
  pthread_mutex_destroy(out_lock);
}

/* The single circular buffer shared by all producer and consumer threads. */
static buffer_t b;

static void producer(int* id)
{
  buffer_send(&b, id);
  pthread_exit(NULL);
}

/* Exclusive upper bound, in microseconds, for the random delay a consumer
 * passes to usleep() before it receives an item. */
#define MAXSLEEP (100 * 1000)

/*
 * Consumer thread body: sleeps for a random time below MAXSLEEP microseconds,
 * receives one item from the shared buffer, reports it together with the
 * consumer's own id (unless 'quiet' is set) and terminates the calling thread.
 */
static void consumer(int* id)
{
  int item;
  const int delay_us = rand() % MAXSLEEP;

  usleep(delay_us);
  buffer_recv(&b, &item);

  if (quiet == 0)
  {
    printf("%i: %i\n", *id, item);
    fflush(stdout);
  }

  pthread_exit(NULL);
}

/* Number of producer threads, and also the number of consumer threads,
 * started by main(). */
#define THREADS (10)

/*
 * Start THREADS producer threads and THREADS consumer threads that exchange
 * their thread index through the shared circular buffer, then wait for all of
 * them to finish.
 *
 * Options:
 *   -n  do not lock the buffer mutexes (clears 'use_locking')
 *   -q  quiet: suppress the progress output
 *
 * Returns 0 on success and 1 on an invalid option; exits with status 1 if a
 * thread cannot be created.
 */
int main(int argc, char** argv)
{
  pthread_t producers[THREADS];
  pthread_t consumers[THREADS];
  int thread_arg[THREADS];
  int i;
  int optchar;
  int err;

  /* getopt() signals the end of the options with -1, which is not required
   * to be the same value as EOF. */
  while ((optchar = getopt(argc, argv, "nq")) != -1)
  {
    switch (optchar)
    {
    case 'n':
      use_locking = 0;
      break;
    case 'q':
      quiet = 1;
      break;
    default:
      /* getopt() has already reported the offending option on stderr. */
      fprintf(stderr, "Usage: %s [-n] [-q]\n", argv[0]);
      return 1;
    }
  }

  srand(time(NULL));

  buffer_init(&b);

  /* The thread bodies are declared as 'void f(int*)' and never return (they
   * call pthread_exit()), hence the cast to the pthread start routine type. */
  for (i = 0; i < THREADS; ++i)
  {
    thread_arg[i] = i;
    err = pthread_create(producers + i, NULL,
                         (void * (*)(void *)) producer, &thread_arg[i]);
    if (err != 0)
    {
      /* Joining a thread that was never created is undefined: give up. */
      fprintf(stderr, "pthread_create: %s\n", strerror(err));
      exit(1);
    }
  }

  for (i = 0; i < THREADS; ++i)
  {
    err = pthread_create(consumers + i, NULL,
                         (void * (*)(void *)) consumer, &thread_arg[i]);
    if (err != 0)
    {
      fprintf(stderr, "pthread_create: %s\n", strerror(err));
      exit(1);
    }
  }

  for (i = 0; i < THREADS; ++i)
  {
    pthread_join(producers[i], NULL);
    pthread_join(consumers[i], NULL);
  }

  buffer_destroy(&b);

  return 0;
}