ingesting-data
npx machina-cli add skill ancoleman/ai-design-components/ingesting-data --openclaw

Data Ingestion Patterns
This skill provides patterns for getting data INTO systems from external sources.
When to Use This Skill
- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis
Ingestion Pattern Decision Tree
What is your data source?
```
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet)     → See file-formats.md
├── REST/GraphQL APIs              → See api-feeds.md
├── Streaming (Kafka, Kinesis)     → See streaming-sources.md
├── Legacy Database                → See database-migration.md
└── Need full ETL framework        → See etl-tools.md
```
Quick Start by Language
Python (Recommended for ETL)
dlt (data load tool) - Modern Python ETL:
```python
import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        yield response.json()

    return issues

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data",
)
load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
```
Polars for file processing (faster than pandas):
```python
import polars as pl

# Read CSV with schema inference
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines
df = pl.read_ndjson("events.jsonl")

# Write to database
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append",
)
```
TypeScript/Node.js
S3 ingestion:
```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();
  if (!body) throw new Error(`Empty object: s3://${bucket}/${key}`);

  // Parse CSV
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database (db and eventsTable come from your ORM setup)
  await db.insert(eventsTable).values(records);
}
```
API feed polling:
```typescript
import { Hono } from "hono";

// Webhook receiver for real-time ingestion
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  const event = await c.req.json();

  // Validate webhook signature
  const signature = c.req.header("stripe-signature");
  // ... validation logic

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date(),
  });

  return c.json({ received: true });
});
```
Rust
High-performance file ingestion:
```rust
use std::io::Cursor;

use anyhow::Result; // or your own error type
use aws_sdk_s3::Client;
use polars::prelude::*;

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame> {
    // Download from S3
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes)).finish()?;
    Ok(df)
}
```
Go
Concurrent file processing:
```go
package main

import (
	"context"
	"encoding/csv"

	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
	resp, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	reader := csv.NewReader(resp.Body)
	records, err := reader.ReadAll()
	if err != nil {
		return err
	}

	// Batch insert to database
	return batchInsert(ctx, records)
}
```
Ingestion Patterns
1. Batch Ingestion (Files/Storage)
For periodic bulk loads:
```
Source → Extract  → Transform → Load   → Validate
  ↓         ↓          ↓          ↓         ↓
  S3     Download   Clean/Map   Insert  Count check
```
Key considerations:
- Use chunked reading for large files (>100MB)
- Implement idempotency with checksums
- Track file processing state
- Handle partial failures
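The checklist above can be sketched in plain Python: chunked reading plus checksum-based idempotency. This is a minimal standard-library illustration; the `processed` set stands in for whatever state store tracks already-loaded files in a real pipeline.

```python
import csv
import hashlib
import io

def file_checksum(data: bytes) -> str:
    """SHA-256 of the file contents, used as an idempotency key."""
    return hashlib.sha256(data).hexdigest()

def ingest_in_chunks(raw: bytes, processed: set[str], chunk_size: int = 10_000):
    """Yield row chunks from a CSV file, skipping files already processed.

    The file is marked processed only after the last chunk is consumed,
    so a partial failure leaves it eligible for reprocessing.
    """
    key = file_checksum(raw)
    if key in processed:
        return  # already loaded; safe to re-run
    reader = csv.DictReader(io.StringIO(raw.decode()))
    chunk: list[dict] = []
    for row in reader:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
    processed.add(key)
```

Re-running the same file is then a no-op, which is what makes scheduled bulk loads safe to retry.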
2. Streaming Ingestion (Real-time)
For continuous data flow:
```
Source → Buffer   → Process  → Load → Ack
  ↓        ↓           ↓        ↓      ↓
Kafka   In-memory  Transform    DB  Commit offset
```
Key considerations:
- At-least-once vs exactly-once semantics
- Backpressure handling
- Dead letter queues for failures
- Checkpoint management
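A minimal in-memory sketch of the at-least-once loop with a dead letter queue; the message list and `handler` are stand-ins for your Kafka consumer and sink, and the "commit" is just an offset variable:

```python
def process_stream(messages, handler, max_retries=2):
    """At-least-once processing with a dead letter queue.

    Each message is retried up to max_retries times; a message that still
    fails is parked in the DLQ so one poison message cannot stall the
    stream. The offset is committed only after handling (or dead-lettering).
    """
    dead_letters = []
    committed_offset = -1
    for offset, msg in enumerate(messages):
        for attempt in range(max_retries + 1):
            try:
                handler(msg)
                break
            except Exception:
                if attempt == max_retries:
                    dead_letters.append(msg)  # park the poison message
        committed_offset = offset  # advance only after the message is settled
    return committed_offset, dead_letters
```

Committing after handling gives at-least-once semantics: a crash before the commit replays the message, so the handler should be idempotent.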
3. API Polling (Feeds)
For external API data:
```
Schedule → Fetch   → Dedupe → Load  → Update cursor
   ↓         ↓         ↓        ↓          ↓
  Cron    API call   By ID   Insert  Last timestamp
```
Key considerations:
- Rate limiting and backoff
- Incremental loading (cursors, timestamps)
- API pagination handling
- Retry with exponential backoff
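The polling loop reduces to cursor-following with exponential backoff. In this sketch `fetch_page` is a hypothetical stand-in for your API client returning `(records, next_cursor)`, and dedupe-by-ID is omitted for brevity:

```python
import time

def poll_incremental(fetch_page, cursor=None, max_retries=3):
    """Follow pagination cursors, retrying transient failures with
    exponential backoff. Returns the fetched records and the cursor to
    persist for the next scheduled run; next_cursor=None signals the
    last page."""
    records = []
    while True:
        for attempt in range(max_retries):
            try:
                page, next_cursor = fetch_page(cursor)
                break
            except ConnectionError:
                if attempt == max_retries - 1:
                    raise  # exhausted retries; surface the error
                time.sleep(0.01 * 2 ** attempt)  # exponential backoff
        records.extend(page)
        if next_cursor is None:
            return records, cursor  # cursor marks the last complete page
        cursor = next_cursor
```

In a real feed the persisted cursor is usually the last record's timestamp or ID, so the next run fetches only newer data.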
4. Change Data Capture (CDC)
For database replication:
```
Source DB → Capture changes → Transform  → Target DB
    ↓              ↓              ↓            ↓
Postgres     Debezium/WAL    Map schema   Insert/Update
```
Key considerations:
- Initial snapshot + streaming changes
- Schema evolution handling
- Ordering guarantees
- Conflict resolution
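To make the ordering guarantee concrete, here is a toy apply loop over Debezium-style op codes (`c` create, `u` update, `d` delete); a dict stands in for the target table. Applying events out of order would leave the target wrong, which is why CDC pipelines preserve per-key ordering:

```python
def apply_changes(target: dict, events: list[dict]) -> dict:
    """Apply an ordered CDC event stream to a key→row target.

    Later events win for the same key, so correctness depends on
    preserving source commit order (at least per key).
    """
    for ev in events:
        op, key = ev["op"], ev["key"]
        if op in ("c", "u"):   # create/update events carry the full new row
            target[key] = ev["after"]
        elif op == "d":        # delete events remove the row
            target.pop(key, None)
    return target
```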
Library Recommendations
| Use Case | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| ETL Framework | dlt, Meltano, Dagster | - | - | - |
| Cloud Storage | boto3, gcsfs, adlfs | @aws-sdk/, @google-cloud/ | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| File Processing | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| Streaming | confluent-kafka, aiokafka | kafkajs | rdkafka | franz-go, sarama |
| CDC | Debezium, pglogical | - | - | - |
Reference Documentation
- references/cloud-storage.md - S3, GCS, Azure Blob patterns
- references/file-formats.md - CSV, JSON, Parquet, Excel handling
- references/api-feeds.md - REST polling, webhooks, GraphQL subscriptions
- references/streaming-sources.md - Kafka, Kinesis, Pub/Sub
- references/database-migration.md - Schema migration, CDC patterns
- references/etl-tools.md - dlt, Meltano, Airbyte, Fivetran
Scripts
- scripts/validate_csv_schema.py - Validate CSV against expected schema
- scripts/test_s3_connection.py - Test S3 bucket connectivity
- scripts/generate_dlt_pipeline.py - Generate dlt pipeline scaffold
Chaining with Database Skills
After ingestion, chain to appropriate database skill:
| Destination | Chain to Skill |
|---|---|
| PostgreSQL, MySQL | databases-relational |
| MongoDB, DynamoDB | databases-document |
| Qdrant, Pinecone | databases-vector (after embedding) |
| ClickHouse, TimescaleDB | databases-timeseries |
| Neo4j | databases-graph |
For vector databases, chain through ai-data-engineering for embedding:
ingesting-data → ai-data-engineering → databases-vector
Source
https://github.com/ancoleman/ai-design-components/blob/main/skills/ingesting-data/SKILL.md

Overview
Data ingestion patterns for loading data from external sources into databases. This skill covers cloud storage, file formats, API feeds, and streaming sources to support ETL/ELT pipelines, migrations, and CDC.
How This Skill Works
The skill guides you from source to destination using specialized ingestion patterns based on the data source (cloud storage, file formats, REST/GraphQL APIs, streaming systems) and provides a decision tree to select the right path. It also demonstrates language-specific quick starts (Python, TypeScript, Rust, Go) to implement the ingestion and connect to a target database.
When to Use It
- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Streaming data ingestion from Kafka or Kinesis
- Building ETL/ELT pipelines and performing migrations/CDC
Quick Start
- Step 1: Choose your data source (cloud storage, files, or API/stream) and pick the matching ingestion pattern
- Step 2: Implement the ingest logic in your language of choice and configure the destination database
- Step 3: Run the pipeline, verify load results, and monitor for errors and data quality
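The three steps can be exercised end to end in a few lines. In this sketch `sqlite3` stands in for the destination database, and the `events` schema (`id`, `name`) is purely illustrative:

```python
import csv
import io
import sqlite3

def run_pipeline(raw_csv: str) -> int:
    """Minimal extract → validate → load, returning the loaded row count."""
    # Step 1: extract (parse the source file)
    rows = list(csv.DictReader(io.StringIO(raw_csv)))
    # Step 2: validate (non-empty, required columns present)
    assert rows and {"id", "name"} <= rows[0].keys(), "schema check failed"
    # Step 2 cont.: configure the destination
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE events (id TEXT PRIMARY KEY, name TEXT)")
    # Step 3: load and verify the result
    con.executemany("INSERT INTO events VALUES (:id, :name)", rows)
    con.commit()
    (n,) = con.execute("SELECT COUNT(*) FROM events").fetchone()
    return n
```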
Best Practices
- Map source schemas to destination tables and define stable primary keys
- Choose the appropriate pattern (cloud storage, file formats, API feeds, streaming) via the decision tree
- Validate data formats, handle schema drift, and implement idempotent loads
- Use correct write dispositions (e.g., merge/append) and track lineage
- Monitor ingestion with observability and audit logs; handle retries and backpressure
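An idempotent load via a merge write disposition reduces, in essence, to upsert-by-primary-key; a toy sketch with a dict standing in for the destination table:

```python
def merge_load(table: dict, rows: list[dict], primary_key: str) -> dict:
    """'merge' write disposition: upsert rows keyed by primary key,
    so re-running the same load produces no duplicates."""
    for row in rows:
        table[row[primary_key]] = row  # insert or overwrite by key
    return table
```

This is why the dlt example above declares `write_disposition="merge", primary_key="id"`: retried or overlapping loads converge to the same table state.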
Example Use Cases
- Python: dlt-based ETL pipeline ingesting GitHub issues via REST API into Postgres
- Python: Polars-based file ingestion reading CSV/Parquet/JSON and writing to a DB
- TypeScript: S3 ingestion with AWS SDK and CSV/Parquet parsing into a database
- TypeScript: API feed polling (webhooks) for real-time ingestion into a data warehouse
- Rust: high-performance Parquet ingestion from S3 and transformation with Polars