/* * Copyright (C) 2018 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "BufferPoolStatus" //#define LOG_NDEBUG 0 #include <thread> #include <time.h> #include "BufferStatus.h" namespace android { namespace hardware { namespace media { namespace bufferpool { namespace V2_0 { namespace implementation { int64_t getTimestampNow() { int64_t stamp; struct timespec ts; // TODO: CLOCK_MONOTONIC_COARSE? clock_gettime(CLOCK_MONOTONIC, &ts); stamp = ts.tv_nsec / 1000; stamp += (ts.tv_sec * 1000000LL); return stamp; } bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId) { return curMsgId != prevMsgId && curMsgId - prevMsgId < prevMsgId - curMsgId; } bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId) { if (from < to) { return from <= bufferId && bufferId < to; } else { // wrap happens return from <= bufferId || bufferId < to; } } static constexpr int kNumElementsInQueue = 1024*16; static constexpr int kMinElementsToSyncInQueue = 128; ResultStatus BufferStatusObserver::open( ConnectionId id, const StatusDescriptor** fmqDescPtr) { if (mBufferStatusQueues.find(id) != mBufferStatusQueues.end()) { // TODO: id collision log? return ResultStatus::CRITICAL_ERROR; } std::unique_ptr<BufferStatusQueue> queue = std::make_unique<BufferStatusQueue>(kNumElementsInQueue); if (!queue || queue->isValid() == false) { *fmqDescPtr = nullptr; return ResultStatus::NO_MEMORY; } else { *fmqDescPtr = queue->getDesc(); } auto result = mBufferStatusQueues.insert( std::make_pair(id, std::move(queue))); if (!result.second) { *fmqDescPtr = nullptr; return ResultStatus::NO_MEMORY; } return ResultStatus::OK; } ResultStatus BufferStatusObserver::close(ConnectionId id) { if (mBufferStatusQueues.find(id) == mBufferStatusQueues.end()) { return ResultStatus::CRITICAL_ERROR; } mBufferStatusQueues.erase(id); return ResultStatus::OK; } void BufferStatusObserver::getBufferStatusChanges(std::vector<BufferStatusMessage> &messages) { for (auto it = mBufferStatusQueues.begin(); it != mBufferStatusQueues.end(); ++it) { BufferStatusMessage message; size_t avail = it->second->availableToRead(); while (avail > 0) { if (!it->second->read(&message, 1)) { // Since avaliable # of reads are already confirmed, // this should not happen. // TODO: error handling (spurious client?) ALOGW("FMQ message cannot be read from %lld", (long long)it->first); return; } message.connectionId = it->first; messages.push_back(message); --avail; } } } BufferStatusChannel::BufferStatusChannel( const StatusDescriptor &fmqDesc) { std::unique_ptr<BufferStatusQueue> queue = std::make_unique<BufferStatusQueue>(fmqDesc); if (!queue || queue->isValid() == false) { mValid = false; return; } mValid = true; mBufferStatusQueue = std::move(queue); } bool BufferStatusChannel::isValid() { return mValid; } bool BufferStatusChannel::needsSync() { if (mValid) { size_t avail = mBufferStatusQueue->availableToWrite(); return avail + kMinElementsToSyncInQueue < kNumElementsInQueue; } return false; } void BufferStatusChannel::postBufferRelease( ConnectionId connectionId, std::list<BufferId> &pending, std::list<BufferId> &posted) { if (mValid && pending.size() > 0) { size_t avail = mBufferStatusQueue->availableToWrite(); avail = std::min(avail, pending.size()); BufferStatusMessage message; for (size_t i = 0 ; i < avail; ++i) { BufferId id = pending.front(); message.newStatus = BufferStatus::NOT_USED; message.bufferId = id; message.connectionId = connectionId; if (!mBufferStatusQueue->write(&message, 1)) { // Since avaliable # of writes are already confirmed, // this should not happen. // TODO: error handing? ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); return; } pending.pop_front(); posted.push_back(id); } } } void BufferStatusChannel::postBufferInvalidateAck( ConnectionId connectionId, uint32_t invalidateId, bool *invalidated) { if (mValid && !*invalidated) { size_t avail = mBufferStatusQueue->availableToWrite(); if (avail > 0) { BufferStatusMessage message; message.newStatus = BufferStatus::INVALIDATION_ACK; message.bufferId = invalidateId; message.connectionId = connectionId; if (!mBufferStatusQueue->write(&message, 1)) { // Since avaliable # of writes are already confirmed, // this should not happen. // TODO: error handing? ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); return; } *invalidated = true; } } } bool BufferStatusChannel::postBufferStatusMessage( TransactionId transactionId, BufferId bufferId, BufferStatus status, ConnectionId connectionId, ConnectionId targetId, std::list<BufferId> &pending, std::list<BufferId> &posted) { if (mValid) { size_t avail = mBufferStatusQueue->availableToWrite(); size_t numPending = pending.size(); if (avail >= numPending + 1) { BufferStatusMessage release, message; for (size_t i = 0; i < numPending; ++i) { BufferId id = pending.front(); release.newStatus = BufferStatus::NOT_USED; release.bufferId = id; release.connectionId = connectionId; if (!mBufferStatusQueue->write(&release, 1)) { // Since avaliable # of writes are already confirmed, // this should not happen. // TODO: error handling? ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); return false; } pending.pop_front(); posted.push_back(id); } message.transactionId = transactionId; message.bufferId = bufferId; message.newStatus = status; message.connectionId = connectionId; message.targetConnectionId = targetId; // TODO : timesatamp message.timestampUs = 0; if (!mBufferStatusQueue->write(&message, 1)) { // Since avaliable # of writes are already confirmed, // this should not happen. ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); return false; } return true; } } return false; } BufferInvalidationListener::BufferInvalidationListener( const InvalidationDescriptor &fmqDesc) { std::unique_ptr<BufferInvalidationQueue> queue = std::make_unique<BufferInvalidationQueue>(fmqDesc); if (!queue || queue->isValid() == false) { mValid = false; return; } mValid = true; mBufferInvalidationQueue = std::move(queue); // drain previous messages size_t avail = std::min( mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue); std::vector<BufferInvalidationMessage> temp(avail); if (avail > 0) { mBufferInvalidationQueue->read(temp.data(), avail); } } void BufferInvalidationListener::getInvalidations( std::vector<BufferInvalidationMessage> &messages) { // Try twice in case of overflow. // TODO: handling overflow though it may not happen. for (int i = 0; i < 2; ++i) { size_t avail = std::min( mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue); if (avail > 0) { std::vector<BufferInvalidationMessage> temp(avail); if (mBufferInvalidationQueue->read(temp.data(), avail)) { messages.reserve(messages.size() + avail); for (auto it = temp.begin(); it != temp.end(); ++it) { messages.push_back(*it); } break; } } else { return; } } } bool BufferInvalidationListener::isValid() { return mValid; } BufferInvalidationChannel::BufferInvalidationChannel() : mValid(true), mBufferInvalidationQueue( std::make_unique<BufferInvalidationQueue>(kNumElementsInQueue, true)) { if (!mBufferInvalidationQueue || mBufferInvalidationQueue->isValid() == false) { mValid = false; } } bool BufferInvalidationChannel::isValid() { return mValid; } void BufferInvalidationChannel::getDesc(const InvalidationDescriptor **fmqDescPtr) { if (mValid) { *fmqDescPtr = mBufferInvalidationQueue->getDesc(); } else { *fmqDescPtr = nullptr; } } void BufferInvalidationChannel::postInvalidation( uint32_t msgId, BufferId fromId, BufferId toId) { BufferInvalidationMessage message; message.messageId = msgId; message.fromBufferId = fromId; message.toBufferId = toId; // TODO: handle failure (it does not happen normally.) mBufferInvalidationQueue->write(&message); } } // namespace implementation } // namespace V2_0 } // namespace bufferpool } // namespace media } // namespace hardware } // namespace android