I'm following the code example here:
https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"
spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()
sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))
query = (
    sdf.writeStream.format("orc")
    .option("path", "..")  # output path; DataStreamWriter has no .path(), so it goes in an option (or start(path))
    .option("checkpointLocation", "..")  # required for file sinks
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)
# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
The ingestion never works: no data is written to GCS when the query exits on the awaitTermination timeout. The only scenario where it works is when I use a once trigger, so that all of the data that is already available is read before the timeout.
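
For comparison, this is a minimal sketch of the once-trigger variant I mean (the ".." paths are placeholders for my GCS output and checkpoint locations):

# Once trigger: process whatever data is currently available, then stop.
query = (
    sdf.writeStream.format("orc")
    .option("path", "..")                # placeholder GCS output path
    .option("checkpointLocation", "..")  # placeholder checkpoint path, required for file sinks
    .outputMode("append")
    .trigger(once=True)
    .start()
)
# No fixed timeout needed here: the query stops on its own once the available data is processed.
query.awaitTermination()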