/* * Read a file and write the contents to stdout. If a given read takes * longer than 'max_us' time, then we schedule a new thread to handle * the next read. This avoids the coordinated omission problem, where * one request appears to take a long time, but in reality a lot of * requests would have been slow, but we don't notice since new submissions * are not being issued if just 1 is held up. * * One test case: * * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync) * * This will read randfile.gz and log the latencies of doing so, while * piping the output to gzip to decompress it. Any latencies over max_us * are logged when they happen, and latency buckets are displayed at the * end of the run * * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread * * Copyright (C) 2016 Jens Axboe * */ #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <inttypes.h> #include <string.h> #include <pthread.h> #include <errno.h> #include <assert.h> #include "../flist.h" static int bs = 4096; static int max_us = 10000; static char *file; static int separate_writer = 1; #define PLAT_BITS 8 #define PLAT_VAL (1 << PLAT_BITS) #define PLAT_GROUP_NR 19 #define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL) #define PLAT_LIST_MAX 20 struct stats { unsigned int plat[PLAT_NR]; unsigned int nr_samples; unsigned int max; unsigned int min; unsigned int over; }; static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, }; struct thread_data { int exit; int done; pthread_mutex_t lock; pthread_cond_t cond; pthread_mutex_t done_lock; pthread_cond_t done_cond; pthread_t thread; }; struct writer_thread { struct flist_head list; struct flist_head done_list; struct stats s; struct thread_data thread; }; struct reader_thread { struct flist_head list; struct flist_head done_list; int started; int busy; int write_seq; struct stats s; struct thread_data thread; }; struct work_item { struct flist_head list; void *buf; size_t buf_size; off_t off; int fd; int seq; struct writer_thread *writer; struct reader_thread *reader; pthread_mutex_t lock; pthread_cond_t cond; pthread_t thread; }; static struct reader_thread reader_thread; static struct writer_thread writer_thread; uint64_t utime_since(const struct timeval *s, const struct timeval *e) { long sec, usec; uint64_t ret; sec = e->tv_sec - s->tv_sec; usec = e->tv_usec - s->tv_usec; if (sec > 0 && usec < 0) { sec--; usec += 1000000; } if (sec < 0 || (sec == 0 && usec < 0)) return 0; ret = sec * 1000000ULL + usec; return ret; } static struct work_item *find_seq(struct writer_thread *w, unsigned int seq) { struct work_item *work; struct flist_head *entry; if (flist_empty(&w->list)) return NULL; flist_for_each(entry, &w->list) { work = flist_entry(entry, struct work_item, list); if (work->seq == seq) return work; } return NULL; } static unsigned int plat_val_to_idx(unsigned int val) { unsigned int msb, error_bits, base, offset; /* Find MSB starting from bit 0 */ if (val == 0) msb = 0; else msb = sizeof(val)*8 - __builtin_clz(val) - 1; /* * MSB <= (PLAT_BITS-1), cannot be rounded off. Use * all bits of the sample as index */ if (msb <= PLAT_BITS) return val; /* Compute the number of error bits to discard*/ error_bits = msb - PLAT_BITS; /* Compute the number of buckets before the group */ base = (error_bits + 1) << PLAT_BITS; /* * Discard the error bits and apply the mask to find the * index for the buckets in the group */ offset = (PLAT_VAL - 1) & (val >> error_bits); /* Make sure the index does not exceed (array size - 1) */ return (base + offset) < (PLAT_NR - 1) ? (base + offset) : (PLAT_NR - 1); } /* * Convert the given index of the bucket array to the value * represented by the bucket */ static unsigned int plat_idx_to_val(unsigned int idx) { unsigned int error_bits, k, base; assert(idx < PLAT_NR); /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use * all bits of the sample as index */ if (idx < (PLAT_VAL << 1)) return idx; /* Find the group and compute the minimum value of that group */ error_bits = (idx >> PLAT_BITS) - 1; base = 1 << (error_bits + PLAT_BITS); /* Find its bucket number of the group */ k = idx % PLAT_VAL; /* Return the mean of the range of the bucket */ return base + ((k + 0.5) * (1 << error_bits)); } static void add_lat(struct stats *s, unsigned int us, const char *name) { int lat_index = 0; if (us > s->max) s->max = us; if (us < s->min) s->min = us; if (us > max_us) { fprintf(stderr, "%s latency=%u usec\n", name, us); s->over++; } lat_index = plat_val_to_idx(us); __sync_fetch_and_add(&s->plat[lat_index], 1); __sync_fetch_and_add(&s->nr_samples, 1); } static int write_work(struct work_item *work) { struct timeval s, e; ssize_t ret; gettimeofday(&s, NULL); ret = write(STDOUT_FILENO, work->buf, work->buf_size); gettimeofday(&e, NULL); assert(ret == work->buf_size); add_lat(&work->writer->s, utime_since(&s, &e), "write"); return work->seq + 1; } static void thread_exiting(struct thread_data *thread) { __sync_fetch_and_add(&thread->done, 1); pthread_cond_signal(&thread->done_cond); } static void *writer_fn(void *data) { struct writer_thread *wt = data; struct work_item *work; unsigned int seq = 1; work = NULL; while (!wt->thread.exit || !flist_empty(&wt->list)) { pthread_mutex_lock(&wt->thread.lock); if (work) { flist_add_tail(&work->list, &wt->done_list); work = NULL; } work = find_seq(wt, seq); if (work) flist_del_init(&work->list); else pthread_cond_wait(&wt->thread.cond, &wt->thread.lock); pthread_mutex_unlock(&wt->thread.lock); if (work) seq = write_work(work); } thread_exiting(&wt->thread); return NULL; } static void reader_work(struct work_item *work) { struct timeval s, e; ssize_t ret; size_t left; void *buf; off_t off; gettimeofday(&s, NULL); left = work->buf_size; buf = work->buf; off = work->off; while (left) { ret = pread(work->fd, buf, left, off); if (!ret) { fprintf(stderr, "zero read\n"); break; } else if (ret < 0) { fprintf(stderr, "errno=%d\n", errno); break; } left -= ret; off += ret; buf += ret; } gettimeofday(&e, NULL); add_lat(&work->reader->s, utime_since(&s, &e), "read"); pthread_cond_signal(&work->cond); if (separate_writer) { pthread_mutex_lock(&work->writer->thread.lock); flist_add_tail(&work->list, &work->writer->list); pthread_mutex_unlock(&work->writer->thread.lock); pthread_cond_signal(&work->writer->thread.cond); } else { struct reader_thread *rt = work->reader; struct work_item *next = NULL; struct flist_head *entry; /* * Write current work if it matches in sequence. */ if (work->seq == rt->write_seq) goto write_it; pthread_mutex_lock(&rt->thread.lock); flist_add_tail(&work->list, &rt->done_list); /* * See if the next work item is here, if so, write it */ work = NULL; flist_for_each(entry, &rt->done_list) { next = flist_entry(entry, struct work_item, list); if (next->seq == rt->write_seq) { work = next; flist_del(&work->list); break; } } pthread_mutex_unlock(&rt->thread.lock); if (work) { write_it: write_work(work); __sync_fetch_and_add(&rt->write_seq, 1); } } } static void *reader_one_off(void *data) { reader_work(data); return NULL; } static void *reader_fn(void *data) { struct reader_thread *rt = data; struct work_item *work; while (!rt->thread.exit || !flist_empty(&rt->list)) { work = NULL; pthread_mutex_lock(&rt->thread.lock); if (!flist_empty(&rt->list)) { work = flist_first_entry(&rt->list, struct work_item, list); flist_del_init(&work->list); } else pthread_cond_wait(&rt->thread.cond, &rt->thread.lock); pthread_mutex_unlock(&rt->thread.lock); if (work) { __sync_fetch_and_add(&rt->busy, 1); reader_work(work); __sync_fetch_and_sub(&rt->busy, 1); } } thread_exiting(&rt->thread); return NULL; } static void queue_work(struct reader_thread *rt, struct work_item *work) { if (!rt->started) { pthread_mutex_lock(&rt->thread.lock); flist_add_tail(&work->list, &rt->list); pthread_mutex_unlock(&rt->thread.lock); rt->started = 1; pthread_create(&rt->thread.thread, NULL, reader_fn, rt); } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) { flist_add_tail(&work->list, &rt->list); pthread_mutex_unlock(&rt->thread.lock); pthread_cond_signal(&rt->thread.cond); } else { int ret = pthread_create(&work->thread, NULL, reader_one_off, work); if (ret) fprintf(stderr, "pthread_create=%d\n", ret); else pthread_detach(work->thread); } } static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr, unsigned int **output) { unsigned long sum = 0; unsigned int len, i, j = 0; unsigned int oval_len = 0; unsigned int *ovals = NULL; int is_last; len = 0; while (len < PLAT_LIST_MAX && plist[len] != 0.0) len++; if (!len) return 0; /* * Calculate bucket values, note down max and min values */ is_last = 0; for (i = 0; i < PLAT_NR && !is_last; i++) { sum += io_u_plat[i]; while (sum >= (plist[j] / 100.0 * nr)) { assert(plist[j] <= 100.0); if (j == oval_len) { oval_len += 100; ovals = realloc(ovals, oval_len * sizeof(unsigned int)); } ovals[j] = plat_idx_to_val(i); is_last = (j == len - 1); if (is_last) break; j++; } } *output = ovals; return len; } static void show_latencies(struct stats *s, const char *msg) { unsigned int *ovals = NULL; unsigned int len, i; len = calc_percentiles(s->plat, s->nr_samples, &ovals); if (len) { fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg); for (i = 0; i < len; i++) fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]); } if (ovals) free(ovals); fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max); } static void init_thread(struct thread_data *thread) { pthread_cond_init(&thread->cond, NULL); pthread_cond_init(&thread->done_cond, NULL); pthread_mutex_init(&thread->lock, NULL); pthread_mutex_init(&thread->done_lock, NULL); thread->exit = 0; } static void exit_thread(struct thread_data *thread, void fn(struct writer_thread *), struct writer_thread *wt) { __sync_fetch_and_add(&thread->exit, 1); pthread_cond_signal(&thread->cond); while (!thread->done) { pthread_mutex_lock(&thread->done_lock); if (fn) { struct timeval tv; struct timespec ts; gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec + 1; ts.tv_nsec = tv.tv_usec * 1000ULL; pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts); fn(wt); } else pthread_cond_wait(&thread->done_cond, &thread->done_lock); pthread_mutex_unlock(&thread->done_lock); } } static int usage(char *argv[]) { fprintf(stderr, "%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n", argv[0]); return 1; } static int parse_options(int argc, char *argv[]) { int c; while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) { switch (c) { case 'f': file = strdup(optarg); break; case 'b': bs = atoi(optarg); break; case 't': max_us = atoi(optarg); break; case 'w': separate_writer = atoi(optarg); if (!separate_writer) fprintf(stderr, "inline writing is broken\n"); break; case '?': default: return usage(argv); } } if (!file) return usage(argv); return 0; } static void prune_done_entries(struct writer_thread *wt) { FLIST_HEAD(list); if (flist_empty(&wt->done_list)) return; if (pthread_mutex_trylock(&wt->thread.lock)) return; if (!flist_empty(&wt->done_list)) flist_splice_init(&wt->done_list, &list); pthread_mutex_unlock(&wt->thread.lock); while (!flist_empty(&list)) { struct work_item *work; work = flist_first_entry(&list, struct work_item, list); flist_del(&work->list); pthread_cond_destroy(&work->cond); pthread_mutex_destroy(&work->lock); free(work->buf); free(work); } } int main(int argc, char *argv[]) { struct timeval s, re, we; struct reader_thread *rt; struct writer_thread *wt; unsigned long rate; struct stat sb; size_t bytes; off_t off; int fd, seq; if (parse_options(argc, argv)) return 1; fd = open(file, O_RDONLY); if (fd < 0) { perror("open"); return 2; } if (fstat(fd, &sb) < 0) { perror("stat"); return 3; } wt = &writer_thread; init_thread(&wt->thread); INIT_FLIST_HEAD(&wt->list); INIT_FLIST_HEAD(&wt->done_list); wt->s.max = 0; wt->s.min = -1U; pthread_create(&wt->thread.thread, NULL, writer_fn, wt); rt = &reader_thread; init_thread(&rt->thread); INIT_FLIST_HEAD(&rt->list); INIT_FLIST_HEAD(&rt->done_list); rt->s.max = 0; rt->s.min = -1U; rt->write_seq = 1; off = 0; seq = 0; bytes = 0; gettimeofday(&s, NULL); while (sb.st_size) { struct work_item *work; size_t this_len; struct timespec ts; struct timeval tv; prune_done_entries(wt); this_len = sb.st_size; if (this_len > bs) this_len = bs; work = calloc(1, sizeof(*work)); work->buf = malloc(this_len); work->buf_size = this_len; work->off = off; work->fd = fd; work->seq = ++seq; work->writer = wt; work->reader = rt; pthread_cond_init(&work->cond, NULL); pthread_mutex_init(&work->lock, NULL); queue_work(rt, work); gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec; ts.tv_nsec = tv.tv_usec * 1000ULL; ts.tv_nsec += max_us * 1000ULL; if (ts.tv_nsec >= 1000000000ULL) { ts.tv_nsec -= 1000000000ULL; ts.tv_sec++; } pthread_mutex_lock(&work->lock); pthread_cond_timedwait(&work->cond, &work->lock, &ts); pthread_mutex_unlock(&work->lock); off += this_len; sb.st_size -= this_len; bytes += this_len; } exit_thread(&rt->thread, NULL, NULL); gettimeofday(&re, NULL); exit_thread(&wt->thread, prune_done_entries, wt); gettimeofday(&we, NULL); show_latencies(&rt->s, "READERS"); show_latencies(&wt->s, "WRITERS"); bytes /= 1024; rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &re); fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate); rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &we); fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate); close(fd); return 0; }