I was following the example https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite
But I have a question. In my test, when query.stop() is called, it just interrupts the pipeline immediately. My question is: how do I make sure everything I have ingested is committed in the subscription, so that the next time I start the job it resumes from where I left off?
In Spark Streaming with Google Cloud Pub/Sub Lite, effective message acknowledgment and checkpointing are critical for ensuring reliable data processing. When Spark processes a message from Pub/Sub Lite, it does not automatically remove it from the subscription. Instead, you must explicitly acknowledge each message to confirm successful processing.
Spark Streaming leverages checkpointing to maintain its progress. By periodically saving the offset (position) of the last processed message, Spark ensures that if an application fails or restarts, it can resume from the last checkpoint, thereby avoiding the reprocessing of previously handled messages.
To guarantee exactly-once processing and avoid duplicates, several steps must be followed. Firstly, enable checkpointing in your Spark Structured Streaming application by configuring a reliable storage location, such as Google Cloud Storage or HDFS, for storing checkpoint data:
// ... your Spark session creation ...
spark.conf.set("spark.sql.streaming.checkpointLocation", "gs://your-bucket/checkpoints")
Explicit acknowledgment of messages is also crucial. In your message processing logic, acknowledge messages only after successful processing. For instance, when using the pubsublite-spark-sql-streaming connector, you can explicitly acknowledge messages as shown below:
import com.google.cloud.pubsublite.spark.PslSparkUtils
import org.apache.spark.sql.DataFrame

df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Process your batchDF
    // ...
    // Acknowledge messages after processing
    PslSparkUtils.acknowledge(batchDF, "pubsublite.subscription")
  }
  .start()
For graceful shutdowns, ensure that the Spark application completes processing the current micro-batch before stopping. This can be achieved by calling query.awaitTermination() after query.stop(), ensuring Spark waits for the micro-batch to finish processing and commit its checkpoint before shutting down:
query.stop() // Request shutdown
query.awaitTermination() // Wait for the graceful shutdown
Several important considerations must be kept in mind. While the above steps aim for exactly-once processing, the distributed nature of Spark and Pub/Sub Lite means that occasional duplicates may occur in rare failure scenarios. If strict duplicate elimination is required, additional deduplication logic should be implemented.
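For illustration only, here is a minimal deduplication sketch inside foreachBatch. It assumes the connector's default schema exposes partition and offset columns and uses a placeholder output path, so adapt it to your actual schema and sink:

import org.apache.spark.sql.DataFrame

// Sketch: keep one row per Pub/Sub Lite (partition, offset) pair so that messages
// redelivered within the same micro-batch are written only once.
def processBatch(batchDF: DataFrame, batchId: Long): Unit = {
  val deduped = batchDF.dropDuplicates("partition", "offset")
  deduped.write.mode("append").parquet("gs://your-bucket/output")  // placeholder path
}

// Usage: df.writeStream.foreachBatch(processBatch _).option("checkpointLocation", ...).start()

Note that this only removes duplicates within a single micro-batch; duplicates produced across restarts still need an idempotent sink or an external deduplication store.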
Pub/Sub Lite guarantees at-least-once message delivery. By combining this with proper acknowledgment and checkpointing, you can achieve exactly-once processing within your application. Additionally, be prepared to handle errors during message processing. Implementing retry mechanisms and delaying acknowledgment until successful retries are completed can help manage errors effectively.
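As a rough sketch of that idea (the output path and retry count below are placeholders, not from the example above), a foreachBatch body can keep retrying the write and only return once it succeeds, so the batch, and therefore its offsets, is only marked complete after a successful write:

import org.apache.spark.sql.DataFrame

// Sketch: retry the write a few times; if all attempts fail, the exception propagates,
// the batch fails, and Spark will not record it as committed in the checkpoint.
def writeBatchWithRetry(batchDF: DataFrame, maxAttempts: Int = 3): Unit = {
  var attempt = 1
  var succeeded = false
  while (!succeeded) {
    try {
      batchDF.write.mode("append").parquet("gs://your-bucket/output")  // placeholder path
      succeeded = true
    } catch {
      case e: Exception if attempt < maxAttempts =>
        attempt += 1
        Thread.sleep(1000L * attempt)  // simple backoff before retrying
    }
  }
}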
Thanks! But do I have to explicitly acknowledge the messages? Or can I just rely on the library to commit the offsets?
And in my test,
query.stop() // Request shutdown
query.awaitTermination() // Wait for the graceful shutdown
does not seem to gracefully shut down the streaming application; at least I can see that the Parquet file it outputs is corrupted. I guess it didn't wait for the write to complete and flush all the data?
@ms4446 In my test,
query.stop() // Request shutdown
query.awaitTermination() // Wait for the graceful shutdown
doesn't really shut down gracefully. It doesn't commit the offsets at all for the data that has been processed, and it consumes the same data again.
I can't find
PslSparkUtils.acknowledge(batchDF, "pubsublite.subscription")
Is it some code I should implement myself?
In Spark Streaming with Pub/Sub Lite, the process of committing offsets is typically managed by the library itself, and explicit acknowledgment is generally not required. However, if the default behavior is not committing offsets as expected, some additional steps may be necessary to ensure proper offset management.
If query.stop() and query.awaitTermination() are not committing offsets correctly, leading to repeated data consumption, consider the following approach:
First, make sure the sink has a checkpoint location configured:

// Configure checkpointing
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "gs://your-bucket/checkpoints")
  .start()

A fuller example that reads from Pub/Sub Lite and uses foreachBatch for finer control over what happens in each micro-batch:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

val spark = SparkSession.builder.appName("PubSubLiteExample").getOrCreate()

val df = spark.readStream
  .format("pubsublite")
  .option("project", "your-project-id")
  .option("subscription", "your-subscription-id")
  .load()

val query: StreamingQuery = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Process your batchDF
    batchDF.show()
    // Explicitly commit offsets if needed
    // Your custom code for committing offsets, if required
  }
  .option("checkpointLocation", "gs://your-bucket/checkpoints")
  .start()

Keep the application running with:

query.awaitTermination()

And for a graceful shutdown, request the stop and then wait with a timeout:

import scala.concurrent.duration._
val timeout = Duration(60, "seconds")
query.stop()                              // Request shutdown
query.awaitTermination(timeout.toMillis)  // Wait for graceful shutdown
The PslSparkUtils.acknowledge function mentioned earlier does not exist in the Pub/Sub Lite Spark library. Instead, offset management is typically handled internally by the library. If you find that the library isn't committing offsets as expected, consider raising an issue with Google Cloud support or the library maintainers to address this behavior.
Please note that explicit acknowledgment is generally not required, as the Pub/Sub Lite Spark connector should handle offset committing internally. However, if offsets are not being committed as expected, ensure that checkpointing is correctly configured, use foreachBatch for finer control over processing, and handle graceful shutdown appropriately. If issues persist, consider reaching out to Google Cloud support for further assistance.
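If you want to verify whether progress is actually being recorded, here is a small sketch using Spark's standard StreamingQueryListener (nothing specific to the Pub/Sub Lite connector) that logs the committed source offsets after every micro-batch:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Log each completed micro-batch's end offsets so you can see whether the
// checkpoint is advancing between runs.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"Batch ${p.batchId} completed; source end offsets: " +
      p.sources.map(_.endOffset).mkString(", "))
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})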
My code is very similar
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

val spark = SparkSession.builder.appName("PubSubLiteExample").getOrCreate()

val df = spark.readStream
  .format("pubsublite")
  .option("project", "your-project-id")
  .option("subscription", "your-subscription-id")
  .load()

val query: StreamingQuery = df.writeStream
  .format("parquet")
  .option("path", savePath)
  .option("checkpointLocation", "gs://your-bucket/checkpoints")
  .start()

query.stop()
query.awaitTermination()
I found 2 issues with the code above. One is that the Parquet file is not properly closed; I do get a corrupted Parquet file. The other is that the processed data (saved or flushed to disk) is not committed in the subscription, so when I start the pipeline again there are always duplicates, and even the checkpoint doesn't really help.
Issue 1: Corrupted Parquet Files
To ensure that Parquet files are properly closed and not corrupted, you need to allow Spark to finish writing the current batch before stopping the query. Using a graceful shutdown approach helps in properly closing the files.
Issue 2: Offsets Not Committed
To ensure that offsets are committed after processing, you might need to make sure that the processing logic within each micro-batch completes successfully before stopping the query.
Improved Approach
Here's a revised version of your code with proper handling for graceful shutdown and ensuring the offsets are committed:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

val spark = SparkSession.builder.appName("PubSubLiteExample").getOrCreate()

val df = spark.readStream
  .format("pubsublite")
  .option("project", "your-project-id")
  .option("subscription", "your-subscription-id")
  .load()

val query: StreamingQuery = df.writeStream
  .format("parquet")
  .option("path", savePath)
  .option("checkpointLocation", "gs://your-bucket/checkpoints")
  .start()

// Instead of stopping immediately, register a hook that stops the query gracefully
sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application...")
  query.stop()
  query.awaitTermination()
  println("Application stopped.")
}

// To keep the application running, call awaitTermination
query.awaitTermination()
Graceful Shutdown: the shutdown hook calls query.stop() and then waits for termination, giving Spark a chance to finish the in-flight micro-batch, close the Parquet files it is writing, and commit that batch's offsets to the checkpoint.
Keep Application Running: query.awaitTermination() on the main thread keeps the streaming application alive until the query is stopped, for example by the shutdown hook.
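If the shutdown hook alone still interrupts an in-flight batch, one additional pattern (a sketch, not something from the connector's documentation) is to wait until no trigger is active before calling stop(), so the current micro-batch can finish writing its files and committing its offsets:

import org.apache.spark.sql.streaming.StreamingQuery

// Poll the query status and only stop once the current trigger has finished,
// so no micro-batch is interrupted mid-write.
def stopWhenIdle(query: StreamingQuery, pollIntervalMs: Long = 500L): Unit = {
  while (query.status.isTriggerActive) {
    Thread.sleep(pollIntervalMs)
  }
  query.stop()             // request shutdown while no batch is running
  query.awaitTermination() // wait for the query threads to finish
}

There is still a small window between the status check and stop() in which a new trigger could start, so treat this as a best-effort illustration rather than a guarantee.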