event-store

npx machina-cli add skill wpank/ai/event-store --openclaw

Event Store

Guide to designing event stores for event-sourced applications — covering event schemas, projections, snapshotting, and CQRS integration.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install event-store

When to Use This Skill

  • Designing event sourcing infrastructure
  • Choosing between event store technologies
  • Implementing custom event stores
  • Building projections from event streams
  • Adding snapshotting for aggregate performance
  • Integrating CQRS with event sourcing

Core Concepts

Event Store Architecture

┌─────────────────────────────────────────────────────┐
│                    Event Store                      │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Stream 1   │  │  Stream 2   │  │  Stream 3   │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘

Event Store Requirements

| Requirement   | Description                       |
| ------------- | --------------------------------- |
| Append-only   | Events are immutable; writes only append |
| Ordered       | Per-stream and global ordering    |
| Versioned     | Optimistic concurrency control    |
| Subscriptions | Real-time event notifications     |
| Idempotent    | Handle duplicate writes safely    |

Technology Comparison

| Technology   | Best For                | Limitations                      |
| ------------ | ----------------------- | -------------------------------- |
| EventStoreDB | Pure event sourcing     | Single-purpose                   |
| PostgreSQL   | Existing Postgres stack | Manual implementation            |
| Kafka        | High-throughput streams | Not ideal for per-stream queries |
| DynamoDB    | Serverless, AWS-native  | Query limitations                |

Event Schema Design

Events are the source of truth. Well-designed schemas ensure long-term evolvability.

Event Envelope Structure

{
  "event_id": "uuid",
  "stream_id": "Order-abc123",
  "event_type": "OrderPlaced",
  "version": 1,
  "schema_version": 1,
  "data": {
    "customer_id": "cust-1",
    "total_cents": 5000
  },
  "metadata": {
    "correlation_id": "req-xyz",
    "causation_id": "evt-prev",
    "user_id": "user-1",
    "timestamp": "2025-01-15T10:30:00Z"
  },
  "global_position": 42
}

Schema Evolution Rules

  1. Add fields freely — new optional fields are always safe
  2. Never remove or rename fields — introduce a new event type instead
  3. Version event types — OrderPlacedV2 when the schema changes materially
  4. Upcast on read — transform old versions to the current shape in the deserializer
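Rule 4 can be sketched as a small upcaster registry that runs on deserialization. This is a minimal illustration, not a specific library's API; the registry keys, the `currency` default, and the event shape are assumptions.

```python
# Hypothetical upcaster registry keyed by (event_type, schema_version).
# Each upcaster transforms data one version forward; upcast() chains them
# until no upcaster matches, i.e. the data is at the current version.
UPCASTERS = {
    # Assumed example: OrderPlaced V1 lacked a currency field
    ("OrderPlaced", 1): lambda data: {**data, "currency": "USD"},
}

def upcast(event_type: str, schema_version: int, data: dict) -> tuple[int, dict]:
    """Apply upcasters in sequence until data reaches the current schema version."""
    while (event_type, schema_version) in UPCASTERS:
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return schema_version, data
```

Old events stay untouched on disk; only the in-memory shape is modernized at read time.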

PostgreSQL Event Store Schema

CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);

CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

Event Store Implementation

import json
from dataclasses import dataclass, field
from uuid import UUID, uuid4

import asyncpg

class ConcurrencyError(Exception):
    """Raised when the stream's current version differs from the expected one."""

@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: int | None = None
    global_position: int | None = None

class EventStore:  # backed by PostgreSQL schema above
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append(self, stream_id: str, stream_type: str,
                     events: list[Event],
                     expected_version: int | None = None) -> list[Event]:
        """Append events with optimistic concurrency control."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events "
                        "WHERE stream_id = $1", stream_id
                    ) or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected {expected_version}, got {current}"
                        )

                start = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 "
                    "FROM events WHERE stream_id = $1", stream_id
                )
                for i, evt in enumerate(events):
                    evt.version = start + i
                    row = await conn.fetchrow(
                        "INSERT INTO events (id, stream_id, stream_type, "
                        "event_type, event_data, metadata, version) "
                        "VALUES ($1,$2,$3,$4,$5,$6,$7) "
                        "RETURNING global_position",
                        evt.event_id, stream_id, stream_type,
                        evt.event_type, json.dumps(evt.data),
                        json.dumps(evt.metadata), evt.version,
                    )
                    evt.global_position = row["global_position"]
                return events

    async def read_stream(self, stream_id: str,
                          from_version: int = 0) -> list[Event]:
        """Read events for a single stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE stream_id = $1 "
                "AND version >= $2 ORDER BY version",
                stream_id, from_version,
            )
            return [self._to_event(r) for r in rows]

    async def read_all(self, from_position: int = 0,
                       limit: int = 1000) -> list[Event]:
        """Read global event stream for projections / subscriptions."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE global_position > $1 "
                "ORDER BY global_position LIMIT $2",
                from_position, limit,
            )
            return [self._to_event(r) for r in rows]

Projections

Projections build read-optimised views by replaying events. They are the "Q" side of CQRS.

Projection Lifecycle

  1. Start from checkpoint — resume from last processed global position
  2. Apply events — update the read model for each relevant event type
  3. Save checkpoint — persist the new position atomically with the read model

Projection Example

class OrderSummaryProjection:
    def __init__(self, db, event_store: EventStore):
        self.db = db
        self.store = event_store

    async def run(self, batch_size: int = 100):
        position = await self._load_checkpoint()
        while True:
            events = await self.store.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)
                continue
            for evt in events:
                await self._apply(evt)
                position = evt.global_position
            await self._save_checkpoint(position)

    async def _apply(self, event: Event):
        match event.event_type:
            case "OrderPlaced":
                await self.db.execute(
                    "INSERT INTO order_summaries (id, customer, total, status) "
                    "VALUES ($1,$2,$3,'placed')",
                    event.data["order_id"], event.data["customer_id"],
                    event.data["total_cents"],
                )
            case "OrderShipped":
                await self.db.execute(
                    "UPDATE order_summaries SET status='shipped' "
                    "WHERE id=$1", event.data["order_id"],
                )
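The `_load_checkpoint` / `_save_checkpoint` helpers above are left abstract. A runnable stand-in is sketched below using sqlite3 so it executes locally; against the `subscription_checkpoints` table defined earlier you would issue the same upsert through asyncpg with `$1`-style placeholders. The function names mirror the helpers but are otherwise illustrative.

```python
import sqlite3

# In-memory stand-in for the subscription_checkpoints table above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE subscription_checkpoints ("
    "subscription_id TEXT PRIMARY KEY, "
    "last_position INTEGER NOT NULL DEFAULT 0)"
)

def load_checkpoint(subscription_id: str) -> int:
    """Return the last processed global position, or 0 for a new subscription."""
    row = conn.execute(
        "SELECT last_position FROM subscription_checkpoints "
        "WHERE subscription_id = ?", (subscription_id,)
    ).fetchone()
    return row[0] if row else 0

def save_checkpoint(subscription_id: str, position: int) -> None:
    """Upsert the checkpoint so restarts resume from the right position."""
    conn.execute(
        "INSERT INTO subscription_checkpoints (subscription_id, last_position) "
        "VALUES (?, ?) "
        "ON CONFLICT(subscription_id) DO UPDATE "
        "SET last_position = excluded.last_position",
        (subscription_id, position),
    )
    conn.commit()
```

When the read model lives in the same database, run the upsert inside the same transaction as the read-model writes so the checkpoint can never drift ahead of the applied events.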

Projection Design Rules

  • Idempotent handlers — replaying the same event twice must not corrupt state
  • One projection per read model — keep projections focused
  • Rebuild from scratch — projections should be deletable and fully replayable
  • Separate storage — projections can live in different databases (Postgres, Elasticsearch, Redis)
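The idempotency rule can be demonstrated with a minimal in-memory read model: upserting keyed by `order_id` means a replayed event overwrites identically instead of duplicating rows. Names and event shape here are illustrative, not from a specific store.

```python
# Toy read model: order_id -> summary row. An upsert keyed by the
# aggregate ID makes the handler safe to replay.
summaries: dict[str, dict] = {}

def apply_order_placed(event: dict) -> None:
    # Replaying the same event writes the same value to the same key,
    # leaving the read model unchanged the second time.
    summaries[event["order_id"]] = {
        "customer": event["customer_id"],
        "total": event["total_cents"],
        "status": "placed",
    }
```

A non-idempotent version (e.g. appending to a list, or incrementing a counter) would corrupt the view on every rebuild or redelivery.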

Snapshotting

Snapshots accelerate aggregate rehydration by caching state at a known version.

Use when streams exceed ~100 events, aggregates have expensive rehydration, or on a cadence (e.g., every 50 events).

Snapshot Flow

class SnapshottedRepository:
    def __init__(self, event_store: EventStore, pool):
        self.store = event_store
        self.pool = pool

    async def load(self, stream_id: str) -> Aggregate:
        # 1. Try loading snapshot
        snap = await self._load_snapshot(stream_id)
        from_version = 0
        aggregate = Aggregate(stream_id)

        if snap:
            aggregate.restore(snap["data"])
            from_version = snap["version"] + 1

        # 2. Replay events after snapshot
        events = await self.store.read_stream(stream_id, from_version)
        for evt in events:
            aggregate.apply(evt)

        # 3. Snapshot if too many events replayed
        if len(events) > 50:
            await self._save_snapshot(
                stream_id, aggregate.snapshot(), aggregate.version
            )

        return aggregate
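The repository above assumes the aggregate exposes `apply`, `snapshot`, and `restore`. A toy aggregate showing that contract, with illustrative names and a trivial counter domain:

```python
from dataclasses import dataclass

@dataclass
class Counter:
    """Minimal aggregate: state is a running total plus a version counter."""
    stream_id: str
    value: int = 0
    version: int = 0

    def apply(self, evt: dict) -> None:
        # Fold one event into state; version tracks events applied.
        if evt["type"] == "Incremented":
            self.value += evt["amount"]
        self.version += 1

    def snapshot(self) -> dict:
        # Serializable state at the current version.
        return {"value": self.value, "version": self.version}

    def restore(self, snap: dict) -> None:
        # Rebuild state from a snapshot instead of replaying from event 1.
        self.value = snap["value"]
        self.version = snap["version"]
```

Rehydration then becomes: restore the snapshot, replay only the events appended since its version.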

CQRS Integration

CQRS separates the write model (commands → events) from the read model (projections).

Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API
 (write)     (domain)      (append)        (build)        (read)

Key Principles

  1. Write side validates commands, emits events, enforces invariants
  2. Read side subscribes to events, builds optimised query models
  3. Eventual consistency — reads may lag behind writes by milliseconds to seconds
  4. Independent scaling — scale reads and writes separately

Command Handler Pattern

class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    async def handle(self, cmd: PlaceOrderCommand):
        # Load aggregate from events
        events = await self.store.read_stream(f"Order-{cmd.order_id}")
        order = Order.reconstitute(events)

        # Execute command — validates and produces new events
        new_events = order.place(cmd.customer_id, cmd.items)

        # Persist with concurrency check
        await self.store.append(
            f"Order-{cmd.order_id}", "Order", new_events,
            expected_version=order.version,
        )
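When the append fails the concurrency check, the usual recovery is to reload the aggregate and retry the whole command. A hedged sketch of such a retry wrapper (the backoff constants and `ConcurrencyError` class are assumptions matching the store above):

```python
import asyncio

class ConcurrencyError(Exception):
    """Raised when an append's expected version does not match the stream."""

async def with_retry(handler, attempts: int = 3):
    """Re-run a command handler when optimistic concurrency fails.

    The handler must reload aggregate state on each call, otherwise the
    retry would re-append against the same stale version.
    """
    for attempt in range(attempts):
        try:
            return await handler()
        except ConcurrencyError:
            if attempt == attempts - 1:
                raise  # give up after the final attempt
            await asyncio.sleep(0.01 * 2 ** attempt)  # small exponential backoff
```

Retrying is safe precisely because the handler is a pure load-decide-append cycle with no side effects before the append succeeds.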

EventStoreDB Integration

from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]
    state = (StreamState.ANY if expected_revision is None
             else StreamState.NO_STREAM if expected_revision == -1
             else expected_revision)
    return client.append_to_stream(
        stream_name, current_version=state, events=new_events)

def read_stream(stream_name: str, from_revision: int = 0):
    return [
        {'type': e.type, 'data': json.loads(e.data),
         'stream_position': e.stream_position}
        for e in client.get_stream(stream_name, stream_position=from_revision)
    ]

# Category projection: read all events for Order-* streams
# (requires EventStoreDB's $by_category system projection to be enabled)
def read_category(category: str):
    return read_stream(f"$ce-{category}")

DynamoDB Event Store

import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime, timezone
import json, uuid

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.table = boto3.resource('dynamodb').Table(table_name)

    def append(self, stream_id: str, events: list, expected_version: int = 0):
        # NOTE: batch_writer cannot attach condition expressions, so this
        # bulk append has no optimistic concurrency check; use conditional
        # put_item calls when conflicting versions must be rejected.
        with self.table.batch_writer() as batch:
            for i, event in enumerate(events):
                version = expected_version + i + 1
                batch.put_item(Item={
                    'PK': f"STREAM#{stream_id}",
                    'SK': f"VERSION#{version:020d}",
                    'GSI1PK': 'EVENTS',
                    'GSI1SK': datetime.now(timezone.utc).isoformat(),
                    'event_id': str(uuid.uuid4()),
                    'event_type': event['type'],
                    'event_data': json.dumps(event['data']),
                    'version': version,
                })

    def read_stream(self, stream_id: str, from_version: int = 0):
        resp = self.table.query(
            KeyConditionExpression=
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {'event_type': item['event_type'],
             'data': json.loads(item['event_data']),
             'version': item['version']}
            for item in resp['Items']
        ]

DynamoDB table design: PK=STREAM#{id}, SK=VERSION#{version}, GSI1 for global ordering.
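For writes that must enforce optimistic concurrency, a conditional single-item put rejects a duplicate `(PK, SK)` pair. The sketch below only builds the request parameters (so it runs without AWS); the resulting dict would be passed as `table.put_item(**params)`, which fails with a `ConditionalCheckFailedException` when that version already exists. Field names match the table design above; the helper name is illustrative.

```python
import json
import uuid

def conditional_put_params(stream_id: str, version: int, event: dict) -> dict:
    """Build put_item parameters for an optimistically concurrent append."""
    return {
        "Item": {
            "PK": f"STREAM#{stream_id}",
            "SK": f"VERSION#{version:020d}",  # zero-padded for lexicographic order
            "event_id": str(uuid.uuid4()),
            "event_type": event["type"],
            "event_data": json.dumps(event["data"]),
            "version": version,
        },
        # Rejects the write if an item with this PK+SK already exists,
        # i.e. another writer appended the same version first.
        "ConditionExpression": "attribute_not_exists(PK)",
    }
```

This trades batch throughput for safety: one conditional put per event, but lost updates become impossible.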

Best Practices

Do

  • Name streams {Type}-{id} — e.g., Order-abc123
  • Include correlation / causation IDs in metadata for tracing
  • Version event schemas from day one — plan for evolution
  • Implement idempotent writes — use event IDs for deduplication
  • Index for your query patterns — stream, global position, event type
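One way to get deduplicating event IDs, as the "idempotent writes" point suggests, is to derive them deterministically from the event's identity rather than generating random UUIDs. A sketch using the standard library's `uuid5` (the choice of namespace and of `stream_id:version:type` as the name is an assumption):

```python
import uuid

def event_id(stream_id: str, version: int, event_type: str) -> uuid.UUID:
    """Deterministic event ID: the same logical event always hashes to the
    same UUID, so a retried append collides on the primary key instead of
    creating a duplicate row."""
    return uuid.uuid5(uuid.NAMESPACE_DNS, f"{stream_id}:{version}:{event_type}")
```

With the PostgreSQL schema above, a retry of the same append then fails (or can be ignored) on the `id` primary key rather than silently double-writing.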

Don't

  • Mutate or delete events — they are immutable facts
  • Store large payloads — keep events small; reference blobs externally
  • Skip optimistic concurrency — prevents data corruption
  • Ignore backpressure — handle slow consumers gracefully
  • Couple projections to the write model — projections should be independently deployable

NEVER Do

  • NEVER update or delete events — Events are immutable historical facts; create compensating events instead
  • NEVER skip version checks on append — Optimistic concurrency prevents lost updates and corruption
  • NEVER embed large blobs in events — Store blobs externally, reference by ID in the event
  • NEVER use random UUIDs for event IDs without idempotency checks — Retries create duplicates
  • NEVER read projections for command validation — Use the event stream as the source of truth
  • NEVER couple projections to the write transaction — Projections must be rebuildable independently

Source

https://github.com/wpank/ai/blob/main/skills/backend/event-store/SKILL.md

Overview

Design and implement event stores for event-sourced systems, covering event schemas, projections, snapshotting, and CQRS integration. This skill helps teams plan durable persistence, real-time notification streams, and scalable read models.

How This Skill Works

Defines an event envelope and supports per-stream and global ordering with versioned events for optimistic concurrency. It also covers snapshotting, projections, and CQRS integration so you can build efficient read models.

When to Use It

  • Designing event sourcing infrastructure
  • Choosing between event store technologies
  • Implementing custom event stores
  • Building projections from event streams
  • Integrating CQRS with event sourcing

Quick Start

  1. Choose your event store technology (EventStoreDB, PostgreSQL, Kafka, or DynamoDB)
  2. Define the event envelope and draft a few sample events
  3. Implement append, read, projections, snapshotting, and optional CQRS integration

Best Practices

  • Define a stable event envelope: event_id, stream_id, event_type, version, schema_version, data, metadata, global_position
  • Keep append-only immutability and maintain per-stream and global ordering
  • Version event types and upcast on read to handle schema evolution
  • Design for idempotent writes and safe duplicate handling
  • Index and partition events to support projections and read models

Example Use Cases

  • Orders domain event streams like OrderPlaced and OrderCancelled
  • Payments lifecycle events such as PaymentAuthorized and PaymentCaptured
  • Inventory and stock projection events like StockReserved and StockReleased
  • User activity streams such as UserSignedUp and UserLoggedIn
  • Audit trails and compliance events like CredentialChanged
