// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/cast/logging/encoding_event_subscriber.h"

#include <cstring>
#include <utility>

#include "base/logging.h"
#include "media/cast/logging/proto/proto_utils.h"

using google::protobuf::RepeatedPtrField;
using media::cast::proto::AggregatedFrameEvent;
using media::cast::proto::AggregatedPacketEvent;
using media::cast::proto::BasePacketEvent;
using media::cast::proto::LogMetadata;

namespace {

// A size limit on maps to keep lookups fast.
const size_t kMaxMapSize = 200;

// The smallest (oredered by RTP timestamp) |kNumMapEntriesToTransfer| entries
// will be moved when the map size reaches |kMaxMapSize|.
// Must be smaller than |kMaxMapSize|.
const size_t kNumMapEntriesToTransfer = 100;

template <typename ProtoPtr>
bool IsRtpTimestampLessThan(const ProtoPtr& lhs, const ProtoPtr& rhs) {
  return lhs->relative_rtp_timestamp() < rhs->relative_rtp_timestamp();
}

BasePacketEvent* GetNewBasePacketEvent(AggregatedPacketEvent* event_proto,
    int packet_id, int size) {
  BasePacketEvent* base = event_proto->add_base_packet_event();
  base->set_packet_id(packet_id);
  base->set_size(size);
  return base;
}

}

namespace media {
namespace cast {

EncodingEventSubscriber::EncodingEventSubscriber(
    EventMediaType event_media_type,
    size_t max_frames)
    : event_media_type_(event_media_type),
      max_frames_(max_frames),
      frame_event_storage_index_(0),
      packet_event_storage_index_(0),
      seen_first_rtp_timestamp_(false),
      first_rtp_timestamp_(0u) {}

EncodingEventSubscriber::~EncodingEventSubscriber() {
  DCHECK(thread_checker_.CalledOnValidThread());
}

void EncodingEventSubscriber::OnReceiveFrameEvent(
    const FrameEvent& frame_event) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (event_media_type_ != frame_event.media_type)
    return;

  RtpTimestamp relative_rtp_timestamp =
      GetRelativeRtpTimestamp(frame_event.rtp_timestamp);
  FrameEventMap::iterator it = frame_event_map_.find(relative_rtp_timestamp);
  linked_ptr<AggregatedFrameEvent> event_proto;

  // Look up existing entry. If not found, create a new entry and add to map.
  if (it == frame_event_map_.end()) {
    event_proto.reset(new AggregatedFrameEvent);
    event_proto->set_relative_rtp_timestamp(relative_rtp_timestamp);
    frame_event_map_.insert(
        std::make_pair(relative_rtp_timestamp, event_proto));
  } else {
    event_proto = it->second;
    if (event_proto->event_type_size() >= kMaxEventsPerProto) {
      DVLOG(2) << "Too many events in frame " << frame_event.rtp_timestamp
               << ". Using new frame event proto.";
      AddFrameEventToStorage(event_proto);
      event_proto.reset(new AggregatedFrameEvent);
      event_proto->set_relative_rtp_timestamp(relative_rtp_timestamp);
      it->second = event_proto;
    }
  }

  event_proto->add_event_type(ToProtoEventType(frame_event.type));
  event_proto->add_event_timestamp_ms(
      (frame_event.timestamp - base::TimeTicks()).InMilliseconds());

  if (frame_event.type == FRAME_ENCODED) {
    event_proto->set_encoded_frame_size(frame_event.size);
    if (frame_event.media_type == VIDEO_EVENT) {
      event_proto->set_encoded_frame_size(frame_event.size);
      event_proto->set_key_frame(frame_event.key_frame);
      event_proto->set_target_bitrate(frame_event.target_bitrate);
    }
  } else if (frame_event.type == FRAME_PLAYOUT) {
    event_proto->set_delay_millis(frame_event.delay_delta.InMilliseconds());
  }

  if (frame_event_map_.size() > kMaxMapSize)
    TransferFrameEvents(kNumMapEntriesToTransfer);

  DCHECK(frame_event_map_.size() <= kMaxMapSize);
  DCHECK(frame_event_storage_.size() <= max_frames_);
}

void EncodingEventSubscriber::OnReceivePacketEvent(
    const PacketEvent& packet_event) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (event_media_type_ != packet_event.media_type)
    return;

  RtpTimestamp relative_rtp_timestamp =
      GetRelativeRtpTimestamp(packet_event.rtp_timestamp);
  PacketEventMap::iterator it =
      packet_event_map_.find(relative_rtp_timestamp);
  linked_ptr<AggregatedPacketEvent> event_proto;
  BasePacketEvent* base_packet_event_proto = NULL;

  // Look up existing entry. If not found, create a new entry and add to map.
  if (it == packet_event_map_.end()) {
    event_proto.reset(new AggregatedPacketEvent);
    event_proto->set_relative_rtp_timestamp(relative_rtp_timestamp);
    packet_event_map_.insert(
        std::make_pair(relative_rtp_timestamp, event_proto));
    base_packet_event_proto = GetNewBasePacketEvent(
        event_proto.get(), packet_event.packet_id, packet_event.size);
  } else {
    // Found existing entry, now look up existing BasePacketEvent using packet
    // ID. If not found, create a new entry and add to proto.
    event_proto = it->second;
    RepeatedPtrField<BasePacketEvent>* field =
        event_proto->mutable_base_packet_event();
    for (RepeatedPtrField<BasePacketEvent>::pointer_iterator base_it =
             field->pointer_begin();
         base_it != field->pointer_end();
         ++base_it) {
      if ((*base_it)->packet_id() == packet_event.packet_id) {
        base_packet_event_proto = *base_it;
        break;
      }
    }
    if (!base_packet_event_proto) {
      if (event_proto->base_packet_event_size() >= kMaxPacketsPerFrame) {
        DVLOG(3) << "Too many packets in AggregatedPacketEvent "
                 << packet_event.rtp_timestamp << ". "
                 << "Using new packet event proto.";
        AddPacketEventToStorage(event_proto);
        event_proto.reset(new AggregatedPacketEvent);
        event_proto->set_relative_rtp_timestamp(relative_rtp_timestamp);
        it->second = event_proto;
      }

      base_packet_event_proto = GetNewBasePacketEvent(
          event_proto.get(), packet_event.packet_id, packet_event.size);
    } else if (base_packet_event_proto->event_type_size() >=
               kMaxEventsPerProto) {
      DVLOG(3) << "Too many events in packet "
               << packet_event.rtp_timestamp << ", "
               << packet_event.packet_id << ". Using new packet event proto.";
      AddPacketEventToStorage(event_proto);
      event_proto.reset(new AggregatedPacketEvent);
      event_proto->set_relative_rtp_timestamp(relative_rtp_timestamp);
      it->second = event_proto;
      base_packet_event_proto = GetNewBasePacketEvent(
          event_proto.get(), packet_event.packet_id, packet_event.size);
    }
  }

  base_packet_event_proto->add_event_type(
      ToProtoEventType(packet_event.type));
  base_packet_event_proto->add_event_timestamp_ms(
      (packet_event.timestamp - base::TimeTicks()).InMilliseconds());

  // |base_packet_event_proto| could have been created with a receiver event
  // which does not have the packet size and we would need to overwrite it when
  // we see a sender event, which does have the packet size.
  if (packet_event.size > 0) {
    base_packet_event_proto->set_size(packet_event.size);
  }

  if (packet_event_map_.size() > kMaxMapSize)
    TransferPacketEvents(kNumMapEntriesToTransfer);

  DCHECK(packet_event_map_.size() <= kMaxMapSize);
  DCHECK(packet_event_storage_.size() <= max_frames_);
}

void EncodingEventSubscriber::GetEventsAndReset(LogMetadata* metadata,
    FrameEventList* frame_events, PacketEventList* packet_events) {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Flush all events.
  TransferFrameEvents(frame_event_map_.size());
  TransferPacketEvents(packet_event_map_.size());
  std::sort(frame_event_storage_.begin(), frame_event_storage_.end(),
            &IsRtpTimestampLessThan<linked_ptr<AggregatedFrameEvent> >);
  std::sort(packet_event_storage_.begin(), packet_event_storage_.end(),
            &IsRtpTimestampLessThan<linked_ptr<AggregatedPacketEvent> >);

  metadata->set_is_audio(event_media_type_ == AUDIO_EVENT);
  metadata->set_first_rtp_timestamp(first_rtp_timestamp_);
  metadata->set_num_frame_events(frame_event_storage_.size());
  metadata->set_num_packet_events(packet_event_storage_.size());
  metadata->set_reference_timestamp_ms_at_unix_epoch(
      (base::TimeTicks::UnixEpoch() - base::TimeTicks()).InMilliseconds());
  frame_events->swap(frame_event_storage_);
  packet_events->swap(packet_event_storage_);
  Reset();
}

void EncodingEventSubscriber::TransferFrameEvents(size_t max_num_entries) {
  DCHECK(frame_event_map_.size() >= max_num_entries);

  FrameEventMap::iterator it = frame_event_map_.begin();
  for (size_t i = 0;
       i < max_num_entries && it != frame_event_map_.end();
       i++, ++it) {
    AddFrameEventToStorage(it->second);
  }

  frame_event_map_.erase(frame_event_map_.begin(), it);
}

void EncodingEventSubscriber::TransferPacketEvents(size_t max_num_entries) {
  PacketEventMap::iterator it = packet_event_map_.begin();
  for (size_t i = 0;
       i < max_num_entries && it != packet_event_map_.end();
       i++, ++it) {
    AddPacketEventToStorage(it->second);
  }

  packet_event_map_.erase(packet_event_map_.begin(), it);
}

void EncodingEventSubscriber::AddFrameEventToStorage(
    const linked_ptr<AggregatedFrameEvent>& frame_event_proto) {
  if (frame_event_storage_.size() >= max_frames_) {
    frame_event_storage_[frame_event_storage_index_] = frame_event_proto;
  } else {
    frame_event_storage_.push_back(frame_event_proto);
  }

  frame_event_storage_index_ = (frame_event_storage_index_ + 1) % max_frames_;
}

void EncodingEventSubscriber::AddPacketEventToStorage(
    const linked_ptr<AggregatedPacketEvent>& packet_event_proto) {
  if (packet_event_storage_.size() >= max_frames_)
    packet_event_storage_[packet_event_storage_index_] = packet_event_proto;
  else
    packet_event_storage_.push_back(packet_event_proto);

  packet_event_storage_index_ = (packet_event_storage_index_ + 1) % max_frames_;
}

RtpTimestamp EncodingEventSubscriber::GetRelativeRtpTimestamp(
    RtpTimestamp rtp_timestamp) {
  if (!seen_first_rtp_timestamp_) {
    seen_first_rtp_timestamp_ = true;
    first_rtp_timestamp_ = rtp_timestamp;
  }

  return rtp_timestamp - first_rtp_timestamp_;
}

void EncodingEventSubscriber::Reset() {
  frame_event_map_.clear();
  frame_event_storage_.clear();
  frame_event_storage_index_ = 0;
  packet_event_map_.clear();
  packet_event_storage_.clear();
  packet_event_storage_index_ = 0;
  seen_first_rtp_timestamp_ = false;
  first_rtp_timestamp_ = 0u;
}

}  // namespace cast
}  // namespace media