
Secrets and Dataflow python

What is the recommended way to access secrets from Secret Manager in a Dataflow (Python) pipeline?


Hi @iost ,

While JDBC is commonly used with Java, in Python, you would typically use a library like psycopg2 for PostgreSQL or mysql-connector-python for MySQL. Therefore, it is recommended to use these libraries instead of JDBC in Dataflow (Python) pipelines.

Error Handling

It is essential to include error handling in your Dataflow (Python) pipeline, especially when dealing with database connections. This ensures that if there is an issue with the connection or query, the pipeline can handle it gracefully.

One way to handle errors is to use a finally block. This ensures that the code in the finally block is always executed, even if an error occurs. For example:

 

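import psycopg2

connection = None
cursor = None
try:
    # Connect to the database.
    connection = psycopg2.connect(database="my_database")

    # Execute the query.
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM my_table")

    # Process the results.
    for row in cursor:
        print(row)  # Placeholder: replace with real processing.
except psycopg2.Error as e:
    print("A database error occurred:", e)
finally:
    # Close only what was actually opened; connect() itself may have failed.
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()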

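Another way to handle errors is to use context managers, which acquire and release resources automatically. Note that in psycopg2, using a connection in a with statement only commits or rolls back the transaction; it does not close the connection. Wrapping the connection in contextlib.closing() ensures it is closed as well. For example:

import contextlib

import psycopg2

# closing() guarantees connection.close() when the block exits.
with contextlib.closing(psycopg2.connect(database="my_database")) as connection:
    # "with connection" commits on success and rolls back on error;
    # the cursor context manager closes the cursor.
    with connection, connection.cursor() as cursor:
        cursor.execute("SELECT * FROM my_table")

        # Process the results.
        for row in cursor:
            print(row)  # Placeholder: replace with real processing.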

Closing Resources

It is important to always close resources like database connections and cursors in your Dataflow (Python) pipeline. This ensures that resources are released even if an error occurs.

Parameterized Queries

When inserting data into a database, it is crucial to use parameterized queries to prevent SQL injection attacks. Parameterized queries allow you to pass values to the query as parameters, rather than embedding them directly in the query string.

For example, the following query uses string formatting, which is not safe:

 

cursor.execute("INSERT INTO my_table VALUES ({})".format(value))

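Instead, you should use a parameterized query. Both psycopg2 and mysql-connector-python use %s as the placeholder (the ? style belongs to other drivers such as sqlite3):

cursor.execute("INSERT INTO my_table VALUES (%s)", (value,))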
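To see what parameter binding buys you, here is a minimal, self-contained sketch. It uses the standard-library sqlite3 module with an in-memory database purely so it can run anywhere; that choice, the name column, and the insert_value helper are assumptions for illustration and not part of the original answer. sqlite3 uses ? placeholders, whereas psycopg2 uses %s, but the principle is the same.

```python
import sqlite3


def insert_value(connection, value):
    """Insert one value using a bound parameter instead of string formatting."""
    # The driver sends the value separately from the SQL text,
    # so it is never interpreted as SQL.
    connection.execute("INSERT INTO my_table (name) VALUES (?)", (value,))


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE my_table (name TEXT)")

# A value crafted to break out of a naively formatted query string.
malicious = "x'); DROP TABLE my_table; --"
insert_value(connection, malicious)

# The text was stored verbatim and the table still exists.
rows = connection.execute("SELECT name FROM my_table").fetchall()
print(rows)
```

With string formatting, the same value would have been spliced into the statement itself.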

Secret Manager Permissions

The Dataflow worker service account needs the "Secret Manager Secret Accessor" IAM role (roles/secretmanager.secretAccessor) in order to access secrets from Secret Manager. You can grant this role to the service account using the Google Cloud Console, the Cloud SDK, or the Google Cloud Client Libraries.

Pipeline Execution

In the example pipeline provided in my previous response, the data is read from and written to the same database table. In a real-world scenario, you would likely have separate read and write operations, possibly with some transformations in between.

Beam's JdbcIO

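Apache Beam provides a JdbcIO for reading from and writing to JDBC data sources. In the Python SDK it is exposed as the cross-language transforms ReadFromJdbc and WriteToJdbc in apache_beam.io.jdbc, which rely on a Java expansion service. If you are working with databases, it might be more efficient to leverage this built-in IO rather than creating a custom one.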

By following the above guidelines, you can securely access secrets from Secret Manager in your Dataflow (Python) pipeline.

Thanks @ms4446 for your answer. However, it still doesn't answer the question of how we should access Secret Manager inside Dataflow. I'm struggling with this myself, as I couldn't find any official documentation or code snippet on how that should be handled. From what I've found, I should implement a method to retrieve the secret and pass it as a pipeline option. Is this the best way to do it?

thank you

@ms4446 I didn't ask about database connections, but about accessing secrets from Secret Manager. Could you help?
Thank you

To access secrets from Secret Manager in a Dataflow (Python) pipeline securely, follow these steps:

Set Up Permissions

Ensure that the Dataflow worker service account has the "Secret Manager Secret Accessor" IAM role (roles/secretmanager.secretAccessor). This allows the Dataflow pipeline to access the secret from Secret Manager.

Accessing the Secret

Use the Secret Manager client library for Python. You can install it using pip install google-cloud-secret-manager.

Create a function in your Dataflow pipeline to fetch the secret using the Secret Manager SDK:

 

from google.cloud import secretmanager

def get_secret_from_secret_manager(secret_name, project_id):
    client = secretmanager.SecretManagerServiceClient()
    secret_version_name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
    response = client.access_secret_version(request={"name": secret_version_name})
    return response.payload.data.decode("UTF-8")

Use the Secret in the Pipeline

Call the get_secret_from_secret_manager() function in your pipeline to retrieve the secret value.

Use the secret value as needed in your pipeline, ensuring that it's not logged or exposed in any way.
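One detail worth sketching: you usually want to fetch a secret once per worker process rather than once per element. The snippet below is a minimal illustration using functools.lru_cache. The helper make_cached_secret_getter and the stand-in fake_fetch are hypothetical names introduced here so the sketch runs without Google Cloud credentials; in a real pipeline you would pass in a function such as get_secret_from_secret_manager instead.

```python
import functools


def make_cached_secret_getter(fetch_secret):
    """Wrap a fetcher so each (secret_name, project_id) is fetched once per process."""

    @functools.lru_cache(maxsize=None)
    def cached(secret_name, project_id):
        return fetch_secret(secret_name, project_id)

    return cached


# Stand-in fetcher (hypothetical) that records how often it is called.
calls = []


def fake_fetch(secret_name, project_id):
    calls.append((secret_name, project_id))
    return "dummy-value"


get_secret = make_cached_secret_getter(fake_fetch)
first = get_secret("my-secret", "my-project")
second = get_secret("my-secret", "my-project")  # Served from the cache.
print(len(calls))
```

The trade-off is that a cached value is not refreshed if the secret is rotated while the worker process is still running.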

Security Best Practices

  • Avoid hardcoding any secrets or sensitive information directly in your code.
  • Use environment variables or runtime parameters to pass the project ID and secret name to your pipeline, rather than hardcoding them.
  • Ensure that your Dataflow pipeline and any associated resources (like GCS buckets) have appropriate IAM roles and permissions set up to prevent unauthorized access.
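As a rough sketch of the second point, the project ID and secret name can be taken from command-line arguments and turned into the resource name that Secret Manager expects. The flag names (--secret_project, --secret_id, --secret_version) and both helper functions are assumptions made for this example, not an established convention.

```python
import argparse


def parse_secret_args(argv):
    """Split our own (hypothetical) secret flags from the remaining pipeline flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--secret_project", required=True,
                        help="Project that owns the secret.")
    parser.add_argument("--secret_id", required=True,
                        help="Name of the secret in Secret Manager.")
    parser.add_argument("--secret_version", default="latest",
                        help="Secret version to read.")
    # Unrecognized flags are returned untouched so they can be given to Beam.
    return parser.parse_known_args(argv)


def secret_version_path(project_id, secret_id, version="latest"):
    """Build the resource name used when accessing a secret version."""
    return f"projects/{project_id}/secrets/{secret_id}/versions/{version}"


known_args, pipeline_args = parse_secret_args(
    ["--secret_project", "my-project", "--secret_id", "db-password",
     "--runner", "DirectRunner"]
)
path = secret_version_path(
    known_args.secret_project, known_args.secret_id, known_args.secret_version
)
print(path)
```

Only the secret's name travels through the arguments; the secret value itself is still fetched at runtime on the worker.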

I was able to work this out using setup(). Here's a sample:

 

import argparse
import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class extrat_movie_name(beam.DoFn):
    """Extract the movie name and append the secret token to it."""

    def __init__(self):
        self.token = None

    def setup(self):
        # setup() runs once per DoFn instance on the worker, so the secret
        # is fetched once rather than for every element.
        # Imported here so the dependency is resolved on the worker.
        from google.cloud import secretmanager_v1

        SECRET_ID = "secret-xxxxx"
        PROJECT_ID = "project-xxxx"
        client = secretmanager_v1.SecretManagerServiceClient()
        request = secretmanager_v1.AccessSecretVersionRequest(
            name=f"projects/{PROJECT_ID}/secrets/{SECRET_ID}/versions/latest",
        )
        response = client.access_secret_version(request=request)
        self.token = response.payload.data.decode('UTF-8')

    def process(self, element):
        movie = element['name']
        # Demonstration only: a real pipeline should not write a secret to its output.
        return [movie + self.token]

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://xx/yyy/sample.json',
        help='Input specified as a GCS path containing movies json.')    
    parser.add_argument(
        '--output',
        dest='output',
        default='gs://xx/zz/full-movies.txt')
     
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'Reading movies JSON file' >> ReadFromText(known_args.input)
            | 'Load JSON to Dict' >> beam.Map(json.loads)
            | 'Get movie name' >> beam.ParDo(extrat_movie_name())
            | 'Write to local' >> WriteToText(known_args.output)
        )
if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()