Websocket Hub Patterns
Verified@wpank
npx machina-cli add skill @wpank/websocket-hub-patterns --openclawWebSocket Hub Patterns
Production patterns for horizontally-scalable WebSocket connections with Redis-backed coordination.
Installation
OpenClaw / Moltbot / Clawbot
npx clawhub@latest install websocket-hub-patterns
When to Use
- Real-time bidirectional communication
- Chat applications, collaborative editing
- Live dashboards with client interactions
- Need horizontal scaling across multiple gateway instances
Hub Structure
type Hub struct {
// Local state
connections map[*Connection]bool
subscriptions map[string]map[*Connection]bool // channel -> connections
// Channels
register chan *Connection
unregister chan *Connection
broadcast chan *Event
// Redis for scaling
redisClient *redis.Client
redisSubs map[string]*goredis.PubSub
redisSubLock sync.Mutex
// Optional: Distributed registry
connRegistry *ConnectionRegistry
instanceID string
// Shutdown
done chan struct{}
wg sync.WaitGroup
}
Hub Main Loop
func (h *Hub) Run() {
for {
select {
case <-h.done:
return
case conn := <-h.register:
h.connections[conn] = true
if h.connRegistry != nil {
h.connRegistry.RegisterConnection(ctx, conn.ID(), info)
}
case conn := <-h.unregister:
if _, ok := h.connections[conn]; ok {
if h.connRegistry != nil {
h.connRegistry.UnregisterConnection(ctx, conn.ID())
}
h.removeConnection(conn)
}
case event := <-h.broadcast:
h.broadcastToChannel(event)
}
}
}
Lazy Redis Subscriptions
Subscribe to Redis only when first local subscriber joins:
func (h *Hub) subscribeToChannel(conn *Connection, channel string) error {
// Add to local subscriptions
if h.subscriptions[channel] == nil {
h.subscriptions[channel] = make(map[*Connection]bool)
}
h.subscriptions[channel][conn] = true
// Lazy: Only subscribe to Redis on first subscriber
h.redisSubLock.Lock()
defer h.redisSubLock.Unlock()
if _, exists := h.redisSubs[channel]; !exists {
pubsub := h.redisClient.Subscribe(context.Background(), channel)
h.redisSubs[channel] = pubsub
go h.forwardRedisMessages(channel, pubsub)
}
return nil
}
func (h *Hub) unsubscribeFromChannel(conn *Connection, channel string) {
if subs, ok := h.subscriptions[channel]; ok {
delete(subs, conn)
// Cleanup when no local subscribers
if len(subs) == 0 {
delete(h.subscriptions, channel)
h.closeRedisSubscription(channel)
}
}
}
Redis Message Forwarding
func (h *Hub) forwardRedisMessages(channel string, pubsub *goredis.PubSub) {
ch := pubsub.Channel()
for {
select {
case <-h.done:
return
case msg, ok := <-ch:
if !ok {
return
}
h.broadcast <- &Event{
Channel: channel,
Data: []byte(msg.Payload),
}
}
}
}
func (h *Hub) broadcastToChannel(event *Event) {
subs := h.subscriptions[event.Channel]
for conn := range subs {
select {
case conn.send <- event.Data:
// Sent
default:
// Buffer full - close slow client
h.removeConnection(conn)
}
}
}
Connection Write Pump
func (c *Connection) writePump() {
ticker := time.NewTicker(54 * time.Second) // Ping interval
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.conn.WriteMessage(websocket.TextMessage, message)
// Batch drain queue
for i := 0; i < len(c.send); i++ {
c.conn.WriteMessage(websocket.TextMessage, <-c.send)
}
case <-ticker.C:
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
Connection Registry for Horizontal Scaling
type ConnectionRegistry struct {
client *redis.Client
instanceID string
}
func (r *ConnectionRegistry) RegisterConnection(ctx context.Context, connID string, info ConnectionInfo) error {
info.InstanceID = r.instanceID
data, _ := json.Marshal(info)
return r.client.Set(ctx, "ws:conn:"+connID, data, 2*time.Minute).Err()
}
func (r *ConnectionRegistry) HeartbeatInstance(ctx context.Context, connectionCount int) error {
info := InstanceInfo{
InstanceID: r.instanceID,
Connections: connectionCount,
}
data, _ := json.Marshal(info)
return r.client.Set(ctx, "ws:instance:"+r.instanceID, data, 30*time.Second).Err()
}
Graceful Shutdown
func (h *Hub) Shutdown() {
close(h.done)
// Close all Redis subscriptions
h.redisSubLock.Lock()
for channel, pubsub := range h.redisSubs {
pubsub.Close()
delete(h.redisSubs, channel)
}
h.redisSubLock.Unlock()
// Close all connections
for conn := range h.connections {
conn.Close()
}
h.wg.Wait()
}
Decision Tree
| Situation | Approach |
|---|---|
| Single instance | Skip ConnectionRegistry |
| Multi-instance | Enable ConnectionRegistry |
| No subscribers to channel | Lazy unsubscribe from Redis |
| Slow client | Close on buffer overflow |
| Need message history | Use Redis Streams + Pub/Sub |
Related Skills
- Meta-skill: ai/skills/meta/realtime-dashboard/ — Complete realtime dashboard guide
- dual-stream-architecture — Event publishing
- resilient-connections — Connection resilience
NEVER Do
- NEVER block on conn.send — Use select with default to detect overflow
- NEVER skip graceful shutdown — Clients need close frames
- NEVER share pubsub across channels — Each channel needs own subscription
- NEVER forget instance heartbeat — Dead instances leave orphaned connections
- NEVER send without ping/pong — Load balancers close "idle" connections
Overview
WebSocket Hub Patterns describe patterns for building real-time WebSocket servers that scale across multiple instances using a Redis-backed pub/sub bridge. The hub maintains a local connection registry and lazy Redis subscriptions to minimize cross-node traffic, plus a graceful shutdown path. This pattern is ideal for real-time apps that require reliability and horizontal scalability.
How This Skill Works
The Hub holds local connections and per-channel subscriptions, and uses Redis Pub/Sub to propagate messages across instances. Subscriptions to Redis are created lazily on the first subscriber, protected by a mutex to avoid duplicate subscriptions, with a dedicated goroutine forwarding Redis messages into the hub's broadcast channel. The Run loop handles registration, unregistration, and broadcasts, optionally updating a distributed Connection Registry and cleaning up on shutdown.
When to Use It
- Real-time bidirectional communication between clients
- Chat applications and collaborative editing across nodes
- Live dashboards with client interactions across multiple gateways
- Horizontally scaling across multiple gateway instances
- Presence and notifications for large numbers of clients
Quick Start
- Step 1: Install the pattern with npx clawhub@latest install websocket-hub-patterns
- Step 2: Initialize a Hub with a Redis client and optional ConnectionRegistry, then start Run()
- Step 3: Connect WebSocket clients and subscribe them to channels to receive broadcasts
Best Practices
- Prefer lazy Redis subscriptions to minimize Redis PUB/SUB load until needed
- Keep a per-channel subscriptions map and guard Redis subscribe with a mutex
- Register and unregister connections in the optional Connection Registry
- Use non-blocking sends and drop slow clients to prevent backpressure from stalling the hub
- Gracefully shutdown by closing done, draining goroutines, and awaiting cleanup
Example Use Cases
- A multi-node chat application where rooms are synchronized across servers
- A collaborative editor that propagates document edits across gateways
- A real-time analytics dashboard receiving updates from distributed clients
- A multiplayer game lobby system across multiple game servers
- A telemetry gateway streaming device data to all connected dashboards