/*------------------------------------------------------------------------- * drawElements Stream Library * --------------------------- * * Copyright 2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * *//*! * \file * \brief Thread safe ringbuffer *//*--------------------------------------------------------------------*/ #include "deRingbuffer.h" #include "deInt32.h" #include "deMemory.h" #include "deSemaphore.h" #include <stdlib.h> #include <stdio.h> struct deRingbuffer_s { deInt32 blockSize; deInt32 blockCount; deInt32* blockUsage; deUint8* buffer; deSemaphore emptyCount; deSemaphore fullCount; deInt32 outBlock; deInt32 outPos; deInt32 inBlock; deInt32 inPos; deBool stopNotified; deBool consumerStopping; }; deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount) { deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer)); DE_ASSERT(ringbuffer); DE_ASSERT(blockCount > 0); DE_ASSERT(blockSize > 0); ringbuffer->blockSize = blockSize; ringbuffer->blockCount = blockCount; ringbuffer->buffer = (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount); ringbuffer->blockUsage = (deInt32*)deMalloc(sizeof(deUint32) * (size_t)blockCount); ringbuffer->emptyCount = deSemaphore_create(ringbuffer->blockCount, DE_NULL); ringbuffer->fullCount = deSemaphore_create(0, DE_NULL); if (!ringbuffer->buffer || !ringbuffer->blockUsage || !ringbuffer->emptyCount || !ringbuffer->fullCount) { if (ringbuffer->emptyCount) deSemaphore_destroy(ringbuffer->emptyCount); if (ringbuffer->fullCount) deSemaphore_destroy(ringbuffer->fullCount); deFree(ringbuffer->buffer); deFree(ringbuffer->blockUsage); deFree(ringbuffer); return DE_NULL; } memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * (size_t)blockCount); ringbuffer->outBlock = 0; ringbuffer->outPos = 0; ringbuffer->inBlock = 0; ringbuffer->inPos = 0; ringbuffer->stopNotified = DE_FALSE; ringbuffer->consumerStopping = DE_FALSE; return ringbuffer; } void deRingbuffer_stop (deRingbuffer* ringbuffer) { /* Set notify to true and increment fullCount to let consumer continue */ ringbuffer->stopNotified = DE_TRUE; deSemaphore_increment(ringbuffer->fullCount); } void deRingbuffer_destroy (deRingbuffer* ringbuffer) { deSemaphore_destroy(ringbuffer->emptyCount); deSemaphore_destroy(ringbuffer->fullCount); free(ringbuffer->buffer); free(ringbuffer->blockUsage); free(ringbuffer); } static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written) { deRingbuffer* ringbuffer = (deRingbuffer*)stream; DE_ASSERT(stream); /* If ringbuffer is stopping return error on write */ if (ringbuffer->stopNotified) { DE_ASSERT(DE_FALSE); return DE_STREAMRESULT_ERROR; } *written = 0; /* Write while more data available */ while (*written < bufSize) { deInt32 writeSize = 0; deUint8* src = DE_NULL; deUint8* dst = DE_NULL; /* If between blocks accuire new block */ if (ringbuffer->inPos == 0) { deSemaphore_decrement(ringbuffer->emptyCount); } writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written); dst = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos; src = (deUint8*)buf + *written; deMemcpy(dst, src, (size_t)writeSize); ringbuffer->inPos += writeSize; *written += writeSize; ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize; /* Block is full move to next one (or "between" this and next block) */ if (ringbuffer->inPos == ringbuffer->blockSize) { ringbuffer->inPos = 0; ringbuffer->inBlock++; if (ringbuffer->inBlock == ringbuffer->blockCount) ringbuffer->inBlock = 0; deSemaphore_increment(ringbuffer->fullCount); } } return DE_STREAMRESULT_SUCCESS; } static deStreamResult producerStream_flush (deStreamData* stream) { deRingbuffer* ringbuffer = (deRingbuffer*)stream; DE_ASSERT(stream); /* No blocks reserved by producer */ if (ringbuffer->inPos == 0) return DE_STREAMRESULT_SUCCESS; ringbuffer->inPos = 0; ringbuffer->inBlock++; if (ringbuffer->inBlock == ringbuffer->blockCount) ringbuffer->inBlock = 0; deSemaphore_increment(ringbuffer->fullCount); return DE_STREAMRESULT_SUCCESS; } static deStreamResult producerStream_deinit (deStreamData* stream) { DE_ASSERT(stream); producerStream_flush(stream); /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */ return DE_STREAMRESULT_SUCCESS; } static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read) { deRingbuffer* ringbuffer = (deRingbuffer*)stream; DE_ASSERT(stream); *read = 0; DE_ASSERT(ringbuffer); while (*read < bufSize) { deInt32 writeSize = 0; deUint8* src = DE_NULL; deUint8* dst = DE_NULL; /* If between blocks accuire new block */ if (ringbuffer->outPos == 0) { /* If consumer is set to stop after everything is consumed, * do not block if there is no more input left */ if (ringbuffer->consumerStopping) { /* Try to accuire new block, if can't there is no more input */ if (!deSemaphore_tryDecrement(ringbuffer->fullCount)) { return DE_STREAMRESULT_END_OF_STREAM; } } else { /* If not stopping block until there is more input */ deSemaphore_decrement(ringbuffer->fullCount); /* Ringbuffer was set to stop */ if (ringbuffer->stopNotified) { ringbuffer->consumerStopping = DE_TRUE; } } } writeSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read); src = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos; dst = (deUint8*)buf + *read; deMemcpy(dst, src, (size_t)writeSize); ringbuffer->outPos += writeSize; *read += writeSize; /* Block is consumed move to next one (or "between" this and next block) */ if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock]) { ringbuffer->blockUsage[ringbuffer->outBlock] = 0; ringbuffer->outPos = 0; ringbuffer->outBlock++; if (ringbuffer->outBlock == ringbuffer->blockCount) ringbuffer->outBlock = 0; deSemaphore_increment(ringbuffer->emptyCount); } } return DE_STREAMRESULT_SUCCESS; } static deStreamResult consumerStream_deinit (deStreamData* stream) { DE_ASSERT(stream); DE_UNREF(stream); return DE_STREAMRESULT_SUCCESS; } /* There are no sensible errors so status is always good */ deStreamStatus dummy_getStatus (deStreamData* stream) { DE_UNREF(stream); return DE_STREAMSTATUS_GOOD; } /* There are no sensible errors in ringbuffer */ static const char* dummy_getError (deStreamData* stream) { DE_ASSERT(stream); DE_UNREF(stream); return DE_NULL; } static const deIOStreamVFTable producerStreamVFTable = { DE_NULL, producerStream_write, dummy_getError, producerStream_flush, producerStream_deinit, dummy_getStatus }; static const deIOStreamVFTable consumerStreamVFTable = { consumerStream_read, DE_NULL, dummy_getError, DE_NULL, consumerStream_deinit, dummy_getStatus }; void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer) { stream->ioStream.streamData = (deStreamData*)buffer; stream->ioStream.vfTable = &producerStreamVFTable; } void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer) { stream->ioStream.streamData = (deStreamData*)buffer; stream->ioStream.vfTable = &consumerStreamVFTable; }