I was able to successfully read from the JDBC source and write the output to BigQuery. However, I'm still stuck on finding the best way to handle BigQuery insert exceptions for bad rows.
For example, in the following rows from the JDBC source, the first two rows are good and insert into BigQuery without issue. However, the third record has a bad datetime and causes my code to throw an exception.
id | email | firstname | lastname | dateofbirth
005a31ba-d16c-42d5 | myemail@email.org | jogn | peter | 1996-07-01 00:00:00
007705f9-e248-492c | myemail@email.org | jogn | peter | 2000-09-15 00:00:00
042c5001-077f-4d49 | myemail@email.org | jogn | peter | 0001-01-01 00:00:00
I was expecting that bad row to be handled by the Get Errors and Write Errors steps in my pipeline, which are supposed to write failed BigQuery rows to a dead-letter queue table.
I'm getting the following exception :
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
  at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
  at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
  at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Timestamp field value is out of range: -62135769600000000 on field dateofbirth. Entity: projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz
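For context: the rejected value of -62135769600000000 microseconds sits two days before 0001-01-01 00:00:00 UTC, the minimum TIMESTAMP BigQuery accepts, which is why the Storage Write API flags it as out of range. A quick sanity check of that arithmetic (a sketch, assuming plain proleptic-Gregorian epoch math):

import datetime

# BigQuery's minimum TIMESTAMP is 0001-01-01 00:00:00 UTC; express it in microseconds since epoch.
bq_min_micros = int(
    (datetime.datetime(1, 1, 1) - datetime.datetime(1970, 1, 1)).total_seconds()
) * 1_000_000
rejected_micros = -62135769600000000  # value from the error message above

print(bq_min_micros)                    # -62135596800000000
print(rejected_micros - bq_min_micros)  # -172800000000 microseconds, i.e. two days earlier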
My Python code:
import argparse
import logging
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.typehints.schemas import LogicalType, MillisInstant
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
users_schema = {
"fields": [
{"type": "STRING", "name": "id", "mode": "NULLABLE"},
{"type": "STRING", "name": "email", "mode": "NULLABLE"},
{"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
{"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
{"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
]
}
error_schema = {
"fields": [
{"name": "destination", "type": "STRING", "mode": "NULLABLE"},
{"name": "row", "type": "STRING", "mode": "NULLABLE"},
{"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
]
}
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("elment.logger - : %s", element)
yield element
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
LogicalType.register_logical_type(MillisInstant)
with beam.Pipeline(options=pipeline_options) as p:
users = p | "Read users" >> ReadFromJdbc(
table_name="users",
query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
driver_class_name="com.mysql.cj.jdbc.Driver",
jdbc_url="xxxx",
username=r"xxxxx",
password=r"xxxxx",
classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
)
result = (
users
| "map Dict" >> beam.Map(lambda x: x._asdict())
| "Log users_favfilms" >> beam.ParDo(LogResults())
| "write users to BQ"
>> beam.io.WriteToBigQuery(
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
schema=users_schema,
table="jdbctests.users",
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
insert_retry_strategy=RetryStrategy.RETRY_NEVER,
)
)
_ = (
result.failed_rows_with_errors
| "Get Errors"
>> beam.Map(
lambda e: {
"destination": e[0],
"row": json.dumps(e[1]),
"error_message": e[2][0]["message"],
}
)
| "Write Errors"
>> beam.io.WriteToBigQuery(
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
table="jdbctests.jdbcerrros",
schema=error_schema,
insert_retry_strategy=RetryStrategy.RETRY_NEVER,
)
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
run()
I'd like to be able to handle those bad rows without interrupting the pipeline, and then process them at a later stage. Any advice or hints would be much appreciated, thank you.
Handling exceptions for BigQuery insertions in an Apache Beam (Python) pipeline involves capturing rows that fail during insertion and routing them to a "dead-letter" table. This approach ensures that bad rows don't interrupt the pipeline but are instead logged for later inspection and processing.
Here's an approach to handling bad rows in your Dataflow pipeline, along with explanations and optimizations:
Key Improvements
- beam.Row: Converting your data into beam.Row objects allows Beam to leverage built-in schema validation before writing to BigQuery. This helps catch invalid dates and other schema violations earlier in the pipeline.
- beam.PTransform: A custom beam.PTransform (BigQueryWriteErrorTransform) provides a structured way to write to BigQuery and route failed rows to a separate error table.
- RetryStrategy.RETRY_ON_TRANSIENT_ERROR: Using RetryStrategy.RETRY_ON_TRANSIENT_ERROR for the main BigQuery write allows retries for temporary issues (e.g., network glitches), improving robustness.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryInsertError, RetryStrategy
import json
import argparse
import logging
# Define your schemas
users_schema = {
"fields": [
{"type": "STRING", "name": "id", "mode": "NULLABLE"},
{"type": "STRING", "name": "email", "mode": "NULLABLE"},
{"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
{"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
{"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
]
}
error_schema = {
"fields": [
{"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
{"name": "row", "type": "STRING", "mode": "NULLABLE"},
]
}
class BigQueryWriteErrorTransform(beam.PTransform):
def __init__(self, table_name, schema, retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR):
self.table_name = table_name
self.schema = schema
self.retry_strategy = retry_strategy
def expand(self, pcoll):
# Write to BigQuery
main_write = pcoll | "WriteToBigQuery" >> WriteToBigQuery(
table=self.table_name,
schema=self.schema,
insert_retry_strategy=self.retry_strategy,
method=WriteToBigQuery.Method.STREAMING_INSERTS # Avoid Storage API limitations for bad rows
)
# Handle failed writes
error_rows = main_write[BigQueryInsertError] | "ExtractErrorRows" >> beam.Map(
lambda err: {
"error_message": str(err.error_message),
"row": json.dumps(err.row)
}
)
# Write error rows to a different table
error_rows | "WriteErrorRows" >> WriteToBigQuery(
table="your-project:your-dataset.jdbc_errors",
schema=error_schema,
insert_retry_strategy=RetryStrategy.RETRY_NEVER,
)
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
with beam.Pipeline(options=pipeline_options) as p:
users = p | "Read users" >> ReadFromJdbc(
table_name="users",
query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
driver_class_name="com.mysql.cj.jdbc.Driver",
jdbc_url="jdbc:mysql://your-database-url:3306/your-database",
username="your-username",
password="your-password",
classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
)
# Map to Beam Rows and apply the custom transform
users | "MapToBeamRow" >> beam.Map(
lambda x: beam.Row(
id=str(x["id"]),
email=x["email"],
firstname=x["firstname"],
lastname=x["lastname"],
dateofbirth=x["dateofbirth"]
)
) | BigQueryWriteErrorTransform("your-project:your-dataset.users", users_schema)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
run()
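One possible variation on the transform above (a sketch, not part of the original suggestion): have expand() return the formatted failed rows instead of writing them inside the transform, so the calling pipeline decides where to log or store them. The failed-row element layout (destination, row, errors) is copied from the question's original code and should be verified against your Beam version; table and schema are whatever the caller passes in.

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import RetryStrategy, WriteToBigQuery


class WriteUsersReturningErrors(beam.PTransform):
    """Variation: return the formatted failed rows so the caller picks the dead-letter destination."""

    def __init__(self, table, schema,
                 retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR):
        self.table = table
        self.schema = schema
        self.retry_strategy = retry_strategy

    def expand(self, pcoll):
        write_result = pcoll | "WriteToBigQuery" >> WriteToBigQuery(
            table=self.table,
            schema=self.schema,
            insert_retry_strategy=self.retry_strategy,
            method=WriteToBigQuery.Method.STREAMING_INSERTS,
        )
        # failed_rows_with_errors is an attribute of the WriteResult returned by
        # WriteToBigQuery; the (destination, row, errors) layout below follows the
        # question's original code and may differ between write methods and versions.
        return write_result.failed_rows_with_errors | "FormatErrors" >> beam.Map(
            lambda e: {
                "destination": e[0],
                "row": json.dumps(e[1]),
                "error_message": e[2][0]["message"],
            }
        )

The returned PCollection can then be piped into a separate WriteToBigQuery step for the dead-letter table, or simply logged while debugging.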
Additional Notes
- Customization: The BigQueryWriteErrorTransform class is highly customizable. You can tailor the way you log, process, or store failed rows to meet your specific requirements. This flexibility allows you to implement complex error-handling strategies, data transformations, or custom logging formats.
- Schema Rigor: While Beam's schema enforcement is helpful, ensuring strong schema validation at the JDBC source (perhaps using libraries like Cerberus, or database constraints) will help prevent bad data from entering your pipeline altogether. By catching schema violations early, you can avoid unnecessary processing and reduce the number of rows that end up in your dead-letter table. A small validation sketch follows these notes.
- Monitoring and Alerting: Integrate Cloud Monitoring or a similar tool to receive alerts when errors are written to your dead-letter table. This enables you to promptly investigate the root cause of the bad data and take corrective action. Cloud Monitoring provides a robust platform for monitoring pipeline metrics, setting up alerts, and gaining insights into the overall health of your data processing pipeline.
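As a concrete illustration of the Schema Rigor and Monitoring points, here is a minimal sketch (mine, not part of the pipeline above). It assumes the elements are the dicts produced by ._asdict(), as in the question's pipeline, and the 1900/2100 cut-offs and counter names are arbitrary assumptions to adjust for your data:

import datetime

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.utils.timestamp import Timestamp

# Assumed bounds for a plausible dateofbirth; adjust to your data.
MIN_DOB = datetime.datetime(1900, 1, 1)
MAX_DOB = datetime.datetime(2100, 1, 1)


def _as_datetime(value):
    """ReadFromJdbc may hand back a Beam Timestamp or a Python datetime; normalize it."""
    if isinstance(value, Timestamp):
        return value.to_utc_datetime()
    return value


class ValidateDateOfBirth(beam.DoFn):
    """Tag rows whose dateofbirth is present but outside the accepted range."""

    bad_dob_counter = Metrics.counter("jdbc_to_bq", "bad_dateofbirth")  # arbitrary names

    def process(self, element):
        dob = _as_datetime(element.get("dateofbirth"))
        if dob is None or (isinstance(dob, datetime.datetime) and MIN_DOB <= dob <= MAX_DOB):
            yield element  # good row, continue to the BigQuery write
        else:
            self.bad_dob_counter.inc()  # user counter that Dataflow surfaces as a job metric
            yield beam.pvalue.TaggedOutput("bad_rows", element)

Wiring it in would look like validated = dicts | beam.ParDo(ValidateDateOfBirth()).with_outputs("bad_rows", main="good_rows"); good_rows then feeds the existing WriteToBigQuery step and bad_rows goes to the dead-letter table.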
Hello,
Thank you for your engagement on this issue. We haven't heard back from you for some time now, so I'm going to close this issue and it will no longer be monitored. However, if you run into any new issues, please don't hesitate to create a new issue. We will be happy to assist you.
Regards,
Jai Ade
Thank you @ms4446 and @jaia, sorry for the delay. I just wanted to test thoroughly before replying back.
Using the code provided by @ms4446 I faced a few issues, listed below. I was able to get the code to run; however, I still face runtime exceptions.
Issues:
- BigQueryInsertError cannot be imported.
- I had to use STORAGE_WRITE_API due to this bug (https://github.com/apache/beam/issues/28359); I couldn't get STREAMING_INSERTS to work.
- I had to convert Row to Dict before writing to BigQuery.
The code:
It runs fine with good rows; however, as soon as I add the bad row I get the runtime exception.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.gcp.bigquery import (
WriteToBigQuery,
RetryStrategy,
)
import json
import argparse
import logging
from apache_beam.typehints.schemas import LogicalType, MillisInstant
# Define your schemas
users_schema = {
"fields": [
{"type": "STRING", "name": "id", "mode": "NULLABLE"},
{"type": "STRING", "name": "email", "mode": "NULLABLE"},
{"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
{"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
{"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
]
}
error_schema = {
"fields": [
{"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
{"name": "row", "type": "STRING", "mode": "NULLABLE"},
]
}
class BigQueryWriteErrorTransform(beam.PTransform):
def __init__(
self, table_name, schema, retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
):
self.table_name = table_name
self.schema = schema
self.retry_strategy = retry_strategy
def expand(self, pcoll):
# Write to BigQuery
main_write = (
pcoll
| beam.Map(lambda x: x._asdict())
| "WriteToBigQuery"
>> WriteToBigQuery(
table=self.table_name,
schema=self.schema,
insert_retry_strategy=self.retry_strategy,
                method=WriteToBigQuery.Method.STORAGE_WRITE_API,  # had to switch from STREAMING_INSERTS, see https://github.com/apache/beam/issues/28359
)
)
# Handle failed writes
error_rows = main_write[
"FailedRowsWithErrors"
] | "ExtractErrorRows" >> beam.Map(
lambda err: {
"error_message": str(err.error_message),
"row": json.dumps(err.row),
}
)
# Write error rows to a different table
error_rows | "WriteErrorRows" >> WriteToBigQuery(
table="dev-data-infra:jdbctests.jdbc_errors",
schema=error_schema,
insert_retry_strategy=RetryStrategy.RETRY_NEVER,
)
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
LogicalType.register_logical_type(MillisInstant)
with beam.Pipeline(options=pipeline_options) as p:
users = p | "Read users" >> ReadFromJdbc(
table_name="users",
query="SELECT id, email, firstname, lastname, dateofbirth FROM users where id IN ('1', '2', '3');",
driver_class_name="com.mysql.cj.jdbc.Driver",
jdbc_url="xxxxx",
username=r"xxxx",
password=r"xxxx",
classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
)
# Map to Beam Rows and apply the custom transform
users | "MapToBeamRow" >> beam.Map(
lambda x: beam.Row(
id=x[0],
email=x[1],
firstname=x[2],
lastname=x[3],
dateofbirth=x[4],
)
) | BigQueryWriteErrorTransform("dev-data-infra:jdbctests.users", users_schema)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
run()
Additional Notes:
According to a comment from one of the Dataflow folks on my Stack Overflow post:
"Thanks for the clarification. Looking deeper into the code, seems like we intentially raise a RunTimeException"