Some intermediate goroutines and channels that handle some process
func pipeline[I any, O any](quit chan struct{}, input <-chan I, process func(I) O) <-chan O { out := make(chan O) go func() { for in := range input { defer close(out) select { case out <- process(in): case <-quit: return } } }() return out}
Fan-in
Merge multiple channels into one channel concurrently
func fanIn[T any](channels ...<-chan T) <-chan T { var wg sync.WaitGroup out := make(chan T) wg.Add(len(channels)) for _, ch := range channels { go func(in <-chan T) { for i := range in { out <- i } wg.Done() }(ch) } go func() { wg.Wait() close(out) }() return out}
Generator
Keep a channel full an automatically refill it with on demand calculations
// example infinite numbersfunc generateRandInt(min, max int) <-chan int { out := make(chan int, 3) go func() { for { out <- rand.IntN(max-min+1) + min } }() return out}func main() { gen := generateRandInt(0, 100) for range 20 { fmt.Println(<-gen) }}
// example set number of items to be generatedfunc generateRandIntN(min, max, count int) <-chan int { out := make(chan int, 1) go func() { for range count { out <- rand.IntN(max-min+1) + min } close(out) }() return out}func main() { gen := generateRandIntN(0, 100, 5) for n := range gen { fmt.Println(n) } // or for { n, open := <-gen if !open { break } fmt.Println(n) }}
Context
Lets you pass in information like a quit channel
func performOperation(ctx context.Context, op string, delay time.Duration) <-chan string { out := make(chan string) go func() { for { select { case <-time.After(delay * time.Millisecond): out <- fmt.Sprintf("%v completed", op) return case <-ctx.Done(): out <- fmt.Sprintf("%v aborted", op) return } } }() return out}func main() { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) web := performOperation(ctx, "spin up server", 100) database := performOperation(ctx, "start database", 500) security := performOperation(ctx, "do security checks", 1000)MainLoop: for { select { case res := <-web: fmt.Println(res) case res := <-database: fmt.Println(res) cancel() break MainLoop case res := <-security: fmt.Println(res) } } fmt.Println(<-security)}