/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "LibAAH_RTP"
#include <media/stagefright/foundation/ADebug.h>
#include <errno.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <media/stagefright/foundation/AMessage.h>
#include <utils/misc.h>
#include "aah_tx_player.h"
#include "aah_tx_sender.h"
namespace android {
// Keys of the fields that onSendPacket() reads out of a kWhatSendPacket
// message: destination address, destination port and the TRTP packet object.
const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr";
const char* AAH_TXSender::kSendPacketPort = "port";
const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp";
// Delay passed to AMessage::post() between retry-buffer trim passes.
const int AAH_TXSender::kRetryTrimIntervalUs = 100000;
// Delay passed to AMessage::post() between heartbeat passes.
const int AAH_TXSender::kHeartbeatIntervalUs = 1000000;
// Capacity handed to each endpoint's retry CircularBuffer.
const int AAH_TXSender::kRetryBufferCapacity = 100;
// Heartbeats are only sent while systemTime() is less than
// mLastSentPacketTime + kHeartbeatTimeout (600 * 10^9, i.e. 600 seconds when
// expressed in nanoseconds).
const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull;
// Guards sInstance in GetInstance() and the epoch state in getNextEpoch().
Mutex AAH_TXSender::sLock;
// Weak reference to the shared sender; GetInstance() promotes it or builds a
// new sender when promotion fails.
wp<AAH_TXSender> AAH_TXSender::sInstance;
// Most recent epoch returned by getNextEpoch(); only meaningful once
// sNextEpochValid has been set.
uint32_t AAH_TXSender::sNextEpoch;
bool AAH_TXSender::sNextEpochValid = false;
// Builds a sender that does not own a socket yet (mSocket == -1, the value
// the destructor treats as "nothing to close"). The looper, reflector, socket
// and retry thread are attached afterwards by GetInstance().
AAH_TXSender::AAH_TXSender() : mSocket(-1) {
// Seed the timestamp that shouldSendHeartbeats_l() measures the heartbeat
// timeout from.
mLastSentPacketTime = systemTime();
}
// Returns the shared sender, building and starting a new one when the weak
// singleton reference can no longer be promoted.
//
// The whole call runs under sLock. A new sender gets a looper, a handler
// reflector, a UDP socket bound with a zeroed AF_INET address (any local
// address, system-chosen port) and a retry receiver thread. Any failure
// returns NULL; the partially built sender is released when the local strong
// reference goes out of scope.
sp<AAH_TXSender> AAH_TXSender::GetInstance() {
    Mutex::Autolock autoLock(sLock);

    // Fast path: somebody still holds a strong reference to the sender.
    sp<AAH_TXSender> sender = sInstance.promote();
    if (sender != NULL) {
        return sender;
    }

    sender = new AAH_TXSender();
    if (sender == NULL) {
        return NULL;
    }

    sender->mLooper = new ALooper();
    if (sender->mLooper == NULL) {
        return NULL;
    }

    sender->mReflector = new AHandlerReflector<AAH_TXSender>(sender.get());
    if (sender->mReflector == NULL) {
        return NULL;
    }

    // One UDP socket is shared by outgoing packets and incoming retry
    // requests.
    sender->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (sender->mSocket == -1) {
        ALOGW("%s unable to create socket", __PRETTY_FUNCTION__);
        return NULL;
    }

    struct sockaddr_in localAddr;
    memset(&localAddr, 0, sizeof(localAddr));
    localAddr.sin_family = AF_INET;
    const int bindStatus = bind(sender->mSocket,
                                reinterpret_cast<const sockaddr*>(&localAddr),
                                sizeof(localAddr));
    if (bindStatus < 0) {
        ALOGW("%s unable to bind socket (errno %d)",
              __PRETTY_FUNCTION__, errno);
        return NULL;
    }

    sender->mRetryReceiver = new RetryReceiver(sender.get());
    if (sender->mRetryReceiver == NULL) {
        return NULL;
    }

    // Bring up the message looper first, then the retry thread.
    sender->mLooper->setName("AAH_TXSender");
    sender->mLooper->registerHandler(sender->mReflector);
    sender->mLooper->start(false, false, PRIORITY_AUDIO);

    if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO)
            != OK) {
        ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__);
        return NULL;
    }

    // Publish only a fully started sender.
    sInstance = sender;
    return sender;
}
// Shuts the sender down: stops the looper and unregisters the handler, asks
// the retry receiver thread to exit and waits for it, then closes the socket.
//
// GetInstance() may drop a sender that was only partially initialised (it
// returns NULL after each failed step), so every helper object is checked
// before it is used.
AAH_TXSender::~AAH_TXSender() {
    if (mLooper != NULL) {
        mLooper->stop();
        if (mReflector != NULL) {
            mLooper->unregisterHandler(mReflector->id());
        }
    }

    if (mRetryReceiver != NULL) {
        // Flag the exit first, then wake the thread out of poll() so that it
        // notices the flag; only after that is it safe to block on it.
        mRetryReceiver->requestExit();
        mRetryReceiver->mWakeupEvent.setEvent();
        if (mRetryReceiver->requestExitAndWait() != OK) {
            ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__);
        }
        // The receiver only holds a raw back pointer to this object.
        mRetryReceiver->mSender = NULL;
        mRetryReceiver.clear();
    }

    if (mSocket != -1) {
        close(mSocket);
    }
}
// Hands out the epoch number for a newly created endpoint state.
//
// The first call seeds the counter from the current system time in
// milliseconds; each later call advances it by one. Both paths mask the
// value with TRTPPacket::kTRTPEpochMask. Serialised by sLock.
uint32_t AAH_TXSender::getNextEpoch() {
    Mutex::Autolock autoLock(sLock);

    if (!sNextEpochValid) {
        sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask;
        sNextEpochValid = true;
        return sNextEpoch;
    }

    sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask;
    return sNextEpoch;
}
// Notify the sender that a player has started sending to this endpoint.
//
// If the endpoint is already known its player reference count is bumped;
// otherwise a new EndpointState (whose reference count starts at 1) is
// created with a fresh epoch and added to the map.
//
// Returns a program ID for use by the calling player. IDs are handed out per
// endpoint by pre-incrementing nextProgramID, so the first one is 1.
uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (eps) {
        eps->playerRefCount++;
    } else {
        eps = new EndpointState(getNextEpoch());
        mEndpointMap.add(endpoint, eps);

        // Only the transition from "no endpoints" to "one endpoint" starts
        // the periodic work. Both handlers re-post themselves while the map
        // is non-empty, so starting them again when another player joins the
        // single existing endpoint would leave duplicate trim and heartbeat
        // chains running.
        if (mEndpointMap.size() == 1) {
            sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
                                                    handlerID());
            trimMessage->post(kRetryTrimIntervalUs);

            sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
                                                         handlerID());
            heartbeatMessage->post(kHeartbeatIntervalUs);
        }
    }

    eps->nextProgramID++;
    return eps->nextProgramID;
}
// Tells the sender that one player has stopped sending to this endpoint.
//
// Only the player reference count is dropped here. The endpoint state itself
// is deleted later by trimRetryBuffers(), once the count is zero and the
// retry buffer has drained. Unknown endpoints are ignored.
void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (!eps) {
        return;
    }

    --eps->playerRefCount;
    CHECK(eps->playerRefCount >= 0);
}
// Looper entry point: routes each message to its handler according to
// msg->what(). Any other message type hits TRESPASS().
void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
    switch (msg->what()) {
        case kWhatSendPacket: {
            onSendPacket(msg);
            return;
        }
        case kWhatTrimRetryBuffers: {
            trimRetryBuffers();
            return;
        }
        case kWhatSendHeartbeats: {
            sendHeartbeats();
            return;
        }
        default: {
            TRESPASS();
            return;
        }
    }
}
void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
sp<RefBase> obj;
CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
uint32_t ipAddr;
CHECK(msg->findInt32(kSendPacketIPAddr,
reinterpret_cast<int32_t*>(&ipAddr)));
int32_t port32;
CHECK(msg->findInt32(kSendPacketPort, &port32));
uint16_t port = port32;
Mutex::Autolock lock(mEndpointLock);
doSendPacket_l(packet, Endpoint(ipAddr, port));
mLastSentPacketTime = systemTime();
}
void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
const Endpoint& endpoint) {
EndpointState* eps = mEndpointMap.valueFor(endpoint);
if (!eps) {
// the endpoint state has disappeared, so the player that sent this
// packet must be dead.
return;
}
// assign the packet's sequence number
packet->setEpoch(eps->epoch);
packet->setSeqNumber(eps->trtpSeqNumber++);
// add the packet to the retry buffer
RetryBuffer& retry = eps->retry;
retry.push_back(packet);
// send the packet
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = endpoint.addr;
addr.sin_port = endpoint.port;
ssize_t result = sendto(mSocket,
packet->getPacket(),
packet->getPacketLen(),
0,
(const struct sockaddr *) &addr,
sizeof(addr));
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
}
void AAH_TXSender::trimRetryBuffers() {
Mutex::Autolock lock(mEndpointLock);
nsecs_t localTimeNow = systemTime();
Vector<Endpoint> endpointsToRemove;
for (size_t i = 0; i < mEndpointMap.size(); i++) {
EndpointState* eps = mEndpointMap.editValueAt(i);
RetryBuffer& retry = eps->retry;
while (!retry.isEmpty()) {
if (retry[0]->getExpireTime() < localTimeNow) {
retry.pop_front();
} else {
break;
}
}
if (retry.isEmpty() && eps->playerRefCount == 0) {
endpointsToRemove.add(mEndpointMap.keyAt(i));
}
}
// remove the state for any endpoints that are no longer in use
for (size_t i = 0; i < endpointsToRemove.size(); i++) {
Endpoint& e = endpointsToRemove.editItemAt(i);
ALOGD("*** %s removing endpoint addr=%08x",
__PRETTY_FUNCTION__, e.addr);
size_t index = mEndpointMap.indexOfKey(e);
delete mEndpointMap.valueAt(index);
mEndpointMap.removeItemsAt(index);
}
// schedule the next trim
if (mEndpointMap.size()) {
sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
handlerID());
trimMessage->post(kRetryTrimIntervalUs);
}
}
// Periodic heartbeat pass, run on the looper under mEndpointLock.
//
// While shouldSendHeartbeats_l() holds, a NOP control packet is sent to every
// endpoint in the map. Heartbeats go through doSendPacket_l(), so each one
// takes a sequence number and is stored in the endpoint's retry buffer like
// any other packet. The heartbeat message is re-posted as long as any
// endpoint remains, whether or not heartbeats were sent this time.
void AAH_TXSender::sendHeartbeats() {
    Mutex::Autolock lock(mEndpointLock);

    if (shouldSendHeartbeats_l()) {
        for (size_t i = 0; i < mEndpointMap.size(); i++) {
            const Endpoint& ep = mEndpointMap.keyAt(i);

            sp<TRTPControlPacket> packet = new TRTPControlPacket();
            packet->setCommandID(TRTPControlPacket::kCommandNop);
            packet->setExpireTime(systemTime() +
                                  AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
            packet->pack();

            doSendPacket_l(packet, ep);
        }
    }

    // schedule the next heartbeat
    if (mEndpointMap.size()) {
        sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
                                                     handlerID());
        heartbeatMessage->post(kHeartbeatIntervalUs);
    }
}
// True while the current time is still inside the heartbeat window, i.e.
// less than kHeartbeatTimeout after mLastSentPacketTime. The only caller in
// this file, sendHeartbeats(), holds mEndpointLock.
bool AAH_TXSender::shouldSendHeartbeats_l() {
    return (mLastSentPacketTime + kHeartbeatTimeout) > systemTime();
}
// Receiver
// Packet IDs of the retry protocol. Each is written as a multi-character
// literal (its integer value is implementation-defined in C++).
// handleRetryRequest() compares them with ntohl(request.id) and converts with
// htonl() before sending.
// initial 4-byte ID of a retry request packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq';
// initial 4-byte ID of a retry NAK packet, sent back for sequence numbers
// that cannot be served from the retry buffer
const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak';
// initial 4-byte ID of a fast start request packet
const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst';
// Creates the retry receiver thread object for the given sender. Only a raw
// back pointer is kept; the sender's destructor resets it to NULL after the
// thread has been stopped.
// NOTE(review): the `false` passed to Thread is presumably canCallJava;
// confirm against utils/Thread.h.
AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender)
: Thread(false),
mSender(sender) {}
// Clears any events still pending on the wakeup event (the one the sender's
// destructor sets to get the thread out of poll()).
AAH_TXSender::RetryReceiver::~RetryReceiver() {
mWakeupEvent.clearPendingEvents();
}
// Tests whether val lies in the closed interval [start, end].
//
// When start > end the interval is taken to wrap around the top of T's
// range, so it covers every value >= start together with every value <= end.
template <typename T>
static inline bool withinIntervalWithRollover(T val, T start, T end) {
    if (start <= end) {
        // Ordinary interval: both bounds must hold.
        return (val >= start) && (val <= end);
    }
    if (start > end) {
        // Wrapped interval: being on either side of the wrap is enough.
        return (val >= start) || (val <= end);
    }
    return false;
}
// One iteration of the retry receiver thread.
//
// Blocks in poll() until either the sender's socket is readable or the wakeup
// event fires, then serves a pending retry request if there is one.
//
// Returns true to be invoked again, false to end the thread (exit requested,
// or poll() failed for a reason other than being interrupted by a signal).
bool AAH_TXSender::RetryReceiver::threadLoop() {
    struct pollfd pollFds[2];
    pollFds[0].fd = mSender->mSocket;
    pollFds[0].events = POLLIN;
    pollFds[0].revents = 0;
    pollFds[1].fd = mWakeupEvent.getWakeupHandle();
    pollFds[1].events = POLLIN;
    pollFds[1].revents = 0;

    int pollResult = poll(pollFds, NELEM(pollFds), -1);
    if (pollResult == -1) {
        if (errno == EINTR) {
            // A signal interrupted the wait; nothing is wrong with the
            // descriptors. Return true, as on the normal path, so the loop
            // runs again instead of shutting retry handling down for good.
            return true;
        }
        ALOGE("%s poll failed (errno %d)", __PRETTY_FUNCTION__, errno);
        return false;
    }

    if (exitPending()) {
        ALOGI("*** %s exiting", __PRETTY_FUNCTION__);
        return false;
    }

    if (pollFds[0].revents) {
        handleRetryRequest();
    }

    return true;
}
void AAH_TXSender::RetryReceiver::handleRetryRequest() {
ALOGV("*** RX %s start", __PRETTY_FUNCTION__);
RetryPacket request;
struct sockaddr requestSrcAddr;
socklen_t requestSrcAddrLen = sizeof(requestSrcAddr);
ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0,
&requestSrcAddr, &requestSrcAddrLen);
if (result == -1) {
ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno);
return;
}
if (static_cast<size_t>(result) < sizeof(RetryPacket)) {
ALOGW("%s short packet received", __PRETTY_FUNCTION__);
return;
}
uint32_t host_request_id = ntohl(request.id);
if ((host_request_id != kRetryRequestID) &&
(host_request_id != kFastStartRequestID)) {
ALOGW("%s received retry request with bogus ID (%08x)",
__PRETTY_FUNCTION__, host_request_id);
return;
}
Endpoint endpoint(request.endpointIP, request.endpointPort);
Mutex::Autolock lock(mSender->mEndpointLock);
EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint);
if (eps == NULL || eps->retry.isEmpty()) {
// we have no retry buffer or an empty retry buffer for this endpoint,
// so NAK the entire request
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
return;
}
RetryBuffer& retry = eps->retry;
uint16_t startSeq = ntohs(request.seqStart);
uint16_t endSeq = ntohs(request.seqEnd);
uint16_t retryFirstSeq = retry[0]->getSeqNumber();
uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber();
// If this is a fast start, then force the start of the retry to match the
// start of the retransmit ring buffer (unless the end of the retransmit
// ring buffer is already past the point of fast start)
if ((host_request_id == kFastStartRequestID) &&
!((startSeq - retryFirstSeq) & 0x8000)) {
startSeq = retryFirstSeq;
}
int startIndex;
if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) {
startIndex = static_cast<uint16_t>(startSeq - retryFirstSeq);
} else {
startIndex = -1;
}
int endIndex;
if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) {
endIndex = static_cast<uint16_t>(endSeq - retryFirstSeq);
} else {
endIndex = -1;
}
if (startIndex == -1 && endIndex == -1) {
// no part of the request range is found in the retry buffer
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
return;
}
if (startIndex == -1) {
// NAK a subrange at the front of the request range
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
nak.seqEnd = htons(retryFirstSeq - 1);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
startIndex = 0;
} else if (endIndex == -1) {
// NAK a subrange at the back of the request range
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
nak.seqStart = htons(retryLastSeq + 1);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
endIndex = retry.size() - 1;
}
// send the retry packets
for (int i = startIndex; i <= endIndex; i++) {
const sp<TRTPPacket>& replyPacket = retry[i];
result = sendto(mSender->mSocket,
replyPacket->getPacket(),
replyPacket->getPacketLen(),
0,
&requestSrcAddr,
requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
}
}
// Endpoint
// Default endpoint: address 0, port 0.
AAH_TXSender::Endpoint::Endpoint()
: addr(0)
, port(0) { }
// Endpoint for the given address and port. Both values are stored as passed;
// doSendPacket_l() later copies them unchanged into a sockaddr_in.
AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p)
: addr(a)
, port(p) {}
// Strict ordering of endpoints: by address first, with the port as the
// tie-breaker.
bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const {
    if (addr != other.addr) {
        return addr < other.addr;
    }
    return port < other.port;
}
// EndpointState
// State for a newly registered endpoint: an empty retry buffer of
// kRetryBufferCapacity slots, one player reference (the registering player,
// which registerEndpoint() does not count again), sequence numbering and
// program IDs starting from 0, and the epoch supplied by the caller.
AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch)
: retry(kRetryBufferCapacity)
, playerRefCount(1)
, trtpSeqNumber(0)
, nextProgramID(0)
, epoch(_epoch) { }
// CircularBuffer
// Creates an empty ring buffer holding up to `capacity` items; the backing
// array of default-constructed T is allocated here and freed by the
// destructor. push_back(), pop_front() and itemAt() compute positions modulo
// mCapacity, so a capacity of 0 is not usable.
template <typename T>
CircularBuffer<T>::CircularBuffer(size_t capacity)
: mCapacity(capacity)
, mHead(0)
, mTail(0)
, mFillCount(0) {
mBuffer = new T[capacity];
}
// Frees the backing array allocated by the constructor, destroying every
// slot.
template <typename T>
CircularBuffer<T>::~CircularBuffer() {
delete [] mBuffer;
}
// Appends an item at the head of the ring. When the buffer is already full,
// the oldest item is dropped first to make room.
template <typename T>
void CircularBuffer<T>::push_back(const T& item) {
    if (isFull()) {
        pop_front();
    }

    mBuffer[mHead] = item;
    ++mFillCount;
    mHead = (mHead + 1) % mCapacity;
}
// Removes the oldest item. The buffer must not be empty (CHECKed). The
// vacated slot is overwritten with a default-constructed T so that whatever
// the old value held is released immediately.
template <typename T>
void CircularBuffer<T>::pop_front() {
    CHECK(!isEmpty());

    mBuffer[mTail] = T();
    --mFillCount;
    mTail = (mTail + 1) % mCapacity;
}
// Number of items currently stored.
template <typename T>
size_t CircularBuffer<T>::size() const {
return mFillCount;
}
// True when the number of stored items equals the capacity; the next
// push_back() will then drop the oldest item.
template <typename T>
bool CircularBuffer<T>::isFull() const {
return (mFillCount == mCapacity);
}
// True when no items are stored.
template <typename T>
bool CircularBuffer<T>::isEmpty() const {
return (mFillCount == 0);
}
// Returns the item `index` positions after the oldest one (index 0 is the
// oldest). The index must be smaller than the current item count (CHECKed).
template <typename T>
const T& CircularBuffer<T>::itemAt(size_t index) const {
    CHECK(index < mFillCount);

    // Logical positions count from the tail and wrap at the capacity.
    const size_t slot = (mTail + index) % mCapacity;
    return mBuffer[slot];
}
// Read-only indexed access; forwards to itemAt(), including its bounds CHECK.
template <typename T>
const T& CircularBuffer<T>::operator[](size_t index) const {
return itemAt(index);
}
} // namespace android