// Copyright 2016 syzkaller project authors. All rights reserved. // Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. package vmimpl import ( "bytes" "fmt" "io" "sync" ) type OutputMerger struct { Output chan []byte Err chan error teeMu sync.Mutex tee io.Writer wg sync.WaitGroup } type MergerError struct { Name string R io.ReadCloser Err error } func (err MergerError) Error() string { return fmt.Sprintf("failed to read from %v: %v", err.Name, err.Err) } func NewOutputMerger(tee io.Writer) *OutputMerger { return &OutputMerger{ Output: make(chan []byte, 1000), Err: make(chan error, 1), tee: tee, } } func (merger *OutputMerger) Wait() { merger.wg.Wait() close(merger.Output) } func (merger *OutputMerger) Add(name string, r io.ReadCloser) { merger.AddDecoder(name, r, nil) } func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser, decoder func(data []byte) (start, size int, decoded []byte)) { merger.wg.Add(1) go func() { var pending []byte var proto []byte var buf [4 << 10]byte for { n, err := r.Read(buf[:]) if n != 0 { if decoder != nil { proto = append(proto, buf[:n]...) start, size, decoded := decoder(proto) proto = proto[start+size:] if len(decoded) != 0 { merger.Output <- decoded // note: this can block } } pending = append(pending, buf[:n]...) if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 { out := pending[:pos+1] if merger.tee != nil { merger.teeMu.Lock() merger.tee.Write(out) merger.teeMu.Unlock() } select { case merger.Output <- append([]byte{}, out...): r := copy(pending[:], pending[pos+1:]) pending = pending[:r] default: } } } if err != nil { if len(pending) != 0 { pending = append(pending, '\n') if merger.tee != nil { merger.teeMu.Lock() merger.tee.Write(pending) merger.teeMu.Unlock() } select { case merger.Output <- pending: default: } } r.Close() select { case merger.Err <- MergerError{name, r, err}: default: } merger.wg.Done() return } } }() }