
go-concurrency

npx machina-cli add skill wpank/ai/go-concurrency --openclaw

Go Concurrency Patterns

Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install go-concurrency

When to Use

  • Building concurrent Go applications
  • Implementing worker pools and pipelines
  • Managing goroutine lifecycles and cancellation
  • Debugging race conditions
  • Implementing graceful shutdown

Concurrency Primitives

| Primitive | Purpose | When to Use |
| --- | --- | --- |
| goroutine | Lightweight concurrent execution | Any concurrent work |
| channel | Communication between goroutines | Passing data, signaling |
| select | Multiplex channel operations | Waiting on multiple channels |
| sync.Mutex | Mutual exclusion | Protecting shared state |
| sync.WaitGroup | Wait for goroutines to complete | Coordinating goroutine completion |
| context.Context | Cancellation and deadlines | Request-scoped lifecycle management |
| errgroup.Group | Concurrent tasks with errors | Parallel work that can fail |

Go Concurrency Mantra: Don't communicate by sharing memory; share memory by communicating.
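
To make the mantra concrete, here is a minimal sketch (an illustration, not part of the skill's own examples) contrasting the two styles: the mutex version synchronizes access to shared state, while the channel version gives a single goroutine ownership of the state and lets everyone else communicate with it.

// Mutex style: share memory, synchronize every access.
type Counter struct {
    mu sync.Mutex
    n  int
}

func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.n++
}

// Channel style: one goroutine owns n; others communicate via channels.
func counter(done <-chan struct{}, inc <-chan struct{}, read chan<- int) {
    n := 0
    for {
        select {
        case <-done:
            return
        case <-inc:
            n++
        case read <- n:
        }
    }
}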

Quick Start

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    results := make(chan string, 10)
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            select {
            case <-ctx.Done():
                return
            case results <- fmt.Sprintf("Worker %d done", id):
            }
        }(i)
    }

    go func() { wg.Wait(); close(results) }()

    for result := range results {
        fmt.Println(result)
    }
}

Pattern 1: Worker Pool

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
    results := make(chan Result)
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                // Select on the send itself so a blocked send still honors
                // cancellation (a default branch would check ctx only once,
                // then block on the send unconditionally).
                select {
                case <-ctx.Done():
                    return
                case results <- Result{
                    JobID:  job.ID,
                    Output: fmt.Sprintf("Processed: %s", job.Data),
                }:
                }
            }
        }()
    }

    go func() { wg.Wait(); close(results) }()
    return results
}

// Usage
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan Job, 100)
    go func() {
        for i := 0; i < 50; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
        }
        close(jobs)
    }()

    for result := range WorkerPool(ctx, 5, jobs) {
        fmt.Printf("Result: %+v\n", result)
    }
}

Pattern 2: Fan-Out / Fan-In Pipeline

// Stage 1: Generate values
func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case <-ctx.Done(): return
            case out <- n:
            }
        }
    }()
    return out
}

// Stage 2: Transform (run multiple instances for fan-out)
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case <-ctx.Done(): return
            case out <- n * n:
            }
        }
    }()
    return out
}

// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(channels))
    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                select {
                case <-ctx.Done(): return
                case out <- n:
                }
            }
        }(ch)
    }

    go func() { wg.Wait(); close(out) }()
    return out
}

// Usage: fan out to 3 squarers, fan in results
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    c1 := square(ctx, in)
    c2 := square(ctx, in)
    c3 := square(ctx, in)

    for result := range merge(ctx, c1, c2, c3) {
        fmt.Println(result)
    }
}
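
Note that merge interleaves values from the three squarers nondeterministically; if output order matters, carry an index through the pipeline and reorder at the end.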

Pattern 3: errgroup with Cancellation

import "golang.org/x/sync/errgroup"

func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("creating request for %s: %w", url, err)
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            defer resp.Body.Close()
            results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err // First error cancels all others via ctx
    }
    return results, nil
}

// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(10) // Max concurrent goroutines
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            result, err := fetchURL(ctx, url) // fetchURL: request helper (not shown)
            if err != nil {
                return err
            }
            results[i] = result
            return nil
        })
    }

    return results, g.Wait()
}
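
A minimal usage sketch for fetchAllURLs above; the URLs are placeholders:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    urls := []string{"https://example.com", "https://example.org"}
    results, err := fetchAllURLs(ctx, urls)
    if err != nil {
        log.Fatal(err) // the first failure cancels the remaining fetches
    }
    for _, r := range results {
        fmt.Println(r)
    }
}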

Pattern 4: Bounded Concurrency (Semaphore)

import "golang.org/x/sync/semaphore"

type RateLimitedWorker struct {
    sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
    return &RateLimitedWorker{sem: semaphore.NewWeighted(maxConcurrent)}
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
    var (
        wg     sync.WaitGroup
        mu     sync.Mutex
        errors []error
    )

    for _, task := range tasks {
        // Acquire blocks until a slot frees up or ctx is cancelled.
        if err := w.sem.Acquire(ctx, 1); err != nil {
            wg.Wait() // let already-launched tasks finish before returning
            return []error{err}
        }
        wg.Add(1)
        go func(t func() error) {
            defer wg.Done()
            defer w.sem.Release(1)
            if err := t(); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
        }(task)
    }

    wg.Wait()
    return errors
}

// Simpler alternative: channel-based semaphore
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore       { return make(chan struct{}, n) }
func (s Semaphore) Acquire()             { s <- struct{}{} }
func (s Semaphore) Release()             { <-s }
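
A sketch of the channel-based semaphore in use, assuming tasks is a slice of closures:

sem := NewSemaphore(5)
var wg sync.WaitGroup
for _, task := range tasks {
    sem.Acquire() // blocks while 5 tasks are already in flight
    wg.Add(1)
    go func(t func()) {
        defer wg.Done()
        defer sem.Release()
        t()
    }(task)
}
wg.Wait()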

Pattern 5: Graceful Shutdown

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    server := NewServer()
    server.Start(ctx)

    sig := <-sigCh
    fmt.Printf("Received signal: %v\n", sig)
    cancel() // Cancel context to stop all workers

    server.Shutdown(5 * time.Second)
}

type Server struct {
    wg sync.WaitGroup
}

func (s *Server) Start(ctx context.Context) {
    for i := 0; i < 5; i++ {
        s.wg.Add(1)
        go s.worker(ctx, i)
    }
}

func (s *Server) worker(ctx context.Context, id int) {
    defer s.wg.Done()
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d cleaning up...\n", id)
            return
        case <-ticker.C:
            fmt.Printf("Worker %d working...\n", id)
        }
    }
}

func (s *Server) Shutdown(timeout time.Duration) {
    done := make(chan struct{})
    go func() { s.wg.Wait(); close(done) }()

    select {
    case <-done:
        fmt.Println("Clean shutdown completed")
    case <-time.After(timeout):
        fmt.Println("Shutdown timed out, forcing exit")
    }
}
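
The same timeout-bounded shutdown works for HTTP servers through the standard library's http.Server.Shutdown; a minimal sketch, reusing the signal channel from the main above:

srv := &http.Server{Addr: ":8080"}
go func() {
    if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        log.Fatal(err)
    }
}()

<-sigCh // SIGINT/SIGTERM, as above

shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
    log.Printf("forced shutdown: %v", err) // in-flight requests were cut off
}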

Pattern 6: Concurrent Map

// sync.Map: optimized for read-heavy workloads with stable keys
type Cache struct {
    m sync.Map
}

func (c *Cache) Get(key string) (any, bool) { return c.m.Load(key) }
func (c *Cache) Set(key string, value any) { c.m.Store(key, value) }
func (c *Cache) GetOrSet(key string, val any) (any, bool) {
    return c.m.LoadOrStore(key, val)
}

// ShardedMap: better for write-heavy workloads
type ShardedMap struct {
    shards    []*shard
    numShards int
}

type shard struct {
    sync.RWMutex
    data map[string]any
}

func NewShardedMap(n int) *ShardedMap {
    m := &ShardedMap{shards: make([]*shard, n), numShards: n}
    for i := range m.shards {
        m.shards[i] = &shard{data: make(map[string]any)}
    }
    return m
}

func (m *ShardedMap) getShard(key string) *shard {
    // Use an unsigned hash: with a signed int the multiply can overflow
    // negative, making the modulo index negative and panicking.
    var h uint32
    for _, c := range key {
        h = 31*h + uint32(c)
    }
    return m.shards[h%uint32(m.numShards)]
}

func (m *ShardedMap) Get(key string) (any, bool) {
    s := m.getShard(key)
    s.RLock()
    defer s.RUnlock()
    v, ok := s.data[key]
    return v, ok
}

func (m *ShardedMap) Set(key string, value any) {
    s := m.getShard(key)
    s.Lock()
    defer s.Unlock()
    s.data[key] = value
}

When to use which:

  • sync.Map — Few keys, many reads, keys added once and rarely deleted
  • ShardedMap — Many keys, frequent writes, need predictable performance
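
A quick usage sketch for the Cache wrapper above, showing the LoadOrStore semantics:

c := &Cache{}
c.Set("greeting", "hello")

if v, ok := c.Get("greeting"); ok {
    fmt.Println(v) // hello
}

// GetOrSet returns the existing value with loaded=true;
// the new value is stored only when the key was absent.
v, loaded := c.GetOrSet("greeting", "hi")
fmt.Println(v, loaded) // hello true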

Select Patterns

// Timeout
select {
case v := <-ch:
    fmt.Println("Received:", v)
case <-time.After(time.Second):
    fmt.Println("Timeout!")
}

// Non-blocking send/receive
select {
case ch <- 42:
    fmt.Println("Sent")
default:
    fmt.Println("Channel full, skipping")
}

// Priority select: check high-priority first
for {
    select {
    case msg := <-highPriority:
        handle(msg)
    default:
        select {
        case msg := <-highPriority:
            handle(msg)
        case msg := <-lowPriority:
            handle(msg)
        }
    }
}
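
One caveat on the timeout form: before the timer changes in Go 1.23, each time.After call in a loop allocates a timer that is not reclaimed until it fires. In a hot loop, a stoppable timer avoids that buildup:

for {
    timer := time.NewTimer(time.Second)
    select {
    case v := <-ch:
        timer.Stop() // release the timer instead of waiting for it to fire
        handle(v)
    case <-timer.C:
        fmt.Println("no message within a second")
    }
}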

Race Detection

go test -race ./...     # Tests with race detector
go build -race .        # Build with race detector
go run -race main.go    # Run with race detector
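
As an illustration of what the detector catches, a minimal racy counter (not from the skill itself):

var n int64
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for j := 0; j < 1000; j++ {
            n++ // DATA RACE: go run -race flags this unsynchronized write
        }
    }()
}
wg.Wait()
fmt.Println(n) // nondeterministic; fix with atomic.AddInt64(&n, 1) or a mutex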

Best Practices

Do:

  • Use context.Context for cancellation and deadlines on every goroutine
  • Close channels from the sender side only
  • Use errgroup for concurrent operations that return errors
  • Buffer channels when count is known upfront
  • Prefer channels over mutexes for coordination
  • Always run tests with -race

Don't:

  • Leak goroutines — every goroutine must have an exit path (see the leak sketch after this list)
  • Close a channel from the receiver — the sender will panic on its next send
  • Use time.Sleep for synchronization — use proper primitives
  • Ignore ctx.Done() in long-running goroutines
  • Share memory without synchronization — use channels or mutexes
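
A minimal sketch of the classic leak and one fix; expensive() stands in for any slow call:

// Leaky: if the caller abandons the channel, the goroutine blocks on the
// send forever and is never collected.
func leaky() <-chan int {
    ch := make(chan int)
    go func() { ch <- expensive() }() // stuck if nobody receives
    return ch
}

// Fixed: the send can be abandoned through ctx (a 1-buffered channel,
// where the send always completes, also works).
func fixed(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        select {
        case ch <- expensive():
        case <-ctx.Done():
        }
    }()
    return ch
}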

NEVER Do

  • NEVER close a channel from the receiver — Only the sender knows when sends are finished; closing early makes the sender panic on its next send
  • NEVER send on a closed channel — Causes panic; design so sender controls close
  • NEVER use unbounded goroutine spawning — Use worker pools or semaphores for bounded concurrency
  • NEVER ignore the -race flag in testing — Data races are silent bugs that corrupt state
  • NEVER pass pointers to loop variables into goroutines — Capture the value or pass it as an argument (see the sketch after this list)
  • NEVER use time.Sleep as synchronization — Use channels, WaitGroups, or context
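
A minimal sketch of the capture bug under pre-Go 1.22 semantics, with two fixes:

// Bug (pre-Go 1.22): all goroutines share one i and typically print 3, 3, 3.
for i := 0; i < 3; i++ {
    go func() { fmt.Println(i) }()
}

// Fix 1: pass the value as an argument.
for i := 0; i < 3; i++ {
    go func(id int) { fmt.Println(id) }(i)
}

// Fix 2: shadow the variable inside the loop.
for i := 0; i < 3; i++ {
    i := i
    go func() { fmt.Println(i) }()
}

From Go 1.22 on, each iteration gets a fresh loop variable and the first form is safe, but the explicit forms stay unambiguous in mixed-version codebases.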

Source

https://github.com/wpank/ai/blob/main/skills/backend/go-concurrency/SKILL.md

Overview

Production-ready Go concurrency patterns, from goroutines and channels to sync primitives and context. Learn to build worker pools, pipelines, and graceful shutdown sequences that scale and remain debuggable. This skill helps you manage lifecycles, coordinate tasks, and prevent race conditions in real-world apps.

How This Skill Works

Goroutines execute work concurrently, and channels enable safe data exchange. Worker pools, fan-out/fan-in pipelines, and synchronization primitives orchestrate task flow, while context.Context handles cancellation and deadlines; sync.WaitGroup and errgroup.Group coordinate lifecycles and error propagation.


Quick Start

  1. Create a cancellable context, e.g. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second); defer cancel()
  2. Initialize a job channel and launch a small pool of workers that read from the channel and respect ctx.Done()
  3. Collect results from a results channel or returned values, then wait for all workers to finish and close channels

Best Practices

  • Follow the mantra: don't communicate by sharing memory; share memory by communicating through channels.
  • Use context.Context for cancellation and timeouts to avoid goroutine leaks.
  • Prefer worker pools or fan-out/fan-in pipelines to cap concurrency and improve throughput.
  • Coordinate completion and errors with sync.WaitGroup or errgroup.Group.
  • Close channels when done and design send/receive to avoid deadlocks.

Example Use Cases

  • A worker pool that processes a backlog of jobs with a bounded number of workers and a cancellable context.
  • A fan-out/fan-in pipeline that streams data through multiple stages (generate → transform → aggregate).
  • Graceful shutdown of an HTTP server using context cancellation and a shutdown waiter.
  • Parallel tasks with error propagation using errgroup.Group to fail fast on first error.
  • Using the -race detector to identify data races while applying mutexes and channel coordination.
