Scenario: We stage Change Data Capture (CDC) data in an Operational Data Store (ODS) layer table. This table includes metadata columns such as src_updated_ts, id_version, extraction_ts, and operation (with values representing insert, update, or delete operations). The source table has an ID column as its primary key.
Currently, when constructing our data warehouse, our job invokes a view for each ODS table to calculate the latest snapshot. This snapshot essentially aims to reconstruct the source table from the CDC rows. Our approach involves using the ROW_NUMBER() function with the following logic: partition by ID and order by src_updated_ts (in descending order), id_version (in descending order), and extraction_ts (in descending order). We then select the latest record for each ID.
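For reference, the view looks roughly like this (view and table names simplified here):
CREATE OR REPLACE VIEW ODS_LATEST_SNAPSHOT_VIEW AS
SELECT * EXCEPT (rn)
FROM (
  SELECT
    cdc.*,
    ROW_NUMBER() OVER (
      PARTITION BY id
      ORDER BY src_updated_ts DESC, id_version DESC, extraction_ts DESC
    ) AS rn
  FROM ODS_CDC_TABLE AS cdc
)
WHERE rn = 1;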
Until now, we've been loading the warehouse once a day. However, we're now planning to run the warehouse job every hour. Unfortunately, our current view-based method for calculating the latest snapshot is becoming prohibitively expensive and time-consuming. It requires scanning the entire ODS table for every view invocation, which is not feasible for frequent updates.
What I'm seeking help for: I want to materialize the table's current snapshot incrementally, as records are inserted into the ODS table. I tried to use the materialized view feature, but couldn't, because my query involves a window function (PARTITION BY), a self-join, or a subquery.
What is the best way to achieve this in BigQuery?
Hi @crazyrevol ,
Your current strategy of computing the snapshot through a view worked for less frequent refreshes, but it becomes costly and time-consuming when refreshed hourly, primarily because of the full table scans. Unfortunately, materialized views in BigQuery, while efficient, do not support the logic your query requires, such as window functions (PARTITION BY), self-joins, and subqueries. This limitation stems from materialized views being optimized for speed and simplicity, focusing on pre-computing query results rather than handling complex SQL operations.
Potential Solutions:
1. Incremental Materialization with MERGE:
Maintain a separate snapshot table and track the last timestamp you processed. Use a MERGE statement to efficiently update the snapshot table, handling inserts, updates, and deletes based on this timestamp. The MERGE statement's ability to perform multiple DML operations in a single statement makes it well-suited for this task. It avoids repeated full scans of the CDC table, fits naturally into a scheduled MERGE job, and provides granular control over data manipulation.
2. Streaming Inserts with Window Functions:
Stream CDC records into BigQuery as they arrive and use a window function (like ROW_NUMBER()) within a view to identify the latest record for each ID. This approach leverages BigQuery's streaming capabilities for near-real-time updates. Partition the table on src_updated_ts for efficient data management, and define the view with a window function (e.g., ROW_NUMBER() OVER (PARTITION BY ID ORDER BY src_updated_ts DESC)) to select the latest record for each ID. This view provides the current snapshot of your data.
Choosing the Right Approach:
The optimal solution depends on your specific requirements: if near-real-time freshness is the priority, streaming inserts with a view are the simpler fit; if you need tighter control over updates and deletes, the MERGE approach offers more flexibility.
Additional Tips:
Partitioning and clustering your tables on a timestamp column, such as src_updated_ts, can significantly improve query performance and cost-efficiency.
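As a rough illustration only (column types here are assumptions, x stands in for your payload columns, and the partition column should be whichever timestamp your incremental filter uses), the DDL could look like:
CREATE TABLE ODS_CDC_TABLE (
  id INT64,
  x STRING,            -- stand-in for your payload columns
  operation STRING,
  id_version INT64,
  src_updated_ts TIMESTAMP,
  extraction_ts TIMESTAMP
)
PARTITION BY DATE(src_updated_ts)  -- lets BigQuery prune partitions on time filters
CLUSTER BY id;                     -- co-locates each ID's change history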
Thank you for your reply, @ms4446.
1. Incremental Materialization with MERGE:
This option is very clear to me, and I have implemented it. The CDC table has real-time updates from the CDC extraction tool. I have partitioned the CDC table on the extraction_ts column (the extraction time set by the CDC tool). Every hour I take the delta records from the CDC table whose extraction_ts is greater than the max(extraction_ts) already present in the snapshot table and MERGE them into the snapshot table. That satisfies our need. Cost-wise, the MERGE query incurs the cost of the partitions scanned on the CDC table plus the full size of the destination snapshot table. Is there any optimization I can do to reduce the cost of the MERGE query?
MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
-- Subquery to select records from the source table
SELECT *
FROM ODS_CDC_TABLE
WHERE extraction_ts >= DATETIME('2024-04-10T16:14:47') -- partitioned column in source
QUALIFY row_number() over (partition by id order by src_updated_ts desc, extraction_ts desc )=1
) AS source
ON target.ID = source.ID
WHEN MATCHED AND source.src_updated_ts > target.src_updated_ts AND source.operation = 'UPDATE' THEN
-- Update the target table with values from the source
UPDATE SET
target.x = source.x
WHEN MATCHED AND source.operation = 'DELETE' THEN DELETE
WHEN NOT MATCHED THEN INSERT ROW;
2. Streaming Inserts with Window Functions: I think we are actually doing this already, just not with BigQuery's streaming API. I'm not sure what the benefit would be of switching from the third-party tool to BigQuery streaming?
We already have records inserted into our CDC table in real time by an external CDC tool, and we already get the latest snapshot through a view, exactly the way you described it: a view on top of the CDC table that uses window functions (like ROW_NUMBER()) to identify the latest record for each ID. That view still scans the whole table every time it is called, which is now hourly.
Maybe I'm not understanding solution #2 here?
Hi @crazyrevol ,
You're spot on about the cost factors. Here are some strategies to consider, building on what you're already doing:
MERGE Query Optimization:
Granular Partitioning: Since your updates are hourly, partitioning your CDC table more granularly (e.g., by hour rather than by day) could significantly reduce the data scanned during each MERGE, thereby lowering costs.
Dynamic Timestamp Variable: Use a dynamic variable like @last_processed_timestamp for each run so that only data arriving since the last update is processed; this avoids redundant data handling (one way to set it is sketched after the example below).
Clustering: Applying clustering on the ID column of your snapshot table can enhance MERGE performance, particularly with large datasets.
Column Selection: Opt for explicit column selection in your queries (SELECT id, x, src_updated_ts) instead of SELECT * to minimize data transfer and processing overhead.
Handling Deletes: Include a WHEN NOT MATCHED BY SOURCE THEN DELETE clause to maintain snapshot accuracy by removing obsolete records.
Here's an optimized MERGE example to consider, tailored for your setup:
MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
SELECT id, x, operation, src_updated_ts -- Only necessary columns (operation is needed for the DELETE clause)
FROM ODS_CDC_TABLE
WHERE extraction_ts >= @last_processed_timestamp
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY src_updated_ts DESC, extraction_ts DESC) = 1
) AS source
ON target.ID = source.ID
WHEN MATCHED AND source.src_updated_ts > target.src_updated_ts THEN UPDATE SET target.x = source.x
WHEN MATCHED AND source.operation = 'DELETE' THEN DELETE
WHEN NOT MATCHED BY SOURCE THEN DELETE
WHEN NOT MATCHED THEN INSERT (ID, x, src_updated_ts) VALUES (source.ID, source.x, source.src_updated_ts);
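Regarding @last_processed_timestamp: if you run the MERGE as a BigQuery script rather than a parameterized job, one way to derive the watermark is a sketch like the following (it assumes the snapshot table keeps extraction_ts, as in your current setup):
DECLARE last_processed_timestamp DATETIME;
-- Watermark: the newest extraction_ts already merged into the snapshot.
SET last_processed_timestamp = (
  SELECT IFNULL(MAX(extraction_ts), DATETIME '1970-01-01 00:00:00')
  FROM ODS_LATEST_SNAPSHOT_TABLE
);
-- The MERGE above would then filter on
--   extraction_ts >= last_processed_timestamp
-- (a script variable rather than the @last_processed_timestamp query parameter).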
Utilizing BigQuery's streaming inserts can simplify your architecture by eliminating the need for an external tool, thus reducing maintenance and potential failure points.
With the Storage Write API, BigQuery can provide exactly-once delivery semantics and, when stream offsets are used, preserve record ordering, both of which matter for transactional data consistency.
Although potentially more costly, the operational simplicity and reliability improvements from using BigQuery streaming inserts might outweigh the increased expenses, especially when considering system robustness.
If your existing system integrates smoothly and performs well, a switch may not be essential. However, consolidating services within the Google Cloud Platform could provide additional benefits in the future.
Additional Considerations:
Deciding on the best strategy will depend on evaluating the performance, cost, and maintenance trade-offs specific to your operations.
@ms4446 wrote: WHEN NOT MATCHED BY SOURCE THEN DELETE
A clarifying question on this clause: if an ID is not found in the source records below, will it be deleted? I would not want that. If I don't get any CDC records for an ID, that doesn't mean it should be deleted.
@ms4446 wrote:
SELECT id, x, operation, src_updated_ts -- Only necessary columns
FROM ODS_CDC_TABLE
WHERE extraction_ts >= @last_processed_timestamp
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY src_updated_ts DESC, extraction_ts DESC) = 1
) AS source
The choice of whether to include or omit the WHEN NOT MATCHED BY SOURCE THEN DELETE clause depends entirely on your desired data management strategy:
Include the clause: If your goal is for the snapshot table to strictly reflect the current state of the source system, this clause removes records that are no longer present in the source, keeping the snapshot clean and aligned with the latest source data (at the cost of not preserving records deleted from the source). Be aware, though, that it only behaves this way when the USING clause selects the full current source state; with an hourly delta as the source, it would delete every target row whose ID is absent from that delta, which is exactly the behavior you want to avoid.
Omit the clause: If you prefer the snapshot table to retain all historical records, even if they are no longer updated or present in the source, then you should omit this clause. This way, the snapshot table will maintain a comprehensive history of all records ever captured, regardless of their current status in the source system.
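If you omit it, a minimal sketch of the MERGE (reusing the table and column names from the examples above) that still honors deletes recorded by the CDC tool would be:
MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
  SELECT id, x, operation, src_updated_ts
  FROM ODS_CDC_TABLE
  WHERE extraction_ts >= @last_processed_timestamp
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY src_updated_ts DESC, extraction_ts DESC) = 1
) AS source
ON target.id = source.id
WHEN MATCHED AND source.operation = 'DELETE' THEN
  DELETE  -- deletes still flow through the CDC operation flag
WHEN MATCHED AND source.src_updated_ts > target.src_updated_ts THEN
  UPDATE SET x = source.x, src_updated_ts = source.src_updated_ts
WHEN NOT MATCHED AND source.operation != 'DELETE' THEN
  INSERT (id, x, src_updated_ts) VALUES (source.id, source.x, source.src_updated_ts);
-- No WHEN NOT MATCHED BY SOURCE clause: IDs absent from this hour's delta are left untouched.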