I was able to successfully read from a JDBC source and write the output back to BigQuery. However, I'm still stuck finding the best way to handle BigQuery insert exceptions for bad rows.
For example, given the following rows from the JDBC source, the first two rows are fine and insert into BigQuery without issue. The third row, however, has a bad datetime and causes my code to throw an exception.
id | email | firstname | lastname | dateofbirth
005a31ba-d16c-42d5 | myemail@email.org | jogn | peter | 1996-07-01 00:00:00
007705f9-e248-492c | myemail@email.org | jogn | peter | 2000-09-15 00:00:00
042c5001-077f-4d49 | myemail@email.org | jogn | peter | 0001-01-01 00:00:00
I was expecting that bad row to be handled by the Get Errors and Write Errors steps in my pipeline, which are supposed to write failed BigQuery rows to a dead-letter queue table.
I'm getting the following exception:
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
    at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Timestamp field value is out of range: -62135769600000000 on field dateofbirth. Entity: projects/dummy-project/datasets/jdbctests/tables/users/streams/Cig2NmRiZDZhMC0wMDAwLTI5ZDYtYjYzNS1jODJhZGQ2YzQzZTg6czEz
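If I decode the value from the error message (assuming it is microseconds since the Unix epoch, which I believe is how the Storage Write API encodes TIMESTAMP values), it lands about two days before 0001-01-01 00:00:00 UTC, i.e. below BigQuery's minimum TIMESTAMP, so the rejection itself makes sense:

from datetime import datetime

# Value copied from the error message, read as microseconds since the Unix epoch
# (my assumption about the Storage Write API's TIMESTAMP encoding).
bad_micros = -62135769600000000

# 0001-01-01 00:00:00 UTC, BigQuery's TIMESTAMP lower bound, in the same units.
min_micros = -(datetime(1970, 1, 1) - datetime(1, 1, 1)).days * 86_400 * 1_000_000

print(min_micros)               # -62135596800000000
print(min_micros - bad_micros)  # 172800000000 -> the bad value is 2 days below the minimum

(I suspect the two-day shift comes from a calendar conversion somewhere in the JDBC/Beam type mapping, but I haven't confirmed that.) What I'm after is not why this particular row fails, but how to stop such rows from taking down the whole pipeline.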
My Python code:
import argparse
import logging
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.typehints.schemas import LogicalType, MillisInstant
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
users_schema = {
    "fields": [
        {"type": "STRING", "name": "id", "mode": "NULLABLE"},
        {"type": "STRING", "name": "email", "mode": "NULLABLE"},
        {"type": "STRING", "name": "firstname", "mode": "NULLABLE"},
        {"type": "STRING", "name": "lastname", "mode": "NULLABLE"},
        {"type": "TIMESTAMP", "name": "dateofbirth", "mode": "NULLABLE"},
    ]
}
error_schema = {
    "fields": [
        {"name": "destination", "type": "STRING", "mode": "NULLABLE"},
        {"name": "row", "type": "STRING", "mode": "NULLABLE"},
        {"name": "error_message", "type": "STRING", "mode": "NULLABLE"},
    ]
}
class LogResults(beam.DoFn):
    """Just log the results."""

    def process(self, element):
        logging.info("element.logger - : %s", element)
        yield element
def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)

    with beam.Pipeline(options=pipeline_options) as p:
        users = p | "Read users" >> ReadFromJdbc(
            table_name="users",
            query="SELECT id, email, firstname, lastname, dateofbirth FROM users;",
            driver_class_name="com.mysql.cj.jdbc.Driver",
            jdbc_url="xxxx",
            username=r"xxxxx",
            password=r"xxxxx",
            classpath=["jdbc/mysql-connector-j-8.4.0.jar"],
        )

        result = (
            users
            | "map Dict" >> beam.Map(lambda x: x._asdict())
            | "Log users_favfilms" >> beam.ParDo(LogResults())
            | "write users to BQ"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                schema=users_schema,
                table="jdbctests.users",
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )

        _ = (
            result.failed_rows_with_errors
            | "Get Errors"
            >> beam.Map(
                lambda e: {
                    "destination": e[0],
                    "row": json.dumps(e[1]),
                    "error_message": e[2][0]["message"],
                }
            )
            | "Write Errors"
            >> beam.io.WriteToBigQuery(
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                table="jdbctests.jdbcerrros",
                schema=error_schema,
                insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()
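One workaround I've been sketching is to validate dateofbirth myself before the BigQuery write and route out-of-range values to a side output, so the Storage Write API never sees them. This is only a sketch: the constant, the helper names and the output tag are mine, and it assumes dateofbirth arrives as either a Beam Timestamp or a Python datetime after the "map Dict" step, which I haven't verified.

import apache_beam as beam
from datetime import datetime
from apache_beam.utils.timestamp import Timestamp

# BigQuery's documented TIMESTAMP lower bound (0001-01-01 00:00:00 UTC),
# expressed as microseconds since the Unix epoch.
MIN_BQ_TIMESTAMP_MICROS = -62_135_596_800_000_000


def dob_in_range(row):
    """Best-effort check; the runtime type of dateofbirth is my assumption."""
    dob = row.get("dateofbirth")
    if dob is None:
        return True
    if isinstance(dob, Timestamp):   # Beam's own timestamp type exposes .micros
        return dob.micros >= MIN_BQ_TIMESTAMP_MICROS
    if isinstance(dob, datetime):    # a plain datetime cannot go below year 1 anyway
        return True
    return True                      # unknown type: let BigQuery decide


class SplitBadDob(beam.DoFn):
    """Send rows with an out-of-range dateofbirth to a dead-letter side output."""

    INVALID = "invalid_dob"  # hypothetical tag name, only used in this sketch

    def process(self, row):
        if dob_in_range(row):
            yield row
        else:
            yield beam.pvalue.TaggedOutput(self.INVALID, row)

The idea would be to wire this in between "map Dict" and "write users to BQ", e.g. split = dicts | beam.ParDo(SplitBadDob()).with_outputs(SplitBadDob.INVALID, main="valid"), then write split.valid to jdbctests.users and split[SplitBadDob.INVALID] to the dead-letter table. But this only covers the one bad field I already know about, so I'd still prefer the generic failed_rows_with_errors path to work. (Another option would be to clean it in the source query, e.g. NULLIF(dateofbirth, '0001-01-01 00:00:00'), but that has the same drawback.)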
I'd like to be able to handle those bad rows without interrupting the pipeline, and then process them at a later stage. Any advice or hints would be much appreciated, thank you.