Why broadcast() doesn’t work inside Delta Lake’s merge() — and what to do instead

A common gotcha when optimizing Delta merge performance: the broadcast hint you pass to merge() is silently ignored. Here’s why, and the pattern that actually works.

The problem

When working with large Delta tables, it’s tempting to wrap your source DataFrame in broadcast() before passing it into merge() — hoping Spark will use a broadcast join under the hood and skip the expensive shuffle.

Python
from delta.tables import DeltaTable
from pyspark.sql.functions import broadcast

delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")
source_df   = spark.read.parquet("/path/to/source")

delta_table.alias("target") \
    .merge(
        broadcast(source_df).alias("source"),  # ← hint is silently ignored
        "target.id = source.id"
    ) \
    .whenNotMatchedInsertAll() \
    .execute()
⚠️ Why it fails:
Delta Lake’s merge() plans its own join internally through Delta’s optimizer. It consumes the source as a plain DataFrame, so a Spark broadcast() hint applied beforehand is dropped before the physical plan is built and never takes effect.

The fix: anti-join + append

If your goal is only to insert non-matching records, skip merge() entirely. Use a left_anti join to isolate new rows — this is where broadcast() works perfectly — then append them directly to the Delta table.

Python
target_df  = spark.read.format("delta").load("/path/to/delta/table")
source_df  = spark.read.parquet("/path/to/source")

# broadcast() works here — applied to a standard Spark join
non_matching = source_df.join(
    broadcast(target_df.select("id")),  # ✓ broadcast on the smaller side
    on="id",
    how="left_anti"                       # keep rows NOT in target
)

non_matching.write \
    .format("delta") \
    .mode("append") \
    .save("/path/to/delta/table")
💡 Tip:
Only select the key columns when broadcasting the target: target_df.select("id"). This keeps the broadcast payload as small as possible and avoids broadcasting unnecessary columns.
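To make the left_anti semantics concrete: it keeps exactly the source rows whose key has no match in the target. A minimal plain-Python model of that behavior (sample rows are hypothetical; in Spark these would be DataFrames):

```python
# Hypothetical sample rows standing in for the target and source tables.
target_rows = [{"id": 1}, {"id": 2}, {"id": 3}]
source_rows = [{"id": 2, "val": "b"}, {"id": 4, "val": "d"}]

# left_anti semantics: keep source rows whose "id" does NOT appear in target.
target_ids = {row["id"] for row in target_rows}
non_matching = [row for row in source_rows if row["id"] not in target_ids]

# id=2 already exists in the target, so only id=4 survives and gets appended.
```

This is also why broadcasting only the key column is safe: membership in `target_ids` is all the join needs from the target side.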

When each approach applies

broadcast() inside merge()
Hint is silently ignored. Delta’s optimizer controls the join plan — it never sees the hint you applied at the DataFrame API level.
Anti-join + append
Broadcast hint applies on a regular Spark join. Faster, simpler, and avoids merge overhead entirely when you only need inserts.

If you need both updates and inserts, stick with merge(). In that case, try the SQL /*+ BROADCAST(source) */ hint inside a spark.sql() call, which Delta’s SQL path can respect; verify the physical plan for your Delta version to confirm a broadcast join is actually chosen. For insert-only workloads, though, the anti-join pattern is the cleaner and more performant choice.
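A sketch of that SQL form, with the hint placed in the source subquery. The view name and table path are assumptions for this example, and running it requires a live SparkSession with Delta configured:

```python
# Illustrative sketch: register the source DataFrame as a temp view, then
# issue a SQL MERGE whose source subquery carries the broadcast hint.
# "source_view" and the table path are placeholder names for this example.
merge_sql = """
MERGE INTO delta.`/path/to/delta/table` AS target
USING (SELECT /*+ BROADCAST(src) */ * FROM source_view src) AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""

# source_df.createOrReplaceTempView("source_view")
# spark.sql(merge_sql)
```

Because the hint lives inside the SQL text rather than on a DataFrame, it survives into the statement Delta parses; checking the resulting plan with EXPLAIN is still the only way to be sure it was honored.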
