refamiliarize fm_driver.go
Update comments and small tweaks as I remember how this works.

Change-Id: I4a279781e512fc707b96226e62a2831a1d0683e5
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/362196
Reviewed-by: Mike Klein <mtklein@google.com>
Commit-Queue: Mike Klein <mtklein@google.com>
parent 9c9f60cdb4, commit 75bd058766
@@ -31,8 +31,8 @@ func main() {
 		projectId = flag.String("project_id", "", "ID of the Google Cloud project.")
 		taskId    = flag.String("task_id", "", "ID of this task.")
 		taskName  = flag.String("task_name", "", "Name of the task.")
-		local     = flag.Bool("local", true, "True if running locally (as opposed to on the bots)")
-		output    = flag.String("o", "", "If provided, dump a JSON blob of step data to the given file. Prints to stdout if '-' is given.")
+		output    = flag.String("o", "", "Dump JSON step data to the given file, or stdout if -.")
+		local     = flag.Bool("local", true, "Running locally (else on the bots)?")
 
 		resources = flag.String("resources", "resources", "Passed to fm -i.")
 	)
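Note: these are ordinary standard-library flag declarations. As a quick refresher on the pattern (flag names copied from the block above, everything else simplified), a minimal sketch looks like:

package main

import (
	"flag"
	"fmt"
)

func main() {
	// flag.String/flag.Bool return pointers; values are filled in by flag.Parse().
	output := flag.String("o", "", "Dump JSON step data to the given file, or stdout if -.")
	local := flag.Bool("local", true, "Running locally (else on the bots)?")
	flag.Parse()

	// Positional arguments (like the path to the fm binary) are read with flag.Arg.
	fmt.Println(*output, *local, flag.Arg(0))
}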
@@ -43,12 +43,12 @@ func main() {
 	if *local {
 		// Task Driver echoes every exec.Run() stdout and stderr to the console,
 		// which makes it hard to find failures (especially stdout). Send them to /dev/null.
-		f, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
+		devnull, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
 		if err != nil {
 			td.Fatal(ctx, err)
 		}
-		os.Stdout = f
-		os.Stderr = f
+		os.Stdout = devnull
+		os.Stderr = devnull
 	}
 
 	if flag.NArg() < 1 {
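Note: the /dev/null redirection above is just a reassignment of the os.Stdout and os.Stderr package variables. A standalone sketch of the same pattern (using log.Fatal instead of td.Fatal, since there is no Task Driver context here):

package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	// Reassigning os.Stdout/os.Stderr only affects Go code that writes through
	// these package-level variables (fmt.Println and friends), not the raw
	// file descriptors 1 and 2.
	devnull, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
	if err != nil {
		log.Fatal(err)
	}
	os.Stdout = devnull
	os.Stderr = devnull

	fmt.Println("this is silently discarded")
}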
@@ -56,7 +56,7 @@ func main() {
 	}
 	fm := flag.Arg(0)
 
-	// Run fm --flag to find the names of all linked GMs or tests.
+	// Run `fm <flag>` to find the names of all linked GMs or tests.
 	query := func(flag string) []string {
 		stdout := &bytes.Buffer{}
 		cmd := &exec.Command{Name: fm, Stdout: stdout}
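Note: the rest of the query helper is not shown in this hunk. A rough standalone equivalent of the idea, using the standard os/exec package instead of the infra exec wrapper (the line-splitting of fm's output is an assumption, and it expects an fm binary on PATH):

package main

import (
	"fmt"
	"log"
	"os/exec"
	"strings"
)

// query runs `fm <flag>` and returns one name per non-empty line of output.
func query(fm, flag string) []string {
	out, err := exec.Command(fm, flag).Output()
	if err != nil {
		log.Fatal(err)
	}
	var names []string
	for _, line := range strings.Split(string(out), "\n") {
		if line = strings.TrimSpace(line); line != "" {
			names = append(names, line)
		}
	}
	return names
}

func main() {
	gms := query("fm", "--listGMs")
	tests := query("fm", "--listTests")
	fmt.Println(len(gms), "GMs and", len(tests), "tests linked in")
}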
@@ -79,6 +79,7 @@ func main() {
 	gms := query("--listGMs")
 	tests := query("--listTests")
 
+	// Parse a job like "gms b=cpu ct=8888" into a struct of Sources to run under given Flags.
 	parse := func(job []string) *work {
 		w := &work{}
 
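Note: the body of parse is elided by the diff. A hypothetical, much-simplified sketch of the mapping that new comment describes (the token rules here are assumptions, not the driver's exact behavior):

package main

import (
	"fmt"
	"strings"
)

// work mirrors the shape used by the driver: a batch of Sources to run
// under a shared set of Flags.
type work struct {
	Sources []string
	Flags   []string
}

// parseJob is a simplified stand-in for the driver's parse: bare words name
// source sets ("gms", "tests"), and key=value tokens become "-key value" flags.
func parseJob(job []string, gms, tests []string) *work {
	w := &work{}
	for _, token := range job {
		switch {
		case token == "gms":
			w.Sources = append(w.Sources, gms...)
		case token == "tests":
			w.Sources = append(w.Sources, tests...)
		case strings.Contains(token, "="):
			kv := strings.SplitN(token, "=", 2)
			w.Flags = append(w.Flags, "-"+kv[0], kv[1])
		}
	}
	return w
}

func main() {
	w := parseJob(strings.Fields("gms b=cpu ct=8888"),
		[]string{"aaclip", "blurs"}, nil)
	fmt.Println(w.Sources, w.Flags) // [aaclip blurs] [-b cpu -ct 8888]
}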
@@ -137,6 +138,7 @@ func main() {
 		td.Fatal(ctx, err)
 	}
 
+	// We'll kick off workers to run FM with `-s <Sources...> <Flags...>` from parsed jobs.
 	var failures int32 = 0
 	wg := &sync.WaitGroup{}
 
@@ -146,10 +148,12 @@ func main() {
 			stderr := &bytes.Buffer{}
 			cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr}
 			cmd.Args = append(cmd.Args, "-i", *resources)
-			cmd.Args = append(cmd.Args, w.Flags...)
 			cmd.Args = append(cmd.Args, "-s")
 			cmd.Args = append(cmd.Args, w.Sources...)
+			cmd.Args = append(cmd.Args, w.Flags...)
 			if err := exec.Run(ctx, cmd); err != nil {
+				// We optimistically run batches of Sources, but if a batch fails,
+				// we'll re-run one at a time to find the precise failures.
 				if len(w.Sources) == 1 {
 					// If a source ran alone and failed, that's just a failure.
 					atomic.AddInt32(&failures, 1)
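Note: spelled out, the worker now builds the command line as fm -i <resources> -s <sources...> <flags...>. A tiny sketch of that append order, with placeholder values standing in for *resources, w.Sources, and w.Flags:

package main

import "fmt"

func main() {
	// Hypothetical inputs for illustration only.
	resources := "resources"
	sources := []string{"aaclip", "blurs"}
	flags := []string{"-b", "cpu", "-ct", "8888"}

	// Mirrors the append order in the worker: -i, then -s plus the batch
	// of sources, then the job's flags.
	args := []string{"-i", resources, "-s"}
	args = append(args, sources...)
	args = append(args, flags...)

	fmt.Println(args) // [-i resources -s aaclip blurs -b cpu -ct 8888]
}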
@@ -170,8 +174,10 @@ func main() {
 						strings.Join(lines, "\n\t"))
 				}
 			} else {
-				// If a batch of sources ran and failed, split them up and try again.
+				// If a batch fails, retry each individually.
 				for _, source := range w.Sources {
+					// Requeuing work from the workers makes sizing the chan buffer tricky:
+					// we don't ever want this `queue <-` to block on a full buffer.
 					wg.Add(1)
 					queue <- work{[]string{source}, w.Flags}
 				}
@@ -182,7 +188,7 @@ func main() {
 	}
 
 	workers := runtime.NumCPU()
-	queue := make(chan work, 1<<20)
+	queue := make(chan work, 1<<20) // Huge buffer to avoid having to be smart about requeuing.
 	for i := 0; i < workers; i++ {
 		go worker(queue)
 	}
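Note: putting the last few hunks together, the queue/worker/requeue shape is roughly the following standalone sketch; runBatch is a made-up stand-in for the exec.Run call on fm, and the real driver reports more detail on failure:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type work struct {
	Sources []string
	Flags   []string
}

func main() {
	var failures int32
	wg := &sync.WaitGroup{}
	queue := make(chan work, 1<<20) // Big buffer so the requeue below never blocks.

	// runBatch pretends to run `fm -s <sources...>`; here a batch "fails"
	// whenever it contains the source "bad".
	runBatch := func(sources []string) error {
		for _, s := range sources {
			if s == "bad" {
				return fmt.Errorf("batch containing %q failed", s)
			}
		}
		return nil
	}

	worker := func() {
		for w := range queue {
			if err := runBatch(w.Sources); err != nil {
				if len(w.Sources) == 1 {
					// A lone source failing is a real failure.
					atomic.AddInt32(&failures, 1)
				} else {
					// A failed batch gets split up and retried one source at a time.
					for _, source := range w.Sources {
						wg.Add(1)
						queue <- work{[]string{source}, w.Flags}
					}
				}
			}
			wg.Done()
		}
	}
	for i := 0; i < 4; i++ {
		go worker()
	}

	wg.Add(1)
	queue <- work{Sources: []string{"good", "bad", "also-good"}}
	wg.Wait()
	fmt.Println("failures:", failures) // failures: 1
}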
@@ -190,15 +196,16 @@ func main() {
 	for _, job := range jobs {
 		w := parse(job)
 		if len(w.Sources) == 0 {
-			continue
+			continue // A blank/commented line in the job script.
 		}
 
 		// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
+		// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
 		rand.Shuffle(len(w.Sources), func(i, j int) {
 			w.Sources[i], w.Sources[j] = w.Sources[j], w.Sources[i]
 		})
 
-		// Round up so there's at least one source per batch.
+		// Round batch sizes up so there's at least one source per batch.
 		batch := (len(w.Sources) + workers - 1) / workers
 		util.ChunkIter(len(w.Sources), batch, func(start, end int) error {
 			wg.Add(1)
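Note: concretely, the shuffle-then-chunk step above splits N sources into batches of size ceil(N/workers). A standalone sketch, with a local chunkIter as a stand-in for util.ChunkIter:

package main

import (
	"fmt"
	"math/rand"
)

// chunkIter stands in for util.ChunkIter: it calls fn with [start, end)
// index ranges covering length elements, at most chunkSize at a time.
func chunkIter(length, chunkSize int, fn func(start, end int)) {
	for start := 0; start < length; start += chunkSize {
		end := start + chunkSize
		if end > length {
			end = length
		}
		fn(start, end)
	}
}

func main() {
	sources := []string{"a", "b", "c", "d", "e", "f", "g"}
	workers := 3

	// Shuffle the sources to spread expensive ones across batches.
	rand.Shuffle(len(sources), func(i, j int) {
		sources[i], sources[j] = sources[j], sources[i]
	})

	// Round up: 7 sources over 3 workers gives batches of 3, not 2.
	batch := (len(sources) + workers - 1) / workers
	chunkIter(len(sources), batch, func(start, end int) {
		fmt.Println("batch:", sources[start:end])
	})
}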