Alternative approaches for the fan-in pattern in Go
kvo@envs.net
6 July 2024
1 Introduction
In Go, "fan-in" is a concurrency pattern in which input from several channels is multiplexed onto a single channel. This is useful when a number of slow jobs need to be executed concurrently and their results aggregated.
Suppose we have a work function that completes a single job. In practice, the job may be slow, so we want to spawn each call on a separate goroutine. When the function completes its job, it returns (sends) its result over the provided channel.
// T is the intended return type, and ... are other inputs
func work(c chan T, ...) {
    defer close(c)
    // work happens here
    c <- result
}
Let's say we also have an aggregator function (e.g. main) which spawns a separate instance of work for each job, each on a different goroutine. It then listens on the multiplexed channel for results, processing each one until the channel is closed (i.e. all jobs are done).
func main() {
    // let jobs be the number of jobs
    chans := make([]chan int, jobs)
    for i := range chans {
        chans[i] = make(chan int)
    }
    results := mux(chans...)
    for i := 0; i < jobs; i++ {
        go work(chans[i])
    }
    for result := range results {
        // do something with each result
    }
}
All that is missing now is the channel multiplexer: the mux function. We can use Sameer Ajmani's merge function, adapted slightly to support generic types, as our multiplexer:
func mux[T any](chans ...chan T) <-chan T {
    var wg sync.WaitGroup
    out := make(chan T)
    output := func(c <-chan T) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(chans))
    for _, c := range chans {
        go output(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
(A complete version with all three functions is available in Appendix A.)
However, this accepted solution to the fan-in problem requires a goroutine to be spawned for each input channel. Hence there are two goroutines per worker (the worker itself and its forwarding goroutine), plus an extra goroutine to close the output channel. Considering that each new goroutine starts with roughly 2 kB of stack memory in recent Go releases, the performance penalty seems unreasonable for such a trivial problem.
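To put a rough figure on this cost, one can park a large batch of goroutines and compare the runtime's stack statistics before and after. The snippet below is only an illustrative sketch, not part of any of the implementations discussed here; the exact figure varies by Go version and platform:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    const n = 100000
    var before, after runtime.MemStats
    runtime.GC()
    runtime.ReadMemStats(&before)
    block := make(chan struct{})
    for i := 0; i < n; i++ {
        // each parked goroutine stands in for an idle forwarding goroutine
        go func() { <-block }()
    }
    // give the scheduler a moment to start (and park) every goroutine
    time.Sleep(time.Second)
    runtime.ReadMemStats(&after)
    fmt.Printf("~%d B of stack per goroutine\n", (after.StackSys-before.StackSys)/n)
    close(block)
}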
2 Channel-closer approach
A better solution might be for all workers to send results to a single destination channel in the first place. However, this isn't as simple as it sounds. Once all results are sent over the channel, the channel must be closed. This means that there needs to be a mechanism for determining when all jobs are complete.
This can be achieved by having each worker signal that it is "done" over a channel to the aggregator. Once all workers have been spawned, the aggregator creates a new separate goroutine. This new goroutine waits for all workers to emit a "done" signal, then closes both the results and the "done" channel. The aggregator can then process all the results, e.g. by ranging over the results channel until it is closed.
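Concretely, the closer can be written inline in the aggregator. A minimal sketch, assuming results and done are the two channels and jobs is the number of workers (Appendix B factors this logic into a wait helper):

go func() {
    for i := 0; i < jobs; i++ {
        <-done // one "done" signal per completed worker
    }
    close(done)
    close(results)
}()

Closing results is what allows the aggregator's range loop to terminate.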
This approach can be called a channel-closer approach, as it relies on a goroutine to close a single channel instead of multiplexing several channels. This results in a significant reduction in the number of channels (from n + 1 to 2) and in the number of goroutines (from 2n + 1 to n + 1, where n is the number of jobs).
A channel-closer implementation is available in Appendix B.
3 Send-as-return approach
Why have a "done" channel at all? If each worker sends exactly one value over the channel (much like a function return), the number of channel sends equals the number of jobs. In that case, there is no need to close the channel at all; the receiver simply performs one receive per job:
package main

import (
    "fmt"
    "os"
)

func work(c chan int) {
    // arbitrarily chosen result
    result := 37
    c <- result
}

func main() {
    // arbitrarily chosen number of jobs
    jobs := 32768
    sink, _ := os.OpenFile("/dev/null", os.O_WRONLY, 0640)
    ch := make(chan int)
    for i := 0; i < jobs; i++ {
        go work(ch)
    }
    // ranging over an integer requires Go 1.22 or later
    for range jobs {
        result := <-ch
        fmt.Fprintln(sink, result)
    }
}
The number of channels decreases to 1, and the number of goroutines is simply n. Note that because the channel is never closed, the receiver must know exactly how many results to expect; ranging over the channel itself would block forever once the last result has been received.
4 Discussion
The number of goroutines and channels used in each approach is summarised in Table 1, where n is the number of jobs/workers:

Approach | No. of goroutines | No. of channels
---|---|---
Fan-in | 2n + 1 | n + 1
Channel closer | n + 1 | 2
Send-as-return | n | 1
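The practical cost of these differences could be measured with a Go benchmark (run via go test -bench=. from a _test.go file). The sketch below covers only the send-as-return approach; sendAsReturn is a hypothetical helper wrapping the Section 3 code, and analogous helpers around Appendices A and B would allow a direct comparison:

package main

import "testing"

// sendAsReturn wraps the send-as-return approach from Section 3.
func sendAsReturn(jobs int) {
    ch := make(chan int)
    for i := 0; i < jobs; i++ {
        go func() { ch <- 37 }()
    }
    for range jobs {
        <-ch
    }
}

func BenchmarkSendAsReturn(b *testing.B) {
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        sendAsReturn(1024)
    }
}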
5 Conclusion
Both alternative approaches eliminate the per-channel forwarding goroutines of the traditional fan-in pattern, reducing the goroutine count from 2n + 1 to n + 1 (channel closer) or n (send-as-return). Furthermore, the traditional solution uses n + 1 channels, while the channel-closer solution uses only 2 and the send-as-return solution only 1.
References
Sameer Ajmani. Go Concurrency Patterns: Pipelines and cancellation. The Go Blog, March 2014. https://go.dev/blog/pipelines
Appendices
A Standalone fan-in implementation
package main

import (
    "fmt"
    "os"
    "sync"
)

// work completes a single job and sends its result over c, closing
// the channel when done.
func work(c chan int) {
    defer close(c)
    result := 37
    c <- result
}

// mux multiplexes the input channels onto a single output channel,
// closing the output once every input channel has been closed.
func mux[T any](chans ...chan T) <-chan T {
    var wg sync.WaitGroup
    out := make(chan T)
    output := func(c <-chan T) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(chans))
    for _, c := range chans {
        go output(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    jobs := 32768
    sink, _ := os.OpenFile("/dev/null", os.O_WRONLY, 0640)
    chans := make([]chan int, jobs)
    for i := range chans {
        chans[i] = make(chan int)
    }
    results := mux(chans...)
    for i := 0; i < jobs; i++ {
        go work(chans[i])
    }
    for result := range results {
        fmt.Fprintln(sink, result)
    }
}
B Standalone channel-closer implementation
package main

import (
    "fmt"
    "os"
)

// work completes a single job, sends its result over c, and signals
// completion over done.
func work(c chan int, done chan bool) {
    defer send(done, true)
    result := 37
    c <- result
}

// send wraps a channel send so that it can be deferred.
func send[T any](c chan T, v T) {
    c <- v
}

// wait blocks until all jobs have signalled completion, then closes
// both channels.
func wait[T any](c chan T, done chan bool, jobs int) {
    for i := 0; i < jobs; i++ {
        <-done
    }
    close(done)
    close(c)
}

func main() {
    jobs := 32768
    sink, _ := os.OpenFile("/dev/null", os.O_WRONLY, 0640)
    ch := make(chan int)
    done := make(chan bool)
    for i := 0; i < jobs; i++ {
        go work(ch, done)
    }
    go wait(ch, done, jobs)
    for result := range ch {
        fmt.Fprintln(sink, result)
    }
}