Databases
npx machina-cli add skill muhammederem/chief/databases --openclaw
Files (1)
SKILL.md
12.0 KB
Database Integration Patterns
Overview
Database integration patterns for PostgreSQL (relational) and MongoDB (NoSQL) with Python, including ORM usage, migrations, and best practices.
PostgreSQL with SQLAlchemy
Connection Setup
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
import os

# Database URL. Read from the DATABASE_URL environment variable so real
# credentials never live in source control; the literal below is only a
# local-development fallback.
SQLALCHEMY_DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://user:password@localhost/dbname",
)

# Create engine
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_pre_ping=True,  # Verify connections before handing them out
    pool_size=10,
    max_overflow=20,
)

# Session factory: explicit commits, no implicit flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class for models
Base = declarative_base()
# Dependency
def get_db():
    """Yield a session from ``SessionLocal`` and close it when the caller is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs on normal completion and when the consumer raises.
        session.close()
Model Definitions
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    name = Column(String, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    # Balance used by the transfer_points() transaction example; the
    # server default lets a migration add the column to populated tables.
    points = Column(Integer, nullable=False, default=0, server_default="0")
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Only set on UPDATE, so it stays NULL until the row is first modified.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Relationships
    # Posts follow the user's lifecycle: removed from the collection or
    # orphaned by a user delete, they are deleted too.
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User {self.email}>"
class Post(Base):
    """ORM model for the ``posts`` table; each post belongs to one user."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    content = Column(Text)
    # Required link to the owning row in ``users``.
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    published = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships
    # Mirror of ``User.posts``.
    author = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post {self.title}>"
Query Patterns
from sqlalchemy.orm import Session
from typing import List, Optional
# Basic queries
def get_user(db: Session, user_id: int) -> Optional[User]:
    """Return the user whose primary key equals ``user_id``, or ``None``."""
    matches_id = User.id == user_id
    return db.query(User).filter(matches_id).first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Return the user with exactly this email address, or ``None``."""
    matches_email = User.email == email
    return db.query(User).filter(matches_email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Return one page of users.

    Args:
        db: Active SQLAlchemy session.
        skip: Number of rows to skip from the start of the result.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` users, ordered by primary key.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order, so consecutive pages could overlap or skip rows.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
# With filters
def get_active_users(db: Session) -> List[User]:
    """Return every user whose ``is_active`` flag is true."""
    # ``== True`` is intentional: it builds a SQL expression, not a Python bool.
    only_active = User.is_active == True  # noqa: E712
    return db.query(User).filter(only_active).all()
# Complex queries
def get_posts_with_authors(db: Session) -> List[Post]:
    """Return the posts whose author is an active user."""
    posts_joined_to_users = db.query(Post).join(User)
    # ``== True`` is intentional: it builds a SQL expression, not a Python bool.
    return posts_joined_to_users.filter(User.is_active == True).all()  # noqa: E712
# Sorting
def get_users_sorted(db: Session) -> List[User]:
    """Return all users, most recently created first."""
    newest_first = User.created_at.desc()
    return db.query(User).order_by(newest_first).all()
# Count
def count_users(db: Session) -> int:
    """Return the number of rows in the users table."""
    all_users = db.query(User)
    return all_users.count()
CRUD Operations
# Create
def create_user(db: Session, user_data: dict) -> User:
    """Insert a new user and return the persisted instance.

    Args:
        db: Active SQLAlchemy session.
        user_data: Keyword arguments for the ``User`` constructor.

    Returns:
        The new ``User``, refreshed so database-generated values are loaded.

    Raises:
        Whatever the commit raises (for example a unique-constraint
        violation on ``email``); the session is rolled back first.
    """
    db_user = User(**user_data)
    db.add(db_user)
    try:
        db.commit()
    except Exception:
        # A failed flush leaves the session unusable until it is rolled
        # back; clean up, then let the caller see the original error.
        db.rollback()
        raise
    db.refresh(db_user)
    return db_user
# Update
def update_user(db: Session, user_id: int, user_data: dict) -> Optional[User]:
    """Apply ``user_data`` to the user with ``user_id`` and persist it.

    Returns the refreshed user, or ``None`` when no such user exists.
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        return db_user

    # Every key in user_data is written straight onto the model.
    for attr_name, new_value in user_data.items():
        setattr(db_user, attr_name, new_value)
    db.commit()
    db.refresh(db_user)
    return db_user
# Delete
def delete_user(db: Session, user_id: int) -> bool:
    """Delete the user with ``user_id``; return whether a row was removed."""
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        return False

    db.delete(target)
    db.commit()
    return True
Relationships and Joins
# Eager loading
def get_posts_with_author(db: Session):
    """Return all posts with each post's author loaded in the same query.

    Loading the author up front avoids one extra query per post when
    ``post.author`` is accessed later (the N+1 problem).
    """
    # Imported here because the surrounding snippet never imports it.
    from sqlalchemy.orm import joinedload

    return db.query(Post).options(joinedload(Post.author)).all()
# Filter relationships
def get_user_posts(db: Session, user_id: int):
    """Return every post whose ``author_id`` equals ``user_id``."""
    written_by_user = Post.author_id == user_id
    return db.query(Post).filter(written_by_user).all()
# Create with relationships
def create_user_with_post(db: Session, user_data: dict, post_data: dict):
    """Create a user and one post authored by them in a single commit.

    Returns the refreshed user.
    """
    new_user = User(**user_data)
    # Passing ``author`` links the post to the user through the relationship.
    first_post = Post(**post_data, author=new_user)
    db.add_all([new_user, first_post])
    db.commit()
    db.refresh(new_user)
    return new_user
Transactions
from sqlalchemy.exc import IntegrityError
def transfer_points(db: Session, from_id: int, to_id: int, amount: int):
    """Move ``amount`` points from one user to another atomically.

    Assumes ``User`` has an integer ``points`` column; confirm against
    the model before using this.

    Args:
        db: Active SQLAlchemy session.
        from_id: Primary key of the user losing points.
        to_id: Primary key of the user gaining points.
        amount: Number of points to move.

    Returns:
        ``True`` when the transfer was committed; ``False`` when either
        user is missing or the database rejects the change with an
        ``IntegrityError``.

    Raises:
        Any other error is re-raised after the transaction is rolled back.
    """
    try:
        # No explicit db.begin(): the session opens a transaction on first
        # use, and begin() raises if one is already in progress.
        # Both rows are locked in one statement, ordered by id, so two
        # opposite transfers cannot lock them in conflicting order.
        locked_rows = (
            db.query(User)
            .filter(User.id.in_({from_id, to_id}))
            .order_by(User.id)
            .with_for_update()
            .all()
        )
        users_by_id = {row.id: row for row in locked_rows}
        from_user = users_by_id.get(from_id)
        to_user = users_by_id.get(to_id)
        if from_user is None or to_user is None:
            # Release the row locks instead of failing on None.
            db.rollback()
            return False

        # Transfer
        from_user.points -= amount
        to_user.points += amount
        db.commit()
        return True
    except IntegrityError:
        db.rollback()
        return False
    except Exception:
        # Never leave locks or a half-applied transfer behind.
        db.rollback()
        raise
MongoDB with Motor
Async Connection
from motor.motor_asyncio import AsyncIOMotorClient
from typing import Optional
class MongoDB:
    """Holder for one shared Motor client."""

    # Set by connect(); None while disconnected. Quoted so the annotation
    # is not evaluated when the class is defined.
    client: "Optional[AsyncIOMotorClient]" = None

    def connect(self, uri: str) -> None:
        """Create the Motor client for ``uri``."""
        self.client = AsyncIOMotorClient(uri)

    def close(self) -> None:
        """Close the client if one is open; safe to call repeatedly."""
        # Identity check rather than truthiness: we only care whether a
        # client has been created.
        if self.client is not None:
            self.client.close()
            self.client = None

    def get_db(self, db_name: str):
        """Return the database named ``db_name`` from the connected client.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        if self.client is None:
            raise RuntimeError("MongoDB client is not connected; call connect() first")
        return self.client[db_name]
# Shared holder used by the dependency below; call ``mongo.connect(uri)``
# at application startup and ``mongo.close()`` at shutdown.
mongo = MongoDB()


# Dependency
async def get_db():
    """Yield the ``mydb`` database handle from the shared client.

    Nothing is released per request; the client is closed once, through
    ``mongo.close()``.
    """
    yield mongo.get_db("mydb")
# Model
class User:
    """Plain-Python view of a document from the ``users`` collection."""

    def __init__(self, **kwargs):
        """Build a user from document fields; unknown keys are ignored."""
        raw_id = kwargs.get("_id")
        # Keep a missing _id as None; str(None) would yield the bogus id "None".
        self.id = None if raw_id is None else str(raw_id)
        self.email = kwargs.get("email")
        self.name = kwargs.get("name")
        self.hashed_password = kwargs.get("hashed_password")
        self.is_active = kwargs.get("is_active", True)

    def to_dict(self):
        """Return the storable fields; the id is deliberately left out."""
        return {
            "email": self.email,
            "name": self.name,
            "hashed_password": self.hashed_password,
            "is_active": self.is_active,
        }
CRUD Operations
from bson import ObjectId
from datetime import datetime
# Create
async def create_user(db, user_data: dict) -> User:
    """Insert a user document and return it as a ``User``.

    Args:
        db: Motor database handle exposing a ``users`` collection.
        user_data: Fields for the new document; the dict is not modified.

    Returns:
        A ``User`` built from the stored fields plus the generated ``_id``.
    """
    from datetime import timezone

    # Work on a copy so the caller's dict is left untouched, and stamp it
    # with an aware UTC time (datetime.utcnow() is naive and deprecated).
    document = {**user_data, "created_at": datetime.now(timezone.utc)}
    result = await db.users.insert_one(document)
    document["_id"] = result.inserted_id
    return User(**document)
# Read
async def get_user(db, user_id: str) -> Optional[User]:
    """Return the user whose ``_id`` is ``user_id``, or ``None``.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        user_id: String form of the document's ObjectId. ``ObjectId()``
            raises for a malformed value and that error is not caught here.
    """
    document = await db.users.find_one({"_id": ObjectId(user_id)})
    return User(**document) if document else None
async def get_user_by_email(db, email: str) -> Optional[User]:
    """Return the user with exactly this email, or ``None``.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        email: Email address to match.
    """
    document = await db.users.find_one({"email": email})
    return User(**document) if document else None
async def get_users(db, skip: int = 0, limit: int = 100) -> list[User]:
    """Return one page of users.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        skip: Number of documents to skip.
        limit: Maximum number of documents to return.

    Returns:
        Up to ``limit`` users, ordered by ``_id``.
    """
    # Without a sort, skip/limit pages are not guaranteed to be stable
    # between calls; ordering by _id makes pagination deterministic.
    cursor = db.users.find().sort("_id", 1).skip(skip).limit(limit)
    documents = await cursor.to_list(length=limit)
    return [User(**document) for document in documents]
# Update
async def update_user(db, user_id: str, user_data: dict) -> Optional[User]:
    """Apply ``user_data`` to a user document and return the updated user.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        user_id: String form of the document's ObjectId.
        user_data: Fields to ``$set``; the dict is not modified.

    Returns:
        The user as re-read from the database, or ``None`` when no
        document has that id.
    """
    from datetime import timezone

    # Copy so the caller's dict is not stamped with updated_at.
    changes = {**user_data, "updated_at": datetime.now(timezone.utc)}
    result = await db.users.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": changes},
    )
    # matched_count answers "does the user exist?"; modified_count is 0
    # whenever the stored values already equal the new ones.
    if result.matched_count:
        return await get_user(db, user_id)
    return None
# Delete
async def delete_user(db, user_id: str) -> bool:
    """Delete the user with ``user_id``; return whether a document was removed.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        user_id: String form of the document's ObjectId.
    """
    result = await db.users.delete_one({"_id": ObjectId(user_id)})
    return result.deleted_count > 0
Advanced Queries
# Aggregation pipeline
async def get_user_stats(db):
    """Count users grouped by their ``is_active`` value.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).

    Returns:
        The aggregation output: one ``{"_id": <is_active>, "count": <n>}``
        document per distinct ``is_active`` value.
    """
    pipeline = [
        {"$group": {"_id": "$is_active", "count": {"$sum": 1}}},
    ]
    # to_list(None) drains the whole cursor into memory.
    return await db.users.aggregate(pipeline).to_list(None)
# Text search
async def search_users(db, query: str):
    """Return users matching a ``$text`` search for ``query``.

    A ``$text`` query needs a text index on the collection, such as the
    one built by ``create_index([("name", "text"), ("email", "text")])``.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        query: Search string passed to ``$search``.
    """
    text_filter = {"$text": {"$search": query}}
    documents = await db.users.find(text_filter).to_list(None)
    return [User(**document) for document in documents]
# Complex filters
async def get_users_by_criteria(
    db,
    is_active: Optional[bool] = None,
    min_date: Optional[datetime] = None,
):
    """Return users matching whichever optional filters are supplied.

    Args:
        db: Motor database handle exposing a ``users`` collection
            (previously mis-annotated as ``str``).
        is_active: When not ``None``, match only this ``is_active`` value.
        min_date: When not ``None``, match users whose ``created_at`` is
            at or after it.

    Returns:
        A list of ``User`` objects; with no filters, every user.
    """
    query = {}
    if is_active is not None:
        query["is_active"] = is_active
    # Explicit None check, consistent with is_active above.
    if min_date is not None:
        query["created_at"] = {"$gte": min_date}

    documents = await db.users.find(query).to_list(None)
    return [User(**document) for document in documents]
Indexes
async def create_indexes(db):
    """Create the indexes used by the ``users`` and ``sessions`` collections.

    Args:
        db: Motor database handle (previously mis-annotated as ``str``).
    """
    # Single field index; also enforces one account per email.
    await db.users.create_index("email", unique=True)
    # Compound index
    await db.users.create_index([("email", 1), ("is_active", 1)])
    # Text index backing $text searches over name and email.
    await db.users.create_index([("name", "text"), ("email", "text")])
    # TTL index: session documents expire 3600 seconds after created_at.
    await db.sessions.create_index("created_at", expireAfterSeconds=3600)
Migrations with Alembic (PostgreSQL)
Setup
alembic init alembic
Configuration
# alembic/env.py
from sqlalchemy import engine_from_config
from alembic import context
# Import Base and models
from app.db.database import Base
from app.models import user, post # Import all models so their tables are registered on Base.metadata
# Metadata exposed to Alembic; presumably what `--autogenerate` diffs
# against the live schema (confirm against the rest of env.py).
target_metadata = Base.metadata
Create Migration
alembic revision --autogenerate -m "Add users table"
Run Migration
alembic upgrade head
Rollback
alembic downgrade -1
Redis Integration
Caching with Redis
import redis
import json
from typing import Any, Optional
class RedisCache:
    """Small JSON cache on top of a Redis client."""

    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        """Open a client that returns ``str`` values instead of bytes."""
        self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded JSON value for ``key``, or ``None`` if absent or empty."""
        raw = self.client.get(key)
        if not raw:
            return None
        return json.loads(raw)

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Store ``value`` as JSON under ``key`` for ``ttl`` seconds."""
        encoded = json.dumps(value)
        self.client.setex(key, ttl, encoded)

    def delete(self, key: str):
        """Remove ``key`` from the cache."""
        self.client.delete(key)

    def exists(self, key: str) -> bool:
        """Report whether ``key`` is currently stored."""
        found = self.client.exists(key)
        return found > 0
# Usage
cache = RedisCache()


def _user_to_cache_dict(user) -> dict:
    """Return the JSON-safe fields of an ORM ``User`` (no password hash)."""
    return {
        "id": user.id,
        "email": user.email,
        "name": user.name,
        "is_active": user.is_active,
    }


def get_user_cached(user_id: int, db: Optional[Session] = None):
    """Return a user's cached fields, loading from the database on a miss.

    Args:
        user_id: Primary key of the user.
        db: Session to query with. When omitted, a session is created
            from ``SessionLocal`` and closed before returning.

    Returns:
        A dict with ``id``, ``email``, ``name`` and ``is_active``, in the
        same shape on a cache hit and a miss, or ``None`` when the user
        does not exist.
    """
    cache_key = f"user:{user_id}"

    # Try cache
    cached_user = cache.get(cache_key)
    if cached_user is not None:
        return cached_user

    # Query database
    owns_session = db is None
    if owns_session:
        db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        # Build the payload while the session is still open.
        payload = _user_to_cache_dict(user) if user is not None else None
    finally:
        if owns_session:
            db.close()

    # Cache result
    if payload is not None:
        cache.set(cache_key, payload)
    return payload
Best Practices
1. Connection Management
- Use connection pooling
- Set appropriate pool sizes
- Use dependency injection
- Always close connections
2. Query Optimization
- Use indexes on filter columns
- Avoid N+1 queries (use eager loading)
- Use pagination
- Select only needed columns
3. Transactions
- Keep transactions short
- Use explicit transactions for multi-step operations
- Handle errors and rollback
- Use row-level locking when needed
4. Data Validation
- Validate at application layer (Pydantic)
- Use database constraints
- Sanitize inputs
- Handle edge cases
5. Security
- Use parameterized queries
- Hash sensitive data (passwords)
- Use environment variables for credentials
- Implement proper access controls
6. Performance
- Use prepared statements
- Batch operations when possible
- Cache frequently accessed data
- Monitor query performance
- Use read replicas for scaling reads
Common Patterns
Repository Pattern
class UserRepository:
    """Data-access wrapper for ``User`` rows bound to one session."""

    def __init__(self, db: Session):
        """Remember the session every query in this repository will use."""
        self.db = db

    def get(self, id: int) -> Optional[User]:
        """Return the user with primary key ``id``, or ``None``."""
        matches_id = User.id == id
        return self.db.query(User).filter(matches_id).first()

    def create(self, user_data: dict) -> User:
        """Insert a user built from ``user_data``, commit, and return it refreshed."""
        session = self.db
        new_user = User(**user_data)
        session.add(new_user)
        session.commit()
        session.refresh(new_user)
        return new_user
Unit of Work Pattern
class UnitOfWork:
    """Groups repositories that share one session so they commit or roll back together."""

    def __init__(self, db: Session):
        """Bind the session and build the repositories that use it."""
        self.db = db
        self.users = UserRepository(db)

    def commit(self):
        """Commit the shared session."""
        self.db.commit()

    def rollback(self):
        """Roll back the shared session."""
        self.db.rollback()
Integration
- FastAPI: SQLAlchemy/Motor integration
- Alembic: Database migrations
- Redis: Caching and sessions
- Docker: Local development databases
Source
git clone https://github.com/muhammederem/chief/blob/main/.claude/skills/backend/databases/SKILL.md
View on GitHub
Overview
Database integration patterns for PostgreSQL (relational) and MongoDB (NoSQL) with Python, focusing on ORM usage, migrations, and best practices. This guide demonstrates practical SQLAlchemy setup, model definitions, query patterns, and CRUD workflows to form robust data layers.
How This Skill Works
The pattern shows creating a SQLAlchemy engine with pooling, a session factory, and a declarative Base. It defines ORM models (User and Post) with relationships, and provides reusable query and CRUD helpers that orchestrate sessions, commits, and refreshes.
When to Use It
- Building a Python app with PostgreSQL as the relational backend using SQLAlchemy ORM
- Need efficient DB connections with pooling (pool_pre_ping, pool_size, max_overflow)
- Implementing common CRUD operations for User and Post entities
- Modeling relationships (User has many Posts) and performing joined queries
- Planning migrations and evolving schemas with ORM-based workflows
Quick Start
- Step 1: Configure the database URL and create an engine with pooling settings
- Step 2: Define ORM models (User, Post) and a Base, then create a session factory
- Step 3: Use provided query and CRUD helpers to interact with the database
Best Practices
- Configure a robust SQLAlchemy engine with pool_pre_ping, pool_size, and max_overflow to keep connections healthy
- Use a declarative Base and clearly named models with appropriate indexes and constraints (e.g., unique email)
- Define relationships with back_populates and cascade options (e.g., delete-orphan) for integrity
- Store creation timestamps with server_default=func.now() and refresh updated_at with onupdate=func.now() to support auditing
- Encapsulate DB access in reusable CRUD/query helpers and manage session lifecycle (open, commit, refresh, close)
Example Use Cases
- Define User and Post models with a one-to-many relationship and foreign keys
- Create a new user and a related post in a single operation
- Query active users and fetch their posts with a join
- Count total users or posts to monitor growth
- Update user attributes and persist changes with commit and refresh
Frequently Asked Questions
Add this skill to your agents