C++程序  |  544行  |  14.57 KB

/*****************************************************************************/
/* "NetPIPE" -- Network Protocol Independent Performance Evaluator.          */
/* Copyright 1997, 1998 Iowa State University Research Foundation, Inc.      */
/*                                                                           */
/* This program is free software; you can redistribute it and/or modify      */
/* it under the terms of the GNU General Public License as published by      */
/* the Free Software Foundation.  You should have received a copy of the     */
/* GNU General Public License along with this program; if not, write to the  */
/* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.   */
/*                                                                           */
/* Files needed for use:                                                     */
/*     * netpipe.c          ---- Driver source                               */
/*     * netpipe.h          ---- General include file                        */
/*     * TCP.c              ---- TCP calls source                            */
/*     * TCP.h              ---- Include file for TCP calls and data structs */
/*     * MPI.c              ---- MPI calls source                            */
/*     * MPI.h              ---- Include file for MPI calls and data structs */
/*     * PVM.c              ---- PVM calls source                            */
/*     * PVM.h              ---- Include file for PVM calls and data structs */
/* 2002/03/18 --- Modified to specify server interfaces - Robbie Williamson  */
/*		  (robbiew@us.ibm.com)					     */
/*****************************************************************************/
#include "netpipe.h"

extern char *optarg;

/* Return the current time in seconds, using a double precision number.      */
double When()
{
	struct timeval tp;
	gettimeofday(&tp, NULL);
	return ((double)tp.tv_sec + (double)tp.tv_usec * 1e-6);
}

int PrintUsage()
{
	printf("\n NETPIPE USAGE \n\n");
	printf("A: specify buffers alignment e.g.: <-A 1024>\n");
	printf("a: asynchronous receive (a.k.a. preposted receive)\n");
#if defined(TCP)
	printf("b: specify send and receive buffer sizes e.g. <-b 32768>\n");
	printf("h: specify hostname <-h host>\n");
	printf("H: specify server hostname <-H HOST> e.g. Multiple NICs\n");
#endif
	printf("i: specify increment step size e.g. <-i 64>\n");
	printf("l: lower bound start value e.g. <-i 1>\n");
	printf("O: specify buffer offset e.g. <-O 127>\n");
	printf("o: specify output filename <-o fn>\n");
	printf("P: print on screen\n");
#if defined(TCP)
	printf("p: specify port e.g. <-p 5150>\n");
#endif
	printf("r: receiver\n");
	printf("s: stream option\n");
	printf("t: transmitter\n");
	printf("u: upper bound stop value e.g. <-u 1048576>\n");
	printf("\n");
	exit(-12);
}

int main(int argc, char *argv[])
{
	FILE *out;		/* Output data file                          */
	char s[255];		/* Generic string                            */
	char *memtmp;
	char *memtmp1;

	int c,			/* option index                              */
	 i, j, n, nq,		/* Loop indices                              */
	 asyncReceive = 0,	/* Pre-post a receive buffer?                */
	    bufoffset = 0,	/* Align buffer to this                      */
	    bufalign = 16 * 1024,	/* Boundary to align buffer to              */
	    errFlag,		/* Error occurred in inner testing loop      */
	    nrepeat,		/* Number of time to do the transmission     */
	    len,		/* Number of bytes to be transmitted         */
	    inc = 0,		/* Increment value                           */
	    trans = -1,		/* Transmitter flag. 1 if transmitting.      */
	    server = 0,		/* Server flag. 1 if specifying server.      */
	    detailflag = 0,	/* Set to examine the signature curve detail */
	    bufszflag = 0,	/* Set to change the TCP socket buffer size  */
	    pert,		/* Perturbation value                        */
	    start = 1,		/* Starting value for signature curve        */
	    end = MAXINT,	/* Ending value for signature curve          */
	    streamopt = 0,	/* Streaming mode flag                       */
	    printopt = 0;	/* Debug print statements flag               */

	ArgStruct args;		/* Argumentsfor all the calls                */

	double t, t0, t1, t2,	/* Time variables                            */
	 tlast,			/* Time for the last transmission            */
	 latency;		/* Network message latency                   */

	Data bwdata[NSAMP];	/* Bandwidth curve data                      */

	short port = DEFPORT;	/* Port number for connection                */
#ifdef HAVE_GETRUSAGE
	struct rusage prev_rusage, curr_rusage;	/* Resource usage                */
	double user_time, sys_time;	/* User & system time used                   */
	double best_user_time, best_sys_time;	/* Total user & system time used     */
	double ut1, ut2, st1, st2;	/* User & system time ctrs for variance  */
	double ut_var, st_var;	/* Variance in user & system time            */
#endif

#ifdef MPI
	MPI_Init(&argc, &argv);
#endif

	strcpy(s, "NetPIPE.out");
#ifndef MPI
	if (argc < 2)
		PrintUsage();
#endif
	/* Parse the arguments. See Usage for description */
	while ((c = getopt(argc, argv, "PstrhH:p:o:A:O:l:u:i:b:a")) != -1) {
		switch (c) {
		case 'o':
			strcpy(s, optarg);
			break;

		case 't':
			trans = 1;
			break;

		case 'r':
			trans = 0;
			break;

		case 's':
			streamopt = 1;
			break;

		case 'l':	/*detailflag = 1; */
			start = atoi(optarg);
			if (start < 1) {
				fprintf(stderr, "Need a starting value >= 1\n");
				exit(743);
			}
			break;

		case 'u':	/*detailflag = 1; */
			end = atoi(optarg);
			break;

		case 'i':
			detailflag = 1;
			inc = atoi(optarg);
			break;

		case 'b':
			bufszflag = 1;
#ifdef TCP
			args.prot.rcvbufsz = atoi(optarg);
			args.prot.sndbufsz = args.prot.rcvbufsz;
#endif
			break;

		case 'P':
			printopt = 1;
			break;

		case 'A':
			bufalign = atoi(optarg);
			break;

		case 'O':
			bufoffset = atoi(optarg);
			break;

		case 'p':
			port = atoi(optarg);
			break;

		case 'h':
			if (trans == 1) {
				args.host = (char *)malloc(strlen(optarg) + 1);
				strcpy(args.host, optarg);
				printf("host is %s\n", args.host);
			} else {
				fprintf(stderr,
					"Error: -t must be specified before -h\n");
				exit(-11);
			}
			break;

		case 'H':
			if (trans == 0) {
				args.server_host =
				    (char *)malloc(strlen(optarg) + 1);
				strcpy(args.server_host, optarg);
				printf("server is %s\n", args.server_host);
				server = 1;
			} else {
				fprintf(stderr,
					"Error: -r must be specified before -H\n");
				exit(-11);
			}
			break;

		case 'a':
			asyncReceive = 1;
			break;

		default:
			PrintUsage();
			exit(-12);
		}
	}
	if (start > end) {
		fprintf(stderr, "Start MUST be LESS than end\n");
		exit(420132);
	}
#if defined(TCP) || defined(PVM)
	/*
	   It should be explicitly specified whether this is the transmitter
	   or the receiver.
	 */
	if (trans < 0) {
		fprintf(stderr, "Error: either -t or -r must be specified\n");
		exit(-11);
	}
#endif

	args.nbuff = TRIALS;
	args.tr = trans;
	args.sr = server;
	args.port = port;

#if defined(TCP)
	if (!bufszflag) {
		args.prot.sndbufsz = 0;
		args.prot.rcvbufsz = 0;
	} else
		fprintf(stderr, "Send and Recv Buffers are %d bytes\n",
			args.prot.sndbufsz);
#endif

	Setup(&args);
	Establish(&args);

	if (args.tr) {
		if ((out = fopen(s, "w")) == NULL) {
			fprintf(stderr, "Can't open %s for output\n", s);
			exit(1);
		}
	} else
		out = stdout;

	args.bufflen = 1;
	args.buff = (char *)malloc(args.bufflen);
	args.buff1 = (char *)malloc(args.bufflen);
	if (asyncReceive)
		PrepareToReceive(&args);
	Sync(&args);
	t0 = When();
	t0 = When();
	t0 = When();
#ifdef HAVE_GETRUSAGE
	getrusage(RUSAGE_SELF, &prev_rusage);
#endif
	t0 = When();
	for (i = 0; i < LATENCYREPS; i++) {
		if (args.tr) {
			SendData(&args);
			RecvData(&args);
			if (asyncReceive && (i < LATENCYREPS - 1)) {
				PrepareToReceive(&args);
			}
		} else {
			RecvData(&args);
			if (asyncReceive && (i < LATENCYREPS - 1)) {
				PrepareToReceive(&args);
			}
			SendData(&args);
		}
	}
	latency = (When() - t0) / (2 * LATENCYREPS);
#ifdef HAVE_GETRUSAGE
	getrusage(RUSAGE_SELF, &curr_rusage);
#endif
	free(args.buff);
	free(args.buff1);

	if (args.tr) {
		SendTime(&args, &latency);
	} else {
		RecvTime(&args, &latency);
	}
	if (args.tr && printopt) {
		fprintf(stderr, "Latency: %.7f\n", latency);
		fprintf(stderr, "Now starting main loop\n");
	}
	tlast = latency;
	if (inc == 0) {
		/* Set a starting value for the message size increment. */
		inc = (start > 1) ? start / 2 : 1;
	}

	/* Main loop of benchmark */
	for (nq = n = 0, len = start, errFlag = 0;
	     n < NSAMP - 3 && tlast < STOPTM && len <= end && !errFlag;
	     len = len + inc, nq++) {
		if (nq > 2 && !detailflag) {
			/*
			   This has the effect of exponentially increasing the block
			   size.  If detailflag is false, then the block size is
			   linearly increased (the increment is not adjusted).
			 */
			inc = ((nq % 2)) ? inc + inc : inc;
		}

		/* This is a perturbation loop to test nearby values */
		for (pert = (!detailflag && inc > PERT + 1) ? -PERT : 0;
		     pert <= PERT;
		     n++, pert += (!detailflag
				   && inc > PERT + 1) ? PERT : PERT + 1) {

			/* Calculate how many times to repeat the experiment. */
			if (args.tr) {
				nrepeat = MAX((RUNTM / ((double)args.bufflen /
							(args.bufflen - inc +
							 1.0) * tlast)),
					      TRIALS);
				SendRepeat(&args, nrepeat);
			} else {
				RecvRepeat(&args, &nrepeat);
			}

			/* Allocate the buffer */
			args.bufflen = len + pert;
			if ((args.buff =
			     (char *)malloc(args.bufflen + bufalign)) ==
			    (char *)NULL) {
				fprintf(stderr, "Couldn't allocate memory\n");
				errFlag = -1;
				break;
			}
			if ((args.buff1 =
			     (char *)malloc(args.bufflen + bufalign)) ==
			    (char *)NULL) {
				fprintf(stderr, "Couldn't allocate memory\n");
				errFlag = -1;
				break;
			}
			/*
			   Possibly align the data buffer: make memtmp and memtmp1
			   point to the original blocks (so they can be freed later),
			   then adjust args.buff and args.buff1 if the user requested it.
			 */
			memtmp = args.buff;
			memtmp1 = args.buff1;
			if (bufalign != 0)
				args.buff += (bufalign -
					      ((intptr_t) args.buff %
					       bufalign) +
					      bufoffset) % bufalign;

			if (bufalign != 0)
				args.buff1 += (bufalign -
					       ((intptr_t) args.buff1 %
						bufalign) +
					       bufoffset) % bufalign;

			if (args.tr && printopt)
				fprintf(stderr, "%3d: %9d bytes %4d times --> ",
					n, args.bufflen, nrepeat);

			/* Finally, we get to transmit or receive and time */
			if (args.tr) {
				/*
				   This is the transmitter: send the block TRIALS times, and
				   if we are not streaming, expect the receiver to return each
				   block.
				 */
				bwdata[n].t = LONGTIME;
				t2 = t1 = 0;
#ifdef HAVE_GETRUSAGE
				ut1 = ut2 = st1 = st2 = 0.0;
				best_user_time = best_sys_time = LONGTIME;
#endif
				for (i = 0; i < TRIALS; i++) {
					Sync(&args);
#ifdef HAVE_GETRUSAGE
					getrusage(RUSAGE_SELF, &prev_rusage);
#endif
					t0 = When();
					for (j = 0; j < nrepeat; j++) {
						if (asyncReceive && !streamopt) {
							PrepareToReceive(&args);
						}
						SendData(&args);
						if (!streamopt) {
							RecvData(&args);
						}
					}
					t = (When() -
					     t0) / ((1 + !streamopt) * nrepeat);
#ifdef HAVE_GETRUSAGE
					getrusage(RUSAGE_SELF, &curr_rusage);
					user_time =
					    ((curr_rusage.ru_utime.tv_sec -
					      prev_rusage.ru_utime.tv_sec) +
					     (double)
					     (curr_rusage.ru_utime.tv_usec -
					      prev_rusage.ru_utime.tv_usec) *
					     1.0E-6) / ((1 +
							 !streamopt) * nrepeat);
					sys_time =
					    ((curr_rusage.ru_stime.tv_sec -
					      prev_rusage.ru_stime.tv_sec) +
					     (double)
					     (curr_rusage.ru_stime.tv_usec -
					      prev_rusage.ru_stime.tv_usec) *
					     1.0E-6) / ((1 +
							 !streamopt) * nrepeat);
					ut2 += user_time * user_time;
					st2 += sys_time * sys_time;
					ut1 += user_time;
					st1 += sys_time;
					if ((user_time + sys_time) <
					    (best_user_time + best_sys_time)) {
						best_user_time = user_time;
						best_sys_time = sys_time;
					}
#endif

					if (!streamopt) {
						t2 += t * t;
						t1 += t;
						bwdata[n].t =
						    MIN(bwdata[n].t, t);
					}
				}
				if (!streamopt)
					SendTime(&args, &bwdata[n].t);
				else
					RecvTime(&args, &bwdata[n].t);

				if (!streamopt)
					bwdata[n].variance =
					    t2 / TRIALS -
					    t1 / TRIALS * t1 / TRIALS;

#ifdef HAVE_GETRUSAGE
				ut_var =
				    ut2 / TRIALS -
				    (ut1 / TRIALS) * (ut1 / TRIALS);
				st_var =
				    st2 / TRIALS -
				    (st1 / TRIALS) * (st1 / TRIALS);
#endif

			} else {
				/*
				   This is the receiver: receive the block TRIALS times, and
				   if we are not streaming, send the block back to the
				   sender.
				 */
				bwdata[n].t = LONGTIME;
				t2 = t1 = 0;
				for (i = 0; i < TRIALS; i++) {
					if (asyncReceive) {
						PrepareToReceive(&args);
					}
					Sync(&args);
					t0 = When();
					for (j = 0; j < nrepeat; j++) {
						RecvData(&args);
						if (asyncReceive
						    && (j < nrepeat - 1)) {
							PrepareToReceive(&args);
						}
						if (!streamopt)
							SendData(&args);
					}
					t = (When() -
					     t0) / ((1 + !streamopt) * nrepeat);

					if (streamopt) {
						t2 += t * t;
						t1 += t;
						bwdata[n].t =
						    MIN(bwdata[n].t, t);
					}
				}
				if (streamopt)
					SendTime(&args, &bwdata[n].t);
				else
					RecvTime(&args, &bwdata[n].t);

				if (streamopt)
					bwdata[n].variance =
					    t2 / TRIALS -
					    t1 / TRIALS * t1 / TRIALS;

			}
			tlast = bwdata[n].t;
			bwdata[n].bits = args.bufflen * CHARSIZE;
			bwdata[n].bps =
			    bwdata[n].bits / (bwdata[n].t * 1024 * 1024);
			bwdata[n].repeat = nrepeat;

			if (args.tr) {
				fprintf(out, "%.7f %.7f %d %d %.7f",
					bwdata[n].t, bwdata[n].bps,
					bwdata[n].bits, bwdata[n].bits / 8,
					bwdata[n].variance);
#ifdef HAVE_GETRUSAGE
				fprintf(out, " %.7f %.7f %.7f %.7f",
					ut1 / (double)TRIALS,
					st1 / (double)TRIALS, ut_var, st_var);
#endif
				fprintf(out, "\n");
			}
			fflush(out);

			free(memtmp);
			free(memtmp1);

			if (args.tr && printopt) {
				fprintf(stderr, " %6.3f Mbps in %.7f sec",
					bwdata[n].bps, tlast);
#ifdef HAVE_GETRUSAGE
				fprintf(stderr,
					", avg utime=%.7f avg stime=%.7f, ",
					ut1 / (double)TRIALS,
					st1 / (double)TRIALS);
				fprintf(stderr, "min utime=%.7f stime=%.7f, ",
					best_user_time, best_sys_time);
				fprintf(stderr, "utime var=%.7f stime var=%.7f",
					ut_var, st_var);
#endif
				fprintf(stderr, "\n");
			}
		}		/* End of perturbation loop */

	}			/* End of main loop  */

	if (args.tr)
		fclose(out);

	CleanUp(&args);
	return (0);
}