
Ingestion from Pub/Sub Lite to GCS never works

I'm following the code example here

https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("orc")
    .option("path", "..")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

The ingestion never works: no data has been written to GCS by the time the query exits at the timeout. The only scenario in which it could possibly work is when it uses a once trigger and all the currently available data is read before the timeout.
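
For reference, a rough sketch of the once-trigger variant I mean, reusing the same sdf stream; the bucket name is only a placeholder, and the file sink also needs a checkpoint location:

# Once trigger: drain whatever is currently available, write it, then stop.
# "gs://your-bucket/..." is a placeholder path, not a real bucket.
query = (
    sdf.writeStream.format("orc")
    .option("path", "gs://your-bucket/output/")
    .option("checkpointLocation", "gs://your-bucket/checkpoints/")
    .outputMode("append")
    .trigger(once=True)
    .start()
)
query.awaitTermination()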

5 REPLIES

Welcome to the Google Cloud Community!

The issue you are encountering may be caused by an empty Pub/Sub Lite subscription, misconfigured Spark code, or network problems.

Here are the potential ways that may help you to solve your issue:

  • Review your Pub/Sub Lite Subscription: Ensure messages are actually being published to the topic; you can check the subscription's backlog in the Google Cloud console. Make sure the subscription is attached to the correct topic, the Spark service account has the pubsublite.subscriber role, and message ordering is configured if needed.

  • Correct Spark Configuration: Replace the placeholders in pubsublite.subscription with the full path to your subscription (a minimal sketch follows this list). Also, specify an explicit Google Cloud Storage output path and make sure the Spark service account has write access to it.

  • Network Connectivity and Resources: Ensure your Spark cluster can connect to both Pub/Sub Lite and GCS. Also, check that the cluster has enough CPU and memory resources to handle the workload.

  • Debugging Steps: You might create a simple example that can be easily reproduced, and add detailed logging to your Spark job. Check the error messages from both your Spark job and the Google Cloud console for helpful information.
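
As a quick illustration of the configuration point above, here is a minimal read using the fully qualified subscription path the connector expects; all values below are placeholders, not your actual project settings:

# Placeholder values; substitute your own project number, location, and subscription ID.
project_number = 123456789012
location = "us-central1-a"
subscription_id = "your-subscription-id"

subscription_path = (
    f"projects/{project_number}/locations/{location}"
    f"/subscriptions/{subscription_id}"
)

sdf = (
    spark.readStream.format("pubsublite")
    .option("pubsublite.subscription", subscription_path)
    .load()
)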

You may also refer to the Pub/Sub Lite documentation, which might help you address and troubleshoot the ingestion issues you're facing.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

@nikacalupas Same setup with Spark 2 works perfectly.

@nikacalupas But we need to upgrade all our pipelines to Spark 3, so Spark 2 is not an option.

Here is a simplified PySpark job I have tried.

df = (
    spark.readStream.format("pubsublite")
    .option("pubsublite.subscription", "myproject/subscriptions/test_subscription")
    .option("pubsublite.flowcontrol.maxmessagesperbatch", 30000)
    .load()
)

q = (
    df.select("*")
    .writeStream.format("orc")
    .option("path", "gs://spark3-test/")
    .option("checkpointLocation", "gs://spark3-test/cp")
    .option("maxRecordsPerFile", "30000")
    .outputMode("append")
    .start()
)

q.awaitTermination(60000)
q.stop()

And I see a lot of small files generated 

[Screenshot 2024-12-16 at 9.42.58 AM.png: GCS bucket listing showing many small output files]

The files are all empty

It looks to me like the micro-batches were written, but they are all empty?
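
One way to check whether the micro-batches actually pull any rows from the subscription is to look at the query progress while it runs; a quick sketch, reusing the same df as above and writing to the console only for inspection:

import time

# Console sink just for inspection; numInputRows shows how many rows each
# micro-batch read from the subscription.
q = df.writeStream.format("console").outputMode("append").start()
for _ in range(10):
    time.sleep(10)
    progress = q.lastProgress
    if progress:
        print(progress["batchId"], progress["numInputRows"])
q.stop()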