add retries to fm_bot

Change-Id: Ibcc6ee83cde537caaab824658721bbda300abc18
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/208273
Commit-Queue: Mike Klein <mtklein@google.com>
Reviewed-by: Brian Osman <brianosman@google.com>
Author: Mike Klein <mtklein@google.com>
Date:   2019-04-15 15:18:00 -05:00
Commit: acb8be89d1
Parent: 6f8bfba1b7


@@ -14,6 +14,8 @@ import (
     "path/filepath"
     "runtime"
     "strings"
+    "sync"
+    "sync/atomic"
     "time"
 )
@@ -72,7 +74,9 @@ func callFM(fm string, sources []string, flags []string) bool {
     output, err := cmd.CombinedOutput()
     if err != nil {
-        log.Printf("\n%v #failed (%v):\n%s\n", strings.Join(cmd.Args, " "), err, output)
+        if !*quiet || len(sources) == 1 {
+            log.Printf("\n%v #failed (%v):\n%s\n", strings.Join(cmd.Args, " "), err, output)
+        }
         return false
     } else if !*quiet {
         log.Printf("\n%v #done in %v:\n%s", strings.Join(cmd.Args, " "), time.Since(start), output)
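A side note on the new guard: in quiet mode a failing multi-source batch stays silent because it is about to be split up and retried source by source, while a source that fails on its own is always reported. A minimal sketch of that condition, using a hypothetical shouldLogFailure helper rather than fm_bot's own code:

package main

import "fmt"

// shouldLogFailure mirrors the guard above: log unless we're quiet and the
// failed run was a multi-source batch that will be retried per source anyway.
func shouldLogFailure(quiet bool, sources []string) bool {
    return !quiet || len(sources) == 1
}

func main() {
    fmt.Println(shouldLogFailure(true, []string{"gm_a", "gm_b"})) // false: the retry will narrow it down
    fmt.Println(shouldLogFailure(true, []string{"gm_a"}))         // true: a lone failure is final
}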
@@ -193,40 +197,38 @@ func main() {
         }
     }
 
+    // The buffer size of main->worker channels isn't super important...
+    // presumably we'll have many hungry goroutines snapping up work as quick
+    // as they can, and if things get backed up, no real reason for main to do
+    // anything but block.
+    wg := &sync.WaitGroup{}
+    var failures int32 = 0
+    worker := func(queue chan work) {
+        for w := range queue {
+            if !callFM(fm, w.Sources, w.Flags) {
+                if len(w.Sources) == 1 {
+                    // If a source ran alone and failed, that's just a failure.
+                    atomic.AddInt32(&failures, 1)
+                } else {
+                    // If a batch of sources ran and failed, split them up and try again.
+                    for _, source := range w.Sources {
+                        wg.Add(1)
+                        queue <- work{[]string{source}, w.Flags}
+                    }
+                }
+            }
+            wg.Done()
+        }
+    }
+
     cpu := make(chan work, *cpuLimit)
-    gpu := make(chan work, *gpuLimit)
-
-    // The buffer size of this worker->main results channel is much more
-    // sensitive. Since it's a many->one funnel, it's easy for the workers to
-    // produce lots of results that main can't keep up with.
-    //
-    // This needlessly throttles our progress, and we can even deadlock if
-    // the buffer fills up before main has finished enqueueing all the work.
-    //
-    // So we set the buffer size here large enough to hold a result for every
-    // item we might possibly enqueue.
-    results := make(chan bool, (*cpuLimit+*gpuLimit)*len(jobs))
-
     for i := 0; i < *cpuLimit; i++ {
-        go func() {
-            for w := range cpu {
-                results <- callFM(fm, w.Sources, w.Flags)
-            }
-        }()
+        go worker(cpu)
     }
+
+    gpu := make(chan work, *gpuLimit)
     for i := 0; i < *gpuLimit; i++ {
-        go func() {
-            for w := range gpu {
-                results <- callFM(fm, w.Sources, w.Flags)
-            }
-        }()
+        go worker(gpu)
     }
 
-    sent := 0
     for _, job := range jobs {
         // Skip blank lines, empty command lines.
         if len(job) == 0 {
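The retry scheme in this hunk hinges on workers feeding the same queue they drain: a failed multi-source batch is re-enqueued one source at a time, and only a source that fails on its own counts against the failure tally. Below is a self-contained sketch of that split-and-retry pattern under assumed names: runBatch stands in for callFM, work is trimmed to just its Sources, and the worker count and channel buffer are arbitrary.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type work struct {
    Sources []string
}

// runBatch stands in for callFM: pretend any batch containing "bad" fails.
func runBatch(sources []string) bool {
    for _, s := range sources {
        if s == "bad" {
            return false
        }
    }
    return true
}

func main() {
    var (
        wg       sync.WaitGroup
        failures int32
    )
    // Buffered so a worker re-enqueueing retries doesn't block in this tiny demo.
    queue := make(chan work, 16)

    worker := func() {
        for w := range queue {
            if !runBatch(w.Sources) {
                if len(w.Sources) == 1 {
                    // A source that fails on its own is a real failure.
                    atomic.AddInt32(&failures, 1)
                } else {
                    // A failed batch is split up and each source retried alone.
                    for _, s := range w.Sources {
                        wg.Add(1)
                        queue <- work{Sources: []string{s}}
                    }
                }
            }
            wg.Done()
        }
    }
    for i := 0; i < 4; i++ {
        go worker()
    }

    wg.Add(1)
    queue <- work{Sources: []string{"good", "bad", "also-good"}}

    wg.Wait() // returns once the batch and all its retries have finished
    fmt.Println("failures:", atomic.LoadInt32(&failures)) // 1
}

As in the change above, wg.Add is called before every send, including the re-sends from inside a worker, so wg.Wait cannot return while retries are still pending.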
@@ -261,7 +263,6 @@
         }
         // Round up so there's at least one source per batch.
-        // This math also helps guarantee that sent stays <= cap(results).
         sourcesPerBatch := (len(sources) + limit - 1) / limit
         for i := 0; i < len(sources); i += sourcesPerBatch {
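The rounded-up division guarantees at least one source per batch and at most limit batches. A small worked example of the same arithmetic, with made-up sources and limit values, using the same clamped slicing the next hunk shows:

package main

import "fmt"

func main() {
    // Made-up inputs: seven sources to spread across at most three batches.
    sources := []string{"a", "b", "c", "d", "e", "f", "g"}
    limit := 3

    // Round up so there's at least one source per batch: ceil(7/3) = 3.
    sourcesPerBatch := (len(sources) + limit - 1) / limit

    for i := 0; i < len(sources); i += sourcesPerBatch {
        end := i + sourcesPerBatch
        if end > len(sources) {
            end = len(sources)
        }
        fmt.Println(sources[i:end]) // [a b c], then [d e f], then [g]
    }
}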
@@ -271,26 +272,14 @@
             }
             batch := sources[i:end]
 
+            wg.Add(1)
             queue <- work{batch, flags}
-            sent += 1
         }
     }
 
-    close(cpu)
-    close(gpu)
-
-    if sent > cap(results) {
-        log.Fatalf("Oops, we sent %d but cap(results) is only %d. "+
-            "This could lead to deadlock and is a bug.", sent, cap(results))
-    }
+    wg.Wait()
 
-    failures := 0
-    for i := 0; i < sent; i++ {
-        if !<-results {
-            failures += 1
-        }
-    }
     if failures > 0 {
-        log.Fatalln(failures, "invocations of", fm, "failed")
+        log.Fatalln(failures, "failures after retries")
     }
 }