Hi Team,
I'm trying to perform an incremental load of data from AWS RDS (Entrata PostgreSQL) to Cloud SQL for PostgreSQL. I want to set up this pipeline with Composer, submitting the work as a Dataproc Serverless job. Can you please help me with the best connection options between GCP and AWS, including networking, VPC settings, etc.?
Thanks,
Vigneswar Jeyaraj
To set up an incremental data load from AWS RDS (PostgreSQL) to Cloud SQL (PostgreSQL) using Cloud Composer and submitting jobs to Dataproc Serverless, you need to consider several aspects including networking, security, and data transfer methods. Here are some steps that might help configure the necessary settings:
Networking and VPC Settings
You have several options depending on your desired level of security and network isolation:
Public IPs with Firewall Rules
VPN or Interconnect (Cloud VPN with AWS Site-to-Site VPN; native VPC peering does not span AWS and GCP)
AWS PrivateLink
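Whichever option you pick, it helps to confirm that the RDS endpoint is actually reachable on the PostgreSQL port from the network the job runs in before debugging Spark itself. A minimal sketch of such a check, using only the Python standard library (the function name `can_reach` and the default port 5432 are assumptions for illustration, not part of any GCP or AWS API):

```python
import socket


def can_reach(host: str, port: int = 5432, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the hostname and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

Calling `can_reach("<RDS_HOSTNAME>", 5432)` from a small test job on the same subnetwork tells you whether a failure is a routing or firewall problem rather than a JDBC one. It only tests TCP reachability, not credentials or SSL settings.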
Data Transfer
For incremental data transfer, consider this setup:
Dataproc Serverless with PySpark:
Change Data Capture (CDC):
Example
import sys
from datetime import datetime, timezone

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# JDBC connection details
rds_jdbc_url = "jdbc:postgresql://<RDS_HOSTNAME>:<PORT>/<DATABASE_NAME>"
cloud_sql_jdbc_url = "jdbc:postgresql://<CLOUD_SQL_HOSTNAME>:<PORT>/<DATABASE_NAME>"

# Create a Spark session
spark = SparkSession.builder.appName("incremental_load").getOrCreate()

# Last load time, passed in by Composer as the first job argument
# (assumption: the Spark job cannot read Composer variables directly)
last_load_time = sys.argv[1]

# Capture the run start time before reading, so rows modified during the run are not skipped next time
current_timestamp = datetime.now(timezone.utc).isoformat()

# Load data from AWS RDS
df_rds = (
    spark.read.format("jdbc")
    .option("url", rds_jdbc_url)
    .option("user", "<RDS_USERNAME>")
    .option("password", "<RDS_PASSWORD>")
    .option("dbtable", "<TABLE_NAME>")
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Filter for new data based on timestamp
df_new_data = df_rds.filter(
    F.col("last_modified_timestamp") > F.lit(last_load_time).cast("timestamp")
)

# Write incremental data to Cloud SQL
(
    df_new_data.write.format("jdbc")
    .option("url", cloud_sql_jdbc_url)
    .option("user", "<CLOUD_SQL_USERNAME>")
    .option("password", "<CLOUD_SQL_PASSWORD>")
    .option("dbtable", "<TABLE_NAME>")
    .option("driver", "org.postgresql.Driver")
    .mode("append")
    .save()
)

# Report the new 'last_load_time'; the Composer DAG should persist it once the job succeeds
print(f"new last_load_time: {current_timestamp}")
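One way to make sure the timestamp filter runs on the RDS side, rather than relying on Spark to push it down, is to pass a subquery as the JDBC `dbtable` option. The helper below is a sketch under assumptions: the name `incremental_query` is hypothetical, the table and column names are expected to come from trusted configuration, and the watermark is an ISO 8601 string.

```python
from datetime import datetime


def incremental_query(table: str, ts_column: str, last_load_time: str) -> str:
    """Build a subquery for Spark's JDBC "dbtable" option that selects only new rows."""
    # Parsing validates the watermark, so an arbitrary string is never spliced into the SQL.
    # Raises ValueError if last_load_time is not a valid ISO 8601 timestamp.
    watermark = datetime.fromisoformat(last_load_time)
    # table and ts_column are NOT validated here; take them from trusted config only.
    return (
        f"(SELECT * FROM {table} "
        f"WHERE {ts_column} > '{watermark.isoformat(sep=' ')}') AS src"
    )
```

It would be used as `.option("dbtable", incremental_query("<TABLE_NAME>", "last_modified_timestamp", last_load_time))` on the read side, so only rows newer than the watermark leave AWS.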
Scheduling
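In a Composer DAG, the job is typically submitted as a Dataproc Serverless batch. The sketch below only builds the batch configuration as a plain dictionary; the function name and its parameters are hypothetical, and the field names follow the Dataproc Batch resource as I understand it, so please check them against the current API reference.

```python
def build_batch_config(
    main_py_uri: str,
    last_load_time: str,
    subnetwork_uri: str,
    jdbc_jar_uri: str,
) -> dict:
    """Return a Dataproc Serverless batch config for the incremental load job."""
    return {
        "pyspark_batch": {
            # GCS path of the PySpark script
            "main_python_file_uri": main_py_uri,
            # Passed to the script as sys.argv[1]
            "args": [last_load_time],
            # PostgreSQL JDBC driver jar, staged in GCS
            "jar_file_uris": [jdbc_jar_uri],
        },
        "environment_config": {
            # Subnetwork that has a route to the AWS side
            "execution_config": {"subnetwork_uri": subnetwork_uri},
        },
    }
```

The resulting dictionary could then be passed as the `batch` argument of `DataprocCreateBatchOperator` from the Airflow Google provider, with `last_load_time` read through Airflow's `Variable.get` and updated with `Variable.set` in a downstream task after the batch succeeds.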