I'm following the code example here
https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("orc")
    # DataStreamWriter has no .path() method; the output path is set as an option.
    .option("path", "..")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
The ingestion never works: no data has been written to GCS when the job exits on timeout. The only scenario in which it could possibly work is with a once trigger, when all the immediately available data is read before the timeout.
Welcome to the Google Cloud Community!
The issue you are encountering might be caused by an empty Pub/Sub Lite subscription, misconfigured Spark code, or network problems.
Here are the potential ways that may help you to solve your issue:
Review your Pub/Sub Lite Subscription: Ensure messages are being published to the topic your Pub/Sub Lite subscription is attached to; you can check this in the Google Cloud console. Make sure the subscription links to the correct topic, the Spark service account has the Pub/Sub Lite subscriber role (roles/pubsublite.subscriber), and message ordering is set if needed.
Correct Spark Configuration: Replace the placeholders in pubsublite.subscription with the full path to your subscription. Also, specify an explicit Google Cloud Storage output path and ensure the Spark service account has write access to it.
Network Connectivity and Resources: Ensure your Spark cluster can connect to both Pub/Sub Lite and GCS. Also, check that the cluster has enough CPU and memory resources to handle the workload.
Debugging Steps: Create a minimal example that reproduces the problem, and add detailed logging to your Spark job. Check the error messages from both your Spark job and the Google Cloud console for helpful information.
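As a quick check for the configuration point above, here is a minimal sketch that builds the subscription path and verifies that it has the full layout used in the sample (projects/.../locations/.../subscriptions/...). The helper names subscription_path and is_full_subscription_path are hypothetical and not part of the Pub/Sub Lite connector; the check is purely structural and does not confirm that the subscription actually exists.

```python
import re

# Path layout taken from the sample in the question:
# projects/{project_number}/locations/{location}/subscriptions/{subscription_id}
_SUBSCRIPTION_PATH = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/subscriptions/(?P<subscription>[^/]+)$"
)


def subscription_path(project_number, location, subscription_id):
    """Build the full subscription path (hypothetical helper)."""
    return (
        f"projects/{project_number}/locations/{location}"
        f"/subscriptions/{subscription_id}"
    )


def is_full_subscription_path(path):
    """Return True if all three named segments are present and non-empty."""
    return _SUBSCRIPTION_PATH.match(path) is not None
```

You could call is_full_subscription_path on the value you pass to the pubsublite.subscription option before starting the stream, and fail fast if it returns False.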
You may refer to the following documentation, which might help you address and troubleshoot the ingestion issues you're facing:
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.
@nikacalupas The same setup with Spark 2 works perfectly.
@nikacalupas But we need to upgrade all our pipelines to Spark 3, so Spark 2 is not an option.
Here is a simplified PySpark snippet I have tried.
df = (
    spark.readStream.format("pubsublite")
    .option("pubsublite.subscription", "myproject/subscriptions/test_subscription")
    .option("pubsublite.flowcontrol.maxmessagesperbatch", 30000)
    .load()
)
q = (
    df.select("*")
    .writeStream.format("orc")
    .option("path", "gs://spark3-test/")
    .option("checkpointLocation", "gs://spark3-test/cp")
    .option("maxRecordsPerFile", "30000")
    .outputMode("append")
    .start()
)
q.awaitTermination(60000)
q.stop()
And I see a lot of small files generated.
The files are all empty.
It looks to me like the micro-batches were written, but they are all empty?
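One way to check whether the micro-batches really read zero rows might be to look at the query's progress reports. Below is a rough sketch, assuming the progress events are plain dicts with "batchId" and "numInputRows" keys, which is how q.recentProgress appears in PySpark 3.x. The helper summarize_progress is hypothetical, not part of Spark.

```python
def summarize_progress(progress_events):
    """Summarize micro-batch progress dicts (hypothetical helper).

    Assumes each event is a dict that may carry "batchId" and
    "numInputRows"; a missing "numInputRows" is counted as zero rows.
    """
    total_rows = 0
    empty_batch_ids = []
    for event in progress_events:
        rows = int(event.get("numInputRows", 0))
        total_rows += rows
        if rows == 0:
            empty_batch_ids.append(event.get("batchId"))
    return {
        "batches": len(progress_events),
        "total_input_rows": total_rows,
        "empty_batch_ids": empty_batch_ids,
    }
```

Calling summarize_progress(q.recentProgress) while the query is still running (before q.stop()) would show whether any batch reported input rows. If total_input_rows stays at 0, the source is not delivering messages, which points at the read side rather than the ORC sink.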