/*-------------------------------------------------------------------------
* drawElements Stream Library
* ---------------------------
*
* Copyright 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*//*!
* \file
* \brief Thread safe ringbuffer
*//*--------------------------------------------------------------------*/
#include "deRingbuffer.h"
#include "deInt32.h"
#include "deMemory.h"
#include "deSemaphore.h"
#include <stdlib.h>
#include <stdio.h>
/* State of a block-based ringbuffer shared between one producer stream and
 * one consumer stream. The storage is split into blockCount blocks of
 * blockSize bytes; two counting semaphores hand whole blocks back and forth.
 */
struct deRingbuffer_s
{
/* Size of a single block in bytes */
deInt32 blockSize;
/* Number of blocks in the buffer */
deInt32 blockCount;
/* Per-block count of bytes written by the producer; reset to 0 by the consumer once the block is drained */
deInt32* blockUsage;
/* Backing storage, blockSize * blockCount bytes */
deUint8* buffer;
/* Created with blockCount; decremented by the producer when it starts a block, incremented by the consumer when it finishes one */
deSemaphore emptyCount;
/* Created with 0; incremented by the producer for each completed/flushed block and once by deRingbuffer_stop(), decremented by the consumer */
deSemaphore fullCount;
/* Index of the block the consumer is reading */
deInt32 outBlock;
/* Consumer byte offset inside outBlock; 0 means "between blocks" */
deInt32 outPos;
/* Index of the block the producer is filling */
deInt32 inBlock;
/* Producer byte offset inside inBlock; 0 means "between blocks" */
deInt32 inPos;
/* Set by deRingbuffer_stop(); makes producer writes fail and is observed by the consumer after it wakes up */
deBool stopNotified;
/* Set by the consumer once it has seen stopNotified; from then on it only tries (never blocks) to take further blocks */
deBool consumerStopping;
};
/*
 * Create a ringbuffer made of blockCount blocks of blockSize bytes each.
 *
 * \param blockSize		Size of one block in bytes, must be > 0.
 * \param blockCount	Number of blocks, must be > 0.
 * \return Newly created ringbuffer, or DE_NULL if the arguments are invalid
 *         or any allocation / semaphore creation fails. The result is
 *         released with deRingbuffer_destroy().
 */
deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
{
	deRingbuffer* ringbuffer;

	DE_ASSERT(blockCount > 0);
	DE_ASSERT(blockSize > 0);

	/* Asserts compile away in release builds, so reject bad geometry explicitly */
	if (blockSize <= 0 || blockCount <= 0)
		return DE_NULL;

	ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
	if (!ringbuffer)
		return DE_NULL;

	ringbuffer->blockSize	= blockSize;
	ringbuffer->blockCount	= blockCount;
	ringbuffer->buffer		= (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount);
	/* Element size is taken from the array itself so it cannot drift from the field type */
	ringbuffer->blockUsage	= (deInt32*)deMalloc(sizeof(*ringbuffer->blockUsage) * (size_t)blockCount);
	/* All blocks start out empty, none full */
	ringbuffer->emptyCount	= deSemaphore_create(ringbuffer->blockCount, DE_NULL);
	ringbuffer->fullCount	= deSemaphore_create(0, DE_NULL);

	if (!ringbuffer->buffer		||
		!ringbuffer->blockUsage	||
		!ringbuffer->emptyCount	||
		!ringbuffer->fullCount)
	{
		/* Partial construction: tear down whatever was successfully created */
		if (ringbuffer->emptyCount)
			deSemaphore_destroy(ringbuffer->emptyCount);
		if (ringbuffer->fullCount)
			deSemaphore_destroy(ringbuffer->fullCount);
		deFree(ringbuffer->buffer);
		deFree(ringbuffer->blockUsage);
		deFree(ringbuffer);
		return DE_NULL;
	}

	memset(ringbuffer->blockUsage, 0, sizeof(*ringbuffer->blockUsage) * (size_t)blockCount);

	ringbuffer->outBlock			= 0;
	ringbuffer->outPos				= 0;
	ringbuffer->inBlock				= 0;
	ringbuffer->inPos				= 0;
	ringbuffer->stopNotified		= DE_FALSE;
	ringbuffer->consumerStopping	= DE_FALSE;

	return ringbuffer;
}
/* Signal that no more data will be produced.
 *
 * After this call producerStream_write() returns DE_STREAMRESULT_ERROR, and
 * the consumer, once it has drained the remaining blocks, reports
 * DE_STREAMRESULT_END_OF_STREAM instead of blocking.
 */
void deRingbuffer_stop (deRingbuffer* ringbuffer)
{
/* Raise the flag first, then post fullCount so that a consumer blocked in
 * deSemaphore_decrement() wakes up and sees it.
 * NOTE(review): stopNotified is a plain deBool; cross-thread visibility
 * presumably relies on the semaphore operations for ordering - confirm. */
ringbuffer->stopNotified = DE_TRUE;
deSemaphore_increment(ringbuffer->fullCount);
}
/*
 * Destroy a ringbuffer created with deRingbuffer_create().
 *
 * Destroys both semaphores and releases the block storage, the usage table
 * and the ringbuffer object itself. The pointer must not be used afterwards.
 *
 * \param ringbuffer	Ringbuffer to destroy, must not be DE_NULL.
 */
void deRingbuffer_destroy (deRingbuffer* ringbuffer)
{
	deSemaphore_destroy(ringbuffer->emptyCount);
	deSemaphore_destroy(ringbuffer->fullCount);

	/* The memory was obtained through deMalloc/deCalloc, so it is returned
	 * through deFree (as the create failure path does) rather than raw free() */
	deFree(ringbuffer->buffer);
	deFree(ringbuffer->blockUsage);
	deFree(ringbuffer);
}
/*
 * Producer-side write: copy bufSize bytes from buf into the ringbuffer.
 *
 * Blocks on emptyCount whenever a new block has to be started and none is
 * free. Every time a block becomes full it is handed to the consumer by
 * incrementing fullCount; a partially filled block stays with the producer
 * until more data arrives or the stream is flushed.
 *
 * \param stream	Stream data, actually the deRingbuffer.
 * \param buf		Source bytes.
 * \param bufSize	Number of bytes to write.
 * \param written	Receives the number of bytes written; set to 0 on error.
 * \return DE_STREAMRESULT_SUCCESS once everything is written, or
 *         DE_STREAMRESULT_ERROR if deRingbuffer_stop() was already called.
 */
static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
{
	deRingbuffer*	ringbuffer	= (deRingbuffer*)stream;
	const deUint8*	input		= (const deUint8*)buf;

	DE_ASSERT(stream);

	/* Define the output on every path, including the early error return */
	*written = 0;

	/* Writing after stop is a caller error */
	if (ringbuffer->stopNotified)
	{
		DE_ASSERT(DE_FALSE);
		return DE_STREAMRESULT_ERROR;
	}

	/* Write while more data is available */
	while (*written < bufSize)
	{
		deInt32		writeSize	= 0;
		deUint8*	dst			= DE_NULL;

		/* If between blocks, acquire a new empty block (may block) */
		if (ringbuffer->inPos == 0)
			deSemaphore_decrement(ringbuffer->emptyCount);

		/* Copy no more than fits in the current block */
		writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);

		/* Offset is computed in size_t: blockSize * inBlock may not fit in deInt32 */
		dst = ringbuffer->buffer
			+ (size_t)ringbuffer->blockSize * (size_t)ringbuffer->inBlock
			+ (size_t)ringbuffer->inPos;

		deMemcpy(dst, input + *written, (size_t)writeSize);

		ringbuffer->inPos += writeSize;
		*written += writeSize;
		ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;

		/* Block is full: move to the next one (or "between" this and the next block) */
		if (ringbuffer->inPos == ringbuffer->blockSize)
		{
			ringbuffer->inPos = 0;
			ringbuffer->inBlock++;

			if (ringbuffer->inBlock == ringbuffer->blockCount)
				ringbuffer->inBlock = 0;

			/* Publish the completed block to the consumer */
			deSemaphore_increment(ringbuffer->fullCount);
		}
	}

	return DE_STREAMRESULT_SUCCESS;
}
/*
 * Producer-side flush: hand a partially filled block over to the consumer.
 *
 * Does nothing when the producer is between blocks. Otherwise the producer
 * position is moved to the start of the next block (wrapping at blockCount)
 * and fullCount is incremented.
 *
 * \return Always DE_STREAMRESULT_SUCCESS.
 */
static deStreamResult producerStream_flush (deStreamData* stream)
{
	deRingbuffer* const rb = (deRingbuffer*)stream;

	DE_ASSERT(stream);

	/* Only a block that has been started needs publishing */
	if (rb->inPos != 0)
	{
		const deInt32 nextBlock = rb->inBlock + 1;

		rb->inPos	= 0;
		rb->inBlock	= (nextBlock == rb->blockCount) ? 0 : nextBlock;

		deSemaphore_increment(rb->fullCount);
	}

	return DE_STREAMRESULT_SUCCESS;
}
/*
 * Producer-side deinit: flush any pending block.
 *
 * The stream does not own the ringbuffer, so nothing is deallocated here.
 *
 * \return Always DE_STREAMRESULT_SUCCESS.
 */
static deStreamResult producerStream_deinit (deStreamData* stream)
{
	DE_ASSERT(stream);

	/* The flush result is not inspected; deinit reports success regardless */
	(void)producerStream_flush(stream);

	return DE_STREAMRESULT_SUCCESS;
}
/*
 * Consumer-side read: copy up to bufSize bytes out of the ringbuffer.
 *
 * While the ringbuffer is running, the call blocks on fullCount until the
 * producer publishes a block. Once deRingbuffer_stop() has been observed the
 * consumer switches to non-blocking acquisition and reports end of stream
 * when no further block is available.
 *
 * \param stream	Stream data, actually the deRingbuffer.
 * \param buf		Destination buffer.
 * \param bufSize	Number of bytes requested.
 * \param read		Receives the number of bytes copied, also when the call
 *					ends with DE_STREAMRESULT_END_OF_STREAM.
 * \return DE_STREAMRESULT_SUCCESS when bufSize bytes were copied,
 *         DE_STREAMRESULT_END_OF_STREAM when the input ran out after stop.
 */
static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
{
	deRingbuffer*	ringbuffer	= (deRingbuffer*)stream;
	deUint8*		output		= (deUint8*)buf;

	DE_ASSERT(stream);

	*read = 0;

	while (*read < bufSize)
	{
		deInt32			readSize	= 0;
		const deUint8*	src			= DE_NULL;

		/* If between blocks, acquire a new full block */
		if (ringbuffer->outPos == 0)
		{
			/* If the consumer is set to stop after everything is consumed,
			 * do not block if there is no more input left
			 */
			if (ringbuffer->consumerStopping)
			{
				/* Try to acquire a new block; if that fails there is no more input */
				if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
					return DE_STREAMRESULT_END_OF_STREAM;
			}
			else
			{
				/* Not stopping: block until there is more input */
				deSemaphore_decrement(ringbuffer->fullCount);

				/* The wake-up may have come from deRingbuffer_stop() */
				if (ringbuffer->stopNotified)
					ringbuffer->consumerStopping = DE_TRUE;
			}
		}

		/* Copy no more than what remains in the current block */
		readSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);

		/* Offset is computed in size_t: blockSize * outBlock may not fit in deInt32 */
		src = ringbuffer->buffer
			+ (size_t)ringbuffer->blockSize * (size_t)ringbuffer->outBlock
			+ (size_t)ringbuffer->outPos;

		deMemcpy(output + *read, src, (size_t)readSize);

		ringbuffer->outPos += readSize;
		*read += readSize;

		/* Block is consumed: move to the next one (or "between" this and the next block).
		 * A block with zero usage, as seen after the stop wake-up, is skipped the same way. */
		if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
		{
			ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
			ringbuffer->outPos = 0;
			ringbuffer->outBlock++;

			if (ringbuffer->outBlock == ringbuffer->blockCount)
				ringbuffer->outBlock = 0;

			/* Return the drained block to the producer */
			deSemaphore_increment(ringbuffer->emptyCount);
		}
	}

	return DE_STREAMRESULT_SUCCESS;
}
/*
 * Consumer-side deinit: there is nothing to release or flush.
 *
 * \return Always DE_STREAMRESULT_SUCCESS.
 */
static deStreamResult consumerStream_deinit (deStreamData* stream)
{
	const deStreamResult result = DE_STREAMRESULT_SUCCESS;

	/* stream is only referenced by the assert, which may compile away */
	DE_UNREF(stream);
	DE_ASSERT(stream);

	return result;
}
/* Status callback used by both stream kinds. The ringbuffer has no error
 * state to report, so the status is unconditionally good.
 * NOTE(review): unlike dummy_getError this function has external linkage;
 * consider making it static if no other translation unit refers to it. */
deStreamStatus dummy_getStatus (deStreamData* stream)
{
	const deStreamStatus alwaysGood = DE_STREAMSTATUS_GOOD;

	DE_UNREF(stream);

	return alwaysGood;
}
/* Error-string callback used by both stream kinds. The ringbuffer never
 * produces an error message, so DE_NULL is always returned. */
static const char* dummy_getError (deStreamData* stream)
{
	const char* const noMessage = DE_NULL;

	/* stream is only referenced by the assert, which may compile away */
	DE_UNREF(stream);
	DE_ASSERT(stream);

	return noMessage;
}
/* Function table for the producer (write-only) end of the ringbuffer.
 * NOTE(review): the slot meanings below are inferred from the functions
 * stored in the two tables in this file; confirm the order against the
 * deIOStreamVFTable declaration. */
static const deIOStreamVFTable producerStreamVFTable = {
DE_NULL, /* read: not supported on the producer side */
producerStream_write, /* write */
dummy_getError, /* error string: always DE_NULL */
producerStream_flush, /* flush: publishes a partially filled block */
producerStream_deinit, /* deinit: flushes, does not free the ringbuffer */
dummy_getStatus /* status: always good */
};
/* Function table for the consumer (read-only) end of the ringbuffer.
 * NOTE(review): the slot meanings below are inferred from the functions
 * stored in the two tables in this file; confirm the order against the
 * deIOStreamVFTable declaration. */
static const deIOStreamVFTable consumerStreamVFTable = {
consumerStream_read, /* read */
DE_NULL, /* write: not supported on the consumer side */
dummy_getError, /* error string: always DE_NULL */
DE_NULL, /* flush: nothing to flush when reading */
consumerStream_deinit, /* deinit: no-op */
dummy_getStatus /* status: always good */
};
/*
 * Bind an output stream to the producer end of a ringbuffer.
 *
 * The stream only borrows the ringbuffer; deinitializing the stream flushes
 * pending data but does not destroy the ringbuffer.
 *
 * \param stream	Output stream to initialize.
 * \param buffer	Ringbuffer that receives the written data.
 */
void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
{
	/* Install the producer operations, then attach the ringbuffer as stream data */
	stream->ioStream.vfTable	= &producerStreamVFTable;
	stream->ioStream.streamData	= (deStreamData*)buffer;
}
/*
 * Bind an input stream to the consumer end of a ringbuffer.
 *
 * The stream only borrows the ringbuffer; deinitializing the stream does not
 * destroy it.
 *
 * \param stream	Input stream to initialize.
 * \param buffer	Ringbuffer the data is read from.
 */
void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
{
	/* Install the consumer operations, then attach the ringbuffer as stream data */
	stream->ioStream.vfTable	= &consumerStreamVFTable;
	stream->ioStream.streamData	= (deStreamData*)buffer;
}