Reading Cloud Files in Spark: Directory Listing vs SQS

When building data pipelines on cloud platforms like AWS (S3), GCP (GCS), or Azure (ADLS), one of the earliest decisions you make is: how will Spark discover and read new files? There are two primary mechanisms — Directory Listing and SQS-based (Event-Driven) ingestion. They look similar on the surface but behave very differently at scale.

The two strategies at a glance

Method 1: Directory listing
  • Spark calls the cloud storage API to LIST all objects under a path
  • Works everywhere; no extra setup
  • Expensive at scale: one LIST per trigger, per prefix
  • Latency grows linearly with file count

Method 2: SQS / event-driven
  • Cloud storage emits ObjectCreated events to a queue (SQS, Pub/Sub)
  • Spark reads the queue, with no listing at all
  • O(1) discovery regardless of total file count
  • Requires infrastructure setup upfront

Method 1 — directory listing

This is Spark’s default and requires zero infrastructure. On every trigger, the driver issues ListObjectsV2 calls against S3 (or equivalent) to discover which files are new since the last checkpoint. Spark then filters out already-processed files and schedules tasks for the new ones.

How it works

Spark (or the Delta/Structured Streaming engine) actively calls the cloud storage API to list all objects in a given path. It scans for new, modified, or unprocessed files each time a micro-batch or batch job runs.

Flow: Spark driver issues LIST against s3://bucket/path/ → S3 / GCS / ADLS returns the file manifest → Spark reads the new files.

Step-by-step flow

1. Spark triggers a LIST API call to the cloud storage prefix (s3://data-lake/events/date=2024-10-01/).
2. The storage service returns a paginated list of all object keys under that prefix (see the boto3 sketch after this list).
3. Spark compares the returned list against previously processed files (tracked in the checkpoint or Delta log).
4. New or unprocessed files are scheduled for reading and distributed across executors.
5. On the next trigger, the entire listing process repeats from scratch.
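
To make steps 1–3 concrete, here is a minimal boto3 sketch of the paginated listing the driver effectively performs on every trigger. The bucket, prefix, and checkpoint diff are illustrative, not Spark's actual internals.

Python
import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Each page is one ListObjectsV2 call and returns at most 1,000 keys, so a
# prefix holding millions of objects means thousands of API calls per trigger.
all_keys = []
for page in paginator.paginate(Bucket="data-lake", Prefix="events/date=2024-10-01/"):
    for obj in page.get("Contents", []):
        all_keys.append(obj["Key"])

# Spark then diffs this manifest against its checkpoint / Delta log to find
# unprocessed files; the empty set below stands in for that state.
already_processed = set()
new_files = [k for k in all_keys if k not in already_processed]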

Where it’s used

Spark Structured Streaming with cloudFiles (Auto Loader in “directory listing” mode), batch spark.read, or readStream on a folder path.

Python
# Directory listing: Structured Streaming
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useNotifications", "false")  # default: listing mode
    .load("s3://my-bucket/events/"))
⚠️ The hidden cost:
A bucket with 10 million objects can take minutes just to list, even if only a few new files arrived. S3 charges per LIST request, so high-frequency triggers quickly rack up API costs. This mode works fine for small buckets or low-frequency batch jobs — it degrades badly at scale.
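
A rough back-of-the-envelope calculation shows how this adds up. The numbers below assume S3's published rate of roughly $0.005 per 1,000 LIST requests and the 1,000-key page limit; treat them as illustrative, since pricing varies by region.

Python
# Illustrative cost estimate for listing mode (verify against current S3 pricing).
objects_in_bucket = 10_000_000
keys_per_list_page = 1_000                    # ListObjectsV2 returns at most 1,000 keys
price_per_1000_requests = 0.005               # USD, approximate LIST request price

requests_per_trigger = objects_in_bucket / keys_per_list_page               # 10,000 requests
cost_per_trigger = requests_per_trigger / 1_000 * price_per_1000_requests   # ~$0.05

triggers_per_day = 24 * 60                    # a one-minute trigger interval
daily_cost = cost_per_trigger * triggers_per_day
print(f"~${daily_cost:.0f}/day in LIST requests alone")                     # ~$72/day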

Method 2 — SQS-based (event-driven) ingestion

Rather than polling storage for what changed, Spark subscribes to a queue that cloud storage pushes change events into. On S3, you configure an S3 Event Notification that sends s3:ObjectCreated:* events to an SQS queue. Spark’s cloudFiles source then reads from the queue — it only sees files that actually arrived since the last trigger, with no listing overhead.

How it works

Spark subscribes to a queue (AWS SQS, Azure Event Grid, or GCS Pub/Sub) that receives real-time object creation notifications. Spark only processes files it is explicitly notified about.

Flow: New file lands in S3 → S3 Event Notification → SQS queue → Auto Loader / Spark reads the file.

Step-by-step flow

1. A file is uploaded to S3 (or GCS / ADLS). The storage service emits an s3:ObjectCreated event.
2. The event is pushed to an SNS topic (optional fan-out) and then delivered to an SQS queue.
3. Spark (via Databricks Auto Loader in "file notification" mode) polls SQS at low cost and reads only the event messages, not the full directory (see the sketch after this list).
4. The file path from the event message is used directly to read only that specific file.
5. Processed messages are deleted from SQS, and Spark checkpoints the state.
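
Outside of Auto Loader, the same loop can be written by hand. The sketch below shows what steps 3–5 look like with boto3, assuming events are delivered straight from S3 to SQS (no SNS envelope); the queue URL matches the example used later in this post.

Python
# Hand-rolled notification loop (illustrative; Auto Loader does this for you).
import json
import boto3

sqs = boto3.client("sqs", region_name="us-east-1")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456/my-queue"

resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=10)

new_paths = []
for msg in resp.get("Messages", []):
    body = json.loads(msg["Body"])
    # Each s3:ObjectCreated event names the bucket and key of one new object.
    for record in body.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        new_paths.append(f"s3://{bucket}/{key}")
    # Step 5: remove the message once its file path has been captured.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])

# Step 4: read only those specific files, e.g. spark.read.parquet(*new_paths)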

Where it’s used

Databricks Auto Loader in fileNotificationMode, custom Spark streaming with SQS source connector, or Lambda-triggered pipelines feeding Spark.

Python
# SQS notification mode
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useNotifications", "true")  # SQS mode
    .option("cloudFiles.region", "us-east-1")
    .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/123456/my-queue")
    .load("s3://my-bucket/events/"))
✓ Why this scales:
The SQS queue holds a backlog of exactly which files were created. Spark dequeues only the messages it hasn’t processed yet — O(1) per new file regardless of total bucket size. No listing, no pagination, no driver bottleneck.

Infrastructure you need to set up

Before enabling notification mode you need: an SQS queue in the same region as your bucket, an S3 Event Notification configured on the bucket to push s3:ObjectCreated:* to that queue, and IAM permissions for the Spark role to call sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes.

Minimal IAM policy (attach to Spark execution role)

{
  "Effect": "Allow",
  "Action": [
    "sqs:ReceiveMessage",
    "sqs:DeleteMessage",
    "sqs:GetQueueAttributes",
    "sqs:ChangeMessageVisibility"
  ],
  "Resource": "arn:aws:sqs:us-east-1:123456:my-queue"
}
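
The bucket-side wiring can also be scripted. Here is a minimal boto3 sketch that points s3:ObjectCreated:* events at the queue; the bucket name, queue ARN, and prefix are placeholders, and it assumes the queue's access policy already allows S3 to send messages to it.

Python
# Point S3 ObjectCreated events at the SQS queue (names are placeholders).
import boto3

s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
    Bucket="my-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "QueueArn": "arn:aws:sqs:us-east-1:123456:my-queue",
                "Events": ["s3:ObjectCreated:*"],
                # Optional: only notify for the prefix Spark actually reads.
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": "events/"}]}
                },
            }
        ]
    },
)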

Understanding cloudFiles.includeExistingFiles

This option controls what happens on the very first run of your Auto Loader stream — should Spark read files that already exist in the S3 path, or only watch for files that arrive from now onwards?

✓ Default behavior:
The default value is true, meaning Auto Loader reads all existing files first, then continues watching for new ones. You only need to set it explicitly to false if you want to process only files created after the stream starts.

⚠️ Important:
This option only takes effect on the very first run — when no checkpoint exists yet. Once the stream has started and a checkpoint is written, changing this option has no effect. The checkpoint already knows which files were processed or skipped.

If you need to change from false → true on a running pipeline, you must delete the checkpoint and restart. But be careful — this causes a full reprocess of all files.
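
For example, a stream that should skip the historical backlog and only pick up files landing from now on might look like this (paths are illustrative):

Python
# Ignore files already in the path; process only files created after the stream starts.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", "false")  # evaluated only on the first run
    .load("s3://my-bucket/events/"))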

Understanding modifiedAfter

This option tells Auto Loader to only process files whose last-modified timestamp is after a specific date and time. It acts as a time-based filter on top of whatever files exist in S3: older files are completely ignored, even if includeExistingFiles is true.

Note that it filters on S3 object metadata, not on data inside the file. The timestamp it checks is when the file was uploaded to S3, not the event_time column in your Parquet/JSON data. This is a very common source of confusion.

✓ Format:
A UTC timestamp string, "yyyy-MM-dd'T'HH:mm:ss", for example "2024-06-01T00:00:00". Files modified before this timestamp are silently skipped.

Three important relationships to remember:

  1. includeExistingFiles = true + no modifiedAfter → processes ALL existing files in S3, starting from the earliest available data (not the latest)
  2. includeExistingFiles = true + modifiedAfter → starts from files whose S3 LastModified time is AFTER your timestamp; this is the most useful combination
  3. includeExistingFiles = false + modifiedAfter → modifiedAfter is redundant; all existing files are skipped and only new files that arrive after the stream starts are processed, not historical ones

Its companion option is modifiedBefore — together they define a strict time window for compliance or reproducible backfills.

And just like includeExistingFiles, both modifiedAfter and modifiedBefore are locked by the checkpoint on first run — changing them later has no effect.
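
For instance, a reproducible backfill over a fixed window might be configured like this on the first run (timestamps and paths are illustrative):

Python
# Reprocess only files uploaded to S3 within a fixed, reproducible window.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.includeExistingFiles", "true")   # read the existing backlog...
    .option("modifiedAfter", "2024-06-01T00:00:00")      # ...but only files uploaded after this (UTC)
    .option("modifiedBefore", "2024-07-01T00:00:00")     # ...and before this
    .load("s3://my-bucket/events/"))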
