
How to handle exceptions in Apache Beam (Python) when reading from JDBC and writing to BigQuery

I was able to successfully read from the JDBC source and write the output to BigQuery. However, I'm still stuck finding the best way to handle BigQuery insert exceptions for bad rows.

For example, take the following rows from the JDBC source: the first two rows are good and insert into BigQuery without issue. However, the third record has a bad datetime and causes my code to throw an exception.


id                 | email             | firstname | lastname | dateofbirth
005a31ba-d16c-42d5 | myemail@email.org | jogn      | peter    | 1996-07-01 00:00:00
007705f9-e248-492c | myemail@email.org | jogn      | peter    | 2000-09-15 00:00:00
042c5001-077f-4d49 | myemail@email.org | jogn      | peter    | 0001-01-01 00:00:00

I was expecting that bad row to be handled in the Get Errors and Write Errors steps in my pipeline, which were supposed to write failed BigQuery rows to a dead-letter queue table.

I'm getting the following exception:

org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
    at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Timestamp field value is out of range: -62135769600000000 on field dateofbirth. Entity: projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz

My Python code:

import argparse
import logging
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.typehints.schemas import LogicalType, MillisInstant
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


users_schema = {
    "fields": [
        {"type": "STRING", "name": "id", "mode": "NULLABLE"},
        {"type": "STRING", "name": "email", "mode": "NULLABLE"},
        {"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
        {"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
        {"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
    ]
}


error_schema = {
    "fields": [
        {"name": "destination", "type": "STRING", "mode": "NULLABLE"},
        {"name": "row", "type": "STRING", "mode": "NULLABLE"},
        {"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
    ]
}


class LogResults(beam.DoFn):
    """Just log the results"""

    def process(self, element):
        logging.info("element.logger - : %s", element)
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)

    with beam.Pipeline(options=pipeline_options) as p:
        users = p | "Read users" >> ReadFromJdbc(
            table_name="users",
            query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
            driver_class_name="com.mysql.cj.jdbc.Driver",
            jdbc_url="xxxx",
            username=r"xxxxx",
            password=r"xxxxx",
            classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
        )

        result = (
            users
            | "map Dict" >> beam.Map(lambda x: x._asdict())
            | "Log users_favfilms" >> beam.ParDo(LogResults())
            | "write users to BQ"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                schema=users_schema,
                table="jdbctests.users",
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )

        _ = (
            result.failed_rows_with_errors
            | "Get Errors"
            >> beam.Map(
                lambda e: {
                    "destination": e[0],
                    "row": json.dumps(e[1]),
                    "error_message": e[2][0]["message"],
                }
            )
            | "Write Errors"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                table="jdbctests.jdbcerrros",
                schema=error_schema,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

I'd like to be able to handle those bad rows without interrupting the pipeline, and then process them at a later stage. Any advice or hint would be much appreciated, thank you.

3 REPLIES

Handling exceptions for BigQuery insertions in an Apache Beam (Python) pipeline involves capturing rows that fail during insertion and routing them to a "dead-letter" table. This approach ensures that bad rows don't interrupt the pipeline but are instead logged for later inspection and processing.

Here’s an approach to handling bad rows in your Dataflow pipeline, along with explanations and optimizations:

Key Improvements

  • Schema Enforcement with beam.Row: Converting your data into beam.Row objects allows Beam to leverage built-in schema validation before writing to BigQuery. This helps catch invalid dates and other schema violations earlier in the pipeline.
  • Error Handling with beam.PTransform: A custom beam.PTransform (BigQueryWriteErrorTransform) provides a structured way to:
    • Isolate failed writes.
    • Extract error details.
    • Optionally, apply transformations to the failed rows.
    • Write errors to a dead letter queue table.
  • Flexible Retry Strategy: Using RetryStrategy.RETRY_ON_TRANSIENT_ERROR for failed BigQuery writes allows retries for temporary issues (e.g., network glitches), improving robustness.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryInsertError, RetryStrategy
import json
import argparse
import logging

# Define your schemas
users_schema = {
    "fields": [
        {"type": "STRING", "name": "id", "mode": "NULLABLE"},
        {"type": "STRING", "name": "email", "mode": "NULLABLE"},
        {"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
        {"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
        {"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
    ]
}

error_schema = {
    "fields": [
        {"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
        {"name": "row", "type": "STRING", "mode": "NULLABLE"},
    ]
}

class BigQueryWriteErrorTransform(beam.PTransform):
    def __init__(self, table_name, schema, retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR):
        self.table_name = table_name
        self.schema = schema
        self.retry_strategy = retry_strategy

    def expand(self, pcoll):
        # Write to BigQuery
        main_write = pcoll | "WriteToBigQuery" >> WriteToBigQuery(
            table=self.table_name,
            schema=self.schema,
            insert_retry_strategy=self.retry_strategy,
            method=WriteToBigQuery.Method.STREAMING_INSERTS  # Avoid Storage API limitations for bad rows
        )

        # Handle failed writes
        error_rows = main_write[BigQueryInsertError] | "ExtractErrorRows" >> beam.Map(
            lambda err: {
                "error_message": str(err.error_message),
                "row": json.dumps(err.row)
            }
        )

        # Write error rows to a different table
        error_rows | "WriteErrorRows" >> WriteToBigQuery(
            table="your-project:your-dataset.jdbc_errors",
            schema=error_schema,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)

    with beam.Pipeline(options=pipeline_options) as p:
        users = p | "Read users" >> ReadFromJdbc(
            table_name="users",
            query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
            driver_class_name="com.mysql.cj.jdbc.Driver",
            jdbc_url="jdbc:mysql://your-database-url:3306/your-database",
            username="your-username",
            password="your-password",
            classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
        )

        # Map to Beam Rows and apply the custom transform
        users | "MapToBeamRow" >> beam.Map(
            lambda x: beam.Row(
                id=str(x["id"]),
                email=x["email"],
                firstname=x["firstname"],
                lastname=x["lastname"],
                dateofbirth=x["dateofbirth"]
            )
        ) | BigQueryWriteErrorTransform("your-project:your-dataset.users", users_schema)

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

Additional Notes

  • Customization: The BigQueryWriteErrorTransform class is highly customizable. You can tailor the way you log, process, or store failed rows to meet your specific requirements. This flexibility allows you to implement complex error handling strategies, data transformations, or custom logging formats.

  • Schema Rigor: While Beam's schema enforcement is helpful, ensuring strong schema validation at the JDBC source (perhaps using libraries like Cerberus or database constraints) will help prevent bad data from entering your pipeline altogether. By catching schema violations early, you can avoid unnecessary processing and reduce the number of rows that end up in your dead-letter table; a minimal validation sketch is shown after these notes.

  • Monitoring and Alerting: Integrate Cloud Monitoring or a similar tool to receive alerts when errors are written to your dead-letter table. This enables you to promptly investigate the root cause of the bad data and take corrective action. Cloud Monitoring provides a robust platform for monitoring pipeline metrics, setting up alerts, and gaining insights into the overall health of your data processing pipeline.
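
To make the schema-rigor point concrete, below is a minimal sketch of validating rows before the BigQuery write and routing the bad ones to a tagged output. It assumes the JDBC rows have already been converted to dicts and that dateofbirth arrives as a Python datetime or an ISO-8601 string; the names ValidateUser and MIN_VALID_DOB, and the year-1000 cutoff, are illustrative choices rather than anything defined by Beam or your schema. The counter ties into the monitoring note above, since custom Beam metrics surface in Dataflow.

import datetime

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.metrics import Metrics

# Illustrative cutoff: treat anything before year 1000 as a sentinel/invalid date.
MIN_VALID_DOB = datetime.datetime(1000, 1, 1)


class ValidateUser(beam.DoFn):
    """Routes rows with an unusable dateofbirth to a tagged side output."""

    INVALID_TAG = "invalid"

    def __init__(self):
        super().__init__()
        # Custom counter; visible as a pipeline metric for alerting.
        self.invalid_counter = Metrics.counter(self.__class__, "invalid_users")

    def process(self, row):
        dob = row.get("dateofbirth")
        try:
            if isinstance(dob, str):
                dob = datetime.datetime.fromisoformat(dob)
            if dob is not None and dob < MIN_VALID_DOB:
                raise ValueError(f"dateofbirth out of range: {row['dateofbirth']}")
            yield row  # main output: row looks safe to write
        except (ValueError, TypeError) as exc:
            self.invalid_counter.inc()
            yield pvalue.TaggedOutput(
                self.INVALID_TAG,
                {"row": str(row), "error_message": str(exc)},
            )


# Usage before WriteToBigQuery:
#   validated = dict_rows | "Validate" >> beam.ParDo(ValidateUser()).with_outputs(
#       ValidateUser.INVALID_TAG, main="valid")
#   validated.valid   -> users table
#   validated.invalid -> dead-letter table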

Hello,

Thank you for your engagement regarding this issue. We haven’t heard back from you on this issue for some time now. Hence, I'm going to close this issue, which will no longer be monitored. However, if you have any new issues, please don’t hesitate to create a new issue. We will be happy to assist you on the same.

Regards,
Jai Ade

Thank you @ms4446 and @jaia, sorry for the delay. I just wanted to test thoroughly before replying back.

Using the code provided by @ms4446, I faced a few issues, listed below. I was able to get the code to run; however, I still face runtime exceptions.

Issues:

- BigQueryInsertError cannot be imported.

- I had to use STORAGE_WRITE_API due to this bug (https://github.com/apache/beam/issues/28359); I couldn't get STREAMING_INSERTS to work.

- I had to convert Row to Dict before writing to BigQuery.

The code:

The code runs fine with good rows; however, as soon as I add the bad row I get the runtime exception.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.gcp.bigquery import (
    WriteToBigQuery,
    RetryStrategy,
)
import json
import argparse
import logging
from apache_beam.typehints.schemas import LogicalType, MillisInstant

# Define your schemas
users_schema = {
    "fields": [
        {"type": "STRING", "name": "id", "mode": "NULLABLE"},
        {"type": "STRING", "name": "email", "mode": "NULLABLE"},
        {"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
        {"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
        {"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
    ]
}

error_schema = {
    "fields": [
        {"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
        {"name": "row", "type": "STRING", "mode": "NULLABLE"},
    ]
}


class BigQueryWriteErrorTransform(beam.PTransform):
    def __init__(
        self, table_name, schema, retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
    ):
        self.table_name = table_name
        self.schema = schema
        self.retry_strategy = retry_strategy

    def expand(self, pcoll):
        # Write to BigQuery
        main_write = (
            pcoll
            | beam.Map(lambda x: x._asdict())
            | "WriteToBigQuery"
            >> WriteToBigQuery(
                table=self.table_name,
                schema=self.schema,
                insert_retry_strategy=self.retry_strategy,
                method=WriteToBigQuery.Method.STORAGE_WRITE_API,  # STREAMING_INSERTS did not work (apache/beam#28359)
            )
        )

        # Handle failed writes
        error_rows = main_write[
            "FailedRowsWithErrors"
        ] | "ExtractErrorRows" >> beam.Map(
            lambda err: {
                "error_message": str(err.error_message),
                "row": json.dumps(err.row),
            }
        )

        # Write error rows to a different table
        error_rows | "WriteErrorRows" >> WriteToBigQuery(
            table="dev-data-infra:jdbctests.jdbc_errors",
            schema=error_schema,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)

    with beam.Pipeline(options=pipeline_options) as p:
        users = p | "Read users" >> ReadFromJdbc(
            table_name="users",
            query="SELECT id, email, firstname, lastname, dateofbirth FROM users where id IN ('1', '2', '3');",
            driver_class_name="com.mysql.cj.jdbc.Driver",
            jdbc_url="xxxxx",
            username=r"xxxx",
            password=r"xxxx",
            classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
        )

        # Map to Beam Rows and apply the custom transform
        users | "MapToBeamRow" >> beam.Map(
            lambda x: beam.Row(
                id=x[0],
                email=x[1],
                firstname=x[2],
                lastname=x[3],
                dateofbirth=x[4],
            )
        ) | BigQueryWriteErrorTransform("dev-data-infra:jdbctests.users", users_schema)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

Additional Notes:

According to a comment from one of the Dataflow folks on my Stack Overflow post:

"Thanks for the clarification. Looking deeper into the code, seems like we intentially raise a RunTimeException"

https://github.com/apache/beam/blob/cfbcdf03906f0ffe3553b94f391c566eeed8b2fe/sdks/java/io/google-clo...

https://stackoverflow.com/questions/78627113/how-to-handle-exceptions-in-apache-beam-python-for-read...
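
For anyone hitting the same intentional RuntimeException: one possible interim workaround, assuming the only offending values are sentinel dates such as 0001-01-01 and that dateofbirth is a plain datetime or ISO-8601 string at that point in the pipeline, would be to null those values out before the write step (the column is NULLABLE). A rough, unverified sketch:

import datetime

# Illustrative cutoff, not a Beam or BigQuery constant.
MIN_VALID_DOB = datetime.datetime(1000, 1, 1)


def null_bad_dob(row):
    """Return the row with dateofbirth set to None when it is a sentinel/out-of-range value."""
    dob = row.get("dateofbirth")
    parsed = dob
    if isinstance(dob, str):
        try:
            parsed = datetime.datetime.fromisoformat(dob)
        except ValueError:
            parsed = None
    if parsed is None or (isinstance(parsed, datetime.datetime) and parsed < MIN_VALID_DOB):
        return dict(row, dateofbirth=None)  # column is NULLABLE in the table schema
    return row


# Would slot in right before the write, e.g.:
#   ... | beam.Map(lambda x: x._asdict()) | "Null bad dob" >> beam.Map(null_bad_dob) | WriteToBigQuery(...)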