Hi Folks,
I'm trying to submit a Dataproc Serverless job using a service account (which has the necessary permissions) to load multiple CSV files (> 100 GB) from a GCS bucket into a Cloud SQL PostgreSQL instance.
Can you please help me with the command that needs to be submitted? How can I reference the JSON key file linked with the service account for authentication, or is it not necessary to mention it?
gcloud dataproc batches submit spark \
  --region=us \
  --service-account=test@Test.iam.gserviceaccount.com \
  --job-type=spark \
  --python-file=test.py \
  --cluster=cluster_name
When I submit this batch job, how does it work in the background? Does a cluster get created and then deleted after the job completes, or does it need to be deleted manually?
Thanks,
Vigneswar Jeyaraj
Here is how to submit a Dataproc Serverless job using a service account, along with a sample script and an explanation of what happens in the background:
Steps to Submit a Dataproc Serverless Job Using a Service Account
1. Grant Permissions
Make sure the service account (test@Test.iam.gserviceaccount.com) has the roles it needs to interact with Google Cloud services: roles/dataproc.editor for broad permissions, or create a custom, more restrictive role tailored to your needs.
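For example, a broad grant could be applied with a single gcloud command (a sketch; your-project-id is a placeholder for your actual project ID):

gcloud projects add-iam-policy-binding your-project-id \
  --member="serviceAccount:test@Test.iam.gserviceaccount.com" \
  --role="roles/dataproc.editor"

2. Create Python Script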
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

def load_and_insert(spark, gcs_bucket, gcs_path, sql_instance, sql_user, sql_password, sql_db, sql_table):
    # Read CSV files from GCS
    df = spark.read.option("header", True).csv(f"gs://{gcs_bucket}/{gcs_path}/*.csv")

    # Write data to Cloud SQL
    df.write \
        .format("jdbc") \
        .mode("append") \
        .option("url", f"jdbc:postgresql://{sql_instance}/{sql_db}") \
        .option("dbtable", sql_table) \
        .option("user", sql_user) \
        .option("password", sql_password) \
        .save()

if __name__ == "__main__":
    spark = SparkSession.builder.appName("GCS-to-SQL").getOrCreate()

    # Replace with your actual values
    gcs_bucket = "your-gcs-bucket"
    gcs_path = "your/csv/data/path"
    sql_instance = "your-cloud-sql-instance"
    sql_user = "your-sql-username"
    sql_password = "your-sql-password"
    sql_db = "your-database-name"
    sql_table = "your-target-table"

    load_and_insert(spark, gcs_bucket, gcs_path, sql_instance, sql_user, sql_password, sql_db, sql_table)
3. Submit the Dataproc Serverless Batch
gcloud dataproc batches submit pyspark \
--region=us-central1 \
--batch=csv-to-cloudsql-batch \
--service-account=test@Test.iam.gserviceaccount.com \
gs://your-gcs-bucket/test.py
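One thing to note: because the script writes through Spark's JDBC data source, the PostgreSQL JDBC driver has to be available to the batch. One way to provide it is the --jars flag; the sketch below assumes you have staged the driver jar in your own bucket (the jar path is a placeholder):

gcloud dataproc batches submit pyspark \
  --region=us-central1 \
  --batch=csv-to-cloudsql-batch \
  --service-account=test@Test.iam.gserviceaccount.com \
  --jars=gs://your-gcs-bucket/jars/postgresql-42.x.x.jar \
  gs://your-gcs-bucket/test.py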
Background Processes
Dataproc Serverless does not create a cluster that you manage. When you submit a batch, the service provisions the execution environment, runs the workload, and releases those resources automatically once the batch completes or fails, so there is nothing to delete manually. The batch record itself remains in a terminal state (for example SUCCEEDED or FAILED) so you can review logs and metrics afterwards.
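You can follow a batch's lifecycle from the CLI, for example (using the batch ID from the command above):

gcloud dataproc batches list --region=us-central1
gcloud dataproc batches describe csv-to-cloudsql-batch --region=us-central1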
Additional Considerations
Include a requirements.txt file or a packaged dependencies archive, passed with the --py-files option, if your script requires non-standard Python libraries.

Thank you for your response. Though we are providing the necessary permissions to the service account, is it necessary to provide a username and password in the "# Write data to Cloud SQL" section of the code? Is there any way we can connect without these user credentials?
Yes, connecting to a database like Cloud SQL from a Spark job involves providing credentials such as a username and password within your code. However, embedding credentials directly in your code is not a best practice due to security concerns. Fortunately, Google Cloud offers more secure methods to authenticate and connect to Cloud SQL without hardcoding credentials in your application code.
Cloud SQL Proxy
The Cloud SQL Auth Proxy opens a secure, IAM-authorized tunnel to your instance, so your application connects to localhost (or a Unix socket) rather than the instance's IP address, with no need to allowlist IPs or manage SSL certificates yourself.
Cloud SQL IAM Database Authentication
An even more seamless method is Cloud SQL IAM database authentication. This feature allows you to authenticate database users directly with Google Cloud IAM, avoiding separate database passwords altogether.
To use IAM database authentication:
- Enable the cloudsql.iam_authentication database flag on the Cloud SQL instance.
- Create a database user of the IAM service-account type that maps to your service account.
- Grant the service account the Cloud SQL Instance User role (roles/cloudsql.instanceUser) and the Cloud SQL Client role (roles/cloudsql.client).
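A rough sketch of those steps with gcloud (instance and project names are placeholders; note that --database-flags replaces any flags already set on the instance, and the IAM database username for a service account is its email without the .gserviceaccount.com suffix):

gcloud sql instances patch your-cloud-sql-instance \
  --database-flags=cloudsql.iam_authentication=on

gcloud sql users create test@Test.iam \
  --instance=your-cloud-sql-instance \
  --type=cloud_iam_service_account

gcloud projects add-iam-policy-binding your-project-id \
  --member="serviceAccount:test@Test.iam.gserviceaccount.com" \
  --role="roles/cloudsql.instanceUser"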
Preference: Cloud SQL IAM database authentication is generally preferred due to its ease of use and tight integration with Google Cloud's permission model.
Example
import google.auth
from google.auth.transport.requests import Request
from sqlalchemy import create_engine

# Obtain credentials and request a short-lived access token
credentials, project = google.auth.default()
credentials.refresh(Request())

# Connection string (replace the placeholders); with pg8000 the Unix socket
# created by the Cloud SQL Auth Proxy is passed as unix_sock, and the access
# token is used in place of a database password
connection_string = (
    "postgresql+pg8000://<db_user>@/<db_name>"
    f"?unix_sock=/cloudsql/<instance_connection_name>/.s.PGSQL.5432&password={credentials.token}"
)

# Create a database engine
engine = create_engine(connection_string)
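If you would rather not build the URL by hand, the Cloud SQL Python Connector (the cloud-sql-python-connector package) can handle the socket, TLS, and IAM token for you. A minimal sketch, where the instance connection name, user, and database are placeholders and the IAM user for a service account is its email without the .gserviceaccount.com suffix:

import sqlalchemy
from google.cloud.sql.connector import Connector

connector = Connector()

def getconn():
    # enable_iam_auth tells the connector to fetch and pass an IAM token,
    # so no database password is needed
    return connector.connect(
        "your-project:us-central1:your-instance",  # instance connection name (placeholder)
        "pg8000",
        user="pipelsvc-acct@your-project.iam",     # IAM database user (placeholder)
        db="your-database-name",
        enable_iam_auth=True,
    )

engine = sqlalchemy.create_engine("postgresql+pg8000://", creator=getconn)

with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text("SELECT 1")).scalar())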
Google Cloud provides robust tools for secure database connections. Leveraging IAM, either through the Cloud SQL Proxy or even better, IAM Database Authentication, significantly enhances security compared to embedding credentials in code.
Thank you so much for your response. We have created a connection string in the code below,
import google.auth
from google.auth.transport.requests import Request
from sqlalchemy import create_engine, MetaData, Table
# Obtain credentials and request an access token
credentials, project = google.auth.default()
credentials.refresh(Request())
# Connection string (replace placeholders)
connection_string = f"postgresql+pg8000://pipelsvc-acct@alytED?host=35.289.58.202&password={credentials.token}"
print("String: ",connection_string)
print(type(credentials))
print("Credentials:", credentials.token)
# Create a database engine
engine = create_engine(connection_string)
# Connect to the database
connection = engine.connect()
print(connection)
# Declare a metadata object
metadata = MetaData()
# Define the table you want to query
table_name = 't'
table = Table(table_name, metadata, autoload_with=engine)
# Query to select all data from the table
query = table.select()
# Execute the query and fetch the results
result_proxy = connection.execute(query)
results = result_proxy.fetchall()
# Print the results
for row in results:
    print(row)
# Close the connection
connection.close()
I've modified some of the sensitive info for security purposes. When I submit this code, I'm getting the errors below:
TimeoutError: [Errno 110] Connection timed out
pg8000.exceptions.InterfaceError: Can't create a connection to host 35.225.58.231 and port 5432 (timeout is None and source_address is None).
raise InterfaceError( sqlalchemy.exc.InterfaceError: (pg8000.exceptions.InterfaceError) Can't create a connection to host 35.289.58.202 and port 5432 (timeout is None and source_address is None). (Background on this error at: https://sqlalche.me/e/20/rvf5)
Can you please help me figure out what is wrong with the code?
Thanks,
Vigneswar Jeyaraj
The errors you're encountering suggest that your application is unable to establish a connection to the specified Cloud SQL instance. This can be due to several reasons, including network issues, incorrect connection details, or misconfiguration of the Cloud SQL instance or the client's environment. Here are some troubleshooting steps:
1. Verify Cloud SQL Instance Details
Ensure that the IP address 35.289.58.202 and port 5432 are correct for your Cloud SQL instance. Typically, Cloud SQL instances are accessed through a public IP address or via the Cloud SQL Proxy for private IP connections. If you're using a public IP, make sure it's correctly configured in your Cloud SQL instance settings.
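A quick way to confirm the instance's actual IP addresses and connection name is a describe call (instance name is a placeholder):

gcloud sql instances describe your-cloud-sql-instance \
  --format="yaml(connectionName, ipAddresses)"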
2. Check Network Accessibility
If you're connecting directly to a public IP, ensure that your network environment (e.g., firewall rules) allows outbound connections to the Cloud SQL instance's IP address and port (5432 for PostgreSQL).
For Cloud SQL instances configured with a public IP, ensure that the instance is configured to accept connections from the IP address(es) of the client machine or network from which you're trying to connect.
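If you are intentionally using the public IP path, the client's egress range has to be added to the instance's authorized networks, along these lines (the CIDR below is a placeholder):

gcloud sql instances patch your-cloud-sql-instance \
  --authorized-networks=203.0.113.0/24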
3. Use Cloud SQL Proxy for Secure Connections
For a more secure and reliable connection method, especially if you're encountering network issues or if your instance uses a private IP, consider using the Cloud SQL Proxy. The Cloud SQL Proxy provides a secure tunnel between your application and the Cloud SQL instance without the need to allowlist IP addresses or configure SSL.
When using the Cloud SQL Proxy, your connection string's host parameter would typically be set to localhost or the path to a Unix socket provided by the proxy, rather than a direct IP address.
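For reference, recent versions of the proxy are started roughly like this (binary name and flags can differ by version; the instance connection name is a placeholder), after which the application connects to 127.0.0.1:5432:

./cloud-sql-proxy --port 5432 your-project:us-central1:your-instance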
4. IAM Authentication Considerations
When using IAM authentication, ensure that the Cloud SQL instance has IAM database authentication enabled and that the IAM user or service account has at least the Cloud SQL Instance User role.
The database user (pipelsvc-acct in your connection string) must be created in the Cloud SQL instance and linked to the corresponding IAM account or service account. This user is different from the IAM entity and must be set up within the database.
5. Correct Use of Access Tokens
Ensure that the access token is valid at the time of connection. Tokens have a limited lifetime and must be refreshed periodically. Your current code snippet correctly refreshes the token, but if there's a significant delay between token refresh and usage, the token might expire.
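If there could be a delay between refreshing the token and connecting, a defensive check just before creating the engine looks like this sketch (credentials is the google-auth object from your snippet):

if not credentials.valid:          # token missing or expired
    credentials.refresh(Request())
# rebuild the connection string with the fresh credentials.token before calling create_engine()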
6. SQLAlchemy Engine Configuration
When creating the SQLAlchemy engine, additional parameters might be needed for optimal performance or compatibility, such as pool sizes or timeout settings. However, these are less likely to be the cause of the connection timeout issue you're facing.
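If you do want to tune the engine, a small sketch (pg8000 accepts a timeout connect argument, which also turns an indefinite hang into a clearer, faster failure):

engine = create_engine(
    connection_string,
    pool_size=5,                    # limit concurrent connections per worker
    pool_pre_ping=True,             # validate connections before use
    connect_args={"timeout": 30},   # fail fast instead of waiting indefinitely
)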
7. Debugging and Logs
Enable verbose logging for both SQLAlchemy and the Cloud SQL Proxy (if used) to get more detailed error messages. This can help pinpoint whether the issue is at the network level, authentication level, or with the SQL query execution.
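For SQLAlchemy, verbose logging can be enabled through the standard logging module (alternatively, pass echo=True to create_engine):

import logging

logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)   # emitted SQL statements
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)    # connection checkouts/returns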