/* * Copyright (C) 2018 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "BufferPoolAccessor" //#define LOG_NDEBUG 0 #include <sys/types.h> #include <time.h> #include <unistd.h> #include <utils/Log.h> #include <thread> #include "AccessorImpl.h" #include "Connection.h" namespace android { namespace hardware { namespace media { namespace bufferpool { namespace V2_0 { namespace implementation { namespace { static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec static constexpr int64_t kLogDurationUs = 5000000; // 5 secs static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15; static constexpr size_t kMinBufferCountForEviction = 40; } // Buffer structure in bufferpool process struct InternalBuffer { BufferId mId; size_t mOwnerCount; size_t mTransactionCount; const std::shared_ptr<BufferPoolAllocation> mAllocation; const size_t mAllocSize; const std::vector<uint8_t> mConfig; bool mInvalidated; InternalBuffer( BufferId id, const std::shared_ptr<BufferPoolAllocation> &alloc, const size_t allocSize, const std::vector<uint8_t> &allocConfig) : mId(id), mOwnerCount(0), mTransactionCount(0), mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig), mInvalidated(false) {} const native_handle_t *handle() { return mAllocation->handle(); } void invalidate() { mInvalidated = true; } }; struct TransactionStatus { TransactionId mId; BufferId mBufferId; ConnectionId mSender; ConnectionId mReceiver; BufferStatus mStatus; int64_t mTimestampUs; bool mSenderValidated; TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) { mId = message.transactionId; mBufferId = message.bufferId; mStatus = message.newStatus; mTimestampUs = timestampUs; if (mStatus == BufferStatus::TRANSFER_TO) { mSender = message.connectionId; mReceiver = message.targetConnectionId; mSenderValidated = true; } else { mSender = -1LL; mReceiver = message.connectionId; mSenderValidated = false; } } }; // Helper template methods for handling map of set. template<class T, class U> bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) { auto iter = mapOfSet->find(key); if (iter == mapOfSet->end()) { std::set<U> valueSet{value}; mapOfSet->insert(std::make_pair(key, valueSet)); return true; } else if (iter->second.find(value) == iter->second.end()) { iter->second.insert(value); return true; } return false; } template<class T, class U> bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) { bool ret = false; auto iter = mapOfSet->find(key); if (iter != mapOfSet->end()) { if (iter->second.erase(value) > 0) { ret = true; } if (iter->second.size() == 0) { mapOfSet->erase(iter); } } return ret; } template<class T, class U> bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) { auto iter = mapOfSet->find(key); if (iter != mapOfSet->end()) { auto setIter = iter->second.find(value); return setIter != iter->second.end(); } return false; } int32_t Accessor::Impl::sPid = getpid(); uint32_t Accessor::Impl::sSeqId = time(nullptr); Accessor::Impl::Impl( const std::shared_ptr<BufferPoolAllocator> &allocator) : mAllocator(allocator) {} Accessor::Impl::~Impl() { } ResultStatus Accessor::Impl::connect( const sp<Accessor> &accessor, const sp<IObserver> &observer, sp<Connection> *connection, ConnectionId *pConnectionId, uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr) { sp<Connection> newConnection = new Connection(); ResultStatus status = ResultStatus::CRITICAL_ERROR; { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); if (newConnection) { ConnectionId id = (int64_t)sPid << 32 | sSeqId; status = mBufferPool.mObserver.open(id, statusDescPtr); if (status == ResultStatus::OK) { newConnection->initialize(accessor, id); *connection = newConnection; *pConnectionId = id; *pMsgId = mBufferPool.mInvalidation.mInvalidationId; mBufferPool.mInvalidationChannel.getDesc(invDescPtr); mBufferPool.mInvalidation.onConnect(id, observer); ++sSeqId; } } mBufferPool.processStatusMessages(); mBufferPool.cleanUp(); } return status; } ResultStatus Accessor::Impl::close(ConnectionId connectionId) { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId); mBufferPool.processStatusMessages(); mBufferPool.handleClose(connectionId); mBufferPool.mObserver.close(connectionId); mBufferPool.mInvalidation.onClose(connectionId); // Since close# will be called after all works are finished, it is OK to // evict unused buffers. mBufferPool.cleanUp(true); return ResultStatus::OK; } ResultStatus Accessor::Impl::allocate( ConnectionId connectionId, const std::vector<uint8_t>& params, BufferId *bufferId, const native_handle_t** handle) { std::unique_lock<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); ResultStatus status = ResultStatus::OK; if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) { lock.unlock(); std::shared_ptr<BufferPoolAllocation> alloc; size_t allocSize; status = mAllocator->allocate(params, &alloc, &allocSize); lock.lock(); if (status == ResultStatus::OK) { status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle); } ALOGV("create a buffer %d : %u %p", status == ResultStatus::OK, *bufferId, *handle); } if (status == ResultStatus::OK) { // TODO: handle ownBuffer failure mBufferPool.handleOwnBuffer(connectionId, *bufferId); } mBufferPool.cleanUp(); return status; } ResultStatus Accessor::Impl::fetch( ConnectionId connectionId, TransactionId transactionId, BufferId bufferId, const native_handle_t** handle) { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); auto found = mBufferPool.mTransactions.find(transactionId); if (found != mBufferPool.mTransactions.end() && contains(&mBufferPool.mPendingTransactions, connectionId, transactionId)) { if (found->second->mSenderValidated && found->second->mStatus == BufferStatus::TRANSFER_FROM && found->second->mBufferId == bufferId) { found->second->mStatus = BufferStatus::TRANSFER_FETCH; auto bufferIt = mBufferPool.mBuffers.find(bufferId); if (bufferIt != mBufferPool.mBuffers.end()) { mBufferPool.mStats.onBufferFetched(); *handle = bufferIt->second->handle(); return ResultStatus::OK; } } } mBufferPool.cleanUp(); return ResultStatus::CRITICAL_ERROR; } void Accessor::Impl::cleanUp(bool clearCache) { // transaction timeout, buffer cacheing TTL handling std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.cleanUp(clearCache); } void Accessor::Impl::flush() { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.flush(shared_from_this()); } void Accessor::Impl::handleInvalidateAck() { std::map<ConnectionId, const sp<IObserver>> observers; uint32_t invalidationId; { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId); } // Do not hold lock for send invalidations size_t deadClients = 0; for (auto it = observers.begin(); it != observers.end(); ++it) { const sp<IObserver> observer = it->second; if (observer) { Return<void> transResult = observer->onMessage(it->first, invalidationId); if (!transResult.isOk()) { ++deadClients; } } } if (deadClients > 0) { ALOGD("During invalidation found %zu dead clients", deadClients); } } bool Accessor::Impl::isValid() { return mBufferPool.isValid(); } Accessor::Impl::Impl::BufferPool::BufferPool() : mTimestampUs(getTimestampNow()), mLastCleanUpUs(mTimestampUs), mLastLogUs(mTimestampUs), mSeq(0), mStartSeq(0) { mValid = mInvalidationChannel.isValid(); } // Statistics helper template<typename T, typename S> int percentage(T base, S total) { return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0); } std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0); Accessor::Impl::Impl::BufferPool::~BufferPool() { std::lock_guard<std::mutex> lock(mMutex); ALOGD("Destruction - bufferpool2 %p " "cached: %zu/%zuM, %zu/%d%% in use; " "allocs: %zu, %d%% recycled; " "transfers: %zu, %d%% unfetced", this, mStats.mBuffersCached, mStats.mSizeCached >> 20, mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached), mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations), mStats.mTotalTransfers, percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers)); } void Accessor::Impl::BufferPool::Invalidation::onConnect( ConnectionId conId, const sp<IObserver>& observer) { mAcks[conId] = mInvalidationId; // starts from current invalidationId mObservers.insert(std::make_pair(conId, observer)); } void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) { mAcks.erase(conId); mObservers.erase(conId); } void Accessor::Impl::BufferPool::Invalidation::onAck( ConnectionId conId, uint32_t msgId) { auto it = mAcks.find(conId); if (it == mAcks.end()) { ALOGW("ACK from inconsistent connection! %lld", (long long)conId); return; } if (isMessageLater(msgId, it->second)) { mAcks[conId] = msgId; } } void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated( BufferId bufferId, BufferInvalidationChannel &channel) { for (auto it = mPendings.begin(); it != mPendings.end();) { if (it->isInvalidated(bufferId)) { uint32_t msgId = 0; if (it->mNeedsAck) { msgId = ++mInvalidationId; if (msgId == 0) { // wrap happens msgId = ++mInvalidationId; } } channel.postInvalidation(msgId, it->mFrom, it->mTo); it = mPendings.erase(it); continue; } ++it; } } void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest( bool needsAck, uint32_t from, uint32_t to, size_t left, BufferInvalidationChannel &channel, const std::shared_ptr<Accessor::Impl> &impl) { uint32_t msgId = 0; if (needsAck) { msgId = ++mInvalidationId; if (msgId == 0) { // wrap happens msgId = ++mInvalidationId; } } ALOGV("bufferpool2 invalidation requested and queued"); if (left == 0) { channel.postInvalidation(msgId, from, to); } else { // TODO: sending hint message? ALOGV("bufferpoo2 invalidation requested and pending"); Pending pending(needsAck, from, to, left, impl); mPendings.push_back(pending); } sInvalidator->addAccessor(mId, impl); } void Accessor::Impl::BufferPool::Invalidation::onHandleAck( std::map<ConnectionId, const sp<IObserver>> *observers, uint32_t *invalidationId) { if (mInvalidationId != 0) { *invalidationId = mInvalidationId; std::set<int> deads; for (auto it = mAcks.begin(); it != mAcks.end(); ++it) { if (it->second != mInvalidationId) { const sp<IObserver> observer = mObservers[it->first]; if (observer) { observers->emplace(it->first, observer); ALOGV("connection %lld will call observer (%u: %u)", (long long)it->first, it->second, mInvalidationId); // N.B: onMessage will be called later. ignore possibility of // onMessage# oneway call being lost. it->second = mInvalidationId; } else { ALOGV("bufferpool2 observer died %lld", (long long)it->first); deads.insert(it->first); } } } if (deads.size() > 0) { for (auto it = deads.begin(); it != deads.end(); ++it) { onClose(*it); } } } if (mPendings.size() == 0) { // All invalidation Ids are synced and no more pending invalidations. sInvalidator->delAccessor(mId); } } bool Accessor::Impl::BufferPool::handleOwnBuffer( ConnectionId connectionId, BufferId bufferId) { bool added = insert(&mUsingBuffers, connectionId, bufferId); if (added) { auto iter = mBuffers.find(bufferId); iter->second->mOwnerCount++; } insert(&mUsingConnections, bufferId, connectionId); return added; } bool Accessor::Impl::BufferPool::handleReleaseBuffer( ConnectionId connectionId, BufferId bufferId) { bool deleted = erase(&mUsingBuffers, connectionId, bufferId); if (deleted) { auto iter = mBuffers.find(bufferId); iter->second->mOwnerCount--; if (iter->second->mOwnerCount == 0 && iter->second->mTransactionCount == 0) { if (!iter->second->mInvalidated) { mStats.onBufferUnused(iter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(iter->second->mAllocSize); mStats.onBufferEvicted(iter->second->mAllocSize); mBuffers.erase(iter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } } erase(&mUsingConnections, bufferId, connectionId); ALOGV("release buffer %u : %d", bufferId, deleted); return deleted; } bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) { auto completed = mCompletedTransactions.find( message.transactionId); if (completed != mCompletedTransactions.end()) { // already completed mCompletedTransactions.erase(completed); return true; } // the buffer should exist and be owned. auto bufferIter = mBuffers.find(message.bufferId); if (bufferIter == mBuffers.end() || !contains(&mUsingBuffers, message.connectionId, message.bufferId)) { return false; } auto found = mTransactions.find(message.transactionId); if (found != mTransactions.end()) { // transfer_from was received earlier. found->second->mSender = message.connectionId; found->second->mSenderValidated = true; return true; } // TODO: verify there is target connection Id mStats.onBufferSent(); mTransactions.insert(std::make_pair( message.transactionId, std::make_unique<TransactionStatus>(message, mTimestampUs))); insert(&mPendingTransactions, message.targetConnectionId, message.transactionId); bufferIter->second->mTransactionCount++; return true; } bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) { auto found = mTransactions.find(message.transactionId); if (found == mTransactions.end()) { // TODO: is it feasible to check ownership here? mStats.onBufferSent(); mTransactions.insert(std::make_pair( message.transactionId, std::make_unique<TransactionStatus>(message, mTimestampUs))); insert(&mPendingTransactions, message.connectionId, message.transactionId); auto bufferIter = mBuffers.find(message.bufferId); bufferIter->second->mTransactionCount++; } else { if (message.connectionId == found->second->mReceiver) { found->second->mStatus = BufferStatus::TRANSFER_FROM; } } return true; } bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) { auto found = mTransactions.find(message.transactionId); if (found != mTransactions.end()) { bool deleted = erase(&mPendingTransactions, message.connectionId, message.transactionId); if (deleted) { if (!found->second->mSenderValidated) { mCompletedTransactions.insert(message.transactionId); } auto bufferIter = mBuffers.find(message.bufferId); if (message.newStatus == BufferStatus::TRANSFER_OK) { handleOwnBuffer(message.connectionId, message.bufferId); } bufferIter->second->mTransactionCount--; if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(message.bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel); } } mTransactions.erase(found); } ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId, message.bufferId, deleted); return deleted; } ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId, message.bufferId); return false; } void Accessor::Impl::BufferPool::processStatusMessages() { std::vector<BufferStatusMessage> messages; mObserver.getBufferStatusChanges(messages); mTimestampUs = getTimestampNow(); for (BufferStatusMessage& message: messages) { bool ret = false; switch (message.newStatus) { case BufferStatus::NOT_USED: ret = handleReleaseBuffer( message.connectionId, message.bufferId); break; case BufferStatus::USED: // not happening break; case BufferStatus::TRANSFER_TO: ret = handleTransferTo(message); break; case BufferStatus::TRANSFER_FROM: ret = handleTransferFrom(message); break; case BufferStatus::TRANSFER_TIMEOUT: // TODO break; case BufferStatus::TRANSFER_LOST: // TODO break; case BufferStatus::TRANSFER_FETCH: // not happening break; case BufferStatus::TRANSFER_OK: case BufferStatus::TRANSFER_ERROR: ret = handleTransferResult(message); break; case BufferStatus::INVALIDATION_ACK: mInvalidation.onAck(message.connectionId, message.bufferId); ret = true; break; } if (ret == false) { ALOGW("buffer status message processing failure - message : %d connection : %lld", message.newStatus, (long long)message.connectionId); } } messages.clear(); } bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { // Cleaning buffers auto buffers = mUsingBuffers.find(connectionId); if (buffers != mUsingBuffers.end()) { for (const BufferId& bufferId : buffers->second) { bool deleted = erase(&mUsingConnections, bufferId, connectionId); if (deleted) { auto bufferIter = mBuffers.find(bufferId); bufferIter->second->mOwnerCount--; if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } } } mUsingBuffers.erase(buffers); } // Cleaning transactions auto pending = mPendingTransactions.find(connectionId); if (pending != mPendingTransactions.end()) { for (const TransactionId& transactionId : pending->second) { auto iter = mTransactions.find(transactionId); if (iter != mTransactions.end()) { if (!iter->second->mSenderValidated) { mCompletedTransactions.insert(transactionId); } BufferId bufferId = iter->second->mBufferId; auto bufferIter = mBuffers.find(bufferId); bufferIter->second->mTransactionCount--; if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } mTransactions.erase(iter); } } } return true; } bool Accessor::Impl::BufferPool::getFreeBuffer( const std::shared_ptr<BufferPoolAllocator> &allocator, const std::vector<uint8_t> ¶ms, BufferId *pId, const native_handle_t** handle) { auto bufferIt = mFreeBuffers.begin(); for (;bufferIt != mFreeBuffers.end(); ++bufferIt) { BufferId bufferId = *bufferIt; if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) { break; } } if (bufferIt != mFreeBuffers.end()) { BufferId id = *bufferIt; mFreeBuffers.erase(bufferIt); mStats.onBufferRecycled(mBuffers[id]->mAllocSize); *handle = mBuffers[id]->handle(); *pId = id; ALOGV("recycle a buffer %u %p", id, *handle); return true; } return false; } ResultStatus Accessor::Impl::BufferPool::addNewBuffer( const std::shared_ptr<BufferPoolAllocation> &alloc, const size_t allocSize, const std::vector<uint8_t> ¶ms, BufferId *pId, const native_handle_t** handle) { BufferId bufferId = mSeq++; if (mSeq == Connection::SYNC_BUFFERID) { mSeq = 0; } std::unique_ptr<InternalBuffer> buffer = std::make_unique<InternalBuffer>( bufferId, alloc, allocSize, params); if (buffer) { auto res = mBuffers.insert(std::make_pair( bufferId, std::move(buffer))); if (res.second) { mStats.onBufferAllocated(allocSize); *handle = alloc->handle(); *pId = bufferId; return ResultStatus::OK; } } return ResultStatus::NO_MEMORY; } void Accessor::Impl::BufferPool::cleanUp(bool clearCache) { if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) { mLastCleanUpUs = mTimestampUs; if (mTimestampUs > mLastLogUs + kLogDurationUs) { mLastLogUs = mTimestampUs; ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - " "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - " "%zu/%zu (fetch/transfer)", this, mStats.mBuffersCached, mStats.mSizeCached, mStats.mBuffersInUse, mStats.mSizeInUse, mStats.mTotalRecycles, mStats.mTotalAllocations, mStats.mTotalFetches, mStats.mTotalTransfers); } for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction && mBuffers.size() < kMinBufferCountForEviction) { break; } auto it = mBuffers.find(*freeIt); if (it != mBuffers.end() && it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { mStats.onBufferEvicted(it->second->mAllocSize); mBuffers.erase(it); freeIt = mFreeBuffers.erase(freeIt); } else { ++freeIt; ALOGW("bufferpool2 inconsistent!"); } } } } void Accessor::Impl::BufferPool::invalidate( bool needsAck, BufferId from, BufferId to, const std::shared_ptr<Accessor::Impl> &impl) { for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { if (isBufferInRange(from, to, *freeIt)) { auto it = mBuffers.find(*freeIt); if (it != mBuffers.end() && it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { mStats.onBufferEvicted(it->second->mAllocSize); mBuffers.erase(it); freeIt = mFreeBuffers.erase(freeIt); continue; } else { ALOGW("bufferpool2 inconsistent!"); } } ++freeIt; } size_t left = 0; for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) { if (isBufferInRange(from, to, it->first)) { it->second->invalidate(); ++left; } } mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl); } void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) { BufferId from = mStartSeq; BufferId to = mSeq; mStartSeq = mSeq; // TODO: needsAck params ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to); if (from != to) { invalidate(true, from, to, impl); } } void Accessor::Impl::invalidatorThread( std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors, std::mutex &mutex, std::condition_variable &cv, bool &ready) { while(true) { std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied; { std::unique_lock<std::mutex> lock(mutex); if (!ready) { cv.wait(lock); } copied.insert(accessors.begin(), accessors.end()); } std::list<ConnectionId> erased; for (auto it = copied.begin(); it != copied.end(); ++it) { const std::shared_ptr<Accessor::Impl> impl = it->second.lock(); if (!impl) { erased.push_back(it->first); } else { impl->handleInvalidateAck(); } } { std::unique_lock<std::mutex> lock(mutex); for (auto it = erased.begin(); it != erased.end(); ++it) { accessors.erase(*it); } if (accessors.size() == 0) { ready = false; } else { // prevent draining cpu. lock.unlock(); std::this_thread::yield(); } } } } Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) { std::thread invalidator( invalidatorThread, std::ref(mAccessors), std::ref(mMutex), std::ref(mCv), std::ref(mReady)); invalidator.detach(); } void Accessor::Impl::AccessorInvalidator::addAccessor( uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) { bool notify = false; std::unique_lock<std::mutex> lock(mMutex); if (mAccessors.find(accessorId) == mAccessors.end()) { if (!mReady) { mReady = true; notify = true; } mAccessors.insert(std::make_pair(accessorId, impl)); ALOGV("buffer invalidation added bp:%u %d", accessorId, notify); } lock.unlock(); if (notify) { mCv.notify_one(); } } void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) { std::lock_guard<std::mutex> lock(mMutex); mAccessors.erase(accessorId); ALOGV("buffer invalidation deleted bp:%u", accessorId); if (mAccessors.size() == 0) { mReady = false; } } std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator; void Accessor::Impl::createInvalidator() { if (!sInvalidator) { sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>(); } } } // namespace implementation } // namespace V2_0 } // namespace bufferpool } // namespace media } // namespace hardware } // namespace android