Get the FREE Ultimate OpenClaw Setup Guide →
w

Websocket Hub Patterns

Verified

@wpank

npx machina-cli add skill @wpank/websocket-hub-patterns --openclaw
Files (1)
SKILL.md
6.8 KB

WebSocket Hub Patterns

Production patterns for horizontally-scalable WebSocket connections with Redis-backed coordination.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install websocket-hub-patterns

When to Use

  • Real-time bidirectional communication
  • Chat applications, collaborative editing
  • Live dashboards with client interactions
  • Need horizontal scaling across multiple gateway instances

Hub Structure

type Hub struct {
    // Local state
    connections   map[*Connection]bool
    subscriptions map[string]map[*Connection]bool // channel -> connections

    // Channels
    register   chan *Connection
    unregister chan *Connection
    broadcast  chan *Event

    // Redis for scaling
    redisClient  *redis.Client
    redisSubs    map[string]*goredis.PubSub
    redisSubLock sync.Mutex

    // Optional: Distributed registry
    connRegistry *ConnectionRegistry
    instanceID   string

    // Shutdown
    done chan struct{}
    wg   sync.WaitGroup
}

Hub Main Loop

func (h *Hub) Run() {
    for {
        select {
        case <-h.done:
            return

        case conn := <-h.register:
            h.connections[conn] = true
            if h.connRegistry != nil {
                h.connRegistry.RegisterConnection(ctx, conn.ID(), info)
            }

        case conn := <-h.unregister:
            if _, ok := h.connections[conn]; ok {
                if h.connRegistry != nil {
                    h.connRegistry.UnregisterConnection(ctx, conn.ID())
                }
                h.removeConnection(conn)
            }

        case event := <-h.broadcast:
            h.broadcastToChannel(event)
        }
    }
}

Lazy Redis Subscriptions

Subscribe to Redis only when first local subscriber joins:

func (h *Hub) subscribeToChannel(conn *Connection, channel string) error {
    // Add to local subscriptions
    if h.subscriptions[channel] == nil {
        h.subscriptions[channel] = make(map[*Connection]bool)
    }
    h.subscriptions[channel][conn] = true

    // Lazy: Only subscribe to Redis on first subscriber
    h.redisSubLock.Lock()
    defer h.redisSubLock.Unlock()

    if _, exists := h.redisSubs[channel]; !exists {
        pubsub := h.redisClient.Subscribe(context.Background(), channel)
        h.redisSubs[channel] = pubsub
        go h.forwardRedisMessages(channel, pubsub)
    }

    return nil
}

func (h *Hub) unsubscribeFromChannel(conn *Connection, channel string) {
    if subs, ok := h.subscriptions[channel]; ok {
        delete(subs, conn)

        // Cleanup when no local subscribers
        if len(subs) == 0 {
            delete(h.subscriptions, channel)
            h.closeRedisSubscription(channel)
        }
    }
}

Redis Message Forwarding

func (h *Hub) forwardRedisMessages(channel string, pubsub *goredis.PubSub) {
    ch := pubsub.Channel()
    for {
        select {
        case <-h.done:
            return
        case msg, ok := <-ch:
            if !ok {
                return
            }
            h.broadcast <- &Event{
                Channel: channel,
                Data:    []byte(msg.Payload),
            }
        }
    }
}

func (h *Hub) broadcastToChannel(event *Event) {
    subs := h.subscriptions[event.Channel]
    for conn := range subs {
        select {
        case conn.send <- event.Data:
            // Sent
        default:
            // Buffer full - close slow client
            h.removeConnection(conn)
        }
    }
}

Connection Write Pump

func (c *Connection) writePump() {
    ticker := time.NewTicker(54 * time.Second) // Ping interval
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            c.conn.WriteMessage(websocket.TextMessage, message)

            // Batch drain queue
            for i := 0; i < len(c.send); i++ {
                c.conn.WriteMessage(websocket.TextMessage, <-c.send)
            }

        case <-ticker.C:
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

Connection Registry for Horizontal Scaling

type ConnectionRegistry struct {
    client     *redis.Client
    instanceID string
}

func (r *ConnectionRegistry) RegisterConnection(ctx context.Context, connID string, info ConnectionInfo) error {
    info.InstanceID = r.instanceID
    data, _ := json.Marshal(info)
    return r.client.Set(ctx, "ws:conn:"+connID, data, 2*time.Minute).Err()
}

func (r *ConnectionRegistry) HeartbeatInstance(ctx context.Context, connectionCount int) error {
    info := InstanceInfo{
        InstanceID:  r.instanceID,
        Connections: connectionCount,
    }
    data, _ := json.Marshal(info)
    return r.client.Set(ctx, "ws:instance:"+r.instanceID, data, 30*time.Second).Err()
}

Graceful Shutdown

func (h *Hub) Shutdown() {
    close(h.done)

    // Close all Redis subscriptions
    h.redisSubLock.Lock()
    for channel, pubsub := range h.redisSubs {
        pubsub.Close()
        delete(h.redisSubs, channel)
    }
    h.redisSubLock.Unlock()

    // Close all connections
    for conn := range h.connections {
        conn.Close()
    }

    h.wg.Wait()
}

Decision Tree

SituationApproach
Single instanceSkip ConnectionRegistry
Multi-instanceEnable ConnectionRegistry
No subscribers to channelLazy unsubscribe from Redis
Slow clientClose on buffer overflow
Need message historyUse Redis Streams + Pub/Sub

Related Skills


NEVER Do

  • NEVER block on conn.send — Use select with default to detect overflow
  • NEVER skip graceful shutdown — Clients need close frames
  • NEVER share pubsub across channels — Each channel needs own subscription
  • NEVER forget instance heartbeat — Dead instances leave orphaned connections
  • NEVER send without ping/pong — Load balancers close "idle" connections

Source

git clone https://clawhub.ai/wpank/websocket-hub-patternsView on GitHub

Overview

WebSocket Hub Patterns describe patterns for building real-time WebSocket servers that scale across multiple instances using a Redis-backed pub/sub bridge. The hub maintains a local connection registry and lazy Redis subscriptions to minimize cross-node traffic, plus a graceful shutdown path. This pattern is ideal for real-time apps that require reliability and horizontal scalability.

How This Skill Works

The Hub holds local connections and per-channel subscriptions, and uses Redis Pub/Sub to propagate messages across instances. Subscriptions to Redis are created lazily on the first subscriber, protected by a mutex to avoid duplicate subscriptions, with a dedicated goroutine forwarding Redis messages into the hub's broadcast channel. The Run loop handles registration, unregistration, and broadcasts, optionally updating a distributed Connection Registry and cleaning up on shutdown.

When to Use It

  • Real-time bidirectional communication between clients
  • Chat applications and collaborative editing across nodes
  • Live dashboards with client interactions across multiple gateways
  • Horizontally scaling across multiple gateway instances
  • Presence and notifications for large numbers of clients

Quick Start

  1. Step 1: Install the pattern with npx clawhub@latest install websocket-hub-patterns
  2. Step 2: Initialize a Hub with a Redis client and optional ConnectionRegistry, then start Run()
  3. Step 3: Connect WebSocket clients and subscribe them to channels to receive broadcasts

Best Practices

  • Prefer lazy Redis subscriptions to minimize Redis PUB/SUB load until needed
  • Keep a per-channel subscriptions map and guard Redis subscribe with a mutex
  • Register and unregister connections in the optional Connection Registry
  • Use non-blocking sends and drop slow clients to prevent backpressure from stalling the hub
  • Gracefully shutdown by closing done, draining goroutines, and awaiting cleanup

Example Use Cases

  • A multi-node chat application where rooms are synchronized across servers
  • A collaborative editor that propagates document edits across gateways
  • A real-time analytics dashboard receiving updates from distributed clients
  • A multiplayer game lobby system across multiple game servers
  • A telemetry gateway streaming device data to all connected dashboards

Frequently Asked Questions

Add this skill to your agents
Sponsor this space

Reach thousands of developers