/* * Copyright (c) 2014 Oracle and/or its affiliates. All Rights Reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it would be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * * Author: Alexey Kodanev <alexey.kodanev@oracle.com> * */ #include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #include <netdb.h> #include <netinet/in.h> #include <arpa/inet.h> #include <poll.h> #include <time.h> #include <string.h> #include <unistd.h> #include <errno.h> #include "test.h" #include "lapi/posix_clocks.h" #include "safe_macros.h" char *TCID = "tcp_fastopen"; static const int max_msg_len = 1500; /* TCP server requiers */ #ifndef TCP_FASTOPEN #define TCP_FASTOPEN 23 #endif #ifndef SO_BUSY_POLL #define SO_BUSY_POLL 46 #endif /* TCP client requiers */ #ifndef MSG_FASTOPEN #define MSG_FASTOPEN 0x20000000 /* Send data in TCP SYN */ #endif enum { TCP_SERVER = 0, TCP_CLIENT, }; static int tcp_mode; enum { TFO_ENABLED = 0, TFO_DISABLED, }; static int tfo_support; static int fastopen_api; static const char tfo_cfg[] = "/proc/sys/net/ipv4/tcp_fastopen"; static const char tcp_tw_reuse[] = "/proc/sys/net/ipv4/tcp_tw_reuse"; static int tw_reuse_changed; static int tfo_cfg_value; static int tfo_bit_num; static int tfo_cfg_changed; static int tfo_queue_size = 100; static int max_queue_len = 100; static const int client_byte = 0x43; static const int server_byte = 0x53; static const int start_byte = 0x24; static const int start_fin_byte = 0x25; static const int end_byte = 0x0a; static int client_msg_size = 32; static int server_msg_size = 128; static char *client_msg; static char *server_msg; /* * The number of requests from client after * which server has to close the connection. */ static int server_max_requests = 3; static int client_max_requests = 10; static int clients_num = 2; static char *tcp_port = "61000"; static char *server_addr = "localhost"; static int busy_poll = -1; /* server socket */ static int sfd; /* how long a client must wait for the server's reply, microsec */ static long wait_timeout = 10000000; /* in the end test will save time result in this file */ static char *rpath = "./tfo_result"; static int force_run; static int verbose; static char *narg, *Narg, *qarg, *rarg, *Rarg, *aarg, *Targ, *barg; static const option_t options[] = { /* server params */ {"R:", NULL, &Rarg}, {"q:", NULL, &qarg}, /* client params */ {"H:", NULL, &server_addr}, {"a:", NULL, &aarg}, {"n:", NULL, &narg}, {"N:", NULL, &Narg}, {"T:", NULL, &Targ}, {"r:", NULL, &rarg}, {"d:", NULL, &rpath}, /* common */ {"g:", NULL, &tcp_port}, {"b:", NULL, &barg}, {"F", &force_run, NULL}, {"l", &tcp_mode, NULL}, {"o", &fastopen_api, NULL}, {"O", &tfo_support, NULL}, {"v", &verbose, NULL}, {NULL, NULL, NULL} }; static void help(void) { printf("\n -F Force to run\n"); printf(" -v Verbose\n"); printf(" -o Use old TCP API, default is new TCP API\n"); printf(" -O TFO support is off, default is on\n"); printf(" -l Become TCP Client, default is TCP server\n"); printf(" -g x x - server port, default is %s\n", tcp_port); printf(" -b x x - low latency busy poll timeout\n"); printf("\n Client:\n"); printf(" -H x x - server name or ip address, default is '%s'\n", server_addr); printf(" -a x x - num of clients running in parallel\n"); printf(" -r x x - num of client requests\n"); printf(" -n x Client message size, max msg size is '%d'\n", max_msg_len); printf(" -N x Server message size, max msg size is '%d'\n", max_msg_len); printf(" -T x Reply timeout, default is '%ld' (microsec)\n", wait_timeout); printf(" -d x x is a path to the file where results are saved\n"); printf("\n Server:\n"); printf(" -R x x - num of requests, after which conn. closed\n"); printf(" -q x x - server's limit on the queue of TFO requests\n"); } /* common structure for TCP server and TCP client */ struct tcp_func { void (*init)(void); void (*run)(void); void (*cleanup)(void); }; static struct tcp_func tcp; #define MAX_THREADS 10000 static pthread_attr_t attr; static pthread_t *thread_ids; static struct addrinfo *remote_addrinfo; static struct addrinfo *local_addrinfo; static const struct linger clo = { 1, 3 }; static void do_cleanup(void) { free(client_msg); free(server_msg); tcp.cleanup(); if (tfo_cfg_changed) { SAFE_FILE_SCANF(NULL, tfo_cfg, "%d", &tfo_cfg_value); tfo_cfg_value &= ~tfo_bit_num; tfo_cfg_value |= !tfo_support << (tfo_bit_num - 1); tst_resm(TINFO, "unset '%s' back to '%d'", tfo_cfg, tfo_cfg_value); SAFE_FILE_PRINTF(NULL, tfo_cfg, "%d", tfo_cfg_value); } if (tw_reuse_changed) { SAFE_FILE_PRINTF(NULL, tcp_tw_reuse, "0"); tst_resm(TINFO, "unset '%s' back to '0'", tcp_tw_reuse); } } TST_DECLARE_ONCE_FN(cleanup, do_cleanup) static int sock_recv_poll(int fd, char *buf, int buf_size, int offset) { struct pollfd pfd; pfd.fd = fd; pfd.events = POLLIN; int len = -1; while (1) { errno = 0; int ret = poll(&pfd, 1, wait_timeout / 1000); if (ret == -1) { if (errno == EINTR) continue; break; } if (ret == 0) { errno = ETIME; break; } if (ret != 1 || !(pfd.revents & POLLIN)) break; errno = 0; len = recv(fd, buf + offset, buf_size - offset, MSG_DONTWAIT); if (len == -1 && errno == EINTR) continue; else break; } return len; } static int client_recv(int *fd, char *buf) { int len, offset = 0; while (1) { errno = 0; len = sock_recv_poll(*fd, buf, server_msg_size, offset); /* socket closed or msg is not valid */ if (len < 1 || (offset + len) > server_msg_size || (buf[0] != start_byte && buf[0] != start_fin_byte)) { if (!errno) errno = ENOMSG; break; } offset += len; if (buf[offset - 1] != end_byte) continue; if (verbose) { tst_resm_hexd(TINFO, buf, offset, "msg recv from sock %d:", *fd); } /* recv last msg, close socket */ if (buf[0] == start_fin_byte) break; return 0; } shutdown(*fd, SHUT_WR); SAFE_CLOSE(cleanup, *fd); *fd = -1; return (errno) ? -1 : 0; } static int client_connect_send(const char *msg, int size) { int cfd = socket(remote_addrinfo->ai_family, SOCK_STREAM, 0); const int flag = 1; if (cfd == -1) return cfd; setsockopt(cfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); if (busy_poll >= 0) { setsockopt(cfd, SOL_SOCKET, SO_BUSY_POLL, &busy_poll, sizeof(busy_poll)); } if (fastopen_api == TFO_ENABLED) { /* Replaces connect() + send()/write() */ if (sendto(cfd, msg, size, MSG_FASTOPEN | MSG_NOSIGNAL, remote_addrinfo->ai_addr, remote_addrinfo->ai_addrlen) != size) { SAFE_CLOSE(cleanup, cfd); return -1; } } else { /* old TCP API */ if (connect(cfd, remote_addrinfo->ai_addr, remote_addrinfo->ai_addrlen)) { SAFE_CLOSE(cleanup, cfd); return -1; } if (send(cfd, msg, size, MSG_NOSIGNAL) != client_msg_size) { SAFE_CLOSE(cleanup, cfd); return -1; } } return cfd; } void *client_fn(LTP_ATTRIBUTE_UNUSED void *arg) { char buf[server_msg_size]; int cfd, i; intptr_t err = 0; /* connect & send requests */ cfd = client_connect_send(client_msg, client_msg_size); if (cfd == -1) { err = errno; goto out; } if (client_recv(&cfd, buf)) { err = errno; goto out; } for (i = 1; i < client_max_requests; ++i) { /* check connection, it can be closed */ int ret = 0; if (cfd != -1) ret = recv(cfd, buf, 1, MSG_DONTWAIT); if (ret == 0) { /* try to reconnect and send */ if (cfd != -1) SAFE_CLOSE(cleanup, cfd); cfd = client_connect_send(client_msg, client_msg_size); if (cfd == -1) { err = errno; goto out; } if (client_recv(&cfd, buf)) { err = errno; break; } continue; } else if (ret > 0) { err = EMSGSIZE; break; } if (verbose) { tst_resm_hexd(TINFO, client_msg, client_msg_size, "try to send msg[%d]", i); } if (send(cfd, client_msg, client_msg_size, MSG_NOSIGNAL) != client_msg_size) { err = ECOMM; break; } if (client_recv(&cfd, buf)) { err = errno; break; } } if (cfd != -1) SAFE_CLOSE(cleanup, cfd); out: return (void *) err; } union net_size_field { char bytes[2]; uint16_t value; }; static void make_client_request(void) { client_msg[0] = start_byte; /* set size for reply */ union net_size_field net_size; net_size.value = htons(server_msg_size); client_msg[1] = net_size.bytes[0]; client_msg[2] = net_size.bytes[1]; client_msg[client_msg_size - 1] = end_byte; } static int parse_client_request(const char *msg) { union net_size_field net_size; net_size.bytes[0] = msg[1]; net_size.bytes[1] = msg[2]; int size = ntohs(net_size.value); if (size < 2 || size > max_msg_len) return -1; return size; } static struct timespec tv_client_start; static struct timespec tv_client_end; static void client_init(void) { if (clients_num >= MAX_THREADS) { tst_brkm(TBROK, cleanup, "Unexpected num of clients '%d'", clients_num); } thread_ids = SAFE_MALLOC(NULL, sizeof(pthread_t) * clients_num); client_msg = SAFE_MALLOC(NULL, client_msg_size); memset(client_msg, client_byte, client_msg_size); make_client_request(); struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_socktype = SOCK_STREAM; int err = getaddrinfo(server_addr, tcp_port, &hints, &remote_addrinfo); if (err) { tst_brkm(TBROK, cleanup, "getaddrinfo of '%s' failed, %s", server_addr, gai_strerror(err)); } tst_resm(TINFO, "TCP Fast Open over IPv%s", (remote_addrinfo->ai_family == AF_INET6) ? "6" : "4"); clock_gettime(CLOCK_MONOTONIC_RAW, &tv_client_start); int i; for (i = 0; i < clients_num; ++i) { if (pthread_create(&thread_ids[i], 0, client_fn, NULL) != 0) { tst_brkm(TBROK | TERRNO, cleanup, "pthread_create failed at %s:%d", __FILE__, __LINE__); } } } static void client_run(void) { void *res = NULL; long clnt_time = 0; int i; for (i = 0; i < clients_num; ++i) { pthread_join(thread_ids[i], &res); if (res) { tst_brkm(TBROK, cleanup, "client[%d] failed: %s", i, strerror((intptr_t)res)); } } clock_gettime(CLOCK_MONOTONIC_RAW, &tv_client_end); clnt_time = (tv_client_end.tv_sec - tv_client_start.tv_sec) * 1000 + (tv_client_end.tv_nsec - tv_client_start.tv_nsec) / 1000000; tst_resm(TINFO, "total time '%ld' ms", clnt_time); /* ask server to terminate */ client_msg[0] = start_fin_byte; int cfd = client_connect_send(client_msg, client_msg_size); if (cfd != -1) { shutdown(cfd, SHUT_WR); SAFE_CLOSE(NULL, cfd); } /* the script tcp_fastopen_run.sh will remove it */ SAFE_FILE_PRINTF(cleanup, rpath, "%ld", clnt_time); } static void client_cleanup(void) { free(thread_ids); if (remote_addrinfo) freeaddrinfo(remote_addrinfo); } static char *make_server_reply(int size) { char *send_msg = SAFE_MALLOC(NULL, size); memset(send_msg, server_byte, size - 1); send_msg[0] = start_byte; send_msg[size - 1] = end_byte; return send_msg; } void *server_fn(void *cfd) { int client_fd = (intptr_t) cfd; int num_requests = 0, offset = 0; /* Reply will be constructed from first client request */ char *send_msg = NULL; int send_msg_size = 0; char recv_msg[max_msg_len]; setsockopt(client_fd, SOL_SOCKET, SO_LINGER, &clo, sizeof(clo)); if (busy_poll >= 0) { setsockopt(client_fd, SOL_SOCKET, SO_BUSY_POLL, &busy_poll, sizeof(busy_poll)); } ssize_t recv_len; while (1) { recv_len = sock_recv_poll(client_fd, recv_msg, max_msg_len, offset); if (recv_len == 0) break; if (recv_len < 0 || (offset + recv_len) > max_msg_len || (recv_msg[0] != start_byte && recv_msg[0] != start_fin_byte)) { tst_resm(TFAIL, "recv failed, sock '%d'", client_fd); goto out; } offset += recv_len; if (recv_msg[offset - 1] != end_byte) { /* msg is not complete, continue recv */ continue; } /* client asks to terminate */ if (recv_msg[0] == start_fin_byte) goto out; if (verbose) { tst_resm_hexd(TINFO, recv_msg, offset, "msg recv from sock %d:", client_fd); } /* if we send reply for the first time, construct it here */ if (!send_msg) { send_msg_size = parse_client_request(recv_msg); if (send_msg_size < 0) { tst_resm(TFAIL, "wrong msg size '%d'", send_msg_size); goto out; } send_msg = make_server_reply(send_msg_size); } /* * It will tell client that server is going * to close this connection. */ if (++num_requests >= server_max_requests) send_msg[0] = start_fin_byte; if (send(client_fd, send_msg, send_msg_size, MSG_NOSIGNAL) == -1) { tst_resm(TFAIL | TERRNO, "send failed"); goto out; } offset = 0; if (num_requests >= server_max_requests) { /* max reqs, close socket */ shutdown(client_fd, SHUT_WR); break; } } free(send_msg); SAFE_CLOSE(cleanup, client_fd); return NULL; out: free(send_msg); SAFE_CLOSE(cleanup, client_fd); cleanup(); tst_exit(); } static void server_thread_add(intptr_t client_fd) { pthread_t id; if (pthread_create(&id, &attr, server_fn, (void *) client_fd)) { tst_brkm(TBROK | TERRNO, cleanup, "pthread_create failed at %s:%d", __FILE__, __LINE__); } } static void server_init(void) { struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_INET6; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; if (getaddrinfo(NULL, tcp_port, &hints, &local_addrinfo) != 0) tst_brkm(TBROK | TERRNO, cleanup, "getaddrinfo failed"); /* IPv6 socket is also able to access IPv4 protocol stack */ sfd = SAFE_SOCKET(cleanup, AF_INET6, SOCK_STREAM, 0); tst_resm(TINFO, "assigning a name to the server socket..."); if (!local_addrinfo) tst_brkm(TBROK, cleanup, "failed to get the address"); SAFE_BIND(cleanup, sfd, local_addrinfo->ai_addr, local_addrinfo->ai_addrlen); freeaddrinfo(local_addrinfo); const int flag = 1; setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); if (fastopen_api == TFO_ENABLED) { if (setsockopt(sfd, IPPROTO_TCP, TCP_FASTOPEN, &tfo_queue_size, sizeof(tfo_queue_size)) == -1) tst_brkm(TBROK, cleanup, "Can't set TFO sock. options"); } SAFE_LISTEN(cleanup, sfd, max_queue_len); tst_resm(TINFO, "Listen on the socket '%d', port '%s'", sfd, tcp_port); } static void server_cleanup(void) { SAFE_CLOSE(NULL, sfd); } static void server_run(void) { /* IPv4 source address will be mapped to IPv6 address */ struct sockaddr_in6 addr6; socklen_t addr_size = sizeof(addr6); pthread_attr_init(&attr); /* * detaching threads allow to reclaim thread's resources * once a thread finishes its work. */ if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) tst_brkm(TBROK | TERRNO, cleanup, "setdetachstate failed"); while (1) { int client_fd = accept(sfd, (struct sockaddr *)&addr6, &addr_size); if (client_fd == -1) tst_brkm(TBROK, cleanup, "Can't create client socket"); if (verbose) { char addr_buf[INET6_ADDRSTRLEN]; tst_resm(TINFO, "conn: port '%d', addr '%s'", addr6.sin6_port, inet_ntop(AF_INET6, &addr6.sin6_addr, addr_buf, INET6_ADDRSTRLEN)); } server_thread_add(client_fd); } } static void check_opt(const char *name, char *arg, int *val, int lim) { if (arg) { if (sscanf(arg, "%i", val) != 1) tst_brkm(TBROK, NULL, "-%s option arg is not a number", name); if (*val < lim) tst_brkm(TBROK, NULL, "-%s option arg is less than %d", name, lim); } } static void check_opt_l(const char *name, char *arg, long *val, long lim) { if (arg) { if (sscanf(arg, "%ld", val) != 1) tst_brkm(TBROK, NULL, "-%s option arg is not a number", name); if (*val < lim) tst_brkm(TBROK, NULL, "-%s option arg is less than %ld", name, lim); } } static void setup(int argc, char *argv[]) { tst_parse_opts(argc, argv, options, help); /* if client_num is not set, use num of processors */ clients_num = sysconf(_SC_NPROCESSORS_ONLN); check_opt("a", aarg, &clients_num, 1); check_opt("r", rarg, &client_max_requests, 1); check_opt("R", Rarg, &server_max_requests, 1); check_opt("n", narg, &client_msg_size, 1); check_opt("N", Narg, &server_msg_size, 1); check_opt("q", qarg, &tfo_queue_size, 1); check_opt_l("T", Targ, &wait_timeout, 0L); check_opt("b", barg, &busy_poll, 0); if (!force_run) tst_require_root(); if (!force_run && tst_kvercmp(3, 7, 0) < 0) { tst_brkm(TCONF, NULL, "Test must be run with kernel 3.7 or newer"); } if (!force_run && busy_poll >= 0 && tst_kvercmp(3, 11, 0) < 0) { tst_brkm(TCONF, NULL, "Test must be run with kernel 3.11 or newer"); } /* check tcp fast open knob */ if (!force_run && access(tfo_cfg, F_OK) == -1) tst_brkm(TCONF, NULL, "Failed to find '%s'", tfo_cfg); if (!force_run) { SAFE_FILE_SCANF(NULL, tfo_cfg, "%d", &tfo_cfg_value); tst_resm(TINFO, "'%s' is %d", tfo_cfg, tfo_cfg_value); } tst_sig(FORK, DEF_HANDLER, cleanup); tst_resm(TINFO, "TCP %s is using %s TCP API.", (tcp_mode == TCP_SERVER) ? "server" : "client", (fastopen_api == TFO_ENABLED) ? "Fastopen" : "old"); switch (tcp_mode) { case TCP_SERVER: tst_resm(TINFO, "max requests '%d'", server_max_requests); tcp.init = server_init; tcp.run = server_run; tcp.cleanup = server_cleanup; tfo_bit_num = 2; break; case TCP_CLIENT: tst_resm(TINFO, "connection: addr '%s', port '%s'", server_addr, tcp_port); tst_resm(TINFO, "client max req: %d", client_max_requests); tst_resm(TINFO, "clients num: %d", clients_num); tst_resm(TINFO, "client msg size: %d", client_msg_size); tst_resm(TINFO, "server msg size: %d", server_msg_size); tcp.init = client_init; tcp.run = client_run; tcp.cleanup = client_cleanup; tfo_bit_num = 1; break; } tfo_support = TFO_ENABLED == tfo_support; if (((tfo_cfg_value & tfo_bit_num) == tfo_bit_num) != tfo_support) { int value = (tfo_cfg_value & ~tfo_bit_num) | (tfo_support << (tfo_bit_num - 1)); tst_resm(TINFO, "set '%s' to '%d'", tfo_cfg, value); SAFE_FILE_PRINTF(cleanup, tfo_cfg, "%d", value); tfo_cfg_changed = 1; } int reuse_value = 0; SAFE_FILE_SCANF(cleanup, tcp_tw_reuse, "%d", &reuse_value); if (!reuse_value) { SAFE_FILE_PRINTF(cleanup, tcp_tw_reuse, "1"); tw_reuse_changed = 1; tst_resm(TINFO, "set '%s' to '1'", tcp_tw_reuse); } tst_resm(TINFO, "TFO support %s", (tfo_support) ? "enabled" : "disabled"); tcp.init(); } int main(int argc, char *argv[]) { setup(argc, argv); tcp.run(); cleanup(); tst_exit(); }