C++程序  |  312行  |  10.66 KB

#include "producer_queue_channel.h"

#include <inttypes.h>

#include "consumer_queue_channel.h"
#include "producer_channel.h"

using android::pdx::ErrorStatus;
using android::pdx::Message;
using android::pdx::Status;
using android::pdx::RemoteChannelHandle;
using android::pdx::rpc::DispatchRemoteMethod;

namespace android {
namespace dvr {

ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
                                           int channel_id,
                                           size_t meta_size_bytes,
                                           const UsagePolicy& usage_policy,
                                           int* error)
    : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
      meta_size_bytes_(meta_size_bytes),
      usage_policy_(usage_policy),
      capacity_(0) {
  *error = 0;
}

ProducerQueueChannel::~ProducerQueueChannel() {
  ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
           buffer_id());
  for (auto* consumer : consumer_channels_)
    consumer->OnProducerClosed();
}

/* static */
Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
    BufferHubService* service, int channel_id, size_t meta_size_bytes,
    const UsagePolicy& usage_policy) {
  // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
  // should be mutually exclusive.
  if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
    ALOGE(
        "BufferHubService::OnCreateProducerQueue: illegal usage mask "
        "configuration: usage_deny_set_mask=%" PRIx64
        " usage_deny_clear_mask=%" PRIx64,
        usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
    return ErrorStatus(EINVAL);
  }

  int error = 0;
  std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
      service, channel_id, meta_size_bytes, usage_policy, &error));
  if (error < 0)
    return ErrorStatus(-error);
  else
    return {std::move(producer)};
}

bool ProducerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ProducerQueueChannel::HandleMessage");
  switch (message.GetOp()) {
    case BufferHubRPC::CreateConsumerQueue::Opcode:
      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
          *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
      return true;

    case BufferHubRPC::GetQueueInfo::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
          *this, &ProducerQueueChannel::OnGetQueueInfo, message);
      return true;

    case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
          *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
          message);
      return true;

    case BufferHubRPC::ProducerQueueDetachBuffer::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(
          *this, &ProducerQueueChannel::OnProducerQueueDetachBuffer, message);
      return true;

    default:
      return false;
  }
}

void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
  ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
}

BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
  return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
                    usage_policy_);
}

Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
    Message& message) {
  ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
  ALOGD_IF(TRACE, "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d",
           channel_id());

  int channel_id;
  auto status = message.PushChannel(0, nullptr, &channel_id);
  if (!status) {
    ALOGE(
        "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
        "channel: %s",
        status.GetErrorMessage().c_str());
    return ErrorStatus(ENOMEM);
  }

  auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
      service(), buffer_id(), channel_id, shared_from_this());

  // Register the existing buffers with the new consumer queue.
  for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    if (auto buffer = buffers_[slot].lock())
      consumer_queue_channel->RegisterNewBuffer(buffer, slot);
  }

  const auto channel_status =
      service()->SetChannel(channel_id, consumer_queue_channel);
  if (!channel_status) {
    ALOGE(
        "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
        "%s",
        channel_status.GetErrorMessage().c_str());
    return ErrorStatus(ENOMEM);
  }

  return {status.take()};
}

Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
  return {{meta_size_bytes_, buffer_id()}};
}

Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ProducerQueueChannel::OnProducerQueueAllocateBuffers(
    Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
    uint32_t format, uint64_t usage, size_t buffer_count) {
  ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
  ALOGD_IF(TRACE,
           "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
           "producer_channel_id=%d",
           channel_id());

  std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;

  // Deny buffer allocation violating preset rules.
  if (usage & usage_policy_.usage_deny_set_mask) {
    ALOGE(
        "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
        " is not permitted. Violating usage_deny_set_mask, the following  bits "
        "shall not be set: %" PRIx64 ".",
        usage, usage_policy_.usage_deny_set_mask);
    return ErrorStatus(EINVAL);
  }

  if (~usage & usage_policy_.usage_deny_clear_mask) {
    ALOGE(
        "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
        " is not permitted. Violating usage_deny_clear_mask, the following "
        "bits must be set: %" PRIx64 ".",
        usage, usage_policy_.usage_deny_clear_mask);
    return ErrorStatus(EINVAL);
  }

  // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
  // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
  uint64_t effective_usage =
      (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;

  for (size_t i = 0; i < buffer_count; i++) {
    auto status = AllocateBuffer(message, width, height, layer_count, format,
                                 effective_usage);
    if (!status) {
      ALOGE(
          "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
          "allocate new buffer.");
      return ErrorStatus(status.error());
    }
    buffer_handles.push_back(status.take());
  }

  return {std::move(buffer_handles)};
}

Status<std::pair<RemoteChannelHandle, size_t>>
ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
                                     uint32_t height, uint32_t layer_count,
                                     uint32_t format, uint64_t usage) {
  ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
  ALOGD_IF(TRACE,
           "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
           channel_id());

  if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
    ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
    return ErrorStatus(E2BIG);
  }

  // Here we are creating a new BufferHubBuffer, initialize the producer
  // channel, and returning its file handle back to the client.
  // buffer_id is the id of the producer channel of BufferHubBuffer.
  int buffer_id;
  auto status = message.PushChannel(0, nullptr, &buffer_id);

  if (!status) {
    ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
          status.GetErrorMessage().c_str());
    return ErrorStatus(status.error());
  }

  ALOGD_IF(TRACE,
           "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
           "height=%u layer_count=%u format=%u usage=%" PRIx64,
           buffer_id, width, height, layer_count, format, usage);
  auto buffer_handle = status.take();

  auto producer_channel_status =
      ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
                              format, usage, meta_size_bytes_);
  if (!producer_channel_status) {
    ALOGE(
        "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
        "buffer: %s",
        producer_channel_status.GetErrorMessage().c_str());
    return ErrorStatus(ENOMEM);
  }
  auto producer_channel = producer_channel_status.take();

  ALOGD_IF(
      TRACE,
      "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
      buffer_id, buffer_handle.value());

  const auto channel_status =
      service()->SetChannel(buffer_id, producer_channel);
  if (!channel_status) {
    ALOGE(
        "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
        "for new BufferHubBuffer: %s",
        channel_status.GetErrorMessage().c_str());
    return ErrorStatus(ENOMEM);
  }

  // Register the newly allocated buffer's channel_id into the first empty
  // buffer slot.
  size_t slot = 0;
  for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    if (buffers_[slot].expired())
      break;
  }
  if (slot == BufferHubRPC::kMaxQueueCapacity) {
    ALOGE(
        "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
        "buffer allocation.");
    return ErrorStatus(E2BIG);
  }

  buffers_[slot] = producer_channel;
  capacity_++;

  // Notify each consumer channel about the new buffer.
  for (auto* consumer_channel : consumer_channels_) {
    ALOGD(
        "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
        "buffer, buffer_id=%d",
        buffer_id);
    consumer_channel->RegisterNewBuffer(producer_channel, slot);
  }

  return {{std::move(buffer_handle), slot}};
}

Status<void> ProducerQueueChannel::OnProducerQueueDetachBuffer(
    Message& /*message*/, size_t slot) {
  if (buffers_[slot].expired()) {
    ALOGE(
        "ProducerQueueChannel::OnProducerQueueDetachBuffer: trying to detach "
        "an invalid buffer producer at slot %zu",
        slot);
    return ErrorStatus(EINVAL);
  }

  if (capacity_ == 0) {
    ALOGE(
        "ProducerQueueChannel::OnProducerQueueDetachBuffer: trying to detach a "
        "buffer producer while the queue's capacity is already zero.");
    return ErrorStatus(EINVAL);
  }

  buffers_[slot].reset();
  capacity_--;
  return {};
}

void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
  consumer_channels_.push_back(channel);
}

void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
  consumer_channels_.erase(
      std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
}

}  // namespace dvr
}  // namespace android