I want to create a Spark Streaming application on Dataproc that subscribes to Pub/Sub Lite or Pub/Sub (I discovered there is no support for the regular Pub/Sub service, so I tried the "Lite" service).
The idea is straightforward: as soon as a new message arrives in Pub/Sub Lite, the Spark pipeline should process it.
Here is the simple code example I used: https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"
spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()
sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))
query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)
# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
I expect the pipeline to be triggered immediately after a new message is published to Pub/Sub Lite, but that doesn't happen; instead, all the accumulated messages are processed at a 1-minute interval.
1. What configuration properties am I missing?
2. Is there a third-party connector for the regular Pub/Sub service, or a workaround?
Your current Spark Streaming setup with Pub/Sub Lite processes data every second. To enhance this for near-real-time processing, consider the following adjustments:
Reducing Trigger Interval:
# ... previous code ...
# Trigger every 100 milliseconds
query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="100 milliseconds")
    .start()
)
# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
Exploring Pub/Sub Connectors (a sketch using the standard Pub/Sub client library follows below):
Alternative Approaches:
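Regarding a connector for regular Pub/Sub: since there is no Spark source for it in this setup, a common workaround is to consume the subscription outside of Spark, or bridge it into a source Spark supports. Purely as a minimal sketch, assuming placeholder project and subscription names, the official google-cloud-pubsub Python client's streaming pull looks roughly like this; note that it is a plain subscriber loop, not a Spark streaming source:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# Placeholders (assumptions) -- replace with your own values.
project_id = "your-project-id"
subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    # Hand the payload off to your own processing here.
    print(f"Received {message.data!r}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        # Messages are delivered to the callback as soon as they arrive.
        streaming_pull_future.result(timeout=60)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()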
Select the method that aligns best with your project's latency, throughput, and resource requirements.
@ms4446
Thank you so much for the detailed explanation! I will definitely take a look at the alternative solutions.
Regarding the delay in message processing: I decreased the trigger interval to 100 milliseconds, but I still experience a 1-minute delay. Here is my console output:
24/01/09 17:11:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
| subscription|partition|offset|key| data| publish_timestamp|event_timestamp|attributes|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
|projects/sandbox-...| 0| 0| []|[7B 20 20 20 20 2...|2024-01-09 17:04:...| NULL| {}|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
24/01/09 17:11:32 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 100 milliseconds, but spent 4145 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
| subscription|partition|offset|key| data| publish_timestamp|event_timestamp|attributes|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
|projects/sandbox-...| 0| 1| []|[7B 20 20 20 20 2...|2024-01-09 17:11:...| NULL| {}|
|projects/sandbox-...| 0| 2| []|[7B 20 20 20 20 2...|2024-01-09 17:11:...| NULL| {}|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
24/01/09 17:12:29 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 100 milliseconds, but spent 586 milliseconds
As you can see, the messages are not processed in near real time but rather in batches, and each batch is triggered only after a 1-minute delay.
I use the gcloud CLI to publish a message to Pub/Sub Lite:
gcloud pubsub lite-topics publish my-topic --location=europe-west2 --message='my message'
Is there something I might be missing?
The issue you're encountering, where messages are processed in batches with a delay despite setting a 100-millisecond trigger interval, can be attributed to several factors. Here are some potential causes and solutions:
Processing Time vs. Data Arrival Time: The warning "Current batch is falling behind" suggests that the processing time for each batch is longer than the trigger interval. This can cause delays and a backlog in processing.
Resource Constraints and Configuration: Review settings such as spark.streaming.backpressure.enabled (for rate limiting) and spark.executor.memory (for memory allocation); a short sketch of these settings follows after this list.
Pub/Sub Lite Configuration:
Network Latency and System Overheads:
Debugging Steps:
Alternative Approaches:
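To make the resource-related points above concrete, here is a minimal sketch of how those settings could be wired into the existing job, plus a simple way to inspect micro-batch timing while debugging. The configuration values are illustrative assumptions rather than tuned recommendations, and the loop assumes the sdf and query objects from the original example; lastProgress is the standard Structured Streaming progress dictionary.
from pyspark.sql import SparkSession
import time

# Illustrative values only (assumptions) -- tune for your cluster and workload.
spark = (
    SparkSession.builder.appName("read-app")
    .master("yarn")
    .config("spark.streaming.backpressure.enabled", "true")  # rate limiting
    .config("spark.executor.memory", "4g")                   # memory allocation
    .getOrCreate()
)

# ... previous code: build sdf and start query as in the original example ...

# Debugging: poll the most recent micro-batch progress while the query runs.
for _ in range(10):
    time.sleep(5)
    progress = query.lastProgress  # dict with numInputRows, durationMs, etc., or None
    if progress:
        print(progress["numInputRows"], progress["durationMs"])
Comparing durationMs against the trigger interval in that output is usually the quickest way to tell whether the delay comes from batch execution time or from the source itself.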
While reducing the trigger interval is a step in the right direction, achieving near-real-time processing in Spark Structured Streaming, especially with PubSub Lite, involves a combination of appropriate resource allocation, efficient job configuration, and understanding the inherent limitations of the technologies involved.
Cool, thanks for the detailed replies!