Dataproc Serverless batch failure when accessing an external PostgreSQL database

I have a PostgreSQL database on AWS that I need to ingest into BigQuery, and I am trying to use Dataproc Serverless for this task. Please note that the same job succeeds on Dataproc on Compute Engine. Can someone please help?

Here are the steps I am taking:

  • I am creating a Cloud NAT with ingress and egress rules for my default subnet, and I am assigning a static IP to it.
  • I am uploading my Python file to a GCS bucket along with the PostgreSQL JDBC driver jar file.
  • Running the following gcloud command in Cloud Shell:

    gcloud dataproc batches submit pyspark gs://bucketpococt2023/ETL_Executable/pococt2023_serverless.py \
        --region us-central1 \
        --deps-bucket gs://bucketpococt2023 \
        --batch pocserverlessoct2023 \
        --version 2.1 \
        --jars gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar,gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        --subnet default \
        -- --metadata bigquery-connector-version=1.2.0 --metadata spark-bigquery-connector-version=0.27.1 --properties=spark.driver.extraClassPath=postgresql-42.5.1.jar
  • The connectivity succeeds and I am even able to view the schema of the dataframe, but it fails at the .show() stage. Please note that this code works fine on Dataproc with a manually created cluster. Here is the log: the error is the java.lang.ClassNotFoundException: org.postgresql.Driver thrown on the executors (the executors are unable to locate the driver), and the schema that is printed out appears just above it. Even .config("spark.executor.extraClassPath", "gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar") in the code has no effect. A small diagnostic sketch for checking the effective jar settings follows the log.

    Batch [pocserverlessoct2023] submitted.
    Using the default container image
    Waiting for container log creation
    PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
    Generating /home/spark/.pip/pip.conf
    Configuring index-url as
    Configuring target as /mnt/dataproc/python/site-packages
    JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
    SPARK_EXTRA_CLASSPATH=
    :: loading settings :: file = /etc/spark/conf/ivysettings.xml
    root
    |-- tconst: string (nullable = true)
    |-- titletype: string (nullable = true)
    |-- primarytitle: string (nullable = true)
    |-- originaltitle: string (nullable = true)
    |-- isadult: string (nullable = true)
    |-- startyear: string (nullable = true)
    |-- endyear: string (nullable = true)
    |-- runtimeminutes: string (nullable = true)
    |-- genres: string (nullable = true)

    23/10/18 09:40:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.128.0.38 executor 1): java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
    at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:120)
    at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

    23/10/18 09:40:26 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
    Traceback (most recent call last):
    File "/var/dataproc/tmp/srvls-batch-3cb1f466-1441-447f-bc43-ae9720ba507f/pococt2023_serverless.py", line 59, in <module>
    source_df_name.show(10)
    ^^^^^^^^^^^^^^^^^^^^^^^
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 899, in show
    File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
    File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o92.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.128.0.39 executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
    at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:120)
    at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.immutable.List.foreach(List.scala:333)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
    at scala.Option.foreach(Option.scala:437)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2271)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2292)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2311)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:528)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
    at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:120)
    at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    ... 1 more

    ERROR: (gcloud.dataproc.batches.submit.pyspark) Batch job is FAILED. Detail: Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at:
    https://console.cloud.google.com/dataproc/batches/us-central1/pocserverlessoct2023?project=project-n...
    gcloud dataproc batches wait 'pocserverlessoct2023' --region 'us-central1' --project 'project-name'
    https://console.cloud.google.com/storage/browser/dataproc-staging-us-central1-176474834715-lw0bxzz5/...
    gs://dataproc-staging-us-central1-176474834715-lw0bxzz5/google-cloud-dataproc-metainfo/5ff47c9e-1d7f-4eec-8eec-ee5d388cbac5/jobs/srvls-batch-3cb1f466-1441-447f-bc43-ae9720ba507f/driveroutput.
    Running auto diagnostics on the batch. It may take few minutes before diagnostics output is available. Please check diagnostics output by running 'gcloud dataproc batches describe' command.
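
    Since the container log above shows SPARK_EXTRA_CLASSPATH= as empty and the executors cannot find org.postgresql.Driver, a small check like the one below could be dropped into the script to confirm which jar and classpath settings actually reach the Spark session. This is only a diagnostic sketch; the property names are standard Spark configuration keys and "<unset>" is just a placeholder default.

        from pyspark.sql import SparkSession

        # Diagnostic sketch: print the jar/classpath settings the active session sees.
        # "<unset>" is only a placeholder default, not a real value.
        spark = SparkSession.builder.getOrCreate()
        for key in ("spark.jars",
                    "spark.driver.extraClassPath",
                    "spark.executor.extraClassPath"):
            print(key, "=", spark.conf.get(key, "<unset>"))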

  • Pasting the PySpark code here. Please note that ingesting data from the internal bucket and writing it into BigQuery succeeds; the issue is only observed with the external database access. A short sketch of how I understand the jar configuration should look follows the code.

    import time
    import re
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sha2

    bucket = "bucketpococt2023"
    my_spark_session = SparkSession.builder \
        .config('temporaryGcsBucket', bucket) \
        .config("master", "local[2]")\
        .config("spark.network.timeout","3600s") \
        .config("spark.executor.heartbeatInterval","3000s") \
        .config("viewsEnabled","true") \
        .config("spark.jars","gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar") \
        .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar") \
        .config("spark.executor.extraClassPath", "gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar") \
        .config("deploy-mode", 'client') \
        .getOrCreate()

    #conf.set("spark.executor.heartbeatInterval","3600s")


    #JDBC Read Parameters
    host = <aws database address>
    database = "postgres" #<POSTGRES DATABASE>
    username = "postgres"
    password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    port = "5432"
    partitions = 16
    partitionColumn = "hash_numeric"
    lowerBound = 0
    upperBound = partitions
    numPartitions = partitions
    fetchsize = 10
    #source_table = "title_basic"
    source_table = "inventory"

    source_table_query = """ SELECT * FROM inventory """

    #Read Source Table
    source_df_name = my_spark_session.read\
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{host}/{database}") \
        .option("driver", "org.postgresql.Driver") \
        .option("user", username) \
        .option("password", password) \
        .option("port", port) \
        .option("query", source_table_query) \
        .load()


    source_df_name.printSchema()
    row_count = source_df_name.count()
    print("The number of rows in the extracted dataframe is {}".format(row_count))
    source_df_name.cache()
    source_df_name.show(1)
    for each in source_df_name.schema.names:
        source_df_name = source_df_name.withColumnRenamed(each, re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '')))
    source_df_name.printSchema()


    source_df_csv_path = "gs://bucketpococt2023/imdb_data/title.ratings.csv"
    source_df_rating = my_spark_session.read.format("csv").option("header", "true").load(source_df_csv_path)
    source_df_rating.show()
    for each in source_df_rating.schema.names:
        source_df_rating = source_df_rating.withColumnRenamed(each, re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '')))
    source_df_rating.printSchema()

     

    # Save the data to BigQuery
    source_df_rating.write.format('bigquery') \
        .option('table', 'imdb_cleaned_data.rating_table') \
        .mode("overwrite")\
        .save()


    source_df_name.write.format('bigquery') \
        .option('table', 'imdb_cleaned_data.name_table') \
        .mode("overwrite")\
        .save()
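
    For reference, here is a short sketch of how I understand the session could be configured so that both jars end up on the executor classpath as well. Successive .config("spark.jars", ...) calls overwrite each other, so only the BigQuery connector jar is kept in my current code, and spark.executor.extraClassPath expects a path on the executor's local filesystem rather than a gs:// URI, which is why I suspect the PostgreSQL jar never reaches the executors. This is only a sketch of what I think the intended configuration is, not something I have verified on Serverless; the jar URIs are the ones from my job.

        from pyspark.sql import SparkSession

        # Sketch (unverified): pass both jars in one comma-separated spark.jars
        # value, since setting the same config key twice keeps only the last value.
        # --jars on `gcloud dataproc batches submit` should have the same effect of
        # placing the jars on both the driver and executor classpaths.
        jars = ",".join([
            "gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar",
            "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar",
        ])

        my_spark_session = (
            SparkSession.builder
            .config("temporaryGcsBucket", "bucketpococt2023")
            .config("viewsEnabled", "true")
            .config("spark.jars", jars)
            .getOrCreate()
        )

    If that understanding is correct, the spark.driver.extraClassPath and spark.executor.extraClassPath settings should not be needed at all once the jar is supplied through --jars or spark.jars.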
