
fabric-pandas-perf-remediate

npx machina-cli add skill PatrickGallucci/fabric-skills/fabric-pandas-perf-remediate --openclaw

Fabric Pandas Performance Troubleshooting

Diagnose and resolve pandas-related performance issues in Microsoft Fabric Spark notebooks, including memory exhaustion, slow conversions, and suboptimal pandas API on Spark usage.

When to Use This Skill

  • Notebook cells hang or timeout during pandas operations
  • toPandas() fails with OutOfMemoryError or Java heap space errors
  • collect() crashes the driver node
  • Pandas API on Spark (pyspark.pandas / ps) runs slower than expected
  • DataFrame conversion between Spark and pandas causes memory spikes
  • Notebook kernel restarts unexpectedly during data processing
  • Large dataset operations exhaust driver memory on Fabric capacity
  • Need to choose between pandas, Spark DataFrame, or pandas API on Spark

Prerequisites

  • Microsoft Fabric workspace with Data Engineering experience
  • Fabric capacity F2 or higher (F64+ recommended for large datasets)
  • PySpark notebook with Spark session active
  • Basic familiarity with pandas and PySpark DataFrames

Quick Diagnosis

Symptom-to-Solution Map

| Symptom | Likely Cause | Jump To |
|---|---|---|
| toPandas() OOM error | Dataset too large for driver | toPandas Optimization |
| Kernel restart during pandas op | Driver memory exhausted | Driver Memory Tuning |
| pyspark.pandas slower than native pandas | Spark overhead on small data | Right-Size Your Approach |
| Slow groupby/merge in pandas API on Spark | Excessive shuffling | Shuffle Optimization |
| Cell timeout on DataFrame conversion | Large collect to driver | Incremental Processing |
| ArrowInvalid or conversion errors | Schema mismatch / nulls | Arrow Conversion Fixes |
| High memory but slow pandas operations | GC pressure / fragmentation | Memory Profiling |

Right-Size Your Approach

Critical Decision: Choose the right DataFrame API for your data size and workload.

Dataset Size Decision Tree:
─────────────────────────────────────────────────────────
< 100 MB          → Native pandas (pd.DataFrame)
100 MB - 1 GB     → pandas API on Spark (ps.DataFrame)
> 1 GB            → PySpark DataFrame (spark.DataFrame)
> 10 GB           → PySpark + partitioning + Delta optimization

Mixed workload?   → Process in Spark, convert final aggregation to pandas
Visualization?    → Aggregate in Spark first, toPandas() on summary only
ML feature eng?   → Spark for transforms, pandas for final model input
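The size thresholds above can be captured in a small helper for use in notebooks; a minimal sketch (the `recommend_api` name and the exact cutoffs are illustrative, mirroring the decision tree):

```python
def recommend_api(est_size_gb: float) -> str:
    """Map an estimated dataset size (GB) to the decision tree above."""
    if est_size_gb < 0.1:      # < 100 MB
        return "native pandas"
    if est_size_gb <= 1:       # 100 MB - 1 GB
        return "pandas API on Spark"
    if est_size_gb <= 10:      # 1 - 10 GB
        return "PySpark DataFrame"
    return "PySpark + partitioning + Delta optimization"

print(recommend_api(0.05))  # native pandas
```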

API Comparison

| Operation | Native pandas | pandas API on Spark | PySpark DataFrame |
|---|---|---|---|
| Memory model | Single-node (driver) | Distributed | Distributed |
| Max practical size | ~2-4 GB | 10s-100s GB | TB+ |
| Startup overhead | None | Spark session | Spark session |
| groupby speed (small data) | Fast | Slower (shuffle) | Slower (shuffle) |
| groupby speed (large data) | OOM risk | Fast | Fast |
| Interop with Spark | .toPandas() | .to_spark() | Native |

toPandas Optimization

Problem

toPandas() collects the entire distributed DataFrame to the single driver node. This is the #1 cause of OOM in Fabric notebooks.

Solutions (Progressive)

1. Reduce data BEFORE conversion

# BAD - converts entire table
pdf = spark_df.toPandas()

# GOOD - filter and select first
pdf = (spark_df
    .filter("date >= '2024-01-01'")
    .select("customer_id", "revenue", "region")
    .toPandas())

2. Aggregate in Spark, convert summary

# BAD - convert raw data then aggregate in pandas
pdf = spark_df.toPandas()
result = pdf.groupby('region')['revenue'].sum()

# GOOD - aggregate in Spark first
import pyspark.sql.functions as F

summary = spark_df.groupBy("region").agg(F.sum("revenue").alias("total_revenue"))
pdf = summary.toPandas()  # Only converting the small aggregated result

3. Enable Apache Arrow for faster conversion

# Enable Arrow-based columnar transfer (3-100x faster)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

# Now toPandas() uses Arrow columnar format
pdf = spark_df.toPandas()

4. Use sampling for exploration

# Sample before converting (for EDA/visualization)
pdf = spark_df.sample(fraction=0.01, seed=42).toPandas()

# Or limit rows
pdf = spark_df.limit(100000).toPandas()

5. Chunk large conversions

# Process in chunks using partitioning
import pandas as pd
import pyspark.sql.functions as F

def process_in_chunks(spark_df, chunk_col="date", process_fn=None):
    """Convert a Spark DataFrame to pandas in manageable chunks."""
    chunks = [row[chunk_col] for row in spark_df.select(chunk_col).distinct().collect()]
    results = []
    for chunk_val in chunks:
        chunk_pdf = spark_df.filter(F.col(chunk_col) == chunk_val).toPandas()
        if process_fn:
            chunk_pdf = process_fn(chunk_pdf)
        results.append(chunk_pdf)
    return pd.concat(results, ignore_index=True)

Driver Memory Tuning

Fabric Driver Memory by Node Size

| Node Size | vCores | Memory | Recommended Max toPandas() |
|---|---|---|---|
| Small | 4 | 32 GB | ~4-6 GB |
| Medium | 8 | 64 GB | ~10-12 GB |
| Large | 16 | 128 GB | ~20-25 GB |
| X-Large | 32 | 256 GB | ~40-50 GB |

Rule of thumb: toPandas() safe limit ≈ 15-20% of total driver memory (pandas creates copies during operations).
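The rule of thumb translates into a one-line check; a minimal sketch (the function name is illustrative, using 17.5% as the midpoint of the 15-20% band):

```python
def topandas_safe_limit_gb(driver_memory_gb: float, fraction: float = 0.175) -> float:
    """Rule of thumb: toPandas() safe limit is roughly 15-20% of driver memory."""
    return round(driver_memory_gb * fraction, 1)

print(topandas_safe_limit_gb(64))  # 11.2 -- consistent with the ~10-12 GB Medium-node row
```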

Configure Driver Memory

# Check current driver memory
print(f"Driver memory: {spark.conf.get('spark.driver.memory', 'default')}")

# Set via environment Spark properties (before session starts)
# In Fabric Environment > Spark properties:
# spark.driver.memory = 28g  (for Medium nodes)

# Or override in notebook (must be first cell)
%%configure
{
    "driverMemory": "28g",
    "driverCores": 8
}

Resource Profile Selection for Pandas Workloads

# For notebooks heavy on pandas operations (read-heavy pattern)
spark.conf.set("spark.fabric.resourceProfile", "readHeavyForSpark")

# Key settings this enables:
# - spark.databricks.delta.optimizeWrite.enabled = true
# - Optimized read paths for Delta tables

Shuffle Optimization

Tune for pandas API on Spark Operations

# Reduce shuffle partitions for smaller datasets
# Default 200 is too high for datasets < 10 GB
spark.conf.set("spark.sql.shuffle.partitions", "auto")  # Adaptive (AQE)

# Or set explicitly based on data size
# Rule: ~128 MB per partition
data_size_gb = 5
optimal_partitions = max(1, int(data_size_gb * 1024 / 128))
spark.conf.set("spark.sql.shuffle.partitions", str(optimal_partitions))

# Enable Adaptive Query Execution (on by default in Fabric)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
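The 128 MB-per-partition rule above can be wrapped in a reusable helper; a sketch (the function name is illustrative):

```python
def optimal_shuffle_partitions(data_size_gb: float, target_partition_mb: int = 128) -> int:
    """Aim for ~128 MB per shuffle partition, never dropping below 1."""
    return max(1, int(data_size_gb * 1024 / target_partition_mb))

print(optimal_shuffle_partitions(5))  # 40
```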

Broadcast Join Optimization

# Increase broadcast threshold for pandas API on Spark joins
# Default: 10 MB - increase for medium lookup tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")  # 100 MB

# Force broadcast for known small DataFrames
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_lookup_df), "key_col")

Enable Autotune

# Let Fabric auto-optimize shuffle, broadcast, and partition settings
spark.conf.set("spark.ms.autotune.enabled", "true")

# Autotune requires:
# - Runtime 1.1 or 1.2
# - Queries > 15 seconds
# - Not in high concurrency mode
# - ~20-25 iterations to learn optimal settings

Arrow Conversion Fixes

Common Errors and Solutions

| Error | Cause | Fix |
|---|---|---|
| ArrowInvalid: Could not convert X | Unsupported type | Cast column before conversion |
| ArrowNotImplementedError | Nested types | Flatten struct/array columns |
| pyarrow.lib.ArrowMemoryError | OOM during Arrow transfer | Reduce data size or increase memory |
| Null handling mismatch | pandas NaN vs Spark null | Enable spark.sql.execution.arrow.pyspark.fallback.enabled |

# Fix mixed types before conversion
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

spark_df = spark_df.withColumn("mixed_col", F.col("mixed_col").cast(StringType()))

# Flatten nested structs
spark_df = spark_df.select(
    "simple_col",
    F.col("struct_col.field1").alias("field1"),
    F.col("struct_col.field2").alias("field2")
)

# Handle null-heavy columns
spark_df = spark_df.fillna({"numeric_col": 0, "string_col": ""})

Incremental Processing

Pattern: Spark Processing with Pandas Finish

import pyspark.sql.functions as F
import pandas as pd

# Step 1: Heavy lifting in Spark (distributed)
aggregated = (spark_df
    .filter(F.col("status") == "active")
    .groupBy("category", "month")
    .agg(
        F.sum("amount").alias("total"),
        F.count("*").alias("cnt"),
        F.avg("score").alias("avg_score")
    ))

# Step 2: Verify size before conversion
row_count = aggregated.count()
print(f"Rows to convert: {row_count:,}")
assert row_count < 1_000_000, f"Too many rows ({row_count:,}) for toPandas()"

# Step 3: Convert small result to pandas for visualization/export
pdf = aggregated.toPandas()

# Step 4: Pandas-specific operations (plotting, styling, etc.)
pivot = pdf.pivot_table(index='category', columns='month', values='total')

Memory Profiling

Monitor Driver Memory in Notebook

import os, psutil

def check_memory():
    """Report current driver memory usage."""
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    print(f"RSS Memory:  {mem_info.rss / 1024**3:.2f} GB")
    print(f"VMS Memory:  {mem_info.vms / 1024**3:.2f} GB")
    
    # System-wide
    sys_mem = psutil.virtual_memory()
    print(f"System Used: {sys_mem.used / 1024**3:.2f} / {sys_mem.total / 1024**3:.2f} GB ({sys_mem.percent}%)")

# Call before/after pandas operations
check_memory()
pdf = spark_df.toPandas()
check_memory()

Reduce pandas Memory Footprint

def optimize_pandas_dtypes(df):
    """Downcast pandas DataFrame dtypes to reduce memory."""
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:  # Categorical threshold
            df[col] = df[col].astype('category')
    return df

pdf = optimize_pandas_dtypes(pdf)
print(f"Memory after optimization: {pdf.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
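A self-contained toy run of the same downcasting logic (column names and values are illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "small_int": pd.Series([1, 2, 3], dtype="int64"),
    "ratio": pd.Series([0.1, 0.2, 0.3], dtype="float64"),
    "region": ["east", "east", "west"],  # low cardinality -> good category candidate
})
before = df.memory_usage(deep=True).sum()

df["small_int"] = pd.to_numeric(df["small_int"], downcast="integer")  # int64 -> int8
df["ratio"] = pd.to_numeric(df["ratio"], downcast="float")            # float64 -> float32
df["region"] = df["region"].astype("category")

after = df.memory_usage(deep=True).sum()
print(f"{before} B -> {after} B")
```

On tiny frames the category overhead can outweigh the savings; the win shows up at realistic row counts.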

pandas API on Spark Best Practices

Use pyspark.pandas Instead of Conversion

import pyspark.pandas as ps

# Read directly as pandas-on-Spark DataFrame
psdf = ps.read_delta("Tables/my_table")

# Or convert from Spark DataFrame (lazy, no collect)
psdf = spark_df.pandas_api()

# Operations run distributed (no driver memory pressure)
result = psdf.groupby("region")["revenue"].sum()

# Convert to pandas only for final small result
pdf = result.to_pandas()

Common Pitfalls

# BAD: iterrows/itertuples on pandas-on-Spark (extremely slow)
for idx, row in psdf.iterrows():  # Anti-pattern!
    process(row)

# GOOD: Use vectorized operations
psdf["new_col"] = psdf["col_a"] * psdf["col_b"]

# BAD: apply with Python UDF (serialization overhead)
psdf["result"] = psdf["col"].apply(lambda x: complex_fn(x))

# GOOD: Use Spark-native functions or pandas_udf
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def optimized_fn(series: pd.Series) -> pd.Series:
    return series * 2 + 1

Troubleshooting Checklist

  1. Check data size before any toPandas() / collect() call
  2. Enable Arrow transfer: spark.sql.execution.arrow.pyspark.enabled = true
  3. Filter/aggregate in Spark before converting to pandas
  4. Match node size to workload (Medium 64 GB minimum for pandas-heavy notebooks)
  5. Use pandas API on Spark for distributed pandas-like operations
  6. Monitor memory with psutil before/after conversions
  7. Set resource profile to readHeavyForSpark for notebook-heavy workloads
  8. Enable autotune for automatic shuffle and partition optimization
  9. Downcast dtypes after conversion to reduce pandas memory footprint
  10. Chunk processing for datasets that exceed single-node memory
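Checklist item 1 and the 15-20% rule can be combined into a pre-flight guard called before any conversion; a hedged sketch (the function name and the 20% cutoff are illustrative):

```python
def assert_safe_to_convert(row_count: int, bytes_per_row: float, driver_mem_gb: float) -> float:
    """Estimate the pandas-side size and fail fast if it exceeds ~20% of driver memory."""
    est_gb = row_count * bytes_per_row / 1024**3
    limit_gb = driver_mem_gb * 0.2
    if est_gb > limit_gb:
        raise MemoryError(f"Estimated {est_gb:.1f} GB exceeds safe limit {limit_gb:.1f} GB")
    return est_gb

est = assert_safe_to_convert(1_000_000, 200, driver_mem_gb=64)
print(f"OK to convert: ~{est:.2f} GB")
```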

Automation & Diagnostics

Run the diagnostic script to collect Spark session configuration, memory settings, and environment details for troubleshooting.

See the detailed reference guide for advanced patterns including pandas UDFs, Koalas migration, Native Execution Engine integration, and capacity planning formulas.

Use the notebook template as a starting point for memory-safe pandas workflows in Fabric notebooks.

Source

git clone https://github.com/PatrickGallucci/fabric-skills.git
# skills/fabric-pandas-perf-remediate/SKILL.md

Overview

Diagnose and optimize pandas workloads in Fabric Spark notebooks. Avoid driver OOMs, slow toPandas, and excessive shuffles by selecting the right DataFrame API, profiling memory, and applying targeted tuning for Fabric capacity.

How This Skill Works

It starts with symptom mapping (e.g., toPandas OOM, kernel restarts) to decide the right API: pandas, pandas API on Spark, or PySpark. It then leverages memory profiling, interop checks, and Fabric-specific tuning (shuffle settings, broadcast joins, and resource profiles) to reduce data movement and memory pressure, including Native Execution Engine integration where applicable.

When to Use It

  • Notebook cells hang or timeout during pandas operations
  • toPandas() runs out of memory on large datasets
  • collect() crashes the driver
  • DataFrame conversion between Spark and pandas spikes memory
  • Need to choose between pandas, Spark DataFrame, or pandas API on Spark

Quick Start

  1. Step 1: Identify symptoms and map to a likely cause (OOM, timeout, or slow API).
  2. Step 2: Run a quick memory profile and compare pandas, pandas API on Spark, and PySpark DataFrame paths.
  3. Step 3: Apply the recommended changes (data reduction, partition tuning, and API choice) and validate results.

Best Practices

  • Profile memory usage end-to-end before changing code (driver and executors)
  • Start with native pandas for small data, then move to Spark-based APIs as data grows
  • Avoid collect() on large DFs; prefer incremental processing or summaries
  • Tune shuffle and join parameters; enable broadcast joins for small lookups
  • Leverage Fabric-native execution and partitioning to optimize data locality

Example Use Cases

  • Diagnose a 50 MB dataset where toPandas OOM occurs and switch to pandas API on Spark
  • Fix slow pandas API on Spark by repartitioning and using broadcast joins
  • Convert-heavy workflow where Spark-to-pandas conversion caused spikes; use incremental processing
  • Kernel restarts due to driver memory exhaustion; apply driver memory tuning
  • Performance audit of a pandas groupby on Spark with large shuffle, reduce shuffles

