// C++ source  |  604 lines  |  17.85 KB

/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "LibAAH_RTP"
#include <media/stagefright/foundation/ADebug.h>

#include <netinet/in.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

#include <media/stagefright/foundation/AMessage.h>
#include <utils/misc.h>

#include "aah_tx_player.h"
#include "aah_tx_sender.h"

namespace android {

// Keys under which the fields of a kWhatSendPacket message are stored; they
// are read back in onSendPacket().
const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr";
const char* AAH_TXSender::kSendPacketPort = "port";
const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp";

// Delays passed to AMessage::post() when (re)scheduling the periodic
// retry-buffer trim and heartbeat messages.
const int AAH_TXSender::kRetryTrimIntervalUs = 100000;
const int AAH_TXSender::kHeartbeatIntervalUs = 1000000;
// Capacity handed to each endpoint's retry buffer (see EndpointState).
const int AAH_TXSender::kRetryBufferCapacity = 100;
// Heartbeats are only sent while less than this much time has elapsed since
// the last packet sent through onSendPacket() (see shouldSendHeartbeats_l()).
const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull;

// sLock guards the singleton reference and the epoch state below.
Mutex AAH_TXSender::sLock;
wp<AAH_TXSender> AAH_TXSender::sInstance;
// Last epoch handed out by getNextEpoch(); only meaningful once
// sNextEpochValid is true.
uint32_t AAH_TXSender::sNextEpoch;
bool AAH_TXSender::sNextEpochValid = false;

// Constructs a sender that does not own a socket yet (mSocket == -1); the
// remaining setup is performed by GetInstance().
AAH_TXSender::AAH_TXSender() : mSocket(-1) {
    // Start the heartbeat timeout window (see shouldSendHeartbeats_l()) at
    // construction time.
    mLastSentPacketTime = systemTime();
}

// Returns the shared AAH_TXSender, creating it when no live instance exists.
// Creation sets up the looper and handler reflector, opens and binds the UDP
// socket, and starts the retry receiver thread.  Returns NULL if any of those
// steps fails.  The whole operation runs under sLock.
sp<AAH_TXSender> AAH_TXSender::GetInstance() {
    Mutex::Autolock autoLock(sLock);

    // Reuse the existing sender while somebody still holds a strong reference
    // to it.
    sp<AAH_TXSender> instance = sInstance.promote();
    if (instance.get() != NULL) {
        return instance;
    }

    instance = new AAH_TXSender();
    if (instance == NULL) {
        return NULL;
    }

    instance->mLooper = new ALooper();
    if (instance->mLooper == NULL) {
        return NULL;
    }

    instance->mReflector =
            new AHandlerReflector<AAH_TXSender>(instance.get());
    if (instance->mReflector == NULL) {
        return NULL;
    }

    instance->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (instance->mSocket == -1) {
        ALOGW("%s unable to create socket", __PRETTY_FUNCTION__);
        return NULL;
    }

    // Bind to an all-zero IPv4 address and port.
    struct sockaddr_in localAddr;
    memset(&localAddr, 0, sizeof(localAddr));
    localAddr.sin_family = AF_INET;
    const int bindResult = bind(instance->mSocket,
                                reinterpret_cast<const sockaddr*>(&localAddr),
                                sizeof(localAddr));
    if (bindResult < 0) {
        ALOGW("%s unable to bind socket (errno %d)",
              __PRETTY_FUNCTION__, errno);
        return NULL;
    }

    instance->mRetryReceiver = new RetryReceiver(instance.get());
    if (instance->mRetryReceiver == NULL) {
        return NULL;
    }

    // Bring up the message looper before the retry thread.
    instance->mLooper->setName("AAH_TXSender");
    instance->mLooper->registerHandler(instance->mReflector);
    instance->mLooper->start(false, false, PRIORITY_AUDIO);

    if (instance->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO)
            != OK) {
        ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__);
        return NULL;
    }

    // Publish the fully initialized sender as the new shared instance.
    sInstance = instance;
    return instance;
}

// Stops the looper, shuts down the retry receiver thread and finally closes
// the socket.
AAH_TXSender::~AAH_TXSender() {
    mLooper->stop();
    mLooper->unregisterHandler(mReflector->id());

    if (mRetryReceiver != NULL) {
        // Flag the thread for exit, then signal the wakeup event that
        // threadLoop() polls on so that a blocked poll() returns and the exit
        // request gets noticed.
        mRetryReceiver->requestExit();
        mRetryReceiver->mWakeupEvent.setEvent();
        if (mRetryReceiver->requestExitAndWait() != OK) {
            ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__);
        }
        // The receiver only holds a raw pointer back to this object; clear it
        // before dropping our reference to the receiver.
        mRetryReceiver->mSender = NULL;
        mRetryReceiver.clear();
    }

    // The socket is closed only after the thread polling on it was asked to
    // stop.
    if (mSocket != -1) {
        close(mSocket);
    }
}

// Hands out the epoch number for a newly created endpoint.  The very first
// call seeds the epoch from the current system time in milliseconds; every
// later call advances it by one.  The result is always masked with
// TRTPPacket::kTRTPEpochMask.  Serialized by sLock.
uint32_t AAH_TXSender::getNextEpoch() {
    Mutex::Autolock autoLock(sLock);

    if (!sNextEpochValid) {
        // No epoch handed out yet: derive the starting point from the clock.
        sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask;
        sNextEpochValid = true;
    } else {
        sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask;
    }

    return sNextEpoch;
}

// Notify the sender that a player has started sending to this endpoint.
// Returns a program ID for use by the calling player.
//
// If the endpoint is already known its player reference count is bumped;
// otherwise a new EndpointState (with a fresh epoch) is created and added to
// the endpoint map.  The endpoint's program ID counter is incremented on every
// call and the incremented value is returned.
uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (eps) {
        eps->playerRefCount++;
    } else {
        eps = new EndpointState(getNextEpoch());
        mEndpointMap.add(endpoint, eps);

        // If adding this endpoint made the map non-empty, send a message to
        // start trimming retry buffers and a message to start sending
        // heartbeats.  This must only happen when an entry was actually
        // added: trimRetryBuffers() and sendHeartbeats() re-post themselves
        // for as long as the map is non-empty, so posting again when another
        // player registers on the single existing endpoint would start a
        // second, duplicate chain of periodic messages.
        if (mEndpointMap.size() == 1) {
            sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
                                                    handlerID());
            trimMessage->post(kRetryTrimIntervalUs);

            sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
                                                         handlerID());
            heartbeatMessage->post(kHeartbeatIntervalUs);
        }
    }

    eps->nextProgramID++;
    return eps->nextProgramID;
}

// Notify the sender that a player has ceased sending to this endpoint.
// An endpoint's state can not be deleted until all of the endpoint's
// registered players have called unregisterEndpoint.
void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (eps) {
        eps->playerRefCount--;
        CHECK(eps->playerRefCount >= 0);
    }
}

// Dispatches a looper message to the matching handler.  Any message type
// other than the three handled below is treated as a programming error.
void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
    switch (msg->what()) {
        case kWhatSendPacket: {
            onSendPacket(msg);
            return;
        }

        case kWhatTrimRetryBuffers: {
            trimRetryBuffers();
            return;
        }

        case kWhatSendHeartbeats: {
            sendHeartbeats();
            return;
        }

        default: {
            TRESPASS();
            return;
        }
    }
}

void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
    sp<RefBase> obj;
    CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
    sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());

    uint32_t ipAddr;
    CHECK(msg->findInt32(kSendPacketIPAddr,
                         reinterpret_cast<int32_t*>(&ipAddr)));

    int32_t port32;
    CHECK(msg->findInt32(kSendPacketPort, &port32));
    uint16_t port = port32;

    Mutex::Autolock lock(mEndpointLock);
    doSendPacket_l(packet, Endpoint(ipAddr, port));
    mLastSentPacketTime = systemTime();
}

void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
                                  const Endpoint& endpoint) {
    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (!eps) {
        // the endpoint state has disappeared, so the player that sent this
        // packet must be dead.
        return;
    }

    // assign the packet's sequence number
    packet->setEpoch(eps->epoch);
    packet->setSeqNumber(eps->trtpSeqNumber++);

    // add the packet to the retry buffer
    RetryBuffer& retry = eps->retry;
    retry.push_back(packet);

    // send the packet
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = endpoint.addr;
    addr.sin_port = endpoint.port;

    ssize_t result = sendto(mSocket,
                            packet->getPacket(),
                            packet->getPacketLen(),
                            0,
                            (const struct sockaddr *) &addr,
                            sizeof(addr));
    if (result == -1) {
        ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
    }
}

void AAH_TXSender::trimRetryBuffers() {
    Mutex::Autolock lock(mEndpointLock);

    nsecs_t localTimeNow = systemTime();

    Vector<Endpoint> endpointsToRemove;

    for (size_t i = 0; i < mEndpointMap.size(); i++) {
        EndpointState* eps = mEndpointMap.editValueAt(i);
        RetryBuffer& retry = eps->retry;

        while (!retry.isEmpty()) {
            if (retry[0]->getExpireTime() < localTimeNow) {
                retry.pop_front();
            } else {
                break;
            }
        }

        if (retry.isEmpty() && eps->playerRefCount == 0) {
            endpointsToRemove.add(mEndpointMap.keyAt(i));
        }
    }

    // remove the state for any endpoints that are no longer in use
    for (size_t i = 0; i < endpointsToRemove.size(); i++) {
        Endpoint& e = endpointsToRemove.editItemAt(i);
        ALOGD("*** %s removing endpoint addr=%08x",
                __PRETTY_FUNCTION__, e.addr);
        size_t index = mEndpointMap.indexOfKey(e);
        delete mEndpointMap.valueAt(index);
        mEndpointMap.removeItemsAt(index);
    }

    // schedule the next trim
    if (mEndpointMap.size()) {
        sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
                                                handlerID());
        trimMessage->post(kRetryTrimIntervalUs);
    }
}

// Periodic heartbeat, run under mEndpointLock.  While heartbeats are still
// enabled (see shouldSendHeartbeats_l()), a NOP control packet is sent to
// every known endpoint through doSendPacket_l(), so it gets a sequence number
// and lands in the retry buffer like any other packet.  The handler re-posts
// itself for as long as at least one endpoint exists, even when heartbeats
// themselves are currently suppressed.
void AAH_TXSender::sendHeartbeats() {
    Mutex::Autolock lock(mEndpointLock);

    if (shouldSendHeartbeats_l()) {
        for (size_t i = 0; i < mEndpointMap.size(); i++) {
            const Endpoint& ep = mEndpointMap.keyAt(i);

            sp<TRTPControlPacket> packet = new TRTPControlPacket();
            packet->setCommandID(TRTPControlPacket::kCommandNop);

            // Give the heartbeat the same retry lifetime the player uses.
            packet->setExpireTime(systemTime() +
                                  AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
            packet->pack();

            doSendPacket_l(packet, ep);
        }
    }

    // schedule the next heartbeat
    if (mEndpointMap.size()) {
        sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
                                                     handlerID());
        heartbeatMessage->post(kHeartbeatIntervalUs);
    }
}

bool AAH_TXSender::shouldSendHeartbeats_l() {
    // assert(holding endpoint lock)
    return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout));
}

// Receiver

// NOTE: the IDs below are multi-character literals, whose values are
// implementation-defined in C++.  handleRetryRequest() converts the received
// ID with ntohl() before comparing it against these constants and converts
// the NAK ID with htonl() before sending it.

// initial 4-byte ID of a retry request packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq';

// initial 4-byte ID of a retry NAK packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak';

// initial 4-byte ID of a fast start request packet
const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst';

// Creates the receiver thread object for the given sender.  Only a raw,
// non-owning pointer to the sender is kept; ~AAH_TXSender() resets it once
// the thread has been asked to stop.
AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender)
    : Thread(false)
    , mSender(sender) {
}

// Discards any wakeup events that are still pending.
AAH_TXSender::RetryReceiver::~RetryReceiver() {
    mWakeupEvent.clearPendingEvents();
}

// Tests whether val lies in the closed interval [start, end].  When start is
// greater than end the interval is taken to wrap around the top of T's range,
// i.e. it covers everything from start upwards plus everything up to end.
template <typename T>
static inline bool withinIntervalWithRollover(T val, T start, T end) {
    if (start <= end) {
        // Ordinary, non-wrapping interval.
        return (val >= start) && (val <= end);
    }

    if (start > end) {
        // Wrapped interval: val is in either the upper or the lower piece.
        return (val >= start) || (val <= end);
    }

    return false;
}

// One iteration of the retry receiver thread: blocks until either the sender
// socket becomes readable or the wakeup event is signalled, then services a
// pending retry request.  Returns false on a poll failure or when an exit has
// been requested, true otherwise.
bool AAH_TXSender::RetryReceiver::threadLoop() {
    struct pollfd pollFds[2];
    pollFds[0].fd = mSender->mSocket;
    pollFds[0].events = POLLIN;
    pollFds[0].revents = 0;
    // The wakeup handle lets ~AAH_TXSender() interrupt the blocking poll.
    pollFds[1].fd = mWakeupEvent.getWakeupHandle();
    pollFds[1].events = POLLIN;
    pollFds[1].revents = 0;

    int pollResult = poll(pollFds, NELEM(pollFds), -1);
    if (pollResult == -1) {
        if (errno == EINTR) {
            // A signal interrupted the wait.  That is not a failure of the
            // socket or the wakeup handle, so do not tear the thread down
            // for it; just go around again unless an exit was requested in
            // the meantime.
            return !exitPending();
        }
        ALOGE("%s poll failed", __PRETTY_FUNCTION__);
        return false;
    }

    if (exitPending()) {
        ALOGI("*** %s exiting", __PRETTY_FUNCTION__);
        return false;
    }

    if (pollFds[0].revents) {
        handleRetryRequest();
    }

    return true;
}

// Reads one request datagram from the sender socket and answers it.
//
// The request must be at least sizeof(RetryPacket) bytes long and carry
// either the retry-request or the fast-start ID.  It names an endpoint and a
// sequence number range.  Depending on how that range overlaps the endpoint's
// retry buffer, the requester gets back a NAK for the whole range, a NAK for
// the part that is missing at one end, and/or the buffered packets
// themselves.  All replies go to the address the request came from.
void AAH_TXSender::RetryReceiver::handleRetryRequest() {
    ALOGV("*** RX %s start", __PRETTY_FUNCTION__);

    RetryPacket request;
    // Source address of the request; every reply below is sent back to it.
    struct sockaddr requestSrcAddr;
    socklen_t requestSrcAddrLen = sizeof(requestSrcAddr);

    ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0,
                              &requestSrcAddr, &requestSrcAddrLen);
    if (result == -1) {
        ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno);
        return;
    }

    if (static_cast<size_t>(result) < sizeof(RetryPacket)) {
        ALOGW("%s short packet received", __PRETTY_FUNCTION__);
        return;
    }

    // Only retry requests and fast start requests are serviced.
    uint32_t host_request_id = ntohl(request.id);
    if ((host_request_id != kRetryRequestID) &&
        (host_request_id != kFastStartRequestID)) {
        ALOGW("%s received retry request with bogus ID (%08x)",
                __PRETTY_FUNCTION__, host_request_id);
        return;
    }

    // The endpoint fields are taken from the packet as received, without any
    // byte-order conversion.
    Endpoint endpoint(request.endpointIP, request.endpointPort);

    // The endpoint map and its retry buffers are shared with the sender's
    // looper thread; the lock is held for the rest of this function.
    Mutex::Autolock lock(mSender->mEndpointLock);

    EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint);

    if (eps == NULL || eps->retry.isEmpty()) {
        // we have no retry buffer or an empty retry buffer for this endpoint,
        // so NAK the entire request
        RetryPacket nak = request;
        nak.id = htonl(kRetryNakID);
        result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
                        &requestSrcAddr, requestSrcAddrLen);
        if (result == -1) {
            ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
        }
        return;
    }

    RetryBuffer& retry = eps->retry;

    // Requested sequence range, converted to host byte order.
    uint16_t startSeq = ntohs(request.seqStart);
    uint16_t endSeq = ntohs(request.seqEnd);

    // Sequence numbers of the oldest and newest packets currently buffered.
    uint16_t retryFirstSeq = retry[0]->getSeqNumber();
    uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber();

    // If this is a fast start, then force the start of the retry to match the
    // start of the retransmit ring buffer (unless the end of the retransmit
    // ring buffer is already past the point of fast start)
    // (The test below checks bit 15 of startSeq - retryFirstSeq; it is clear
    // when the 16-bit difference lies in [0, 0x7fff].)
    if ((host_request_id == kFastStartRequestID) &&
        !((startSeq - retryFirstSeq) & 0x8000)) {
        startSeq = retryFirstSeq;
    }

    // Translate each end of the requested range into an index into the retry
    // buffer, or -1 if that sequence number is not buffered.  The uint16_t
    // cast makes the subtraction wrap modulo 2^16, matching the rollover
    // handling of withinIntervalWithRollover().
    int startIndex;
    if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) {
        startIndex = static_cast<uint16_t>(startSeq - retryFirstSeq);
    } else {
        startIndex = -1;
    }

    int endIndex;
    if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) {
        endIndex = static_cast<uint16_t>(endSeq - retryFirstSeq);
    } else {
        endIndex = -1;
    }

    if (startIndex == -1 && endIndex == -1) {
        // no part of the request range is found in the retry buffer
        RetryPacket nak = request;
        nak.id = htonl(kRetryNakID);
        result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
                        &requestSrcAddr, requestSrcAddrLen);
        if (result == -1) {
            ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
        }
        return;
    }

    // From here on at least one end of the request is in the buffer.  The
    // missing end (if any) is NAKed and clamped to the buffer boundary.
    if (startIndex == -1) {
        // NAK a subrange at the front of the request range
        RetryPacket nak = request;
        nak.id = htonl(kRetryNakID);
        nak.seqEnd = htons(retryFirstSeq - 1);
        result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
                        &requestSrcAddr, requestSrcAddrLen);
        if (result == -1) {
            ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
        }

        startIndex = 0;
    } else if (endIndex == -1) {
        // NAK a subrange at the back of the request range
        RetryPacket nak = request;
        nak.id = htonl(kRetryNakID);
        nak.seqStart = htons(retryLastSeq + 1);
        result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
                        &requestSrcAddr, requestSrcAddrLen);
        if (result == -1) {
            ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
        }

        endIndex = retry.size() - 1;
    }

    // send the retry packets
    // (nothing is sent if startIndex ends up greater than endIndex; a failed
    // sendto() is logged and the loop carries on with the next packet)
    for (int i = startIndex; i <= endIndex; i++) {
        const sp<TRTPPacket>& replyPacket = retry[i];

        result = sendto(mSender->mSocket,
                        replyPacket->getPacket(),
                        replyPacket->getPacketLen(),
                        0,
                        &requestSrcAddr,
                        requestSrcAddrLen);

        if (result == -1) {
            ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
        }
    }
}

// Endpoint

// Default endpoint: address 0, port 0.
AAH_TXSender::Endpoint::Endpoint()
    : addr(0),
      port(0) {
}

// Endpoint for the given address/port pair.  The values are stored exactly as
// passed in.
AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p)
    : addr(a),
      port(p) {
}

// Orders endpoints by address first and by port second, which allows Endpoint
// to be used as the key of the sorted endpoint map.
bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const {
    if (addr != other.addr) {
        return addr < other.addr;
    }
    return port < other.port;
}

// EndpointState

// State for a newly registered endpoint: a retry buffer sized with
// kRetryBufferCapacity, one registered player, sequence numbers and program
// IDs starting from zero, and the epoch supplied by the caller.
AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch)
    : retry(kRetryBufferCapacity),
      playerRefCount(1),
      trtpSeqNumber(0),
      nextProgramID(0),
      epoch(_epoch) {
}

// CircularBuffer

// Creates an empty buffer with room for `capacity` elements.  The backing
// array is allocated up front and holds default-constructed elements.
template <typename T>
CircularBuffer<T>::CircularBuffer(size_t capacity)
        : mCapacity(capacity)
        , mHead(0)
        , mTail(0)
        , mFillCount(0) {
    mBuffer = new T[capacity];
}

// Releases the backing array allocated by the constructor.
template <typename T>
CircularBuffer<T>::~CircularBuffer() {
    delete [] mBuffer;
}

// Appends item at the head position.  If the buffer is already full, the
// oldest element is dropped first so that the new one always fits.
template <typename T>
void CircularBuffer<T>::push_back(const T& item) {
    if (isFull()) {
        pop_front();
    }

    mBuffer[mHead] = item;
    ++mFillCount;
    // Advance the head, wrapping around at the end of the backing array.
    mHead = (mHead + 1) % mCapacity;
}

// Removes the oldest element.  Must not be called on an empty buffer.
template <typename T>
void CircularBuffer<T>::pop_front() {
    CHECK(!isEmpty());

    // Overwrite the vacated slot with a default-constructed T so that the
    // removed value is not kept around inside the array.
    mBuffer[mTail] = T();
    --mFillCount;
    // Advance the tail, wrapping around at the end of the backing array.
    mTail = (mTail + 1) % mCapacity;
}

// Returns the number of elements currently stored.
template <typename T>
size_t CircularBuffer<T>::size() const {
    return mFillCount;
}

// Returns true when the number of stored elements equals the capacity.
template <typename T>
bool CircularBuffer<T>::isFull() const {
    return (mFillCount == mCapacity);
}

// Returns true when no elements are stored.
template <typename T>
bool CircularBuffer<T>::isEmpty() const {
    return (mFillCount == 0);
}

// Returns the element at the given logical position, where index 0 is the
// oldest element.  The index must be smaller than the current size.
template <typename T>
const T& CircularBuffer<T>::itemAt(size_t index) const {
    CHECK(index < mFillCount);

    // Translate the logical position into a slot of the backing array.
    const size_t slot = (mTail + index) % mCapacity;
    return mBuffer[slot];
}

// Subscript form of itemAt(); index 0 is the oldest element.
template <typename T>
const T& CircularBuffer<T>::operator[](size_t index) const {
    return itemAt(index);
}

}  // namespace android