1) What is the best way to join large data with a small lookup table?
Ans: Use a broadcast join (or a map-side join in Hive).
Syntax:
# PySpark
from pyspark.sql.functions import broadcast

result = large_df.join(
    broadcast(small_lookup_df),
    on="country_code",
    how="left"
)
// Scala
import org.apache.spark.sql.functions.broadcast

val result = largeDf.join(
  broadcast(smallLookupDf),
  Seq("country_code"),
  "left"
)
-- SQL
SELECT /*+ BROADCAST(l) */ t.*, l.country_name
FROM transactions t
JOIN country_lookup l
  ON t.country_code = l.code
Typical use cases:
- Joining billions of transaction rows with a 200-row country code table.
- Enriching a large event log with a small product catalog.
- Mapping user IDs to subscription tiers stored in a small config table.
- Adding currency labels to a large financial dataset from a 50-row currency reference.
Sometimes, a broadcast join may not be applied even when one of the tables is very small. You might expect Spark to automatically use a broadcast join, but that’s not always the case.
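One common reason is that Spark's size estimate for the small table exceeds the auto-broadcast threshold. A minimal sketch for checking and raising it (the 100 MB value is illustrative):
# PySpark
# Check the current auto-broadcast threshold (the default is 10 MB)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise it so larger lookup tables still qualify (100 MB here is illustrative)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Setting it to -1 disables automatic broadcasting; an explicit broadcast() hint still works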
2) How do you optimize Spark jobs?
- Use broadcast join when one side is a small lookup table — wrap it with broadcast()
- Fix data skew by salting the skewed key to spread it across partitions evenly (see the sketch after this list)
- Use DataFrames over RDDs — the Catalyst optimizer can’t optimize RDDs
- Avoid UDFs wherever possible — they break Catalyst optimization and serialize data to Python; use built-in Spark SQL functions instead
- Push filters early — filter rows as close to the data source as possible before any joins or aggregations
- Reduce shuffle operations — every groupBy, join, and repartition triggers a shuffle; minimize them
- Use repartition() before large joins on the join key to co-locate matching rows
- Use coalesce() instead of repartition() when reducing partition count — it avoids a full shuffle
- Cache or persist intermediate DataFrames that are reused multiple times in the same job
- Use columnar file formats — Parquet or ORC over CSV or JSON — and enable predicate pushdown
- Select only the columns you need early — don’t carry unused columns through the whole pipeline
- Tune the number of partitions — aim for partition sizes of 100–200 MB; too many small partitions are as bad as too few large ones
- Use mapPartitions instead of map when initializing expensive objects like DB connections
- Monitor the Spark UI — look at the DAG, shuffle read/write sizes, and task duration skew to find the real bottleneck
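As a rough sketch of the salting bullet above; the DataFrame names, column name, and salt factor are all illustrative:
# PySpark
from pyspark.sql import functions as F

N = 10  # salt factor: how many buckets to spread each skewed key across

# Add a random salt (0..N-1) to every row of the large, skewed side
salted_large = large_df.withColumn("salt", (F.rand() * N).cast("int"))

# Explode the small side so every (key, salt) combination exists
salted_small = small_df.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(N)]))
)

# Join on the original key plus the salt, then drop the helper column
result = salted_large.join(salted_small, on=["country_code", "salt"], how="inner").drop("salt")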
3) Once you cache a DataFrame, how do you clear the cache?
Clear a specific DataFrame:
df.unpersist()
Clear everything cached in the session:
spark.catalog.clearCache()
That’s it. unpersist() is for targeted cleanup, clearCache() is the nuclear option that wipes everything at once.
4) When you create a DataFrame, where does it get stored?
The DataFrame itself is not stored anywhere when you create it — it’s just an execution plan (a set of instructions). Nothing actually runs or gets stored until you call an action like .show(), .count(), or .write().
This is Spark’s lazy evaluation — defining a DataFrame just builds the DAG, it doesn’t touch any data.
When you do trigger an action, the data lives in:
- Executor memory (RAM) — by default, processed data lives temporarily in the memory of worker nodes during computation
- Executor disk — if data spills over from RAM during a shuffle or sort
- Off-heap memory — if configured, Spark can store data outside JVM heap to reduce GC pressure
Only when you explicitly call .cache() or .persist() does Spark hold the data in memory across actions — otherwise it recomputes from scratch every time an action is called
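A quick way to see both the laziness and the recompute behaviour (the path and column name below are placeholders):
# PySpark
df = spark.read.parquet("/data/events")            # placeholder path; builds a plan, no rows are processed yet
clicks = df.filter(df["event_type"] == "click")    # still just a plan

clicks.count()   # action: now the executors actually read and process the data
clicks.show(5)   # without cache()/persist(), this second action recomputes from scratch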
5) Left anti and left semi join in Spark
Left Semi Join: returns rows from the left table where a match exists in the right table — but only the left table’s columns are returned.
left.join(right, on="id", how="left_semi")
Think of it as: “give me all left rows that have a match on the right” — like a filter using the right table.
Left Anti Join: returns rows from the left table where no match exists in the right table.
left.join(right, on="id", how="left_anti")
Think of it as: “give me all left rows that have no match on the right” — the exact opposite of semi.
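A tiny, self-contained illustration (the data is made up):
# PySpark
left = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "val"])
right = spark.createDataFrame([(1,), (3,)], ["id"])

left.join(right, on="id", how="left_semi").show()   # ids 1 and 3, only the left table's columns
left.join(right, on="id", how="left_anti").show()   # id 2 only (the unmatched left row)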
6) What is Adaptive Query Execution?
Adaptive Query Execution (AQE) is a Spark feature (introduced in Spark 3.0) that re-optimizes the query plan at runtime based on actual data statistics — instead of relying only on estimates made before the job starts.
Enable it with:
spark.conf.set("spark.sql.adaptive.enabled", "true")
What it does automatically:
1. Dynamically coalesces shuffle partitions: after a shuffle, if many partitions are small, AQE merges them into fewer, larger ones — avoiding thousands of tiny tasks.
2. Dynamically switches join strategies: if Spark estimated a table to be large but it turns out small after filtering, AQE switches it to a broadcast join automatically — no manual hint needed.
3. Dynamically handles skew: detects skewed partitions at runtime and splits them into smaller sub-partitions to balance the workload.
Why it matters:
Without AQE, Spark’s optimizer makes decisions based on statistics collected before the job runs — which are often wrong or outdated. With AQE, it looks at the actual shuffle output sizes mid-job and adjusts on the fly.
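Two of those behaviours also have their own switches on top of the main flag (standard Spark 3.x config keys; the values shown are the defaults):
# PySpark
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")   # merge tiny shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")             # split skewed partitions at runtime
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") # target size when coalescing/splitting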
7) Parquet vs Delta file format
| Feature | Parquet | Delta |
| --- | --- | --- |
| Format | Open columnar file format | Parquet files + transaction log (_delta_log) |
| ACID transactions | No | Yes |
| Updates / Deletes | Not supported natively | Supported (UPDATE, DELETE, MERGE) |
| Schema evolution | Limited | Full support |
| Time travel | No | Yes — query older versions of data |
| Streaming + batch | Batch only | Both unified |
| Data versioning | No | Yes — full history in transaction log |
| Performance | Fast reads | Fast reads + optimized with Z-ordering, compaction |
| Compaction | Manual | Built-in (OPTIMIZE command) |
| Vacuum / cleanup | Manual | Built-in (VACUUM command) |
| Compatibility | Readable by virtually any engine or tool | Needs a Delta-aware engine (Spark, Databricks, etc.) |
Parquet is a file format. Delta is a storage layer built on top of Parquet that adds reliability, versioning, and mutability.
If you’re just reading static data — Parquet is fine. If you’re building a data pipeline where data changes, gets updated, or needs to be reliable — use Delta.
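A minimal Delta sketch, assuming the delta-spark package is installed and the session is configured with the Delta extensions (paths and the filter condition are placeholders):
# PySpark
df.write.format("delta").mode("overwrite").save("/tmp/events_delta")

# Time travel: read an older version straight from the transaction log
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/events_delta")

# Updates / deletes through the DeltaTable API
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/events_delta")
dt.delete("event_type = 'test'")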
8) What is the difference between cache() and persist()?
cache() is just shorthand for persist() with the default storage level, which is MEMORY_AND_DISK for DataFrames (for RDDs, cache() defaults to MEMORY_ONLY).
persist() gives you control over the storage level — memory only, disk only, off-heap, serialized, etc.
Use persist() when you need fine-grained control over where and how the data is stored.
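A short sketch of the difference (the DataFrame names are placeholders):
# PySpark
from pyspark import StorageLevel

lookup_df.cache()                              # shorthand for persist(StorageLevel.MEMORY_AND_DISK)
events_df.persist(StorageLevel.DISK_ONLY)      # keep it out of executor memory entirely
features_df.persist(StorageLevel.MEMORY_ONLY)  # recompute partitions that don't fit instead of spilling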
9) What is the difference between repartition and coalesce?
Both change the number of partitions — but repartition() does a full shuffle and can increase or decrease partitions, while coalesce() avoids a shuffle by merging existing partitions and can only decrease them. Use coalesce() when reducing partition count to save the shuffle cost.
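For example (the partition counts and column name are illustrative):
# PySpark
df = df.repartition(200, "country_code")   # full shuffle; can increase or decrease, co-locates rows by key
df = df.coalesce(10)                       # merges existing partitions; decrease only, no full shuffle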
10) What happens during a shuffle in Spark?
- A shuffle is triggered by operations like groupBy, join, distinct, and repartition.
- Spark writes intermediate data to disk on each executor, then transfers it across the network to redistribute rows by key.
- It’s the most expensive operation in Spark — it involves disk I/O, network transfer, and deserialization. Minimizing shuffles is one of the core goals of Spark optimization.
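The main knob for how finely a shuffle splits the data is spark.sql.shuffle.partitions (the default is 200; the 400 below is illustrative):
# PySpark
spark.conf.set("spark.sql.shuffle.partitions", "400")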
