saga-orchestration
npx machina-cli add skill wshobson/agents/saga-orchestration --openclawSaga Orchestration
Patterns for managing distributed transactions and long-running business processes.
When to Use This Skill
- Coordinating multi-service transactions
- Implementing compensating transactions
- Managing long-running business workflows
- Handling failures in distributed systems
- Building order fulfillment processes
- Implementing approval workflows
Core Concepts
1. Saga Types
Choreography Orchestration
┌─────┐ ┌─────┐ ┌─────┐ ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│ │ Orchestrator│
└─────┘ └─────┘ └─────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ┌─────┼─────┐
Event Event Event ▼ ▼ ▼
┌────┐┌────┐┌────┐
│Svc1││Svc2││Svc3│
└────┘└────┘└────┘
2. Saga Execution States
| State | Description |
|---|---|
| Started | Saga initiated |
| Pending | Waiting for step completion |
| Compensating | Rolling back due to failure |
| Completed | All steps succeeded |
| Failed | Saga failed after compensation |
Templates
Template 1: Saga Orchestrator Base
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
class SagaState(Enum):
STARTED = "started"
PENDING = "pending"
COMPENSATING = "compensating"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class SagaStep:
name: str
action: str
compensation: str
status: str = "pending"
result: Optional[Dict] = None
error: Optional[str] = None
executed_at: Optional[datetime] = None
compensated_at: Optional[datetime] = None
@dataclass
class Saga:
saga_id: str
saga_type: str
state: SagaState
data: Dict[str, Any]
steps: List[SagaStep]
current_step: int = 0
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
class SagaOrchestrator(ABC):
"""Base class for saga orchestrators."""
def __init__(self, saga_store, event_publisher):
self.saga_store = saga_store
self.event_publisher = event_publisher
@abstractmethod
def define_steps(self, data: Dict) -> List[SagaStep]:
"""Define the saga steps."""
pass
@property
@abstractmethod
def saga_type(self) -> str:
"""Unique saga type identifier."""
pass
async def start(self, data: Dict) -> Saga:
"""Start a new saga."""
saga = Saga(
saga_id=str(uuid.uuid4()),
saga_type=self.saga_type,
state=SagaState.STARTED,
data=data,
steps=self.define_steps(data)
)
await self.saga_store.save(saga)
await self._execute_next_step(saga)
return saga
async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
"""Handle successful step completion."""
saga = await self.saga_store.get(saga_id)
# Update step
for step in saga.steps:
if step.name == step_name:
step.status = "completed"
step.result = result
step.executed_at = datetime.utcnow()
break
saga.current_step += 1
saga.updated_at = datetime.utcnow()
# Check if saga is complete
if saga.current_step >= len(saga.steps):
saga.state = SagaState.COMPLETED
await self.saga_store.save(saga)
await self._on_saga_completed(saga)
else:
saga.state = SagaState.PENDING
await self.saga_store.save(saga)
await self._execute_next_step(saga)
async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
"""Handle step failure - start compensation."""
saga = await self.saga_store.get(saga_id)
# Mark step as failed
for step in saga.steps:
if step.name == step_name:
step.status = "failed"
step.error = error
break
saga.state = SagaState.COMPENSATING
saga.updated_at = datetime.utcnow()
await self.saga_store.save(saga)
# Start compensation from current step backwards
await self._compensate(saga)
async def _execute_next_step(self, saga: Saga):
"""Execute the next step in the saga."""
if saga.current_step >= len(saga.steps):
return
step = saga.steps[saga.current_step]
step.status = "executing"
await self.saga_store.save(saga)
# Publish command to execute step
await self.event_publisher.publish(
step.action,
{
"saga_id": saga.saga_id,
"step_name": step.name,
**saga.data
}
)
async def _compensate(self, saga: Saga):
"""Execute compensation for completed steps."""
# Compensate in reverse order
for i in range(saga.current_step - 1, -1, -1):
step = saga.steps[i]
if step.status == "completed":
step.status = "compensating"
await self.saga_store.save(saga)
await self.event_publisher.publish(
step.compensation,
{
"saga_id": saga.saga_id,
"step_name": step.name,
"original_result": step.result,
**saga.data
}
)
async def handle_compensation_completed(self, saga_id: str, step_name: str):
"""Handle compensation completion."""
saga = await self.saga_store.get(saga_id)
for step in saga.steps:
if step.name == step_name:
step.status = "compensated"
step.compensated_at = datetime.utcnow()
break
# Check if all compensations complete
all_compensated = all(
s.status in ("compensated", "pending", "failed")
for s in saga.steps
)
if all_compensated:
saga.state = SagaState.FAILED
await self._on_saga_failed(saga)
await self.saga_store.save(saga)
async def _on_saga_completed(self, saga: Saga):
"""Called when saga completes successfully."""
await self.event_publisher.publish(
f"{self.saga_type}Completed",
{"saga_id": saga.saga_id, **saga.data}
)
async def _on_saga_failed(self, saga: Saga):
"""Called when saga fails after compensation."""
await self.event_publisher.publish(
f"{self.saga_type}Failed",
{"saga_id": saga.saga_id, "error": "Saga failed", **saga.data}
)
Template 2: Order Fulfillment Saga
class OrderFulfillmentSaga(SagaOrchestrator):
"""Orchestrates order fulfillment across services."""
@property
def saga_type(self) -> str:
return "OrderFulfillment"
def define_steps(self, data: Dict) -> List[SagaStep]:
return [
SagaStep(
name="reserve_inventory",
action="InventoryService.ReserveItems",
compensation="InventoryService.ReleaseReservation"
),
SagaStep(
name="process_payment",
action="PaymentService.ProcessPayment",
compensation="PaymentService.RefundPayment"
),
SagaStep(
name="create_shipment",
action="ShippingService.CreateShipment",
compensation="ShippingService.CancelShipment"
),
SagaStep(
name="send_confirmation",
action="NotificationService.SendOrderConfirmation",
compensation="NotificationService.SendCancellationNotice"
)
]
# Usage
async def create_order(order_data: Dict):
saga = OrderFulfillmentSaga(saga_store, event_publisher)
return await saga.start({
"order_id": order_data["order_id"],
"customer_id": order_data["customer_id"],
"items": order_data["items"],
"payment_method": order_data["payment_method"],
"shipping_address": order_data["shipping_address"]
})
# Event handlers in each service
class InventoryService:
async def handle_reserve_items(self, command: Dict):
try:
# Reserve inventory
reservation = await self.reserve(
command["items"],
command["order_id"]
)
# Report success
await self.event_publisher.publish(
"SagaStepCompleted",
{
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": reservation.id}
}
)
except InsufficientInventoryError as e:
await self.event_publisher.publish(
"SagaStepFailed",
{
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"error": str(e)
}
)
async def handle_release_reservation(self, command: Dict):
# Compensating action
await self.release_reservation(
command["original_result"]["reservation_id"]
)
await self.event_publisher.publish(
"SagaCompensationCompleted",
{
"saga_id": command["saga_id"],
"step_name": "reserve_inventory"
}
)
Template 3: Choreography-Based Saga
from dataclasses import dataclass
from typing import Dict, Any
import asyncio
@dataclass
class SagaContext:
"""Passed through choreographed saga events."""
saga_id: str
step: int
data: Dict[str, Any]
completed_steps: list
class OrderChoreographySaga:
"""Choreography-based saga using events."""
def __init__(self, event_bus):
self.event_bus = event_bus
self._register_handlers()
def _register_handlers(self):
self.event_bus.subscribe("OrderCreated", self._on_order_created)
self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
# Compensation handlers
self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)
async def _on_order_created(self, event: Dict):
"""Step 1: Order created, reserve inventory."""
await self.event_bus.publish("ReserveInventory", {
"saga_id": event["order_id"],
"order_id": event["order_id"],
"items": event["items"]
})
async def _on_inventory_reserved(self, event: Dict):
"""Step 2: Inventory reserved, process payment."""
await self.event_bus.publish("ProcessPayment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"amount": event["total_amount"],
"reservation_id": event["reservation_id"]
})
async def _on_payment_processed(self, event: Dict):
"""Step 3: Payment done, create shipment."""
await self.event_bus.publish("CreateShipment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"payment_id": event["payment_id"]
})
async def _on_shipment_created(self, event: Dict):
"""Step 4: Complete - send confirmation."""
await self.event_bus.publish("OrderFulfilled", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"tracking_number": event["tracking_number"]
})
# Compensation handlers
async def _on_payment_failed(self, event: Dict):
"""Payment failed - release inventory."""
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"]
})
await self.event_bus.publish("OrderFailed", {
"order_id": event["order_id"],
"reason": "Payment failed"
})
async def _on_shipment_failed(self, event: Dict):
"""Shipment failed - refund payment and release inventory."""
await self.event_bus.publish("RefundPayment", {
"saga_id": event["saga_id"],
"payment_id": event["payment_id"]
})
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"]
})
Template 4: Saga with Timeouts
class TimeoutSagaOrchestrator(SagaOrchestrator):
"""Saga orchestrator with step timeouts."""
def __init__(self, saga_store, event_publisher, scheduler):
super().__init__(saga_store, event_publisher)
self.scheduler = scheduler
async def _execute_next_step(self, saga: Saga):
if saga.current_step >= len(saga.steps):
return
step = saga.steps[saga.current_step]
step.status = "executing"
step.timeout_at = datetime.utcnow() + timedelta(minutes=5)
await self.saga_store.save(saga)
# Schedule timeout check
await self.scheduler.schedule(
f"saga_timeout_{saga.saga_id}_{step.name}",
self._check_timeout,
{"saga_id": saga.saga_id, "step_name": step.name},
run_at=step.timeout_at
)
await self.event_publisher.publish(
step.action,
{"saga_id": saga.saga_id, "step_name": step.name, **saga.data}
)
async def _check_timeout(self, data: Dict):
"""Check if step has timed out."""
saga = await self.saga_store.get(data["saga_id"])
step = next(s for s in saga.steps if s.name == data["step_name"])
if step.status == "executing":
# Step timed out - fail it
await self.handle_step_failed(
data["saga_id"],
data["step_name"],
"Step timed out"
)
Best Practices
Do's
- Make steps idempotent - Safe to retry
- Design compensations carefully - They must work
- Use correlation IDs - For tracing across services
- Implement timeouts - Don't wait forever
- Log everything - For debugging failures
Don'ts
- Don't assume instant completion - Sagas take time
- Don't skip compensation testing - Most critical part
- Don't couple services - Use async messaging
- Don't ignore partial failures - Handle gracefully
Resources
Source
git clone https://github.com/wshobson/agents/blob/main/plugins/backend-development/skills/saga-orchestration/SKILL.mdView on GitHub Overview
Saga orchestration provides patterns to manage distributed transactions and long-running business processes across multiple services. It supports both choreography and orchestration approaches for coordinating steps and compensating actions when failures occur. This approach is essential for complex processes like order fulfillment and approval workflows.
How This Skill Works
Sagas define a sequence of steps, each with a corresponding compensation in case of failure. An orchestrator or a choreography pattern coordinates step execution and monitors outcomes, triggering compensations for completed steps if a later step fails. The provided template shows a Python-based SagaOrchestratorBase that models saga state, steps, and persistence to manage distributed workflows.
When to Use It
- Coordinating multi-service transactions
- Implementing compensating transactions
- Managing long-running business workflows
- Handling failures in distributed systems
- Building order fulfillment processes
Quick Start
- Step 1: Create SagaStep definitions and an abstract SagaOrchestratorBase to model steps, status, and compensations
- Step 2: Implement define_steps(data) and a concrete saga_type to describe the workflow; start the saga with initial data
- Step 3: Listen for step_completed events, advance the saga, and trigger compensations if a step fails
Best Practices
- Choose between choreography and orchestration based on coupling and visibility needs
- Clearly define each step's action and its compensating action
- Design idempotent steps to safely handle retries
- Persist saga state and use durable storage for reliability
- Test failure scenarios and rollback paths across all involved services
Example Use Cases
- Order fulfillment across inventory, payment, and shipping services with compensating steps on failure
- Approval-based user onboarding workflows with multi-step validations
- Cross-service funds transfer with reversals if any step fails
- Travel booking that coordinates flight, hotel, and car rental with rollback logic
- Loan approval workflow with cross-service checks and approvals