Get the FREE Ultimate OpenClaw Setup Guide →

Data Pipeline

Scanned
npx machina-cli add skill samarth777/modal-skills/data-pipeline --openclaw
Files (1)
SKILL.md
7.2 KB

Data Processing Pipeline Example

A complete example of a scalable data processing pipeline on Modal.

import modal
from datetime import datetime

# --- Image Definition ---
# Container image shared by every stage. Data libraries are pinned for
# reproducible builds; boto3 is left unpinned to track AWS API support.
image = (
    modal.Image.debian_slim(python_version="3.12")
    .pip_install(
        "pandas==2.2.0",
        "pyarrow==15.0.0",
        "polars==0.20.0",
        "duckdb==0.10.0",
        "boto3",
    )
)

app = modal.App("data-pipeline", image=image)

# --- Volumes ---
# Two persistent volumes keep raw inputs separate from pipeline outputs;
# each stage mounts only what it needs and commits after writing.
raw_data = modal.Volume.from_name("raw-data", create_if_missing=True)
processed_data = modal.Volume.from_name("processed-data", create_if_missing=True)

# Mount points for the volumes inside each function's container.
RAW_PATH = "/raw"
PROCESSED_PATH = "/processed"

# --- Extract Stage ---
@app.function(
    volumes={RAW_PATH: raw_data},
    secrets=[modal.Secret.from_name("aws-credentials")],
    timeout=3600,
    memory=8192,
)
def extract_from_s3(
    bucket: str,
    prefix: str,
    date: str,
) -> list[str]:
    """Download every object under ``{prefix}/{date}`` from S3 into the raw volume.

    Args:
        bucket: Source S3 bucket name.
        prefix: Key prefix within the bucket (e.g. "events").
        date: Date partition appended to the prefix (e.g. "2024-01-15").

    Returns:
        Local paths (under RAW_PATH) of all downloaded files.
    """
    import boto3
    import os

    s3 = boto3.client("s3")

    # list_objects_v2 caps each response at 1000 keys; the original
    # single call silently truncated larger partitions. Paginate instead.
    paginator = s3.get_paginator("list_objects_v2")

    downloaded = []
    for page in paginator.paginate(Bucket=bucket, Prefix=f"{prefix}/{date}"):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            local_path = f"{RAW_PATH}/{key}"
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            s3.download_file(bucket, key, local_path)
            downloaded.append(local_path)

    # Persist writes so later stages see the files.
    raw_data.commit()
    return downloaded

# --- Transform Stage ---
@app.function(
    volumes={
        RAW_PATH: raw_data,
        PROCESSED_PATH: processed_data,
    },
    memory=16384,
    cpu=4,
)
def transform_file(input_path: str) -> str:
    """Clean and enrich one raw parquet file, writing it to the processed volume.

    Args:
        input_path: Path to a raw parquet file under RAW_PATH.

    Returns:
        Path of the transformed file under PROCESSED_PATH.
    """
    import polars as pl
    import os

    # Mirror the raw file's relative location under the processed mount.
    output_path = input_path.replace(RAW_PATH, PROCESSED_PATH)

    frame = pl.read_parquet(input_path)

    # Keep active rows, normalize column types, derive a line total,
    # then discard any record left with missing values.
    cleaned = (
        frame.filter(pl.col("status") == "active")
        .with_columns([
            pl.col("timestamp").cast(pl.Datetime),
            pl.col("amount").cast(pl.Float64),
            (pl.col("amount") * pl.col("quantity")).alias("total"),
        ])
        .drop_nulls()
    )

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    cleaned.write_parquet(output_path)

    # Make the new file durable and visible to downstream stages.
    processed_data.commit()
    return output_path

# --- Aggregate Stage ---
@app.function(
    volumes={PROCESSED_PATH: processed_data},
    memory=32768,
    cpu=8,
)
def aggregate_data(file_paths: list[str]) -> dict:
    """Aggregate the transformed files into a daily summary using DuckDB.

    Args:
        file_paths: Parquet files produced by transform_file for this run.

    Returns:
        Dict with total row count, total amount, and the [min, max] date range
        (empty/zero values when no files were supplied).
    """
    import duckdb
    import os

    # Guard: DuckDB errors on an empty file list, and min()/max() below
    # would fail on an empty frame.
    if not file_paths:
        return {"rows_processed": 0, "total_amount": 0.0, "date_range": []}

    con = duckdb.connect()

    # Aggregate exactly the files from this run. The previous version
    # globbed PROCESSED_PATH/**/*.parquet, which also matched the summary
    # file written below — re-reading it on later runs would break the
    # query (different schema) or skew the results. Quote-escape the
    # paths to build a safe list literal.
    file_list = ", ".join("'" + p.replace("'", "''") + "'" for p in file_paths)

    result = con.execute(f"""
        SELECT
            date_trunc('day', timestamp) as date,
            COUNT(*) as count,
            SUM(total) as total_amount,
            AVG(total) as avg_amount
        FROM read_parquet([{file_list}])
        GROUP BY 1
        ORDER BY 1
    """).fetchdf()

    # Ensure the summary directory exists; the original crashed on the
    # first run because /processed/summary was never created.
    summary_path = f"{PROCESSED_PATH}/summary/daily_summary.parquet"
    os.makedirs(os.path.dirname(summary_path), exist_ok=True)
    result.to_parquet(summary_path)
    processed_data.commit()

    return {
        "rows_processed": int(result["count"].sum()),
        "total_amount": float(result["total_amount"].sum()),
        "date_range": [
            result["date"].min().isoformat(),
            result["date"].max().isoformat(),
        ],
    }

# --- Load Stage ---
@app.function(
    volumes={PROCESSED_PATH: processed_data},
    secrets=[modal.Secret.from_name("database-credentials")],
)
def load_to_database(summary_path: str) -> int:
    """Load the daily summary parquet into the warehouse (load step stubbed).

    Args:
        summary_path: Path to the summary parquet under PROCESSED_PATH.

    Returns:
        Number of rows that were (or would be) loaded.
    """
    import pandas as pd
    import os
    # from sqlalchemy import create_engine

    df = pd.read_parquet(summary_path)

    # A real implementation would write via SQLAlchemy using the
    # DATABASE_URL injected by the database-credentials secret:
    # engine = create_engine(os.environ["DATABASE_URL"])
    # df.to_sql("daily_summary", engine, if_exists="append", index=False)

    row_count = len(df)
    print(f"Loaded {row_count} rows to database")
    return row_count

# --- Pipeline Orchestrator ---
@app.function(timeout=7200)
def run_pipeline(
    bucket: str,
    prefix: str,
    date: str,
) -> dict:
    """Orchestrate the full ETL: extract -> transform -> aggregate -> load.

    Args:
        bucket: Source S3 bucket.
        prefix: Key prefix within the bucket.
        date: Date partition to process (YYYY-MM-DD).

    Returns:
        Run report: status, file counts, rows loaded, summary, and duration.
    """
    from datetime import datetime

    started = datetime.now()

    # Stage 1: pull the day's raw files from S3 into the raw volume.
    print("Starting extraction...")
    raw_files = extract_from_s3.remote(bucket, prefix, date)
    print(f"Extracted {len(raw_files)} files")

    # Stage 2: fan out — one container per file via .map().
    print("Starting transformation...")
    processed_files = list(transform_file.map(raw_files))
    print(f"Transformed {len(processed_files)} files")

    # Stage 3: fan in — single-container aggregation over all outputs.
    print("Starting aggregation...")
    summary = aggregate_data.remote(processed_files)
    print(f"Aggregation complete: {summary}")

    # Stage 4: push the summary file to the warehouse.
    print("Loading to database...")
    summary_path = f"{PROCESSED_PATH}/summary/daily_summary.parquet"
    rows_loaded = load_to_database.remote(summary_path)

    elapsed = (datetime.now() - started).total_seconds()

    return {
        "status": "success",
        "files_processed": len(raw_files),
        "rows_loaded": rows_loaded,
        "summary": summary,
        "duration_seconds": elapsed,
    }

# --- Scheduled Job ---
@app.function(schedule=modal.Cron("0 6 * * *"))  # 6 AM daily
def daily_pipeline():
    """Scheduled entrypoint: process the previous day's data partition."""
    from datetime import datetime, timedelta

    # NOTE(review): naive datetime.now() — assumes the container clock is
    # UTC, matching the cron schedule above; confirm if local-time
    # semantics are ever expected here.
    target_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    result = run_pipeline.remote(
        bucket="my-data-bucket",
        prefix="events",
        date=target_date,
    )

    print(f"Daily pipeline complete: {result}")
    return result

# --- Web Trigger ---
@app.function()
@modal.fastapi_endpoint(method="POST", requires_proxy_auth=True)
def trigger_pipeline(body: dict) -> dict:
    """Start a pipeline run asynchronously and return a handle for polling.

    Expects a JSON body with "bucket", "prefix", and "date" keys.
    """
    # spawn() returns immediately; callers poll get_pipeline_status
    # with the returned call_id.
    handle = run_pipeline.spawn(
        bucket=body["bucket"],
        prefix=body["prefix"],
        date=body["date"],
    )
    return {"call_id": handle.object_id, "status": "started"}

@app.function()
@modal.fastapi_endpoint(method="GET", requires_proxy_auth=True)
def get_pipeline_status(call_id: str) -> dict:
    """Report the state of a spawned pipeline run by its call id."""
    call = modal.FunctionCall.from_id(call_id)

    # get(timeout=0) returns instantly: TimeoutError means the run is
    # still in progress; any other exception means the run itself failed.
    try:
        result = call.get(timeout=0)
    except TimeoutError:
        return {"status": "running"}
    except Exception as e:
        return {"status": "failed", "error": str(e)}
    return {"status": "completed", "result": result}

# --- CLI ---
@app.local_entrypoint()
def main(
    bucket: str = "my-data-bucket",
    prefix: str = "events",
    date: str = "2024-01-01",
):
    """CLI entrypoint: `modal run data_pipeline.py --bucket ... --date ...`."""
    outcome = run_pipeline.remote(bucket, prefix, date)
    print(f"Pipeline complete: {outcome}")

Usage

# Run pipeline manually
modal run data_pipeline.py --bucket my-bucket --prefix events --date 2024-01-15

# Deploy with scheduled job
modal deploy data_pipeline.py

# Trigger via API
curl -X POST https://your-workspace--data-pipeline-trigger-pipeline.modal.run \
  -H "Modal-Key: $TOKEN_ID" \
  -H "Modal-Secret: $TOKEN_SECRET" \
  -H "Content-Type: application/json" \
  -d '{"bucket": "my-bucket", "prefix": "events", "date": "2024-01-15"}'

Source

git clone https://github.com/samarth777/modal-skills.git   # skill file: skills/data-pipeline/SKILL.md

View on GitHub

Overview

An end-to-end, scalable data processing pipeline implemented on Modal. It shows how to ingest raw data from S3 into a mounted raw data volume, transform with Polars, aggregate with DuckDB, and load a daily summary to a database. The setup highlights staged functions, secret management, and persistent volumes.

How This Skill Works

Technically, it builds a Modal App with an image containing pandas, pyarrow, polars, duckdb, and boto3. Each stage is a Modal function that binds to raw and processed volumes and uses secrets for AWS and database access. The pipeline flows from extract_from_s3 to transform_file to aggregate_data and finally load_to_database, committing volumes along the way.

When to Use It

  • Ingest daily data from S3 into a structured pipeline and generate a daily summary.
  • Need fast, in-memory transforms on parquet data using Polars with clean type casting.
  • Aggregate processed data across multiple parquet files using DuckDB.
  • Load summary metrics into a database while keeping credentials secure with Modal Secrets.
  • Build a reproducible, end-to-end data pipeline on a containerized Modal App with persistent volumes.

Quick Start

  1. Create a Modal image with pandas, pyarrow, polars, duckdb, and boto3 installed at exact versions.
  2. Define the raw and processed volumes (mounted at RAW_PATH and PROCESSED_PATH) and implement the extract_from_s3, transform_file, aggregate_data, and load_to_database functions.
  3. Run the Modal app (app = modal.App(...)) and trigger the stages to process data from S3 to the database.

Best Practices

  • Use separate volumes for raw and processed data and commit after each stage.
  • Pin exact library versions (pandas, pyarrow, polars, duckdb) to ensure reproducibility.
  • Structure the pipeline into distinct extract, transform, aggregate, and load stages for clarity and retries.
  • Store credentials in Modal Secrets rather than hard-coding them in code.
  • Test with small datasets locally and scale the app resources (memory/CPU) for bigger runs.

Example Use Cases

  • Ingest daily logs from S3 into RAW_PATH, then transform and filter by active status.
  • Cast and derive new columns (timestamp, amount, total) using Polars during transform.
  • Use DuckDB to read all processed parquet files and compute daily aggregates (count, sum, avg).
  • Write the daily summary to PROCESSED_PATH and generate a summary parquet for loading.
  • Load the summary into a database using database-credentials secret.

Frequently Asked Questions

Add this skill to your agents
Sponsor this space

Reach thousands of developers