I am developing a Dataflow Pipeline as follows:
1. Read files with a common prefix from GCS into a deferred DataFrame
2. Apply Transformations to the Deferred Dataframe
3. Convert Deferred Dataframe to a PCollection
4. Write this PCollection to Cloud SQL for PostgreSQL (I am stuck at this step)
Can someone please advise me on how step 4 can be accomplished, and whether the write operation will happen in parallel on multiple worker machines?
To write your PCollection to Cloud SQL for PostgreSQL in your Dataflow pipeline, you can use Apache Beam's JdbcIO connector. Here are some steps you can take:
Add Required Dependencies: Ensure your project includes the following dependencies:
org.apache.beam:beam-sdks-java-io-jdbc
org.postgresql:postgresql
Example Maven dependencies:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>[Beam version]</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>[PostgreSQL driver version]</version>
</dependency>
Configure JdbcIO: Set up a JdbcIO.Write transformation with the following details:
- DataSourceConfiguration for specifying connection details like driver class name, JDBC URL, username, password, etc. For Cloud SQL, the JDBC URL format should be jdbc:postgresql://host:port/database?cloudSqlInstance=INSTANCE_CONNECTION_NAME&socketFactory=com.google.cloud.sql.postgres.SocketFactory
- The SQL statement to execute, plus a setter that maps the fields of each element of your PCollection to the statement's parameters if you're using a prepared statement.
Writing the PCollection: Apply the JdbcIO.Write transformation to your PCollection. This will write each element to Cloud SQL based on your configuration and SQL statement.
Parallelism: The write operation will indeed happen in parallel across multiple worker machines. JdbcIO manages connections and distributes writes efficiently. However, be mindful of potential contention issues when writing large volumes of data concurrently.
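Since the rest of this thread uses Python, here is a minimal sketch of the equivalent cross-language transform, WriteToJdbc, with placeholder connection values and a hypothetical two-column row type (JdbcIO requires schema'd elements, e.g. a NamedTuple registered with a RowCoder):

import typing
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc

# Hypothetical two-column row type; registering a RowCoder gives the
# cross-language JdbcIO transform a portable schema to work with.
class ExampleRow(typing.NamedTuple):
    id: int
    name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([ExampleRow(1, 'a'), ExampleRow(2, 'b')]).with_output_types(ExampleRow)
        | WriteToJdbc(
            table_name='example_table',                        # placeholder
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://HOST:5432/DB',         # placeholder
            username='USER',
            password='PASSWORD',
        )
    )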
Hi @ms4446 , thank you for your response and time.
I implemented the pipeline using the WriteToJdbc transform (the Stack Overflow question you shared among the helpful resources was actually posted by me in the past), but when I run the pipeline I receive the error below:
RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-schemaio-expansion-se... <urlopen error Tunnel connection failed: 403 Forbidden>
Here are some steps to troubleshoot and resolve this issue:
1. Check Network Restrictions: Verify that your environment can reach https://repo.maven.apache.org. Some corporate networks have strict firewall rules that might block access to external repositories.
2. Proxy Configuration: If you are behind a proxy, set the HTTP_PROXY and HTTPS_PROXY environment variables, or pass the proxy settings to the JVM with -Dhttp.proxyHost=proxy_host -Dhttp.proxyPort=proxy_port (see the sketch after this list).
3. Maven Settings: Check your settings.xml file for any misconfigurations that might be causing the issue. Ensure that the Maven Central repository is correctly configured.
4. Direct Download:
5. Check for Service Outages:
6. Upgrade Apache Beam Version:
7. Logging and Debugging:
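For point 2, if you launch the pipeline from Python behind a corporate proxy, one option (a sketch with a hypothetical proxy address) is to set the proxy variables before the pipeline is constructed, since the SDK's urllib-based jar download generally honors them:

import os

# Hypothetical corporate proxy endpoint; replace with your own.
# Set these before constructing the pipeline so the expansion-service
# jar download can be routed through the proxy.
os.environ.setdefault('HTTP_PROXY', 'http://proxy.example.com:3128')
os.environ.setdefault('HTTPS_PROXY', 'http://proxy.example.com:3128')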
I will definitely check Points 1 and 2. However, regarding Point 3, I am using Python; does this apply to me?
If you are using Python for your Beam pipeline, point 3 regarding Maven settings does not directly apply to you.
The issue I posted in my previous comment is fixed now - RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-schemaio-expansion-se... <urlopen error Tunnel connection failed: 403 Forbidden>
However, I am receiving following error message:
Caused by: java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:
name: "SERIES_REFERENCE"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
I checked the resources available on this error message; one of them is this: https://stackoverflow.com/questions/68758361/error-beamlogical-typejavasdkv1-while-using-apache-beam...
However, I am not sure how I can implement this in the Python pipeline. Can you please guide me?
Following is my code:
import argparse
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.io.jdbc import WriteToJdbc
import yaml
def datatype_mapping(datatype):
if datatype == 'date':
        return 'datetime64'
elif datatype == 'varchar':
return object
elif datatype == 'bigint':
return 'int64'
elif datatype == 'numeric':
return 'float64'
def get_jdbc_url(host_ip, host_port, db_name):
return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'
def get_table_details(config, source_system):
return config['database']['tableDetails'][source_system]['master_table_name']
def run(argv=None):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
parser.add_argument('--source_system', dest='source_system', help='Source system name')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))
with open(known_args.config_file, 'r') as f:
config = yaml.safe_load(f)
host_ip = config['database']['connectionDetails']['host_ip']
host_port = config['database']['connectionDetails']['host_port']
db_name = config['database']['connectionDetails']['db_name']
username = config['database']['connectionDetails']['username']
password = config['database']['connectionDetails']['password']
jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
master_table = get_table_details(config, known_args.source_system)
columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
logging.info(columns_metadata_dict)
dtype_dict = {}
parse_dates_list = []
for column in columns_metadata_dict:
if columns_metadata_dict[column] == 'date':
parse_dates_list.append(column.upper())
else:
dtype_dict[column.upper()] = datatype_mapping(columns_metadata_dict[column])
logging.info(dtype_dict)
logging.info(parse_dates_list)
with pipeline as p:
deferred_dataframe = p | read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list)
logging.info(deferred_dataframe.dtypes)
deferred_dataframe.columns = [column.upper() for column in deferred_dataframe.columns]
dataframe_to_pcollection = to_pcollection(deferred_dataframe)
required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
required_column_list = []
for column_name in required_column_dict:
required_column_list.append(column_name.upper())
filtered_pcollection = dataframe_to_pcollection | beam.Select(*required_column_list)
filtered_pcollection | beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')
filtered_pcollection | WriteToJdbc(
table_name=master_table,
driver_class_name='org.postgresql.Driver',
jdbc_url=jdbc_url,
username=username,
password=password
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
The error you're encountering, java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto, is related to the schema interpretation of your data by Apache Beam, particularly the handling of a field with the logical type beam:logical:pythonsdk_any:v1. This error typically occurs when there's a mismatch or an unsupported data type between your DataFrame and the schema expected by the database or Beam's internal handling.
From the code you've provided, you are reading a CSV file into a deferred DataFrame, applying transformations, and then attempting to write this data to a PostgreSQL database using WriteToJdbc. Here are some steps and considerations to address the issue:
Schema Compatibility:
Handling Special Data Types: The error references a specific field (SERIES_REFERENCE). Check if this field or others have data types that are not natively supported or might be causing issues. For instance, complex types like arrays or custom objects might need special handling.
Debugging Schema Issues: Inspect the inferred types of your deferred DataFrame (deferred_dataframe.dtypes). This can help identify any fields that might be causing issues.
Beam Schema Translation:
Explicitly Define the Schema: Consider explicitly defining the schema of your PCollection before writing to the database. You can use a Beam schema (beam.Schema) that matches your database table, and then use beam.Map or similar transformations to ensure your data conforms to this schema (see the sketch after this list).
Test with Simplified Data:
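A minimal sketch of the last two suggestions, assuming a hypothetical row type that mirrors a few of your table columns (truncated here to three fields):

import typing
import apache_beam as beam
from apache_beam import coders

# Hypothetical row type matching the target table (only a few fields shown).
class BusinessRow(typing.NamedTuple):
    series_reference: str
    period: float
    data_value: float

# Registering a RowCoder gives the PCollection a portable row schema that the
# cross-language JdbcIO transform can decode.
coders.registry.register_coder(BusinessRow, coders.RowCoder)

# In the pipeline, attach the schema before WriteToJdbc:
#   typed = pcoll | beam.Map(lambda e: e).with_output_types(BusinessRow)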
Sure, thanks for your time. I will try to narrow down the issue, and will keep you posted.
Hi @ms4446 , the issue I was facing before - java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto, was because of the varchar fields. I tried inserting integer data and it worked fine.
Now, when I try with the whole data by explicitly defining the schema, I receive the following error message:
Caused by: java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1, beam:coder:nullable:v1]
Following is the updated code. I am not sure how to resolve this:
import argparse
import logging
import typing
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.io.jdbc import WriteToJdbc
import yaml
class Business(typing.NamedTuple):
series_reference: str
period: float
data_value: float
suppressed: str
status: str
units: str
magnitude: int
subject: str
group: str
series_title_1: str
series_title_2: str
series_title_3: str
series_title_4: str
series_title_5: str
def datatype_mapping(datatype):
if datatype == 'date':
return np.datetime64
elif datatype == 'varchar':
return str
elif datatype == 'bigint':
return np.int64
elif datatype == 'numeric':
return np.float64
def get_jdbc_url(host_ip, host_port, db_name):
return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'
def get_table_details(config, source_system):
return config['database']['tableDetails'][source_system]['master_table_name']
def get_renamed_columns(original_columns):
columns_rename_map = dict()
for column in original_columns:
columns_rename_map[column] = column.lower()
return columns_rename_map
def run(argv=None):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
parser.add_argument('--source_system', dest='source_system', help='Source system name')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))
with open(known_args.config_file, 'r') as f:
config = yaml.safe_load(f)
host_ip = config['database']['connectionDetails']['host_ip']
host_port = config['database']['connectionDetails']['host_port']
db_name = config['database']['connectionDetails']['db_name']
username = config['database']['connectionDetails']['username']
password = config['database']['connectionDetails']['password']
jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
master_table = get_table_details(config, known_args.source_system)
columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
#DEBUG
logging.info(columns_metadata_dict)
dtype_dict = {}
parse_dates_list = []
for column in columns_metadata_dict:
if columns_metadata_dict[column] == 'date':
parse_dates_list.append(column)
else:
dtype_dict[column] = datatype_mapping(columns_metadata_dict[column])
# DEBUG
logging.info(dtype_dict)
logging.info(parse_dates_list)
with pipeline as p:
# Read the csv file as Deferred dataframe using Apache Beam native Dataframe
deferred_dataframe = p | read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list, na_values='')
# Rename the columns to lower case names to comply with Cloud SQL for PostgreSQL table
columns_rename_map = get_renamed_columns(deferred_dataframe.columns)
deferred_dataframe.rename(columns=columns_rename_map, inplace=True)
# DEBUG
logging.info(deferred_dataframe.dtypes)
dataframe_to_pcollection = to_pcollection(deferred_dataframe)
required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
required_column_list = []
for column_name in required_column_dict:
required_column_list.append(column_name.lower())
filtered_pcollection = dataframe_to_pcollection | beam.Select(*required_column_list)
output_pcollection = filtered_pcollection | beam.Map(lambda element: element).with_output_types(Business)
output_pcollection | beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')
output_pcollection | WriteToJdbc(
table_name=master_table,
driver_class_name='org.postgresql.Driver',
jdbc_url=jdbc_url,
username=username,
password=password
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
The target table schema is as follows:
Just now I updated the code; I was not registering the coder. That error is gone, testing further now.
import argparse
import logging
import typing
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam import coders
import yaml
class Business(typing.NamedTuple):
series_reference: str
period: float
data_value: float
suppressed: str
status: str
units: str
magnitude: int
subject: str
group: str
series_title_1: str
series_title_2: str
series_title_3: str
series_title_4: str
series_title_5: str
coders.registry.register_coder(Business, coders.RowCoder)
def datatype_mapping(datatype):
if datatype == 'date':
return np.datetime64
elif datatype == 'varchar':
return str
elif datatype == 'bigint':
return np.int64
elif datatype == 'numeric':
return np.float64
def get_jdbc_url(host_ip, host_port, db_name):
return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'
def get_table_details(config, source_system):
return config['database']['tableDetails'][source_system]['master_table_name']
def get_renamed_columns(original_columns):
columns_rename_map = dict()
for column in original_columns:
columns_rename_map[column] = column.lower()
return columns_rename_map
def run(argv=None):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
parser.add_argument('--source_system', dest='source_system', help='Source system name')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))
with open(known_args.config_file, 'r') as f:
config = yaml.safe_load(f)
host_ip = config['database']['connectionDetails']['host_ip']
host_port = config['database']['connectionDetails']['host_port']
db_name = config['database']['connectionDetails']['db_name']
username = config['database']['connectionDetails']['username']
password = config['database']['connectionDetails']['password']
jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
master_table = get_table_details(config, known_args.source_system)
columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
#DEBUG
logging.info(columns_metadata_dict)
dtype_dict = {}
parse_dates_list = []
for column in columns_metadata_dict:
if columns_metadata_dict[column] == 'date':
parse_dates_list.append(column)
else:
dtype_dict[column] = datatype_mapping(columns_metadata_dict[column])
# DEBUG
logging.info(dtype_dict)
logging.info(parse_dates_list)
with pipeline as p:
# Read the csv file as Deferred dataframe using Apache Beam native Dataframe
deferred_dataframe = p | read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list, na_values='')
# Rename the columns to lower case names to comply with Cloud SQL for PostgreSQL table
columns_rename_map = get_renamed_columns(deferred_dataframe.columns)
deferred_dataframe.rename(columns=columns_rename_map, inplace=True)
# DEBUG
logging.info(deferred_dataframe.dtypes)
dataframe_to_pcollection = to_pcollection(deferred_dataframe)
required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
required_column_list = []
for column_name in required_column_dict:
required_column_list.append(column_name.lower())
filtered_pcollection = dataframe_to_pcollection | beam.Select(*required_column_list)
output_pcollection = filtered_pcollection | beam.Map(lambda element: element).with_output_types(Business)
output_pcollection | beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')
output_pcollection | WriteToJdbc(
table_name=master_table,
driver_class_name='org.postgresql.Driver',
jdbc_url=jdbc_url,
username=username,
password=password
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Now I am receiving the following error message:
ValueError: Attempted to encode null for non-nullable field "suppressed". [while running 'Map(<lambda at dti_pipeline.py:120>)-ptransform-99']
The error message ValueError: Attempted to encode null for non-nullable field "suppressed" indicates that your pipeline is trying to process a record where the suppressed field is null (or None in Python), but the schema defined for your data expects this field to be non-nullable.
Possible solutions:
Modify the schema to allow nulls: If null values are acceptable for the "suppressed" field, update the schema of your Business class (or the Beam schema) to specify that this field is nullable.
Data cleaning before writing: Add a step to clean the data before writing it via JDBC. This could involve replacing null values with a default or filtering out the affected records.
Debug and inspect data: Use logging or a ParDo transform to identify and understand why records have null "suppressed" values.
Example implementations:
def clean_data(element):
if element.suppressed is None:
element = element._replace(suppressed='default_value') # Replace with appropriate value
return element
# In your pipeline
output_pcollection = (
filtered_pcollection
| 'Clean Data' >> beam.Map(clean_data)
| 'Map to Business' >> beam.Map(lambda element: element).with_output_types(Business)
)
class Business(typing.NamedTuple):
series_reference: str
period: float
data_value: float
suppressed: typing.Optional[str] # Now allows null values
status: str
units: str
magnitude: int
subject: str
group: str
series_title_1: str
series_title_2: str
series_title_3: str
series_title_4: str
series_title_5: str
Choose the best approach based on your specific data and use case. If null values are expected and acceptable for the "suppressed" field, modifying the schema is likely the simplest solution.
Hi @ms4446 , thank you for your support. The pipeline is working fine now. I am new to Dataflow, but I learned a lot of things on this thread. Thanks to you!
Looking at my code, can you please suggest any best practices I should follow in a Dataflow pipeline, or any resources you feel would be great for learning Dataflow?
I'm glad to hear that your Dataflow pipeline is working well now! Here are some best practices and resources that can help you further enhance your skills and the efficiency of your pipelines:
Optimize for Parallel Processing: Use GroupByKey judiciously, as it can be a costly operation in terms of time and resources.
Efficient I/O Operations:
Coders and Serialization:
Error Handling and Dead Letter Patterns (see the sketch after this list):
Monitoring and Logging:
Resource Management:
Testing:
Cost Optimization:
State and Timers:
Idempotency:
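The dead-letter item above is only a heading here, so as a generic illustration (not specific to this pipeline), one common shape of the pattern in the Python SDK is to route failing elements to a tagged side output:

import apache_beam as beam
from apache_beam import pvalue

class ParseIntOrDeadLetter(beam.DoFn):
    """Illustrative dead-letter pattern: failures go to a side output instead of crashing."""
    DEAD_LETTER = 'dead_letter'

    def process(self, element):
        try:
            yield int(element)  # stand-in for the real parsing / transformation
        except (TypeError, ValueError) as exc:
            yield pvalue.TaggedOutput(self.DEAD_LETTER, (element, repr(exc)))

with beam.Pipeline() as p:
    results = (
        p
        | beam.Create(['1', '2', 'oops'])
        | beam.ParDo(ParseIntOrDeadLetter()).with_outputs(
            ParseIntOrDeadLetter.DEAD_LETTER, main='parsed')
    )
    results.parsed | 'LogGood' >> beam.Map(print)
    results[ParseIntOrDeadLetter.DEAD_LETTER] | 'LogBad' >> beam.Map(print)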
Basically, according to this
From documentation: https://beam.apache.org/releases/pydoc/current/apache_beam.io.jdbc.html
Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Jdbc transforms use the ‘beam-sdks-java-io-expansion-service’ jar for this purpose.
I believe the error message is being thrown while this jar is being downloaded.
The error you're encountering is likely due to issues with accessing or building the beam-sdks-java-io-expansion-service jar. Here are steps to troubleshoot and potentially resolve this issue:
1. Check Beam Version:
2. Network and Proxy Configuration: The download fails with a 403 Forbidden response, which could be due to network restrictions or proxy settings.
3. Manual Download: Manually download the beam-sdks-java-io-expansion-service jar from the Maven repository and place it in a location accessible to your pipeline. This bypasses the need for the SDK to download it automatically. The jar is available at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/[version]/b... . Replace [version] with the version of Apache Beam you are using (see the sketch after this list).
4. Environment Variables: Set BEAM_EXPANSION_SERVICE to the local path of the jar file.
5. Logging and Debugging:
6. Alternative Solutions: Consider using a custom DoFn to perform database operations, though this might require more manual management of connections and transactions.
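For point 3, the SDK may not automatically pick up a jar just because it sits in the working directory. One possible sketch (the jar path is a placeholder, and whether WriteToJdbc accepts an expansion_service argument depends on your Beam version) is to point the transform at the local jar explicitly:

from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.transforms.external import JavaJarExpansionService

# Hypothetical local path to the manually downloaded expansion-service jar.
expansion_service = JavaJarExpansionService(
    '/path/to/beam-sdks-java-io-expansion-service-<version>.jar')

write = WriteToJdbc(
    table_name='my_table',                                # placeholder
    driver_class_name='org.postgresql.Driver',
    jdbc_url='jdbc:postgresql://HOST:5432/DB',            # placeholder
    username='USER',
    password='PASSWORD',
    expansion_service=expansion_service,
)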
Hi @ms4446 , as I already mentioned in my previous comment, the pipeline is working fine now. However, I still have some follow-up questions.
Regarding Point 3: I tried manually downloading the jar file and placing it in the directory from which I run the pipeline, but the pipeline didn't detect the file; instead the SDK tried fetching it from the Maven repository.
Regarding Point 6: If I use a custom DoFn, e.g. one using psycopg2, will the write operation take place in parallel?
Regarding your follow-up questions:
Manual Download of the Jar File:
Using a Custom DoFn for Database Writes: If you use a custom DoFn (such as one utilizing psycopg2 for PostgreSQL), the write operation can still occur in parallel, but with some caveats:
- Connection management: You'll need to manage database connections within the DoFn. This includes opening and closing connections and handling any connection pooling if necessary.
- Error handling: Add robust error handling within the DoFn to manage any issues that arise during database interactions.
- Performance: While a custom DoFn allows for custom processing logic, it may not be as optimized as dedicated IO connectors like WriteToJdbc. You'll need to ensure that your custom code is efficient and doesn't become a bottleneck.
Using a custom DoFn for database writes gives you flexibility and control, but it also requires careful management of resources and error handling. It's a good approach when you need custom behavior that's not supported by the built-in IO connectors (see the sketch below).
Hi @ms4446 , I hope you are doing great.
import argparse
import logging
import typing
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection
from apache_beam import coders
import yaml
def datatype_mapping(datatype):
if datatype == 'date':
return np.datetime64
elif datatype == 'varchar':
return object
elif datatype == 'bigint':
return np.int64
elif datatype == 'numeric':
return np.float64
def get_jdbc_url(host_ip, host_port, db_name):
return f'jdbc:postgresql://{host_ip}:{host_port}/{db_name}'
def get_table_details(config, source_system):
return config['database']['tableDetails'][source_system]['master_table_name']
def get_renamed_columns(original_columns):
columns_rename_map = dict()
for column in original_columns:
columns_rename_map[column] = column.lower()
return columns_rename_map
def get_connection_string(db_name, db_user, db_pass, host_ip, host_port):
connection_string = f'dbname={db_name} user={db_user} password={db_pass} host={host_ip} port={host_port}'
return connection_string
class Business(typing.NamedTuple):
series_reference: typing.Optional[str]
period: typing.Optional[float]
data_value: typing.Optional[float]
suppressed: typing.Optional[str]
status: typing.Optional[str]
units: typing.Optional[str]
magnitude: typing.Optional[int]
subject: typing.Optional[str]
group: typing.Optional[str]
series_title_1: typing.Optional[str]
series_title_2: typing.Optional[str]
series_title_3: typing.Optional[str]
series_title_4: typing.Optional[str]
series_title_5: typing.Optional[str]
coders.registry.register_coder(Business, coders.RowCoder)
class ConvertToKeyValuePairs(beam.DoFn):
def process(self, element):
argslist = list()
for key, value in element.__dict__.items():
argslist.append((key, value))
yield (element.__dict__['series_reference'], tuple(argslist))
class WriteToDB(beam.DoFn):
def __init__(self, connection_string, table_name):
self.connection_string = connection_string
self.table_name = table_name
def process(self, element):
import psycopg2
import numpy as np
from psycopg2.extras import execute_batch
from psycopg2.extensions import register_adapter, AsIs
register_adapter(np.int64, AsIs)
conn = psycopg2.connect(self.connection_string)
with conn.cursor() as cur:
columns = []
values = []
for row in element[1]:
for column in row:
columns.append(column[0])
break
for row in element[1]:
temp_values = []
for column in row:
temp_values.append(column[1])
values.append(tuple(temp_values))
placeholders = ','.join(['%s'] * len(columns))
columns_string = '"' + '","'.join(columns) + '"'
insert_query = f'INSERT INTO {self.table_name} ({columns_string}) VALUES ({placeholders})'
execute_batch(cur, insert_query, tuple(values))
conn.commit()
conn.close()
def run(argv=None):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--input', dest='input', default='gs://dti_source_data/business-financial-data-september-2023-quarter.csv', help='Input file to process')
parser.add_argument('--config_file', dest='config_file', help='Path to configuration file')
parser.add_argument('--source_system', dest='source_system', help='Source system name')
parser.add_argument('--key_columns', dest='key_columns', help='Key columns string separated by comma')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))
with open(known_args.config_file, 'r') as f:
config = yaml.safe_load(f)
host_ip = config['database']['connectionDetails']['host_ip']
host_port = config['database']['connectionDetails']['host_port']
db_name = config['database']['connectionDetails']['db_name']
username = config['database']['connectionDetails']['username']
password = config['database']['connectionDetails']['password']
psycopg_connection_string = get_connection_string(db_name, username, password, host_ip, host_port)
jdbc_url = get_jdbc_url(host_ip, host_port, db_name)
master_table = get_table_details(config, known_args.source_system)
columns_metadata_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
#DEBUG
logging.info(columns_metadata_dict)
dtype_dict = {}
parse_dates_list = []
for column in columns_metadata_dict:
if columns_metadata_dict[column] == 'date':
parse_dates_list.append(column)
else:
dtype_dict[column] = datatype_mapping(columns_metadata_dict[column])
# DEBUG
logging.info(dtype_dict)
logging.info(parse_dates_list)
with pipeline as p:
# Read the csv file as Deferred dataframe using Apache Beam native Dataframe
deferred_dataframe = p | "Read the CSV files from GCS" >> read_csv(known_args.input, delimiter=',', dtype=dtype_dict, quotechar='"', parse_dates = parse_dates_list, na_values='')
# Rename the columns to lower case names to comply with Cloud SQL for PostgreSQL table
columns_rename_map = get_renamed_columns(deferred_dataframe.columns)
deferred_dataframe.rename(columns=columns_rename_map, inplace=True)
# DEBUG
logging.info(deferred_dataframe.dtypes)
dataframe_to_pcollection = to_pcollection(deferred_dataframe)
required_column_dict = config['database']['tableDetails'][known_args.source_system]['columns_metadata']
required_column_list = []
for column_name in required_column_dict:
required_column_list.append(column_name.lower())
required_columns_pcollection = dataframe_to_pcollection | "Filter the PCollection" >> beam.Select(*required_column_list)
output_pcollection = required_columns_pcollection | "Map to Business" >> beam.Map(lambda element: element).with_output_types(Business)
# output_pcollection | "Write PCollection to GCS as CSV file" >> beam.io.WriteToCsv(path='gs://dti_source_data/processed.csv')
key_value_pairs_pcollection = output_pcollection | beam.ParDo(ConvertToKeyValuePairs())
batched_pcollection = key_value_pairs_pcollection | beam.GroupIntoBatches(batch_size=200)
batched_pcollection | beam.ParDo(WriteToDB(psycopg_connection_string, master_table))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Above is my current code. The problem I am facing is that when I run this pipeline on my actual data, which is approximately 288 MB (450,000 records) for one of the source systems, it takes an hour to complete, and the write I/O throughput does not go above approximately 180/s. Please check the screenshot below.
Can you please guide me on how I can increase the write I/O and improve the overall pipeline performance?
Further, I also observed that the CPU and memory are not being fully utilized; instead the job autoscales because the steps progress slowly.
Your pipeline handles data ingestion from GCS, transformation, and loading into PostgreSQL using Apache Beam's DataFrame API. To improve performance and throughput, consider the targeted optimization strategies below.
Potential Bottlenecks and Optimization Strategies
Database Writes:
Bulk Loading: Integrate PostgreSQL's COPY command within your WriteToDB DoFn for a substantial performance boost. Consider buffering data to a temporary file if necessary.
Batch Sizing: Experiment with different batch sizes, guided by profiling insights, to find a balance that minimizes transaction overhead while managing memory usage and database load effectively.
Indexing and Connection Pooling: Ensure your PostgreSQL is properly indexed for key operations and implement connection pooling to reduce connection overhead, enhancing overall efficiency.
I/O Optimization:
File Format Consideration: If CSV parsing is identified as a bottleneck, converting your data to a more efficient format like Parquet could offer substantial improvements. Proceed with this step only if evidence strongly supports it.
Disk Performance: Confirm that your Dataflow workers utilize SSDs to maximize I/O performance, which can be specified through machine type settings.
Network Optimization:
Latency Reduction: Measure potential network latency improvements by aligning the geographical placement of your Dataflow workers and PostgreSQL database. Simple latency tests from a Compute Engine VM to your database can quantify benefits.
Dataflow Parallelism:
Autoscaling Utilization: Leverage Dataflow's autoscaling feature to dynamically adjust worker numbers. Fine-tuning num_workers and max_num_workers can offer better control over resource utilization (see the options sketch after this section).
Database Concurrency: Ensure your PostgreSQL setup is configured to handle concurrent connections in line with the number of Dataflow workers, optimizing database access.
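For reference, a sketch of bounding autoscaling through pipeline options (the values are placeholders to tune against your database's connection limits):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; tune alongside PostgreSQL's max_connections.
options = PipelineOptions(
    runner='DataflowRunner',
    num_workers=5,         # initial worker count
    max_num_workers=20,    # autoscaling ceiling
)
# pipeline = beam.Pipeline(options=options)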
Actionable Next Steps
Profiling: Employ Dataflow's profiling tools to identify time-intensive operations, focusing on CPU utilization, memory usage, and shuffle sizes to guide optimizations.
Bulk Loading: Explore integrating the COPY command into your WriteToDB DoFn; this is likely the most impactful optimization for database writes (see the sketch after this list).
Batch Size Testing: Experiment with various batch sizes, informed by profiling, to optimize for your specific workload.
I/O and Network Adjustments: Address any identified disk or network latency issues to ensure efficient data processing and transfer.
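As a sketch of the bulk-loading idea, assuming psycopg2 and an in-memory CSV buffer (table and column names are placeholders; adapt to your schema and batching):

import csv
import io
import apache_beam as beam

class CopyBatchToPostgres(beam.DoFn):
    """Illustrative sketch: stream a batch of rows into PostgreSQL with COPY."""

    def __init__(self, connection_string, table_name, columns):
        self.connection_string = connection_string
        self.table_name = table_name
        self.columns = columns  # e.g. ['series_reference', 'period', ...]

    def setup(self):
        import psycopg2
        self._conn = psycopg2.connect(self.connection_string)

    def process(self, batch):
        # Serialize the batch to an in-memory CSV buffer, then load it in one round trip.
        buf = io.StringIO()
        writer = csv.writer(buf)
        for row in batch:
            writer.writerow(row)
        buf.seek(0)
        cols = ','.join(f'"{c}"' for c in self.columns)
        with self._conn.cursor() as cur:
            cur.copy_expert(
                f'COPY {self.table_name} ({cols}) FROM STDIN WITH (FORMAT csv)', buf)
        self._conn.commit()

    def teardown(self):
        self._conn.close()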
Beyond the Basics
Pipeline Restructuring: If your data and transformations allow, consider loading data into PostgreSQL with minimal preprocessing and leveraging SQL for complex operations within the database. This approach can exploit the database's processing capabilities.
Database Tuning: Collaborate with database experts to fine-tune PostgreSQL settings, including optimal indexing and memory configurations, ensuring your database environment is fully optimized for your workload.
Emphasizing Iterative Optimization and Monitoring
Iterative Approach: Optimization is an iterative process. Continuously profile, implement changes, and evaluate performance to refine your pipeline in cycles.
Collaboration: Work closely with database administrators and infrastructure experts to identify and implement optimizations, leveraging their specialized knowledge for deeper insights.
Monitoring and Alerting: Establish comprehensive monitoring and alerting for both the Dataflow pipeline and PostgreSQL to quickly detect performance issues and assess the impact of optimizations over time.
For the most effective optimization, please share details about your data characteristics, such as size and complexity, and your PostgreSQL setup, including whether it's a managed service and its current configuration. This information will enable more customized recommendations.
Factors to Consider
Cost Implications: Balance the performance gains from optimizations against potential cost increases, such as larger worker machine types or running more PostgreSQL replicas.
Data Freshness Requirements: Consider how critical real-time or near real-time data is for your use case, as this will influence your approach to batching and pipeline restructuring.
Evolving Data: If you anticipate significant changes in data volume or structure, prioritize optimizations that offer flexibility and scalability to accommodate future needs.