I have a PostgreSQL database on aws which I need to ingest into BigQuery. I am trying to use DataProc serverless for this task. Please note that the same job succeeds with Dataproc compute engine. Can someone please help?
Here are the steps I am taking
Batch [pocserverlessoct2023] submitted.
Using the default container image
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
Generating /home/spark/.pip/pip.conf
Configuring index-url as
Configuring target as /mnt/dataproc/python/site-packages
JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
root
|-- tconst: string (nullable = true)
|-- titletype: string (nullable = true)
|-- primarytitle: string (nullable = true)
|-- originaltitle: string (nullable = true)
|-- isadult: string (nullable = true)
|-- startyear: string (nullable = true)
|-- endyear: string (nullable = true)
|-- runtimeminutes: string (nullable = true)
|-- genres: string (nullable = true)
23/10/18 09:40:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.128.0.38 executor 1): java.lang.ClassNotFoundException: org.postgresql.Driver
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:120)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
23/10/18 09:40:26 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Traceback (most recent call last):
File "/var/dataproc/tmp/srvls-batch-3cb1f466-1441-447f-bc43-ae9720ba507f/pococt2023_serverless.py", line 59, in <module>
source_df_name.show(10)
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 899, in show
File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o92.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.128.0.39 executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:120)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:437)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2271)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2292)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2311)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:528)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.postgresql.Driver
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:120)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
... 1 more
ERROR: (gcloud.dataproc.batches.submit.pyspark) Batch job is FAILED. Detail: Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at:
https://console.cloud.google.com/dataproc/batches/us-central1/pocserverlessoct2023?project=project-n...
gcloud dataproc batches wait 'pocserverlessoct2023' --region 'us-central1' --project 'project-name'
https://console.cloud.google.com/storage/browser/dataproc-staging-us-central1-176474834715-lw0bxzz5/...
gs://dataproc-staging-us-central1-176474834715-lw0bxzz5/google-cloud-dataproc-metainfo/5ff47c9e-1d7f-4eec-8eec-ee5d388cbac5/jobs/srvls-batch-3cb1f466-1441-447f-bc43-ae9720ba507f/driveroutput.
Running auto diagnostics on the batch. It may take few minutes before diagnostics output is available. Please check diagnostics output by running 'gcloud dataproc batches describe' command.
import time
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2
bucket = "bucketpococt2023"
my_spark_session = SparkSession.builder \
.config('temporaryGcsBucket', bucket) \
.config("master", "local[2]")\
.config("spark.network.timeout","3600s") \
.config("spark.executor.heartbeatInterval","3000s") \
.config("viewsEnabled","true") \
.config("spark.jars","gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar") \
.config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar") \
.config("spark.executor.extraClassPath", "gs://bucketpococt2023/ETL_Executable/postgresql-42.5.1.jar") \
.config("deploy-mode", 'client') \
.getOrCreate()
#conf.set("spark.executor.heartbeatInterval","3600s")
#JDBC Read Parameters
host = <aws database address>
database = "postgres" #<POSTGRES DATABASE>
username = "postgres"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
port = "5432"
partitions = 16
partitionColumn = "hash_numeric"
lowerBound = 0
upperBound = partitions
numPartitions = partitions
fetchsize = 10
#source_table = "title_basic"
source_table = "inventory"
source_table_query = """ SELECT * FROM inventory """
#Read Source Table
source_df_name = my_spark_session.read\
.format("jdbc") \
.option("url", f"jdbc:postgresql://{host}/{database}") \
.option("driver", "org.postgresql.Driver") \
.option("user", username) \
.option("password", password) \
.option("port", port) \
.option("query", source_table_query) \
.load()
source_df_name.printSchema()
row_count = source_df_name.count()
print("The number of rows in the extracted dataframe is {}".format(row_count))
source_df_name.cache()
source_df_name.show(1)
for each in source_df_name.schema.names:
source_df_name = source_df_name.withColumnRenamed(each, re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '')))
source_df_name.printSchema()
source_df_csv_path = "gs://bucketpococt2023/imdb_data/title.ratings.csv"
source_df_rating = my_spark_session.read.format("csv").option("header", "true").load(source_df_csv_path)
source_df_rating.show()
for each in source_df_rating.schema.names:
source_df_rating = source_df_rating.withColumnRenamed(each, re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '')))
source_df_rating.printSchema()
# Save the data to BigQuery
source_df_rating.write.format('bigquery') \
.option('table', 'imdb_cleaned_data.rating_table') \
.mode("overwrite")\
.save()
source_df_name.write.format('bigquery') \
.option('table', 'imdb_cleaned_data.name_table') \
.mode("overwrite")\
.save()
User | Count |
---|---|
4 | |
1 | |
1 | |
1 | |
1 |