// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/cast/receiver/frame_receiver.h"

#include <algorithm>

#include "base/big_endian.h"
#include "base/bind.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "media/cast/cast_environment.h"

namespace {
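// Lower bound on the delay used when posting the delayed tasks that send the
// next RTCP report or Cast feedback message, so that a task is never scheduled
// in the past.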
const int kMinSchedulingDelayMs = 1;
}  // namespace

namespace media {
namespace cast {

FrameReceiver::FrameReceiver(
    const scoped_refptr<CastEnvironment>& cast_environment,
    const FrameReceiverConfig& config,
    EventMediaType event_media_type,
    transport::PacedPacketSender* const packet_sender)
    : cast_environment_(cast_environment),
      packet_parser_(config.incoming_ssrc, config.rtp_payload_type),
      stats_(cast_environment->Clock()),
      event_media_type_(event_media_type),
      event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
      rtp_timebase_(config.frequency),
      target_playout_delay_(
          base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
      expected_frame_duration_(
          base::TimeDelta::FromSeconds(1) / config.max_frame_rate),
      reports_are_scheduled_(false),
      framer_(cast_environment->Clock(),
              this,
              config.incoming_ssrc,
              true,
              config.rtp_max_delay_ms * config.max_frame_rate / 1000),
      rtcp_(cast_environment_,
            NULL,
            NULL,
            packet_sender,
            &stats_,
            config.rtcp_mode,
            base::TimeDelta::FromMilliseconds(config.rtcp_interval),
            config.feedback_ssrc,
            config.incoming_ssrc,
            config.rtcp_c_name,
            event_media_type),
      is_waiting_for_consecutive_frame_(false),
      lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
      weak_factory_(this) {
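  // The config values consumed by the initializer list above (framer sizing
  // and the expected frame duration) must be positive.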
  DCHECK_GT(config.rtp_max_delay_ms, 0);
  DCHECK_GT(config.max_frame_rate, 0);
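  // Set up decryption.  Frames are only decrypted later if the decryptor
  // reports that it was successfully initialized (see
  // EmitAvailableEncodedFrames()).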
  decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
  rtcp_.SetTargetDelay(target_playout_delay_);
  cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_);
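  // |frame_id_to_rtp_timestamp_| maps the low 8 bits of a frame ID to that
  // frame's RTP timestamp, for event logging.  Start with all entries zeroed.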
  memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_));
}

FrameReceiver::~FrameReceiver() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_);
}

void FrameReceiver::RequestEncodedFrame(
    const ReceiveEncodedFrameCallback& callback) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
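  // Queue the request.  EmitAvailableEncodedFrames() runs the callback as soon
  // as a complete frame can be emitted, possibly immediately.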
  frame_request_queue_.push_back(callback);
  EmitAvailableEncodedFrames();
}

bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) {
    rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
  } else {
    RtpCastHeader rtp_header;
    const uint8* payload_data;
    size_t payload_size;
    if (!packet_parser_.ParsePacket(&packet->front(),
                                    packet->size(),
                                    &rtp_header,
                                    &payload_data,
                                    &payload_size)) {
      return false;
    }

    ProcessParsedPacket(rtp_header, payload_data, payload_size);
    stats_.UpdateStatistics(rtp_header);
  }

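  // The periodic RTCP reports and Cast feedback messages are started lazily,
  // once the first packet has been received from the sender.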
  if (!reports_are_scheduled_) {
    ScheduleNextRtcpReport();
    ScheduleNextCastMessage();
    reports_are_scheduled_ = true;
  }

  return true;
}

// static
bool FrameReceiver::ParseSenderSsrc(const uint8* packet,
                                    size_t length,
                                    uint32* ssrc) {
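  // In the fixed RTP header, the SSRC occupies bytes 8 through 11; skip the
  // flags, sequence number, and timestamp fields, then read a big-endian
  // 32-bit value.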
  base::BigEndianReader big_endian_reader(
      reinterpret_cast<const char*>(packet), length);
  return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc);
}

void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
                                        const uint8* payload_data,
                                        size_t payload_size) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  const base::TimeTicks now = cast_environment_->Clock()->NowTicks();

  frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] =
      rtp_header.rtp_timestamp;
  cast_environment_->Logging()->InsertPacketEvent(
      now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp,
      rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id,
      payload_size);

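  // Hand the packet to the framer.  |complete| is set if this packet completed
  // a frame; |duplicate| is set if the packet was already received.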
  bool duplicate = false;
  const bool complete =
      framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);

  // Duplicate packets are ignored.
  if (duplicate)
    return;

  // Update lip-sync values upon receiving the first packet of each frame, or
  // if they have never been set yet.
  if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
    RtpTimestamp fresh_sync_rtp;
    base::TimeTicks fresh_sync_reference;
    if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
      // HACK: The sender should have provided Sender Reports before the first
      // frame was sent.  However, the spec does not currently require this.
      // Therefore, when the data is missing, the local clock is used to
      // generate reference timestamps.
      VLOG(2) << "Lip sync info missing.  Falling-back to local clock.";
      fresh_sync_rtp = rtp_header.rtp_timestamp;
      fresh_sync_reference = now;
    }
    // |lip_sync_reference_time_| is always incremented according to the time
    // delta computed from the difference in RTP timestamps.  Then,
    // |lip_sync_drift_| accounts for clock drift and also smooths out any
    // sudden/discontinuous shifts in the series of reference time values.
    if (lip_sync_reference_time_.is_null()) {
      lip_sync_reference_time_ = fresh_sync_reference;
    } else {
      lip_sync_reference_time_ += RtpDeltaToTimeDelta(
          static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_),
          rtp_timebase_);
    }
    lip_sync_rtp_timestamp_ = fresh_sync_rtp;
    lip_sync_drift_.Update(
        now, fresh_sync_reference - lip_sync_reference_time_);
  }

  // If the non-duplicate packet completed another frame, attempt to emit more
  // frames to satisfy enqueued requests.
  if (complete)
    EmitAvailableEncodedFrames();
}

void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  base::TimeTicks now = cast_environment_->Clock()->NowTicks();
  RtpTimestamp rtp_timestamp =
      frame_id_to_rtp_timestamp_[cast_message.ack_frame_id_ & 0xff];
  cast_environment_->Logging()->InsertFrameEvent(
      now, FRAME_ACK_SENT, event_media_type_,
      rtp_timestamp, cast_message.ack_frame_id_);

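  // Send the ACK/NACK feedback message along with the receiver events logged
  // since the last report.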
  ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events;
  event_subscriber_.GetRtcpEventsAndReset(&rtcp_events);
  rtcp_.SendRtcpFromRtpReceiver(&cast_message, &rtcp_events);
}

void FrameReceiver::EmitAvailableEncodedFrames() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  while (!frame_request_queue_.empty()) {
    // Attempt to peek at the next completed frame from the |framer_|.
    // TODO(miu): We should only be peeking at the metadata, and not copying
    // the payload yet!  Or, at least, peek using a StringPiece instead of a
    // copy.
    scoped_ptr<transport::EncodedFrame> encoded_frame(
        new transport::EncodedFrame());
    bool is_consecutively_next_frame = false;
    bool have_multiple_complete_frames = false;
    if (!framer_.GetEncodedFrame(encoded_frame.get(),
                                 &is_consecutively_next_frame,
                                 &have_multiple_complete_frames)) {
      VLOG(1) << "Wait for more packets to produce a completed frame.";
      return;  // ProcessParsedPacket() will invoke this method in the future.
    }

    const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    const base::TimeTicks playout_time =
        GetPlayoutTime(encoded_frame->rtp_timestamp);

    // If we have multiple decodable frames, and the current frame is
    // too old, then skip it and decode the next frame instead.
    if (have_multiple_complete_frames && now > playout_time) {
      framer_.ReleaseFrame(encoded_frame->frame_id);
      continue;
    }

    // If |framer_| has a frame ready that is out of sequence, examine the
    // playout time to determine whether it's acceptable to continue, thereby
    // skipping one or more frames.  Skip if the missing frame wouldn't complete
    // playing before the start of playback of the available frame.
    if (!is_consecutively_next_frame) {
      // TODO(miu): Also account for expected decode time here?
      const base::TimeTicks earliest_possible_end_time_of_missing_frame =
          now + expected_frame_duration_;
      if (earliest_possible_end_time_of_missing_frame < playout_time) {
        VLOG(1) << "Wait for next consecutive frame instead of skipping.";
        if (!is_waiting_for_consecutive_frame_) {
          is_waiting_for_consecutive_frame_ = true;
          cast_environment_->PostDelayedTask(
              CastEnvironment::MAIN,
              FROM_HERE,
              base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
                         weak_factory_.GetWeakPtr()),
              playout_time - now);
        }
        return;
      }
    }

    // Decrypt the payload data in the frame, if crypto is being used.
    if (decryptor_.initialized()) {
      std::string decrypted_data;
      if (!decryptor_.Decrypt(encoded_frame->frame_id,
                              encoded_frame->data,
                              &decrypted_data)) {
        // Decryption failed.  Give up on this frame.
        framer_.ReleaseFrame(encoded_frame->frame_id);
        continue;
      }
      encoded_frame->data.swap(decrypted_data);
    }

    // At this point, we have a decrypted EncodedFrame ready to be emitted.
    encoded_frame->reference_time = playout_time;
    framer_.ReleaseFrame(encoded_frame->frame_id);
    cast_environment_->PostTask(CastEnvironment::MAIN,
                                FROM_HERE,
                                base::Bind(frame_request_queue_.front(),
                                           base::Passed(&encoded_frame)));
    frame_request_queue_.pop_front();
  }
}

void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  DCHECK(is_waiting_for_consecutive_frame_);
  is_waiting_for_consecutive_frame_ = false;
  EmitAvailableEncodedFrames();
}

base::TimeTicks FrameReceiver::GetPlayoutTime(uint32 rtp_timestamp) const {
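  // The playout time is the lip-sync reference time, corrected for measured
  // clock drift, plus the media time elapsed since the lip-sync RTP timestamp,
  // plus the target playout delay.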
  return lip_sync_reference_time_ +
      lip_sync_drift_.Current() +
      RtpDeltaToTimeDelta(
          static_cast<int32>(rtp_timestamp - lip_sync_rtp_timestamp_),
          rtp_timebase_) +
      target_playout_delay_;
}

void FrameReceiver::ScheduleNextCastMessage() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  base::TimeTicks send_time;
  framer_.TimeToSendNextCastMessage(&send_time);
  base::TimeDelta time_to_send =
      send_time - cast_environment_->Clock()->NowTicks();
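  // Never schedule a task in the past; enforce the minimum scheduling delay.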
  time_to_send = std::max(
      time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
  cast_environment_->PostDelayedTask(
      CastEnvironment::MAIN,
      FROM_HERE,
      base::Bind(&FrameReceiver::SendNextCastMessage,
                 weak_factory_.GetWeakPtr()),
      time_to_send);
}

void FrameReceiver::SendNextCastMessage() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  framer_.SendCastMessage();  // Will only send a message if it is time.
  ScheduleNextCastMessage();
}

void FrameReceiver::ScheduleNextRtcpReport() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  base::TimeDelta time_to_next = rtcp_.TimeToSendNextRtcpReport() -
                                 cast_environment_->Clock()->NowTicks();

  time_to_next = std::max(
      time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));

  cast_environment_->PostDelayedTask(
      CastEnvironment::MAIN,
      FROM_HERE,
      base::Bind(&FrameReceiver::SendNextRtcpReport,
                 weak_factory_.GetWeakPtr()),
      time_to_next);
}

void FrameReceiver::SendNextRtcpReport() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
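  // Send a plain receiver report: no Cast feedback message and no event log
  // (compare with CastFeedback()).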
  rtcp_.SendRtcpFromRtpReceiver(NULL, NULL);
  ScheduleNextRtcpReport();
}

}  // namespace cast
}  // namespace media