end kickoff() step at the right time

If we track how many pending batches a kickoff()
has in flight, we can endStep() it properly when
that number hits zero.

This double sync.WaitGroup trick is pretty neat.
Now we're thinking with portals...
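
A minimal, self-contained sketch of that double-WaitGroup pattern, with the
step bookkeeping and the real batch work stubbed out (the names and delays
below are placeholders, not the actual FM driver code):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	pendingKickoffs := &sync.WaitGroup{}

	kickoff := func(name string, nbatches int) {
		pendingKickoffs.Add(1)

		pendingBatches := &sync.WaitGroup{}
		for i := 0; i < nbatches; i++ {
			pendingBatches.Add(1)
			go func() {
				defer pendingBatches.Done()
				time.Sleep(10 * time.Millisecond) // Stand-in for running one batch of work.
			}()
		}

		// When the batches for this kickoff() are all done, this kickoff() is done.
		go func() {
			pendingBatches.Wait()
			fmt.Printf("%s: all %d batches finished\n", name, nbatches)
			pendingKickoffs.Done()
		}()
	}

	kickoff("gms", 4)
	kickoff("tests", 8)

	pendingKickoffs.Wait() // Returns only once every kickoff() has fully finished.
}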

Added some comments to prevent myself from falling
into the trap of assuming we'll have runtime.NumCPU()
batches... rounding the batch size up means we'll
sometimes have fewer.
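
For example (hypothetical numbers, purely to illustrate the rounding): with
10 sources on an 8-core machine the batch size rounds up to 2, which yields
only 5 batches, not 8:

package main

import "fmt"

func main() {
	sources, nCPU := 10, 8                             // hypothetical: 10 sources, 8 cores
	batchSize := (sources + nCPU - 1) / nCPU           // round up: ceil(10/8) = 2
	nBatches := (sources + batchSize - 1) / batchSize  // ceil(10/2) = 5
	fmt.Println(batchSize, nBatches)                   // prints "2 5" -- 5 batches, not 8
}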

Change-Id: If50615c204485862462c240b9bbdfd4ddbad43b2
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/366142
Reviewed-by: Eric Boren <borenet@google.com>
Mike Klein 2021-02-04 08:34:38 -06:00
parent a6bf48c100
commit 420c8a505e


@@ -182,6 +182,7 @@ func main() {
type Work struct {
Ctx context.Context
WG *sync.WaitGroup
Sources []string // Passed to FM -s: names of gms/tests, paths to image files, .skps, etc.
Flags []string // Other flags to pass to FM: --ct 565, --msaa 16, etc.
}
@@ -263,24 +264,29 @@ func main() {
}
queue := make(chan Work, 1<<20) // Arbitrarily huge buffer to avoid ever blocking.
wg := &sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
for w := range queue {
ctx := startStep(w.Ctx, td.Props(strings.Join(w.Sources, " ")))
worker(ctx, w.Sources, w.Flags)
endStep(ctx)
wg.Done()
func() {
defer w.WG.Done()
// For organizational purposes, create a step representing this batch,
// with the batch call to FM and any individual reruns all nested inside.
ctx := startStep(w.Ctx, td.Props(strings.Join(w.Sources, " ")))
defer endStep(ctx)
worker(ctx, w.Sources, w.Flags)
}()
}
}()
}
// Get some work going, first breaking it into batches to increase our parallelism.
pendingKickoffs := &sync.WaitGroup{}
kickoff := func(sources, flags []string) {
if len(sources) == 0 {
return // A blank or commented job line from -script or the command line.
}
pendingKickoffs.Add(1)
// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
@@ -289,16 +295,29 @@ func main() {
sources[i], sources[j] = sources[j], sources[i]
})
// For organizational purposes, create a step representing this call to kickoff(),
// with each batch of sources nested inside.
ctx := startStep(ctx, td.Props(strings.Join(flags, " ")))
defer endStep(ctx)
pendingBatches := &sync.WaitGroup{}
nbatches := runtime.NumCPU() // Arbitrary, nice to scale ~= cores.
batch := (len(sources) + nbatches - 1) / nbatches // Round up to avoid empty batches.
util.ChunkIter(len(sources), batch, func(start, end int) error {
wg.Add(1)
queue <- Work{ctx, sources[start:end], flags}
// Arbitrary, nice to scale ~= cores.
approxNumBatches := runtime.NumCPU()
// Round up batch size to avoid empty batches, making approxNumBatches approximate.
batchSize := (len(sources) + approxNumBatches - 1) / approxNumBatches
util.ChunkIter(len(sources), batchSize, func(start, end int) error {
pendingBatches.Add(1)
queue <- Work{ctx, pendingBatches, sources[start:end], flags}
return nil
})
// When the batches for this kickoff() are all done, this kickoff() is done.
go func() {
pendingBatches.Wait()
endStep(ctx)
pendingKickoffs.Done()
}()
}
// Parse a job like "gms b=cpu ct=8888" into sources and flags for kickoff().
@@ -421,7 +440,7 @@ func main() {
}
}
wg.Wait()
pendingKickoffs.Wait()
if failures > 0 {
fatal(ctx, fmt.Errorf("%v runs of %v failed after retries.\n", failures, fm))
}