diff --git a/infra/bots/task_drivers/fm_driver/fm_driver.go b/infra/bots/task_drivers/fm_driver/fm_driver.go
index d571b0e591..c30a144101 100644
--- a/infra/bots/task_drivers/fm_driver/fm_driver.go
+++ b/infra/bots/task_drivers/fm_driver/fm_driver.go
@@ -38,7 +38,6 @@ func main() {
 
 	actualStdout := os.Stdout
 	actualStderr := os.Stderr
-	verbosity := exec.Info
 	if *local {
 		// Task Driver echoes every exec.Run() stdout and stderr to the console,
 		// which makes it hard to find failures (especially stdout). Send them to /dev/null.
@@ -48,9 +47,6 @@ func main() {
 		}
 		os.Stdout = devnull
 		os.Stderr = devnull
-		// Having stifled stderr/stdout, changing Command.Verbose won't have any visible effect,
-		// but setting it to Silent will bypass a fair chunk of wasted formatting work.
-		verbosity = exec.Silent
 	}
 
 	if flag.NArg() < 1 {
@@ -61,7 +57,7 @@ func main() {
 	// Run `fm <flag>` to find the names of all linked GMs or tests.
 	query := func(flag string) []string {
 		stdout := &bytes.Buffer{}
-		cmd := &exec.Command{Name: fm, Stdout: stdout, Verbose: verbosity}
+		cmd := &exec.Command{Name: fm, Stdout: stdout}
 		cmd.Args = append(cmd.Args, "-i", *resources)
 		cmd.Args = append(cmd.Args, flag)
 		if err := exec.Run(ctx, cmd); err != nil {
@@ -106,112 +102,14 @@ func main() {
 		}()
 	}
 
-	// We'll kick off worker goroutines to run batches of work, and on failure,
-	// crash, or unknown hash, we'll split that batch into individual reruns to
-	// isolate those unusual results.
-	var failures int32 = 0
-	wg := &sync.WaitGroup{}
-
-	var worker func([]string, []string)
-	worker = func(sources []string, flags []string) {
-		defer wg.Done()
-
-		stdout := &bytes.Buffer{}
-		stderr := &bytes.Buffer{}
-		cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr, Verbose: verbosity}
-		cmd.Args = append(cmd.Args, "-i", *resources, "-s")
-		cmd.Args = append(cmd.Args, sources...)
-		cmd.Args = append(cmd.Args, flags...)
-
-		// Run our FM command.
-		err := exec.Run(ctx, cmd)
-
-		// On success, scan stdout for any unknown hashes.
-		unknownHash := func() string {
-			if err == nil && *bot != "" { // The map of known hashes is only filled when using -bot.
-				scanner := bufio.NewScanner(stdout)
-				for scanner.Scan() {
-					if parts := strings.Fields(scanner.Text()); len(parts) == 3 {
-						md5 := parts[1]
-						if !known[md5] {
-							return md5
-						}
-					}
-				}
-				if err := scanner.Err(); err != nil {
-					td.Fatal(ctx, err)
-				}
-			}
-			return ""
-		}()
-
-		// If a batch failed or produced an unknown hash, isolate with individual runs.
-		if len(sources) > 1 && (err != nil || unknownHash != "") {
-			wg.Add(len(sources))
-			for i := range sources {
-				// We could kick off independent goroutines here for more parallelism,
-				// but the bots are already parallel enough that they'd exhaust their
-				// process limits, and I haven't seen any impact on local runs.
-				worker(sources[i:i+1], flags)
-			}
-			return
-		}
-
-		// If an individual run failed, nothing more to do but fail.
-		if err != nil {
-			atomic.AddInt32(&failures, 1)
-			td.FailStep(ctx, err)
-			if *local {
-				lines := []string{}
-				scanner := bufio.NewScanner(stderr)
-				for scanner.Scan() {
-					lines = append(lines, scanner.Text())
-				}
-				if err := scanner.Err(); err != nil {
-					td.Fatal(ctx, err)
-				}
-				fmt.Fprintf(actualStderr, "%v %v #failed:\n\t%v\n",
-					cmd.Name,
-					strings.Join(cmd.Args, " "),
-					strings.Join(lines, "\n\t"))
-			}
-			return
-		}
-
-		// If an individual run succeeded but produced an unknown hash, TODO upload .png to Gold.
-		if unknownHash != "" {
-			fmt.Fprintf(actualStdout, "%v %v #%v\n",
-				cmd.Name,
-				strings.Join(cmd.Args, " "),
-				unknownHash)
-		}
+	type Work struct {
+		Sources []string // Passed to FM -s: names of gms/tests, paths to image files, .skps, etc.
+		Flags   []string // Other flags to pass to FM: --ct 565, --msaa 16, etc.
 	}
+	todo := []Work{}
 
-	// Start workers that run `FM -s sources... flags...` in small source batches for parallelism.
-	kickoff := func(sources, flags []string) {
-		if len(sources) == 0 {
-			return // A blank or commented job line from -script or the command line.
-		}
-
-		// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
-		// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
-		rand.Shuffle(len(sources), func(i, j int) {
-			sources[i], sources[j] = sources[j], sources[i]
-		})
-
-		// Round batch sizes up so there's at least one source per batch.
-		// Batch size is arbitrary, but nice to scale with the machine like this.
-		batches := runtime.NumCPU()
-		batch := (len(sources) + batches - 1) / batches
-		util.ChunkIter(len(sources), batch, func(start, end int) error {
-			wg.Add(1)
-			go worker(sources[start:end], flags)
-			return nil
-		})
-	}
-
-	// Parse a job like "gms b=cpu ct=8888" into sources and flags for kickoff().
-	parse := func(job []string) (sources, flags []string) {
+	// Parse a job like "gms b=cpu ct=8888" into Work{Sources=<all GMs>, Flags={-b,cpu,--ct,8888}}.
+	parse := func(job []string) (w Work) {
 		for _, token := range job {
 			// Everything after # is a comment.
 			if strings.HasPrefix(token, "#") {
@@ -220,12 +118,12 @@ func main() {
 
 			// Treat "gm" or "gms" as a shortcut for all known GMs.
 			if token == "gm" || token == "gms" {
-				sources = append(sources, gms...)
+				w.Sources = append(w.Sources, gms...)
 				continue
 			}
 			// Same for tests.
 			if token == "test" || token == "tests" {
-				sources = append(sources, tests...)
+				w.Sources = append(w.Sources, tests...)
 				continue
 			}
 
@@ -237,18 +135,18 @@ func main() {
 				}
 				f += parts[0]
 
-				flags = append(flags, f, parts[1])
+				w.Flags = append(w.Flags, f, parts[1])
 				continue
 			}
 
 			// Anything else must be the name of a source for FM to run.
-			sources = append(sources, token)
+			w.Sources = append(w.Sources, token)
 		}
 		return
 	}
 
 	// Parse one job from the command line, handy for ad hoc local runs.
-	kickoff(parse(flag.Args()[1:]))
+	todo = append(todo, parse(flag.Args()[1:]))
 
 	// Any number of jobs can come from -script.
 	if *script != "" {
@@ -262,7 +160,7 @@ func main() {
 		}
 		scanner := bufio.NewScanner(file)
 		for scanner.Scan() {
-			kickoff(parse(strings.Fields(scanner.Text())))
+			todo = append(todo, parse(strings.Fields(scanner.Text())))
 		}
 		if err := scanner.Err(); err != nil {
 			td.Fatal(ctx, err)
@@ -288,13 +186,132 @@ func main() {
 			tests = filter(tests, func(s string) bool { return s != "GoodHash" })
 		}
 
-		// You could use parse() here if you like, but it's just as easy to kickoff() directly.
-		kickoff(tests, strings.Fields("-b cpu"))
-		kickoff(gms, strings.Fields("-b cpu"))
-		kickoff(gms, strings.Fields("-b cpu --skvm"))
+		// You could use parse() here if you like, but it's just as easy to make Work{} directly.
+		work := func(sources []string, flags string) {
+			todo = append(todo, Work{sources, strings.Fields(flags)})
+		}
+		work(tests, "-b cpu")
+		work(gms, "-b cpu")
+		work(gms, "-b cpu --skvm")
 	}
 
+	// We'll try to spread our work roughly evenly over a number of worker goroutines.
+	// We can't combine Work with different Flags, but we can do the opposite,
+	// splitting a single Work into smaller Work units with the same Flags,
+	// even all the way down to a single Source. So we'll optimistically run
+	// batches of Sources together, but if a batch fails or crashes, we'll
+	// split it up and re-run one at a time to find the precise failures.
+	var failures int32 = 0
+	wg := &sync.WaitGroup{}
+	worker := func(queue chan Work) {
+		for w := range queue {
+			stdout := &bytes.Buffer{}
+			stderr := &bytes.Buffer{}
+			cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr}
+			cmd.Args = append(cmd.Args, "-i", *resources)
+			cmd.Args = append(cmd.Args, "-s")
+			cmd.Args = append(cmd.Args, w.Sources...)
+			cmd.Args = append(cmd.Args, w.Flags...)
+			// TODO: when len(w.Sources) == 1, add -w ... to cmd.Args to write a .png for upload.
+
+			// On cmd failure or unknown hash, we'll split the Work batch up into individual reruns.
+			requeue := func() {
+				// Requeuing Work from the workers is what makes sizing the chan buffer tricky:
+				// we don't ever want these `queue <-` to block a worker because of a full buffer.
+				for _, source := range w.Sources {
+					wg.Add(1)
+					queue <- Work{[]string{source}, w.Flags}
+				}
+			}
+
+			if err := exec.Run(ctx, cmd); err != nil {
+				if len(w.Sources) == 1 {
+					// If a source ran alone and failed, that's just a failure.
+					atomic.AddInt32(&failures, 1)
+					td.FailStep(ctx, err)
+					if *local {
+						lines := []string{}
+						scanner := bufio.NewScanner(stderr)
+						for scanner.Scan() {
+							lines = append(lines, scanner.Text())
+						}
+						if err := scanner.Err(); err != nil {
+							td.Fatal(ctx, err)
+						}
+
+						fmt.Fprintf(actualStderr, "%v %v #failed:\n\t%v\n",
+							cmd.Name,
+							strings.Join(cmd.Args, " "),
+							strings.Join(lines, "\n\t"))
+					}
+				} else {
+					// If a batch of sources failed, break up the batch to isolate the failures.
+					requeue()
+				}
+			} else {
+				// FM completed successfully. Scan stdout for any unknown hash.
+				unknown := func() string {
+					if *bot != "" { // The map known[] is only filled when *bot != "".
+						scanner := bufio.NewScanner(stdout)
+						for scanner.Scan() {
+							if parts := strings.Fields(scanner.Text()); len(parts) == 3 {
+								md5 := parts[1]
+								if !known[md5] {
+									return md5
+								}
+							}
+						}
+						if err := scanner.Err(); err != nil {
+							td.Fatal(ctx, err)
+						}
+					}
+					return ""
+				}()
+
+				if unknown != "" {
+					if len(w.Sources) == 1 {
+						// TODO upload .png with goldctl.
+						fmt.Fprintf(actualStdout, "%v %v #%v\n",
+							cmd.Name,
+							strings.Join(cmd.Args, " "),
+							unknown)
+					} else {
+						// Split the batch to run individually and TODO, write .pngs.
+						requeue()
+					}
+				}
+			}
+			wg.Done()
+		}
+	}
+
+	workers := runtime.NumCPU()
+	queue := make(chan Work, 1<<20) // Huge buffer to avoid having to be smart about requeuing.
+	for i := 0; i < workers; i++ {
+		go worker(queue)
+	}
+
+	for _, w := range todo {
+		if len(w.Sources) == 0 {
+			continue // A blank or commented job line from -script or the command line.
+		}
+
+		// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
+		// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
+		rand.Shuffle(len(w.Sources), func(i, j int) {
+			w.Sources[i], w.Sources[j] = w.Sources[j], w.Sources[i]
+		})
+
+		// Round batch sizes up so there's at least one source per batch.
+		batch := (len(w.Sources) + workers - 1) / workers
+		util.ChunkIter(len(w.Sources), batch, func(start, end int) error {
+			wg.Add(1)
+			queue <- Work{w.Sources[start:end], w.Flags}
+			return nil
+		})
+	}
 	wg.Wait()
+
 	if failures > 0 {
 		if *local {
 			// td.Fatalf() would work fine, but barfs up a panic that we don't need to see.
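
Aside, not part of the patch: the heart of the new scheduling code is a batch-then-isolate pattern -- run many sources per FM invocation, and when a batch fails, requeue each source alone to pinpoint the real failures. Below is a minimal, self-contained Go sketch of that pattern. runBatch, the worker count, and the sample sources are hypothetical stand-ins for the FM invocation and td.FailStep, not code from this change.

// A standalone sketch (not part of fm_driver.go): the batch-then-isolate
// requeue pattern used by the worker loop above. runBatch, the worker count,
// and the sample sources are hypothetical stand-ins for FM and td.FailStep.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type work struct {
	sources []string // What to run in one batch.
	flags   []string // Shared flags; batches are only ever split, never merged.
}

func main() {
	// Stand-in for running FM: pretend any batch containing "bad" fails.
	runBatch := func(w work) error {
		for _, s := range w.sources {
			if s == "bad" {
				return fmt.Errorf("batch %v failed", w.sources)
			}
		}
		return nil
	}

	var failures int32
	wg := &sync.WaitGroup{}
	// Big buffer so a worker requeuing singles never blocks on a full channel.
	queue := make(chan work, 1<<10)

	worker := func() {
		for w := range queue {
			if err := runBatch(w); err != nil {
				if len(w.sources) == 1 {
					// A source failed on its own: a real, isolated failure.
					atomic.AddInt32(&failures, 1)
				} else {
					// A batch failed: requeue each source alone to find the culprits.
					for _, s := range w.sources {
						wg.Add(1)
						queue <- work{[]string{s}, w.flags}
					}
				}
			}
			wg.Done()
		}
	}
	for i := 0; i < 4; i++ {
		go worker()
	}

	wg.Add(1)
	queue <- work{[]string{"a", "bad", "c"}, []string{"-b", "cpu"}}
	wg.Wait()
	close(queue)

	fmt.Println("failures:", failures) // failures: 1 -- only "bad" is blamed; "a" and "c" pass alone.
}

The generous channel buffer mirrors the driver's 1<<20 buffer: workers requeue onto the same channel they consume from, so if every worker blocked on a full buffer with nothing draining it, the pool would deadlock.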