/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "BufferPoolAccessor"
//#define LOG_NDEBUG 0
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
#include <thread>
#include "AccessorImpl.h"
#include "Connection.h"
namespace android {
namespace hardware {
namespace media {
namespace bufferpool {
namespace V2_0 {
namespace implementation {
namespace {
static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
static constexpr size_t kMinBufferCountForEviction = 40;
}
// Buffer structure in bufferpool process
struct InternalBuffer {
BufferId mId;
size_t mOwnerCount;
size_t mTransactionCount;
const std::shared_ptr<BufferPoolAllocation> mAllocation;
const size_t mAllocSize;
const std::vector<uint8_t> mConfig;
bool mInvalidated;
InternalBuffer(
BufferId id,
const std::shared_ptr<BufferPoolAllocation> &alloc,
const size_t allocSize,
const std::vector<uint8_t> &allocConfig)
: mId(id), mOwnerCount(0), mTransactionCount(0),
mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
mInvalidated(false) {}
const native_handle_t *handle() {
return mAllocation->handle();
}
void invalidate() {
mInvalidated = true;
}
};
struct TransactionStatus {
TransactionId mId;
BufferId mBufferId;
ConnectionId mSender;
ConnectionId mReceiver;
BufferStatus mStatus;
int64_t mTimestampUs;
bool mSenderValidated;
TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
mId = message.transactionId;
mBufferId = message.bufferId;
mStatus = message.newStatus;
mTimestampUs = timestampUs;
if (mStatus == BufferStatus::TRANSFER_TO) {
mSender = message.connectionId;
mReceiver = message.targetConnectionId;
mSenderValidated = true;
} else {
mSender = -1LL;
mReceiver = message.connectionId;
mSenderValidated = false;
}
}
};
// Helper template methods for handling map of set.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
auto iter = mapOfSet->find(key);
if (iter == mapOfSet->end()) {
std::set<U> valueSet{value};
mapOfSet->insert(std::make_pair(key, valueSet));
return true;
} else if (iter->second.find(value) == iter->second.end()) {
iter->second.insert(value);
return true;
}
return false;
}
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
bool ret = false;
auto iter = mapOfSet->find(key);
if (iter != mapOfSet->end()) {
if (iter->second.erase(value) > 0) {
ret = true;
}
if (iter->second.size() == 0) {
mapOfSet->erase(iter);
}
}
return ret;
}
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
auto iter = mapOfSet->find(key);
if (iter != mapOfSet->end()) {
auto setIter = iter->second.find(value);
return setIter != iter->second.end();
}
return false;
}
int32_t Accessor::Impl::sPid = getpid();
uint32_t Accessor::Impl::sSeqId = time(nullptr);
Accessor::Impl::Impl(
const std::shared_ptr<BufferPoolAllocator> &allocator)
: mAllocator(allocator) {}
Accessor::Impl::~Impl() {
}
ResultStatus Accessor::Impl::connect(
const sp<Accessor> &accessor, const sp<IObserver> &observer,
sp<Connection> *connection,
ConnectionId *pConnectionId,
uint32_t *pMsgId,
const StatusDescriptor** statusDescPtr,
const InvalidationDescriptor** invDescPtr) {
sp<Connection> newConnection = new Connection();
ResultStatus status = ResultStatus::CRITICAL_ERROR;
{
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
if (newConnection) {
ConnectionId id = (int64_t)sPid << 32 | sSeqId;
status = mBufferPool.mObserver.open(id, statusDescPtr);
if (status == ResultStatus::OK) {
newConnection->initialize(accessor, id);
*connection = newConnection;
*pConnectionId = id;
*pMsgId = mBufferPool.mInvalidation.mInvalidationId;
mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
mBufferPool.mInvalidation.onConnect(id, observer);
++sSeqId;
}
}
mBufferPool.processStatusMessages();
mBufferPool.cleanUp();
}
return status;
}
ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
mBufferPool.processStatusMessages();
mBufferPool.handleClose(connectionId);
mBufferPool.mObserver.close(connectionId);
mBufferPool.mInvalidation.onClose(connectionId);
// Since close# will be called after all works are finished, it is OK to
// evict unused buffers.
mBufferPool.cleanUp(true);
return ResultStatus::OK;
}
ResultStatus Accessor::Impl::allocate(
ConnectionId connectionId, const std::vector<uint8_t>& params,
BufferId *bufferId, const native_handle_t** handle) {
std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
mBufferPool.processStatusMessages();
ResultStatus status = ResultStatus::OK;
if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
lock.unlock();
std::shared_ptr<BufferPoolAllocation> alloc;
size_t allocSize;
status = mAllocator->allocate(params, &alloc, &allocSize);
lock.lock();
if (status == ResultStatus::OK) {
status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
}
ALOGV("create a buffer %d : %u %p",
status == ResultStatus::OK, *bufferId, *handle);
}
if (status == ResultStatus::OK) {
// TODO: handle ownBuffer failure
mBufferPool.handleOwnBuffer(connectionId, *bufferId);
}
mBufferPool.cleanUp();
return status;
}
ResultStatus Accessor::Impl::fetch(
ConnectionId connectionId, TransactionId transactionId,
BufferId bufferId, const native_handle_t** handle) {
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
mBufferPool.processStatusMessages();
auto found = mBufferPool.mTransactions.find(transactionId);
if (found != mBufferPool.mTransactions.end() &&
contains(&mBufferPool.mPendingTransactions,
connectionId, transactionId)) {
if (found->second->mSenderValidated &&
found->second->mStatus == BufferStatus::TRANSFER_FROM &&
found->second->mBufferId == bufferId) {
found->second->mStatus = BufferStatus::TRANSFER_FETCH;
auto bufferIt = mBufferPool.mBuffers.find(bufferId);
if (bufferIt != mBufferPool.mBuffers.end()) {
mBufferPool.mStats.onBufferFetched();
*handle = bufferIt->second->handle();
return ResultStatus::OK;
}
}
}
mBufferPool.cleanUp();
return ResultStatus::CRITICAL_ERROR;
}
void Accessor::Impl::cleanUp(bool clearCache) {
// transaction timeout, buffer cacheing TTL handling
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
mBufferPool.processStatusMessages();
mBufferPool.cleanUp(clearCache);
}
void Accessor::Impl::flush() {
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
mBufferPool.processStatusMessages();
mBufferPool.flush(shared_from_this());
}
void Accessor::Impl::handleInvalidateAck() {
std::map<ConnectionId, const sp<IObserver>> observers;
uint32_t invalidationId;
{
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
mBufferPool.processStatusMessages();
mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
}
// Do not hold lock for send invalidations
size_t deadClients = 0;
for (auto it = observers.begin(); it != observers.end(); ++it) {
const sp<IObserver> observer = it->second;
if (observer) {
Return<void> transResult = observer->onMessage(it->first, invalidationId);
if (!transResult.isOk()) {
++deadClients;
}
}
}
if (deadClients > 0) {
ALOGD("During invalidation found %zu dead clients", deadClients);
}
}
bool Accessor::Impl::isValid() {
return mBufferPool.isValid();
}
Accessor::Impl::Impl::BufferPool::BufferPool()
: mTimestampUs(getTimestampNow()),
mLastCleanUpUs(mTimestampUs),
mLastLogUs(mTimestampUs),
mSeq(0),
mStartSeq(0) {
mValid = mInvalidationChannel.isValid();
}
// Statistics helper
template<typename T, typename S>
int percentage(T base, S total) {
return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
}
std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
Accessor::Impl::Impl::BufferPool::~BufferPool() {
std::lock_guard<std::mutex> lock(mMutex);
ALOGD("Destruction - bufferpool2 %p "
"cached: %zu/%zuM, %zu/%d%% in use; "
"allocs: %zu, %d%% recycled; "
"transfers: %zu, %d%% unfetced",
this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
mStats.mTotalTransfers,
percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
}
void Accessor::Impl::BufferPool::Invalidation::onConnect(
ConnectionId conId, const sp<IObserver>& observer) {
mAcks[conId] = mInvalidationId; // starts from current invalidationId
mObservers.insert(std::make_pair(conId, observer));
}
void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
mAcks.erase(conId);
mObservers.erase(conId);
}
void Accessor::Impl::BufferPool::Invalidation::onAck(
ConnectionId conId,
uint32_t msgId) {
auto it = mAcks.find(conId);
if (it == mAcks.end()) {
ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
return;
}
if (isMessageLater(msgId, it->second)) {
mAcks[conId] = msgId;
}
}
void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
BufferId bufferId,
BufferInvalidationChannel &channel) {
for (auto it = mPendings.begin(); it != mPendings.end();) {
if (it->isInvalidated(bufferId)) {
uint32_t msgId = 0;
if (it->mNeedsAck) {
msgId = ++mInvalidationId;
if (msgId == 0) {
// wrap happens
msgId = ++mInvalidationId;
}
}
channel.postInvalidation(msgId, it->mFrom, it->mTo);
it = mPendings.erase(it);
continue;
}
++it;
}
}
void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
bool needsAck,
uint32_t from,
uint32_t to,
size_t left,
BufferInvalidationChannel &channel,
const std::shared_ptr<Accessor::Impl> &impl) {
uint32_t msgId = 0;
if (needsAck) {
msgId = ++mInvalidationId;
if (msgId == 0) {
// wrap happens
msgId = ++mInvalidationId;
}
}
ALOGV("bufferpool2 invalidation requested and queued");
if (left == 0) {
channel.postInvalidation(msgId, from, to);
} else {
// TODO: sending hint message?
ALOGV("bufferpoo2 invalidation requested and pending");
Pending pending(needsAck, from, to, left, impl);
mPendings.push_back(pending);
}
sInvalidator->addAccessor(mId, impl);
}
void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
std::map<ConnectionId, const sp<IObserver>> *observers,
uint32_t *invalidationId) {
if (mInvalidationId != 0) {
*invalidationId = mInvalidationId;
std::set<int> deads;
for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
if (it->second != mInvalidationId) {
const sp<IObserver> observer = mObservers[it->first];
if (observer) {
observers->emplace(it->first, observer);
ALOGV("connection %lld will call observer (%u: %u)",
(long long)it->first, it->second, mInvalidationId);
// N.B: onMessage will be called later. ignore possibility of
// onMessage# oneway call being lost.
it->second = mInvalidationId;
} else {
ALOGV("bufferpool2 observer died %lld", (long long)it->first);
deads.insert(it->first);
}
}
}
if (deads.size() > 0) {
for (auto it = deads.begin(); it != deads.end(); ++it) {
onClose(*it);
}
}
}
if (mPendings.size() == 0) {
// All invalidation Ids are synced and no more pending invalidations.
sInvalidator->delAccessor(mId);
}
}
bool Accessor::Impl::BufferPool::handleOwnBuffer(
ConnectionId connectionId, BufferId bufferId) {
bool added = insert(&mUsingBuffers, connectionId, bufferId);
if (added) {
auto iter = mBuffers.find(bufferId);
iter->second->mOwnerCount++;
}
insert(&mUsingConnections, bufferId, connectionId);
return added;
}
bool Accessor::Impl::BufferPool::handleReleaseBuffer(
ConnectionId connectionId, BufferId bufferId) {
bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
if (deleted) {
auto iter = mBuffers.find(bufferId);
iter->second->mOwnerCount--;
if (iter->second->mOwnerCount == 0 &&
iter->second->mTransactionCount == 0) {
if (!iter->second->mInvalidated) {
mStats.onBufferUnused(iter->second->mAllocSize);
mFreeBuffers.insert(bufferId);
} else {
mStats.onBufferUnused(iter->second->mAllocSize);
mStats.onBufferEvicted(iter->second->mAllocSize);
mBuffers.erase(iter);
mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
}
}
}
erase(&mUsingConnections, bufferId, connectionId);
ALOGV("release buffer %u : %d", bufferId, deleted);
return deleted;
}
bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
auto completed = mCompletedTransactions.find(
message.transactionId);
if (completed != mCompletedTransactions.end()) {
// already completed
mCompletedTransactions.erase(completed);
return true;
}
// the buffer should exist and be owned.
auto bufferIter = mBuffers.find(message.bufferId);
if (bufferIter == mBuffers.end() ||
!contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
return false;
}
auto found = mTransactions.find(message.transactionId);
if (found != mTransactions.end()) {
// transfer_from was received earlier.
found->second->mSender = message.connectionId;
found->second->mSenderValidated = true;
return true;
}
// TODO: verify there is target connection Id
mStats.onBufferSent();
mTransactions.insert(std::make_pair(
message.transactionId,
std::make_unique<TransactionStatus>(message, mTimestampUs)));
insert(&mPendingTransactions, message.targetConnectionId,
message.transactionId);
bufferIter->second->mTransactionCount++;
return true;
}
bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
auto found = mTransactions.find(message.transactionId);
if (found == mTransactions.end()) {
// TODO: is it feasible to check ownership here?
mStats.onBufferSent();
mTransactions.insert(std::make_pair(
message.transactionId,
std::make_unique<TransactionStatus>(message, mTimestampUs)));
insert(&mPendingTransactions, message.connectionId,
message.transactionId);
auto bufferIter = mBuffers.find(message.bufferId);
bufferIter->second->mTransactionCount++;
} else {
if (message.connectionId == found->second->mReceiver) {
found->second->mStatus = BufferStatus::TRANSFER_FROM;
}
}
return true;
}
bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
auto found = mTransactions.find(message.transactionId);
if (found != mTransactions.end()) {
bool deleted = erase(&mPendingTransactions, message.connectionId,
message.transactionId);
if (deleted) {
if (!found->second->mSenderValidated) {
mCompletedTransactions.insert(message.transactionId);
}
auto bufferIter = mBuffers.find(message.bufferId);
if (message.newStatus == BufferStatus::TRANSFER_OK) {
handleOwnBuffer(message.connectionId, message.bufferId);
}
bufferIter->second->mTransactionCount--;
if (bufferIter->second->mOwnerCount == 0
&& bufferIter->second->mTransactionCount == 0) {
if (!bufferIter->second->mInvalidated) {
mStats.onBufferUnused(bufferIter->second->mAllocSize);
mFreeBuffers.insert(message.bufferId);
} else {
mStats.onBufferUnused(bufferIter->second->mAllocSize);
mStats.onBufferEvicted(bufferIter->second->mAllocSize);
mBuffers.erase(bufferIter);
mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
}
}
mTransactions.erase(found);
}
ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
message.bufferId, deleted);
return deleted;
}
ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
message.bufferId);
return false;
}
void Accessor::Impl::BufferPool::processStatusMessages() {
std::vector<BufferStatusMessage> messages;
mObserver.getBufferStatusChanges(messages);
mTimestampUs = getTimestampNow();
for (BufferStatusMessage& message: messages) {
bool ret = false;
switch (message.newStatus) {
case BufferStatus::NOT_USED:
ret = handleReleaseBuffer(
message.connectionId, message.bufferId);
break;
case BufferStatus::USED:
// not happening
break;
case BufferStatus::TRANSFER_TO:
ret = handleTransferTo(message);
break;
case BufferStatus::TRANSFER_FROM:
ret = handleTransferFrom(message);
break;
case BufferStatus::TRANSFER_TIMEOUT:
// TODO
break;
case BufferStatus::TRANSFER_LOST:
// TODO
break;
case BufferStatus::TRANSFER_FETCH:
// not happening
break;
case BufferStatus::TRANSFER_OK:
case BufferStatus::TRANSFER_ERROR:
ret = handleTransferResult(message);
break;
case BufferStatus::INVALIDATION_ACK:
mInvalidation.onAck(message.connectionId, message.bufferId);
ret = true;
break;
}
if (ret == false) {
ALOGW("buffer status message processing failure - message : %d connection : %lld",
message.newStatus, (long long)message.connectionId);
}
}
messages.clear();
}
bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
// Cleaning buffers
auto buffers = mUsingBuffers.find(connectionId);
if (buffers != mUsingBuffers.end()) {
for (const BufferId& bufferId : buffers->second) {
bool deleted = erase(&mUsingConnections, bufferId, connectionId);
if (deleted) {
auto bufferIter = mBuffers.find(bufferId);
bufferIter->second->mOwnerCount--;
if (bufferIter->second->mOwnerCount == 0 &&
bufferIter->second->mTransactionCount == 0) {
// TODO: handle freebuffer insert fail
if (!bufferIter->second->mInvalidated) {
mStats.onBufferUnused(bufferIter->second->mAllocSize);
mFreeBuffers.insert(bufferId);
} else {
mStats.onBufferUnused(bufferIter->second->mAllocSize);
mStats.onBufferEvicted(bufferIter->second->mAllocSize);
mBuffers.erase(bufferIter);
mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
}
}
}
}
mUsingBuffers.erase(buffers);
}
// Cleaning transactions
auto pending = mPendingTransactions.find(connectionId);
if (pending != mPendingTransactions.end()) {
for (const TransactionId& transactionId : pending->second) {
auto iter = mTransactions.find(transactionId);
if (iter != mTransactions.end()) {
if (!iter->second->mSenderValidated) {
mCompletedTransactions.insert(transactionId);
}
BufferId bufferId = iter->second->mBufferId;
auto bufferIter = mBuffers.find(bufferId);
bufferIter->second->mTransactionCount--;
if (bufferIter->second->mOwnerCount == 0 &&
bufferIter->second->mTransactionCount == 0) {
// TODO: handle freebuffer insert fail
if (!bufferIter->second->mInvalidated) {
mStats.onBufferUnused(bufferIter->second->mAllocSize);
mFreeBuffers.insert(bufferId);
} else {
mStats.onBufferUnused(bufferIter->second->mAllocSize);
mStats.onBufferEvicted(bufferIter->second->mAllocSize);
mBuffers.erase(bufferIter);
mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
}
}
mTransactions.erase(iter);
}
}
}
return true;
}
bool Accessor::Impl::BufferPool::getFreeBuffer(
const std::shared_ptr<BufferPoolAllocator> &allocator,
const std::vector<uint8_t> ¶ms, BufferId *pId,
const native_handle_t** handle) {
auto bufferIt = mFreeBuffers.begin();
for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
BufferId bufferId = *bufferIt;
if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
break;
}
}
if (bufferIt != mFreeBuffers.end()) {
BufferId id = *bufferIt;
mFreeBuffers.erase(bufferIt);
mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
*handle = mBuffers[id]->handle();
*pId = id;
ALOGV("recycle a buffer %u %p", id, *handle);
return true;
}
return false;
}
ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
const std::shared_ptr<BufferPoolAllocation> &alloc,
const size_t allocSize,
const std::vector<uint8_t> ¶ms,
BufferId *pId,
const native_handle_t** handle) {
BufferId bufferId = mSeq++;
if (mSeq == Connection::SYNC_BUFFERID) {
mSeq = 0;
}
std::unique_ptr<InternalBuffer> buffer =
std::make_unique<InternalBuffer>(
bufferId, alloc, allocSize, params);
if (buffer) {
auto res = mBuffers.insert(std::make_pair(
bufferId, std::move(buffer)));
if (res.second) {
mStats.onBufferAllocated(allocSize);
*handle = alloc->handle();
*pId = bufferId;
return ResultStatus::OK;
}
}
return ResultStatus::NO_MEMORY;
}
void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
mLastCleanUpUs = mTimestampUs;
if (mTimestampUs > mLastLogUs + kLogDurationUs) {
mLastLogUs = mTimestampUs;
ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
"%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
"%zu/%zu (fetch/transfer)",
this, mStats.mBuffersCached, mStats.mSizeCached,
mStats.mBuffersInUse, mStats.mSizeInUse,
mStats.mTotalRecycles, mStats.mTotalAllocations,
mStats.mTotalFetches, mStats.mTotalTransfers);
}
for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
&& mBuffers.size() < kMinBufferCountForEviction) {
break;
}
auto it = mBuffers.find(*freeIt);
if (it != mBuffers.end() &&
it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
mStats.onBufferEvicted(it->second->mAllocSize);
mBuffers.erase(it);
freeIt = mFreeBuffers.erase(freeIt);
} else {
++freeIt;
ALOGW("bufferpool2 inconsistent!");
}
}
}
}
void Accessor::Impl::BufferPool::invalidate(
bool needsAck, BufferId from, BufferId to,
const std::shared_ptr<Accessor::Impl> &impl) {
for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
if (isBufferInRange(from, to, *freeIt)) {
auto it = mBuffers.find(*freeIt);
if (it != mBuffers.end() &&
it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
mStats.onBufferEvicted(it->second->mAllocSize);
mBuffers.erase(it);
freeIt = mFreeBuffers.erase(freeIt);
continue;
} else {
ALOGW("bufferpool2 inconsistent!");
}
}
++freeIt;
}
size_t left = 0;
for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
if (isBufferInRange(from, to, it->first)) {
it->second->invalidate();
++left;
}
}
mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
}
void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
BufferId from = mStartSeq;
BufferId to = mSeq;
mStartSeq = mSeq;
// TODO: needsAck params
ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
if (from != to) {
invalidate(true, from, to, impl);
}
}
void Accessor::Impl::invalidatorThread(
std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
std::mutex &mutex,
std::condition_variable &cv,
bool &ready) {
while(true) {
std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
{
std::unique_lock<std::mutex> lock(mutex);
if (!ready) {
cv.wait(lock);
}
copied.insert(accessors.begin(), accessors.end());
}
std::list<ConnectionId> erased;
for (auto it = copied.begin(); it != copied.end(); ++it) {
const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
if (!impl) {
erased.push_back(it->first);
} else {
impl->handleInvalidateAck();
}
}
{
std::unique_lock<std::mutex> lock(mutex);
for (auto it = erased.begin(); it != erased.end(); ++it) {
accessors.erase(*it);
}
if (accessors.size() == 0) {
ready = false;
} else {
// prevent draining cpu.
lock.unlock();
std::this_thread::yield();
}
}
}
}
Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
std::thread invalidator(
invalidatorThread,
std::ref(mAccessors),
std::ref(mMutex),
std::ref(mCv),
std::ref(mReady));
invalidator.detach();
}
void Accessor::Impl::AccessorInvalidator::addAccessor(
uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
bool notify = false;
std::unique_lock<std::mutex> lock(mMutex);
if (mAccessors.find(accessorId) == mAccessors.end()) {
if (!mReady) {
mReady = true;
notify = true;
}
mAccessors.insert(std::make_pair(accessorId, impl));
ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
}
lock.unlock();
if (notify) {
mCv.notify_one();
}
}
void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
std::lock_guard<std::mutex> lock(mMutex);
mAccessors.erase(accessorId);
ALOGV("buffer invalidation deleted bp:%u", accessorId);
if (mAccessors.size() == 0) {
mReady = false;
}
}
std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
void Accessor::Impl::createInvalidator() {
if (!sInvalidator) {
sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
}
}
} // namespace implementation
} // namespace V2_0
} // namespace bufferpool
} // namespace media
} // namespace hardware
} // namespace android