
ai-pipeline-orchestration

npx machina-cli add skill BagelHole/DevOps-Security-Agent-Skills/ai-pipeline-orchestration --openclaw

AI Pipeline Orchestration

Build reliable, observable AI workflows — from document ingestion to batch inference to model training pipelines.

When to Use This Skill

Use this skill when:

  • Scheduling recurring RAG document ingestion and re-indexing
  • Orchestrating multi-step batch LLM processing workflows
  • Running nightly model evaluation and fine-tuning jobs
  • Building ETL pipelines that feed into AI models
  • Managing dependencies between data preparation and model serving

Tool Selection

| Tool | Best For | Complexity | GPU Jobs |
| --- | --- | --- | --- |
| Prefect | Modern Python-first; easy to adopt | Low | Good |
| Airflow | Complex DAGs; large teams; existing usage | High | Good |
| Dagster | Asset-centric; strong data lineage | Medium | Excellent |
| Temporal | Long-running workflows; reliability-first | Medium | Good |

Prefect — Quick Start

pip install prefect prefect-kubernetes

# Start Prefect server (or use Prefect Cloud)
prefect server start

# In another terminal
prefect worker start --pool default-agent-pool

Prefect: RAG Ingestion Pipeline

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def fetch_documents(source_url: str) -> list[dict]:
    """Fetch documents from source; cached to avoid re-fetching."""
    logger = get_run_logger()
    logger.info(f"Fetching from {source_url}")
    documents: list[dict] = []  # ... fetch logic goes here
    return documents

@task(retries=3, retry_delay_seconds=30)
def chunk_and_embed(documents: list[dict]) -> list[dict]:
    """Chunk documents and generate embeddings with retry on failure."""
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("BAAI/bge-large-en-v1.5")
    chunks = []
    for doc in documents:
        doc_chunks = chunk_text(doc["content"])
        embeddings = model.encode(doc_chunks, batch_size=64)
        for chunk, emb in zip(doc_chunks, embeddings):
            chunks.append({"text": chunk, "embedding": emb.tolist(),
                           "source": doc["url"], "doc_hash": doc["hash"]})
    return chunks

@task(retries=2)
def upsert_to_vector_store(chunks: list[dict]) -> int:
    """Upsert embeddings to Qdrant, skip unchanged documents."""
    from qdrant_client import QdrantClient
    client = QdrantClient("http://qdrant:6333")
    client.upsert(collection_name="knowledge-base", points=[...])
    return len(chunks)

@flow(name="rag-ingestion", log_prints=True)
def rag_ingestion_pipeline(sources: list[str]):
    """Full RAG ingestion flow — runs daily."""
    logger = get_run_logger()
    total = 0
    for source in sources:
        docs = fetch_documents(source)
        chunks = chunk_and_embed(docs)
        count = upsert_to_vector_store(chunks)
        total += count
        logger.info(f"Ingested {count} chunks from {source}")
    logger.info(f"Pipeline complete: {total} total chunks indexed")

if __name__ == "__main__":
    rag_ingestion_pipeline.serve(
        name="daily-rag-ingestion",
        cron="0 2 * * *",          # 2 AM daily
        parameters={"sources": ["https://docs.myapp.com", "https://api.myapp.com/kb"]},
    )
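The flow above calls a `chunk_text` helper that isn't shown in the skill. A minimal sketch, assuming simple fixed-size character chunking with overlap (the function name matches the call site, but the parameters and strategy are illustrative, not part of the skill):

```python
def chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> list[str]:
    """Split text into fixed-size character chunks with overlap.

    Overlapping windows keep sentences that straddle a chunk
    boundary retrievable from at least one chunk.
    """
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be larger than overlap")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, max(len(text), 1), step):
        chunk = text[start:start + chunk_size]
        if chunk:
            chunks.append(chunk)
    return chunks
```

In production you would typically chunk on token counts or semantic boundaries (headings, paragraphs) rather than raw characters, but the flow structure stays the same.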

Prefect: Batch LLM Inference Pipeline

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency  # async variant for `async with`
import asyncio
from openai import AsyncOpenAI

@task(retries=3, retry_delay_seconds=60)
async def process_batch(items: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
    """Process a batch of items through LLM with rate limit protection."""
    client = AsyncOpenAI()
    async with concurrency("openai-api", occupy=len(items)):  # rate limit
        tasks = [
            client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": item["prompt"]}],
                max_tokens=256,
            )
            for item in items
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    results = []
    for item, response in zip(items, responses):
        if isinstance(response, Exception):
            results.append({**item, "error": str(response), "output": None})
        else:
            results.append({**item, "output": response.choices[0].message.content})
    return results

@flow(name="batch-llm-inference")
async def batch_inference_flow(input_file: str, output_file: str, batch_size: int = 50):
    import json
    with open(input_file) as f:
        items = [json.loads(line) for line in f]
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]

    all_results = []
    for batch in batches:
        results = await process_batch(batch)
        all_results.extend(results)

    with open(output_file, "w") as f:
        for result in all_results:
            f.write(json.dumps(result) + "\n")
    return len(all_results)
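The error-partitioning pattern inside `process_batch` (gather with `return_exceptions=True`, then split successes from failures) can be exercised in isolation with plain asyncio. In this sketch `call_llm` is a stand-in for the real API client, not part of the skill:

```python
import asyncio

async def call_llm(prompt: str) -> str:
    # Stand-in for a real API call; fails on a marker prompt
    # to show how exceptions flow through gather().
    if prompt == "boom":
        raise RuntimeError("rate limited")
    return f"echo: {prompt}"

async def process_batch(items: list[dict]) -> list[dict]:
    # return_exceptions=True keeps one failed request from
    # cancelling the rest of the batch.
    responses = await asyncio.gather(
        *(call_llm(item["prompt"]) for item in items),
        return_exceptions=True,
    )
    results = []
    for item, response in zip(items, responses):
        if isinstance(response, Exception):
            results.append({**item, "error": str(response), "output": None})
        else:
            results.append({**item, "output": response})
    return results
```

Because failed items carry an `error` field instead of raising, a downstream task can retry just the failures rather than re-running the whole batch.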

Airflow: Model Training DAG

from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s

@dag(
    dag_id="llm_fine_tuning",
    schedule="@weekly",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ai", "training"],
)
def llm_fine_tuning_dag():

    @task
    def prepare_dataset() -> str:
        """Download and preprocess training data."""
        # ... data prep logic
        return "s3://my-bucket/training-data/2025-03-01/"

    train = KubernetesPodOperator(
        task_id="train_model",
        name="llm-training-job",
        namespace="ml",
        image="nvcr.io/nvidia/pytorch:24.05-py3",
        cmds=["accelerate", "launch", "-m", "axolotl.cli.train", "/config/config.yaml"],
        container_resources=k8s.V1ResourceRequirements(
            limits={"nvidia.com/gpu": "4", "memory": "320Gi"},
            requests={"nvidia.com/gpu": "4"},
        ),
        node_selector={"nvidia.com/gpu.product": "A100-SXM4-80GB"},
        volumes=[...],
        volume_mounts=[...],
        get_logs=True,
        is_delete_operator_pod=True,
    )

    @task
    def evaluate_model(dataset_path: str) -> dict:
        """Run evals; fail pipeline if quality drops."""
        metrics = run_evals()
        if metrics["accuracy"] < 0.85:
            raise ValueError(f"Model quality too low: {metrics}")
        return metrics

    @task
    def deploy_model(metrics: dict):
        """Push merged model to HF Hub and update vLLM config."""
        update_serving_config(new_model="org/fine-tuned-v2")

    dataset = prepare_dataset()
    train.set_upstream(dataset)
    eval_result = evaluate_model(dataset)
    eval_result.set_upstream(train)
    deploy_model(eval_result)

llm_fine_tuning_dag()

Dagster: Asset-Based AI Pipeline

from dagster import asset, AssetExecutionContext, define_asset_job, ScheduleDefinition

@asset(description="Raw documents fetched from knowledge sources")
def raw_documents(context: AssetExecutionContext) -> list[dict]:
    context.log.info("Fetching documents...")
    return fetch_all_documents()

@asset(description="Chunked and embedded document vectors")
def document_embeddings(context: AssetExecutionContext, raw_documents: list[dict]) -> int:
    chunks = process_and_embed(raw_documents)
    context.log.info(f"Generated {len(chunks)} embeddings")
    upsert_to_qdrant(chunks)
    return len(chunks)

@asset(
    deps=[document_embeddings],
    description="RAG system quality metrics",
)
def rag_quality_metrics(context: AssetExecutionContext) -> dict:
    metrics = evaluate_rag_system()
    context.add_output_metadata({"ragas_score": metrics["ragas_score"]})
    return metrics

# Schedule: refresh embeddings nightly
nightly_refresh = ScheduleDefinition(
    job=define_asset_job("rag_refresh_job", [raw_documents, document_embeddings]),
    cron_schedule="0 1 * * *",
)
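For the schedule above to actually run, the assets and schedule must be registered in the code location's Definitions object; a minimal registration fragment (module layout is an assumption, not part of the skill):

```python
from dagster import Definitions

# Register the assets and the nightly schedule so `dagster dev`
# (or a deployed code location) can discover and run them.
defs = Definitions(
    assets=[raw_documents, document_embeddings, rag_quality_metrics],
    schedules=[nightly_refresh],
)
```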

Best Practices

  • Use task-level retries for API calls; use flow-level retries for transient infra failures.
  • Cache expensive steps (embedding generation, data fetching) to speed up reruns.
  • Emit custom metrics from pipelines (chunk count, error rate, cost) to your observability stack.
  • Use concurrency limits in Prefect or pool slots in Airflow to respect external rate limits.
  • Separate ingestion, training, and deployment pipelines — don't couple them in one giant DAG.

Source

git clone https://github.com/BagelHole/DevOps-Security-Agent-Skills
# skill file: devops/ai/ai-pipeline-orchestration/SKILL.md

Overview

ai-pipeline-orchestration enables end-to-end AI/ML pipelines, from data ingestion and preprocessing to model training, batch inference, and RAG indexing. It emphasizes reliability, observability, and safe retries, using popular orchestrators such as Prefect, Airflow, and Dagster to manage dependencies and error handling in production AI systems.

How This Skill Works

Developers compose flows and tasks using a chosen orchestrator (Prefect, Airflow, or Dagster). Pipelines are built from modular steps — data ingestion, processing, indexing, and training — with built-in retries, caching, and rich run logs that provide observability and safe recovery in production environments.

When to Use It

  • Scheduling recurring RAG document ingestion and re-indexing
  • Orchestrating multi-step batch LLM processing workflows
  • Running nightly model evaluation and fine-tuning jobs
  • Building ETL pipelines that feed into AI models
  • Managing dependencies between data preparation and model serving

Quick Start

  1. Install an orchestrator (e.g., Prefect) and start the server or connect to Prefect Cloud.
  2. Implement a simple RAG ingestion flow with tasks for data fetch, chunk/embed, and vector-store upsert as shown in the examples.
  3. Schedule the flow to run automatically (e.g., rag_ingestion_pipeline.serve with a daily cron such as '0 2 * * *').

Best Practices

  • Select the orchestrator based on DAG complexity and team needs (Prefect for Python-first, Airflow for complex DAGs, Dagster for data lineage).
  • Modularize pipelines into clear stages: ingestion, processing, indexing, training, and serving.
  • Make tasks idempotent and configure retries with backoff; capture transient failures for retrial.
  • Enable observability with structured logs, metrics, and lineage to monitor pipeline health.
  • Keep data preparation and model serving as separate, testable stages to simplify maintenance and retraining.
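One way to make the upsert stage idempotent, as the bullets above suggest, is to derive point IDs deterministically from content so a re-run overwrites rather than duplicates. A minimal stdlib sketch, where the in-memory dict stands in for a real vector store:

```python
import hashlib

def chunk_id(source: str, text: str) -> str:
    """Deterministic ID: the same source + text always maps to the same key."""
    return hashlib.sha256(f"{source}\x00{text}".encode()).hexdigest()

def upsert_chunks(store: dict, chunks: list[dict]) -> int:
    """Idempotent upsert: re-running with identical chunks writes nothing."""
    written = 0
    for chunk in chunks:
        key = chunk_id(chunk["source"], chunk["text"])
        if store.get(key) != chunk:
            store[key] = chunk
            written += 1
    return written
```

Most vector stores accept caller-supplied IDs; Qdrant, for example, takes UUIDs, so the digest can be folded into one (e.g., uuid.UUID(digest[:32])) to get the same re-run-safe behavior against the real database.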

Example Use Cases

  • RAG ingestion pipeline: fetch documents from sources, chunk and embed content, then upsert embeddings to a vector store (e.g., Qdrant) for retrieval augmented generation.
  • RAG ingestion using Prefect: a daily rag_ingestion_pipeline that processes multiple sources with caching and retries.
  • Prefect: Batch LLM Inference Pipeline demonstrating task orchestration for processing batches of items through an LLM.
  • Nightly model evaluation and fine-tuning jobs scheduled within the orchestrator to track performance and improve models.
  • ETL pipelines feeding AI models with dependencies managed by Prefect, Airflow, or Dagster to ensure clean data provisioning for training and serving.
