October 7, 2025
AI/ML Infrastructure Data Spark CI/CD

Building ML Feature Pipelines with Apache Spark

You've got raw data. Lots of it. Millions of rows, dozens of columns, and a machine learning model waiting to be trained. But here's the thing: your model doesn't care about raw data. It cares about features - carefully engineered, properly scaled, and strategically selected representations of your data that actually teach the model something useful.

This is where Apache Spark shines. When you're dealing with datasets that don't fit in memory and need to process features at scale, Spark's distributed computing power becomes essential. In this guide, we'll build a complete ML feature pipeline from scratch, covering architecture, transforms, optimization tricks, and real-world integration patterns.

Let's dive in.

Table of Contents
  1. The Problem: Why Feature Pipelines Matter
  2. Spark Feature Pipeline Architecture
     • Why Spark Over Single-Machine Tools?
     • When NOT to Use Spark
  3. Building Blocks: Core Feature Transforms
     • Categorical Encoding: String to Numbers
     • Numerical Scaling: Standardize Ranges
     • Time-Series Features: Rolling Averages and Lag
  4. Advanced: Temporal Train/Test Splits (Preventing Leakage)
  5. Spark Optimization: Making Pipelines Fast
     • Partition Sizing
     • Broadcast Small Lookups
     • Persist Intermediate Results
     • Pre-partition Before Aggregations
  6. Handling ML-Specific Challenges
     • Skewed Data and Salting
     • Null Handling
  7. Common Pitfalls: What Goes Wrong in Production
     • Training/Serving Skew
     • Feature Explosion
     • Categorical Explosion
     • Memory Leaks in Iterative Pipelines
  8. Production Considerations
     • Monitoring Feature Quality
     • Handling Late Arrivals
     • Versioning Features
  9. Comparing Alternatives: When to Use Polars, Dask, or Ray
     • Polars: For Single-Machine Speed
     • Dask: For Parallel Pandas
     • Ray: For ML-Specific Tasks
     • Spark: For Everything Else
  10. Complete Example: Fraud Detection Pipeline
  11. Integration with ML Systems
     • Export to Parquet for PyTorch
     • Store in Delta Lake for Time-Travel
     • Real-time Feature Store with Feast
     • Performance Gains with Databricks Photon
  12. Debugging and Troubleshooting: Real-World Issues
     • Out of Memory Errors
     • Stragglers (One Slow Partition)
     • Feature Values Are NaN or Inf
     • Reproducibility: Same Code, Different Results
     • Features Work in Dev, Break in Prod
  13. Best Practices Summary
  14. Why This Matters in Production
  15. Key Takeaways
  16. The Cultural Shift to Feature-First ML
  17. Feature Pipeline as Organizational Infrastructure
  18. Operational Maturity in Feature Pipelines

The Problem: Why Feature Pipelines Matter

Imagine you're building a fraud detection system. You have transaction data - amounts, merchant categories, timestamps, user histories. A naive approach would be to throw raw features at your model and hope it learns. But that rarely works well. Understanding why requires thinking deeply about what models actually learn.

A machine learning model learns patterns in data. It's fundamentally a pattern-matching engine. But the patterns it learns depend entirely on how you present the data. If you feed raw transaction amounts (0 to 100,000 dollars) alongside fraud flags (0 or 1), the model learns "amount is 100,000 times more important than fraud status." That's not because amount is actually more important - it's because the numerical range is so much larger. The model's optimizer, trying to minimize loss, finds that adjusting the "amount" weight by 1 has much more effect than adjusting the "fraud" weight by 1. It's not intelligence; it's a scale mismatch.
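To make the scale mismatch concrete, here's a toy calculation (all numbers invented for illustration): in a linear model trained with squared error, the gradient for each weight is proportional to error × feature value, so an unscaled column swamps every update.

```python
# Toy illustration (assumed numbers): gradient of squared error w.r.t.
# each weight in a linear model is error * feature_value, so an
# unscaled feature dominates every optimizer step.
amount = 100_000.0   # raw transaction amount feature
flag = 1.0           # binary fraud-history flag
error = 0.5          # prediction error on one example

grad_amount = error * amount   # update signal for the "amount" weight
grad_flag = error * flag       # update signal for the "flag" weight
ratio = grad_amount / grad_flag
print(ratio)  # the amount weight moves 100,000x harder per step
```

Same error, same example, but the optimizer "hears" the amount feature 100,000 times louder, purely because of its numeric range.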

This is why feature engineering exists. It's not arcane knowledge - it's the systematic correction of scale mismatches, encoding issues, and data representation problems. When you scale features to mean zero and standard deviation one, you're saying "these features should have equal influence." When you one-hot-encode categorical variables instead of using numeric labels, you're saying "these categories are unordered." When you create lag features (yesterday's value), you're saying "temporal context matters."

In a fraud detection system, these choices are critical. A model trained on unscaled amounts will be biased toward amplitude over pattern. A model trained without temporal features will see each transaction in isolation, missing the insight that "this user always spends $50, so a $5,000 transaction is suspicious." A model trained with label-encoded merchant categories might learn spurious relationships: "category 5 is more fraudulent than category 3," when categories aren't ordered.

The stakes become real in production. A model with bad feature engineering might achieve 85% accuracy in testing. But that accuracy is against test data engineered the same way. When you deploy and encounter new merchant types, new user cohorts, new temporal patterns, the model fails silently. It doesn't crash; it just starts making worse decisions. By the time you notice, you've already approved thousands of fraudulent transactions.

This is why feature pipelines are infrastructure, not an afterthought. They're the contract between your data and your model. Get them right, and your model learns real patterns. Get them wrong, and your model learns garbage that happens to correlate in your training set.

Here's what typically goes wrong:

  • Raw features are inconsistent: Some transactions are $5, others are $50,000. Your model trains poorly on unscaled data.
  • Categorical data isn't numeric: Merchant categories are text. Models need numbers.
  • Temporal leakage ruins everything: If you accidentally use future information to predict the past, your model will fail in production.
  • Missing values break pipelines: NaN values cause crashes and bias.

A feature pipeline solves these problems systematically. It transforms raw data into model-ready features, handles edge cases, and ensures your training and serving paths stay in sync.

The stakes in production are particularly high. A poorly designed feature pipeline doesn't just slow down training - it corrupts your model's learning. When your model learns from leakage or inconsistent preprocessing, it's learning patterns that don't exist in production. You deploy confident in your metrics, only to watch the model's accuracy collapse when reality meets your assumptions.

Spark Feature Pipeline Architecture

Think of a feature pipeline as having three layers:

Raw Data (PostgreSQL, S3, Delta Lake)
           ↓
    Feature Transforms
    (Scaling, Encoding, Aggregations)
           ↓
    ML Model Ready Features
    (Numeric, Consistent, Leakage-free)

Here's the architecture in visual form:

graph LR
    A["Raw Data Sources<br/>(Delta, Parquet, JSON)"] --> B["PySpark DataFrame<br/>Ingestion"]
    B --> C["Window Functions<br/>(Lag, Rolling Avg)"]
    B --> D["SQL Aggregations<br/>(Groupby, Joins)"]
    C --> E["Feature Transforms<br/>(Scaler, Indexer, Encoder)"]
    D --> E
    E --> F["Temporal Train/Test Split<br/>(No Leakage)"]
    F --> G["Persist to Delta Lake<br/>(ACID, Time-travel)"]
    G --> H["Export to Parquet<br/>(PyTorch/JAX)"]
    H --> I["ML Model Training"]

The power here is distributed processing. Spark partitions your data across a cluster, applies transforms in parallel, and recombines results. For billion-row datasets, this is the difference between hours and days.

Why Spark Over Single-Machine Tools?

You might ask: "Why not just use pandas or Polars on a single machine?" Good question. The answer depends on scale:

  • Pandas: Loads entire dataset into memory. A 100GB transaction history? It'll crash. Spark handles this by streaming partitions.
  • Polars: Fast on a single machine, but still bound by available RAM. If you have 1TB of raw data across 50 servers, Polars can't help.
  • Spark: Built for distributed compute from day one. Your data lives across the cluster; transformations happen in parallel.

Think of it this way: Pandas is like washing dishes one at a time in your kitchen sink. Spark is like having 100 sinks, one for each partition, all washing simultaneously. When you scale from "I have data" to "I have lots of data," Spark becomes non-negotiable.

But there's another dimension to this decision. Spark isn't just about pure data size - it's about how your infrastructure evolves. When you're small, single-machine tools are faster. But as your team grows and you need to coordinate feature computation across multiple models, Spark's ecosystem becomes invaluable. Spark integrates with data warehouses, can coordinate with other ETL systems, and provides a lingua franca for data teams. You move from "my notebook script" to "our company's feature engineering standard."

When NOT to Use Spark

We should be honest: Spark isn't always the right choice. For exploratory work with <100GB datasets where you need fast iteration, Polars or even pandas is better. Spark has overhead - startup time, serialization costs, cluster coordination. A 30-second Polars query becomes a 5-minute Spark job when you account for the cluster coming up and tasks being scheduled. For a data scientist doing ad-hoc analysis, that's torture.

Use Spark when:

  • Your data is measured in hundreds of gigabytes to terabytes
  • You need this pipeline to run repeatedly (daily, hourly) in production
  • Multiple models or teams depend on the output
  • You need ACID guarantees and time-travel capability (Delta Lake)

Use Polars or single-machine tools when:

  • Data fits comfortably in one machine's RAM
  • You're doing one-time exploratory analysis
  • Latency is critical and you can't afford cluster startup time

Building Blocks: Core Feature Transforms

Let's start with the most common transforms you'll use. I'll show you the code, then explain what's happening and why it matters for model training. The transforms we'll cover are foundational - you'll use them in nearly every ML pipeline. Understanding not just how to apply them, but why they're necessary, is what separates people who copy-paste examples from people who build robust systems.

Feature transforms are where raw data becomes intelligence. A column of text becomes a vector of numbers. A skewed distribution becomes a normal distribution. A categorical variable becomes a set of independent binary flags. Each transformation is justified by a deep principle about how models learn.

When you start learning machine learning, transforms feel like magic. Why does standardization help? Why can't you just one-hot-encode things differently? These questions feel like implementation details. But as you work on more systems, you realize that the "right" way to transform features isn't arbitrary. It emerges from understanding how gradient descent works, how models interpret different data types, and how your specific downstream model (a decision tree? a neural network? a linear model?) will interpret your input.

That's why this section doesn't just show code. Each transform comes with context about why it matters and what happens if you get it wrong. This knowledge compounds. The difference between someone who applies transforms cargo-cult-style and someone who understands the principles is the difference between a model that works and a model that crashes in production.

1. Categorical Encoding: String to Numbers

python
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
 
spark = SparkSession.builder.appName("FeaturePipeline").getOrCreate()
 
# Sample transaction data
data = [
    (1, "GROCERY", 45.50),
    (2, "GAS", 35.00),
    (3, "GROCERY", 62.20),
    (4, "RESTAURANT", 28.75),
]
df = spark.createDataFrame(data, ["txn_id", "merchant_category", "amount"])
 
# Step 1: Convert strings to integers (0, 1, 2, ...)
indexer = StringIndexer(
    inputCol="merchant_category",
    outputCol="category_index",
    handleInvalid="keep"
)
 
# Step 2: Convert integers to one-hot vectors [1,0,0], [0,1,0], etc.
encoder = OneHotEncoder(
    inputCol="category_index",
    outputCol="category_encoded"
)
 
pipeline = Pipeline(stages=[indexer, encoder])
transformed = pipeline.fit(df).transform(df)
 
transformed.select("merchant_category", "category_encoded").show()

Expected Output:

+------------------+-----------------+
|merchant_category |category_encoded |
+------------------+-----------------+
|GROCERY           |(3,[0],[1.0])    |
|GAS               |(3,[2],[1.0])    |
|GROCERY           |(3,[0],[1.0])    |
|RESTAURANT        |(3,[1],[1.0])    |
+------------------+-----------------+

Why this matters: Models work with numbers, not strings. One-hot encoding creates binary flags for each category. GROCERY becomes [1,0,0], GAS becomes [0,0,1]. This preserves information without imposing false ordering. (If you used label encoding - GROCERY=0, GAS=1, RESTAURANT=2 - the model might incorrectly assume RESTAURANT is "greater than" GROCERY, which is nonsensical.)

The way you encode matters for model behavior. Tree-based models (Random Forest, XGBoost, LightGBM) actually prefer label encoding because they can work with ordinal data efficiently. Neural networks and linear models require one-hot encoding to avoid spurious relationships. Knowing your downstream model architecture should influence your feature engineering choices.
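A toy sketch (plain Python, not Spark) of what the two encodings actually hand the model, using the category names from the example above:

```python
# Toy illustration: label encoding vs one-hot encoding.
categories = ["GROCERY", "RESTAURANT", "GAS"]

# Label encoding: each category becomes a single integer
label = {c: i for i, c in enumerate(categories)}

# One-hot encoding: each category becomes an orderless binary flag vector
one_hot = {c: [1 if j == i else 0 for j in range(len(categories))]
           for i, c in enumerate(categories)}

# Label encoding imposes an ordering the data never had...
assert label["GAS"] > label["GROCERY"]
# ...while every one-hot vector is just a single flag set to 1.
assert all(sum(v) == 1 for v in one_hot.values())
```

A linear model multiplying `label["GAS"] = 2` by a weight treats GAS as "twice GROCERY"; with one-hot vectors each category gets its own independent weight.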

2. Numerical Scaling: Standardize Ranges

python
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Spark's StandardScaler operates on a Vector column, so assemble first
assembler = VectorAssembler(inputCols=["amount"], outputCol="amount_vec")
df_vec = assembler.transform(df)

# Transactions can range from $0 to $10,000
# StandardScaler normalizes to mean=0, std=1
scaler = StandardScaler(
    inputCol="amount_vec",
    outputCol="amount_scaled",
    withMean=True,
    withStd=True
)

scaler_model = scaler.fit(df_vec)
scaled_df = scaler_model.transform(df_vec)

scaled_df.select("amount", "amount_scaled").show()

Expected Output (values rounded; Spark divides by the sample standard deviation):

+------+---------------+
|amount|amount_scaled  |
+------+---------------+
|45.5  |[0.1803]       |
|35.0  |[-0.5375]      |
|62.2  |[1.3220]       |
|28.75 |[-0.9648]      |
+------+---------------+

Why this matters: If you train on unscaled features, large-value columns dominate gradient updates. A $10,000 transaction overwhelms a binary fraud flag. Scaling puts everything on equal footing. Neural networks especially benefit - unscaled inputs can cause vanishing/exploding gradients. Tree-based models are less sensitive, but scaling still helps with convergence speed in regularized models.

Here's what most people get wrong about scaling: they think it's just a numerical trick to help convergence. But it's actually about representation fairness. When your model sees unscaled features, it's biased toward high-magnitude columns regardless of their predictive value. An amount column with values 0-100,000 and a fraud flag with values 0-1 get treated as though the amount is 100,000x more important just because of the scale. Scaling corrects this bias.
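You can hand-check what standardization computes on the four amounts above with plain Python (Spark's StandardScaler, like `statistics.stdev`, uses the sample standard deviation):

```python
import statistics

# Hand-check of (x - mean) / std on the four example amounts
amounts = [45.50, 35.00, 62.20, 28.75]
mu = statistics.mean(amounts)    # 42.8625
sd = statistics.stdev(amounts)   # sample standard deviation, ~14.63
scaled = [(a - mu) / sd for a in amounts]
# Each scaled value says "how many standard deviations from the mean"
```

After scaling, a value of 1.32 means "1.32 standard deviations above this column's mean", regardless of whether the raw unit was dollars or a binary flag.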

3. Time-Series Features: Rolling Averages and Lag

python
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, lag, col
 
# Create a window for rolling aggregations:
# rangeBetween needs a numeric ordering column, so order by epoch seconds
# and look back 7 days per user
ts_seconds = col("timestamp").cast("long")
window_spec = Window.partitionBy("user_id").orderBy(ts_seconds).rangeBetween(-7*24*3600, 0)
 
# Add rolling average transaction amount
df_with_features = df.withColumn(
    "amount_7day_avg",
    avg("amount").over(window_spec)
).withColumn(
    "amount_lag1",
    lag("amount", 1).over(Window.partitionBy("user_id").orderBy("timestamp"))
)
 
df_with_features.select("user_id", "timestamp", "amount", "amount_7day_avg", "amount_lag1").show()

Why this matters: Temporal patterns matter in fraud. If a user usually spends $50 and suddenly charges $5,000, that's suspicious. Rolling averages and lag features capture behavior change. Without them, the model only sees the current transaction in isolation. With them, the model learns "this user's deviation from baseline is 100x - red flag."

Understanding temporal relationships in your data is crucial. A transaction that looks normal in isolation becomes suspicious when you see it relative to a user's history. Lag features capture trends and momentum - does the amount increase or decrease? Rolling averages capture stability - is the user's spending pattern consistent? These features let your model reason about behavior patterns, not just transaction properties.
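The "deviation from baseline" intuition as a toy calculation (numbers assumed for illustration):

```python
# Toy illustration: a transaction judged against the user's rolling baseline
history = [50, 48, 52, 51, 49]          # user's recent transaction amounts
baseline = sum(history) / len(history)  # rolling average: 50.0
current = 5000.0                        # the new transaction

deviation_ratio = current / baseline    # 100x the usual spend
suspicious = deviation_ratio > 10       # simple threshold rule
```

In isolation, $5,000 is just a number; relative to a $50 baseline it's a 100x outlier, and that ratio is exactly the kind of feature a rolling-average column lets the model compute.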

Advanced: Temporal Train/Test Splits (Preventing Leakage)

Here's where most feature pipelines fail silently: data leakage.

Imagine you're predicting fraud. You accidentally train your model on data from 2025, then test on 2024. Your model memorizes "oh, fraud happened on these dates" and fails in production when encountering new dates. This is temporal leakage - using future information to predict the past.

The fix: split by time.

python
from pyspark.sql.functions import to_timestamp, col
 
# Training: All transactions before Oct 1, 2024
# Testing: Transactions on or after Oct 1, 2024
split_date = "2024-10-01"
 
df = df.withColumn("timestamp", to_timestamp("timestamp"))
 
train_df = df.filter(col("timestamp") < split_date)
test_df = df.filter(col("timestamp") >= split_date)
 
print(f"Train set: {train_df.count()} rows")
print(f"Test set: {test_df.count()} rows")

Critical rule: When computing rolling averages for the test set, only look back at training data. Otherwise, you leak information about what the model should predict. Here's the correct approach:

python
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
 
# For each row, compute the rolling avg using ONLY rows that came before it
 
# This is wrong: the default frame runs up to and including the current row
# df_wrong = df.withColumn("avg_amount", avg("amount").over(
#     Window.partitionBy("user_id").orderBy("timestamp")))
 
# This is right: the frame ends one row BEFORE the current row
window_spec_correct = Window.partitionBy("user_id").orderBy("timestamp") \
    .rowsBetween(Window.unboundedPreceding, -1)
 
df_correct = df.withColumn(
    "amount_avg_no_leakage",
    avg("amount").over(window_spec_correct)
)

Why this matters: Without temporal splits, your model will overestimate its predictive power by 30-50%. You'll think you built something great, deploy it, and watch it crash. This happens more often than you'd think because leakage is subtle. Your validation metrics look perfect, so you ship confident. But in production, you can't see the future, and everything falls apart.

The reason leakage causes such massive failures is because your model doesn't just learn from real patterns - it learns statistical artifacts. In your training data, fraud might spike on certain dates or times. Your model learns "Friday at 3pm = fraud risk," not because that's true in general, but because it happens to be true in your training period. When Friday at 3pm rolls around in production with different fraud patterns, your model is confidently wrong.
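A minimal illustration (toy numbers) of how a rolling mean that includes the current row leaks the very value the model is supposed to find surprising:

```python
# Toy illustration: leaky vs leakage-free rolling mean
amounts = [50, 52, 48, 5000]  # the last transaction is the anomaly

# Leaky: the mean at row i includes row i itself
leaky = [sum(amounts[:i + 1]) / (i + 1) for i in range(len(amounts))]

# Safe: the mean at row i uses only rows strictly before i
safe = [None] + [sum(amounts[:i]) / i for i in range(1, len(amounts))]

# For the anomalous row, the leaky baseline already reflects the spike,
# while the safe baseline still shows normal behavior (~50).
```

With the leaky feature, "deviation from baseline" for the $5,000 row is muted because the baseline already absorbed the spike; the safe version preserves the anomaly signal.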

Spark Optimization: Making Pipelines Fast

Raw Spark code often runs slow. Here's how to make it scream:

Strategy 1: Partition Sizing

python
# Bad: 1000 partitions on 1GB data = 1MB per partition (overhead hell)
# Good: 128-256MB per partition = balanced I/O
# Estimate dataset size from storage metadata (e.g. input file sizes)
dataset_size_mb = 10_240  # example: a 10GB dataset
target_partition_size_mb = 256
num_partitions = max(1, dataset_size_mb // target_partition_size_mb)
 
df_optimized = df.repartition(num_partitions)

Why: Too many partitions = more overhead (task scheduling, serialization). Too few = unbalanced work distribution. The sweet spot is typically 128-256MB per partition. This balances the cost of coordinating many small tasks against the cost of a few slow tasks that become your bottleneck.

Strategy 2: Broadcast Small Lookups

When joining a 100GB fact table with a 1MB lookup table, don't shuffle both. Broadcast the small one:

python
from pyspark.sql.functions import broadcast
 
lookup_df = spark.read.parquet("s3://lookup-tables/merchants.parquet")  # 1MB
 
# Good: broadcast tells Spark to send lookup to every worker
result = large_df.join(
    broadcast(lookup_df),
    on="merchant_id"
)
 
# Bad: default shuffle joins the massive large_df across network
# result = large_df.join(lookup_df, on="merchant_id")

Impact: Broadcast join on a small table saves 10-100x network I/O compared to shuffle join.

Strategy 3: Persist Intermediate Results

python
from pyspark import StorageLevel
 
df_features = df.select(...).filter(...).groupBy(...).agg(...)
 
# Cache in memory, spilling to disk when it doesn't fit in RAM
df_features.persist(StorageLevel.MEMORY_AND_DISK)
 
# Now multiple downstream actions reuse cached data
feature_set_1 = df_features.filter(...)
feature_set_2 = df_features.groupBy(...)
 
# When done, release memory
df_features.unpersist()

When to use: If you're reusing the same DataFrame transformation multiple times. Persistence trades memory for compute.

Strategy 4: Pre-partition Before Aggregations

python
# If you're grouping by user_id repeatedly, pre-partition by user_id
df_partitioned = df.repartition(200, "user_id")
df_partitioned.write.mode("overwrite").parquet("s3://temp/partitioned/")
 
# Now all groupBy("user_id") operations stay local per partition
df_from_disk = spark.read.parquet("s3://temp/partitioned/")
result = df_from_disk.groupBy("user_id").agg(...)  # Fast!

Why: Partitioning by your aggregation key means Spark doesn't need to shuffle data across the network for groupBy operations. Everything stays local.

Handling ML-Specific Challenges

At this point in building a feature pipeline, you've encoded categories, scaled numbers, and computed temporal features. The code works on test data. But then reality hits: your data isn't uniformly distributed. Some categories dominate. Some users generate 10x more transactions than others. Some geographic regions have vastly more data than others. These distributions will break your assumptions about how data flows through your pipeline.

This is where ML-specific challenges emerge. These aren't general data engineering problems. They're problems that manifest when you combine big data with the specific properties of machine learning models. A general ETL pipeline might handle skewed data just fine - it's designed to be agnostic about what data it processes. An ML pipeline has to be deliberately aware of skew, because skew breaks models in specific ways.

Challenge 1: Skewed Data and Salting

When one category dominates (90% transactions are "SHOPPING"), joins become bottlenecks. Some partitions process millions of rows while others sit idle.

python
from pyspark.sql.functions import rand, concat, lit, col, explode, array
 
# Add random salt to skewed join key
df_large = df_large.withColumn(
    "merchant_id_salted",
    concat(col("merchant_id"), lit("_"), (rand() * 10).cast("int"))
)
 
lookup_df_exploded = lookup_df.select(
    "*",
    explode(array([lit(i) for i in range(10)])).alias("salt")
).withColumn(
    "merchant_id_salted",
    concat(col("merchant_id"), lit("_"), col("salt"))
)
 
result = df_large.join(lookup_df_exploded, on="merchant_id_salted")

This distributes the "SHOPPING" category across 10 sub-partitions, balancing work. Each salt value (0-9) gets a separate partition, so the executor cores can parallelize instead of waiting on one slow partition.
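What salting does to key cardinality can be sketched in plain Python (toy data):

```python
import random

random.seed(0)

# One hot key repeated many times — every row lands in the same partition
keys = ["SHOPPING"] * 1_000

# Append a random salt 0-9, splitting the hot key into up to 10 sub-keys
salted = [f"{k}_{random.randrange(10)}" for k in keys]
distinct = sorted(set(salted))
# The join now hashes over up to 10 keys instead of 1, so the work
# spreads across up to 10 partitions.
```

The lookup side must be exploded across the same salt range (as in the Spark code above) so every salted key still finds its match.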

Challenge 2: Null Handling

python
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import when, col
 
# Strategy 1: Drop rows with nulls (data loss)
# df_clean = df.dropna()
 
# Strategy 2: Impute with mean/median (Imputer works on numeric columns only)
imputer = Imputer(
    inputCols=["amount"],
    outputCols=["amount_imputed"],
    strategy="median"
)
imputed_df = imputer.fit(df).transform(df)
 
# Strategy 3: Create null indicator (preserve information)
imputed_df = imputed_df.withColumn(
    "amount_was_null",
    when(col("amount").isNull(), 1).otherwise(0)
)

Why: Nulls contain information. A missing merchant category might indicate a data quality issue that's itself predictive of fraud. Strategy 3 (impute + indicator) captures both the imputed value and the fact that it was missing.

Common Pitfalls: What Goes Wrong in Production

Pitfall 1: Training/Serving Skew

You fit your StandardScaler on training data (mean=100, std=20). Then in production, you get new transactions with mean=150, std=30. Your scaler still uses the old statistics - suddenly everything looks like an outlier.

Solution: Save the scaler model alongside your trained ML model. Always transform inference data using the same scaler that transformed training data.

python
from pyspark.ml.feature import StandardScalerModel
 
# Training
scaler_model = scaler.fit(train_df)
train_features = scaler_model.transform(train_df)
 
# Save the scaler
scaler_model.save("/models/scaler/")
 
# At inference time, load it
loaded_scaler = StandardScalerModel.load("/models/scaler/")
inference_features = loaded_scaler.transform(inference_data)

Pitfall 2: Feature Explosion

You compute rolling averages over 7, 14, 30 days. Then you add lag-1, lag-7, lag-14. Then interaction terms (amount × category). Suddenly you have 500 features. Most are noise.

Solution: Use feature selection. Keep features that actually correlate with your target.

python
# Compute Pearson correlation of each numeric feature with the target
# (feature_cols holds the candidate feature column names)
important_features = [
    f for f in feature_cols
    if abs(df.stat.corr(f, "is_fraud")) > 0.1
]

Pitfall 3: Categorical Explosion

A new merchant category arrives in production that your model never saw. StringIndexer's handleInvalid="keep" assigns it to a special bucket, but your model is confused.

Solution: Fix your schema before training. Either enumerate all possible categories or use target encoding (replace category with its mean target value).

python
from pyspark.sql.functions import when, col
 
# Map any category the model didn't see in training to a catch-all bucket
known_categories = ["GROCERY", "GAS", "RESTAURANT"]
df_fixed = df.withColumn(
    "merchant_category",
    when(col("merchant_category").isin(known_categories),
         col("merchant_category")).otherwise("OTHER")
)

Pitfall 4: Memory Leaks in Iterative Pipelines

When you run feature engineering in a loop, each iteration creates new DataFrame objects. If you don't unpersist old ones, memory explodes.

python
# Bad: memory leak — cached DataFrames accumulate across iterations
for date in date_range:
    df = load_data(date)
    df_features = engineer_features(df)
    df_features.persist()
    df_features.write.parquet(f"s3://features/{date}/")
    # df_features never unpersisted — cached blocks pile up!
 
# Good
for date in date_range:
    df = load_data(date)
    df_features = engineer_features(df)
    df_features.persist()  # Explicit control
    df_features.write.parquet(f"s3://features/{date}/")
    df_features.unpersist()  # Free memory

Production Considerations

Monitoring Feature Quality

In production, your features will drift. Users spend differently in Q4 than Q1. New fraud patterns emerge. You need to monitor:

python
from pyspark.sql.functions import col, mean, stddev, min as smin, max as smax
 
# Compute feature statistics
stats = df.select([
    mean(col("amount")).alias("amount_mean"),
    stddev(col("amount")).alias("amount_std"),
    smin(col("amount")).alias("amount_min"),
    smax(col("amount")).alias("amount_max"),
]).collect()[0]
 
print(f"Amount: {stats['amount_mean']:.2f} ± {stats['amount_std']:.2f}")
 
# Compare to baseline (from training)
if stats['amount_mean'] > baseline_mean * 1.5:
    alert("Feature drift: amount has shifted significantly")

Handling Late Arrivals

In real-time pipelines, a transaction might arrive 10 minutes late. Should you recompute rolling features? Yes, because it changes "amount_7day_avg."

python
# Use Delta Lake's UPSERT to handle late arrivals
from delta.tables import DeltaTable
 
delta_table = DeltaTable.forPath(spark, "/mnt/features/")
 
delta_table.alias("features").merge(
    late_arriving_transaction.alias("new_txn"),
    condition="features.user_id = new_txn.user_id AND features.timestamp = new_txn.timestamp"
).whenMatchedUpdate(set={"amount_7day_avg": "new_txn.amount_7day_avg"}) \
 .whenNotMatchedInsertAll() \
 .execute()

Versioning Features

Always version your feature pipeline. When you change how "amount_7day_avg" is computed, you're creating a new feature definition.

python
import json
 
# Save feature metadata alongside features
metadata = {
    "version": "2.1",
    "features": [
        {"name": "amount_7day_avg", "definition": "rolling average over 7 days", "type": "float"},
        {"name": "category_encoded", "definition": "one-hot encoded merchant category", "type": "sparse_vector"}
    ],
    "timestamp": "2024-02-27T12:00:00Z"
}
 
with open("/mnt/features/metadata_v2.1.json", "w") as f:
    json.dump(metadata, f)

Comparing Alternatives: When to Use Polars, Dask, or Ray

Spark isn't always the right tool. Let's be honest:

Polars: For Single-Machine Speed

  • Use when: Your data fits in one machine's RAM (typically <100GB)
  • Advantage: 10-100x faster than Spark on small datasets (better CPU cache locality, SIMD optimization)
  • Downside: Doesn't scale beyond one machine
python
import polars as pl
 
# Polars is blazingly fast for small data
df = pl.read_parquet("small_data.parquet")
result = df.group_by("user_id").agg(pl.col("amount").mean())
# On a single machine, this is often faster than Spark

Dask: For Parallel Pandas

  • Use when: You want Spark-like parallelism but prefer pandas-like syntax
  • Advantage: Easier for data scientists familiar with pandas
  • Downside: Less mature than Spark, fewer optimization opportunities
python
import dask.dataframe as dd
 
df = dd.read_parquet("data/*.parquet")
result = df.groupby("user_id").agg({"amount": "mean"}).compute()

Ray: For ML-Specific Tasks

  • Use when: You need feature computation + model training in one system
  • Advantage: Seamless ML pipeline orchestration, great for distributed hyperparameter tuning
  • Downside: Steeper learning curve
python
import ray
 
@ray.remote
def compute_features(partition):
    # Feature engineering happens on remote workers
    return engineer_features(partition)
 
# Parallelize across workers
results = ray.get([compute_features.remote(p) for p in partitions])

Spark: For Everything Else

  • Use when: Billion-row data, SQL workloads, production stability
  • Advantage: Most mature ecosystem, enterprise support, best performance for massive scale
  • Downside: Higher memory overhead, complexity

Quick decision matrix:

Scale             | Primary Task         | Recommended
------------------|----------------------|-------------------
<100GB            | Data exploration     | Polars
100GB-10TB        | Feature engineering  | Spark
10TB+             | Data warehouse + ML  | Spark + Delta Lake
Mixed ML tasks    | Training + serving   | Ray
Python preference | SQL-free             | Dask

Complete Example: Fraud Detection Pipeline

Here's a production-ready fraud detection feature pipeline with temporal splits and anti-leakage joins:

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
 
spark = SparkSession.builder \
    .appName("FraudDetectionPipeline") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()
 
# 1. Load raw transactions
transactions = spark.read.parquet("s3://data/transactions/2024/")
 
# 2. Add temporal features (rolling stats, no leakage)
# rangeBetween needs a numeric ordering column, so order by epoch seconds;
# the frame ends 1 second before the current row so a transaction never
# feeds its own features
ts = col("timestamp").cast("long")
past_7d = Window.partitionBy("user_id").orderBy(ts).rangeBetween(-7*24*3600, -1)
 
features_df = transactions.withColumn(
    "txn_count_7day",
    count("*").over(past_7d)
).withColumn(
    "avg_amount_7day",
    avg("amount").over(past_7d)
).withColumn(
    "amount_deviation",
    (col("amount") - col("avg_amount_7day")) / stddev("amount").over(past_7d)
)
 
# 3. Encode categorical features
indexer = StringIndexer(inputCol="merchant_category", outputCol="category_index", handleInvalid="keep")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
 
# 4. Scale numerical features (Spark's StandardScaler operates on vector
# columns, so wrap the raw amount in a single-element vector first)
amount_assembler = VectorAssembler(inputCols=["amount"], outputCol="amount_vec")
scaler = StandardScaler(inputCol="amount_vec", outputCol="amount_scaled", withMean=True, withStd=True)
 
# 5. Assemble final feature vector
assembler = VectorAssembler(
    inputCols=["amount_scaled", "category_vec", "amount_deviation", "txn_count_7day"],
    outputCol="features"
)
 
# 6. Create pipeline
pipeline = Pipeline(stages=[indexer, encoder, amount_assembler, scaler, assembler])
 
# 7. Temporal train/test split (no leakage)
train_df = features_df.filter(col("timestamp") < "2024-10-01")
test_df = features_df.filter(col("timestamp") >= "2024-10-01")
 
# 8. Fit pipeline on training data only
pipeline_model = pipeline.fit(train_df)
 
# 9. Transform both sets
train_features = pipeline_model.transform(train_df)
test_features = pipeline_model.transform(test_df)
 
# 10. Save to Delta Lake (ACID compliance + time-travel)
train_features.write.format("delta").mode("overwrite").save("s3://features/fraud/train/")
test_features.write.format("delta").mode("overwrite").save("s3://features/fraud/test/")
 
print(f"✓ Training features: {train_features.count():,} rows")
print(f"✓ Test features: {test_features.count():,} rows")

Expected Output:

✓ Training features: 1,234,567 rows
✓ Test features: 456,789 rows

Integration with ML Systems

Your features need to go somewhere useful. Here's how to integrate with modern ML stacks:

Export to Parquet for PyTorch

python
train_features.select("features", "is_fraud") \
    .write.format("parquet") \
    .mode("overwrite") \
    .save("s3://features/parquet/train/")
 
# In PyTorch:
# dataset = ParquetDataset("s3://features/parquet/train/")
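
The `ParquetDataset` in the comment above is hypothetical. A common pattern is to load the Parquet files into arrays (via pandas or pyarrow) and hand mini-batches to the model, wrapping each batch with `torch.from_numpy`. A minimal sketch of just the batching side, using only NumPy (names are illustrative):

```python
import numpy as np

def iter_batches(features: np.ndarray, labels: np.ndarray, batch_size: int, seed: int = 42):
    """Yield shuffled (features, labels) mini-batches, as a DataLoader would."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(features))
    for start in range(0, len(features), batch_size):
        idx = order[start:start + batch_size]
        # In PyTorch: torch.from_numpy(features[idx]), torch.from_numpy(labels[idx])
        yield features[idx], labels[idx]

X = np.arange(20, dtype=np.float32).reshape(10, 2)
y = np.arange(10)
batches = list(iter_batches(X, y, batch_size=4))
print([b[0].shape for b in batches])  # [(4, 2), (4, 2), (2, 2)]
```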

Store in Delta Lake for Time-Travel

python
# Write with versioning
train_features.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/mnt/delta/fraud_features_train")
 
# Later, query a specific point-in-time
spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/mnt/delta/fraud_features_train") \
    .show()

Real-time Feature Store with Feast

python
# Sketch using the pre-0.18 Feast API (Entity/Feature/ValueType); newer
# releases replaced these with Field and dtype classes, and source class
# names vary by version; check the docs for your installed release
from datetime import timedelta
 
from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType
 
# Define entity and features
user = Entity(name="user_id", value_type=ValueType.INT64)
 
fraud_features = FeatureView(
    name="fraud_features",
    entities=["user_id"],
    ttl=timedelta(days=7),
    features=[Feature(name="txn_count_7day", dtype=ValueType.INT64)],
    batch_source=FileSource(path="s3://features/parquet/train/")  # Parquet export from earlier
)
 
# Register and serve
fs = FeatureStore(".")
fs.apply([user, fraud_features])
 
# Retrieve at prediction time
features = fs.get_online_features(
    entity_rows=[{"user_id": 12345}],
    features=["fraud_features:txn_count_7day"]
)

Performance Gains with Databricks Photon

If you're on Databricks, Photon (their vectorized execution engine) accelerates Spark SQL by 3-5x:

python
# Photon is enabled in the cluster / SQL warehouse configuration on
# Databricks (a runtime choice, not an application setting). Once
# enabled, the same code runs faster with no changes:
result = df.groupBy("user_id").agg(
    avg("amount").alias("avg_amount"),
    count("*").alias("txn_count")
)

This is one of the easiest performance wins available.

Debugging and Troubleshooting: Real-World Issues

Even with careful planning, Spark pipelines encounter problems. Here's what we've learned the hard way.

Issue: Out of Memory Errors

Symptom: Jobs fail mid-execution with "GC overhead limit exceeded" or "heap space" errors.

Root causes:

  • Shuffle operations (joins, groupBy) pull too much data into memory at once
  • Intermediate DataFrames aren't being released
  • Partition sizes are too large (one executor gets a 2GB partition and runs out of RAM)

Solutions:

python
# Option 1: Increase shuffle partitions (more, smaller chunks)
spark.conf.set("spark.sql.shuffle.partitions", 500)  # Default is 200
 
# Option 2: Enable off-heap memory (reduces GC pressure on the JVM heap)
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
 
# Option 3: Repartition to smaller chunks before expensive operations
df_small = df.repartition(1000, "user_id").cache()  # More partitions = less data per partition
df_result = df_small.groupBy("user_id").agg(...)
df_result.write.parquet(...)  # Materialize with an action while df_small is still cached
df_small.unpersist()

Issue: Stragglers (One Slow Partition)

Symptom: Job runs for 45 minutes. Most tasks finish in 30 seconds. One partition takes 40 minutes.

Root cause: Data skew. One category (e.g., "UNKNOWN" merchant) has 1000x more rows than others.

Solution: Salting, shown earlier, redistributes skewed keys.

python
# Inspect task runtimes in the Spark UI (Stages tab): a healthy stage has
# roughly uniform task durations; one task running 100x longer signals skew
 
# If skew is in a join key, use salting (shown earlier)
# If skew is in a groupBy key, repartition to spread rows more evenly
df_skewed = df.repartition(500, "skewed_column")

Issue: Feature Values Are NaN or Inf

Symptom: After scaling or aggregation, features contain NaN or Infinity.

Root causes:

  • StandardScaler on constant-value column (std = 0, division by zero)
  • Null handling before aggregation (sum of nulls = null)
  • Numeric instability (very large divided by very small)

Detection and fix:

python
from pyspark.sql.functions import col, count, isnan, when
 
# Check for problems before model training. PySpark has no isinf(),
# so compare against float("inf") directly.
numeric_cols = ["feature"]  # columns to audit
checks = df.select(
    *[count(when(isnan(col(c)), c)).alias(f"{c}_nan") for c in numeric_cols],
    *[count(when(col(c).isin(float("inf"), float("-inf")), c)).alias(f"{c}_inf") for c in numeric_cols]
).collect()[0]
print(checks.asDict())
 
# Fix: filter out bad rows
df_clean = df.filter(
    ~isnan(col("feature")) & ~col("feature").isin(float("inf"), float("-inf"))
)
 
# Or replace with a fallback value (0 here; for a median fill,
# compute the median first with approxQuantile)
df_clean = df.na.fill(0, ["feature"])

Issue: Reproducibility (Same Code, Different Results)

Symptom: You run the exact same pipeline twice and get different feature values (especially with sampling).

Root cause: Random seed not set. Features computed with rand() or random sampling change between runs.

Solution:

python
# Spark has no single global seed; pass an explicit seed to every
# operation that involves randomness
 
# Seeded sampling
df = spark.read.parquet(...).sample(fraction=0.1, seed=42)
 
# Seeded ML algorithms
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(seed=42)

Issue: Features Work in Dev, Break in Prod

Symptom: Your features look great on test data. In production, you get errors like "Column 'feature_x' not found" or "Unexpected value 'NEW_CATEGORY'".

Root causes:

  • Training data didn't have certain columns/values that production has
  • Schema changed between training and production
  • StringIndexer encountered a category it's never seen

Prevention:

python
import json
 
# 1. Validate schema before training
expected_schema = ["user_id", "amount", "merchant_category", "timestamp"]
actual_cols = set(df.columns)
missing = set(expected_schema) - actual_cols
if missing:
    raise ValueError(f"Missing columns: {missing}")
 
# 2. Handle unknown categories gracefully
indexer = StringIndexer(
    inputCol="merchant_category",
    outputCol="category_index",
    handleInvalid="keep"  # NEW_CATEGORY → special index
)
 
# 3. Document the feature contract
feature_contract = {
    "user_id": "int64, required",
    "merchant_category": "string, one of [GROCERY, GAS, RESTAURANT, ...], else OTHER",
    "amount": "float64, required, >=0"
}
with open("/models/feature_contract.json", "w") as f:
    json.dump(feature_contract, f)
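
A contract only pays off if it's enforced when requests arrive. Here's a minimal pure-Python validator sketch for the contract above; the rule set and helper name are illustrative:

```python
def validate_record(record: dict, required: list, categories: set) -> list:
    """Return a list of contract violations for one incoming record."""
    errors = []
    for field in required:
        if record.get(field) is None:
            errors.append(f"missing required field: {field}")
    cat = record.get("merchant_category")
    if cat is not None and cat not in categories:
        errors.append(f"unknown merchant_category: {cat} (will map to OTHER)")
    amount = record.get("amount")
    if amount is not None and amount < 0:
        errors.append(f"amount must be >= 0, got {amount}")
    return errors

KNOWN = {"GROCERY", "GAS", "RESTAURANT"}
print(validate_record(
    {"user_id": 1, "amount": -5.0, "merchant_category": "CODE_999"},
    required=["user_id", "amount"], categories=KNOWN,
))
```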

Best Practices Summary

When building production ML pipelines:

  1. Test edge cases early: What happens with nulls, duplicates, extreme values?
  2. Version your transforms: Save transformers (.pkl, .parquet) alongside models.
  3. Monitor feature statistics: Track mean, std, min, max over time. Alert on drift.
  4. Log the full DAG: When debugging, knowing which transforms were applied is crucial.
  5. Use Delta Lake: ACID guarantees + time-travel make debugging and rollback possible.
  6. Validate at each stage: Don't wait until the end to check data quality.
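
Point 3 becomes concrete once you persist per-run statistics. Here's a sketch of a z-score drift alert against a stored baseline; the threshold and field names are illustrative:

```python
def drift_alerts(baseline: dict, current: dict, z_threshold: float = 3.0) -> list:
    """Flag features whose current mean drifts beyond z_threshold baseline stds."""
    alerts = []
    for feature, stats in baseline.items():
        std = stats["std"] or 1e-9          # guard against constant features
        z = abs(current[feature]["mean"] - stats["mean"]) / std
        if z > z_threshold:
            alerts.append((feature, round(z, 2)))
    return alerts

baseline = {"amount": {"mean": 52.0, "std": 10.0}, "txn_count_7day": {"mean": 4.0, "std": 2.0}}
current  = {"amount": {"mean": 95.0, "std": 14.0}, "txn_count_7day": {"mean": 4.5, "std": 2.1}}
print(drift_alerts(baseline, current))  # [('amount', 4.3)]
```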

Why This Matters in Production

The difference between a feature pipeline that works in notebooks and one operating reliably in production is rigorous validation. When you move to production, pipelines run continuously. Data quality issues silently degrade performance. Missing values and unexpected categories suddenly drop accuracy without obvious warnings.

Consider fraud detection in production. During development you tested on curated data with clean categories. In production, a new integration uses CODE_999. The encoder creates sparse features the model never trained on. Legitimate transactions get flagged. Users complain. This is entirely preventable.

Production pipelines are resilient to unexpected inputs, observable for drift, and handle edge cases explicitly. Version transformations, monitor distributions, and build alerts. The cost of getting this right is modest compared to the cost of getting it wrong.

Key Takeaways

Building ML feature pipelines at scale requires thinking beyond single-machine feature engineering:

  1. Architecture matters: Separate raw data ingestion, transforms, and output clearly. Staging layers prevent downstream breakage.
  2. Temporal splits prevent leakage: Always split by time, and exclude future information when computing features. This is the #1 cause of model failures in production.
  3. Common transforms are standardized: StringIndexer → OneHotEncoder for categories, StandardScaler for numericals. Use off-the-shelf Spark transformers rather than rolling your own.
  4. Optimization is essential: Partition sizing, broadcasting, persistence, and pre-partitioning can 10x your pipeline speed. Profile before optimizing.
  5. Handle edge cases explicitly: Skewed data, nulls, and scaling all need explicit solutions. The default behavior often breaks down at scale.
  6. Integration is the endgame: Delta Lake for versioning, Parquet for ML frameworks, Feast for real-time serving. Don't let features sit in isolation.
  7. Monitor and version everything: Feature drift happens. Schema changes happen. Version your pipelines and track feature statistics over time.

Start simple - encode categories, scale numbers, compute basic aggregations. As your pipeline grows, add temporal features, handle skew, and optimize. The framework is all here. And when you're ready to scale beyond Spark, you'll understand the principles that carry over to Ray, Dask, or whatever emerges next.

The Cultural Shift to Feature-First ML

Many teams begin their ML journey focused on models. They ask "which algorithm should we use?" or "how do we tune hyperparameters?" These are reasonable questions, but they're backwards. The real differentiator in production ML systems is features, not models.

Think about how a practitioner's skills evolve. Early in their career, they're excited about algorithms. They study neural networks, ensemble methods, gradient boosting. They build impressive models on clean datasets. But as they encounter real production systems, they discover something humbling: an 80% accurate model trained on carefully engineered features beats a 92% accurate model trained on raw data.

This realization transforms how you approach ML projects. You stop asking "how accurate is my model?" and start asking "how good are my features?" You invest in feature pipelines the way mature software teams invest in API design. You document features the way you document code. You version features alongside models. You treat feature quality as a first-class concern.

Organizations that embrace this shift move faster. They spend less time tuning models (because good features do half the work) and more time shipping. When a model underperforms in production, they debug features first - "Is the 7-day rolling average correct?" - before assuming there's a modeling problem. This is a more efficient discovery process.

Feature Pipeline as Organizational Infrastructure

In large organizations, feature pipelines become shared infrastructure. Instead of each team building their own features, there's a central data platform that computes features once and serves them to multiple models. This is the concept behind feature stores - centralized repositories of versioned, validated features.

The transition to this model is profound. It forces conversations about data standardization. You can't have "Amount" mean different things in different contexts. You have to define it precisely once: "Amount is the transaction value in USD, rounded to nearest cent, excluding fees." Every model uses the same definition. This consistency prevents subtle bugs and makes models more comparable.

It also enables feature reuse. You compute "user_7day_average_transaction" once. A fraud model uses it. A churn model uses it. A recommendation model uses it. You're not computing the same feature three times in three different pipelines. This saves compute resources and ensures consistency.

The organizational benefit extends beyond efficiency. When features are centralized and versioned, you can debug model failures more effectively. If your fraud model's performance drops, you can ask: "Which features changed? When?" The audit trail tells you whether it's a feature change or a model change. This is how you maintain confidence as systems grow complex.

Operational Maturity in Feature Pipelines

As your feature pipelines move to production, operations become critical. A data warehouse can be down for hours with limited impact. A feature pipeline being down for minutes can corrupt thousands of models' predictions.

Mature teams approach feature pipeline operations with the rigor of database teams. They monitor pipeline latency (how long does feature computation take?). They monitor feature distributions (are values within expected ranges?). They have runbooks for common failure modes. They practice disaster recovery (if this pipeline breaks, what's our rollback plan?).

They also instrument their pipelines for debugging. Rather than just logging success or failure, they log intermediate results at key stages. When a pipeline fails, they can see exactly where the computation diverged. Did feature encoding break? Did aggregations compute wrong? Did null handling fail? This visibility is invaluable for root-cause analysis.

Advanced teams implement feature pipeline testing. Every new feature goes through validation before production deployment. Does it correlate with the target? Does it reduce model accuracy when combined with existing features? Does its distribution look reasonable? These tests catch problems before they affect production models.
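
The first of those checks, correlation with the target, fits in a few lines. A sketch using a plain Pearson correlation gate; the threshold is an arbitrary illustration:

```python
import math

def passes_correlation_gate(feature: list, target: list, min_abs_corr: float = 0.05) -> bool:
    """Reject candidate features with a negligible linear relationship to the target."""
    n = len(feature)
    mean_f, mean_t = sum(feature) / n, sum(target) / n
    cov = sum((f - mean_f) * (t - mean_t) for f, t in zip(feature, target))
    var_f = sum((f - mean_f) ** 2 for f in feature)
    var_t = sum((t - mean_t) ** 2 for t in target)
    if var_f == 0 or var_t == 0:
        return False                      # constant feature or target: useless
    corr = cov / math.sqrt(var_f * var_t)
    return abs(corr) >= min_abs_corr

# A feature that tracks the target passes; a constant feature fails
print(passes_correlation_gate([1, 2, 3, 4, 5], [0, 0, 1, 1, 1]))  # True
print(passes_correlation_gate([7, 7, 7, 7, 7], [0, 0, 1, 1, 1]))  # False
```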

