Pipelines

  • One input channel and one output channel
  • Intermediate goroutines and channels, each handling one processing step
 
// pipeline starts one pipeline stage: a goroutine that reads values from
// input, applies process to each of them, and sends the results on the
// returned channel.
//
// The returned channel is closed exactly once, when the stage stops. The
// stage stops when input has been closed and drained, or when quit becomes
// readable (typically because it was closed) while a result is waiting to be
// sent.
func pipeline[I any, O any](quit chan struct{}, input <-chan I, process func(I) O) <-chan O {
	out := make(chan O)
	go func() {
		// Deferred once, before the loop. A defer inside the loop would queue
		// one close per item and panic on the second close at return, and
		// would never close out at all for an empty input.
		defer close(out)
		for in := range input {
			select {
			case out <- process(in):
			case <-quit:
				return
			}
		}
	}()

	return out
}

Fan-in

  • Merge multiple channels into one channel concurrently
// fanIn merges all of the given channels into a single output channel.
//
// One goroutine per input channel forwards its values to the output. The
// output is closed once every input channel has been closed and drained.
// No ordering is imposed between values coming from different inputs.
func fanIn[T any](channels ...<-chan T) <-chan T {
	merged := make(chan T)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward copies everything from src to merged, then reports completion.
	forward := func(src <-chan T) {
		for value := range src {
			merged <- value
		}
		pending.Done()
	}
	for _, src := range channels {
		go forward(src)
	}

	// Close the merged channel only after every forwarder has finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

Generator

  • Keep a channel full and automatically refill it with on-demand calculations
// example infinite numbers
func generateRandInt(min, max int) <-chan int {
    out := make(chan int, 3)
    go func() {
        for {
            out <- rand.IntN(max-min+1) + min
        }
    }()
    return out
}
 
func main() {
	gen := generateRandInt(0, 100)
    for range 20 {
        fmt.Println(<-gen)
    }
}
// example set number of items to be generated
func generateRandIntN(min, max, count int) <-chan int {
    out := make(chan int, 1)
    go func() {
        for range count {
            out <- rand.IntN(max-min+1) + min
        }
        close(out)
    }()
    return out
}
 
func main() {
	gen := generateRandIntN(0, 100, 5)
    for n := range gen {
        fmt.Println(n)
    }
    
    // or
    
    for {
        n, open := <-gen
        if !open {
            break
        }
        fmt.Println(n)
    }
}

Context

  • Lets you pass in information like a quit channel
// performOperation simulates an operation named op that takes delay
// milliseconds and can be aborted through ctx.
//
// It returns a channel that receives exactly one message: "<op> completed"
// if the delay elapses first, or "<op> aborted" if ctx is done first. The
// channel is never closed, so callers should receive from it at most once.
//
// delay is multiplied by time.Millisecond inside the function, so callers
// pass a plain millisecond count (e.g. 100), not an already-scaled duration
// such as 100*time.Millisecond.
func performOperation(ctx context.Context, op string, delay time.Duration) <-chan string {
	// Room for the single result, so the goroutine can always deliver it and
	// exit even if the caller never receives (e.g. after cancelling).
	out := make(chan string, 1)
	go func() {
		timer := time.NewTimer(delay * time.Millisecond)
		// Release the timer promptly when the context wins the race.
		defer timer.Stop()

		select {
		case <-timer.C:
			out <- fmt.Sprintf("%v completed", op)
		case <-ctx.Done():
			out <- fmt.Sprintf("%v aborted", op)
		}
	}()
	return out
}
 
// main starts three operations that share one cancellable context and prints
// their results as they arrive.
//
// When the database result arrives the context is cancelled and the wait
// loop ends. One more result is then read from the security operation's
// channel and printed.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	web := performOperation(ctx, "spin up server", 100)
	database := performOperation(ctx, "start database", 500)
	security := performOperation(ctx, "do security checks", 1000)

	// Keep selecting until the database result has been handled.
	for waiting := true; waiting; {
		select {
		case msg := <-web:
			fmt.Println(msg)
		case msg := <-database:
			fmt.Println(msg)
			cancel()
			waiting = false
		case msg := <-security:
			fmt.Println(msg)
		}
	}

	// NOTE(review): this blocks forever if the security result was already
	// consumed in the loop above; it relies on the database finishing first.
	fmt.Println(<-security)
}