What is the recommended way to access secrets from Secret Manager in a Dataflow (Python) pipeline?
Hi @iost,
While JDBC is commonly used with Java, in Python you would typically use a library like psycopg2 for PostgreSQL or mysql-connector-python for MySQL. It is therefore recommended to use these libraries instead of JDBC in Dataflow (Python) pipelines.
Error Handling
It is essential to include error handling in your Dataflow (Python) pipeline, especially when dealing with database connections. This ensures that if there is an issue with the connection or query, the pipeline can handle it gracefully.
One way to handle errors is to use a finally block, which guarantees that its code runs even if an error occurs. For example:
import psycopg2

connection = None
cursor = None
try:
    # Connect to the database.
    connection = psycopg2.connect(database="my_database")
    # Execute the query.
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM my_table")
    # Process the results.
    for row in cursor:
        ...  # handle each row
except Exception as e:
    print("An error occurred:", e)
finally:
    # Close the cursor and connection if they were opened.
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()
Another way to handle errors is to use context managers, which acquire and release resources automatically. (Note that in psycopg2, a connection used as a context manager commits or rolls back the transaction rather than closing the connection, while a cursor used as a context manager is closed on exit.) For example:
with psycopg2.connect(database="my_database") as connection:
    # The connection block manages the transaction; the connection
    # itself still needs to be closed separately.
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM my_table")
        # Process the results.
        for row in cursor:
            ...  # handle each row
Closing Resources
It is important to always close resources like database connections and cursors in your Dataflow (Python) pipeline. This ensures that resources are released even if an error occurs.
Parameterized Queries
When inserting data into a database, it is crucial to use parameterized queries to prevent SQL injection attacks. Parameterized queries allow you to pass values to the query as parameters, rather than embedding them directly in the query string.
For example, the following query uses string formatting, which is not safe:
cursor.execute("INSERT INTO my_table VALUES ({})".format(value))
Instead, you should use a parameterized query. Note that the placeholder syntax depends on the driver; both psycopg2 and mysql-connector-python use %s:
cursor.execute("INSERT INTO my_table VALUES (%s)", (value,))
Secret Manager Permissions
The Dataflow worker service account needs the Secret Manager Secret Accessor role (roles/secretmanager.secretAccessor) in order to access secrets from Secret Manager. You can grant this role to the service account using the Google Cloud console, the gcloud CLI, or the Google Cloud client libraries.
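For example, the grant via gcloud looks like this (the secret name, project, and worker service account below are placeholders):
gcloud secrets add-iam-policy-binding MY_SECRET \
    --project=MY_PROJECT \
    --member="serviceAccount:MY_WORKER_SA@MY_PROJECT.iam.gserviceaccount.com" \
    --role="roles/secretmanager.secretAccessor"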
Pipeline Execution
In the example pipeline provided in my previous response, the data is read from and written to the same database table. In a real-world scenario, you would likely have separate read and write operations, possibly with some transformations in between.
Beam's JdbcIO
Apache Beam provides JdbcIO for reading from and writing to JDBC data sources; in Python it is exposed through the cross-language ReadFromJdbc and WriteToJdbc transforms in apache_beam.io.jdbc. If you are working with databases, it might be more efficient to leverage this built-in IO rather than creating a custom one.
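As a rough sketch, a read might look like the following (the connection details are placeholders, and because this is a cross-language transform the runner needs Java available for the expansion service):
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc

with beam.Pipeline() as p:
    rows = (
        p
        | 'Read from Postgres' >> ReadFromJdbc(
            table_name='my_table',
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://HOST:5432/my_database',
            username='USER',
            password='PASSWORD',  # in practice, fetch this from Secret Manager
        )
    )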
By following the above guidelines, you can securely access secrets from Secret Manager in your Dataflow (Python) pipeline.
Thank you @ms4446 for your answer; however, it still doesn't answer the question of how we should access Secret Manager inside Dataflow. I'm struggling with this myself, as I couldn't find any official documentation or code snippet on how that should be handled. From what I've found, I should implement a method to retrieve the secret and pass it as a pipeline option. Is this the best way to do it?
Thank you
@ms4446 I didn't ask about database connections, but about accessing secrets from Secret Manager. Could you help?
Thank you
To access secrets from Secret Manager in a Dataflow (Python) pipeline securely, follow these steps:
Set Up Permissions
Ensure that the Dataflow worker service account has the Secret Manager Secret Accessor role (roles/secretmanager.secretAccessor). This allows the Dataflow pipeline to read the secret from Secret Manager.
Accessing the Secret
Use the Secret Manager client library for Python. You can install it with pip install google-cloud-secret-manager.
Create a function in your Dataflow pipeline to fetch the secret using the Secret Manager SDK:
from google.cloud import secretmanager

def get_secret_from_secret_manager(secret_name, project_id):
    # Build the fully qualified name of the latest secret version.
    client = secretmanager.SecretManagerServiceClient()
    secret_version_name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
    response = client.access_secret_version(request={"name": secret_version_name})
    return response.payload.data.decode("UTF-8")
Use the Secret in the Pipeline
Call the get_secret_from_secret_manager() function in your pipeline to retrieve the secret value.
Use the secret value as needed in your pipeline, ensuring that it's not logged or exposed in any way.
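For instance, a DoFn can fetch the secret once per worker in its setup() method and keep it only in memory. This sketch reuses the get_secret_from_secret_manager() helper above; the class name and the secret and project IDs are illustrative:
import apache_beam as beam

class QueryWithSecret(beam.DoFn):
    def setup(self):
        # Runs once per worker: fetch the secret and keep it in memory only.
        self._api_key = get_secret_from_secret_manager("my-secret", "my-project")

    def process(self, element):
        # Use self._api_key to authenticate a client here; never log it.
        yield element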
Security Best Practices
Avoid logging or printing the secret value, fetch it on the worker (for example in a DoFn's setup() method) rather than passing it around as a plain-text pipeline option, and grant the worker service account only the roles it actually needs.
I was able to work this out using setup(). Here's a sample:
import argparse
import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class ExtractMovieName(beam.DoFn):
    """Extract the movie name and append the secret token."""

    def __init__(self):
        self.token = None

    def setup(self):
        # setup() runs once per DoFn instance on the worker, so the
        # secret is fetched at worker startup rather than per element.
        def sample_access_secret_version():
            from google.cloud import secretmanager_v1

            SECRET_ID = "secret-xxxxx"
            PROJECT_ID = "project-xxxx"
            client = secretmanager_v1.SecretManagerServiceClient()
            request = secretmanager_v1.AccessSecretVersionRequest(
                name=f"projects/{PROJECT_ID}/secrets/{SECRET_ID}/versions/latest",
            )
            response = client.access_secret_version(request=request)
            return response.payload.data.decode('UTF-8')

        self.token = sample_access_secret_version()

    def process(self, element):
        movie = element['name']
        return [movie + self.token]
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://xx/yyy/sample.json',
        help='Input specified as a GCS path containing movies json.')
    parser.add_argument(
        '--output',
        dest='output',
        default='gs://xx/zz/full-movies.txt')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'Reading movies JSON file' >> ReadFromText(known_args.input)
            | 'Load JSON to Dict' >> beam.Map(json.loads)
            | 'Get movie name' >> beam.ParDo(ExtractMovieName())
            | 'Write to output' >> WriteToText(known_args.output)
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
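One note on this approach: google-cloud-secret-manager must be importable on the Dataflow workers, so when launching on Dataflow you would typically ship it via a requirements file (or a custom container). A typical invocation might look like the following, where the script name, project, region, and bucket are placeholders:
python movie_pipeline.py \
    --runner DataflowRunner \
    --project project-xxxx \
    --region us-central1 \
    --temp_location gs://xx/tmp \
    --requirements_file requirements.txt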