// Copyright 2018 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"time"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/report"
"github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/prog"
)
func (mgr *Manager) hubSyncLoop() {
hc := &HubConnector{
mgr: mgr,
cfg: mgr.cfg,
target: mgr.target,
stats: mgr.stats,
enabledCalls: mgr.checkResult.EnabledCalls[mgr.cfg.Sandbox],
fresh: mgr.fresh,
hubReproQueue: mgr.hubReproQueue,
}
if mgr.cfg.Reproduce && mgr.dash != nil {
hc.needMoreRepros = mgr.needMoreRepros
}
hc.loop()
}
type HubConnector struct {
mgr HubManagerView
cfg *mgrconfig.Config
target *prog.Target
stats *Stats
enabledCalls []int
fresh bool
hubCorpus map[hash.Sig]bool
newRepros [][]byte
hubReproQueue chan *Crash
needMoreRepros chan chan bool
}
// HubManagerView restricts interface between HubConnector and Manager.
type HubManagerView interface {
getMinimizedCorpus() (corpus, repros [][]byte)
addNewCandidates(progs [][]byte)
}
func (hc *HubConnector) loop() {
var hub *rpctype.RPCClient
for {
time.Sleep(time.Minute)
corpus, repros := hc.mgr.getMinimizedCorpus()
hc.newRepros = append(hc.newRepros, repros...)
if hub == nil {
var err error
if hub, err = hc.connect(corpus); err != nil {
log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err)
continue
}
log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus))
}
if err := hc.sync(hub, corpus); err != nil {
log.Logf(0, "hub sync failed: %v", err)
hub.Close()
hub = nil
}
}
}
func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
a := &rpctype.HubConnectArgs{
Client: hc.cfg.HubClient,
Key: hc.cfg.HubKey,
Manager: hc.cfg.Name,
Fresh: hc.fresh,
}
for _, id := range hc.enabledCalls {
a.Calls = append(a.Calls, hc.target.Syscalls[id].Name)
}
hubCorpus := make(map[hash.Sig]bool)
for _, inp := range corpus {
hubCorpus[hash.Hash(inp)] = true
a.Corpus = append(a.Corpus, inp)
}
// Hub.Connect request can be very large, so do it on a transient connection
// (rpc connection buffers never shrink).
if err := rpctype.RPCCall(hc.cfg.HubAddr, "Hub.Connect", a, nil); err != nil {
return nil, err
}
hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr)
if err != nil {
return nil, err
}
hc.hubCorpus = hubCorpus
hc.fresh = false
return hub, nil
}
func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error {
a := &rpctype.HubSyncArgs{
Client: hc.cfg.HubClient,
Key: hc.cfg.HubKey,
Manager: hc.cfg.Name,
}
sigs := make(map[hash.Sig]bool)
for _, inp := range corpus {
sig := hash.Hash(inp)
sigs[sig] = true
if hc.hubCorpus[sig] {
continue
}
hc.hubCorpus[sig] = true
a.Add = append(a.Add, inp)
}
for sig := range hc.hubCorpus {
if sigs[sig] {
continue
}
delete(hc.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
if hc.needMoreRepros != nil {
needReproReply := make(chan bool)
hc.needMoreRepros <- needReproReply
a.NeedRepros = <-needReproReply
}
a.Repros = hc.newRepros
for {
r := new(rpctype.HubSyncRes)
if err := hub.Call("Hub.Sync", a, r); err != nil {
return err
}
progDropped := hc.processProgs(r.Progs)
reproDropped := hc.processRepros(r.Repros)
hc.stats.hubSendProgAdd.add(len(a.Add))
hc.stats.hubSendProgDel.add(len(a.Del))
hc.stats.hubSendRepro.add(len(a.Repros))
hc.stats.hubRecvProg.add(len(r.Progs) - progDropped)
hc.stats.hubRecvProgDrop.add(progDropped)
hc.stats.hubRecvRepro.add(len(r.Repros) - reproDropped)
hc.stats.hubRecvReproDrop.add(reproDropped)
log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+
" recv: progs %v, repros %v; more %v",
len(a.Add), len(a.Del), len(a.Repros),
len(r.Progs)-progDropped, len(r.Repros)-reproDropped, r.More)
a.Add = nil
a.Del = nil
a.Repros = nil
a.NeedRepros = false
hc.newRepros = nil
if len(r.Progs)+r.More == 0 {
return nil
}
}
}
func (hc *HubConnector) processProgs(progs [][]byte) int {
dropped := 0
candidates := make([][]byte, 0, len(progs))
for _, inp := range progs {
if _, err := hc.target.Deserialize(inp); err != nil {
dropped++
continue
}
candidates = append(candidates, inp)
}
hc.mgr.addNewCandidates(candidates)
return dropped
}
func (hc *HubConnector) processRepros(repros [][]byte) int {
dropped := 0
for _, repro := range repros {
if _, err := hc.target.Deserialize(repro); err != nil {
dropped++
continue
}
hc.hubReproQueue <- &Crash{
vmIndex: -1,
hub: true,
Report: &report.Report{
Title: "external repro",
Output: repro,
},
}
}
return dropped
}