#include "producer_channel.h" #include <log/log.h> #include <sync/sync.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <sys/poll.h> #include <utils/Trace.h> #include <algorithm> #include <atomic> #include <thread> #include <private/dvr/bufferhub_rpc.h> #include "consumer_channel.h" #include "detached_buffer_channel.h" using android::pdx::BorrowedHandle; using android::pdx::ErrorStatus; using android::pdx::Message; using android::pdx::RemoteChannelHandle; using android::pdx::Status; using android::pdx::rpc::BufferWrapper; using android::pdx::rpc::DispatchRemoteMethod; using android::pdx::rpc::WrapBuffer; namespace android { namespace dvr { namespace { static inline uint64_t FindNextClearedBit(uint64_t bits) { return ~bits - (~bits & (~bits - 1)); } } // namespace ProducerChannel::ProducerChannel(BufferHubService* service, int buffer_id, int channel_id, IonBuffer buffer, IonBuffer metadata_buffer, size_t user_metadata_size, int* error) : BufferHubChannel(service, buffer_id, channel_id, kProducerType), buffer_(std::move(buffer)), metadata_buffer_(std::move(metadata_buffer)), user_metadata_size_(user_metadata_size), metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize + user_metadata_size) { if (!buffer_.IsValid()) { ALOGE("ProducerChannel::ProducerChannel: Invalid buffer."); *error = -EINVAL; return; } if (!metadata_buffer_.IsValid()) { ALOGE("ProducerChannel::ProducerChannel: Invalid metadata buffer."); *error = -EINVAL; return; } *error = InitializeBuffer(); } ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id, uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, uint64_t usage, size_t user_metadata_size, int* error) : BufferHubChannel(service, channel_id, channel_id, kProducerType), pending_consumers_(0), producer_owns_(true), user_metadata_size_(user_metadata_size), metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize + user_metadata_size) { if (int ret = buffer_.Alloc(width, height, layer_count, format, usage)) { ALOGE("ProducerChannel::ProducerChannel: Failed to allocate buffer: %s", strerror(-ret)); *error = ret; return; } if (int ret = metadata_buffer_.Alloc(metadata_buf_size_, /*height=*/1, /*layer_count=*/1, BufferHubDefs::kMetadataFormat, BufferHubDefs::kMetadataUsage)) { ALOGE("ProducerChannel::ProducerChannel: Failed to allocate metadata: %s", strerror(-ret)); *error = ret; return; } *error = InitializeBuffer(); } int ProducerChannel::InitializeBuffer() { void* metadata_ptr = nullptr; if (int ret = metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0, /*y=*/0, metadata_buf_size_, /*height=*/1, &metadata_ptr)) { ALOGE("ProducerChannel::ProducerChannel: Failed to lock metadata."); return ret; } metadata_header_ = reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr); // Using placement new here to reuse shared memory instead of new allocation // and also initialize the value to zero. buffer_state_ = new (&metadata_header_->buffer_state) std::atomic<uint64_t>(0); fence_state_ = new (&metadata_header_->fence_state) std::atomic<uint64_t>(0); acquire_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); release_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); if (!acquire_fence_fd_ || !release_fence_fd_) { ALOGE("ProducerChannel::ProducerChannel: Failed to create shared fences."); return -EIO; } dummy_fence_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); if (!dummy_fence_fd_) { ALOGE("ProducerChannel::ProducerChannel: Failed to create dummy fences."); return EIO; } epoll_event event; event.events = 0; event.data.u64 = 0ULL; if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_ADD, dummy_fence_fd_.Get(), &event) < 0) { ALOGE( "ProducerChannel::ProducerChannel: Failed to modify the shared " "release fence to include the dummy fence: %s", strerror(errno)); return -EIO; } // Success. return 0; } std::unique_ptr<ProducerChannel> ProducerChannel::Create( BufferHubService* service, int buffer_id, int channel_id, IonBuffer buffer, IonBuffer metadata_buffer, size_t user_metadata_size) { int error = 0; std::unique_ptr<ProducerChannel> producer(new ProducerChannel( service, buffer_id, channel_id, std::move(buffer), std::move(metadata_buffer), user_metadata_size, &error)); if (error < 0) return nullptr; else return producer; } Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create( BufferHubService* service, int channel_id, uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, uint64_t usage, size_t user_metadata_size) { int error; std::shared_ptr<ProducerChannel> producer( new ProducerChannel(service, channel_id, width, height, layer_count, format, usage, user_metadata_size, &error)); if (error < 0) return ErrorStatus(-error); else return {std::move(producer)}; } ProducerChannel::~ProducerChannel() { ALOGD_IF(TRACE, "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d " "state=%" PRIx64 ".", channel_id(), buffer_id(), buffer_state_->load()); for (auto consumer : consumer_channels_) { consumer->OnProducerClosed(); } Hangup(); } BufferHubChannel::BufferInfo ProducerChannel::GetBufferInfo() const { // Derive the mask of signaled buffers in this producer / consumer set. uint64_t signaled_mask = signaled() ? BufferHubDefs::kProducerStateBit : 0; for (const ConsumerChannel* consumer : consumer_channels_) { signaled_mask |= consumer->signaled() ? consumer->consumer_state_bit() : 0; } return BufferInfo(buffer_id(), consumer_channels_.size(), buffer_.width(), buffer_.height(), buffer_.layer_count(), buffer_.format(), buffer_.usage(), pending_consumers_, buffer_state_->load(), signaled_mask, metadata_header_->queue_index); } void ProducerChannel::HandleImpulse(Message& message) { ATRACE_NAME("ProducerChannel::HandleImpulse"); switch (message.GetOp()) { case BufferHubRPC::ProducerGain::Opcode: OnProducerGain(message); break; case BufferHubRPC::ProducerPost::Opcode: OnProducerPost(message, {}); break; } } bool ProducerChannel::HandleMessage(Message& message) { ATRACE_NAME("ProducerChannel::HandleMessage"); switch (message.GetOp()) { case BufferHubRPC::GetBuffer::Opcode: DispatchRemoteMethod<BufferHubRPC::GetBuffer>( *this, &ProducerChannel::OnGetBuffer, message); return true; case BufferHubRPC::NewConsumer::Opcode: DispatchRemoteMethod<BufferHubRPC::NewConsumer>( *this, &ProducerChannel::OnNewConsumer, message); return true; case BufferHubRPC::ProducerPost::Opcode: DispatchRemoteMethod<BufferHubRPC::ProducerPost>( *this, &ProducerChannel::OnProducerPost, message); return true; case BufferHubRPC::ProducerGain::Opcode: DispatchRemoteMethod<BufferHubRPC::ProducerGain>( *this, &ProducerChannel::OnProducerGain, message); return true; case BufferHubRPC::ProducerBufferDetach::Opcode: DispatchRemoteMethod<BufferHubRPC::ProducerBufferDetach>( *this, &ProducerChannel::OnProducerDetach, message); return true; default: return false; } } BufferDescription<BorrowedHandle> ProducerChannel::GetBuffer( uint64_t buffer_state_bit) { return { buffer_, metadata_buffer_, buffer_id(), buffer_state_bit, acquire_fence_fd_.Borrow(), release_fence_fd_.Borrow()}; } Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer( Message& /*message*/) { ATRACE_NAME("ProducerChannel::OnGetBuffer"); ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d, state=%" PRIx64 ".", buffer_id(), buffer_state_->load()); return {GetBuffer(BufferHubDefs::kProducerStateBit)}; } Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { ATRACE_NAME("ProducerChannel::CreateConsumer"); ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d", buffer_id(), producer_owns_); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); if (!status) { ALOGE( "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s", status.GetErrorMessage().c_str()); return ErrorStatus(ENOMEM); } // Try find the next consumer state bit which has not been claimed by any // consumer yet. uint64_t consumer_state_bit = FindNextClearedBit( active_consumer_bit_mask_ | orphaned_consumer_bit_mask_ | BufferHubDefs::kProducerStateBit); if (consumer_state_bit == 0ULL) { ALOGE( "ProducerChannel::CreateConsumer: reached the maximum mumber of " "consumers per producer: 63."); return ErrorStatus(E2BIG); } auto consumer = std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id, consumer_state_bit, shared_from_this()); const auto channel_status = service()->SetChannel(channel_id, consumer); if (!channel_status) { ALOGE( "ProducerChannel::CreateConsumer: failed to set new consumer channel: " "%s", channel_status.GetErrorMessage().c_str()); return ErrorStatus(ENOMEM); } if (!producer_owns_ && !BufferHubDefs::IsBufferReleased(buffer_state_->load())) { // Signal the new consumer when adding it to a posted producer. if (consumer->OnProducerPosted()) pending_consumers_++; } active_consumer_bit_mask_ |= consumer_state_bit; return {status.take()}; } Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) { ATRACE_NAME("ProducerChannel::OnNewConsumer"); ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id()); return CreateConsumer(message); } Status<void> ProducerChannel::OnProducerPost( Message&, LocalFence acquire_fence) { ATRACE_NAME("ProducerChannel::OnProducerPost"); ALOGD_IF(TRACE, "ProducerChannel::OnProducerPost: buffer_id=%d", buffer_id()); if (!producer_owns_) { ALOGE("ProducerChannel::OnProducerPost: Not in gained state!"); return ErrorStatus(EBUSY); } epoll_event event; event.events = 0; event.data.u64 = 0ULL; int ret = epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD, dummy_fence_fd_.Get(), &event); ALOGE_IF(ret < 0, "ProducerChannel::OnProducerPost: Failed to modify the shared " "release fence to include the dummy fence: %s", strerror(errno)); eventfd_t dummy_fence_count = 0ULL; if (eventfd_read(dummy_fence_fd_.Get(), &dummy_fence_count) < 0) { const int error = errno; if (error != EAGAIN) { ALOGE( "ProducerChannel::ProducerChannel: Failed to read dummy fence, " "error: %s", strerror(error)); return ErrorStatus(error); } } ALOGW_IF(dummy_fence_count > 0, "ProducerChannel::ProducerChannel: %" PRIu64 " dummy fence(s) was signaled during last release/gain cycle " "buffer_id=%d.", dummy_fence_count, buffer_id()); post_fence_ = std::move(acquire_fence); producer_owns_ = false; // Signal any interested consumers. If there are none, the buffer will stay // in posted state until a consumer comes online. This behavior guarantees // that no frame is silently dropped. pending_consumers_ = 0; for (auto consumer : consumer_channels_) { if (consumer->OnProducerPosted()) pending_consumers_++; } ALOGD_IF(TRACE, "ProducerChannel::OnProducerPost: %d pending consumers", pending_consumers_); return {}; } Status<LocalFence> ProducerChannel::OnProducerGain(Message& /*message*/) { ATRACE_NAME("ProducerChannel::OnGain"); ALOGD_IF(TRACE, "ProducerChannel::OnGain: buffer_id=%d", buffer_id()); if (producer_owns_) { ALOGE("ProducerChanneL::OnGain: Already in gained state: channel=%d", channel_id()); return ErrorStatus(EALREADY); } // There are still pending consumers, return busy. if (pending_consumers_ > 0) { ALOGE( "ProducerChannel::OnGain: Producer (id=%d) is gaining a buffer that " "still has %d pending consumer(s).", buffer_id(), pending_consumers_); return ErrorStatus(EBUSY); } ClearAvailable(); producer_owns_ = true; post_fence_.close(); return {std::move(returned_fence_)}; } Status<RemoteChannelHandle> ProducerChannel::OnProducerDetach( Message& message) { ATRACE_NAME("ProducerChannel::OnProducerDetach"); ALOGD_IF(TRACE, "ProducerChannel::OnProducerDetach: buffer_id=%d", buffer_id()); uint64_t buffer_state = buffer_state_->load(); if (!BufferHubDefs::IsBufferGained(buffer_state)) { // Can only detach a BufferProducer when it's in gained state. ALOGW( "ProducerChannel::OnProducerDetach: The buffer (id=%d, state=0x%" PRIx64 ") is not in gained state.", buffer_id(), buffer_state); return {}; } int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); if (!status) { ALOGE( "ProducerChannel::OnProducerDetach: Failed to push detached buffer " "channel: %s", status.GetErrorMessage().c_str()); return ErrorStatus(ENOMEM); } // Make sure we unlock the buffer. if (int ret = metadata_buffer_.Unlock()) { ALOGE("ProducerChannel::OnProducerDetach: Failed to unlock metadata."); return ErrorStatus(-ret); }; std::unique_ptr<DetachedBufferChannel> channel = DetachedBufferChannel::Create( service(), buffer_id(), channel_id, std::move(buffer_), std::move(metadata_buffer_), user_metadata_size_); if (!channel) { ALOGE("ProducerChannel::OnProducerDetach: Invalid buffer."); return ErrorStatus(EINVAL); } const auto channel_status = service()->SetChannel(channel_id, std::move(channel)); if (!channel_status) { // Technically, this should never fail, as we just pushed the channel. Note // that LOG_FATAL will be stripped out in non-debug build. LOG_FATAL( "ProducerChannel::OnProducerDetach: Failed to set new detached buffer " "channel: %s.", channel_status.GetErrorMessage().c_str()); } return status; } Status<LocalFence> ProducerChannel::OnConsumerAcquire(Message& /*message*/) { ATRACE_NAME("ProducerChannel::OnConsumerAcquire"); ALOGD_IF(TRACE, "ProducerChannel::OnConsumerAcquire: buffer_id=%d", buffer_id()); if (producer_owns_) { ALOGE("ProducerChannel::OnConsumerAcquire: Not in posted state!"); return ErrorStatus(EBUSY); } // Return a borrowed fd to avoid unnecessary duplication of the underlying fd. // Serialization just needs to read the handle. return {std::move(post_fence_)}; } Status<void> ProducerChannel::OnConsumerRelease(Message&, LocalFence release_fence) { ATRACE_NAME("ProducerChannel::OnConsumerRelease"); ALOGD_IF(TRACE, "ProducerChannel::OnConsumerRelease: buffer_id=%d", buffer_id()); if (producer_owns_) { ALOGE("ProducerChannel::OnConsumerRelease: Not in acquired state!"); return ErrorStatus(EBUSY); } // Attempt to merge the fences if necessary. if (release_fence) { if (returned_fence_) { LocalFence merged_fence(sync_merge("bufferhub_merged", returned_fence_.get_fd(), release_fence.get_fd())); const int error = errno; if (!merged_fence) { ALOGE("ProducerChannel::OnConsumerRelease: Failed to merge fences: %s", strerror(error)); return ErrorStatus(error); } returned_fence_ = std::move(merged_fence); } else { returned_fence_ = std::move(release_fence); } } OnConsumerIgnored(); if (pending_consumers_ == 0) { // Clear the producer bit atomically to transit into released state. This // has to done by BufferHub as it requries synchronization among all // consumers. BufferHubDefs::ModifyBufferState(buffer_state_, BufferHubDefs::kProducerStateBit, 0ULL); ALOGD_IF(TRACE, "ProducerChannel::OnConsumerRelease: releasing last consumer: " "buffer_id=%d state=%" PRIx64 ".", buffer_id(), buffer_state_->load()); if (orphaned_consumer_bit_mask_) { ALOGW( "ProducerChannel::OnConsumerRelease: orphaned buffer detected " "during the this acquire/release cycle: id=%d orphaned=0x%" PRIx64 " queue_index=%" PRIu64 ".", buffer_id(), orphaned_consumer_bit_mask_, metadata_header_->queue_index); orphaned_consumer_bit_mask_ = 0; } SignalAvailable(); } ALOGE_IF(pending_consumers_ && BufferHubDefs::IsBufferReleased(buffer_state_->load()), "ProducerChannel::OnConsumerRelease: buffer state inconsistent: " "pending_consumers=%d, buffer buffer is in releaed state.", pending_consumers_); return {}; } void ProducerChannel::OnConsumerIgnored() { if (pending_consumers_ == 0) { ALOGE("ProducerChannel::OnConsumerIgnored: no pending consumer."); return; } --pending_consumers_; ALOGD_IF(TRACE, "ProducerChannel::OnConsumerIgnored: buffer_id=%d %d consumers left", buffer_id(), pending_consumers_); } void ProducerChannel::OnConsumerOrphaned(ConsumerChannel* channel) { // Ignore the orphaned consumer. OnConsumerIgnored(); const uint64_t consumer_state_bit = channel->consumer_state_bit(); ALOGE_IF(orphaned_consumer_bit_mask_ & consumer_state_bit, "ProducerChannel::OnConsumerOrphaned: Consumer " "(consumer_state_bit=%" PRIx64 ") is already orphaned.", consumer_state_bit); orphaned_consumer_bit_mask_ |= consumer_state_bit; // Atomically clear the fence state bit as an orphaned consumer will never // signal a release fence. Also clear the buffer state as it won't be released // as well. fence_state_->fetch_and(~consumer_state_bit); BufferHubDefs::ModifyBufferState(buffer_state_, consumer_state_bit, 0ULL); ALOGW( "ProducerChannel::OnConsumerOrphaned: detected new orphaned consumer " "buffer_id=%d consumer_state_bit=%" PRIx64 " queue_index=%" PRIu64 " buffer_state=%" PRIx64 " fence_state=%" PRIx64 ".", buffer_id(), consumer_state_bit, metadata_header_->queue_index, buffer_state_->load(), fence_state_->load()); } void ProducerChannel::AddConsumer(ConsumerChannel* channel) { consumer_channels_.push_back(channel); } void ProducerChannel::RemoveConsumer(ConsumerChannel* channel) { consumer_channels_.erase( std::find(consumer_channels_.begin(), consumer_channels_.end(), channel)); active_consumer_bit_mask_ &= ~channel->consumer_state_bit(); const uint64_t buffer_state = buffer_state_->load(); if (BufferHubDefs::IsBufferPosted(buffer_state) || BufferHubDefs::IsBufferAcquired(buffer_state)) { // The consumer client is being destoryed without releasing. This could // happen in corner cases when the consumer crashes. Here we mark it // orphaned before remove it from producer. OnConsumerOrphaned(channel); } if (BufferHubDefs::IsBufferReleased(buffer_state) || BufferHubDefs::IsBufferGained(buffer_state)) { // The consumer is being close while it is suppose to signal a release // fence. Signal the dummy fence here. if (fence_state_->load() & channel->consumer_state_bit()) { epoll_event event; event.events = EPOLLIN; event.data.u64 = channel->consumer_state_bit(); if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD, dummy_fence_fd_.Get(), &event) < 0) { ALOGE( "ProducerChannel::RemoveConsumer: Failed to modify the shared " "release fence to include the dummy fence: %s", strerror(errno)); return; } ALOGW( "ProducerChannel::RemoveConsumer: signal dummy release fence " "buffer_id=%d", buffer_id()); eventfd_write(dummy_fence_fd_.Get(), 1); } } } // Returns true if the given parameters match the underlying buffer parameters. bool ProducerChannel::CheckParameters(uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, uint64_t usage, size_t user_metadata_size) { return user_metadata_size == user_metadata_size_ && buffer_.width() == width && buffer_.height() == height && buffer_.layer_count() == layer_count && buffer_.format() == format && buffer_.usage() == usage; } } // namespace dvr } // namespace android