// Copyright 2016 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package vmimpl

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// OutputMerger merges the data read from several io.ReadCloser inputs into the
// single Output channel and, when a tee writer is set, copies that data to it.
type OutputMerger struct {
	Output chan []byte    // merged output chunks and decoded data; closed by Wait
	Err    chan error     // receives a MergerError when reading an input fails
	teeMu  sync.Mutex     // held around every write to tee
	tee    io.Writer      // optional writer that gets a copy of the output; may be nil
	wg     sync.WaitGroup // counts the running reader goroutines; Wait waits on it
}

// MergerError is the error delivered on OutputMerger.Err when reading from one
// of the merged inputs fails.
type MergerError struct {
	Name string        // name the input was registered under
	R    io.ReadCloser // input whose Read failed
	Err  error         // error returned by Read
}

// Error implements the error interface.
func (e MergerError) Error() string {
	return fmt.Sprintf("failed to read from %v: %v", e.Name, e.Err)
}

// Unwrap returns the underlying read error so that callers can match it with
// errors.Is and errors.As (for example to detect io.EOF).
func (e MergerError) Unwrap() error {
	return e.Err
}

// NewOutputMerger returns an OutputMerger with no inputs attached yet.
// Output is buffered for 1000 chunks and Err for a single error.
// tee may be nil; when it is not, it is stored as the writer that receives
// a copy of the merged output.
func NewOutputMerger(tee io.Writer) *OutputMerger {
	merger := new(OutputMerger)
	merger.Output = make(chan []byte, 1000)
	merger.Err = make(chan error, 1)
	merger.tee = tee
	return merger
}

// Wait blocks until all reader goroutines counted by the merger's WaitGroup
// have finished and then closes Output.
// Call it at most once: a second call would close Output again, which panics.
func (merger *OutputMerger) Wait() {
	readers := &merger.wg
	readers.Wait()
	close(merger.Output)
}

// Add attaches r to the merger under the given name without a decoder.
// It is shorthand for AddDecoder with a nil decoder function.
func (merger *OutputMerger) Add(name string, r io.ReadCloser) {
	var noDecoder func(data []byte) (start, size int, decoded []byte)
	merger.AddDecoder(name, r, noDecoder)
}

// AddDecoder starts a goroutine that reads r until Read returns an error and
// forwards the data to the merger.
//
// Raw data is accumulated and, whenever it contains a newline, everything up
// to and including the last newline is written to the tee writer (if set) and
// offered to Output with a non-blocking send. If Output is full the data stays
// buffered and the send is retried after the next read that returns data; each
// byte is written to tee only once regardless of how many retries happen.
//
// If decoder is non-nil, it is called after every read with all bytes it has
// not consumed yet. The first start+size bytes are then discarded and a
// non-empty decoded result is sent to Output with a blocking send.
//
// On a read error any remaining buffered data is newline-terminated, written
// to tee and offered to Output (dropped if Output is full). Then r is closed
// and a MergerError carrying name, r and the error is offered to Err (dropped
// if Err is full).
func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser,
	decoder func(data []byte) (start, size int, decoded []byte)) {
	merger.wg.Add(1)
	go func() {
		defer merger.wg.Done()

		// writeTee copies data to the tee writer, if there is one.
		writeTee := func(data []byte) {
			if merger.tee == nil || len(data) == 0 {
				return
			}
			merger.teeMu.Lock()
			// Best-effort: a failing tee must not stop output forwarding.
			merger.tee.Write(data)
			merger.teeMu.Unlock()
		}

		var pending []byte // raw data not yet delivered to Output
		var proto []byte   // raw data not yet consumed by decoder
		teed := 0          // length of the pending prefix already written to tee
		var buf [4 << 10]byte
		for {
			n, err := r.Read(buf[:])
			if n != 0 {
				if decoder != nil {
					proto = append(proto, buf[:n]...)
					start, size, decoded := decoder(proto)
					proto = proto[start+size:]
					if len(decoded) != 0 {
						merger.Output <- decoded // note: this can block
					}
				}
				pending = append(pending, buf[:n]...)
				if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 {
					out := pending[:pos+1]
					// out[:teed] was already teed by an earlier iteration
					// whose send to Output did not go through.
					writeTee(out[teed:])
					teed = len(out)
					select {
					case merger.Output <- append([]byte{}, out...):
						// Delivered: keep only the unfinished last line.
						rest := copy(pending, pending[pos+1:])
						pending = pending[:rest]
						teed = 0
					default:
						// Output is full: keep the data and retry later.
					}
				}
			}
			if err != nil {
				if len(pending) != 0 {
					// Terminate an unfinished last line.
					if pending[len(pending)-1] != '\n' {
						pending = append(pending, '\n')
					}
					writeTee(pending[teed:])
					select {
					case merger.Output <- pending:
					default:
					}
				}
				r.Close()
				select {
				case merger.Err <- MergerError{Name: name, R: r, Err: err}:
				default:
				}
				return
			}
		}
	}()
}