
AparnaRamakris

Batch at Streaming Speed: Simplifying File Processing with Spark Structured Streaming

For years, many teams assumed Spark Structured Streaming was only for real-time IoT or sensor data feeding real-time intelligence and reporting. In practice, Structured Streaming has options that make batch processing simpler, safer, and more maintainable. Three features in particular let you accomplish this: foreachBatch, checkpointing, and trigger options. Let's look at them one by one.
 
foreachBatch:
 
The foreachBatch operation enables developers to perform batch-like operations on streaming data. Instead of processing each individual record, which can be inefficient, foreachBatch processes the data in micro-batches, offering better performance and resource utilization. This approach also provides the flexibility to leverage the full power of Spark's DataFrames, including various transformations and aggregations, to perform complex computations on streaming data.
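As a quick illustration (the grouping and target table below are just an example, using columns from the sample schema later in this post), the function you hand to foreachBatch receives each micro-batch as an ordinary DataFrame, so any batch transformation or aggregation applies:

def summarise_batch(batch_df, batch_id):
    # batch_df is a plain DataFrame, so the usual batch APIs work here.
    summary = (
        batch_df.groupBy("payment_type")
                .agg({"fare_amount": "sum"})
    )
    # Illustrative target table name.
    summary.write.mode("append").saveAsTable("fare_summary_by_batch")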
 
foreachBatch provides at-least-once delivery by default. For exactly-once outcomes, make your sink logic idempotent (e.g., MERGE on natural keys, or dedupe using the provided batchId).
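For illustration, here is a minimal sketch of an idempotent sink using a Delta MERGE inside foreachBatch. It assumes the target Delta table already exists and uses a hypothetical natural key column trip_id (not part of the sample schema below); adapt the table name and key to your data.

from delta.tables import DeltaTable

def upsert_to_table(batch_df, batch_id):
    # Drop duplicate keys within the micro-batch before merging.
    deduped = batch_df.dropDuplicates(["trip_id"])  # hypothetical natural key

    # MERGE so a replayed micro-batch updates existing rows instead of appending duplicates.
    target = DeltaTable.forName(spark, "taxi_trips")  # assumed existing Delta table
    (
        target.alias("t")
        .merge(deduped.alias("s"), "t.trip_id = s.trip_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )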
 
Checkpointing:
 
Traditionally, ETL pipelines relied on manual file scanning, checking modified timestamps or last-inserted markers to detect new data. This approach was brittle and error-prone: engineers had to maintain custom logic, handle race conditions, and ensure no file was missed or processed twice. Scaling this across multiple folders or partitions meant complex orchestration and constant monitoring. With Spark Structured Streaming and checkpointing, the engine automatically tracks progress and manages state. Every processed file is recorded in the checkpoint, so new files are picked up seamlessly without manual bookkeeping. This eliminates duplicate processing, simplifies recovery after failures, and makes incremental ingestion effortless. Instead of writing custom scripts for file detection, you define your source and let Spark handle the rest, bringing reliability and simplicity to batch and streaming pipelines alike.
 
Features of checkpointing:
 
  • Automatic file pickup: new files are detected and processed without custom “last modified” scanners.
  • Failure recovery: the engine continues from the last committed progress.
  • Trigger switching: the trigger can change across runs that share the same checkpoint, as shown in the sketch below.
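To make that last point concrete, here is a minimal sketch of trigger switching, assuming the same df_stream, write_to_table, and checkpoint_location defined in the full example further down. Because both runs share one checkpoint, the second run only picks up files the first run has not already processed.

# Run 1: drain the current backlog, then stop.
q1 = (
    df_stream.writeStream
    .foreachBatch(write_to_table)
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .start()
)
q1.awaitTermination()

# A later run: same checkpoint, now polling for new files every 10 minutes.
q2 = (
    df_stream.writeStream
    .foreachBatch(write_to_table)
    .option("checkpointLocation", checkpoint_location)
    .trigger(processingTime="10 minutes")
    .start()
)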

 

trigger(availableNow=True)

 

This trigger option processes all data currently available (files visible at start or discovered during the run) as a sequence of micro-batches, then stops automatically, which makes it perfect for catch-ups, daily slices, or “drain the backlog” jobs. In Microsoft Fabric, using streaming for batch via availableNow can lower CU consumption compared to always-on streaming, since compute isn't permanently running.

 

The code sample below puts it all together. It can be created as a Fabric Notebook and scheduled using Fabric Pipelines.

 

# Define your schema

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType

file_location = "<full file path>"          # folder containing the source CSV files
checkpoint_location = "<checkpoint path>"   # folder where Spark stores streaming progress

data_schema = StructType([
    StructField("vendor_id", StringType(), True),
    StructField("rate_code", IntegerType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_time_in_secs",IntegerType(),True),
    StructField("trip_distance",FloatType(),True),
    StructField("payment_type",StringType(),True),
    StructField("fare_amount",FloatType(),True)
    # Add all your columns here
])

df_stream = (
    spark.readStream              # Fabric notebooks expose the SparkSession as `spark`
    .format("csv")
    .option("header", True)
    .schema(data_schema)
    .load(f"{file_location}/*.csv")
)

def write_to_table(batch_df, batch_id):
    # Append each micro-batch to a managed table.
    batch_df.write.mode("append").saveAsTable("taxi_data_stream_triggermode")
                                               
query = (
    df_stream.writeStream
    .foreachBatch(write_to_table)
    .option("checkpointLocation", f"{checkpoint_location}/")
    .trigger(availableNow=True)
    .start()
)

print(query.status)
print(query.recentProgress)
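One small addition worth considering when the notebook is scheduled from a pipeline (not part of the original sample): blocking until the availableNow run has finished ensures the session is not torn down before the backlog is fully drained.

# Wait for the availableNow run to process everything and stop.
query.awaitTermination()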

 

Have you used or encountered a specific use case of Structured Streaming? Comment on the post and share the knowledge with the wider community.