// C++ source  |  153 lines  |  4.76 KB

#include "consumer_queue_channel.h"

#include <pdx/channel_handle.h>

#include "producer_channel.h"

using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;

namespace android {
namespace dvr {

// Constructs the consumer side of a buffer queue.
//
// |service|, |buffer_id| and |channel_id| are forwarded to BufferHubChannel
// together with kConsumerQueueType. |producer| is the producer queue channel
// this consumer attaches to; only a weak reference to it is kept in
// |producer_|. When the producer can be resolved, this consumer is added to
// it via AddConsumer().
ConsumerQueueChannel::ConsumerQueueChannel(
    BufferHubService* service, int buffer_id, int channel_id,
    const std::shared_ptr<Channel>& producer)
    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
      producer_(producer),
      capacity_(0) {
  // GetProducer() yields null when |producer| is empty. Guard the call the
  // same way the destructor, HandleMessage() and GetBufferInfo() do instead
  // of dereferencing a null pointer.
  if (auto producer_queue = GetProducer()) {
    producer_queue->AddConsumer(this);
  } else {
    ALOGE(
        "ConsumerQueueChannel::ConsumerQueueChannel: producer queue is not "
        "available: channel_id=%d",
        channel_id);
  }
}

// Removes this consumer from its producer queue via RemoveConsumer(),
// provided the producer can still be resolved at destruction time.
ConsumerQueueChannel::~ConsumerQueueChannel() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
           channel_id());

  const auto producer_queue = GetProducer();
  if (producer_queue != nullptr) {
    producer_queue->RemoveConsumer(this);
  }
}

// Dispatches an incoming message by opcode.
//
// CreateConsumerQueue and GetQueueInfo are forwarded to the producer queue;
// ConsumerQueueImportBuffers is handled by this channel. If the producer can
// no longer be resolved, the message is answered with EPIPE.
//
// Returns true when the message was consumed here (including the EPIPE
// case), false for opcodes this channel does not recognize.
bool ConsumerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
  const auto producer_queue = GetProducer();
  if (producer_queue == nullptr) {
    RemoteMethodError(message, EPIPE);
    return true;
  }

  bool handled = true;
  switch (message.GetOp()) {
    case BufferHubRPC::CreateConsumerQueue::Opcode:
      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
          *producer_queue, &ProducerQueueChannel::OnCreateConsumerQueue,
          message);
      break;

    case BufferHubRPC::GetQueueInfo::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
          *producer_queue, &ProducerQueueChannel::OnGetQueueInfo, message);
      break;

    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
      break;

    default:
      // Unknown opcode: report it as unhandled to the caller.
      handled = false;
      break;
  }
  return handled;
}

// Resolves the weak reference in |producer_| and returns it as a
// ProducerQueueChannel pointer. The result is null when the reference can no
// longer be locked.
std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
    const {
  auto locked_channel = producer_.lock();
  return std::static_pointer_cast<ProducerQueueChannel>(locked_channel);
}

// Impulse handler. The impulse message is ignored; only a trace marker is
// emitted.
void ConsumerQueueChannel::HandleImpulse(Message&) {
  ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
}

// Builds the BufferInfo record describing this consumer queue.
//
// While the producer is still reachable its BufferInfo is used as the
// starting point; otherwise a default-constructed record is used. In both
// cases |id| and |capacity| are overwritten with this channel's own values.
BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
  BufferHubChannel::BufferInfo result;

  const auto producer_queue = GetProducer();
  if (producer_queue != nullptr) {
    result = producer_queue->GetBufferInfo();
  }

  // These two fields always describe the consumer side.
  result.capacity = capacity_;
  result.id = buffer_id();
  return result;
}

// Queues a newly available producer buffer for later import by the client.
//
// |producer_channel| is the buffer's producer channel and |slot| is the slot
// index recorded alongside it. The pair is appended to
// |pending_buffer_slots_| and the channel is then signaled as available.
void ConsumerQueueChannel::RegisterNewBuffer(
    const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
  ALOGD_IF(TRACE,
           "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
           producer_channel->buffer_id(), slot);

  pending_buffer_slots_.emplace(producer_channel, slot);

  // Let the client know, through POLLIN, that a new buffer can be imported.
  SignalAvailable();
}

// Drains |pending_buffer_slots_|, creating a consumer for each pending
// producer buffer on behalf of the client that sent |message|.
//
// Returns the (consumer handle, slot) pairs created so far. Entries whose
// producer channel has expired are skipped with a warning. On the first
// CreateConsumer() failure:
//   - if nothing has been imported yet, availability is cleared and the
//     error is returned;
//   - otherwise the handles imported so far are returned and availability is
//     left set, so the client can come back for the entries still queued.
// When the queue is fully drained, availability is cleared.
Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
  std::vector<std::pair<RemoteChannelHandle, size_t>> imported;
  ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
  ALOGD_IF(
      TRACE,
      "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
      "import: %zu",
      pending_buffer_slots_.size());

  while (!pending_buffer_slots_.empty()) {
    // Take the head entry off the queue before attempting the import; the
    // entry is consumed whether or not the import succeeds.
    auto buffer_producer = pending_buffer_slots_.front().first.lock();
    const size_t slot = pending_buffer_slots_.front().second;
    pending_buffer_slots_.pop();

    // The producer channel may already be gone; nothing to import then.
    if (!buffer_producer) {
      ALOGW(
          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
          "channel has already been expired.");
      continue;
    }

    auto consumer_status = buffer_producer->CreateConsumer(message);
    if (consumer_status) {
      imported.emplace_back(consumer_status.take(), slot);
      continue;
    }

    ALOGE(
        "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
        "consumer: %s",
        consumer_status.GetErrorMessage().c_str());

    // Partial success: hand back what was imported and leave the available
    // bits set so the client can retry for the remaining entries.
    if (!imported.empty()) {
      return {std::move(imported)};
    }

    // Nothing was imported at all: clear availability and report the error.
    ClearAvailable();
    return consumer_status.error_status();
  }

  ClearAvailable();
  return {std::move(imported)};
}

// Notification that the producer queue has closed: releases the weak
// reference held in |producer_| and then hangs up this channel.
void ConsumerQueueChannel::OnProducerClosed() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
           buffer_id());

  // Release the producer reference first, then hang up.
  producer_.reset();
  Hangup();
}

}  // namespace dvr
}  // namespace android