ai-pipeline-orchestration
npx machina-cli add skill BagelHole/DevOps-Security-Agent-Skills/ai-pipeline-orchestration --openclaw
AI Pipeline Orchestration
Build reliable, observable AI workflows — from document ingestion to batch inference to model training pipelines.
When to Use This Skill
Use this skill when:
- Scheduling recurring RAG document ingestion and re-indexing
- Orchestrating multi-step batch LLM processing workflows
- Running nightly model evaluation and fine-tuning jobs
- Building ETL pipelines that feed into AI models
- Managing dependencies between data preparation and model serving
Tool Selection
| Tool | Best For | Complexity | GPU Jobs |
|---|---|---|---|
| Prefect | Modern Python-first; easy to adopt | Low | Good |
| Airflow | Complex DAGs; large teams; existing usage | High | Good |
| Dagster | Asset-centric; strong data lineage | Medium | Excellent |
| Temporal | Long-running workflows; reliability-first | Medium | Good |
Prefect — Quick Start
pip install prefect prefect-kubernetes
# Start Prefect server (or use Prefect Cloud)
prefect server start
# In another terminal
prefect worker start --pool default-agent-pool
Prefect: RAG Ingestion Pipeline
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import hashlib

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def fetch_documents(source_url: str) -> list[dict]:
    """Fetch documents from source; cached to avoid re-fetching."""
    logger = get_run_logger()
    logger.info(f"Fetching from {source_url}")
    # ... fetch logic: build one dict per document with "url", "content",
    # and "hash" keys, e.g. hash = hashlib.sha256(content.encode()).hexdigest()
    documents: list[dict] = []
    return documents

@task(retries=3, retry_delay_seconds=30)
def chunk_and_embed(documents: list[dict]) -> list[dict]:
    """Chunk documents and generate embeddings with retry on failure."""
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("BAAI/bge-large-en-v1.5")
    chunks = []
    for doc in documents:
        # chunk_text is a user-supplied helper (not shown) returning list[str]
        doc_chunks = chunk_text(doc["content"])
        embeddings = model.encode(doc_chunks, batch_size=64)
        for chunk, emb in zip(doc_chunks, embeddings):
            chunks.append({"text": chunk, "embedding": emb.tolist(),
                           "source": doc["url"], "doc_hash": doc["hash"]})
    return chunks

@task(retries=2)
def upsert_to_vector_store(chunks: list[dict]) -> int:
    """Upsert embeddings to Qdrant, skip unchanged documents."""
    from qdrant_client import QdrantClient
    client = QdrantClient("http://qdrant:6333")
    client.upsert(collection_name="knowledge-base", points=[...])
    return len(chunks)

@flow(name="rag-ingestion", log_prints=True)
def rag_ingestion_pipeline(sources: list[str]):
    """Full RAG ingestion flow; runs daily."""
    logger = get_run_logger()
    total = 0
    for source in sources:
        docs = fetch_documents(source)
        chunks = chunk_and_embed(docs)
        count = upsert_to_vector_store(chunks)
        total += count
        logger.info(f"Ingested {count} chunks from {source}")
    logger.info(f"Pipeline complete: {total} total chunks indexed")

if __name__ == "__main__":
    rag_ingestion_pipeline.serve(
        name="daily-rag-ingestion",
        cron="0 2 * * *",  # 2 AM daily
        parameters={"sources": ["https://docs.myapp.com", "https://api.myapp.com/kb"]},
    )
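The pipeline above calls chunk_text and reads doc["hash"] without defining either. One possible shape for both is sketched below. The character-window strategy, the default sizes, and the name content_hash are assumptions for illustration, not part of the original skill.

```python
import hashlib


def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character windows that overlap slightly."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


def content_hash(text: str) -> str:
    """Stable fingerprint of a document body, usable as doc["hash"]."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
```

Character windows are the simplest option; a token-aware or sentence-aware splitter may suit a given embedding model better.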
Prefect: Batch LLM Inference Pipeline
from prefect import flow, task
# The async context manager lives in prefect.concurrency.asyncio;
# the one in prefect.concurrency.sync only supports a plain "with".
from prefect.concurrency.asyncio import concurrency
import asyncio
import json
from openai import AsyncOpenAI

@task(retries=3, retry_delay_seconds=60)
async def process_batch(items: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
    """Process a batch of items through LLM with rate limit protection."""
    client = AsyncOpenAI()
    # Rate limit: take one slot per request from the "openai-api" concurrency limit
    async with concurrency("openai-api", occupy=len(items)):
        tasks = [
            client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": item["prompt"]}],
                max_tokens=256,
            )
            for item in items
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for item, response in zip(items, responses):
        if isinstance(response, Exception):
            # Record the failure per item instead of failing the whole batch
            results.append({**item, "error": str(response), "output": None})
        else:
            results.append({**item, "output": response.choices[0].message.content})
    return results

@flow(name="batch-llm-inference")
async def batch_inference_flow(input_file: str, output_file: str, batch_size: int = 50):
    # Input is JSON Lines: one object with a "prompt" key per line
    with open(input_file) as f:
        items = [json.loads(line) for line in f]
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
    all_results = []
    for batch in batches:
        results = await process_batch(batch)
        all_results.extend(results)
    with open(output_file, "w") as f:
        for result in all_results:
            f.write(json.dumps(result) + "\n")
    return len(all_results)
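The rate limiting above depends on a Prefect concurrency limit named "openai-api". For local experiments without a Prefect server, a similar bound can be approximated with a plain asyncio.Semaphore. This is a sketch under that assumption, not a replacement for Prefect's limit; run_bounded and fake_llm_call are hypothetical names, and fake_llm_call stands in for the real API call.

```python
import asyncio


async def run_bounded(coros, max_concurrent: int = 5) -> list:
    """Await all coroutines, with at most max_concurrent running at once.

    Exceptions are returned in place of results, matching
    asyncio.gather(..., return_exceptions=True).
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros), return_exceptions=True)


async def fake_llm_call(prompt: str) -> str:
    """Hypothetical stand-in for client.chat.completions.create."""
    await asyncio.sleep(0)
    if not prompt:
        raise ValueError("empty prompt")
    return prompt.upper()


def demo() -> list:
    prompts = ["hello", "", "world"]
    return asyncio.run(run_bounded([fake_llm_call(p) for p in prompts], max_concurrent=2))
```

A semaphore only bounds requests inside one process. A server-side limit is still needed when several flow runs share the same API quota.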
Airflow: Model Training DAG
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s

@dag(
    dag_id="llm_fine_tuning",
    schedule="@weekly",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ai", "training"],
)
def llm_fine_tuning_dag():

    @task
    def prepare_dataset() -> str:
        """Download and preprocess training data."""
        # ... data prep logic
        return "s3://my-bucket/training-data/2025-03-01/"

    train = KubernetesPodOperator(
        task_id="train_model",
        name="llm-training-job",
        namespace="ml",
        image="nvcr.io/nvidia/pytorch:24.05-py3",
        cmds=["accelerate", "launch", "-m", "axolotl.cli.train", "/config/config.yaml"],
        # Called "resources" in older provider releases
        container_resources=k8s.V1ResourceRequirements(
            limits={"nvidia.com/gpu": "4", "memory": "320Gi"},
            requests={"nvidia.com/gpu": "4"},
        ),
        node_selector={"nvidia.com/gpu.product": "A100-SXM4-80GB"},
        volumes=[...],
        volume_mounts=[...],
        get_logs=True,
        # Replaces the deprecated is_delete_operator_pod=True
        on_finish_action="delete_pod",
    )

    @task
    def evaluate_model(dataset_path: str) -> dict:
        """Run evals; fail pipeline if quality drops."""
        metrics = run_evals()  # user-supplied helper (not shown)
        if metrics["accuracy"] < 0.85:
            raise ValueError(f"Model quality too low: {metrics}")
        return metrics

    @task
    def deploy_model(metrics: dict):
        """Push merged model to HF Hub and update vLLM config."""
        update_serving_config(new_model="org/fine-tuned-v2")  # user-supplied helper

    # Order: prepare_dataset -> train -> evaluate_model -> deploy_model
    dataset = prepare_dataset()
    train.set_upstream(dataset)
    eval_result = evaluate_model(dataset)
    eval_result.set_upstream(train)
    deploy_model(eval_result)

llm_fine_tuning_dag()
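The evaluate_model task gates deployment on a single accuracy threshold. If several metrics need to pass, the check could be factored into a small pure function that is easy to unit test outside Airflow. The sketch below assumes metrics arrive as a flat dict of floats; check_quality_gate and enforce_quality_gate are hypothetical names.

```python
def check_quality_gate(metrics: dict[str, float], minimums: dict[str, float]) -> list[str]:
    """Return human-readable failures; an empty list means the gate passed."""
    failures = []
    for name, minimum in minimums.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing")
        elif value < minimum:
            failures.append(f"{name}: {value} < {minimum}")
    return failures


def enforce_quality_gate(metrics: dict[str, float], minimums: dict[str, float]) -> dict[str, float]:
    """Raise ValueError (failing the task) if any metric is below its minimum."""
    failures = check_quality_gate(metrics, minimums)
    if failures:
        raise ValueError("Model quality too low: " + "; ".join(failures))
    return metrics
```

Inside evaluate_model, the threshold check would then reduce to a single call such as enforce_quality_gate(metrics, {"accuracy": 0.85}).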
Dagster: Asset-Based AI Pipeline
from dagster import asset, AssetExecutionContext, define_asset_job, ScheduleDefinition

# fetch_all_documents, process_and_embed, upsert_to_qdrant and
# evaluate_rag_system are user-supplied helpers (not shown).

@asset(description="Raw documents fetched from knowledge sources")
def raw_documents(context: AssetExecutionContext) -> list[dict]:
    context.log.info("Fetching documents...")
    return fetch_all_documents()

# The raw_documents parameter already declares the upstream dependency,
# so it is not repeated in deps (Dagster rejects an asset listed in both).
@asset(description="Chunked and embedded document vectors")
def document_embeddings(context: AssetExecutionContext, raw_documents) -> int:
    chunks = process_and_embed(raw_documents)
    context.log.info(f"Generated {len(chunks)} embeddings")
    upsert_to_qdrant(chunks)
    return len(chunks)

# deps orders this asset after document_embeddings without loading its value
@asset(
    deps=[document_embeddings],
    description="RAG system quality metrics",
)
def rag_quality_metrics(context: AssetExecutionContext) -> dict:
    metrics = evaluate_rag_system()
    context.add_output_metadata({"ragas_score": metrics["ragas_score"]})
    return metrics

# Schedule: refresh embeddings nightly
nightly_refresh = ScheduleDefinition(
    job=define_asset_job("rag_refresh_job", [raw_documents, document_embeddings]),
    cron_schedule="0 1 * * *",
)
Best Practices
- Use task-level retries for API calls; use flow-level retries for transient infra failures.
- Cache expensive steps (embedding generation, data fetching) to speed up reruns.
- Emit custom metrics from pipelines (chunk count, error rate, cost) to your observability stack.
- Use concurrency limits in Prefect or pool slots in Airflow to respect external rate limits.
- Separate ingestion, training, and deployment pipelines; don't couple them in one giant DAG.
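As one illustration of the custom-metrics practice, run-level numbers can be derived from the result dicts that process_batch produces before they are sent anywhere. The sketch below assumes each result carries an "error" key only on failure, as in the batch inference example; summarize_results is a hypothetical name, and how the numbers reach an observability stack is left open.

```python
def summarize_results(results: list[dict]) -> dict:
    """Compute simple run metrics from process_batch-style result dicts."""
    total = len(results)
    failed = sum(1 for r in results if r.get("error"))
    return {
        "items_total": total,
        "items_failed": failed,
        "error_rate": failed / total if total else 0.0,
    }
```

The returned dict can be logged from the flow or forwarded to whichever metrics client the team already uses.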
Related Skills
- rag-infrastructure - RAG system setup
- llm-fine-tuning - Training jobs
- agent-observability - Pipeline monitoring
- kubernetes-ops - Running pipeline pods on K8s
Source
git clone https://github.com/BagelHole/DevOps-Security-Agent-Skills/blob/main/devops/ai/ai-pipeline-orchestration/SKILL.md
Overview
ai-pipeline-orchestration enables end-to-end AI/ML pipelines—from data ingestion and preprocessing to model training, batch inference, and RAG indexing. It emphasizes reliability, observability, and retriability by using popular orchestrators such as Prefect, Airflow, and Dagster to manage dependencies and error handling in production AI systems.
How This Skill Works
Developers compose flows and tasks using a chosen orchestrator (Prefect, Airflow, or Dagster). Pipelines are built from modular steps—data ingestion, processing, indexing, and training—with built-in retries, caching, and rich run logs to provide observability and retriability in production environments.
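One caching-style optimization the ingestion example mentions but does not show is skipping unchanged documents. A minimal sketch follows, assuming each document dict has "url" and "hash" keys as in the Prefect example, and that hashes from the previous run are available as a plain dict; filter_changed is a hypothetical helper.

```python
def filter_changed(documents: list[dict], seen_hashes: dict[str, str]) -> list[dict]:
    """Keep only documents whose hash differs from the last indexed hash.

    seen_hashes maps a document URL to the hash stored at the previous run;
    URLs that were never indexed are treated as changed.
    """
    return [doc for doc in documents if seen_hashes.get(doc["url"]) != doc["hash"]]
```

Running this between the fetch and embed steps avoids recomputing embeddings for documents that have not changed.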
Quick Start
- Step 1: Install an orchestrator (e.g., Prefect) and start the server or connect to Prefect Cloud.
- Step 2: Implement a simple RAG ingestion flow with tasks for data fetch, chunk/embed, and vector-store upsert as shown in the examples.
- Step 3: Schedule the flow to run automatically (e.g., rag_ingestion_pipeline.serve with a daily cron such as '0 2 * * *').
Best Practices
- Select the orchestrator based on DAG complexity and team needs (Prefect for Python-first, Airflow for complex DAGs, Dagster for data lineage).
- Modularize pipelines into clear stages: ingestion, processing, indexing, training, and serving.
- Make tasks idempotent and configure retries with backoff so that transient failures are retried instead of failing the run.
- Enable observability with structured logs, metrics, and lineage to monitor pipeline health.
- Keep data preparation and model serving as separate, testable stages to simplify maintenance and retraining.
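Orchestrators usually provide retries natively, so the following is only a sketch of the logic behind "retries with backoff": each failed attempt waits twice as long as the previous one. retry_with_backoff is a hypothetical helper, and the injectable sleep parameter exists only to make the function testable without real delays.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func; on failure wait base_delay * 2**attempt, then try again.

    After the initial call plus `retries` retries have all failed,
    the last exception is re-raised.
    """
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable when retries >= 0")
```

Retrying is only safe when the wrapped call is idempotent, which is why the two practices are listed together.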
Example Use Cases
- RAG ingestion pipeline: fetch documents from sources, chunk and embed content, then upsert embeddings to a vector store (e.g., Qdrant) for retrieval-augmented generation.
- Daily RAG ingestion with Prefect: a scheduled rag_ingestion_pipeline that processes multiple sources with caching and retries.
- Batch LLM inference with Prefect: a flow that splits items into batches and sends each batch through an LLM with retries and a concurrency limit.
- Nightly model evaluation and fine-tuning jobs scheduled within the orchestrator to track performance and improve models.
- ETL pipelines feeding AI models with dependencies managed by Prefect, Airflow, or Dagster to ensure clean data provisioning for training and serving.
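For the vector-store use cases, upserts stay idempotent only if the same chunk always maps to the same point ID. One way to get that, sketched here as an assumption rather than the skill's own method, is to derive a UUID from the source URL and chunk position; point_id is a hypothetical helper.

```python
import uuid


def point_id(source: str, chunk_index: int) -> str:
    """Derive a stable UUID so re-running the pipeline overwrites existing points."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source}#{chunk_index}"))
```

With IDs keyed this way, re-ingesting an edited document replaces its chunks in place. If a document shrinks, leftover chunks with higher indexes still need to be deleted separately.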