// C++ source  |  359 lines  |  10.78 KB

/*
 * Copyright 2011 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include "SkSockets.h"
#include "SkData.h"

/**
 * Starts with an empty descriptor set and every state flag cleared, then
 * opens the underlying socket through createSocket().
 */
SkSocket::SkSocket() {
    // The flags are reset before createSocket() runs, because that call is
    // what sets fReady to true when the socket opens successfully.
    fWriteSuspended = false;
    fReadSuspended = false;
    fReady = false;
    fConnected = false;

    FD_ZERO(&fMasterSet);
    fMaxfd = 0;

    fSockfd = this->createSocket();
}

/**
 * Stops traffic on the socket and releases its descriptor.
 *
 * shutdown() must run while the descriptor is still open: calling it after
 * close() either fails with EBADF or, if the number has been reused in the
 * meantime, hits an unrelated descriptor.
 */
SkSocket::~SkSocket() {
    if (fSockfd < 0)
        return; // createSocket() returned -1; there is nothing to release

    shutdown(fSockfd, SHUT_RDWR); //stop sending/receiving

    if (fReady) {
        this->closeSocket(fSockfd);
    } else {
        // closeSocket() returns immediately when fReady is false, so close
        // the descriptor directly to avoid leaking it.
        close(fSockfd);
    }
}

/**
 * Opens an IPv4 stream socket with SO_REUSEADDR enabled and, when
 * NONBLOCKING_SOCKETS is defined, switches it to non-blocking mode.
 *
 * @return the new descriptor, or -1 if the socket could not be opened or
 *         configured. fReady is set to true only on success.
 */
int SkSocket::createSocket() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        SkDebugf("ERROR opening socket\n");
        return -1;
    }

    int reuse = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        // Log first: close() below may overwrite errno.
        SkDebugf("error: %s\n", strerror(errno));
        // The caller only sees -1, so the descriptor has to be released here
        // or it would leak.
        close(sockfd);
        return -1;
    }
#ifdef NONBLOCKING_SOCKETS
    this->setNonBlocking(sockfd);
#endif
    //SkDebugf("Opened fd:%d\n", sockfd);
    fReady = true;
    return sockfd;
}

/**
 * Closes sockfd and, if it was tracked in fMasterSet, removes it and lowers
 * fMaxfd to the highest descriptor still tracked (or 0 when none remain).
 * fConnected is cleared once fMaxfd reaches 0. Does nothing when fReady is
 * false.
 *
 * @param sockfd descriptor to close; negative values are tolerated and only
 *               trigger the fConnected bookkeeping.
 */
void SkSocket::closeSocket(int sockfd) {
    if (!fReady)
        return;

    if (sockfd >= 0) {
        close(sockfd);
        //SkDebugf("Closed fd:%d\n", sockfd);

        // The fd_set macros are only defined for descriptors in
        // [0, FD_SETSIZE), so check the range before testing membership.
        if (sockfd < FD_SETSIZE && FD_ISSET(sockfd, &fMasterSet)) {
            FD_CLR(sockfd, &fMasterSet);
            if (sockfd >= fMaxfd) {
                // The highest tracked descriptor was removed; walk down to
                // the next one that is still in the set.
                while (fMaxfd > 0 && !FD_ISSET(fMaxfd, &fMasterSet))
                    fMaxfd -= 1;
            }
        }
    }
    if (0 == fMaxfd)
        fConnected = false;
}

void SkSocket::onFailedConnection(int sockfd) {
    this->closeSocket(sockfd);
}

void SkSocket::setNonBlocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

void SkSocket::addToMasterSet(int sockfd) {
    FD_SET(sockfd, &fMasterSet);
    if (sockfd > fMaxfd)
        fMaxfd = sockfd;
}

/**
 * Reads one complete transfer (a sequence of fixed-size packets ending with
 * one whose header has done == true) from every descriptor in fMasterSet and
 * hands the reassembled payload to onRead.
 *
 * @param onRead  callback invoked as onRead(fd, data, size, type, context).
 *                On a read failure, peer close or malformed header it is
 *                invoked with data == NULL and size == 0, after which
 *                onFailedConnection(fd) is called.
 * @param context opaque pointer forwarded to onRead; must not be NULL.
 * @return total payload bytes delivered across all descriptors, or -1 when
 *         the socket is not connected/ready, reading is suspended, or either
 *         argument is NULL.
 */
int SkSocket::readPacket(void (*onRead)(int, const void*, size_t, DataType,
                                        void*), void* context) {
    if (!fConnected || !fReady || NULL == onRead || NULL == context
        || fReadSuspended)
        return -1;

    int totalBytesRead = 0;

    char packet[PACKET_SIZE];
    for (int i = 0; i <= fMaxfd; ++i) {
        if (!FD_ISSET (i, &fMasterSet))
            continue;

        memset(packet, 0, PACKET_SIZE);
        SkDynamicMemoryWStream stream;
        int attempts = 0; // only consumed by the commented-out debug trace
        bool failure = false;
        int bytesReadInTransfer = 0;
        int bytesReadInPacket = 0;
        header h;
        h.done = false;
        h.bytes = 0;
        // The failure path below passes h.type to onRead even when no header
        // has been received yet, so it must start with a determinate value.
        h.type = DataType();
        while (!h.done && fConnected && !failure) {
            int retval = read(i, packet + bytesReadInPacket,
                              PACKET_SIZE - bytesReadInPacket);

            ++attempts;
            if (retval < 0) {
#ifdef NONBLOCKING_SOCKETS
                if (errno == EWOULDBLOCK || errno == EAGAIN) {
                    if (bytesReadInPacket > 0 || bytesReadInTransfer > 0)
                        continue; //incomplete packet or frame, keep trying
                    else
                        break; //nothing to read
                }
#endif
                //SkDebugf("Read() failed with error: %s\n", strerror(errno));
                failure = true;
                break;
            }

            if (retval == 0) {
                //SkDebugf("Peer closed connection or connection failed\n");
                failure = true;
                break;
            }

            SkASSERT(retval > 0);
            bytesReadInPacket += retval;
            if (bytesReadInPacket < PACKET_SIZE) {
                //SkDebugf("Read %d/%d\n", bytesReadInPacket, PACKET_SIZE);
                continue; //incomplete packet, keep trying
            }

            // A full packet is buffered: decode the header fields in the
            // same order writePacket() lays them out (done, bytes, type).
            SkASSERT((bytesReadInPacket == PACKET_SIZE) && !failure);
            memcpy(&h.done, packet, sizeof(bool));
            memcpy(&h.bytes, packet + sizeof(bool), sizeof(int));
            memcpy(&h.type, packet + sizeof(bool) + sizeof(int), sizeof(DataType));
            if (h.bytes > CONTENT_SIZE || h.bytes <= 0) {
                //SkDebugf("bad packet\n");
                failure = true;
                break;
            }
            //SkDebugf("read packet(done:%d, bytes:%d) from fd:%d in %d tries\n",
            //         h.done, h.bytes, fSockfd, attempts);
            stream.write(packet + HEADER_SIZE, h.bytes);
            bytesReadInPacket = 0;
            attempts = 0;
            bytesReadInTransfer += h.bytes;
        }

        if (failure) {
            // Report the failure with an empty payload, then let the
            // subclass decide how to recover.
            onRead(i, NULL, 0, h.type, context);
            this->onFailedConnection(i);
            continue;
        }

        if (bytesReadInTransfer > 0) {
            SkData* data = stream.copyToData();
            SkASSERT(data->size() == bytesReadInTransfer);
            onRead(i, data->data(), data->size(), h.type, context);
            data->unref();

            totalBytesRead += bytesReadInTransfer;
        }
    }
    return totalBytesRead;
}

/**
 * Sends size bytes of data to every descriptor in fMasterSet, split into
 * fixed-size packets. Each packet starts with a header (done flag, payload
 * byte count, type) followed by up to CONTENT_SIZE payload bytes; the final
 * packet of a transfer has done == true.
 *
 * @param data payload to send; must not be NULL.
 * @param size payload length in bytes.
 * @param type tag copied into every packet header.
 * @return total payload bytes written across all descriptors, or -1 when
 *         data is NULL, the socket is not connected/ready, or writing is
 *         suspended. Descriptors whose write fails are passed to
 *         onFailedConnection().
 */
int SkSocket::writePacket(void* data, size_t size, DataType type) {
    // size is unsigned, so there is no negative case to reject.
    if (NULL == data || !fConnected || !fReady || fWriteSuspended)
        return -1;

    int totalBytesWritten = 0;
    header h;
    char packet[PACKET_SIZE];
    for (int i = 0; i <= fMaxfd; ++i) {
        if (!FD_ISSET (i, &fMasterSet))
            continue;

        int bytesWrittenInTransfer = 0;
        int bytesWrittenInPacket = 0;
        int attempts = 0; // only consumed by the commented-out debug trace
        bool failure = false;
        while (static_cast<size_t>(bytesWrittenInTransfer) < size
               && fConnected && !failure) {
            // The packet is rebuilt on every pass. After a partial write the
            // inputs are unchanged, so the same bytes are regenerated and
            // the write resumes at bytesWrittenInPacket.
            memset(packet, 0, PACKET_SIZE);
            const size_t remaining = size - bytesWrittenInTransfer;
            h.done = (remaining <= CONTENT_SIZE);
            h.bytes = static_cast<int>(h.done ? remaining : CONTENT_SIZE);
            h.type = type;
            memcpy(packet, &h.done, sizeof(bool));
            memcpy(packet + sizeof(bool), &h.bytes, sizeof(int));
            memcpy(packet + sizeof(bool) + sizeof(int), &h.type, sizeof(DataType));
            memcpy(packet + HEADER_SIZE,
                   static_cast<char*>(data) + bytesWrittenInTransfer,
                   h.bytes);

            int retval = write(i, packet + bytesWrittenInPacket,
                               PACKET_SIZE - bytesWrittenInPacket);
            attempts++;

            if (retval < 0) {
                if (errno == EPIPE) {
                    //SkDebugf("broken pipe, client closed connection");
                    failure = true;
                    break;
                }
#ifdef NONBLOCKING_SOCKETS
                else if (errno == EWOULDBLOCK || errno == EAGAIN) {
                    if (bytesWrittenInPacket > 0 || bytesWrittenInTransfer > 0)
                        continue; //incomplete packet or frame, keep trying
                    else
                        break; //client not available, skip current transfer
                }
#endif
                else {
                    //SkDebugf("write(%d) failed with error:%s\n", i,
                    //         strerror(errno));
                    failure = true;
                    break;
                }
            }

            // Every negative retval has left the iteration above, so retval
            // is a byte count here.
            bytesWrittenInPacket += retval;
            if (bytesWrittenInPacket < PACKET_SIZE)
                continue; //incomplete packet, keep trying

            SkASSERT(bytesWrittenInPacket == PACKET_SIZE);
            //SkDebugf("wrote to packet(done:%d, bytes:%d) to fd:%d in %d tries\n",
            //         h.done, h.bytes, i, attempts);
            bytesWrittenInTransfer += h.bytes;
            bytesWrittenInPacket = 0;
            attempts = 0;
        }

        if (failure)
            this->onFailedConnection(i);

        totalBytesWritten += bytesWrittenInTransfer;
    }
    return totalBytesWritten;
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Binds fSockfd to the given TCP port on all local IPv4 interfaces.
 * On failure the error is logged and fReady is cleared, which makes
 * acceptConnections() and disconnectAll() return -1.
 *
 * @param port TCP port number in host byte order.
 */
SkTCPServer::SkTCPServer(int port) {
    sockaddr_in serverAddr;
    // Zero the whole structure first so padding (sin_zero) does not carry
    // indeterminate stack bytes into bind().
    memset(&serverAddr, 0, sizeof(serverAddr));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serverAddr.sin_port = htons(port);

    if (bind(fSockfd, (sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
        SkDebugf("ERROR on binding: %s\n", strerror(errno));
        fReady = false;
    }
}

/**
 * Closes every tracked client connection on destruction. The status code
 * from disconnectAll() is not inspected.
 */
SkTCPServer::~SkTCPServer() {
    disconnectAll();
}

int SkTCPServer::acceptConnections() {
    if (!fReady)
        return -1;

    listen(fSockfd, MAX_WAITING_CLIENTS);
    int newfd;
    for (int i = 0; i < MAX_WAITING_CLIENTS; ++i) {
#ifdef NONBLOCKING_SOCKETS
        fd_set workingSet;
        FD_ZERO(&workingSet);
        FD_SET(fSockfd, &workingSet);
        timeval timeout;
        timeout.tv_sec  = 0;
        timeout.tv_usec = 0;
        int sel = select(fSockfd + 1, &workingSet, NULL, NULL, &timeout);
        if (sel < 0) {
            SkDebugf("select() failed with error %s\n", strerror(errno));
            continue;
        }
        if (sel == 0) //select() timed out
            continue;
#endif
        sockaddr_in clientAddr;
        socklen_t clientLen = sizeof(clientAddr);
        newfd = accept(fSockfd, (struct sockaddr*)&clientAddr, &clientLen);
        if (newfd< 0) {
            SkDebugf("accept() failed with error %s\n", strerror(errno));
            continue;
        }
        SkDebugf("New incoming connection - %d\n", newfd);
        fConnected = true;
#ifdef NONBLOCKING_SOCKETS
        this->setNonBlocking(newfd);
#endif
        this->addToMasterSet(newfd);
    }
    return 0;
}


/**
 * Closes every descriptor currently tracked in fMasterSet and marks the
 * server as disconnected.
 *
 * @return 0 on completion, or -1 if the server was not both connected and
 *         ready.
 */
int SkTCPServer::disconnectAll() {
    const bool active = fConnected && fReady;
    if (!active)
        return -1;

    // fMaxfd is re-read on every pass because closeSocket() may lower it.
    int fd = 0;
    while (fd <= fMaxfd) {
        if (FD_ISSET(fd, &fMasterSet)) {
            this->closeSocket(fd);
        }
        ++fd;
    }

    fConnected = false;
    return 0;
}

////////////////////////////////////////////////////////////////////////////////
/**
 * Resolves hostname to an IPv4 address and stores it, together with port, in
 * fServerAddr for a later connectToServer() call. No connection is attempted
 * here.
 *
 * If the name is NULL, cannot be resolved, or does not resolve to an IPv4
 * address, fReady is cleared so that connectToServer() returns -1.
 *
 * @param hostname host name or dotted-quad address of the server.
 * @param port     TCP port number in host byte order.
 */
SkTCPClient::SkTCPClient(const char* hostname, int port) {
    // Start from a fully zeroed address so no field is left indeterminate,
    // even when resolution fails.
    memset(&fServerAddr, 0, sizeof(fServerAddr));

    //Add fSockfd since the client will be using it to read/write
    if (fSockfd >= 0)
        this->addToMasterSet(fSockfd);

    hostent* server = (NULL != hostname) ? gethostbyname(hostname) : NULL;
    // Only copy the address when it is IPv4 and fits the destination field;
    // h_length comes from the resolver and must not be trusted blindly.
    const bool usable = server
            && AF_INET == server->h_addrtype
            && NULL != server->h_addr_list
            && NULL != server->h_addr
            && server->h_length > 0
            && static_cast<size_t>(server->h_length)
                   <= sizeof(fServerAddr.sin_addr.s_addr);
    if (usable) {
        fServerAddr.sin_family = AF_INET;
        memcpy((char*)&fServerAddr.sin_addr.s_addr, (char*)server->h_addr,
               server->h_length);
        fServerAddr.sin_port = htons(port);
    }
    else {
        //SkDebugf("ERROR, no such host\n");
        fReady = false;
    }
}

/**
 * Client-side recovery from a failed connection: closes the current socket
 * and opens a fresh one so that a later connectToServer() can retry.
 *
 * @param sockfd the failed descriptor; expected to be fSockfd.
 */
void SkTCPClient::onFailedConnection(int sockfd) { //cleanup and recreate socket
    SkASSERT(sockfd == fSockfd);
    this->closeSocket(fSockfd);
    fSockfd = this->createSocket();
    // createSocket() returns -1 on failure, and -1 must never reach FD_SET.
    if (fSockfd >= 0) {
        //Add fSockfd since the client will be using it to read/write
        this->addToMasterSet(fSockfd);
    }
}

/**
 * Attempts to connect fSockfd to the address stored in fServerAddr.
 *
 * @return 0 if the client is (or already was) connected; -1 if the client is
 *         not ready; otherwise the negative result of connect(). With
 *         NONBLOCKING_SOCKETS defined, a connect still in progress
 *         (EINPROGRESS/EALREADY) also yields that negative result without
 *         resetting the socket.
 */
int SkTCPClient::connectToServer() {
    if (!fReady)
        return -1;
    if (fConnected)
        return 0;

    const int status = connect(fSockfd, (sockaddr*)&fServerAddr,
                               sizeof(fServerAddr));

    // Success, or the socket turned out to be connected already.
    if (status >= 0 || EISCONN == errno) {
        fConnected = true;
        SkDebugf("Succesfully reached server\n");
        return 0;
    }

#ifdef NONBLOCKING_SOCKETS
    // Handshake still pending: report it and let the caller try again.
    if (EINPROGRESS == errno || EALREADY == errno)
        return status;
#endif

    // Genuine failure: let the failure hook clean up before reporting it.
    //SkDebugf("error: %s\n", strerror(errno));
    this->onFailedConnection(fSockfd);
    return status;
}