split cpu+gpu work queues

Split the work queue into separate CPU and GPU queues, assigning
jobs using a --backend whitelist.  This lets us raise the CPU
process limit to NumCPU while keeping the GPU process limit at 8.

On my desktop this speeds up the following mixed workload by about 2x:

cat <<EOF | go run tools/fm/fm_bot.go -quiet -script - out/fm
    b=cpu gms ct=8888 w=$WHERE/8888 legacy=true  # old and busted
    b=cpu gms ct=8888 w=$WHERE/srgb              # new hotness
    b=gl  gms ct=8888 w=$WHERE/gl
    b=mtl gms ct=8888 w=$WHERE/mtl
    b=cpu gms ct=565  w=$WHERE/565
    b=cpu gms ct=f16  w=$WHERE/f16
    b=skp gms         w=$WHERE/skp
EOF

Before:        44.37 real       303.87 user        15.40 sys
After:         21.87 real       480.05 user        27.10 sys

Change-Id: Ibea4537c8cdf278f8e4621f26ebacd942952eec3
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/205833
Reviewed-by: Brian Osman <brianosman@google.com>
Commit-Queue: Mike Klein <mtklein@google.com>
Author: Mike Klein <mtklein@google.com>  2019-04-03 12:25:38 -04:00
Commit: d0389c596d (parent fbe2406693)

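Before the diff, a minimal self-contained sketch of the pattern this change introduces: two independently sized worker pools fed by separate queues, a backend whitelist deciding which queue each job lands on, and a results funnel buffered large enough that main can never deadlock while enqueueing. The job type, the runJob stub, and the sample jobs below are hypothetical simplifications for illustration; the real logic lives in tools/fm/fm_bot.go as shown in the diff.

    package main

    import (
        "fmt"
        "runtime"
    )

    type job struct {
        Backend string
        Name    string
    }

    // Backends that are CPU-bound; anything else is conservatively
    // treated as GPU-bound, mirroring the whitelist in the diff.
    var cpuBound = map[string]bool{"cpu": true, "skp": true, "pdf": true}

    // runJob stands in for launching an fm process; purely illustrative.
    func runJob(j job) bool {
        fmt.Printf("ran %s on the %s backend\n", j.Name, j.Backend)
        return true
    }

    func main() {
        jobs := []job{{"cpu", "gms"}, {"gl", "gms"}, {"skp", "gms"}}

        cpuLimit := runtime.NumCPU() // CPU-bound work scales to all cores...
        gpuLimit := 8                // ...but too many GPU processes thrash the GPU.

        cpu := make(chan job, cpuLimit)
        gpu := make(chan job, gpuLimit)

        // Buffer one result per job so workers never block sending to main,
        // which could deadlock if main were still busy enqueueing.
        results := make(chan bool, len(jobs))

        worker := func(queue chan job) {
            for j := range queue {
                results <- runJob(j)
            }
        }
        for i := 0; i < cpuLimit; i++ {
            go worker(cpu)
        }
        for i := 0; i < gpuLimit; i++ {
            go worker(gpu)
        }

        for _, j := range jobs {
            if cpuBound[j.Backend] {
                cpu <- j
            } else {
                gpu <- j
            }
        }
        close(cpu)
        close(gpu)

        for range jobs {
            <-results
        }
    }

The essential design point is sizing the results buffer up front from the total number of jobs, so the many-to-one funnel can never back up into the workers while main is still sending. The same reasoning underpins the batch math in the last hunk: sourcesPerBatch is a ceiling division, so each job enqueues at most limit batches, and since limit is at most cpuLimit+gpuLimit, sent stays <= cap(results).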

@@ -17,11 +17,9 @@ import (
     "time"
 )
 
-// Set default process count to NumCPU() or 8, whichever's smaller. I picked 8
-// heuristically as not too slow for CPU bound work, which can generally scale
-// all the way up to NumCPU(), and not too slow for GPU bound work, which will
-// start to thrash your GPU if too high, even hang your machine.
-func defaultProcessLimit() int {
+// Too many GPU processes and we'll start to overwhelm your GPU,
+// even hanging your machine in the worst case. Here's a reasonable default.
+func defaultGpuLimit() int {
     limit := 8
     if n := runtime.NumCPU(); n < limit {
         return n
@@ -33,15 +31,18 @@ var script = flag.String("script", "", "A file with jobs to run, one per line. -
 var random = flag.Bool("random", true, "Assign sources into job batches randomly?")
 var quiet = flag.Bool("quiet", false, "Print only failures?")
 var exact = flag.Bool("exact", false, "Match GM names only exactly.")
-var processLimit = flag.Int("processLimit", defaultProcessLimit(),
-    "Maximum number of concurrent processes, 0 -> NumCPU.")
+var cpuLimit = flag.Int("cpuLimit", runtime.NumCPU(),
+    "Maximum number of concurrent processes for CPU-bound work.")
+var gpuLimit = flag.Int("gpuLimit", defaultGpuLimit(),
+    "Maximum number of concurrent processes for GPU-bound work.")
 
 func init() {
     flag.StringVar(script, "s", *script, "Alias for --script.")
     flag.BoolVar(random, "r", *random, "Alias for --random.")
     flag.BoolVar(quiet, "q", *quiet, "Alias for --quiet.")
     flag.BoolVar(exact, "e", *exact, "Alias for --exact.")
-    flag.IntVar(processLimit, "j", *processLimit, "Alias for --processLimit.")
+    flag.IntVar(cpuLimit, "c", *cpuLimit, "Alias for --cpuLimit.")
+    flag.IntVar(gpuLimit, "g", *gpuLimit, "Alias for --gpuLimit.")
 }
 
 func listAllGMs(fm string) (gms []string, err error) {
@@ -143,13 +144,14 @@ func sourcesAndFlags(args []string, gms []string) ([]string, []string, error) {
     return sources, flags, nil
 }
 
+type work struct {
+    Sources []string
+    Flags   []string
+}
+
 func main() {
     flag.Parse()
 
-    if *processLimit == 0 {
-        *processLimit = runtime.NumCPU()
-    }
-
     if flag.NArg() < 1 {
         log.Fatal("Please pass an fm binary as the first argument.")
     }
@@ -182,33 +184,35 @@ func main() {
         }
     }
 
-    // The buffer size of main->worker queue channel isn't
-    // super important... presumably we'll have many hungry
-    // goroutines snapping work off it as quick as they can,
-    // and if things get backed up, no real reason for main
-    // to do anything but block.
-    queue := make(chan struct {
-        Sources []string
-        Flags   []string
-    }, *processLimit)
+    // The buffer size of main->worker channels isn't super important...
+    // presumably we'll have many hungry goroutines snapping up work as quick
+    // as they can, and if things get backed up, no real reason for main to do
+    // anything but block.
+    cpu := make(chan work, *cpuLimit)
+    gpu := make(chan work, *gpuLimit)
 
-    // The buffer size of this worker->main results channel
-    // is much more sensitive. Since it's a many -> one
-    // funnel, it's easy for the workers to produce lots of
-    // results that main can't keep up with.
+    // The buffer size of this worker->main results channel is much more
+    // sensitive. Since it's a many->one funnel, it's easy for the workers to
+    // produce lots of results that main can't keep up with.
     //
-    // This needlessly throttles our work, and in the worst
-    // case, if the buffer fills up before main has finished
-    // enqueueing all the work, we can deadlock.
+    // This needlessly throttles our progress, and we can even deadlock if
+    // the buffer fills up before main has finished enqueueing all the work.
     //
-    // So we set the buffer size here large enough to hold
-    // one result for every item we might possibly enqueue.
-    results := make(chan bool, *processLimit*len(jobs))
+    // So we set the buffer size here large enough to hold a result for every
+    // item we might possibly enqueue.
+    results := make(chan bool, (*cpuLimit+*gpuLimit)*len(jobs))
 
-    for i := 0; i < *processLimit; i++ {
+    for i := 0; i < *cpuLimit; i++ {
         go func() {
-            for sf := range queue {
-                results <- callFM(fm, sf.Sources, sf.Flags)
+            for w := range cpu {
+                results <- callFM(fm, w.Sources, w.Flags)
             }
         }()
     }
+    for i := 0; i < *gpuLimit; i++ {
+        go func() {
+            for w := range gpu {
+                results <- callFM(fm, w.Sources, w.Flags)
+            }
+        }()
+    }
@@ -224,6 +228,23 @@ func main() {
             log.Fatal(err)
         }
 
+        // Determine if this is CPU-bound or GPU-bound work, conservatively assuming GPU.
+        queue, limit := gpu, *gpuLimit
+        backend := ""
+        for i, flag := range flags {
+            if flag == "-b" || flag == "--backend" {
+                backend = flags[i+1]
+            }
+        }
+        whitelisted := map[string]bool{
+            "cpu": true,
+            "skp": true,
+            "pdf": true,
+        }
+        if whitelisted[backend] {
+            queue, limit = cpu, *cpuLimit
+        }
+
         if *random {
             rand.Shuffle(len(sources), func(i, j int) {
                 sources[i], sources[j] = sources[j], sources[i]
@@ -232,24 +253,22 @@
         // Round up so there's at least one source per batch.
         // This math also helps guarantee that sent stays <= cap(results).
-        sourcesPerBatch := (len(sources) + *processLimit - 1) / *processLimit
+        sourcesPerBatch := (len(sources) + limit - 1) / limit
 
         for i := 0; i < len(sources); i += sourcesPerBatch {
-            end := i+sourcesPerBatch
+            end := i + sourcesPerBatch
             if end > len(sources) {
                 end = len(sources)
             }
-            batch := sources[i : end]
+            batch := sources[i:end]
 
-            queue <- struct {
-                Sources []string
-                Flags   []string
-            }{batch, flags}
+            queue <- work{batch, flags}
             sent += 1
         }
     }
-    close(queue)
+    close(cpu)
+    close(gpu)
 
     if sent > cap(results) {
         log.Fatalf("Oops, we sent %d but cap(results) is only %d. "+