/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "LibAAH_RTP"
#include <utils/Log.h>

#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <netdb.h>
#include <netinet/ip.h>

#include <common_time/cc_helper.h>
#include <media/IMediaPlayer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaBuffer.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/MetaData.h>
#include <utils/Timers.h>

#include "aah_tx_packet.h"
#include "aah_tx_player.h"

namespace android {

static int64_t kLowWaterMarkUs = 2000000ll;  // 2secs
static int64_t kHighWaterMarkUs = 10000000ll;  // 10secs
static const size_t kLowWaterMarkBytes = 40000;
static const size_t kHighWaterMarkBytes = 200000;

// When we start up, how much lead time should we put on the first access unit?
static const int64_t kAAHStartupLeadTimeUs = 300000LL;

// How much time do we attempt to lead the clock by in steady state?
static const int64_t kAAHBufferTimeUs = 1000000LL;

// how long do we keep data in our retransmit buffer after sending it.
const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs =
    kAAHBufferTimeUs * 1100;

sp<MediaPlayerBase> createAAH_TXPlayer() {
    sp<MediaPlayerBase> ret = new AAH_TXPlayer();
    return ret;
}

template <typename T> static T clamp(T val, T min, T max) {
    if (val < min) {
        return min;
    } else if (val > max) {
        return max;
    } else {
        return val;
    }
}

struct AAH_TXEvent : public TimedEventQueue::Event {
    AAH_TXEvent(AAH_TXPlayer *player,
                void (AAH_TXPlayer::*method)()) : mPlayer(player)
                                                , mMethod(method) {}

  protected:
    virtual ~AAH_TXEvent() {}

    virtual void fire(TimedEventQueue *queue, int64_t /* now_us */) {
        (mPlayer->*mMethod)();
    }

  private:
    AAH_TXPlayer *mPlayer;
    void (AAH_TXPlayer::*mMethod)();

    AAH_TXEvent(const AAH_TXEvent &);
    AAH_TXEvent& operator=(const AAH_TXEvent &);
};

AAH_TXPlayer::AAH_TXPlayer()
        : mQueueStarted(false)
        , mFlags(0)
        , mExtractorFlags(0) {
    DataSource::RegisterDefaultSniffers();

    mBufferingEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onBufferingUpdate);
    mBufferingEventPending = false;

    mPumpAudioEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onPumpAudio);
    mPumpAudioEventPending = false;

    mAudioCodecData = NULL;

    reset_l();
}

AAH_TXPlayer::~AAH_TXPlayer() {
    if (mQueueStarted) {
        mQueue.stop();
    }

    reset_l();
}

void AAH_TXPlayer::cancelPlayerEvents(bool keepBufferingGoing) {
    if (!keepBufferingGoing) {
        mQueue.cancelEvent(mBufferingEvent->eventID());
        mBufferingEventPending = false;

        mQueue.cancelEvent(mPumpAudioEvent->eventID());
        mPumpAudioEventPending = false;
    }
}

status_t AAH_TXPlayer::initCheck() {
    // Check for the presense of the common time service by attempting to query
    // for CommonTime's frequency.  If we get an error back, we cannot talk to
    // the service at all and should abort now.
    status_t res;
    uint64_t freq;
    res = mCCHelper.getCommonFreq(&freq);
    if (OK != res) {
        ALOGE("Failed to connect to common time service! (res %d)", res);
        return res;
    }

    return OK;
}

status_t AAH_TXPlayer::setDataSource(
        const char *url,
        const KeyedVector<String8, String8> *headers) {
    Mutex::Autolock autoLock(mLock);
    return setDataSource_l(url, headers);
}

status_t AAH_TXPlayer::setDataSource_l(
        const char *url,
        const KeyedVector<String8, String8> *headers) {
    reset_l();

    mUri.setTo(url);

    if (headers) {
        mUriHeaders = *headers;

        ssize_t index = mUriHeaders.indexOfKey(String8("x-hide-urls-from-log"));
        if (index >= 0) {
            // Browser is in "incognito" mode, suppress logging URLs.

            // This isn't something that should be passed to the server.
            mUriHeaders.removeItemsAt(index);

            mFlags |= INCOGNITO;
        }
    }

    // The URL may optionally contain a "#" character followed by a Skyjam
    // cookie.  Ideally the cookie header should just be passed in the headers
    // argument, but the Java API for supplying headers is apparently not yet
    // exposed in the SDK used by application developers.
    const char kSkyjamCookieDelimiter = '#';
    char* skyjamCookie = strrchr(mUri.string(), kSkyjamCookieDelimiter);
    if (skyjamCookie) {
        skyjamCookie++;
        mUriHeaders.add(String8("Cookie"), String8(skyjamCookie));
        mUri = String8(mUri.string(), skyjamCookie - mUri.string());
    }

    return OK;
}

status_t AAH_TXPlayer::setDataSource(int fd, int64_t offset, int64_t length) {
    Mutex::Autolock autoLock(mLock);

    reset_l();

    sp<DataSource> dataSource = new FileSource(dup(fd), offset, length);

    status_t err = dataSource->initCheck();

    if (err != OK) {
        return err;
    }

    mFileSource = dataSource;

    sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);

    if (extractor == NULL) {
        return UNKNOWN_ERROR;
    }

    return setDataSource_l(extractor);
}

status_t AAH_TXPlayer::setVideoSurface(const sp<Surface>& surface) {
    return OK;
}

status_t AAH_TXPlayer::setVideoSurfaceTexture(
        const sp<ISurfaceTexture>& surfaceTexture) {
    return OK;
}

status_t AAH_TXPlayer::prepare() {
    return INVALID_OPERATION;
}

status_t AAH_TXPlayer::prepareAsync() {
    Mutex::Autolock autoLock(mLock);

    return prepareAsync_l();
}

status_t AAH_TXPlayer::prepareAsync_l() {
    if (mFlags & PREPARING) {
        return UNKNOWN_ERROR;  // async prepare already pending
    }

    mAAH_Sender = AAH_TXSender::GetInstance();
    if (mAAH_Sender == NULL) {
        return NO_MEMORY;
    }

    if (!mQueueStarted) {
        mQueue.start();
        mQueueStarted = true;
    }

    mFlags |= PREPARING;
    mAsyncPrepareEvent = new AAH_TXEvent(
            this, &AAH_TXPlayer::onPrepareAsyncEvent);

    mQueue.postEvent(mAsyncPrepareEvent);

    return OK;
}

status_t AAH_TXPlayer::finishSetDataSource_l() {
    sp<DataSource> dataSource;

    if (!strncasecmp("http://",  mUri.string(), 7) ||
        !strncasecmp("https://", mUri.string(), 8)) {

        mConnectingDataSource = HTTPBase::Create(
                (mFlags & INCOGNITO)
                    ? HTTPBase::kFlagIncognito
                    : 0);

        mLock.unlock();
        status_t err = mConnectingDataSource->connect(mUri, &mUriHeaders);
        mLock.lock();

        if (err != OK) {
            mConnectingDataSource.clear();

            ALOGI("mConnectingDataSource->connect() returned %d", err);
            return err;
        }

        mCachedSource = new NuCachedSource2(mConnectingDataSource);
        mConnectingDataSource.clear();

        dataSource = mCachedSource;

        // We're going to prefill the cache before trying to instantiate
        // the extractor below, as the latter is an operation that otherwise
        // could block on the datasource for a significant amount of time.
        // During that time we'd be unable to abort the preparation phase
        // without this prefill.

        mLock.unlock();

        for (;;) {
            status_t finalStatus;
            size_t cachedDataRemaining =
                mCachedSource->approxDataRemaining(&finalStatus);

            if (finalStatus != OK ||
                cachedDataRemaining >= kHighWaterMarkBytes ||
                (mFlags & PREPARE_CANCELLED)) {
                break;
            }

            usleep(200000);
        }

        mLock.lock();

        if (mFlags & PREPARE_CANCELLED) {
            ALOGI("Prepare cancelled while waiting for initial cache fill.");
            return UNKNOWN_ERROR;
        }
    } else {
        dataSource = DataSource::CreateFromURI(mUri.string(), &mUriHeaders);
    }

    if (dataSource == NULL) {
        return UNKNOWN_ERROR;
    }

    sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);

    if (extractor == NULL) {
        return UNKNOWN_ERROR;
    }

    return setDataSource_l(extractor);
}

status_t AAH_TXPlayer::setDataSource_l(const sp<MediaExtractor> &extractor) {
    // Attempt to approximate overall stream bitrate by summing all
    // tracks' individual bitrates, if not all of them advertise bitrate,
    // we have to fail.

    int64_t totalBitRate = 0;

    for (size_t i = 0; i < extractor->countTracks(); ++i) {
        sp<MetaData> meta = extractor->getTrackMetaData(i);

        int32_t bitrate;
        if (!meta->findInt32(kKeyBitRate, &bitrate)) {
            totalBitRate = -1;
            break;
        }

        totalBitRate += bitrate;
    }

    mBitrate = totalBitRate;

    ALOGV("mBitrate = %lld bits/sec", mBitrate);

    bool haveAudio = false;
    for (size_t i = 0; i < extractor->countTracks(); ++i) {
        sp<MetaData> meta = extractor->getTrackMetaData(i);

        const char *mime;
        CHECK(meta->findCString(kKeyMIMEType, &mime));

        if (!strncasecmp(mime, "audio/", 6)) {
            mAudioSource = extractor->getTrack(i);
            CHECK(mAudioSource != NULL);
            haveAudio = true;
            break;
        }
    }

    if (!haveAudio) {
        return UNKNOWN_ERROR;
    }

    mExtractorFlags = extractor->flags();

    return OK;
}

void AAH_TXPlayer::abortPrepare(status_t err) {
    CHECK(err != OK);

    notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, err);

    mPrepareResult = err;
    mFlags &= ~(PREPARING|PREPARE_CANCELLED|PREPARING_CONNECTED);
    mPreparedCondition.broadcast();
}

void AAH_TXPlayer::onPrepareAsyncEvent() {
    Mutex::Autolock autoLock(mLock);

    if (mFlags & PREPARE_CANCELLED) {
        ALOGI("prepare was cancelled before doing anything");
        abortPrepare(UNKNOWN_ERROR);
        return;
    }

    if (mUri.size() > 0) {
        status_t err = finishSetDataSource_l();

        if (err != OK) {
            abortPrepare(err);
            return;
        }
    }

    mAudioFormat = mAudioSource->getFormat();
    if (!mAudioFormat->findInt64(kKeyDuration, &mDurationUs))
        mDurationUs = 1;

    const char* mime_type = NULL;
    if (!mAudioFormat->findCString(kKeyMIMEType, &mime_type)) {
        ALOGE("Failed to find audio substream MIME type during prepare.");
        abortPrepare(BAD_VALUE);
        return;
    }

    if (!strcmp(mime_type, MEDIA_MIMETYPE_AUDIO_MPEG)) {
        mAudioCodec = TRTPAudioPacket::kCodecMPEG1Audio;
    } else
    if (!strcmp(mime_type, MEDIA_MIMETYPE_AUDIO_AAC)) {
        mAudioCodec = TRTPAudioPacket::kCodecAACAudio;

        uint32_t type;
        int32_t  sample_rate;
        int32_t  channel_count;
        const void* esds_data;
        size_t esds_len;

        if (!mAudioFormat->findInt32(kKeySampleRate, &sample_rate)) {
            ALOGE("Failed to find sample rate for AAC substream.");
            abortPrepare(BAD_VALUE);
            return;
        }

        if (!mAudioFormat->findInt32(kKeyChannelCount, &channel_count)) {
            ALOGE("Failed to find channel count for AAC substream.");
            abortPrepare(BAD_VALUE);
            return;
        }

        if (!mAudioFormat->findData(kKeyESDS, &type, &esds_data, &esds_len)) {
            ALOGE("Failed to find codec init data for AAC substream.");
            abortPrepare(BAD_VALUE);
            return;
        }

        CHECK(NULL == mAudioCodecData);
        mAudioCodecDataSize = esds_len
                            + sizeof(sample_rate)
                            + sizeof(channel_count);
        mAudioCodecData = new uint8_t[mAudioCodecDataSize];
        if (NULL == mAudioCodecData) {
            ALOGE("Failed to allocate %u bytes for AAC substream codec aux"
                  " data.", mAudioCodecDataSize);
            mAudioCodecDataSize = 0;
            abortPrepare(BAD_VALUE);
            return;
        }

        uint8_t* tmp = mAudioCodecData;
        tmp[0] = static_cast<uint8_t>((sample_rate   >> 24) & 0xFF);
        tmp[1] = static_cast<uint8_t>((sample_rate   >> 16) & 0xFF);
        tmp[2] = static_cast<uint8_t>((sample_rate   >>  8) & 0xFF);
        tmp[3] = static_cast<uint8_t>((sample_rate        ) & 0xFF);
        tmp[4] = static_cast<uint8_t>((channel_count >> 24) & 0xFF);
        tmp[5] = static_cast<uint8_t>((channel_count >> 16) & 0xFF);
        tmp[6] = static_cast<uint8_t>((channel_count >>  8) & 0xFF);
        tmp[7] = static_cast<uint8_t>((channel_count      ) & 0xFF);

        memcpy(tmp + 8, esds_data, esds_len);
    } else {
        ALOGE("Unsupported MIME type \"%s\" in audio substream", mime_type);
        abortPrepare(BAD_VALUE);
        return;
    }

    status_t err = mAudioSource->start();
    if (err != OK) {
        ALOGI("failed to start audio source, err=%d", err);
        abortPrepare(err);
        return;
    }

    mFlags |= PREPARING_CONNECTED;

    if (mCachedSource != NULL) {
        postBufferingEvent_l();
    } else {
        finishAsyncPrepare_l();
    }
}

void AAH_TXPlayer::finishAsyncPrepare_l() {
    notifyListener_l(MEDIA_PREPARED);

    mPrepareResult = OK;
    mFlags &= ~(PREPARING|PREPARE_CANCELLED|PREPARING_CONNECTED);
    mFlags |= PREPARED;
    mPreparedCondition.broadcast();
}

status_t AAH_TXPlayer::start() {
    Mutex::Autolock autoLock(mLock);

    mFlags &= ~CACHE_UNDERRUN;

    return play_l();
}

status_t AAH_TXPlayer::play_l() {
    if (mFlags & PLAYING) {
        return OK;
    }

    if (!(mFlags & PREPARED)) {
        return INVALID_OPERATION;
    }

    {
        Mutex::Autolock lock(mEndpointLock);
        if (!mEndpointValid) {
            return INVALID_OPERATION;
        }
        if (!mEndpointRegistered) {
            mProgramID = mAAH_Sender->registerEndpoint(mEndpoint);
            mEndpointRegistered = true;
        }
    }

    mFlags |= PLAYING;

    updateClockTransform_l(false);

    postPumpAudioEvent_l(-1);

    return OK;
}

status_t AAH_TXPlayer::stop() {
    status_t ret = pause();
    sendEOS_l();
    return ret;
}

status_t AAH_TXPlayer::pause() {
    Mutex::Autolock autoLock(mLock);

    mFlags &= ~CACHE_UNDERRUN;

    return pause_l();
}

status_t AAH_TXPlayer::pause_l(bool doClockUpdate) {
    if (!(mFlags & PLAYING)) {
        return OK;
    }

    cancelPlayerEvents(true /* keepBufferingGoing */);

    mFlags &= ~PLAYING;

    if (doClockUpdate) {
        updateClockTransform_l(true);
    }

    return OK;
}

void AAH_TXPlayer::updateClockTransform_l(bool pause) {
    // record the new pause status so that onPumpAudio knows what rate to apply
    // when it initializes the transform
    mPlayRateIsPaused = pause;

    // if we haven't yet established a valid clock transform, then we can't
    // do anything here
    if (!mCurrentClockTransformValid) {
        return;
    }

    // sample the current common time
    int64_t commonTimeNow;
    if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
        ALOGE("updateClockTransform_l get common time failed");
        mCurrentClockTransformValid = false;
        return;
    }

    // convert the current common time to media time using the old
    // transform
    int64_t mediaTimeNow;
    if (!mCurrentClockTransform.doReverseTransform(
            commonTimeNow, &mediaTimeNow)) {
        ALOGE("updateClockTransform_l reverse transform failed");
        mCurrentClockTransformValid = false;
        return;
    }

    // calculate a new transform that preserves the old transform's
    // result for the current time
    mCurrentClockTransform.a_zero = mediaTimeNow;
    mCurrentClockTransform.b_zero = commonTimeNow;
    mCurrentClockTransform.a_to_b_numer = 1;
    mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1;

    // send a packet announcing the new transform
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setClockTransform(mCurrentClockTransform);
    packet->setCommandID(TRTPControlPacket::kCommandNop);
    queuePacketToSender_l(packet);
}

void AAH_TXPlayer::sendEOS_l() {
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setCommandID(TRTPControlPacket::kCommandEOS);
    queuePacketToSender_l(packet);
}

bool AAH_TXPlayer::isPlaying() {
    return (mFlags & PLAYING) || (mFlags & CACHE_UNDERRUN);
}

status_t AAH_TXPlayer::seekTo(int msec) {
    if (mExtractorFlags & MediaExtractor::CAN_SEEK) {
        Mutex::Autolock autoLock(mLock);
        return seekTo_l(static_cast<int64_t>(msec) * 1000);
    }

    notifyListener_l(MEDIA_SEEK_COMPLETE);
    return OK;
}

status_t AAH_TXPlayer::seekTo_l(int64_t timeUs) {
    mIsSeeking = true;
    mSeekTimeUs = timeUs;

    mCurrentClockTransformValid = false;
    mLastQueuedMediaTimePTSValid = false;

    // send a flush command packet
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setCommandID(TRTPControlPacket::kCommandFlush);
    queuePacketToSender_l(packet);

    return OK;
}

status_t AAH_TXPlayer::getCurrentPosition(int *msec) {
    if (!msec) {
        return BAD_VALUE;
    }

    Mutex::Autolock lock(mLock);

    int position;

    if (mIsSeeking) {
        position = mSeekTimeUs / 1000;
    } else if (mCurrentClockTransformValid) {
        // sample the current common time
        int64_t commonTimeNow;
        if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
            ALOGE("getCurrentPosition get common time failed");
            return INVALID_OPERATION;
        }

        int64_t mediaTimeNow;
        if (!mCurrentClockTransform.doReverseTransform(commonTimeNow,
                    &mediaTimeNow)) {
            ALOGE("getCurrentPosition reverse transform failed");
            return INVALID_OPERATION;
        }

        position = static_cast<int>(mediaTimeNow / 1000);
    } else {
        position = 0;
    }

    int duration;
    if (getDuration_l(&duration) == OK) {
        *msec = clamp(position, 0, duration);
    } else {
        *msec = (position >= 0) ? position : 0;
    }

    return OK;
}

status_t AAH_TXPlayer::getDuration(int* msec) {
    if (!msec) {
        return BAD_VALUE;
    }

    Mutex::Autolock lock(mLock);

    return getDuration_l(msec);
}

status_t AAH_TXPlayer::getDuration_l(int* msec) {
    if (mDurationUs < 0) {
        return UNKNOWN_ERROR;
    }

    *msec = (mDurationUs + 500) / 1000;

    return OK;
}

status_t AAH_TXPlayer::reset() {
    Mutex::Autolock autoLock(mLock);
    reset_l();
    return OK;
}

void AAH_TXPlayer::reset_l() {
    if (mFlags & PREPARING) {
        mFlags |= PREPARE_CANCELLED;
        if (mConnectingDataSource != NULL) {
            ALOGI("interrupting the connection process");
            mConnectingDataSource->disconnect();
        }

        if (mFlags & PREPARING_CONNECTED) {
            // We are basically done preparing, we're just buffering
            // enough data to start playback, we can safely interrupt that.
            finishAsyncPrepare_l();
        }
    }

    while (mFlags & PREPARING) {
        mPreparedCondition.wait(mLock);
    }

    cancelPlayerEvents();

    sendEOS_l();

    mCachedSource.clear();

    if (mAudioSource != NULL) {
        mAudioSource->stop();
    }
    mAudioSource.clear();
    mAudioCodec = TRTPAudioPacket::kCodecInvalid;
    mAudioFormat = NULL;
    delete[] mAudioCodecData;
    mAudioCodecData = NULL;
    mAudioCodecDataSize = 0;

    mFlags = 0;
    mExtractorFlags = 0;

    mDurationUs = -1;
    mIsSeeking = false;
    mSeekTimeUs = 0;

    mUri.setTo("");
    mUriHeaders.clear();

    mFileSource.clear();

    mBitrate = -1;

    {
        Mutex::Autolock lock(mEndpointLock);
        if (mAAH_Sender != NULL && mEndpointRegistered) {
            mAAH_Sender->unregisterEndpoint(mEndpoint);
        }
        mEndpointRegistered = false;
        mEndpointValid = false;
    }

    mProgramID = 0;

    mAAH_Sender.clear();
    mLastQueuedMediaTimePTSValid = false;
    mCurrentClockTransformValid = false;
    mPlayRateIsPaused = false;

    mTRTPVolume = 255;
}

status_t AAH_TXPlayer::setLooping(int loop) {
    return OK;
}

player_type AAH_TXPlayer::playerType() {
    return AAH_TX_PLAYER;
}

status_t AAH_TXPlayer::setParameter(int key, const Parcel &request) {
    return ERROR_UNSUPPORTED;
}

status_t AAH_TXPlayer::getParameter(int key, Parcel *reply) {
    return ERROR_UNSUPPORTED;
}

status_t AAH_TXPlayer::invoke(const Parcel& request, Parcel *reply) {
    return INVALID_OPERATION;
}

status_t AAH_TXPlayer::getMetadata(const media::Metadata::Filter& ids,
                                   Parcel* records) {
    using media::Metadata;

    Metadata metadata(records);

    metadata.appendBool(Metadata::kPauseAvailable, true);
    metadata.appendBool(Metadata::kSeekBackwardAvailable, false);
    metadata.appendBool(Metadata::kSeekForwardAvailable, false);
    metadata.appendBool(Metadata::kSeekAvailable, false);

    return OK;
}

status_t AAH_TXPlayer::setVolume(float leftVolume, float rightVolume) {
    if (leftVolume != rightVolume) {
        ALOGE("%s does not support per channel volume: %f, %f",
              __PRETTY_FUNCTION__, leftVolume, rightVolume);
    }

    float volume = clamp(leftVolume, 0.0f, 1.0f);

    Mutex::Autolock lock(mLock);
    mTRTPVolume = static_cast<uint8_t>((leftVolume * 255.0) + 0.5);

    return OK;
}

status_t AAH_TXPlayer::setAudioStreamType(audio_stream_type_t streamType) {
    return OK;
}

status_t AAH_TXPlayer::setRetransmitEndpoint(
        const struct sockaddr_in* endpoint) {
    Mutex::Autolock lock(mLock);

    if (NULL == endpoint)
        return BAD_VALUE;

    // Once the endpoint has been registered, it may not be changed.
    if (mEndpointRegistered)
        return INVALID_OPERATION;

    mEndpoint.addr = endpoint->sin_addr.s_addr;
    mEndpoint.port = endpoint->sin_port;
    mEndpointValid = true;

    return OK;
}

void AAH_TXPlayer::notifyListener_l(int msg, int ext1, int ext2) {
    sendEvent(msg, ext1, ext2);
}

bool AAH_TXPlayer::getBitrate_l(int64_t *bitrate) {
    off64_t size;
    if (mDurationUs >= 0 &&
        mCachedSource != NULL &&
        mCachedSource->getSize(&size) == OK) {
        *bitrate = size * 8000000ll / mDurationUs;  // in bits/sec
        return true;
    }

    if (mBitrate >= 0) {
        *bitrate = mBitrate;
        return true;
    }

    *bitrate = 0;

    return false;
}

// Returns true iff cached duration is available/applicable.
bool AAH_TXPlayer::getCachedDuration_l(int64_t *durationUs, bool *eos) {
    int64_t bitrate;

    if (mCachedSource != NULL && getBitrate_l(&bitrate)) {
        status_t finalStatus;
        size_t cachedDataRemaining = mCachedSource->approxDataRemaining(
                                        &finalStatus);
        *durationUs = cachedDataRemaining * 8000000ll / bitrate;
        *eos = (finalStatus != OK);
        return true;
    }

    return false;
}

void AAH_TXPlayer::ensureCacheIsFetching_l() {
    if (mCachedSource != NULL) {
        mCachedSource->resumeFetchingIfNecessary();
    }
}

void AAH_TXPlayer::postBufferingEvent_l() {
    if (mBufferingEventPending) {
        return;
    }
    mBufferingEventPending = true;
    mQueue.postEventWithDelay(mBufferingEvent, 1000000ll);
}

void AAH_TXPlayer::postPumpAudioEvent_l(int64_t delayUs) {
    if (mPumpAudioEventPending) {
        return;
    }
    mPumpAudioEventPending = true;
    mQueue.postEventWithDelay(mPumpAudioEvent, delayUs < 0 ? 10000 : delayUs);
}

void AAH_TXPlayer::onBufferingUpdate() {
    Mutex::Autolock autoLock(mLock);
    if (!mBufferingEventPending) {
        return;
    }
    mBufferingEventPending = false;

    if (mCachedSource != NULL) {
        status_t finalStatus;
        size_t cachedDataRemaining = mCachedSource->approxDataRemaining(
                                        &finalStatus);
        bool eos = (finalStatus != OK);

        if (eos) {
            if (finalStatus == ERROR_END_OF_STREAM) {
                notifyListener_l(MEDIA_BUFFERING_UPDATE, 100);
            }
            if (mFlags & PREPARING) {
                ALOGV("cache has reached EOS, prepare is done.");
                finishAsyncPrepare_l();
            }
        } else {
            int64_t bitrate;
            if (getBitrate_l(&bitrate)) {
                size_t cachedSize = mCachedSource->cachedSize();
                int64_t cachedDurationUs = cachedSize * 8000000ll / bitrate;

                int percentage = (100.0 * (double) cachedDurationUs)
                               / mDurationUs;
                if (percentage > 100) {
                    percentage = 100;
                }

                notifyListener_l(MEDIA_BUFFERING_UPDATE, percentage);
            } else {
                // We don't know the bitrate of the stream, use absolute size
                // limits to maintain the cache.

                if ((mFlags & PLAYING) &&
                    !eos &&
                    (cachedDataRemaining < kLowWaterMarkBytes)) {
                    ALOGI("cache is running low (< %d) , pausing.",
                          kLowWaterMarkBytes);
                    mFlags |= CACHE_UNDERRUN;
                    pause_l();
                    ensureCacheIsFetching_l();
                    notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START);
                } else if (eos || cachedDataRemaining > kHighWaterMarkBytes) {
                    if (mFlags & CACHE_UNDERRUN) {
                        ALOGI("cache has filled up (> %d), resuming.",
                              kHighWaterMarkBytes);
                        mFlags &= ~CACHE_UNDERRUN;
                        play_l();
                        notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END);
                    } else if (mFlags & PREPARING) {
                        ALOGV("cache has filled up (> %d), prepare is done",
                              kHighWaterMarkBytes);
                        finishAsyncPrepare_l();
                    }
                }
            }
        }
    }

    int64_t cachedDurationUs;
    bool eos;
    if (getCachedDuration_l(&cachedDurationUs, &eos)) {
        ALOGV("cachedDurationUs = %.2f secs, eos=%d",
              cachedDurationUs / 1E6, eos);

        if ((mFlags & PLAYING) &&
            !eos &&
            (cachedDurationUs < kLowWaterMarkUs)) {
            ALOGI("cache is running low (%.2f secs) , pausing.",
                  cachedDurationUs / 1E6);
            mFlags |= CACHE_UNDERRUN;
            pause_l();
            ensureCacheIsFetching_l();
            notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START);
        } else if (eos || cachedDurationUs > kHighWaterMarkUs) {
            if (mFlags & CACHE_UNDERRUN) {
                ALOGI("cache has filled up (%.2f secs), resuming.",
                      cachedDurationUs / 1E6);
                mFlags &= ~CACHE_UNDERRUN;
                play_l();
                notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END);
            } else if (mFlags & PREPARING) {
                ALOGV("cache has filled up (%.2f secs), prepare is done",
                        cachedDurationUs / 1E6);
                finishAsyncPrepare_l();
            }
        }
    }

    postBufferingEvent_l();
}

void AAH_TXPlayer::onPumpAudio() {
    while (true) {
        Mutex::Autolock autoLock(mLock);
        // If this flag is clear, its because someone has externally canceled
        // this pump operation (probably because we a resetting/shutting down).
        // Get out immediately, do not reschedule ourselves.
        if (!mPumpAudioEventPending) {
            return;
        }

        // Start by checking if there is still work to be doing.  If we have
        // never queued a payload (so we don't know what the last queued PTS is)
        // or we have never established a MediaTime->CommonTime transformation,
        // then we have work to do (one time through this loop should establish
        // both).  Otherwise, we want to keep a fixed amt of presentation time
        // worth of data buffered.  If we cannot get common time (service is
        // unavailable, or common time is undefined)) then we don't have a lot
        // of good options here.  For now, signal an error up to the app level
        // and shut down the transmission pump.
        int64_t commonTimeNow;
        if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
            // Failed to get common time; either the service is down or common
            // time is not synced.  Raise an error and shutdown the player.
            ALOGE("*** Cannot pump audio, unable to fetch common time."
                  "  Shutting down.");
            notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, UNKNOWN_ERROR);
            mPumpAudioEventPending = false;
            break;
        }

        if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) {
            int64_t mediaTimeNow;
            bool conversionResult = mCurrentClockTransform.doReverseTransform(
                                        commonTimeNow,
                                        &mediaTimeNow);
            CHECK(conversionResult);

            if ((mediaTimeNow +
                 kAAHBufferTimeUs -
                 mLastQueuedMediaTimePTS) <= 0) {
                break;
            }
        }

        MediaSource::ReadOptions options;
        if (mIsSeeking) {
            options.setSeekTo(mSeekTimeUs);
        }

        MediaBuffer* mediaBuffer;
        status_t err = mAudioSource->read(&mediaBuffer, &options);
        if (err != NO_ERROR) {
            if (err == ERROR_END_OF_STREAM) {
                ALOGI("*** %s reached end of stream", __PRETTY_FUNCTION__);
                notifyListener_l(MEDIA_BUFFERING_UPDATE, 100);
                notifyListener_l(MEDIA_PLAYBACK_COMPLETE);
                pause_l(false);
                sendEOS_l();
            } else {
                ALOGE("*** %s read failed err=%d", __PRETTY_FUNCTION__, err);
            }
            return;
        }

        if (mIsSeeking) {
            mIsSeeking = false;
            notifyListener_l(MEDIA_SEEK_COMPLETE);
        }

        uint8_t* data = (static_cast<uint8_t*>(mediaBuffer->data()) +
                mediaBuffer->range_offset());
        ALOGV("*** %s got media buffer data=[%02hhx %02hhx %02hhx %02hhx]"
              " offset=%d length=%d", __PRETTY_FUNCTION__,
              data[0], data[1], data[2], data[3],
              mediaBuffer->range_offset(), mediaBuffer->range_length());

        int64_t mediaTimeUs;
        CHECK(mediaBuffer->meta_data()->findInt64(kKeyTime, &mediaTimeUs));
        ALOGV("*** timeUs=%lld", mediaTimeUs);

        if (!mCurrentClockTransformValid) {
            if (OK == mCCHelper.getCommonTime(&commonTimeNow)) {
                mCurrentClockTransform.a_zero = mediaTimeUs;
                mCurrentClockTransform.b_zero = commonTimeNow +
                                                kAAHStartupLeadTimeUs;
                mCurrentClockTransform.a_to_b_numer = 1;
                mCurrentClockTransform.a_to_b_denom = mPlayRateIsPaused ? 0 : 1;
                mCurrentClockTransformValid = true;
            } else {
                // Failed to get common time; either the service is down or
                // common time is not synced.  Raise an error and shutdown the
                // player.
                ALOGE("*** Cannot begin transmission, unable to fetch common"
                      " time. Dropping sample with pts=%lld", mediaTimeUs);
                notifyListener_l(MEDIA_ERROR,
                                 MEDIA_ERROR_UNKNOWN,
                                 UNKNOWN_ERROR);
                mPumpAudioEventPending = false;
                break;
            }
        }

        ALOGV("*** transmitting packet with pts=%lld", mediaTimeUs);

        sp<TRTPAudioPacket> packet = new TRTPAudioPacket();
        packet->setPTS(mediaTimeUs);
        packet->setSubstreamID(1);

        packet->setCodecType(mAudioCodec);
        packet->setVolume(mTRTPVolume);
        // TODO : introduce a throttle for this so we can control the
        // frequency with which transforms get sent.
        packet->setClockTransform(mCurrentClockTransform);
        packet->setAccessUnitData(data, mediaBuffer->range_length());

        // TODO : while its pretty much universally true that audio ES payloads
        // are all RAPs across all codecs, it might be a good idea to throttle
        // the frequency with which we send codec out of band data to the RXers.
        // If/when we do, we need to flag only those payloads which have
        // required out of band data attached to them as RAPs.
        packet->setRandomAccessPoint(true);

        if (mAudioCodecData && mAudioCodecDataSize) {
            packet->setAuxData(mAudioCodecData, mAudioCodecDataSize);
        }

        queuePacketToSender_l(packet);
        mediaBuffer->release();

        mLastQueuedMediaTimePTSValid = true;
        mLastQueuedMediaTimePTS = mediaTimeUs;
    }

    { // Explicit scope for the autolock pattern.
        Mutex::Autolock autoLock(mLock);

        // If someone externally has cleared this flag, its because we should be
        // shutting down.  Do not reschedule ourselves.
        if (!mPumpAudioEventPending) {
            return;
        }

        // Looks like no one canceled us explicitly.  Clear our flag and post a
        // new event to ourselves.
        mPumpAudioEventPending = false;
        postPumpAudioEvent_l(10000);
    }
}

void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
    if (mAAH_Sender == NULL) {
        return;
    }

    sp<AMessage> message = new AMessage(AAH_TXSender::kWhatSendPacket,
                                        mAAH_Sender->handlerID());

    {
        Mutex::Autolock lock(mEndpointLock);
        if (!mEndpointValid) {
            return;
        }

        message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr);
        message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port);
    }

    packet->setProgramID(mProgramID);
    packet->setExpireTime(systemTime() + kAAHRetryKeepAroundTimeNs);
    packet->pack();

    message->setObject(AAH_TXSender::kSendPacketTRTPPacket, packet);

    message->post();
}

}  // namespace android