/* * Copyright (C) 2007 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "mq" #include <assert.h> #include <errno.h> #include <fcntl.h> #include <pthread.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/socket.h> #include <sys/types.h> #include <sys/un.h> #include <sys/uio.h> #include <cutils/array.h> #include <cutils/hashmap.h> #include <cutils/selector.h> #include "loghack.h" #include "buffer.h" /** Number of dead peers to remember. */ #define PEER_HISTORY (16) typedef struct sockaddr SocketAddress; typedef struct sockaddr_un UnixAddress; /** * Process/user/group ID. We don't use ucred directly because it's only * available on Linux. */ typedef struct { pid_t pid; uid_t uid; gid_t gid; } Credentials; /** Listens for bytes coming from remote peers. */ typedef void BytesListener(Credentials credentials, char* bytes, size_t size); /** Listens for the deaths of remote peers. */ typedef void DeathListener(pid_t pid); /** Types of packets. */ typedef enum { /** Request for a connection to another peer. */ CONNECTION_REQUEST, /** A connection to another peer. */ CONNECTION, /** Reports a failed connection attempt. */ CONNECTION_ERROR, /** A generic packet of bytes. */ BYTES, } PacketType; typedef enum { /** Reading a packet header. */ READING_HEADER, /** Waiting for a connection from the master. */ ACCEPTING_CONNECTION, /** Reading bytes. */ READING_BYTES, } InputState; /** A packet header. */ // TODO: Use custom headers for master->peer, peer->master, peer->peer. typedef struct { PacketType type; union { /** Packet size. Used for BYTES. */ size_t size; /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */ Credentials credentials; }; } Header; /** A packet which will be sent to a peer. */ typedef struct OutgoingPacket OutgoingPacket; struct OutgoingPacket { /** Packet header. */ Header header; union { /** Connection to peer. Used with CONNECTION. */ int socket; /** Buffer of bytes. Used with BYTES. */ Buffer* bytes; }; /** Frees all resources associated with this packet. */ void (*free)(OutgoingPacket* packet); /** Optional context. */ void* context; /** Next packet in the queue. */ OutgoingPacket* nextPacket; }; /** Represents a remote peer. */ typedef struct PeerProxy PeerProxy; /** Local peer state. You typically have one peer per process. */ typedef struct { /** This peer's PID. */ pid_t pid; /** * Map from pid to peer proxy. The peer has a peer proxy for each remote * peer it's connected to. * * Acquire mutex before use. */ Hashmap* peerProxies; /** Manages I/O. */ Selector* selector; /** Used to synchronize operations with the selector thread. */ pthread_mutex_t mutex; /** Is this peer the master? */ bool master; /** Peer proxy for the master. */ PeerProxy* masterProxy; /** Listens for packets from remote peers. */ BytesListener* onBytes; /** Listens for deaths of remote peers. */ DeathListener* onDeath; /** Keeps track of recently dead peers. Requires mutex. */ pid_t deadPeers[PEER_HISTORY]; size_t deadPeerCursor; } Peer; struct PeerProxy { /** Credentials of the remote process. */ Credentials credentials; /** Keeps track of data coming in from the remote peer. */ InputState inputState; Buffer* inputBuffer; PeerProxy* connecting; /** File descriptor for this peer. */ SelectableFd* fd; /** * Queue of packets to be written out to the remote peer. * * Requires mutex. */ // TODO: Limit queue length. OutgoingPacket* currentPacket; OutgoingPacket* lastPacket; /** Used to write outgoing header. */ Buffer outgoingHeader; /** True if this is the master's proxy. */ bool master; /** Reference back to the local peer. */ Peer* peer; /** * Used in master only. Maps this peer proxy to other peer proxies to * which the peer has been connected to. Maps pid to PeerProxy. Helps * keep track of which connections we've sent to whom. */ Hashmap* connections; }; /** Server socket path. */ static const char* MASTER_PATH = "/master.peer"; /** Credentials of the master peer. */ static const Credentials MASTER_CREDENTIALS = {0, 0, 0}; /** Creates a peer proxy and adds it to the peer proxy map. */ static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials); /** Sets the non-blocking flag on a descriptor. */ static void setNonBlocking(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); } if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); } } /** Closes a fd and logs a warning if the close fails. */ static void closeWithWarning(int fd) { int result = close(fd); if (result == -1) { LOGW("close() error: %s", strerror(errno)); } } /** Hashes pid_t keys. */ static int pidHash(void* key) { pid_t* pid = (pid_t*) key; return (int) (*pid); } /** Compares pid_t keys. */ static bool pidEquals(void* keyA, void* keyB) { pid_t* a = (pid_t*) keyA; pid_t* b = (pid_t*) keyB; return *a == *b; } /** Gets the master address. Not thread safe. */ static UnixAddress* getMasterAddress() { static UnixAddress masterAddress; static bool initialized = false; if (initialized == false) { masterAddress.sun_family = AF_LOCAL; strcpy(masterAddress.sun_path, MASTER_PATH); initialized = true; } return &masterAddress; } /** Gets exclusive access to the peer for this thread. */ static void peerLock(Peer* peer) { pthread_mutex_lock(&peer->mutex); } /** Releases exclusive access to the peer. */ static void peerUnlock(Peer* peer) { pthread_mutex_unlock(&peer->mutex); } /** Frees a simple, i.e. header-only, outgoing packet. */ static void outgoingPacketFree(OutgoingPacket* packet) { LOGD("Freeing outgoing packet."); free(packet); } /** * Prepare to read a new packet from the peer. */ static void peerProxyExpectHeader(PeerProxy* peerProxy) { peerProxy->inputState = READING_HEADER; bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header)); } /** Sets up the buffer for the outgoing header. */ static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) { peerProxy->outgoingHeader.data = (char*) &(peerProxy->currentPacket->header); peerProxy->outgoingHeader.size = sizeof(Header); bufferPrepareForWrite(&peerProxy->outgoingHeader); } /** Adds a packet to the end of the queue. Callers must have the mutex. */ static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy, OutgoingPacket* newPacket) { newPacket->nextPacket = NULL; // Just in case. if (peerProxy->currentPacket == NULL) { // The queue is empty. peerProxy->currentPacket = newPacket; peerProxy->lastPacket = newPacket; peerProxyPrepareOutgoingHeader(peerProxy); } else { peerProxy->lastPacket->nextPacket = newPacket; } } /** Takes the peer lock and enqueues the given packet. */ static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy, OutgoingPacket* newPacket) { Peer* peer = peerProxy->peer; peerLock(peer); peerProxyEnqueueOutgoingPacket(peerProxy, newPacket); peerUnlock(peer); } /** * Frees current packet and moves to the next one. Returns true if there is * a next packet or false if the queue is empty. */ static bool peerProxyNextPacket(PeerProxy* peerProxy) { Peer* peer = peerProxy->peer; peerLock(peer); OutgoingPacket* current = peerProxy->currentPacket; if (current == NULL) { // The queue is already empty. peerUnlock(peer); return false; } OutgoingPacket* next = current->nextPacket; peerProxy->currentPacket = next; current->nextPacket = NULL; current->free(current); if (next == NULL) { // The queue is empty. peerProxy->lastPacket = NULL; peerUnlock(peer); return false; } else { peerUnlock(peer); peerProxyPrepareOutgoingHeader(peerProxy); // TODO: Start writing next packet? It would reduce the number of // system calls, but we could also starve other peers. return true; } } /** * Checks whether a peer died recently. */ static bool peerIsDead(Peer* peer, pid_t pid) { size_t i; for (i = 0; i < PEER_HISTORY; i++) { pid_t deadPeer = peer->deadPeers[i]; if (deadPeer == 0) { return false; } if (deadPeer == pid) { return true; } } return false; } /** * Cleans up connection information. */ static bool peerProxyRemoveConnection(void* key, void* value, void* context) { PeerProxy* deadPeer = (PeerProxy*) context; PeerProxy* otherPeer = (PeerProxy*) value; hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid)); return true; } /** * Called when the peer dies. */ static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) { if (errnoIsSet) { LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid, strerror(errno)); } else { LOGI("Peer %d died.", peerProxy->credentials.pid); } // If we lost the master, we're up a creek. We can't let this happen. if (peerProxy->master) { LOG_ALWAYS_FATAL("Lost connection to master."); } Peer* localPeer = peerProxy->peer; pid_t pid = peerProxy->credentials.pid; peerLock(localPeer); // Remember for awhile that the peer died. localPeer->deadPeers[localPeer->deadPeerCursor] = peerProxy->credentials.pid; localPeer->deadPeerCursor++; if (localPeer->deadPeerCursor == PEER_HISTORY) { localPeer->deadPeerCursor = 0; } // Remove from peer map. hashmapRemove(localPeer->peerProxies, &pid); // External threads can no longer get to this peer proxy, so we don't // need the lock anymore. peerUnlock(localPeer); // Remove the fd from the selector. if (peerProxy->fd != NULL) { peerProxy->fd->remove = true; } // Clear outgoing packet queue. while (peerProxyNextPacket(peerProxy)) {} bufferFree(peerProxy->inputBuffer); // This only applies to the master. if (peerProxy->connections != NULL) { // We can't leave these other maps pointing to freed memory. hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection, peerProxy); hashmapFree(peerProxy->connections); } // Invoke death listener. localPeer->onDeath(pid); // Free the peer proxy itself. free(peerProxy); } static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) { if (errno == EINTR) { // Log interruptions but otherwise ignore them. LOGW("%s() interrupted.", functionName); } else if (errno == EAGAIN) { LOGD("EWOULDBLOCK"); // Ignore. } else { LOGW("Error returned by %s().", functionName); peerProxyKill(peerProxy, true); } } /** * Buffers output sent to a peer. May be called multiple times until the entire * buffer is filled. Returns true when the buffer is empty. */ static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) { ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd); if (size < 0) { peerProxyHandleError(peerProxy, "write"); return false; } else { return bufferWriteComplete(outgoing); } } /** Writes packet bytes to peer. */ static void peerProxyWriteBytes(PeerProxy* peerProxy) { Buffer* buffer = peerProxy->currentPacket->bytes; if (peerProxyWriteFromBuffer(peerProxy, buffer)) { LOGD("Bytes written."); peerProxyNextPacket(peerProxy); } } /** Sends a socket to the peer. */ static void peerProxyWriteConnection(PeerProxy* peerProxy) { int socket = peerProxy->currentPacket->socket; // Why does sending and receiving fds have to be such a PITA? struct msghdr msg; struct iovec iov[1]; union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr *cmptr; msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); cmptr = CMSG_FIRSTHDR(&msg); cmptr->cmsg_len = CMSG_LEN(sizeof(int)); cmptr->cmsg_level = SOL_SOCKET; cmptr->cmsg_type = SCM_RIGHTS; // Store the socket in the message. *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket; msg.msg_name = NULL; msg.msg_namelen = 0; iov[0].iov_base = ""; iov[0].iov_len = 1; msg.msg_iov = iov; msg.msg_iovlen = 1; ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0); if (result < 0) { peerProxyHandleError(peerProxy, "sendmsg"); } else { // Success. Queue up the next packet. peerProxyNextPacket(peerProxy); } } /** * Writes some outgoing data. */ static void peerProxyWrite(SelectableFd* fd) { // TODO: Try to write header and body with one system call. PeerProxy* peerProxy = (PeerProxy*) fd->data; OutgoingPacket* current = peerProxy->currentPacket; if (current == NULL) { // We have nothing left to write. return; } // Write the header. Buffer* outgoingHeader = &peerProxy->outgoingHeader; bool headerWritten = bufferWriteComplete(outgoingHeader); if (!headerWritten) { LOGD("Writing header..."); headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader); if (headerWritten) { LOGD("Header written."); } } // Write body. if (headerWritten) { PacketType type = current->header.type; switch (type) { case CONNECTION: peerProxyWriteConnection(peerProxy); break; case BYTES: peerProxyWriteBytes(peerProxy); break; case CONNECTION_REQUEST: case CONNECTION_ERROR: // These packets consist solely of a header. peerProxyNextPacket(peerProxy); break; default: LOG_ALWAYS_FATAL("Unknown packet type: %d", type); } } } /** * Sets up a peer proxy's fd before we try to select() it. */ static void peerProxyBeforeSelect(SelectableFd* fd) { LOGD("Before select..."); PeerProxy* peerProxy = (PeerProxy*) fd->data; peerLock(peerProxy->peer); bool hasPackets = peerProxy->currentPacket != NULL; peerUnlock(peerProxy->peer); if (hasPackets) { LOGD("Packets found. Setting onWritable()."); fd->onWritable = &peerProxyWrite; } else { // We have nothing to write. fd->onWritable = NULL; } } /** Prepare to read bytes from the peer. */ static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) { LOGD("Expecting %d bytes.", header->size); peerProxy->inputState = READING_BYTES; if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) { LOGW("Couldn't allocate memory for incoming data. Size: %u", (unsigned int) header->size); // TODO: Ignore the packet and log a warning? peerProxyKill(peerProxy, false); } } /** * Gets a peer proxy for the given ID. Creates a peer proxy if necessary. * Sends a connection request to the master if desired. * * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died * or ENOMEM if memory couldn't be allocated. */ static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid, bool requestConnection) { if (pid == peer->pid) { errno = EINVAL; return NULL; } if (peerIsDead(peer, pid)) { errno = EHOSTDOWN; return NULL; } PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid); if (peerProxy != NULL) { return peerProxy; } // If this is the master peer, we already know about all peers. if (peer->master) { errno = EHOSTDOWN; return NULL; } // Try to create a peer proxy. Credentials credentials; credentials.pid = pid; // Fake gid and uid until we have the real thing. The real creds are // filled in by masterProxyExpectConnection(). These fake creds will // never be exposed to the user. credentials.uid = 0; credentials.gid = 0; // Make sure we can allocate the connection request packet. OutgoingPacket* packet = NULL; if (requestConnection) { packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { errno = ENOMEM; return NULL; } packet->header.type = CONNECTION_REQUEST; packet->header.credentials = credentials; packet->free = &outgoingPacketFree; } peerProxy = peerProxyCreate(peer, credentials); if (peerProxy == NULL) { free(packet); errno = ENOMEM; return NULL; } else { // Send a connection request to the master. if (requestConnection) { PeerProxy* masterProxy = peer->masterProxy; peerProxyEnqueueOutgoingPacket(masterProxy, packet); } return peerProxy; } } /** * Switches the master peer proxy into a state where it's waiting for a * connection from the master. */ static void masterProxyExpectConnection(PeerProxy* masterProxy, Header* header) { // TODO: Restructure things so we don't need this check. // Verify that this really is the master. if (!masterProxy->master) { LOGW("Non-master process %d tried to send us a connection.", masterProxy->credentials.pid); // Kill off the evil peer. peerProxyKill(masterProxy, false); return; } masterProxy->inputState = ACCEPTING_CONNECTION; Peer* localPeer = masterProxy->peer; // Create a peer proxy so we have somewhere to stash the creds. // See if we already have a proxy set up. pid_t pid = header->credentials.pid; peerLock(localPeer); PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false); if (peerProxy == NULL) { LOGW("Peer proxy creation failed: %s", strerror(errno)); } else { // Fill in full credentials. peerProxy->credentials = header->credentials; } peerUnlock(localPeer); // Keep track of which peer proxy we're accepting a connection for. masterProxy->connecting = peerProxy; } /** * Reads input from a peer process. */ static void peerProxyRead(SelectableFd* fd); /** Sets up fd callbacks. */ static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) { peerProxy->fd = fd; fd->data = peerProxy; fd->onReadable = &peerProxyRead; fd->beforeSelect = &peerProxyBeforeSelect; // Make the socket non-blocking. setNonBlocking(fd->fd); } /** * Accepts a connection sent by the master proxy. */ static void masterProxyAcceptConnection(PeerProxy* masterProxy) { struct msghdr msg; struct iovec iov[1]; ssize_t size; char ignored; int incomingFd; // TODO: Reuse code which writes the connection. Who the heck designed // this API anyway? union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr *cmptr; msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); msg.msg_name = NULL; msg.msg_namelen = 0; // We sent 1 byte of data so we can detect EOF. iov[0].iov_base = &ignored; iov[0].iov_len = 1; msg.msg_iov = iov; msg.msg_iovlen = 1; size = recvmsg(masterProxy->fd->fd, &msg, 0); if (size < 0) { if (errno == EINTR) { // Log interruptions but otherwise ignore them. LOGW("recvmsg() interrupted."); return; } else if (errno == EAGAIN) { // Keep waiting for the connection. return; } else { LOG_ALWAYS_FATAL("Error reading connection from master: %s", strerror(errno)); } } else if (size == 0) { // EOF. LOG_ALWAYS_FATAL("Received EOF from master."); } // Extract fd from message. if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) { if (cmptr->cmsg_level != SOL_SOCKET) { LOG_ALWAYS_FATAL("Expected SOL_SOCKET."); } if (cmptr->cmsg_type != SCM_RIGHTS) { LOG_ALWAYS_FATAL("Expected SCM_RIGHTS."); } incomingFd = *((int*) CMSG_DATA(cmptr)); } else { LOG_ALWAYS_FATAL("Expected fd."); } // The peer proxy this connection is for. PeerProxy* peerProxy = masterProxy->connecting; if (peerProxy == NULL) { LOGW("Received connection for unknown peer."); closeWithWarning(incomingFd); } else { Peer* peer = masterProxy->peer; SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd); if (selectableFd == NULL) { LOGW("Error adding fd to selector for %d.", peerProxy->credentials.pid); closeWithWarning(incomingFd); peerProxyKill(peerProxy, false); } peerProxySetFd(peerProxy, selectableFd); } peerProxyExpectHeader(masterProxy); } /** * Frees an outgoing packet containing a connection. */ static void outgoingPacketFreeSocket(OutgoingPacket* packet) { closeWithWarning(packet->socket); outgoingPacketFree(packet); } /** * Connects two known peers. */ static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) { int sockets[2]; int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets); if (result == -1) { LOGW("socketpair() error: %s", strerror(errno)); // TODO: Send CONNECTION_FAILED packets to peers. return; } OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket)); OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket)); if (packetA == NULL || packetB == NULL) { free(packetA); free(packetB); LOGW("malloc() error. Failed to tell process %d that process %d is" " dead.", peerA->credentials.pid, peerB->credentials.pid); return; } packetA->header.type = CONNECTION; packetB->header.type = CONNECTION; packetA->header.credentials = peerB->credentials; packetB->header.credentials = peerA->credentials; packetA->socket = sockets[0]; packetB->socket = sockets[1]; packetA->free = &outgoingPacketFreeSocket; packetB->free = &outgoingPacketFreeSocket; peerLock(peerA->peer); peerProxyEnqueueOutgoingPacket(peerA, packetA); peerProxyEnqueueOutgoingPacket(peerB, packetB); peerUnlock(peerA->peer); } /** * Informs a peer that the peer they're trying to connect to couldn't be * found. */ static void masterReportConnectionError(PeerProxy* peerProxy, Credentials credentials) { OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { LOGW("malloc() error. Failed to tell process %d that process %d is" " dead.", peerProxy->credentials.pid, credentials.pid); return; } packet->header.type = CONNECTION_ERROR; packet->header.credentials = credentials; packet->free = &outgoingPacketFree; peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet); } /** * Handles a request to be connected to another peer. */ static void masterHandleConnectionRequest(PeerProxy* peerProxy, Header* header) { Peer* master = peerProxy->peer; pid_t targetPid = header->credentials.pid; if (!hashmapContainsKey(peerProxy->connections, &targetPid)) { // We haven't connected these peers yet. PeerProxy* targetPeer = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid); if (targetPeer == NULL) { // Unknown process. masterReportConnectionError(peerProxy, header->credentials); } else { masterConnectPeers(peerProxy, targetPeer); } } // This packet is complete. Get ready for the next one. peerProxyExpectHeader(peerProxy); } /** * The master told us this peer is dead. */ static void masterProxyHandleConnectionError(PeerProxy* masterProxy, Header* header) { Peer* peer = masterProxy->peer; // Look up the peer proxy. pid_t pid = header->credentials.pid; PeerProxy* peerProxy = NULL; peerLock(peer); peerProxy = hashmapGet(peer->peerProxies, &pid); peerUnlock(peer); if (peerProxy != NULL) { LOGI("Couldn't connect to %d.", pid); peerProxyKill(peerProxy, false); } else { LOGW("Peer proxy for %d not found. This shouldn't happen.", pid); } peerProxyExpectHeader(masterProxy); } /** * Handles a packet header. */ static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) { switch (header->type) { case CONNECTION_REQUEST: masterHandleConnectionRequest(peerProxy, header); break; case CONNECTION: masterProxyExpectConnection(peerProxy, header); break; case CONNECTION_ERROR: masterProxyHandleConnectionError(peerProxy, header); break; case BYTES: peerProxyExpectBytes(peerProxy, header); break; default: LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid, header->type); peerProxyKill(peerProxy, false); } } /** * Buffers input sent by peer. May be called multiple times until the entire * buffer is filled. Returns true when the buffer is full. */ static bool peerProxyBufferInput(PeerProxy* peerProxy) { Buffer* in = peerProxy->inputBuffer; ssize_t size = bufferRead(in, peerProxy->fd->fd); if (size < 0) { peerProxyHandleError(peerProxy, "read"); return false; } else if (size == 0) { // EOF. LOGI("EOF"); peerProxyKill(peerProxy, false); return false; } else if (bufferReadComplete(in)) { // We're done! return true; } else { // Continue reading. return false; } } /** * Reads input from a peer process. */ static void peerProxyRead(SelectableFd* fd) { LOGD("Reading..."); PeerProxy* peerProxy = (PeerProxy*) fd->data; int state = peerProxy->inputState; Buffer* in = peerProxy->inputBuffer; switch (state) { case READING_HEADER: if (peerProxyBufferInput(peerProxy)) { LOGD("Header read."); // We've read the complete header. Header* header = (Header*) in->data; peerProxyHandleHeader(peerProxy, header); } break; case READING_BYTES: LOGD("Reading bytes..."); if (peerProxyBufferInput(peerProxy)) { LOGD("Bytes read."); // We have the complete packet. Notify bytes listener. peerProxy->peer->onBytes(peerProxy->credentials, in->data, in->size); // Get ready for the next packet. peerProxyExpectHeader(peerProxy); } break; case ACCEPTING_CONNECTION: masterProxyAcceptConnection(peerProxy); break; default: LOG_ALWAYS_FATAL("Unknown state: %d", state); } } static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) { PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy)); if (peerProxy == NULL) { return NULL; } peerProxy->inputBuffer = bufferCreate(sizeof(Header)); if (peerProxy->inputBuffer == NULL) { free(peerProxy); return NULL; } peerProxy->peer = peer; peerProxy->credentials = credentials; // Initial state == expecting a header. peerProxyExpectHeader(peerProxy); // Add this proxy to the map. Make sure the key points to the stable memory // inside of the peer proxy itself. pid_t* pid = &(peerProxy->credentials.pid); hashmapPut(peer->peerProxies, pid, peerProxy); return peerProxy; } /** Accepts a connection to the master peer. */ static void masterAcceptConnection(SelectableFd* listenerFd) { // Accept connection. int socket = accept(listenerFd->fd, NULL, NULL); if (socket == -1) { LOGW("accept() error: %s", strerror(errno)); return; } LOGD("Accepted connection as fd %d.", socket); // Get credentials. Credentials credentials; struct ucred ucredentials; socklen_t credentialsSize = sizeof(struct ucred); int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED, &ucredentials, &credentialsSize); // We might want to verify credentialsSize. if (result == -1) { LOGW("getsockopt() error: %s", strerror(errno)); closeWithWarning(socket); return; } // Copy values into our own structure so we know we have the types right. credentials.pid = ucredentials.pid; credentials.uid = ucredentials.uid; credentials.gid = ucredentials.gid; LOGI("Accepted connection from process %d.", credentials.pid); Peer* masterPeer = (Peer*) listenerFd->data; peerLock(masterPeer); // Make sure we don't already have a connection from that process. PeerProxy* peerProxy = hashmapGet(masterPeer->peerProxies, &credentials.pid); if (peerProxy != NULL) { peerUnlock(masterPeer); LOGW("Alread connected to process %d.", credentials.pid); closeWithWarning(socket); return; } // Add connection to the selector. SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket); if (socketFd == NULL) { peerUnlock(masterPeer); LOGW("malloc() failed."); closeWithWarning(socket); return; } // Create a peer proxy. peerProxy = peerProxyCreate(masterPeer, credentials); peerUnlock(masterPeer); if (peerProxy == NULL) { LOGW("malloc() failed."); socketFd->remove = true; closeWithWarning(socket); } peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals); peerProxySetFd(peerProxy, socketFd); } /** * Creates the local peer. */ static Peer* peerCreate() { Peer* peer = calloc(1, sizeof(Peer)); if (peer == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals); peer->selector = selectorCreate(); pthread_mutexattr_t attributes; if (pthread_mutexattr_init(&attributes) != 0) { LOG_ALWAYS_FATAL("pthread_mutexattr_init() error."); } if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) { LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error."); } if (pthread_mutex_init(&peer->mutex, &attributes) != 0) { LOG_ALWAYS_FATAL("pthread_mutex_init() error."); } peer->pid = getpid(); return peer; } /** The local peer. */ static Peer* localPeer; /** Frees a packet of bytes. */ static void outgoingPacketFreeBytes(OutgoingPacket* packet) { LOGD("Freeing outgoing packet."); bufferFree(packet->bytes); free(packet); } /** * Sends a packet of bytes to a remote peer. Returns 0 on success. * * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno * to EINVAL if pid is the same as the local pid. */ int peerSendBytes(pid_t pid, const char* bytes, size_t size) { Peer* peer = localPeer; assert(peer != NULL); OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { errno = ENOMEM; return -1; } Buffer* copy = bufferCreate(size); if (copy == NULL) { free(packet); errno = ENOMEM; return -1; } // Copy data. memcpy(copy->data, bytes, size); copy->size = size; packet->bytes = copy; packet->header.type = BYTES; packet->header.size = size; packet->free = outgoingPacketFreeBytes; bufferPrepareForWrite(packet->bytes); peerLock(peer); PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); if (peerProxy == NULL) { // The peer is already dead or we couldn't alloc memory. Either way, // errno is set. peerUnlock(peer); packet->free(packet); return -1; } else { peerProxyEnqueueOutgoingPacket(peerProxy, packet); peerUnlock(peer); selectorWakeUp(peer->selector); return 0; } } /** Keeps track of how to free shared bytes. */ typedef struct { void (*free)(void* context); void* context; } SharedBytesFreer; /** Frees shared bytes. */ static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) { SharedBytesFreer* sharedBytesFreer = (SharedBytesFreer*) packet->context; sharedBytesFreer->free(sharedBytesFreer->context); free(sharedBytesFreer); free(packet); } /** * Sends a packet of bytes to a remote peer without copying the bytes. Calls * free() with context after the bytes have been sent. * * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno * to EINVAL if pid is the same as the local pid. */ int peerSendSharedBytes(pid_t pid, char* bytes, size_t size, void (*free)(void* context), void* context) { Peer* peer = localPeer; assert(peer != NULL); OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { errno = ENOMEM; return -1; } Buffer* wrapper = bufferWrap(bytes, size, size); if (wrapper == NULL) { free(packet); errno = ENOMEM; return -1; } SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer)); if (sharedBytesFreer == NULL) { free(packet); free(wrapper); errno = ENOMEM; return -1; } sharedBytesFreer->free = free; sharedBytesFreer->context = context; packet->bytes = wrapper; packet->context = sharedBytesFreer; packet->header.type = BYTES; packet->header.size = size; packet->free = &outgoingPacketFreeSharedBytes; bufferPrepareForWrite(packet->bytes); peerLock(peer); PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); if (peerProxy == NULL) { // The peer is already dead or we couldn't alloc memory. Either way, // errno is set. peerUnlock(peer); packet->free(packet); return -1; } else { peerProxyEnqueueOutgoingPacket(peerProxy, packet); peerUnlock(peer); selectorWakeUp(peer->selector); return 0; } } /** * Starts the master peer. The master peer differs from other peers in that * it is responsible for connecting the other peers. You can only have one * master peer. * * Goes into an I/O loop and does not return. */ void masterPeerInitialize(BytesListener* bytesListener, DeathListener* deathListener) { // Create and bind socket. int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0); if (listenerSocket == -1) { LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); } unlink(MASTER_PATH); int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(), sizeof(UnixAddress)); if (result == -1) { LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno)); } LOGD("Listener socket: %d", listenerSocket); // Queue up to 16 connections. result = listen(listenerSocket, 16); if (result != 0) { LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno)); } // Make socket non-blocking. setNonBlocking(listenerSocket); // Create the peer for this process. Fail if we already have one. if (localPeer != NULL) { LOG_ALWAYS_FATAL("Peer is already initialized."); } localPeer = peerCreate(); if (localPeer == NULL) { LOG_ALWAYS_FATAL("malloc() failed."); } localPeer->master = true; localPeer->onBytes = bytesListener; localPeer->onDeath = deathListener; // Make listener socket selectable. SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket); if (listenerFd == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } listenerFd->data = localPeer; listenerFd->onReadable = &masterAcceptConnection; } /** * Starts a local peer. * * Goes into an I/O loop and does not return. */ void peerInitialize(BytesListener* bytesListener, DeathListener* deathListener) { // Connect to master peer. int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0); if (masterSocket == -1) { LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); } int result = connect(masterSocket, (SocketAddress*) getMasterAddress(), sizeof(UnixAddress)); if (result != 0) { LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno)); } // Create the peer for this process. Fail if we already have one. if (localPeer != NULL) { LOG_ALWAYS_FATAL("Peer is already initialized."); } localPeer = peerCreate(); if (localPeer == NULL) { LOG_ALWAYS_FATAL("malloc() failed."); } localPeer->onBytes = bytesListener; localPeer->onDeath = deathListener; // Make connection selectable. SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket); if (masterFd == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } // Create a peer proxy for the master peer. PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS); if (masterProxy == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } peerProxySetFd(masterProxy, masterFd); masterProxy->master = true; localPeer->masterProxy = masterProxy; } /** Starts the master peer I/O loop. Doesn't return. */ void peerLoop() { assert(localPeer != NULL); // Start selector. selectorLoop(localPeer->selector); }