/*-------------------------------------------------------------------------
* drawElements Stream Library
* ---------------------------
*
* Copyright 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*//*!
* \file
* \brief Buffered and threaded input and output streams
*//*--------------------------------------------------------------------*/
#include "deThreadStream.h"
#include "deStreamCpyThread.h"
#include "deRingbuffer.h"
#include "stdlib.h"
typedef struct deThreadInStream_s
{
deRingbuffer* ringbuffer;
deInStream* input;
deInStream consumerStream;
deOutStream producerStream;
deThread thread;
int bufferSize;
} deThreadInStream;
typedef struct deThreadOutStream_s
{
deRingbuffer* ringbuffer;
deInStream consumerStream;
deOutStream producerStream;
deStreamCpyThread* thread;
} deThreadOutStream;
static void inStreamCopy (void* arg)
{
deThreadInStream* threadStream = (deThreadInStream*)arg;
deUint8* buffer = malloc(sizeof(deUint8) * threadStream->bufferSize);
for(;;)
{
deInt32 read = 0;
deInt32 written = 0;
deStreamResult readResult = DE_STREAMRESULT_ERROR;
readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
while (written < read)
{
deInt32 wrote = 0;
/* \todo [mika] Handle errors */
deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
written += wrote;
}
if (readResult == DE_STREAMRESULT_END_OF_STREAM)
{
break;
}
}
deOutStream_flush(&(threadStream->producerStream));
deRingbuffer_stop(threadStream->ringbuffer);
free(buffer);
}
static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
{
deThreadInStream* threadStream = (deThreadInStream*)stream;
return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
}
static const char* threadInStream_getError (deStreamData* stream)
{
deThreadInStream* threadStream = (deThreadInStream*)stream;
/* \todo [mika] Add handling for errors on thread stream */
return deInStream_getError(&(threadStream->consumerStream));
}
static deStreamStatus threadInStream_getStatus (deStreamData* stream)
{
deThreadInStream* threadStream = (deThreadInStream*)stream;
/* \todo [mika] Add handling for status on thread stream */
return deInStream_getStatus(&(threadStream->consumerStream));
}
/* \note [mika] Used by both in and out stream */
static deStreamResult threadStream_deinit (deStreamData* stream)
{
deThreadInStream* threadStream = (deThreadInStream*)stream;
deRingbuffer_stop(threadStream->ringbuffer);
deThread_join(threadStream->thread);
deThread_destroy(threadStream->thread);
deOutStream_deinit(&(threadStream->producerStream));
deInStream_deinit(&(threadStream->consumerStream));
deRingbuffer_destroy(threadStream->ringbuffer);
return DE_STREAMRESULT_SUCCESS;
}
static const deIOStreamVFTable threadInStreamVFTable = {
threadInStream_read,
DE_NULL,
threadInStream_getError,
DE_NULL,
threadStream_deinit,
threadInStream_getStatus
};
void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
{
deThreadInStream* threadStream = DE_NULL;
threadStream = malloc(sizeof(deThreadInStream));
DE_ASSERT(threadStream);
threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
DE_ASSERT(threadStream->ringbuffer);
threadStream->bufferSize = ringbufferBlockSize;
threadStream->input = input;
deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
threadStream->thread = deThread_create(inStreamCopy, threadStream, DE_NULL);
stream->ioStream.vfTable = &threadInStreamVFTable;
stream->ioStream.streamData = threadStream;
}
static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
{
deThreadOutStream* threadStream = (deThreadOutStream*)stream;
return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
}
static const char* threadOutStream_getError (deStreamData* stream)
{
deThreadOutStream* threadStream = (deThreadOutStream*)stream;
/* \todo [mika] Add handling for errors on thread stream */
return deOutStream_getError(&(threadStream->producerStream));
}
static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
{
deThreadOutStream* threadStream = (deThreadOutStream*)stream;
/* \todo [mika] Add handling for errors on thread stream */
return deOutStream_getStatus(&(threadStream->producerStream));
}
static deStreamResult threadOutStream_flush (deStreamData* stream)
{
deThreadOutStream* threadStream = (deThreadOutStream*)stream;
return deOutStream_flush(&(threadStream->producerStream));
}
static const deIOStreamVFTable threadOutStreamVFTable = {
DE_NULL,
threadOutStream_write,
threadOutStream_getError,
threadOutStream_flush,
threadStream_deinit,
threadOutStream_getStatus
};
void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
{
deThreadOutStream* threadStream = DE_NULL;
threadStream = malloc(sizeof(deThreadOutStream));
DE_ASSERT(threadStream);
threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
DE_ASSERT(threadStream->ringbuffer);
deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
threadStream->thread = deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize);
stream->ioStream.vfTable = &threadOutStreamVFTable;
stream->ioStream.streamData = threadStream;
}