I want to create a Spark Structured Streaming application on Dataproc that subscribes to Pub/Sub Lite or Pub/Sub (I discovered there is no supported connector for the regular Pub/Sub service, so I tried the "Lite" service instead).
The idea is straightforward: as soon as a new message arrives in Pub/Sub Lite, the Spark pipeline should process it.
Here is the simple code example I used, taken from https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

# The message payload arrives as bytes; cast it to a string for display.
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
I expected the pipeline to be triggered immediately after a new message is published to Pub/Sub Lite, but that doesn't happen; instead, all the accumulated messages are processed at a one-minute interval.
1. What configuration properties am I missing?
2. Is there a third-party connector for the regular Pub/Sub service, or a workaround?
Your current Spark Structured Streaming setup with Pub/Sub Lite triggers a micro-batch every second. To get closer to near-real-time processing, consider the following adjustments:
Reducing the Trigger Interval:
# ... previous code ...

# Trigger every 100 milliseconds instead of every second.
query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="100 milliseconds")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
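Independently of the trigger, the Pub/Sub Lite connector caps how much data each micro-batch can pull through flow-control options. The option names below come from the java-pubsublite-spark connector's documentation, so verify them against the connector version you deploy; the values are illustrative:

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    # Cap the bytes buffered per partition so each micro-batch stays small.
    .option("pubsublite.flowcontrol.byteoutstandingperpartition", str(10 * 1024 * 1024))
    # Cap the number of messages outstanding per partition.
    .option("pubsublite.flowcontrol.messageoutstandingperpartition", "1000")
    .load()
)

Smaller flow-control limits keep individual batches short at the cost of more frequent fetches, which is usually the right trade-off when latency matters more than throughput.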
Exploring Pub/Sub Connectors: there is no official Structured Streaming connector for the regular Pub/Sub service. Apache Bahir offered a spark-streaming-pubsub connector, but it targets the legacy DStream API rather than Structured Streaming. A driver-side pull loop with the google-cloud-pubsub client is another workaround (see the sketch below).
Alternative Approaches: for regular Pub/Sub with low latency, Dataflow is the natively supported streaming engine on Google Cloud.
Select the method that aligns best with your project's latency, throughput, and resource requirements.
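As a concrete version of the pull-loop workaround: you can consume regular Pub/Sub by pulling messages with the google-cloud-pubsub client on the driver and handing each batch to Spark. This is a minimal sketch with no error handling; the project and subscription IDs are placeholders, and note that you give up Spark's checkpointing, so delivery is at-least-once:

from google.cloud import pubsub_v1
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pubsub-pull").getOrCreate()
subscriber = pubsub_v1.SubscriberClient()
# Placeholder IDs -- replace with your own project and subscription.
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

while True:
    # Synchronous pull of up to 100 messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 100}
    )
    if not response.received_messages:
        continue
    rows = [(m.message.data.decode("utf-8"),) for m in response.received_messages]
    df = spark.createDataFrame(rows, ["data"])
    df.show(truncate=False)  # replace with your actual processing
    # Ack only after processing succeeds, for at-least-once semantics.
    subscriber.acknowledge(
        request={
            "subscription": subscription_path,
            "ack_ids": [m.ack_id for m in response.received_messages],
        }
    )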
The issue you're encountering, where messages are still processed in delayed batches despite the 100-millisecond trigger interval, can be attributed to several factors. Here are some potential causes and solutions:
Processing Time vs. Data Arrival Time: the log message "Current batch is falling behind" suggests that the processing time for each batch is longer than the trigger interval, which causes delays and a backlog in processing.
Resource Constraints and Configuration: check settings such as spark.streaming.backpressure.enabled (for rate limiting) and spark.executor.memory (for memory allocation).
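A quick way to apply those settings is on the SparkSession builder. The values here are illustrative, and note that spark.streaming.backpressure.enabled only affects the legacy DStream API; for Structured Streaming, rely on source-side flow control instead:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("read-app")
    .master("yarn")
    # Rate limiting for the legacy DStream API (no effect on Structured Streaming).
    .config("spark.streaming.backpressure.enabled", "true")
    # Illustrative executor memory; size to your workload.
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)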
Pub/Sub Lite Configuration: verify the subscription's provisioned throughput and the connector's flow-control options, which cap how much data each micro-batch can pull.
Network Latency and System Overheads: round trips between Pub/Sub Lite and the Dataproc cluster add latency that no trigger setting can eliminate.
Debugging Steps: inspect the streaming query's progress metrics to see where each batch spends its time (see the sketch after this list).
Alternative Approaches: if sub-second latency is a hard requirement, a dedicated streaming engine such as Dataflow, or a plain Pub/Sub Lite client outside Spark, may be a better fit.
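For the debugging steps, Structured Streaming exposes progress metrics on the query object that show batch durations and input rates; a minimal sketch, assuming sdf from the earlier read:

import json
import time

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="100 milliseconds")
    .start()
)

# Sample the query's progress a few times to see where each batch spends its time.
for _ in range(10):
    time.sleep(5)
    progress = query.lastProgress  # dict describing the most recent micro-batch, or None
    if progress:
        print(json.dumps(progress, indent=2))

print(query.status)  # e.g. whether the trigger is active or waiting for data
query.stop()

In the printed progress, compare durationMs against your trigger interval: if batches consistently take longer than the trigger, the query can never keep up, regardless of how small the interval is.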
While reducing the trigger interval is a step in the right direction, achieving near-real-time processing in Spark Structured Streaming, especially with Pub/Sub Lite, requires a combination of appropriate resource allocation, efficient job configuration, and an understanding of the inherent limitations of the technologies involved.