Checkpointing in Spark Structured Streaming

In Structured Streaming, if you enable checkpointing for a streaming query, then you can restart the query after a failure and the restarted query will continue where the failed one left off, while ensuring fault tolerance and data consistency guarantees.

Why needed?

The primary goal of checkpointing is to make streaming jobs fault tolerant. Thanks to the metadata stored in the checkpoint files, you can restart processing after any failure, whether it comes from business logic or from a technical error.

Checkpoints also help guarantee at-least-once processing when a failure occurs in the middle of the micro-batch that is currently being processed.

Enable checkpointing

To enable checkpointing, set the checkpointLocation option to a path on HDFS or cloud storage. For example:

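streamDataFrame.writeStream
  .format("csv")
  // directory where the output data files are written
  .option("path", "event/outputStoragePath")
  // directory where Spark keeps the query's progress (offsets, commits, state)
  .option("checkpointLocation", "event/checkpointPath")
  .start()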

What is stored in the Checkpoint Path

The checkpoint is a physical directory that holds 4 types of subdirectories:

  • sources : files in this directory contain information about the different sources used in the streaming query.
  • offsets : this directory contains one file per micro-batch with information about the data that will be processed in that micro-batch. The file is generated before the physical execution of the micro-batch.
  • commits : this directory contains a marker file that is generated for each micro-batch.
  • state : this directory stores the state generated by stateful processing logic.
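To look at this layout for an actual query, the checkpoint directory can be listed with plain JDK file APIs. The following is a minimal sketch, assuming Scala 2.13 or later and a checkpoint stored on the local file system; the names CheckpointLayout and topLevelEntries are illustrative and not part of Spark.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object CheckpointLayout {
  /** Returns the sorted names of the top-level entries of a checkpoint
    * directory. Directories are shown with a trailing slash. */
  def topLevelEntries(checkpointDir: Path): List[String] = {
    val stream = Files.list(checkpointDir)
    try {
      stream.iterator().asScala
        .map { p =>
          val name = p.getFileName.toString
          if (Files.isDirectory(p)) name + "/" else name
        }
        .toList
        .sorted
    } finally {
      stream.close() // Files.list holds an open directory handle
    }
  }
}
```

Calling CheckpointLayout.topLevelEntries(java.nio.file.Paths.get("event/checkpointPath")) on the checkpoint of a stateful query would typically show commits/, offsets/, sources/ and state/, along with a small metadata file that holds the query id.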

How checkpointing enforces the exactly-once delivery guarantee in the File Sink

Let’s see how checkpointing works with the File Sink.

streamDataFrame.writeStream
  .format("csv")
  .option("path", "event/outputStoragePath")
  .option("checkpointLocation", "event/checkpointPath")
  .start()

In the above example we have provided the paths of the checkpoint directory and the storage directory.

  • Under the checkpoint directory : Spark generates the offsets directory and the commits directory.
  • Under the storage directory : Spark generates the actual data files and the _spark_metadata directory.

What happens when we start processing:

First Step : Spark creates a file for Micro Batch one under the offsets directory. It contains the offsets that Spark will read from the source.

Second Step : Spark extracts the data from the source and stores the output files under the storage directory. The number of output files depends on the number of partitions.

Third Step : Spark creates a file under the _spark_metadata directory that lists the names of the files written for Micro Batch one.

Fourth Step : Finally, Spark creates a file under the commits directory once Micro Batch one is committed.

These steps are executed for each micro-batch run.
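The order of the four steps can be made concrete with a small in-memory model. This is only a sketch of the sequence described above, not Spark's implementation: Store, runBatch, the offset-range string and the completedSteps parameter are all hypothetical names introduced for illustration.

```scala
object MicroBatchProtocol {
  // In-memory stand-ins for the four locations described above.
  final case class Store(
      offsets: Map[Long, String] = Map.empty,          // offsets dir: batchId -> offset range
      dataFiles: Vector[String] = Vector.empty,        // files in the storage directory
      manifest: Map[Long, Vector[String]] = Map.empty, // _spark_metadata: batchId -> file names
      commits: Set[Long] = Set.empty)                  // commits dir: batchIds with a marker

  /** Applies the first `completedSteps` of the four steps for one micro-batch.
    * Passing fewer than 4 shows the state a crash at that point would leave behind. */
  def runBatch(store: Store, batchId: Long, offsetRange: String,
               files: Vector[String], completedSteps: Int = 4): Store = {
    val steps: Vector[Store => Store] = Vector(
      s => s.copy(offsets = s.offsets + (batchId -> offsetRange)), // 1: record planned offsets
      s => s.copy(dataFiles = s.dataFiles ++ files),               // 2: write output files
      s => s.copy(manifest = s.manifest + (batchId -> files)),     // 3: list files in the manifest
      s => s.copy(commits = s.commits + batchId)                   // 4: write the commit marker
    )
    steps.take(completedSteps).foldLeft(store)((s, step) => step(s))
  }
}
```

Running runBatch with completedSteps = 2 leaves an offsets entry and data files but no manifest entry and no commit marker, which is the state a failure in the middle of a micro-batch would produce.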

What happens in case of job failure:

Suppose the job starts processing Micro Batch two: a file is created under offsets, and Spark starts writing data to the storage directory. Assume one partial file has been written when the job fails. Partial output now exists in the storage directory, but no files have been created under the _spark_metadata and commits directories for this batch.

  • When the query is restarted, Spark checks whether the last written offset has a corresponding commit log. In this case there is no commit log, because the job failed partway through the batch.
  • Next, Spark extracts the Micro Batch two offsets again and then writes more files to the storage directory.
  • Next, Spark creates files under the _spark_metadata and commits directories for the restarted Micro Batch two.
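The restart check in the first bullet can be approximated by comparing the file names in the offsets and commits directories, where each file is named after its batch id. The sketch below is a simplified stand-in for what Spark does internally, assuming Scala 2.13 or later and a checkpoint on the local file system; RecoveryCheck, batchIds and batchToRerun are hypothetical names.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._
import scala.util.Try

object RecoveryCheck {
  /** Batch ids found in a log directory: entries whose names are plain numbers.
    * Other entries (for example hidden checksum or temp files) are ignored. */
  def batchIds(dir: Path): Set[Long] =
    if (!Files.isDirectory(dir)) Set.empty
    else {
      val stream = Files.list(dir)
      try {
        stream.iterator().asScala
          .flatMap(p => Try(p.getFileName.toString.toLong).toOption)
          .toSet
      } finally {
        stream.close()
      }
    }

  /** The latest planned batch, if it has no commit marker and must be re-run. */
  def batchToRerun(checkpointDir: Path): Option[Long] = {
    val planned = batchIds(checkpointDir.resolve("offsets"))
    val committed = batchIds(checkpointDir.resolve("commits"))
    planned.maxOption.filterNot(committed.contains)
  }
}
```

Spark numbers batches from 0, so in the scenario above (the second micro-batch failed before its commit) batchToRerun would return Some(1), and None once the restarted batch has been committed.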

At the end of the streaming job

  • The storage directory will contain 3 output files (assuming one file per batch): one for Micro Batch one, one for the failed Micro Batch two and one for the restarted Micro Batch two.
  • The _spark_metadata directory under the storage directory will contain only 2 files: one for Micro Batch one and one for the restarted Micro Batch two.

When we read data from the storage directory, Spark first checks the _spark_metadata directory, because it only reads output files that have an entry there. This ensures that no duplicate data is read.
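The effect of that lookup can be modelled as a simple set filter. This is an illustrative sketch only: ManifestFilter, visibleFiles and the file names in the usage note are made up for the example, and the real _spark_metadata log is a set of files on disk rather than an in-memory map.

```scala
object ManifestFilter {
  /** Keeps only the files listed in the manifest, preserving directory order.
    * `manifest` maps a batch id to the file names committed for that batch. */
  def visibleFiles(filesInDir: Seq[String], manifest: Map[Long, Seq[String]]): Seq[String] = {
    val committed = manifest.values.flatten.toSet
    filesInDir.filter(committed.contains)
  }
}
```

With the three files from the failure scenario, for instance part-batch1.csv, part-batch2-partial.csv and part-batch2-retry.csv, and a manifest that lists only the first and the last, visibleFiles returns two files and the partial output of the failed attempt is skipped.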

