Get the FREE Ultimate OpenClaw Setup Guide →

Databases

npx machina-cli add skill muhammederem/chief/databases --openclaw
Files (1)
SKILL.md
12.0 KB

Database Integration Patterns

Overview

Database integration patterns for PostgreSQL (relational) and MongoDB (NoSQL) with Python, including ORM usage, migrations, and best practices.

PostgreSQL with SQLAlchemy

Connection Setup

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base

# Database URL
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"

# Create engine
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_pre_ping=True,  # Verify connections
    pool_size=10,
    max_overflow=20
)

# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class for models
Base = declarative_base()

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Model Definitions

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    name = Column(String, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Relationships
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User {self.email}>"

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    content = Column(Text)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    published = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships
    author = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post {self.title}>"

Query Patterns

from sqlalchemy.orm import Session
from typing import List, Optional

# Basic queries
def get_user(db: Session, user_id: int) -> Optional[User]:
    return db.query(User).filter(User.id == user_id).first()

def get_user_by_email(db: Session, email: str) -> Optional[User]:
    return db.query(User).filter(User.email == email).first()

def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    return db.query(User).offset(skip).limit(limit).all()

# With filters
def get_active_users(db: Session) -> List[User]:
    return db.query(User).filter(User.is_active == True).all()

# Complex queries
def get_posts_with_authors(db: Session) -> List[Post]:
    return db.query(Post).join(User).filter(User.is_active == True).all()

# Sorting
def get_users_sorted(db: Session) -> List[User]:
    return db.query(User).order_by(User.created_at.desc()).all()

# Count
def count_users(db: Session) -> int:
    return db.query(User).count()

CRUD Operations

# Create
def create_user(db: Session, user_data: dict) -> User:
    db_user = User(**user_data)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

# Update
def update_user(db: Session, user_id: int, user_data: dict) -> Optional[User]:
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user:
        for field, value in user_data.items():
            setattr(db_user, field, value)
        db.commit()
        db.refresh(db_user)
    return db_user

# Delete
def delete_user(db: Session, user_id: int) -> bool:
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user:
        db.delete(db_user)
        db.commit()
        return True
    return False

Relationships and Joins

# Eager loading
def get_posts_with_author(db: Session):
    return db.query(Post).options(joinedload(Post.author)).all()

# Filter relationships
def get_user_posts(db: Session, user_id: int):
    return db.query(Post).filter(Post.author_id == user_id).all()

# Create with relationships
def create_user_with_post(db: Session, user_data: dict, post_data: dict):
    user = User(**user_data)
    post = Post(**post_data, author=user)
    db.add(user)
    db.add(post)
    db.commit()
    db.refresh(user)
    return user

Transactions

from sqlalchemy.exc import IntegrityError

def transfer_points(db: Session, from_id: int, to_id: int, amount: int):
    try:
        # Start transaction
        db.begin()

        # Get users
        from_user = db.query(User).filter(User.id == from_id).with_for_update().first()
        to_user = db.query(User).filter(User.id == to_id).with_for_update().first()

        # Transfer
        from_user.points -= amount
        to_user.points += amount

        db.commit()
        return True

    except IntegrityError:
        db.rollback()
        return False

MongoDB with Motor

Async Connection

from motor.motor_asyncio import AsyncIOMotorClient
from typing import Optional

class MongoDB:
    client: AsyncIOMotorClient = None

    def connect(self, uri: str):
        self.client = AsyncIOMotorClient(uri)

    def close(self):
        if self.client:
            self.client.close()

    def get_db(self, db_name: str):
        return self.client[db_name]

# Dependency
async def get_db():
    db = mongo.get_db("mydb")
    try:
        yield db
    finally:
        pass

# Model
class User:
    def __init__(self, **kwargs):
        self.id = str(kwargs.get("_id"))
        self.email = kwargs.get("email")
        self.name = kwargs.get("name")
        self.hashed_password = kwargs.get("hashed_password")
        self.is_active = kwargs.get("is_active", True)

    def to_dict(self):
        return {
            "email": self.email,
            "name": self.name,
            "hashed_password": self.hashed_password,
            "is_active": self.is_active,
        }

CRUD Operations

from bson import ObjectId
from datetime import datetime

# Create
async def create_user(db, user_data: dict) -> User:
    user_data["created_at"] = datetime.utcnow()
    result = await db.users.insert_one(user_data)
    user_data["_id"] = result.inserted_id
    return User(**user_data)

# Read
async def get_user(db: str, user_id: str) -> Optional[User]:
    user = await db.users.find_one({"_id": ObjectId(user_id)})
    return User(**user) if user else None

async def get_user_by_email(db: str, email: str) -> Optional[User]:
    user = await db.users.find_one({"email": email})
    return User(**user) if user else None

async def get_users(db: str, skip: int = 0, limit: int = 100) -> list[User]:
    cursor = db.users.find().skip(skip).limit(limit)
    users = await cursor.to_list(length=limit)
    return [User(**user) for user in users]

# Update
async def update_user(db: str, user_id: str, user_data: dict) -> Optional[User]:
    user_data["updated_at"] = datetime.utcnow()
    result = await db.users.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": user_data}
    )
    if result.modified_count:
        return await get_user(db, user_id)
    return None

# Delete
async def delete_user(db: str, user_id: str) -> bool:
    result = await db.users.delete_one({"_id": ObjectId(user_id)})
    return result.deleted_count > 0

Advanced Queries

# Aggregation pipeline
async def get_user_stats(db: str):
    pipeline = [
        {"$group": {
            "_id": "$is_active",
            "count": {"$sum": 1}
        }}
    ]
    results = await db.users.aggregate(pipeline).to_list(None)
    return results

# Text search
async def search_users(db: str, query: str):
    results = await db.users.find(
        {"$text": {"$search": query}}
    ).to_list(None)
    return [User(**user) for user in results]

# Complex filters
async def get_users_by_criteria(
    db: str,
    is_active: bool = None,
    min_date: datetime = None
):
    query = {}
    if is_active is not None:
        query["is_active"] = is_active
    if min_date:
        query["created_at"] = {"$gte": min_date}

    cursor = db.users.find(query)
    users = await cursor.to_list(None)
    return [User(**user) for user in users]

Indexes

async def create_indexes(db: str):
    # Single field index
    await db.users.create_index("email", unique=True)

    # Compound index
    await db.users.create_index([("email", 1), ("is_active", 1)])

    # Text index
    await db.users.create_index([("name", "text"), ("email", "text")])

    # TTL index
    await db.sessions.create_index("created_at", expireAfterSeconds=3600)

Migrations with Alembic (PostgreSQL)

Setup

alembic init alembic

Configuration

# alembic/env.py
from sqlalchemy import engine_from_config
from alembic import context

# Import Base and models
from app.db.database import Base
from app.models import user, post  # Import all models

target_metadata = Base.metadata

Create Migration

alembic revision --autogenerate -m "Add users table"

Run Migration

alembic upgrade head

Rollback

alembic downgrade -1

Redis Integration

Caching with Redis

import redis
import json
from typing import Any, Optional

class RedisCache:
    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def get(self, key: str) -> Optional[Any]:
        value = self.client.get(key)
        return json.loads(value) if value else None

    def set(self, key: str, value: Any, ttl: int = 3600):
        self.client.setex(key, ttl, json.dumps(value))

    def delete(self, key: str):
        self.client.delete(key)

    def exists(self, key: str) -> bool:
        return self.client.exists(key) > 0

# Usage
cache = RedisCache()

def get_user_cached(user_id: int):
    cache_key = f"user:{user_id}"

    # Try cache
    cached_user = cache.get(cache_key)
    if cached_user:
        return cached_user

    # Query database
    user = db.query(User).filter(User.id == user_id).first()

    # Cache result
    if user:
        cache.set(cache_key, user.to_dict())

    return user

Best Practices

1. Connection Management

  • Use connection pooling
  • Set appropriate pool sizes
  • Use dependency injection
  • Always close connections

2. Query Optimization

  • Use indexes on filter columns
  • Avoid N+1 queries (use eager loading)
  • Use pagination
  • Select only needed columns

3. Transactions

  • Keep transactions short
  • Use explicit transactions for multi-step operations
  • Handle errors and rollback
  • Use row-level locking when needed

4. Data Validation

  • Validate at application layer (Pydantic)
  • Use database constraints
  • Sanitize inputs
  • Handle edge cases

5. Security

  • Use parameterized queries
  • Hash sensitive data (passwords)
  • Use environment variables for credentials
  • Implement proper access controls

6. Performance

  • Use prepared statements
  • Batch operations when possible
  • Cache frequently accessed data
  • Monitor query performance
  • Use read replicas for scaling reads

Common Patterns

Repository Pattern

class UserRepository:
    def __init__(self, db: Session):
        self.db = db

    def get(self, id: int) -> Optional[User]:
        return self.db.query(User).filter(User.id == id).first()

    def create(self, user_data: dict) -> User:
        user = User(**user_data)
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)
        return user

Unit of Work Pattern

class UnitOfWork:
    def __init__(self, db: Session):
        self.db = db
        self.users = UserRepository(db)

    def commit(self):
        self.db.commit()

    def rollback(self):
        self.db.rollback()

Integration

  • FastAPI: SQLAlchemy/Motor integration
  • Alembic: Database migrations
  • Redis: Caching and sessions
  • Docker: Local development databases

Source

git clone https://github.com/muhammederem/chief/blob/main/.claude/skills/backend/databases/SKILL.mdView on GitHub

Overview

Database integration patterns for PostgreSQL (relational) and MongoDB (NoSQL) with Python, focusing on ORM usage, migrations, and best practices. This guide demonstrates practical SQLAlchemy setup, model definitions, query patterns, and CRUD workflows to form robust data layers.

How This Skill Works

The pattern shows creating a SQLAlchemy engine with pooling, a session factory, and a declarative Base. It defines ORM models (User and Post) with relationships, and provides reusable query and CRUD helpers that orchestrate sessions, commits, and refreshes.

When to Use It

  • Building a Python app with PostgreSQL as the relational backend using SQLAlchemy ORM
  • Need efficient DB connections with pooling (pool_pre_ping, pool_size, max_overflow)
  • Implementing common CRUD operations for User and Post entities
  • Modeling relationships (User has many Posts) and performing joined queries
  • Planning migrations and evolving schemas with ORM-based workflows

Quick Start

  1. Step 1: Configure the database URL and create an engine with pooling settings
  2. Step 2: Define ORM models (User, Post) and a Base, then create a session factory
  3. Step 3: Use provided query and CRUD helpers to interact with the database

Best Practices

  • Configure a robust SQLAlchemy engine with pool_pre_ping, pool_size, and max_overflow to keep connections healthy
  • Use a declarative Base and clearly named models with appropriate indexes and constraints (e.g., unique email)
  • Define relationships with back_populates and cascade options (e.g., delete-orphan) for integrity
  • Store timestamps with server_default=now() and update timestamps on changes to support auditing
  • Encapsulate DB access in reusable CRUD/query helpers and manage session lifecycle (open, commit, refresh, close)

Example Use Cases

  • Define User and Post models with a one-to-many relationship and foreign keys
  • Create a new user and a related post in a single operation
  • Query active users and fetch their posts with a join
  • Count total users or posts to monitor growth
  • Update user attributes and persist changes with commit and refresh

Frequently Asked Questions

Add this skill to your agents
Sponsor this space

Reach thousands of developers