In the realm of data warehouse and data lake management, data is frequently collected from diverse systems, with Relational Database Management Systems (RDBMS) serving as a primary source. Within a Hadoop-oriented Online Analytical Processing (OLAP) stack, RDBMS integration is typically accomplished with Apache Sqoop, a tool from the Hadoop ecosystem. Many data engineers configure their Sqoop jobs with customized source SQL, thereby utilizing the MapReduce capabilities offered by Hadoop.
The recent migration of such Apache Sqoop jobs to the cloud has presented a complex challenge. On Google Cloud Platform (GCP), customers transitioning from the Hadoop ecosystem often opt for Cloud Dataproc. However, within this environment, Apache Sqoop is regarded as a third-party tool with limited or no official support.
This blog aims to illuminate a solution that addresses this particular obstacle, empowering data architects to effectively design their data platforms within cloud environments.
Solution: lift & shift the Sqoop configuration to Dataproc Serverless
We propose a solution to migrate Sqoop jobs to a Dataproc Serverless Spark-JDBC environment. This approach requires minimal development effort, leveraging reusable Spark-JDBC-based templates and existing Sqoop configurations, including custom source SQL.
High-level migration architecture
An example: ingestion from Oracle
The subsequent section of this blog will illustrate a prevalent data-ingestion use case involving an Oracle source and a Hadoop destination. It will elucidate the requisite steps to transition the ingestion job to the Google Cloud Platform (GCP) by leveraging Spark-JDBC jobs within the Dataproc Serverless environment.
This section offers a comprehensive overview of strategies for optimizing Dataproc Spark-JDBC jobs for efficient data ingestion. It covers several stages: establishing database connectivity, refining the read schema, configuring fetch sizes, implementing partitioning, monitoring job execution, and tuning Spark configurations. It also explores the potential advantages of a query pushdown tool. Following the recommendations and best practices outlined here will help users improve the performance of their Spark-JDBC jobs and achieve optimal data processing outcomes.
Spark-JDBC code snippet
The following code snippet is the recommended method for Spark-JDBC read operations, utilizing a SQL query encapsulated as a database table reference (the dbtable option). This technique surpasses the prepareQuery approach by granting developers greater control over pipeline optimization. Specifically, it enables adjustments at the source level, including partition column selection, specification of lower and upper bounds, and determination of the number of partitions.
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
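With these options, Spark splits the read into numPartitions parallel queries, using lowerBound and upperBound only to compute the stride of partitionColumn ranges; the bounds do not filter rows, and values outside the range fall into the first and last partitions.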
Steps to be followed
Below are the suggested steps:
Verify database connectivity (assuming that the network foundations are in place):
- Ensure the Oracle JDBC driver is correctly configured in your Spark cluster. This includes adding the driver JAR file to the Spark classpath and setting the appropriate driver class property in your Spark configuration.
- Test the database connection using a simple Spark SQL query. For example, you can run a lightweight query (such as a SELECT against Oracle's DUAL table) through the JDBC reader to verify the connection; a minimal sketch follows this list.
- These details are the same as the JDBC connection details used for the Sqoop import operation.
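A minimal connectivity check might look like the sketch below. It assumes that jdbcUrl, dbUser, and dbPassword are placeholders holding your own connection details and that the Oracle driver JAR is already on the classpath:

// Minimal smoke test: read a single row from Oracle's DUAL table.
// jdbcUrl, dbUser and dbPassword are placeholders for your connection details.
val probe = spark.read.format("jdbc")
  .option("url", jdbcUrl)                        // e.g. jdbc:oracle:thin:@//host:1521/service
  .option("driver", "oracle.jdbc.OracleDriver")  // driver JAR must be on the Spark classpath
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("dbtable", "(select 1 as ok from dual) connectivity_check")
  .load()

probe.show()  // succeeds only if the JDBC connection is healthy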
Optimize read schema:
- Identifying a Partition Column
Determine a column with an integer, date, or timestamp datatype that exhibits a relatively even distribution of values, as this is most suitable for read partitioning. The identified column should be used as the partitionColumn in the Spark-JDBC read block. This configuration is equivalent to the Sqoop --split-by import control argument, combined with known lowerBound and upperBound values.
- How to synthesize a Partition Column?
In most scenarios, when reading a SQL result set, a synthetic column is generated from the existing columns to manage the range of lower and upper bounds. For example, the ORA_HASH(loc, 96, 0) function can derive a fairly evenly distributed column that maps rows into a fixed number of buckets.
- SQL Query to extract specific columns
Analyze the schema of your Oracle table or result set and identify any unnecessary columns. This can be done using tools like Oracle SQL Developer or by examining the table definition in the Oracle data dictionary. Use a SELECT statement that reads only the required columns. This can significantly reduce the amount of data transferred from Oracle to Spark, improving performance. The sketch after this list combines column pruning with a synthesized partition column.
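The following sketch puts column pruning and a synthesized partition column together. The emp/dept-style query, the bucket count of 96, and the partition count of 8 are illustrative assumptions to adapt to your own source SQL:

// Source SQL reads only the required columns and adds a synthetic,
// fairly distributed bucket column via ORA_HASH (values 0..96 here).
val sourceSql =
  """(select e.empno, e.ename, d.loc,
    |        ora_hash(d.loc, 96, 0) as part_bucket
    |   from emp e join dept d on e.deptno = d.deptno) src""".stripMargin

val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("dbtable", sourceSql)
  .option("partitionColumn", "part_bucket")  // the synthesized column
  .option("lowerBound", "0")                 // minimum ORA_HASH bucket
  .option("upperBound", "96")                // maximum ORA_HASH bucket
  .option("numPartitions", "8")              // parallel read tasks
  .load()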
Configure fetch size
- Set the fetchsize option to an appropriate value to control how many rows are read from Oracle per round trip. A larger fetch size can improve performance by reducing the number of round trips between Spark and Oracle, but it can also increase memory usage.
- Experiment with different fetch sizes to find the optimal value for your job (a sketch follows this list).
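A minimal sketch; the value of 10000 below is only an assumed starting point to benchmark against your own workload:

// fetchsize controls how many rows the Oracle JDBC driver pulls per round trip.
// 10000 is an illustrative starting value; tune it against your own workload.
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "(select c1, c2 from t1) subq")
  .option("fetchsize", "10000")
  .load()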
Use partitioning
- If you are reading a single large table: partition your Oracle table based on a relevant column to improve data locality and reduce network traffic. This can be done using the PARTITION BY clause in your Oracle CREATE TABLE statement.
- If you are reading the result set of a query: a fairly distributed partitionColumn will help.
- When you read data from a partitioned Oracle table using Spark, Spark will automatically distribute the data across its executors based on the partition scheme.
Leverage the capabilities of the source database, assuming sufficient resources are available.
- Example: to enhance the efficiency of data extraction from the Oracle source database, the Database Administrator may recommend the SQL PARALLEL hint, contingent upon available resources. This optimization facilitates increased parallelization of read operations, improving overall extraction performance (a sketch follows below).
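For example, the hint can be embedded directly in the source SQL that Spark pushes down. The degree of parallelism (4 here) and the table name t1 are assumptions to confirm with your DBA:

// The PARALLEL hint asks Oracle to parallelize the scan on its side.
// The degree (4) and table name t1 are illustrative placeholders.
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "(select /*+ PARALLEL(t1, 4) */ c1, c2 from t1) src")
  .load()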
Tune Spark configurations
- Refer to the Spark-JDBC documentation to better understand the areas where we can tune the job to achieve optimal performance.
- Adjust Dataproc Serverless Spark properties to optimize job resource allocation.
- When utilizing the source database's parallel read capabilities, it is advisable to adjust the Spark configurations to employ a comparable number of executor cores. This alignment will optimize read performance (a sketch follows this list).
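As an illustration only: on Dataproc Serverless, these properties are normally supplied at batch submission time rather than in code, and the values below are assumptions chosen to align executor cores with a source-side parallel degree of 4:

import org.apache.spark.sql.SparkSession

// Illustrative values only; on Dataproc Serverless these are usually
// passed as batch properties when the job is submitted.
val spark = SparkSession.builder()
  .appName("sqoop-to-spark-jdbc")
  .config("spark.executor.cores", "4")                   // match the source-side parallel degree
  .config("spark.dynamicAllocation.maxExecutors", "10")  // cap the executor count
  .getOrCreate()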
Monitor job execution
- Use Dataproc Serverless monitoring charts and the Dataproc-provided Spark UI to monitor the execution of your Spark-JDBC job and identify any bottlenecks.
- The Dataproc-provided Spark UI provides real-time information about the progress of your job, including the number of tasks completed, the amount of data processed, and the execution time of each stage.
- Dataproc Serverless monitoring charts allow you to track the history of your Spark jobs and analyze their performance over time.