Writing a PCollection<Row> to Cloud SQL

I am developing a Dataflow Pipeline as follows:

1. Read files from GCS having a common prefix into a Deferred Dataframe

2. Apply Transformations to the Deferred Dataframe

3. Convert Deferred Dataframe to a PCollection

4. Write this PCollection to Cloud SQL for PostgreSQL (I am stuck at this step)

Can someone please advise me on how step 4 can be accomplished, and whether the write operation will happen in parallel on multiple worker machines?


To write your PCollection to Cloud SQL for PostgreSQL in your Dataflow Pipeline, you can use Apache Beam's JdbcIO connector. Here are some steps you can take:

  1. Add Required Dependencies: Ensure your project includes the following dependencies:

    • Apache Beam's JDBC IO module: org.apache.beam:beam-sdks-java-io-jdbc
    • PostgreSQL JDBC Driver: org.postgresql:postgresql

    Example Maven dependencies: 

     
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-jdbc</artifactId>
      <version>[Beam version]</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>[PostgreSQL driver version]</version>
    </dependency>
    
  2. Configure JdbcIO: Set up a JdbcIO.Write transformation with the following details:

    • DataSource Configuration: Use DataSourceConfiguration for specifying connection details like driver class name, JDBC URL, username, password, etc. For Cloud SQL, the JDBC URL format should be jdbc:postgresql://host:port/database?cloudSqlInstance=INSTANCE_CONNECTION_NAME&socketFactory=com.google.cloud.sql.postgres.SocketFactory.
    • Statement: Define the SQL insert or update statement. This can be a static or dynamically built statement using elements in the PCollection.
    • PreparedStatementSetter: Implement this interface to set the parameters for each element in the PCollection if you're using a prepared statement.
  3. Writing the PCollection: Apply the JdbcIO.Write transformation to your PCollection. This will write each element to Cloud SQL based on your configuration and SQL statement (a Python sketch follows this list).

  4. Parallelism: The write operation will indeed happen in parallel across multiple worker machines. JdbcIO manages connections and distributes writes efficiently. However, be mindful of potential contention issues when writing large volumes of data concurrently.

  5. Additional Notes:

    • Batching: Consider using batching for improved performance.
    • Connection Pooling: Configure connection pool settings appropriately for your data size and processing needs.
    • Resource Allocation: Ensure your Cloud SQL instance is adequately resourced to handle concurrent writes.
    • Security: Secure your connections, especially when handling sensitive data.
  6. Helpful Resources:
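
Since the rest of this thread uses the Python SDK, here is a minimal sketch of steps 2 and 3 using apache_beam.io.jdbc.WriteToJdbc. The row type, table name, and connection details below are placeholders, not values from your project:

import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc


# Hypothetical row type; WriteToJdbc needs a schema'd PCollection,
# so register a RowCoder for it.
class ExampleRow(typing.NamedTuple):
    example_id: int
    example_name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline() as p:
    rows = (
        p
        | 'Create rows' >> beam.Create(
            [ExampleRow(1, 'a'), ExampleRow(2, 'b')]).with_output_types(ExampleRow)
    )
    _ = rows | 'Write to Cloud SQL' >> WriteToJdbc(
        table_name='example_table',  # placeholder
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://<host>:<port>/<database>',  # placeholder
        username='<username>',
        password='<password>',
    )

When connecting through the Cloud SQL socket factory, the jdbc_url would instead carry the cloudSqlInstance and socketFactory query parameters described above.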

Hi @ms4446 , thank you for your response and time.

I implemented the pipeline using the WriteToJdbc transform (the Stack Overflow question you shared among the helpful resources was posted by me in the past), but when I run the pipeline, I receive the error below:

RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-schemaio-expansion-se... <urlopen error Tunnel connection failed: 403 Forbidden>

Here are some steps to troubleshoot and resolve this issue:

  1. Check Network Restrictions:

    • Ensure that your network allows outbound connections to https://repo.maven.apache.org. Some corporate networks have strict firewall rules that might block access to external repositories.
  2. Proxy Configuration:

    • If you are behind a proxy, you need to configure your environment to use the proxy for accessing external URLs. This can typically be done by setting environment variables like HTTP_PROXY and HTTPS_PROXY (see the sketch after this list).
    • For Java applications, you can also set proxy settings via JVM arguments, e.g., -Dhttp.proxyHost=proxy_host -Dhttp.proxyPort=proxy_port.
  3. Maven Settings:

    • Check your settings.xml file for any misconfigurations that might be causing the issue. Ensure that the Maven Central repository is correctly configured.
  4. Direct Download:

    • As a temporary workaround, you can manually download the required JAR file from the Maven repository and place it in a location accessible to your pipeline. However, this is not a recommended long-term solution.
  5. Check for Service Outages:

    • Occasionally, the Maven Central repository might be down or experiencing issues. You can check for any reported outages or maintenance activities.
  6. Upgrade Apache Beam Version:

    • If you are using an older version of Apache Beam, consider upgrading to the latest version. Sometimes, issues are related to bugs in older versions that have been fixed in newer releases.
  7. Logging and Debugging:

    • Increase the logging level to get more detailed error messages. This can provide additional clues about what might be going wrong.
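
For point 2, a minimal sketch of routing the SDK's jar download through a proxy by setting environment variables before the pipeline is constructed; the proxy host and port are placeholders, and urllib (which the Python SDK typically uses for the download) honours these variables:

import os

# Hypothetical proxy endpoint; set these before building the pipeline so the
# expansion-service jar download goes through the proxy.
os.environ['HTTP_PROXY'] = 'http://proxy.example.com:3128'
os.environ['HTTPS_PROXY'] = 'http://proxy.example.com:3128'

The same values can also be exported in the shell environment that launches the pipeline.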

I will definitely check points 1 and 2. However, regarding point 3: I am using Python, so does this apply to me?

Since you are using Python for your Beam pipeline, point 3 regarding Maven settings does not directly apply to you.

The issue I posted in my previous comment is fixed now - RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-schemaio-expansion-se... <urlopen error Tunnel connection failed: 403 Forbidden>

However, I am receiving the following error message:

Caused by: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:

name: "SERIES_REFERENCE"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}

I checked the resource available on this error message: one of which is this: https://stackoverflow.com/questions/68758361/error-beamlogical-typejavasdkv1-while-using-apache-beam...

However, I am not sure how I can implement this in the Python pipeline. Can you please guide me?

Following is my code:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.io.jdbc import WriteToJdbc
import yaml

def datatype_mapping(datatype):
    if datatype == 'date':
        return 'datetime64'
    elif datatype == 'varchar':
        return object
    elif datatype == 'bigint':
        return 'int64'
    elif datatype == 'numeric':
        return 'float64'

def get_jdbc_url(host_ip, host_port, db_name):
    return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'

def get_table_details(config, source_system):
    return config['database']['tableDetails'][source_system]['master_table_name']

def run(argv=None):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
    parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
    parser.add_argument('--source_system', dest='source_system', help='Source system name')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    with open(known_args.config_file, 'r') as f:
        config = yaml.safe_load(f)

    host_ip = config['database']['connectionDetails']['host_ip']
    host_port = config['database']['connectionDetails']['host_port']
    db_name = config['database']['connectionDetails']['db_name']
    username = config['database']['connectionDetails']['username']
    password = config['database']['connectionDetails']['password']

    jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
    master_table = get_table_details(config, known_args.source_system)

    columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

    logging.info(columns_metadata_dict)

    dtype_dict = {}
    parse_dates_list = []
    for column in columns_metadata_dict:
        if columns_metadata_dict[column] == 'date':
            parse_dates_list.append(column.upper())
        else:
            dtype_dict[column.upper()] = datatype_mapping(columns_metadata_dict[column])

    logging.info(dtype_dict)
    logging.info(parse_dates_list)

    with pipeline as p:
        deferred_dataframe = p | read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list)

        logging.info(deferred_dataframe.dtypes)

        deferred_dataframe.columns = [column.upper() for column in deferred_dataframe.columns]

        dataframe_to_pcollection = to_pcollection(deferred_dataframe)

        required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

        required_column_list = []

        for column_name in required_column_dict:
            required_column_list.append(column_name.upper())

        filtered_pcollection = dataframe_to_pcollection | beam.Select(*required_column_list)

        filtered_pcollection | beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')

        filtered_pcollection | WriteToJdbc(
            table_name=master_table,
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            username=username,
            password=password
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
 

The error you're encountering, java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto, is related to the schema interpretation of your data by Apache Beam, particularly with the handling of a field with a logical type beam:logical:pythonsdk_any:v1. This error typically occurs when there's a mismatch or an unsupported data type between your DataFrame and the schema expected by the database or Beam's internal handling.

From the code you've provided, it seems you are reading a CSV file into a deferred DataFrame, applying transformations, and then attempting to write this data to a PostgreSQL database using WriteToJdbc. Here are some steps and considerations to address the issue:

  1. Schema Compatibility:

    • Ensure that the schema of the DataFrame (after transformations) is compatible with the schema of the target table in PostgreSQL. This includes matching data types and column names.
  2. Handling Special Data Types:

    • The error message suggests an issue with a specific field (SERIES_REFERENCE). Check if this field or others have data types that are not natively supported or might be causing issues. For instance, complex types like arrays or custom objects might need special handling.
  3. Debugging Schema Issues:

    • Log the schema of the DataFrame before writing to the database (deferred_dataframe.dtypes). This can help identify any fields that might be causing issues.
    • Temporarily write the DataFrame to a CSV file (as you're already doing) and inspect the output to ensure the data looks as expected.
  4. Beam Schema Translation:

    • Apache Beam attempts to translate the schema of your DataFrame into a Beam schema. If there are fields with data types that Beam cannot translate, it might lead to errors. You might need to explicitly define or convert these fields to compatible types.
  5. Explicitly Define the Schema:

    • If automatic schema inference is causing issues, consider explicitly defining the schema for the PCollection before writing to the database.
    • In the Python SDK, define a schema that matches your database table (for example, a typing.NamedTuple registered with a RowCoder), and then use beam.Map or similar transformations to ensure your data conforms to this schema (see the sketch after this list).
  6. Test with Simplified Data:

    • As a troubleshooting step, try writing a simplified version of your DataFrame (with fewer columns or simpler data types) to the database. This can help isolate the problem.
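
As a sketch of point 5 for the Python SDK (the field names below are illustrative, not taken from your table), a schema can be declared as a typing.NamedTuple, registered with a RowCoder, and attached to the PCollection before WriteToJdbc:

import typing

import apache_beam as beam
from apache_beam import coders


# Illustrative schema; match the names and types to the target table.
class ExampleRecord(typing.NamedTuple):
    series_reference: str
    data_value: float

# Registering a RowCoder lets the cross-language JDBC transform decode the rows.
coders.registry.register_coder(ExampleRecord, coders.RowCoder)

# In the pipeline, conform elements to the schema before writing:
# typed_pcollection = (
#     filtered_pcollection
#     | beam.Map(lambda element: ExampleRecord(
#         series_reference=element.SERIES_REFERENCE,
#         data_value=element.DATA_VALUE)).with_output_types(ExampleRecord)
# )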

Sure, thanks for your time. I will try to narrow down the issue, and will keep you posted.

Hi @ms4446 , the issue I was facing before (java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto) was because of the varchar fields. I tried inserting integer data and it worked fine.

Now, when I am trying with the whole data by explicitly defining the schema, I am receiving the following error message:

Caused by: java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1, beam:coder:nullable:v1]

Following is the updated code; I am not sure how to resolve this:

import argparse
import logging
import typing

import numpy as np

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.io.jdbc import WriteToJdbc
import yaml


class Business(typing.NamedTuple):
    series_reference: str
    period: float
    data_value: float
    suppressed: str
    status: str
    units: str
    magnitude: int
    subject: str
    group: str
    series_title_1: str
    series_title_2: str
    series_title_3: str
    series_title_4: str
    series_title_5: str


def datatype_mapping(datatype):
    if datatype == 'date':
        return np.datetime64
    elif datatype == 'varchar':
        return str
    elif datatype == 'bigint':
        return np.int64
    elif datatype == 'numeric':
        return np.float64
    

def get_jdbc_url(host_ip, host_port, db_name):
    return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'


def get_table_details(config, source_system):
    return config['database']['tableDetails'][source_system]['master_table_name']


def get_renamed_columns(original_columns):
    columns_rename_map = dict()
    for column in original_columns:
        columns_rename_map[column] = column.lower()
    return columns_rename_map


def run(argv=None):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
    parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
    parser.add_argument('--source_system', dest='source_system', help='Source system name')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    with open(known_args.config_file, 'r') as f:
        config = yaml.safe_load(f)

    host_ip = config['database']['connectionDetails']['host_ip']
    host_port = config['database']['connectionDetails']['host_port']
    db_name = config['database']['connectionDetails']['db_name']
    username = config['database']['connectionDetails']['username']
    password = config['database']['connectionDetails']['password']

    jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
    master_table = get_table_details(config, known_args.source_system)

    columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

    #DEBUG
    logging.info(columns_metadata_dict)

    dtype_dict = {}
    parse_dates_list = []
    for column in columns_metadata_dict:
        if columns_metadata_dict[column] == 'date':
            parse_dates_list.append(column)
        else:
            dtype_dict[column] = datatype_mapping(columns_metadata_dict[column])

    # DEBUG
    logging.info(dtype_dict)
    logging.info(parse_dates_list)

    with pipeline as p:
        # Read the csv file as Deferred dataframe using Apache Beam native Dataframe
        deferred_dataframe = p | read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list, na_values='')
        
        # Rename the columns to lower case names to comply with Cloud SQL for PostgreSQL table
        columns_rename_map = get_renamed_columns(deferred_dataframe.columns)
        deferred_dataframe.rename(columns=columns_rename_map, inplace=True)

        # DEBUG
        logging.info(deferred_dataframe.dtypes)

        dataframe_to_pcollection = to_pcollection(deferred_dataframe)

        required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

        required_column_list = []

        for column_name in required_column_dict:
            required_column_list.append(column_name.lower())

        filtered_pcollection = dataframe_to_pcollection | beam.Select(*required_column_list)

        output_pcollection = filtered_pcollection | beam.Map(lambda element: element).with_output_types(Business)

        output_pcollection | beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')

        output_pcollection | WriteToJdbc(
            table_name=master_table,
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            username=username,
            password=password
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

The target table schema is as follows:

(screenshot of the target table schema attached)

I just updated the code; I was not registering the coder. That error is gone, and I am testing further now.

import argparse
import logging
import typing

import numpy as np

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam import coders
import yaml


class Business(typing.NamedTuple):
    series_reference: str
    period: float
    data_value: float
    suppressed: str
    status: str
    units: str
    magnitude: int
    subject: str
    group: str
    series_title_1: str
    series_title_2: str
    series_title_3: str
    series_title_4: str
    series_title_5: str

coders.registry.register_coder(Business, coders.RowCoder)

def datatype_mapping(datatype):
    if datatype == 'date':
        return np.datetime64
    elif datatype == 'varchar':
        return str
    elif datatype == 'bigint':
        return np.int64
    elif datatype == 'numeric':
        return np.float64
    

def get_jdbc_url(host_ip, host_port, db_name):
    return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'


def get_table_details(config, source_system):
    return config['database']['tableDetails'][source_system]['master_table_name']


def get_renamed_columns(original_columns):
    columns_rename_map = dict()
    for column in original_columns:
        columns_rename_map[column] = column.lower()
    return columns_rename_map


def run(argv=None):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
    parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
    parser.add_argument('--source_system', dest='source_system', help='Source system name')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    with open(known_args.config_file, 'r') as f:
        config = yaml.safe_load(f)

    host_ip = config['database']['connectionDetails']['host_ip']
    host_port = config['database']['connectionDetails']['host_port']
    db_name = config['database']['connectionDetails']['db_name']
    username = config['database']['connectionDetails']['username']
    password = config['database']['connectionDetails']['password']

    jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
    master_table = get_table_details(config, known_args.source_system)

    columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

    #DEBUG
    logging.info(columns_metadata_dict)

    dtype_dict = {}
    parse_dates_list = []
    for column in columns_metadata_dict:
        if columns_metadata_dict[column] == 'date':
            parse_dates_list.append(column)
        else:
            dtype_dict[column] = datatype_mapping(columns_metadata_dict[column])

    # DEBUG
    logging.info(dtype_dict)
    logging.info(parse_dates_list)

    with pipeline as p:
        # Read the csv file as Deferred dataframe using Apache Beam native Dataframe
        deferred_dataframe = p | read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list, na_values='')
        
        # Rename the columns to lower case names to comply with Cloud SQL for PostgreSQL table
        columns_rename_map = get_renamed_columns(deferred_dataframe.columns)
        deferred_dataframe.rename(columns=columns_rename_map, inplace=True)

        # DEBUG
        logging.info(deferred_dataframe.dtypes)

        dataframe_to_pcollection = to_pcollection(deferred_dataframe)

        required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

        required_column_list = []

        for column_name in required_column_dict:
            required_column_list.append(column_name.lower())

        filtered_pcollection = dataframe_to_pcollection | beam.Select(*required_column_list)

        output_pcollection = filtered_pcollection | beam.Map(lambda element: element).with_output_types(Business)

        output_pcollection | beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')

        output_pcollection | WriteToJdbc(
            table_name=master_table,
            driver_class_name='org.postgresql.Driver',
            jdbc_url=jdbc_url,
            username=username,
            password=password
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Now I am receiving the following error message:

ValueError: Attempted to encode null for non-nullable field "suppressed". [while running 'Map(<lambda at dti_pipeline.py:120>)-ptransform-99']

The error message ValueError: Attempted to encode null for non-nullable field "suppressed" indicates that your pipeline is trying to process a record where the suppressed field is null (or None in Python), but the schema defined for your data expects this field to be non-nullable.

Possible solutions:

  1. Modify the schema to allow nulls: If null values are acceptable for the "suppressed" field, update the schema of your Business class (or the Beam schema) to specify that this field is nullable.

  2. Data cleaning before writing: Add a step to clean the data before writing it to Jdbc. This could involve:

    • Setting a default value: Replace null values in the "suppressed" field with a relevant default value (e.g., an empty string or a placeholder).
    • Filtering out records with null values: If records with null "suppressed" values are invalid, filter them out before reaching the WriteToJdbc transform.
  3. Debug and inspect data: Use logging or a ParDo transform to identify and understand why records have null "suppressed" values.

Example implementations:

  1. Data cleaning with a Map transform:
 
def clean_data(element):
  if element.suppressed is None:
    element = element._replace(suppressed='default_value')  # Replace with appropriate value
  return element

# In your pipeline
output_pcollection = (
    filtered_pcollection
    | 'Clean Data' >> beam.Map(clean_data)
    | 'Map to Business' >> beam.Map(lambda element: element).with_output_types(Business)
)
  2. Modifying Business class schema:
 
class Business(typing.NamedTuple):
  series_reference: str
  period: float
  data_value: float
  suppressed: typing.Optional[str]  # Now allows null values
  status: str
  units: str
  magnitude: int
  subject: str
  group: str
  series_title_1: str
  series_title_2: str
  series_title_3: str
  series_title_4: str
  series_title_5: str

Choose the best approach based on your specific data and use case. If null values are expected and acceptable for the "suppressed" field, modifying the schema is likely the simplest solution.

Hi @ms4446 , thank you for your support. The pipeline is working fine now. I am new to Dataflow, but I learned a lot of things on this thread. Thanks to you!

Looking at my code, can you please suggest any best practices I should follow in a Dataflow pipeline, or any resources you feel would be great for me to learn Dataflow?

I'm glad to hear that your Dataflow pipeline is working well now! Here are some best practices and resources that can help you further enhance your skills and the efficiency of your pipelines:

Best Practices for Dataflow Pipelines

  1. Optimize for Parallel Processing:

    • Design your transforms to be as parallelizable as possible. Avoid operations that require all data to be present on a single worker.
    • Use GroupByKey judiciously, as it can be a costly operation in terms of time and resources.
  2. Efficient I/O Operations:

    • When reading from and writing to databases, use batching to reduce the number of I/O operations.
    • Monitor and tune the performance of source/sink operations to avoid bottlenecks.
  3. Coders and Serialization:

    • Use efficient coders for serialization. Custom coders can be created for complex types, but ensure they are as efficient as possible.
    • Avoid using Python's default pickling for large data sets due to its inefficiency.
  4. Error Handling and Dead Letter Patterns:

    • Implement robust error handling, especially for I/O operations. Consider using a dead-letter pattern to handle erroneous records (a sketch follows this list).
  5. Monitoring and Logging:

    • Utilize Dataflow's monitoring tools to keep an eye on your pipeline's performance and resource usage.
    • Implement comprehensive logging to help with debugging and performance tuning.
  6. Resource Management:

    • Choose the appropriate worker machine types and the number of workers based on your workload.
    • Use autoscaling to efficiently handle variable workloads.
  7. Testing:

    • Write unit and integration tests for your pipeline components.
    • Test your pipeline with varying data volumes to understand how it scales.
  8. Cost Optimization:

    • Be aware of the cost implications of your pipeline design choices, such as the use of shuffle operations and the choice of worker types.
  9. State and Timers:

    • Use stateful processing and timers judiciously, as they can add complexity and affect performance.
  10. Idempotency:

    • Ensure that your pipeline's operations are idempotent, especially when re-processing data or handling failures.
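
As a minimal sketch of the dead-letter pattern mentioned in point 4 (the DoFn name and output tag are illustrative), failed records can be routed to a side output instead of failing the bundle:

import apache_beam as beam
from apache_beam import pvalue


class WriteRowOrDeadLetter(beam.DoFn):
    # Illustrative DoFn: attempts a risky operation (e.g. a database write)
    # and routes failures to a 'dead_letter' output with the error message.
    DEAD_LETTER = 'dead_letter'

    def process(self, element):
        try:
            # ... perform the risky operation on `element` here ...
            yield element
        except Exception as err:
            yield pvalue.TaggedOutput(self.DEAD_LETTER, (element, str(err)))


# In the pipeline:
# results = rows | beam.ParDo(WriteRowOrDeadLetter()).with_outputs(
#     WriteRowOrDeadLetter.DEAD_LETTER, main='written')
# results.written      -> successfully processed rows
# results.dead_letter  -> (row, error) pairs to log or write elsewhere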

Basically, according to the documentation (https://beam.apache.org/releases/pydoc/current/apache_beam.io.jdbc.html):

Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Jdbc transforms use the ‘beam-sdks-java-io-expansion-service’ jar for this purpose.

I believe the error is thrown while this jar is being downloaded.

The error you're encountering is likely due to issues with accessing or building the beam-sdks-java-io-expansion-service jar. Here are steps to troubleshoot and potentially resolve this issue:

  1. Check Beam Version:

    • Ensure you are using a stable and released version of Apache Beam. If you are working with a version built directly from the Beam Git repository, consider switching to a stable release.
  2. Network and Proxy Configuration:

    • Verify your network and proxy settings. If your environment requires a proxy to access external resources, ensure that it's correctly configured. This is particularly relevant as the error message indicates a 403 Forbidden response, which could be due to network restrictions or proxy settings.
  3. Manual Download:

  4. Environment Variables:

    • If manually downloading the jar, you need to tell Beam where to find it. For example, you can start the expansion service yourself from the downloaded jar and pass its address to the transform through its expansion_service argument (see the sketch after this list).
  5. Logging and Debugging:

    • Increase the logging level in your Apache Beam pipeline to get more detailed information about the error. This might provide additional insights into why the download is failing.
  6. Alternative Solutions:

    • If the issue is specific to the JDBC IO expansion service, consider alternative methods for writing to your database, such as using a custom DoFn to perform database operations, though this might require more manual management of connections and transactions.
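
As a sketch of points 3 and 4 (the jar name, port, and address are placeholders and depend on your Beam version), you can start the expansion service from a locally downloaded jar and point WriteToJdbc at it instead of letting the SDK fetch the jar from Maven:

# Start the expansion service manually from the downloaded jar, e.g.:
#   java -jar beam-sdks-java-extensions-schemaio-expansion-service-<version>.jar 8097
# Then point the transform at it so no remote download is attempted:
output_pcollection | WriteToJdbc(
    table_name=master_table,
    driver_class_name='org.postgresql.Driver',
    jdbc_url=jdbc_url,
    username=username,
    password=password,
    expansion_service='localhost:8097',  # address of the manually started service
)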

Hi @ms4446 ,

As I already mentioned in my previous comment the pipeline is working fine now. However, I still have some follow up questions.

Regarding point 3: I tried manually downloading the jar file and placing it in the directory from which I am running the pipeline, but the pipeline didn't detect the file; instead, the SDK tried fetching it from the Maven repository.

Regarding point 6: if I use a custom DoFn, for example with psycopg2, will the write operation take place in parallel?

Regarding your follow-up questions:

  1. Manual Download of the Jar File:

    • If manually downloading the jar file and placing it in the directory from where you are running the pipeline did not work, it's possible that Apache Beam's Python SDK does not automatically detect and use locally stored jars. The SDK might be designed to fetch the jar from the Maven repository regardless of local availability. This behavior would be consistent with the error message you received, indicating an attempt to fetch the file from the Maven repository.
  2. Using a Custom DoFn for Database Writes:

    • If you use a custom DoFn (such as one utilizing psycopg2 for PostgreSQL), the write operation can still occur in parallel, but with some caveats:
      • Parallelism Control: You have more control over the parallelism. The degree of parallelism will depend on how you partition your data and the number of workers in your Dataflow job.
      • Connection Management: You will need to manage database connections within your DoFn. This includes opening and closing connections and handling any connection pooling if necessary (a sketch follows this reply).
      • Error Handling: You should implement robust error handling within your DoFn to manage any issues that arise during database interactions.
      • Performance Considerations: While DoFn allows for custom processing logic, it may not be as optimized as dedicated IO connectors like WriteToJdbc. You'll need to ensure that your custom code is efficient and doesn't become a bottleneck.

Using a custom DoFn for database writes gives you flexibility and control, but it also requires careful management of resources and error handling. It's a good approach when you need custom behavior that's not supported by the built-in IO connectors.
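
As a sketch of the connection-management point (the connection string, table name, and insert statement are placeholders), a DoFn can open one connection per instance in setup() and close it in teardown() instead of reconnecting for every element; each incoming element is assumed to be an iterable of row tuples, e.g. the values side of a GroupIntoBatches output:

import apache_beam as beam


class WriteBatchToPostgres(beam.DoFn):
    # Illustrative DoFn: reuses one psycopg2 connection per DoFn instance.
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name
        self.conn = None

    def setup(self):
        import psycopg2  # imported on the worker
        self.conn = psycopg2.connect(self.connection_string)

    def process(self, rows):
        from psycopg2.extras import execute_batch
        with self.conn.cursor() as cur:
            # Placeholder statement; list the target columns explicitly in practice.
            placeholders = ','.join(['%s'] * len(rows[0])) if rows else ''
            execute_batch(
                cur,
                f'INSERT INTO {self.table_name} VALUES ({placeholders})',
                [tuple(row) for row in rows])
        self.conn.commit()

    def teardown(self):
        if self.conn is not None:
            self.conn.close()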

Hi @ms4446 , I hope you are doing great.

import argparse
import logging
import typing
import numpy as np

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam import coders
import yaml


def datatype_mapping(datatype):
    if datatype == 'date':
        return np.datetime64
    elif datatype == 'varchar':
        return object
    elif datatype == 'bigint':
        return np.int64
    elif datatype == 'numeric':
        return np.float64
    

def get_jdbc_url(host_ip, host_port, db_name):
    return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'


def get_table_details(config, source_system):
    return config['database']['tableDetails'][source_system]['master_table_name']


def get_renamed_columns(original_columns):
    columns_rename_map = dict()
    for column in original_columns:
        columns_rename_map[column] = column.lower()
    return columns_rename_map


def get_connection_string(db_name, db_user, db_pass, host_ip, host_port):
    connection_string = f'dbname={db_name} user={db_user} password={db_pass} host={host_ip} port={host_port}'
    return connection_string     
    

class Business(typing.NamedTuple):
    series_reference: typing.Optional[str]
    period: typing.Optional[float]
    data_value: typing.Optional[float]
    suppressed: typing.Optional[str]
    status: typing.Optional[str]
    units: typing.Optional[str]
    magnitude: typing.Optional[int]
    subject: typing.Optional[str]
    group: typing.Optional[str]
    series_title_1: typing.Optional[str]
    series_title_2: typing.Optional[str]
    series_title_3: typing.Optional[str]
    series_title_4: typing.Optional[str]
    series_title_5: typing.Optional[str]

coders.registry.register_coder(Business, coders.RowCoder)


class ConvertToKeyValuePairs(beam.DoFn):
    def process(self, element):
        argslist = list()
        for key, value in element.__dict__.items():
            argslist.append((key, value))
        yield (element.__dict__['series_reference'], tuple(argslist))
    

class WriteToDB(beam.DoFn):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name
    
    def process(self, element):
        import psycopg2
        import numpy as np
        from psycopg2.extras import execute_batch
        from psycopg2.extensions import register_adapter, AsIs
        register_adapter(np.int64, AsIs)
        
        conn = psycopg2.connect(self.connection_string)
        
        with conn.cursor() as cur:
            columns = []
            values = []
            for row in element[1]: 
                for column in row:
                    columns.append(column[0])
                break
            
            for row in element[1]:
                temp_values = []
                for column in row:
                    temp_values.append(column[1])
                values.append(tuple(temp_values))
            
            placeholders = ','.join(['%s'] * len(columns))
            columns_string = '"' + '","'.join(columns) + '"'
            
            insert_query = f'INSERT INTO {self.table_name} ({columns_string}) VALUES ({placeholders})'
            
            execute_batch(cur, insert_query, tuple(values))
            
        conn.commit()
        conn.close()      


def run(argv=None):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
    parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
    parser.add_argument('--source_system', dest='source_system', help='Source system name')
    parser.add_argument('--key_columns', dest='key_columns', help='Key columns string separated by comma')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    with open(known_args.config_file, 'r') as f:
        config = yaml.safe_load(f)

    host_ip = config['database']['connectionDetails']['host_ip']
    host_port = config['database']['connectionDetails']['host_port']
    db_name = config['database']['connectionDetails']['db_name']
    username = config['database']['connectionDetails']['username']
    password = config['database']['connectionDetails']['password']

    psycopg_connection_string = get_connection_string(db_name, username, password, host_ip, host_port)
    jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
    master_table = get_table_details(config, known_args.source_system)

    columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

    #DEBUG
    logging.info(columns_metadata_dict)

    dtype_dict = {}
    parse_dates_list = []
    for column in columns_metadata_dict:
        if columns_metadata_dict[column] == 'date':
            parse_dates_list.append(column)
        else:
            dtype_dict[column] = datatype_mapping(columns_metadata_dict[column])
    
    # DEBUG
    logging.info(dtype_dict)
    logging.info(parse_dates_list)

    with pipeline as p:

        # Read the csv file as Deferred dataframe using Apache Beam native Dataframe
        deferred_dataframe = p | "Read the CSV files from GCS" >> read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list, na_values='')
        
        # Rename the columns to lower case names to comply with Cloud SQL for PostgreSQL table
        columns_rename_map = get_renamed_columns(deferred_dataframe.columns)
        deferred_dataframe.rename(columns=columns_rename_map, inplace=True)
        
        # DEBUG
        logging.info(deferred_dataframe.dtypes)

        dataframe_to_pcollection = to_pcollection(deferred_dataframe)

        required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']

        required_column_list = []

        for column_name in required_column_dict:
            required_column_list.append(column_name.lower())

        required_columns_pcollection = dataframe_to_pcollection | "Filter the PCollection" >> beam.Select(*required_column_list)
        
        output_pcollection = required_columns_pcollection | "Map to Business" >> beam.Map(lambda element: element).with_output_types(Business)

        # output_pcollection | "Write PCollection to GCS as CSV file" >> beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')
        
        key_value_pairs_pcollection = output_pcollection | beam.ParDo(ConvertToKeyValuePairs())
        
        batched_pcollection = key_value_pairs_pcollection | beam.GroupIntoBatches(batch_size=200)

        batched_pcollection | beam.ParDo(WriteToDB(psycopg_connection_string, master_table))
        
        
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Above is my current code. The problem I am facing is that when I run this pipeline for my actual data, which is approximately 288 MB (450,000 records) for one of the source systems, it takes an hour to complete, and the write I/O throughput does not go above approximately 180/s. Please see the screenshot below.

(screenshot of the Dataflow job's write throughput metrics attached)

Can you please guide me on how I can increase the write I/O and improve the pipeline performance?

Further, I also observed that the CPU and memory are not being fully utilized; instead, the job autoscales because the steps progress slowly.

Your pipeline adeptly manages data ingestion from GCS, transformation, and loading into PostgreSQL, utilizing Apache Beam's Dataframes for efficient processing. To enhance performance and throughput, we'll explore targeted optimization strategies.

Potential Bottlenecks and Optimization Strategies

Database Writes:

  • Bulk Loading: Integrate PostgreSQL's COPY command within your WriteToDB DoFn for a substantial performance boost. Consider buffering data to a temporary file if necessary (a sketch follows these bullets).

  • Batch Sizing: Experiment with different batch sizes, guided by profiling insights, to find a balance that minimizes transaction overhead while managing memory usage and database load effectively.

  • Indexing and Connection Pooling: Ensure your PostgreSQL is properly indexed for key operations and implement connection pooling to reduce connection overhead, enhancing overall efficiency.
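
A minimal sketch of the COPY approach using psycopg2's copy_expert, assuming conn is an open connection and rows is a list of tuples in the same order as columns; values containing tabs or newlines would need escaping first:

import io

def copy_rows(conn, table_name, columns, rows):
    # Build a tab-separated text buffer; '\N' marks NULLs for COPY.
    buffer = io.StringIO()
    for row in rows:
        buffer.write('\t'.join('\\N' if v is None else str(v) for v in row) + '\n')
    buffer.seek(0)
    with conn.cursor() as cur:
        cur.copy_expert(
            f'COPY {table_name} ({", ".join(columns)}) FROM STDIN', buffer)
    conn.commit()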

I/O Optimization:

  • File Format Consideration: If CSV parsing is identified as a bottleneck, converting your data to a more efficient format like Parquet could offer substantial improvements. Proceed with this step only if evidence strongly supports it.

  • Disk Performance: Confirm that your Dataflow workers utilize SSDs to maximize I/O performance, which can be specified through machine type settings.

Network Optimization:

  • Latency Reduction: Measure potential network latency improvements by aligning the geographical placement of your Dataflow workers and PostgreSQL database. Simple latency tests from a Compute Engine VM to your database can quantify benefits.

Dataflow Parallelism:

  • Autoscaling Utilization: Leverage Dataflow's autoscaling feature to dynamically adjust worker numbers. Fine-tuning num_workers and max_num_workers can offer better control over resource utilization (a sketch follows these bullets).

  • Database Concurrency: Ensure your PostgreSQL setup is configured to handle concurrent connections in line with the number of Dataflow workers, optimizing database access.
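
As a sketch of the worker-tuning options (project, region, bucket, and worker counts are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; tune worker counts to your workload and quotas.
options = PipelineOptions(
    runner='DataflowRunner',
    project='example-project',
    region='us-central1',
    temp_location='gs://example-bucket/tmp',
    num_workers=5,
    max_num_workers=20,
    autoscaling_algorithm='THROUGHPUT_BASED',
)

These can equally be passed as command-line flags (--num_workers, --max_num_workers, --autoscaling_algorithm) when launching the pipeline.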

Actionable Next Steps

  • Profiling: Employ Dataflow's profiling tools to identify time-intensive operations, focusing on CPU utilization, memory usage, and shuffle sizes to guide optimizations.

  • Bulk Loading: Explore integrating the COPY command into your WriteToDB DoFn, likely the most impactful optimization for database writes.

  • Batch Size Testing: Experiment with various batch sizes, informed by profiling, to optimize for your specific workload.

  • I/O and Network Adjustments: Address any identified disk or network latency issues to ensure efficient data processing and transfer.

Beyond the Basics

  • Pipeline Restructuring: If your data and transformations allow, consider loading data into PostgreSQL with minimal preprocessing and leveraging SQL for complex operations within the database. This approach can exploit the database's processing capabilities.

  • Database Tuning: Collaborate with database experts to fine-tune PostgreSQL settings, including optimal indexing and memory configurations, ensuring your database environment is fully optimized for your workload.

Emphasizing Iterative Optimization and Monitoring

  • Iterative Approach: Optimization is an iterative process. Continuously profile, implement changes, and evaluate performance to refine your pipeline in cycles.

  • Collaboration: Work closely with database administrators and infrastructure experts to identify and implement optimizations, leveraging their specialized knowledge for deeper insights.

  • Monitoring and Alerting: Establish comprehensive monitoring and alerting for both the Dataflow pipeline and PostgreSQL to quickly detect performance issues and assess the impact of optimizations over time.

For the most effective optimization, please share details about your data characteristics, such as size and complexity, and your PostgreSQL setup, including whether it's a managed service and its current configuration. This information will enable more customized recommendations.

Factors to Consider

  • Cost Implications: Balance the performance gains from optimizations against potential cost increases, such as larger worker machine types or running more PostgreSQL replicas.

  • Data Freshness Requirements: Consider how critical real-time or near real-time data is for your use case, as this will influence your approach to batching and pipeline restructuring.

  • Evolving Data: If you anticipate significant changes in data volume or structure, prioritize optimizations that offer flexibility and scalability to accommodate future needs.