/*-------------------------------------------------------------------------
 * drawElements Stream Library
 * ---------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Buffered and threaded input and output streams
 *//*--------------------------------------------------------------------*/

#include "deThreadStream.h"
#include "deStreamCpyThread.h"
#include "deRingbuffer.h"
#include "stdlib.h"

typedef struct deThreadInStream_s
{
	deRingbuffer*		ringbuffer;
	deInStream*			input;
	deInStream			consumerStream;
	deOutStream			producerStream;
	deThread			thread;
	int					bufferSize;
} deThreadInStream;

typedef struct deThreadOutStream_s
{
	deRingbuffer*		ringbuffer;
	deInStream			consumerStream;
	deOutStream			producerStream;
	deStreamCpyThread*	thread;
} deThreadOutStream;

static void inStreamCopy (void* arg)
{
	deThreadInStream* threadStream = (deThreadInStream*)arg;

	deUint8* buffer = malloc(sizeof(deUint8) * (size_t)threadStream->bufferSize);

	for(;;)
	{
		deInt32 read	= 0;
		deInt32 written	= 0;
		deStreamResult readResult = DE_STREAMRESULT_ERROR;

		readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
		DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
		while (written < read)
		{
			deInt32 wrote = 0;

			/* \todo [mika] Handle errors */
			deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);

			written += wrote;
		}

		if (readResult == DE_STREAMRESULT_END_OF_STREAM)
		{
			break;
		}
	}

	deOutStream_flush(&(threadStream->producerStream));
	deRingbuffer_stop(threadStream->ringbuffer);
	free(buffer);

}

static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
{
	deThreadInStream* threadStream = (deThreadInStream*)stream;
	return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
}

static const char* threadInStream_getError (deStreamData* stream)
{
	deThreadInStream* threadStream = (deThreadInStream*)stream;

	/* \todo [mika] Add handling for errors on thread stream */
	return deInStream_getError(&(threadStream->consumerStream));
}

static deStreamStatus threadInStream_getStatus (deStreamData* stream)
{
	deThreadInStream* threadStream = (deThreadInStream*)stream;

	/* \todo [mika] Add handling for status on thread stream */
	return deInStream_getStatus(&(threadStream->consumerStream));
}

/* \note [mika] Used by both in and out stream */
static deStreamResult threadStream_deinit (deStreamData* stream)
{
	deThreadInStream* threadStream = (deThreadInStream*)stream;

	deRingbuffer_stop(threadStream->ringbuffer);

	deThread_join(threadStream->thread);
	deThread_destroy(threadStream->thread);

	deOutStream_deinit(&(threadStream->producerStream));
	deInStream_deinit(&(threadStream->consumerStream));

	deRingbuffer_destroy(threadStream->ringbuffer);

	return DE_STREAMRESULT_SUCCESS;
}

static const deIOStreamVFTable threadInStreamVFTable = {
	threadInStream_read,
	DE_NULL,
	threadInStream_getError,
	DE_NULL,
	threadStream_deinit,
	threadInStream_getStatus
};

void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
{
	deThreadInStream* threadStream = DE_NULL;

	threadStream = malloc(sizeof(deThreadInStream));
	DE_ASSERT(threadStream);

	threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
	DE_ASSERT(threadStream->ringbuffer);

	threadStream->bufferSize = ringbufferBlockSize;
	threadStream->input = input;
	deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
	deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);

	threadStream->thread		= deThread_create(inStreamCopy, threadStream, DE_NULL);
	stream->ioStream.vfTable	= &threadInStreamVFTable;
	stream->ioStream.streamData = threadStream;
}

static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
{
	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
	return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
}

static const char* threadOutStream_getError (deStreamData* stream)
{
	deThreadOutStream* threadStream = (deThreadOutStream*)stream;

	/* \todo [mika] Add handling for errors on thread stream */
	return deOutStream_getError(&(threadStream->producerStream));
}

static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
{
	deThreadOutStream* threadStream = (deThreadOutStream*)stream;

	/* \todo [mika] Add handling for errors on thread stream */
	return deOutStream_getStatus(&(threadStream->producerStream));
}

static deStreamResult threadOutStream_flush (deStreamData* stream)
{
	deThreadOutStream* threadStream = (deThreadOutStream*)stream;

	return deOutStream_flush(&(threadStream->producerStream));
}

static const deIOStreamVFTable threadOutStreamVFTable = {
	DE_NULL,
	threadOutStream_write,
	threadOutStream_getError,
	threadOutStream_flush,
	threadStream_deinit,
	threadOutStream_getStatus
};

void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
{
	deThreadOutStream* threadStream = DE_NULL;

	threadStream = malloc(sizeof(deThreadOutStream));
	DE_ASSERT(threadStream);

	threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
	DE_ASSERT(threadStream->ringbuffer);

	deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
	deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);

	threadStream->thread		= deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize);
	stream->ioStream.vfTable	= &threadOutStreamVFTable;
	stream->ioStream.streamData = threadStream;
}