I have CDC data being ingested into BigQuery from a Postgres database using append-only mode. I cannot figure out how to merge new CDC data into existing CDC-created base tables.
In all of the GCP-proffered tutorials and guides, the details are glossed over on how to actually filter down to newly delivered CDC data. For example in this tutorial, the `session_delta` data is already provided. In a practical application, the metadata available in the `datastream_metadata` field is insufficient to load new data; as far as I can tell, the Datastream metadata only includes timing and ordering data provided by the postgres source system, but does not included metadata to order when events are delivered by Datastream.
Here's how the scenario should play out, following the steps in the GCP tutorial:
T1: We use an orchestrator to build our base table and get the current state of each record
T1 + delta_1: New data accumulates in the CDC table
T2: We query the base table for the MAX(datastream_metadata.change_sequence_number); use that max value to query the CDC table for all records between then and the latest; insert these new records into a Delta table. Merge delta table records into the base table.
Here's the scenario I'm worried about:
T1: We use an orchestrator to build our base table and get the current state of each record
T1 + delta_1: New data accumulates in the CDC table.
T1 + delta_2: Late-arriving cdc data that was produced by the upstream Postgres DB earlier than MAX(datastream_metadata.change_sequence_number) accumulates in the CDC table.
T2: We query the base table for the MAX(datastream_metadata.change_sequence_number); use that max value to query the CDC table for all records between then and the latest; insert these new records into a Delta table. Merge delta table records into the base table. Late arriving data that was produced before MAX(datastream_metadata.change_sequence_number) but arrived after MAX(datastream_metadata.change_sequence_number) will not be accounted for.
In an ideal world, I would have access to and rely on the
read_timestamp Datastream metadata to collect data that had been ingested since the last time the base table was built (collection late-arriving data in the process), and
then use the `change_sequence_number` to merge the new data into the existing data.
Am I misunderstanding what metadata is available? I see in the
Datastream FAQ that event ordering is explicitly
not guaranteed; is there some downstream process that guarantees ordering? How is it possible to account for late-arriving data with the available metadata?