event-store
npx machina-cli add skill wpank/ai/event-store --openclaw

Event Store
Guide to designing event stores for event-sourced applications — covering event schemas, projections, snapshotting, and CQRS integration.
Installation
OpenClaw / Moltbot / Clawbot
npx clawhub@latest install event-store
When to Use This Skill
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Building projections from event streams
- Adding snapshotting for aggregate performance
- Integrating CQRS with event sourcing
Core Concepts
Event Store Architecture
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
Event Store Requirements
| Requirement | Description |
|---|---|
| Append-only | Events are immutable, only appends |
| Ordered | Per-stream and global ordering |
| Versioned | Optimistic concurrency control |
| Subscriptions | Real-time event notifications |
| Idempotent | Handle duplicate writes safely |
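The "Versioned" and "Idempotent" requirements above can be sketched with a minimal in-memory store (illustrative only — `InMemoryEventStore` and its method names are invented here, not part of any library; a real store persists to disk):

```python
import uuid


class ConcurrencyError(Exception):
    pass


class InMemoryEventStore:
    """Toy append-only store illustrating versioning and idempotent writes."""

    def __init__(self):
        self.streams = {}      # stream_id -> list of events
        self.seen_ids = set()  # event IDs already written (deduplication)

    def append(self, stream_id, events, expected_version=None):
        stream = self.streams.setdefault(stream_id, [])
        # Optimistic concurrency: the caller states the version it last saw.
        if expected_version is not None and len(stream) != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, stream is at {len(stream)}"
            )
        for event in events:
            if event["event_id"] in self.seen_ids:
                continue  # duplicate write: safely ignored (idempotency)
            self.seen_ids.add(event["event_id"])
            stream.append({**event, "version": len(stream) + 1})
        return stream


store = InMemoryEventStore()
evt = {"event_id": str(uuid.uuid4()), "event_type": "OrderPlaced", "data": {}}
store.append("Order-abc123", [evt], expected_version=0)
store.append("Order-abc123", [evt])  # retried write: deduplicated
assert len(store.streams["Order-abc123"]) == 1
```

A production store gets the same guarantees from a unique `(stream_id, version)` constraint and an event-ID primary key, as in the PostgreSQL schema later in this guide.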
Technology Comparison
| Technology | Best For | Limitations |
|---|---|---|
| EventStoreDB | Pure event sourcing | Single-purpose |
| PostgreSQL | Existing Postgres stack | Manual implementation |
| Kafka | High-throughput streams | Not ideal for per-stream queries |
| DynamoDB | Serverless, AWS-native | Query limitations |
Event Schema Design
Events are the source of truth. Well-designed schemas ensure long-term evolvability.
Event Envelope Structure
{
  "event_id": "uuid",
  "stream_id": "Order-abc123",
  "event_type": "OrderPlaced",
  "version": 1,
  "schema_version": 1,
  "data": {
    "customer_id": "cust-1",
    "total_cents": 5000
  },
  "metadata": {
    "correlation_id": "req-xyz",
    "causation_id": "evt-prev",
    "user_id": "user-1",
    "timestamp": "2025-01-15T10:30:00Z"
  },
  "global_position": 42
}
Schema Evolution Rules
- Add fields freely — new optional fields are always safe
- Never remove or rename fields — introduce a new event type instead
- Version event types — introduce OrderPlacedV2 when the schema changes materially
- Upcast on read — transform old versions to the current shape in the deserializer
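Upcasting can be implemented as a small registry of per-version transforms applied at deserialization time. A sketch (the `currency` field and version numbers are invented for illustration):

```python
# Map (event_type, schema_version) -> transform to the NEXT version.
UPCASTERS = {
    # v1 OrderPlaced had no currency; default it when upcasting to v2.
    ("OrderPlaced", 1): lambda data: {**data, "currency": "USD"},
}

# The schema version the application code currently expects.
CURRENT_VERSIONS = {"OrderPlaced": 2}


def upcast(event_type: str, schema_version: int, data: dict) -> dict:
    """Apply transforms step by step until the payload is current."""
    while schema_version < CURRENT_VERSIONS.get(event_type, schema_version):
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return data


old = {"customer_id": "cust-1", "total_cents": 5000}
assert upcast("OrderPlaced", 1, old)["currency"] == "USD"
```

Because each transform only bridges one version, events written years ago still reach the current shape by chaining transforms, and stored events are never rewritten.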
PostgreSQL Event Store Schema
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Event Store Implementation
import json
from dataclasses import dataclass, field
from uuid import UUID, uuid4

import asyncpg


class ConcurrencyError(Exception):
    """Raised when the stream has moved past the expected version."""


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: int | None = None
    global_position: int | None = None


class EventStore:  # backed by the PostgreSQL schema above
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append(self, stream_id: str, stream_type: str,
                     events: list[Event],
                     expected_version: int | None = None) -> list[Event]:
        """Append events with optimistic concurrency control."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                current = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) FROM events "
                    "WHERE stream_id = $1", stream_id
                )
                if expected_version is not None and current != expected_version:
                    raise ConcurrencyError(
                        f"Expected {expected_version}, got {current}"
                    )
                # The UNIQUE (stream_id, version) constraint rejects any
                # concurrent writer that read the same MAX(version).
                for i, evt in enumerate(events, start=1):
                    evt.version = current + i
                    row = await conn.fetchrow(
                        "INSERT INTO events (id, stream_id, stream_type, "
                        "event_type, event_data, metadata, version) "
                        "VALUES ($1,$2,$3,$4,$5,$6,$7) "
                        "RETURNING global_position",
                        evt.event_id, stream_id, stream_type,
                        evt.event_type, json.dumps(evt.data),
                        json.dumps(evt.metadata), evt.version,
                    )
                    evt.global_position = row["global_position"]
                return events

    async def read_stream(self, stream_id: str,
                          from_version: int = 0) -> list[Event]:
        """Read events for a single stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE stream_id = $1 "
                "AND version >= $2 ORDER BY version",
                stream_id, from_version,
            )
            return [self._to_event(r) for r in rows]

    async def read_all(self, from_position: int = 0,
                       limit: int = 1000) -> list[Event]:
        """Read the global event stream for projections / subscriptions."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE global_position > $1 "
                "ORDER BY global_position LIMIT $2",
                from_position, limit,
            )
            return [self._to_event(r) for r in rows]

    @staticmethod
    def _to_event(row) -> Event:
        return Event(
            stream_id=row["stream_id"],
            event_type=row["event_type"],
            data=json.loads(row["event_data"]),
            metadata=json.loads(row["metadata"]),
            event_id=row["id"],
            version=row["version"],
            global_position=row["global_position"],
        )
Projections
Projections build read-optimised views by replaying events. They are the "Q" side of CQRS.
Projection Lifecycle
- Start from checkpoint — resume from last processed global position
- Apply events — update the read model for each relevant event type
- Save checkpoint — persist the new position atomically with the read model
Projection Example
import asyncio


class OrderSummaryProjection:
    def __init__(self, db, event_store: EventStore):
        self.db = db
        self.store = event_store

    async def run(self, batch_size: int = 100):
        position = await self._load_checkpoint()
        while True:
            events = await self.store.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # caught up: poll for new events
                continue
            for evt in events:
                await self._apply(evt)
                position = evt.global_position
            await self._save_checkpoint(position)

    async def _apply(self, event: Event):
        match event.event_type:
            case "OrderPlaced":
                await self.db.execute(
                    "INSERT INTO order_summaries (id, customer, total, status) "
                    "VALUES ($1,$2,$3,'placed') "
                    "ON CONFLICT (id) DO NOTHING",  # idempotent on replay
                    event.data["order_id"], event.data["customer_id"],
                    event.data["total_cents"],
                )
            case "OrderShipped":
                await self.db.execute(
                    "UPDATE order_summaries SET status='shipped' "
                    "WHERE id=$1", event.data["order_id"],
                )

    async def _load_checkpoint(self) -> int:
        return await self.db.fetchval(
            "SELECT last_position FROM subscription_checkpoints "
            "WHERE subscription_id = 'order_summary'"
        ) or 0

    async def _save_checkpoint(self, position: int):
        await self.db.execute(
            "INSERT INTO subscription_checkpoints "
            "(subscription_id, last_position) VALUES ('order_summary', $1) "
            "ON CONFLICT (subscription_id) DO UPDATE "
            "SET last_position = $1, updated_at = NOW()",
            position,
        )
Projection Design Rules
- Idempotent handlers — replaying the same event twice must not corrupt state
- One projection per read model — keep projections focused
- Rebuild from scratch — projections should be deletable and fully replayable
- Separate storage — projections can live in different databases (Postgres, Elasticsearch, Redis)
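The idempotency and checkpoint rules can be illustrated with a toy in-memory read model (all names here are invented; in production the read-model write and the checkpoint update would share one database transaction):

```python
class OrderSummaryView:
    """Toy read model: idempotent apply, checkpoint advanced with the state."""

    def __init__(self):
        self.summaries = {}  # order_id -> summary row
        self.checkpoint = 0  # last processed global position

    def apply(self, event):
        if event["global_position"] <= self.checkpoint:
            return  # already processed: replaying the event is a no-op
        if event["event_type"] == "OrderPlaced":
            # Keyed write: applying the same event twice cannot duplicate rows.
            self.summaries[event["data"]["order_id"]] = {"status": "placed"}
        elif event["event_type"] == "OrderShipped":
            self.summaries[event["data"]["order_id"]]["status"] = "shipped"
        self.checkpoint = event["global_position"]  # advances with the change


view = OrderSummaryView()
evt = {"global_position": 1, "event_type": "OrderPlaced",
       "data": {"order_id": "o1"}}
view.apply(evt)
view.apply(evt)  # replay after a crash: no effect
assert view.summaries == {"o1": {"status": "placed"}} and view.checkpoint == 1
```

Deleting `summaries` and resetting `checkpoint` to 0 rebuilds the view from scratch, which is exactly the "rebuild from scratch" property the rules above call for.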
Snapshotting
Snapshots accelerate aggregate rehydration by caching state at a known version.
Use when streams exceed ~100 events, aggregates have expensive rehydration, or on a cadence (e.g., every 50 events).
Snapshot Flow
class SnapshottedRepository:
    def __init__(self, event_store: EventStore, pool):
        self.store = event_store
        self.pool = pool

    async def load(self, stream_id: str) -> Aggregate:
        # 1. Try loading a snapshot (reads the snapshots table)
        snap = await self._load_snapshot(stream_id)
        from_version = 0
        aggregate = Aggregate(stream_id)
        if snap:
            aggregate.restore(snap["data"])
            from_version = snap["version"] + 1
        # 2. Replay only the events after the snapshot
        events = await self.store.read_stream(stream_id, from_version)
        for evt in events:
            aggregate.apply(evt)
        # 3. Write a fresh snapshot if too many events were replayed
        if len(events) > 50:
            await self._save_snapshot(
                stream_id, aggregate.snapshot(), aggregate.version
            )
        return aggregate
CQRS Integration
CQRS separates the write model (commands → events) from the read model (projections).
Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API
 (write)      (domain)       (append)        (build)         (read)
Key Principles
- Write side validates commands, emits events, enforces invariants
- Read side subscribes to events, builds optimised query models
- Eventual consistency — reads may lag behind writes by milliseconds to seconds
- Independent scaling — scale reads and writes separately
Command Handler Pattern
class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    async def handle(self, cmd: PlaceOrderCommand):
        # Load the aggregate from its event stream
        events = await self.store.read_stream(f"Order-{cmd.order_id}")
        order = Order.reconstitute(events)
        # Execute the command — validates invariants and produces new events
        new_events = order.place(cmd.customer_id, cmd.items)
        # Persist with an optimistic concurrency check
        await self.store.append(
            f"Order-{cmd.order_id}", "Order", new_events,
            expected_version=order.version,
        )
EventStoreDB Integration
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode(),
        )
        for event in events
    ]
    state = (StreamState.ANY if expected_revision is None
             else StreamState.NO_STREAM if expected_revision == -1
             else expected_revision)
    return client.append_to_stream(
        stream_name, events=new_events, current_version=state
    )

def read_stream(stream_name: str, from_revision: int = 0):
    return [
        {'type': e.type, 'data': json.loads(e.data),
         'stream_position': e.stream_position}
        for e in client.get_stream(stream_name, stream_position=from_revision)
    ]

# Category projection: read all events for Order-* streams
def read_category(category: str):
    return read_stream(f"$ce-{category}")
DynamoDB Event Store
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime, timezone
import json, uuid

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.table = boto3.resource('dynamodb').Table(table_name)

    def append(self, stream_id: str, events: list, expected_version: int = 0):
        for i, event in enumerate(events):
            version = expected_version + i + 1
            # Conditional put enforces optimistic concurrency: the write fails
            # if another writer already claimed this (PK, SK) key. A
            # batch_writer cannot express conditions, so items go individually
            # (or via TransactWriteItems for all-or-nothing batches).
            self.table.put_item(
                Item={
                    'PK': f"STREAM#{stream_id}",
                    'SK': f"VERSION#{version:020d}",
                    'GSI1PK': 'EVENTS',
                    'GSI1SK': datetime.now(timezone.utc).isoformat(),
                    'event_id': str(uuid.uuid4()),
                    'event_type': event['type'],
                    'event_data': json.dumps(event['data']),
                    'version': version,
                },
                ConditionExpression='attribute_not_exists(PK)',
            )

    def read_stream(self, stream_id: str, from_version: int = 0):
        resp = self.table.query(
            KeyConditionExpression=
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {'event_type': item['event_type'],
             'data': json.loads(item['event_data']),
             'version': int(item['version'])}  # DynamoDB numbers are Decimals
            for item in resp['Items']
        ]
DynamoDB table design: PK=STREAM#{id}, SK=VERSION#{version}, GSI1 for global ordering.
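The zero-padding in the SK matters because DynamoDB compares range keys lexicographically, not numerically; padding keeps string order aligned with version order. A quick illustration:

```python
versions = [1, 2, 10, 100]

unpadded = sorted(f"VERSION#{v}" for v in versions)
padded = sorted(f"VERSION#{v:020d}" for v in versions)

# Unpadded keys sort 1, 10, 100, 2 — numeric order is lost.
assert unpadded == ["VERSION#1", "VERSION#10", "VERSION#100", "VERSION#2"]
# Zero-padded keys sort in true version order.
assert [int(k.split("#")[1]) for k in padded] == [1, 2, 10, 100]
```

The same reasoning applies to any key-value store with lexicographic range scans; 20 digits comfortably covers any realistic version count.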
Best Practices
Do
- Name streams {Type}-{id} — e.g., Order-abc123
- Include correlation / causation IDs in metadata for tracing
- Version event schemas from day one — plan for evolution
- Implement idempotent writes — use event IDs for deduplication
- Index for your query patterns — stream, global position, event type
Don't
- Mutate or delete events — they are immutable facts
- Store large payloads — keep events small; reference blobs externally
- Skip optimistic concurrency — version checks prevent lost updates and corruption
- Ignore backpressure — handle slow consumers gracefully
- Couple projections to the write model — projections should be independently deployable
NEVER Do
- NEVER update or delete events — Events are immutable historical facts; create compensating events instead
- NEVER skip version checks on append — Optimistic concurrency prevents lost updates and corruption
- NEVER embed large blobs in events — Store blobs externally, reference by ID in the event
- NEVER use random UUIDs for event IDs without idempotency checks — Retries create duplicates
- NEVER read projections for command validation — Use the event stream as the source of truth
- NEVER couple projections to the write transaction — Projections must be rebuildable independently
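One way to make retries safe, per the rule on event IDs above, is to derive the ID deterministically from the stream and version with UUIDv5, so a retried append carries the same ID and deduplicates on the unique-ID constraint. A sketch (the namespace UUID is an arbitrary constant — fix your own per application):

```python
import uuid

# Arbitrary fixed namespace for this application (an assumption; pick your own).
EVENT_NAMESPACE = uuid.UUID("00000000-0000-0000-0000-000000000042")


def deterministic_event_id(stream_id: str, version: int) -> uuid.UUID:
    """Same (stream, version) always yields the same ID, so retries dedupe."""
    return uuid.uuid5(EVENT_NAMESPACE, f"{stream_id}:{version}")


first = deterministic_event_id("Order-abc123", 3)
retry = deterministic_event_id("Order-abc123", 3)
assert first == retry  # a retried append produces the same event_id
assert deterministic_event_id("Order-abc123", 4) != first
```

With IDs derived this way, the PostgreSQL schema's `id` primary key (or any unique event-ID index) turns a duplicate write into a conflict you can safely ignore.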
Source
git clone https://github.com/wpank/ai
Skill file: skills/backend/event-store/SKILL.md

Overview
Design and implement event stores for event-sourced systems, covering event schemas, projections, snapshotting, and CQRS integration. This skill helps teams plan durable persistence, real-time notification streams, and scalable read models.
How This Skill Works
This skill defines an event envelope and supports per-stream and global ordering, with versioned events for optimistic concurrency. It also covers snapshotting, projections, and CQRS integration so you can build efficient read models.
When to Use It
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Building projections from event streams
- Integrating CQRS with event sourcing
Quick Start
- Step 1: Choose your event store technology (EventStoreDB, PostgreSQL, Kafka, or DynamoDB)
- Step 2: Define the event envelope and draft a few sample events
- Step 3: Implement append, read, projection, snapshotting, and optional CQRS integration
Best Practices
- Define a stable event envelope: event_id, stream_id, event_type, version, schema_version, data, metadata, global_position
- Keep append-only immutability and maintain per-stream and global ordering
- Version event types and upcast on read to handle schema evolution
- Design for idempotent writes and safe duplicate handling
- Index and partition events to support projections and read models
Example Use Cases
- Orders domain event streams like OrderPlaced and OrderCancelled
- Payments lifecycle events such as PaymentAuthorized and PaymentCaptured
- Inventory and stock projection events like StockReserved and StockReleased
- User activity streams such as UserSignedUp and UserLoggedIn
- Audit trails and compliance events like CredentialChanged