Revert fm_driver simplifications
This reverts commits 8ef3c539a2 and 4b09de3c90.

It turns out controlling the scheduling is a good idea; I keep running
into exec failures and process limits.

Cq-Include-Trybots: luci.skia.skia.primary:FM-Debian10-Clang-GCE-CPU-AVX2-x86_64-Debug-All,FM-Win2019-Clang-GCE-CPU-AVX2-x86_64-Debug-All
Change-Id: Ia72f446965e5093fbf996e78d9513c15dedae3d9
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/364006
Reviewed-by: Mike Klein <mtklein@google.com>
Commit-Queue: Mike Klein <mtklein@google.com>
parent fde740c7f2
commit 779a7b83b5
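
For reference, here is a minimal, self-contained sketch of the scheduling pattern this revert restores: a fixed pool of runtime.NumCPU() workers drains a buffered channel of Work, so the number of concurrent fm subprocesses stays bounded no matter how many batches are queued or requeued. The Work type and queue mirror the diff below; runBatch is an assumed stand-in for the actual fm invocation, not part of the commit.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Work mirrors the struct in the diff: a batch of sources plus the flags to run them with.
type Work struct {
	Sources []string
	Flags   []string
}

// runBatch is a placeholder for exec'ing `fm -s <sources> <flags>`.
func runBatch(w Work) { fmt.Println(len(w.Sources), "sources with flags", w.Flags) }

func main() {
	workers := runtime.NumCPU()
	queue := make(chan Work, 1<<20) // big buffer so requeuing from a worker never blocks
	wg := &sync.WaitGroup{}

	// A fixed pool of workers bounds how many subprocesses run at once,
	// which is the point of keeping control of the scheduling.
	for i := 0; i < workers; i++ {
		go func() {
			for w := range queue {
				runBatch(w)
				wg.Done()
			}
		}()
	}

	// Enqueue a couple of batches, then wait for the pool to drain them.
	for _, w := range []Work{
		{Sources: []string{"gm_a", "gm_b"}, Flags: []string{"-b", "cpu"}},
		{Sources: []string{"test_c"}, Flags: []string{"-b", "cpu", "--skvm"}},
	} {
		wg.Add(1)
		queue <- w
	}
	wg.Wait()
}
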
@@ -38,7 +38,6 @@ func main() {
 
 actualStdout := os.Stdout
 actualStderr := os.Stderr
-verbosity := exec.Info
 if *local {
 // Task Driver echoes every exec.Run() stdout and stderr to the console,
 // which makes it hard to find failures (especially stdout). Send them to /dev/null.
@@ -48,9 +47,6 @@ func main() {
 }
 os.Stdout = devnull
 os.Stderr = devnull
-// Having stifled stderr/stdout, changing Command.Verbose won't have any visible effect,
-// but setting it to Silent will bypass a fair chunk of wasted formatting work.
-verbosity = exec.Silent
 }
 
 if flag.NArg() < 1 {
@@ -61,7 +57,7 @@ func main() {
 // Run `fm <flag>` to find the names of all linked GMs or tests.
 query := func(flag string) []string {
 stdout := &bytes.Buffer{}
-cmd := &exec.Command{Name: fm, Stdout: stdout, Verbose: verbosity}
+cmd := &exec.Command{Name: fm, Stdout: stdout}
 cmd.Args = append(cmd.Args, "-i", *resources)
 cmd.Args = append(cmd.Args, flag)
 if err := exec.Run(ctx, cmd); err != nil {
@@ -106,112 +102,14 @@ func main() {
 }()
 }
 
-// We'll kick off worker goroutines to run batches of work, and on failure,
-// crash, or unknown hash, we'll split that batch into individual reruns to
-// isolate those unusual results.
-var failures int32 = 0
-wg := &sync.WaitGroup{}
-
-var worker func([]string, []string)
-worker = func(sources []string, flags []string) {
-defer wg.Done()
-
-stdout := &bytes.Buffer{}
-stderr := &bytes.Buffer{}
-cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr, Verbose: verbosity}
-cmd.Args = append(cmd.Args, "-i", *resources, "-s")
-cmd.Args = append(cmd.Args, sources...)
-cmd.Args = append(cmd.Args, flags...)
-
-// Run our FM command.
-err := exec.Run(ctx, cmd)
-
-// On success, scan stdout for any unknown hashes.
-unknownHash := func() string {
-if err == nil && *bot != "" { // The map of known hashes is only filled when using -bot.
-scanner := bufio.NewScanner(stdout)
-for scanner.Scan() {
-if parts := strings.Fields(scanner.Text()); len(parts) == 3 {
-md5 := parts[1]
-if !known[md5] {
-return md5
-}
-}
-}
-if err := scanner.Err(); err != nil {
-td.Fatal(ctx, err)
-}
-}
-return ""
-}()
-
-// If a batch failed or produced an unknown hash, isolate with individual runs.
-if len(sources) > 1 && (err != nil || unknownHash != "") {
-wg.Add(len(sources))
-for i := range sources {
-// We could kick off independent goroutines here for more parallelism,
-// but the bots are already parallel enough that they'd exhaust their
-// process limits, and I haven't seen any impact on local runs.
-worker(sources[i:i+1], flags)
-}
-return
-}
-
-// If an individual run failed, nothing more to do but fail.
-if err != nil {
-atomic.AddInt32(&failures, 1)
-td.FailStep(ctx, err)
-if *local {
-lines := []string{}
-scanner := bufio.NewScanner(stderr)
-for scanner.Scan() {
-lines = append(lines, scanner.Text())
-}
-if err := scanner.Err(); err != nil {
-td.Fatal(ctx, err)
-}
-fmt.Fprintf(actualStderr, "%v %v #failed:\n\t%v\n",
-cmd.Name,
-strings.Join(cmd.Args, " "),
-strings.Join(lines, "\n\t"))
-}
-return
-}
-
-// If an individual run succeeded but produced an unknown hash, TODO upload .png to Gold.
-if unknownHash != "" {
-fmt.Fprintf(actualStdout, "%v %v #%v\n",
-cmd.Name,
-strings.Join(cmd.Args, " "),
-unknownHash)
-}
+type Work struct {
+Sources []string // Passed to FM -s: names of gms/tests, paths to image files, .skps, etc.
+Flags []string // Other flags to pass to FM: --ct 565, --msaa 16, etc.
+}
+todo := []Work{}
 
-// Start workers that run `FM -s sources... flags...` in small source batches for parallelism.
-kickoff := func(sources, flags []string) {
-if len(sources) == 0 {
-return // A blank or commented job line from -script or the command line.
-}
-
-// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
-// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
-rand.Shuffle(len(sources), func(i, j int) {
-sources[i], sources[j] = sources[j], sources[i]
-})
-
-// Round batch sizes up so there's at least one source per batch.
-// Batch size is arbitrary, but nice to scale with the machine like this.
-batches := runtime.NumCPU()
-batch := (len(sources) + batches - 1) / batches
-util.ChunkIter(len(sources), batch, func(start, end int) error {
-wg.Add(1)
-go worker(sources[start:end], flags)
-return nil
-})
-}
 
-// Parse a job like "gms b=cpu ct=8888" into sources and flags for kickoff().
-parse := func(job []string) (sources, flags []string) {
+// Parse a job like "gms b=cpu ct=8888" into Work{Sources=<all GMs>, Flags={-b,cpu,--ct,8888}}.
+parse := func(job []string) (w Work) {
 for _, token := range job {
 // Everything after # is a comment.
 if strings.HasPrefix(token, "#") {
@@ -220,12 +118,12 @@ func main() {
 
 // Treat "gm" or "gms" as a shortcut for all known GMs.
 if token == "gm" || token == "gms" {
-sources = append(sources, gms...)
+w.Sources = append(w.Sources, gms...)
 continue
 }
 // Same for tests.
 if token == "test" || token == "tests" {
-sources = append(sources, tests...)
+w.Sources = append(w.Sources, tests...)
 continue
 }
 
@@ -237,18 +135,18 @@ func main() {
 }
 f += parts[0]
 
-flags = append(flags, f, parts[1])
+w.Flags = append(w.Flags, f, parts[1])
 continue
 }
 
 // Anything else must be the name of a source for FM to run.
-sources = append(sources, token)
+w.Sources = append(w.Sources, token)
 }
 return
 }
 
 // Parse one job from the command line, handy for ad hoc local runs.
-kickoff(parse(flag.Args()[1:]))
+todo = append(todo, parse(flag.Args()[1:]))
 
 // Any number of jobs can come from -script.
 if *script != "" {
@@ -262,7 +160,7 @@ func main() {
 }
 scanner := bufio.NewScanner(file)
 for scanner.Scan() {
-kickoff(parse(strings.Fields(scanner.Text())))
+todo = append(todo, parse(strings.Fields(scanner.Text())))
 }
 if err := scanner.Err(); err != nil {
 td.Fatal(ctx, err)
@@ -288,13 +186,132 @@ func main() {
 tests = filter(tests, func(s string) bool { return s != "GoodHash" })
 }
 
-// You could use parse() here if you like, but it's just as easy to kickoff() directly.
-kickoff(tests, strings.Fields("-b cpu"))
-kickoff(gms, strings.Fields("-b cpu"))
-kickoff(gms, strings.Fields("-b cpu --skvm"))
+// You could use parse() here if you like, but it's just as easy to make Work{} directly.
+work := func(sources []string, flags string) {
+todo = append(todo, Work{sources, strings.Fields(flags)})
+}
+work(tests, "-b cpu")
+work(gms, "-b cpu")
+work(gms, "-b cpu --skvm")
 }
 
+// We'll try to spread our work roughly evenly over a number of worker goroutines.
+// We can't combine Work with different Flags, but we can do the opposite,
+// splitting a single Work into smaller Work units with the same Flags,
+// even all the way down to a single Source. So we'll optimistically run
+// batches of Sources together, but if a batch fails or crashes, we'll
+// split it up and re-run one at a time to find the precise failures.
+var failures int32 = 0
+wg := &sync.WaitGroup{}
+worker := func(queue chan Work) {
+for w := range queue {
+stdout := &bytes.Buffer{}
+stderr := &bytes.Buffer{}
+cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr}
+cmd.Args = append(cmd.Args, "-i", *resources)
+cmd.Args = append(cmd.Args, "-s")
+cmd.Args = append(cmd.Args, w.Sources...)
+cmd.Args = append(cmd.Args, w.Flags...)
+// TODO: when len(w.Sources) == 1, add -w ... to cmd.Args to write a .png for upload.
+
+// On cmd failure or unknown hash, we'll split the Work batch up into individual reruns.
+requeue := func() {
+// Requeuing Work from the workers is what makes sizing the chan buffer tricky:
+// we don't ever want these `queue <-` to block a worker because of a full buffer.
+for _, source := range w.Sources {
+wg.Add(1)
+queue <- Work{[]string{source}, w.Flags}
+}
+}
+
+if err := exec.Run(ctx, cmd); err != nil {
+if len(w.Sources) == 1 {
+// If a source ran alone and failed, that's just a failure.
+atomic.AddInt32(&failures, 1)
+td.FailStep(ctx, err)
+if *local {
+lines := []string{}
+scanner := bufio.NewScanner(stderr)
+for scanner.Scan() {
+lines = append(lines, scanner.Text())
+}
+if err := scanner.Err(); err != nil {
+td.Fatal(ctx, err)
+}
+
+fmt.Fprintf(actualStderr, "%v %v #failed:\n\t%v\n",
+cmd.Name,
+strings.Join(cmd.Args, " "),
+strings.Join(lines, "\n\t"))
+}
+} else {
+// If a batch of sources failed, break up the batch to isolate the failures.
+requeue()
+}
+} else {
+// FM completed successfully. Scan stdout for any unknown hash.
+unknown := func() string {
+if *bot != "" { // The map known[] is only filled when *bot != "".
+scanner := bufio.NewScanner(stdout)
+for scanner.Scan() {
+if parts := strings.Fields(scanner.Text()); len(parts) == 3 {
+md5 := parts[1]
+if !known[md5] {
+return md5
+}
+}
+}
+if err := scanner.Err(); err != nil {
+td.Fatal(ctx, err)
+}
+}
+return ""
+}()
+
+if unknown != "" {
+if len(w.Sources) == 1 {
+// TODO upload .png with goldctl.
+fmt.Fprintf(actualStdout, "%v %v #%v\n",
+cmd.Name,
+strings.Join(cmd.Args, " "),
+unknown)
+} else {
+// Split the batch to run individually and TODO, write .pngs.
+requeue()
+}
+}
+}
+wg.Done()
+}
+}
+
+workers := runtime.NumCPU()
+queue := make(chan Work, 1<<20) // Huge buffer to avoid having to be smart about requeuing.
+for i := 0; i < workers; i++ {
+go worker(queue)
+}
+
+for _, w := range todo {
+if len(w.Sources) == 0 {
+continue // A blank or commented job line from -script or the command line.
+}
+
+// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
+// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
+rand.Shuffle(len(w.Sources), func(i, j int) {
+w.Sources[i], w.Sources[j] = w.Sources[j], w.Sources[i]
+})
+
+// Round batch sizes up so there's at least one source per batch.
+batch := (len(w.Sources) + workers - 1) / workers
+util.ChunkIter(len(w.Sources), batch, func(start, end int) error {
+wg.Add(1)
+queue <- Work{w.Sources[start:end], w.Flags}
+return nil
+})
+}
 wg.Wait()
 
 if failures > 0 {
 if *local {
 // td.Fatalf() would work fine, but barfs up a panic that we don't need to see.
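
Two details in the restored scheduling are easy to miss: the batch size is computed with ceiling division so every batch gets at least one source, and a batch that fails is re-run one source at a time to isolate the culprit. Below is a rough sketch of just those two pieces; splitIntoBatches and runBatch are assumed stand-ins for util.ChunkIter and the fm invocation, not the driver's actual code.

package main

import "fmt"

// splitIntoBatches chunks the sources into roughly `workers` batches,
// rounding the batch size up so every batch holds at least one source.
func splitIntoBatches(sources []string, workers int) [][]string {
	batch := (len(sources) + workers - 1) / workers // ceiling division, as in the diff
	var out [][]string
	for start := 0; start < len(sources); start += batch {
		end := start + batch
		if end > len(sources) {
			end = len(sources)
		}
		out = append(out, sources[start:end])
	}
	return out
}

// runBatch is a stand-in for exec'ing `fm -s <sources>`; it "fails" on one name.
func runBatch(sources []string) error {
	for _, s := range sources {
		if s == "bad_gm" {
			return fmt.Errorf("batch containing %q failed", s)
		}
	}
	return nil
}

func main() {
	sources := []string{"gm_a", "gm_b", "bad_gm", "gm_c", "gm_d"}
	for _, b := range splitIntoBatches(sources, 2) {
		if err := runBatch(b); err != nil && len(b) > 1 {
			// The batch failed: re-run each source alone to isolate the failure,
			// mirroring the requeue() logic in the diff.
			for _, s := range b {
				if err := runBatch([]string{s}); err != nil {
					fmt.Println("isolated failure:", s)
				}
			}
		}
	}
}

In the real driver the isolation happens by requeuing single-source Work onto the same channel, so the retries share the same bounded worker pool instead of spawning new goroutines.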