When building data pipelines on cloud platforms like AWS (S3), GCP (GCS), or Azure (ADLS), one of the earliest decisions you make is: how will Spark discover and read new files? There are two primary mechanisms — Directory Listing and SQS-based (Event-Driven) ingestion. They look similar on the surface but behave very differently at scale.
The two strategies at a glance
Directory Listing
- Spark calls the cloud storage API to LIST all objects under a path
- Works everywhere — no extra setup
- Expensive at scale: one LIST per trigger, per prefix
- Latency grows linearly with file count

Event-Driven (SQS)
- Cloud storage emits ObjectCreated events to a queue (SQS, Pub/Sub)
- Spark reads the queue — no listing at all
- O(1) discovery regardless of total file count
- Requires infrastructure setup upfront
Method 1 — directory listing
This is Spark’s default and requires zero infrastructure. On every trigger, the driver issues ListObjectsV2 calls against S3 (or equivalent) to discover which files are new since the last checkpoint. Spark then filters out already-processed files and schedules tasks for the new ones.
Spark (or the Delta/Structured Streaming engine) actively calls the cloud storage API to list all objects in a given path. It scans for new, modified, or unprocessed files each time a micro-batch or batch job runs.
LIST s3://bucket/path/ → S3 / GCS / ADLS → Returns file manifest → Spark reads files
Step-by-step flow
1. A micro-batch trigger fires (or a batch job starts).
2. The driver issues ListObjectsV2 calls against the monitored path (e.g., s3://data-lake/events/date=2024-10-01/).
3. Spark compares the returned listing against its checkpoint and filters out already-processed files.
4. Tasks are scheduled for the new files only.
Where it’s used
Spark Structured Streaming with cloudFiles (Auto Loader in “directory listing” mode),
batch spark.read, or readStream on a folder path.
# Directory listing — Structured Streaming
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useNotifications", "false")  # default — listing mode
    .load("s3://my-bucket/events/")
)

A bucket with 10 million objects can take minutes just to list, even if only a few new files arrived. S3 also charges per LIST request, so high-frequency triggers quickly rack up API costs.
This mode works fine for small buckets or low-frequency batch jobs — it degrades badly at scale.
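To make "degrades badly" concrete, here is a back-of-the-envelope cost sketch. The 1,000-key page size is the real ListObjectsV2 limit; the per-request price is an assumption — check current S3 pricing for your region:

# Rough cost of listing-based discovery on a large bucket
objects_in_bucket = 10_000_000
keys_per_list_call = 1_000           # ListObjectsV2 returns at most 1,000 keys per call
price_per_1k_lists = 0.005           # USD, assumed S3 Standard LIST pricing
triggers_per_day = 24 * 60           # one-minute micro-batch trigger

calls_per_trigger = objects_in_bucket / keys_per_list_call   # 10,000 paginated calls
cost_per_day = calls_per_trigger * triggers_per_day * price_per_1k_lists / 1_000
print(f"~{cost_per_day:.2f} USD/day just to discover files")  # ~72 USD/day

And that is only the API cost — the driver still has to page through all 10,000 responses on every single trigger before it can schedule any work.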
Method 2 — SQS-based (event-driven) ingestion
Rather than polling storage for what changed, Spark subscribes to a queue that cloud storage pushes change events into. On S3, you configure an S3 Event Notification that sends s3:ObjectCreated:* events to an SQS queue. Spark’s cloudFiles source then reads from the queue — it only sees files that actually arrived since the last trigger, with no listing overhead.
Spark subscribes to a queue (AWS SQS, Azure Event Grid, or GCS Pub/Sub) that receives real-time object creation notifications. Spark only processes files it is explicitly notified about.
Step-by-step flow
1. A producer uploads a file to the bucket.
2. S3 emits an s3:ObjectCreated event.
3. The event lands in the configured SQS queue.
4. On the next trigger, Spark dequeues the messages and reads exactly those files — no listing involved.
Where it’s used
Databricks Auto Loader in file notification mode, custom Spark streaming with an SQS source connector,
or Lambda-triggered pipelines feeding Spark.
# SQS notification mode
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useNotifications", "true")  # SQS mode
    .option("cloudFiles.region", "us-east-1")
    .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/123456/my-queue")
    .load("s3://my-bucket/events/")
)

The SQS queue holds a backlog of exactly which files were created. Spark dequeues only the messages it hasn't processed yet — O(1) per new file regardless of total bucket size. No listing, no pagination, no driver bottleneck.
Infrastructure you need to set up
Before enabling notification mode you need: an SQS queue in the same region as your bucket, an S3 Event Notification configured on the bucket to push s3:ObjectCreated:* to that queue, and IAM permissions for the Spark role to call sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes.
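The bucket-side notification can be configured in the console or programmatically. A minimal sketch using boto3, where the bucket name, queue ARN, and region mirror the placeholders used above:

import boto3

# Point s3:ObjectCreated:* events at the SQS queue (hypothetical names).
s3 = boto3.client("s3", region_name="us-east-1")
s3.put_bucket_notification_configuration(
    Bucket="my-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "QueueArn": "arn:aws:sqs:us-east-1:123456:my-queue",
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)

Note that the queue itself also needs an access policy that allows S3 to send messages to it, or the notification configuration will be rejected.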
Minimal IAM policy (attach to Spark execution role)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456:my-queue"
    }
  ]
}
Understanding cloudFiles.includeExistingFiles
This option controls what happens on the very first run of your Auto Loader stream — should Spark read files that already exist in the S3 path, or only watch for files that arrive from now onwards?
The default value is true — meaning Auto Loader reads all existing files first, then continues watching for new ones. You only need to set it explicitly to false if you want to process only new files created after the stream starts.
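A minimal sketch of the non-default setting — the option name is the real Auto Loader option; the path is a placeholder:

# Ignore the existing backlog; process only files that arrive from now on.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.includeExistingFiles", "false")
    .load("s3://my-bucket/events/")
)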
⚠️ Important:
This option only takes effect on the very first run — when no checkpoint exists yet. Once the stream has started and a checkpoint is written, changing this option has no effect. The checkpoint already knows which files were processed or skipped.
If you need to change from false → true on a running pipeline, you must delete the checkpoint and restart. But be careful — this causes a full reprocess of all files.
Understanding modifiedAfter
This option tells Auto Loader to only process files whose last-modified timestamp is after a specific date and time. It acts as a time-based filter on top of whatever files exist in S3 — older files are completely ignored, even if includeExistingFiles is true.
Note that it filters on S3 object metadata, not on data inside the file: the timestamp it checks is when the file was uploaded to S3, not the event_time column in your Parquet/JSON data. This is a very common source of confusion.
The format is a UTC timestamp string, "yyyy-MM-dd'T'HH:mm:ss" — for example "2024-06-01T00:00:00". Files modified before this timestamp are silently skipped.
Three important relationships to remember:
- includeExistingFiles = true + no modifiedAfter → processes ALL existing files in S3, starting from the earliest available data (not the latest)
- includeExistingFiles = true + modifiedAfter → starts from files whose S3 LastModified time is AFTER your timestamp — the most useful combination (see the sketch after this list)
- includeExistingFiles = false + modifiedAfter → modifiedAfter is redundant; all existing files are skipped and only new files that arrive after the stream starts are processed, not historical files
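Here is what the most useful combination looks like in practice — a sketch where the path and timestamp are placeholders, and the timestamp follows the format described above:

# Backfill from a known point in time, then keep streaming new arrivals.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.includeExistingFiles", "true")  # default
    .option("modifiedAfter", "2024-06-01T00:00:00")     # filters on S3 LastModified
    .load("s3://my-bucket/events/")
)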
Its companion option is modifiedBefore — together they define a strict time window for
compliance or reproducible backfills.
And just like includeExistingFiles, both modifiedAfter and modifiedBefore
are locked by the checkpoint on first run — changing them later has no effect.
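For instance, a reproducible backfill over a fixed window might look like this — a sketch with placeholder timestamps; set both before the first run, since they are locked by the checkpoint:

# Reproducible backfill: only files uploaded during June 2024 are ever read.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("modifiedAfter", "2024-06-01T00:00:00")
    .option("modifiedBefore", "2024-07-01T00:00:00")
    .load("s3://my-bucket/events/")
)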
