Go Concurrency Patterns
Verified@wpank
npx machina-cli add skill @wpank/go-concurrency-patterns --openclawGo Concurrency Patterns
Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.
When to Use
- Building concurrent Go applications
- Implementing worker pools and pipelines
- Managing goroutine lifecycles and cancellation
- Debugging race conditions
- Implementing graceful shutdown
Concurrency Primitives
| Primitive | Purpose | When to Use |
|---|---|---|
goroutine | Lightweight concurrent execution | Any concurrent work |
channel | Communication between goroutines | Passing data, signaling |
select | Multiplex channel operations | Waiting on multiple channels |
sync.Mutex | Mutual exclusion | Protecting shared state |
sync.WaitGroup | Wait for goroutines to complete | Coordinating goroutine completion |
context.Context | Cancellation and deadlines | Request-scoped lifecycle management |
errgroup.Group | Concurrent tasks with errors | Parallel work that can fail |
Go Concurrency Mantra: Don't communicate by sharing memory; share memory by communicating.
Quick Start
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
results := make(chan string, 10)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case results <- fmt.Sprintf("Worker %d done", id):
}
}(i)
}
go func() { wg.Wait(); close(results) }()
for result := range results {
fmt.Println(result)
}
}
Pattern 1: Worker Pool
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
Err error
}
func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return
default:
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed: %s", job.Data),
}
}
}
}()
}
go func() { wg.Wait(); close(results) }()
return results
}
// Usage
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobs := make(chan Job, 100)
go func() {
for i := 0; i < 50; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
}
close(jobs)
}()
for result := range WorkerPool(ctx, 5, jobs) {
fmt.Printf("Result: %+v\n", result)
}
}
Pattern 2: Fan-Out / Fan-In Pipeline
// Stage 1: Generate values
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case <-ctx.Done(): return
case out <- n:
}
}
}()
return out
}
// Stage 2: Transform (run multiple instances for fan-out)
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done(): return
case out <- n * n:
}
}
}()
return out
}
// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case <-ctx.Done(): return
case out <- n:
}
}
}(ch)
}
go func() { wg.Wait(); close(out) }()
return out
}
// Usage: fan out to 3 squarers, fan in results
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
c1 := square(ctx, in)
c2 := square(ctx, in)
c3 := square(ctx, in)
for result := range merge(ctx, c1, c2, c3) {
fmt.Println(result)
}
}
Pattern 3: errgroup with Cancellation
import "golang.org/x/sync/errgroup"
func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return fmt.Errorf("creating request for %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
defer resp.Body.Close()
results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err // First error cancels all others via ctx
}
return results, nil
}
// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // Max concurrent goroutines
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
result, err := fetchURL(ctx, url)
if err != nil { return err }
results[i] = result
return nil
})
}
return results, g.Wait()
}
Pattern 4: Bounded Concurrency (Semaphore)
import "golang.org/x/sync/semaphore"
type RateLimitedWorker struct {
sem *semaphore.Weighted
}
func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
return &RateLimitedWorker{sem: semaphore.NewWeighted(maxConcurrent)}
}
func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
var (
wg sync.WaitGroup
mu sync.Mutex
errors []error
)
for _, task := range tasks {
if err := w.sem.Acquire(ctx, 1); err != nil {
return []error{err}
}
wg.Add(1)
go func(t func() error) {
defer wg.Done()
defer w.sem.Release(1)
if err := t(); err != nil {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(task)
}
wg.Wait()
return errors
}
// Simpler alternative: channel-based semaphore
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore { return make(chan struct{}, n) }
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }
Pattern 5: Graceful Shutdown
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
server := NewServer()
server.Start(ctx)
sig := <-sigCh
fmt.Printf("Received signal: %v\n", sig)
cancel() // Cancel context to stop all workers
server.Shutdown(5 * time.Second)
}
type Server struct {
wg sync.WaitGroup
}
func (s *Server) Start(ctx context.Context) {
for i := 0; i < 5; i++ {
s.wg.Add(1)
go s.worker(ctx, i)
}
}
func (s *Server) worker(ctx context.Context, id int) {
defer s.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d cleaning up...\n", id)
return
case <-ticker.C:
fmt.Printf("Worker %d working...\n", id)
}
}
}
func (s *Server) Shutdown(timeout time.Duration) {
done := make(chan struct{})
go func() { s.wg.Wait(); close(done) }()
select {
case <-done:
fmt.Println("Clean shutdown completed")
case <-time.After(timeout):
fmt.Println("Shutdown timed out, forcing exit")
}
}
Pattern 6: Concurrent Map
// sync.Map: optimized for read-heavy workloads with stable keys
type Cache struct {
m sync.Map
}
func (c *Cache) Get(key string) (any, bool) { return c.m.Load(key) }
func (c *Cache) Set(key string, value any) { c.m.Store(key, value) }
func (c *Cache) GetOrSet(key string, val any) (any, bool) {
return c.m.LoadOrStore(key, val)
}
// ShardedMap: better for write-heavy workloads
type ShardedMap struct {
shards []*shard
numShards int
}
type shard struct {
sync.RWMutex
data map[string]any
}
func NewShardedMap(n int) *ShardedMap {
m := &ShardedMap{shards: make([]*shard, n), numShards: n}
for i := range m.shards {
m.shards[i] = &shard{data: make(map[string]any)}
}
return m
}
func (m *ShardedMap) getShard(key string) *shard {
h := 0
for _, c := range key {
h = 31*h + int(c)
}
return m.shards[h%m.numShards]
}
func (m *ShardedMap) Get(key string) (any, bool) {
s := m.getShard(key)
s.RLock()
defer s.RUnlock()
v, ok := s.data[key]
return v, ok
}
func (m *ShardedMap) Set(key string, value any) {
s := m.getShard(key)
s.Lock()
defer s.Unlock()
s.data[key] = value
}
When to use which:
sync.Map— Few keys, many reads, keys added once and rarely deletedShardedMap— Many keys, frequent writes, need predictable performance
Select Patterns
// Timeout
select {
case v := <-ch:
fmt.Println("Received:", v)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
// Non-blocking send/receive
select {
case ch <- 42:
fmt.Println("Sent")
default:
fmt.Println("Channel full, skipping")
}
// Priority select: check high-priority first
for {
select {
case msg := <-highPriority:
handle(msg)
default:
select {
case msg := <-highPriority:
handle(msg)
case msg := <-lowPriority:
handle(msg)
}
}
}
Race Detection
go test -race ./... # Tests with race detector
go build -race . # Build with race detector
go run -race main.go # Run with race detector
Best Practices
Do:
- Use
context.Contextfor cancellation and deadlines on every goroutine - Close channels from the sender side only
- Use
errgroupfor concurrent operations that return errors - Buffer channels when count is known upfront
- Prefer channels over mutexes for coordination
- Always run tests with
-race
Don't:
- Leak goroutines — every goroutine must have an exit path
- Close a channel from the receiver — causes panic
- Use
time.Sleepfor synchronization — use proper primitives - Ignore
ctx.Done()in long-running goroutines - Share memory without synchronization — use channels or mutexes
NEVER Do
- NEVER close a channel from the receiver — Only the sender should close; receivers panic on closed channels
- NEVER send on a closed channel — Causes panic; design so sender controls close
- NEVER use unbounded goroutine spawning — Use worker pools or semaphores for bounded concurrency
- NEVER ignore the
-raceflag in testing — Data races are silent bugs that corrupt state - NEVER pass pointers to loop variables into goroutines — Capture the value or use index closure pattern
- NEVER use
time.Sleepas synchronization — Use channels, WaitGroups, or context
Overview
Go Concurrency Patterns cover goroutines, channels, sync primitives, and context management to build robust concurrent applications. It emphasizes worker pools and pipelines to structure work and backpressure. It also provides approaches for graceful shutdown and debugging race conditions in production systems.
How This Skill Works
Go uses lightweight goroutines for parallel work and channels for safe communication. Contexts provide timeouts and cancellation signals that propagate through call chains, while sync.WaitGroup and errgroup coordinate task completion and error handling. Patterns like worker pools and fan-out/fan-in pipelines organize workflow to enable scalable processing.
When to Use It
- Building concurrent Go applications
- Implementing worker pools and pipelines
- Managing goroutine lifecycles and cancellation
- Debugging race conditions
- Implementing graceful shutdown
Quick Start
- Step 1: Create a cancellable context (context.WithCancel or context.WithTimeout) to control lifecycles
- Step 2: Create channels for jobs and results and start a WaitGroup to manage workers
- Step 3: Launch worker goroutines, feed jobs, and range over results while handling cancellation and shutdown
Best Practices
- Propagate context through APIs and cancel operations promptly when the context is canceled
- Limit concurrency with worker pools to avoid resource saturation
- Communicate via channels and avoid sharing mutable state without synchronization
- Coordinate completion and errors with sync.WaitGroup or errgroup
- Implement graceful shutdown by canceling the context, signaling workers, draining channels, and exiting cleanly
Example Use Cases
- Web crawler with a worker pool that fetches pages concurrently
- Image processing pipeline using fan-out and fan-in to scale transformations
- Log aggregation service that processes lines with multiple workers
- ETL pipeline performing staged data transformations via channels
- HTTP API server handling requests with per-request workers and cancellation