#include "consumer_queue_channel.h"
#include <pdx/channel_handle.h>
#include "producer_channel.h"
using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;
namespace android {
namespace dvr {
ConsumerQueueChannel::ConsumerQueueChannel(
BufferHubService* service, int buffer_id, int channel_id,
const std::shared_ptr<Channel>& producer)
: BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
producer_(producer),
capacity_(0) {
GetProducer()->AddConsumer(this);
}
ConsumerQueueChannel::~ConsumerQueueChannel() {
ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
channel_id());
if (auto producer = GetProducer()) {
producer->RemoveConsumer(this);
}
}
bool ConsumerQueueChannel::HandleMessage(Message& message) {
ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
auto producer = GetProducer();
if (!producer) {
RemoteMethodError(message, EPIPE);
return true;
}
switch (message.GetOp()) {
case BufferHubRPC::CreateConsumerQueue::Opcode:
DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
*producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
return true;
case BufferHubRPC::GetQueueInfo::Opcode:
DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
*producer, &ProducerQueueChannel::OnGetQueueInfo, message);
return true;
case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
*this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
return true;
default:
return false;
}
}
std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
const {
return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
}
void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
}
BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
BufferHubChannel::BufferInfo info;
if (auto producer = GetProducer()) {
// If producer has not hung up, copy most buffer info from the producer.
info = producer->GetBufferInfo();
}
info.id = buffer_id();
info.capacity = capacity_;
return info;
}
void ConsumerQueueChannel::RegisterNewBuffer(
const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
ALOGD_IF(TRACE,
"ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
producer_channel->buffer_id(), slot);
pending_buffer_slots_.emplace(producer_channel, slot);
// Signal the client that there is new buffer available throught POLLIN.
SignalAvailable();
}
Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
ALOGD_IF(
TRACE,
"ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
"import: %zu",
pending_buffer_slots_.size());
while (!pending_buffer_slots_.empty()) {
auto producer_channel = pending_buffer_slots_.front().first.lock();
size_t producer_slot = pending_buffer_slots_.front().second;
pending_buffer_slots_.pop();
// It's possible that the producer channel has expired. When this occurs,
// ignore the producer channel.
if (producer_channel == nullptr) {
ALOGW(
"ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
"channel has already been expired.");
continue;
}
auto status = producer_channel->CreateConsumer(message);
// If no buffers are imported successfully, clear available and return an
// error. Otherwise, return all consumer handles already imported
// successfully, but keep available bits on, so that the client can retry
// importing remaining consumer buffers.
if (!status) {
ALOGE(
"ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
"consumer: %s",
status.GetErrorMessage().c_str());
if (buffer_handles.empty()) {
ClearAvailable();
return status.error_status();
} else {
return {std::move(buffer_handles)};
}
}
buffer_handles.emplace_back(status.take(), producer_slot);
}
ClearAvailable();
return {std::move(buffer_handles)};
}
void ConsumerQueueChannel::OnProducerClosed() {
ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
buffer_id());
producer_.reset();
Hangup();
}
} // namespace dvr
} // namespace android