
Creating a latest snapshot from CDC data of a source data table

Scenario: We stage Change Data Capture (CDC) data in an Operational Data Store (ODS) layer table. This table includes metadata columns such as src_updated_ts, id_version, extraction_ts, and operation (with values representing insert, update, or delete operations). The source table has an ID column as its primary key.

Currently, when constructing our data warehouse, our job invokes a view for each ODS table to calculate the latest snapshot. This snapshot essentially aims to reconstruct the source table from the CDC rows. Our approach involves using the ROW_NUMBER() function with the following logic: partition by ID and order by src_updated_ts (in descending order), id_version (in descending order), and extraction_ts (in descending order). We then select the latest record for each ID.

Until now, we've been loading the warehouse once a day. However, we're now planning to run the warehouse job every hour. Unfortunately, our current view-based method for calculating the latest snapshot is becoming prohibitively expensive and time-consuming. It requires scanning the entire ODS table for every view invocation, which is not feasible for frequent updates.

What I'm seeking help with: I want to materialize the source table's current snapshot incrementally, as records are inserted into the ODS table. I tried to use the materialized view feature, but couldn't, because my query involves PARTITION BY (a window function), a self-join, or a sub-query.

What is the best way to achieve this in BigQuery?


Hi @crazyrevol ,

Your current strategy of recomputing the snapshot through a view works for infrequent loads, but it becomes costly and slow when refreshed hourly, primarily because of the full table scans. Unfortunately, materialized views in BigQuery, while efficient, do not support the logic your query requires, such as window functions (the PARTITION BY in your ROW_NUMBER()), self-joins, and subqueries. This limitation stems from materialized views being optimized for speed and simplicity, focusing on pre-computing relatively simple query results rather than complex SQL operations.

Potential Solutions:

1. Incremental Materialization with MERGE:

  • Concept: Instead of rebuilding the entire snapshot, focus on processing only the new or changed CDC records since the last update. This approach minimizes the amount of data processed, improving efficiency.
  • How:
    • Use a "last processed timestamp" or a transaction log table to track the latest processed record.
    • Employ a MERGE statement to efficiently update the snapshot table, handling inserts, updates, and deletes based on this timestamp. The MERGE statement's ability to perform multiple DML operations in a single statement makes it well suited for this task (a minimal sketch of this pattern follows these two options).
  • Pros: Highly efficient for frequent updates, supports complex logic within MERGE, and provides granular control over data manipulation.
  • Cons: Requires meticulous management of the progress tracker (timestamp or log) to avoid data loss or duplication. Careful handling of potential merge conflicts is also necessary, especially for delete operations.

2. Streaming Inserts with Window Functions:

  • Concept: Continuously stream CDC records into a partitioned snapshot table. Utilize window functions (like ROW_NUMBER()) within a view to identify the latest record for each ID. This approach leverages BigQuery's streaming capabilities for near-real-time updates.
  • How:
    • Create a partitioned snapshot table, potentially partitioned by src_updated_ts for efficient data management.
    • Stream CDC records into this table as they arrive, using BigQuery's streaming insert API.
    • Define a view that uses window functions (e.g., ROW_NUMBER() OVER (PARTITION BY ID ORDER BY src_updated_ts DESC)) to select the latest record for each ID. This view provides the current snapshot of your data.
  • Pros: Offers near-real-time updates, leverages BigQuery's strengths in streaming and partitioning, and allows for complex logic through window functions.
  • Cons: Requires awareness of streaming insert limitations and costs. Additional logic might be necessary for handling late-arriving or out-of-order data to ensure data consistency.
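Here's a minimal sketch of the incremental pattern from option 1, assuming a hypothetical progress-tracking table named etl_watermarks, that the snapshot table mirrors the CDC table's columns (so INSERT ROW works), and illustrative column names (id, x, src_updated_ts, id_version, extraction_ts, operation); adjust to your actual schema:

-- Read the watermark for this table (etl_watermarks: table_name STRING, last_processed_ts DATETIME).
-- Seed this row once with an initial timestamp before the first run.
DECLARE last_processed DATETIME DEFAULT (
  SELECT last_processed_ts
  FROM etl_watermarks
  WHERE table_name = 'ODS_LATEST_SNAPSHOT_TABLE');

MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
  -- Only CDC rows newer than the watermark, reduced to the latest row per id.
  SELECT *
  FROM ODS_CDC_TABLE
  WHERE extraction_ts > last_processed
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY src_updated_ts DESC, id_version DESC, extraction_ts DESC) = 1
) AS source
ON target.id = source.id
WHEN MATCHED AND source.operation = 'DELETE' THEN
  DELETE
WHEN MATCHED AND source.src_updated_ts > target.src_updated_ts THEN
  -- x is a placeholder for your payload columns.
  UPDATE SET x = source.x, src_updated_ts = source.src_updated_ts, extraction_ts = source.extraction_ts
WHEN NOT MATCHED AND source.operation != 'DELETE' THEN
  INSERT ROW;

-- Advance the watermark only after the MERGE succeeds.
UPDATE etl_watermarks
SET last_processed_ts = (SELECT MAX(extraction_ts) FROM ODS_LATEST_SNAPSHOT_TABLE)
WHERE table_name = 'ODS_LATEST_SNAPSHOT_TABLE';

The watermark table is just one way to track progress; deriving the cutoff from MAX(extraction_ts) in the snapshot table itself also works, at the cost of scanning that column each run.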

Choosing the Right Approach:

The optimal solution depends on your specific requirements:

  • Data Volume & Update Frequency: High volume or the need for very low latency might favor streaming inserts.
  • Complexity: If your snapshot logic involves complex transformations or calculations, the MERGE approach offers more flexibility.
  • Operational Overhead: Consider the ease of implementation and ongoing maintenance, as well as your team's familiarity with BigQuery features. Streaming inserts might require additional infrastructure and monitoring.

Additional Tips:

  • Partitioning: Partitioning your snapshot table, especially by a timestamp column like src_updated_ts, can significantly improve query performance and cost-efficiency. A DDL sketch follows these tips.
  • Error Handling: Implement robust error handling and retry mechanisms to ensure data integrity, especially when dealing with streaming inserts or potential merge conflicts.
  • Monitoring: Regularly review performance and resource usage using BigQuery's built-in monitoring tools or third-party solutions. This helps identify bottlenecks and optimize your solution.
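As a rough DDL sketch of the partitioning tip (column names and types are illustrative, assuming src_updated_ts and extraction_ts are DATETIME values; adjust to your schema):

CREATE TABLE IF NOT EXISTS ODS_LATEST_SNAPSHOT_TABLE (
  id             INT64,
  x              STRING,
  src_updated_ts DATETIME,
  id_version     INT64,
  extraction_ts  DATETIME,
  operation      STRING
)
PARTITION BY DATETIME_TRUNC(src_updated_ts, DAY)  -- daily partitions for pruning
CLUSTER BY id;                                    -- co-locates rows that are joined/merged on id

Clustering by id also helps the MERGE join in option 1.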

Thank you for your reply @ms4446 .

1. Incremental Materialization with MERGE: 

This option is very clear to me, and I have implemented it. The CDC table has real-time updates from the CDC extraction tool. I have partitioned the CDC table on the extraction_ts column (the extraction time set by the CDC tool). Every hour I take the delta records from the CDC table that are greater than the max(extraction_ts) available in the snapshot table and MERGE them into the snapshot table. That satisfies our need. Cost-wise, each MERGE query incurs the cost of the partitions scanned on the CDC table plus the full size of the destination snapshot table. Is there any optimization I can do to reduce the cost of the MERGE query?

MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
    -- Subquery to select records from the source table
    SELECT *
      FROM ODS_CDC_TABLE
      WHERE extraction_ts >= DATETIME('2024-04-10T16:14:47') -- partitioned column in source
      QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY src_updated_ts DESC, extraction_ts DESC) = 1
) AS source
ON target.ID = source.ID
WHEN MATCHED AND source.src_updated_ts > target.src_updated_ts AND source.operation = 'UPDATE' THEN
  -- Update the target table with values from the source
  UPDATE SET target.x = source.x
WHEN MATCHED AND source.operation = 'DELETE' THEN DELETE
WHEN NOT MATCHED THEN INSERT ROW;


2. Streaming Inserts with Window Functions: I think we are actually doing this already, just not with BigQuery's streaming API. I'm not sure what the benefit would be of switching from the 3rd-party tool to BigQuery streaming?

We already have records being inserted into our CDC table in real time by an external CDC tool. Getting the latest snapshot through a view is what we are doing right now, exactly the way you described it: a view on top of the CDC table that uses window functions (like ROW_NUMBER()) to identify the latest record for each ID. It still scans the whole table every time it is called, which is hourly.
Maybe I'm not understanding solution #2 here?

Hi @crazyrevol ,

You're spot on about the cost factors. Here are some strategies to consider, building on what you're already doing:

  1. MERGE Query Optimization:

    • Granular Partitioning: Since your updates are hourly, partitioning the CDC table more granularly (for example, by hour on extraction_ts) can significantly reduce the data scanned by each MERGE, lowering costs. A DDL sketch follows the MERGE example below.
    • Dynamic Cutoff: Use a variable such as @last_processed_timestamp for each run so that only data arriving since the last update is processed, avoiding redundant work.
    • Clustering: Clustering the snapshot table on the ID column can improve MERGE performance, particularly with large datasets.
    • Column Selection: Select only the columns you need (SELECT id, x, src_updated_ts, operation) instead of SELECT * to minimize data transferred and processed.
    • Handling Deletes: A WHEN NOT MATCHED BY SOURCE THEN DELETE clause can remove records that no longer exist in the source, keeping the snapshot aligned with it.

Here's an optimized MERGE example to consider, tailored for your setup:

 
MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
  SELECT id, x, src_updated_ts, operation -- only the columns the MERGE needs
  FROM ODS_CDC_TABLE
  WHERE extraction_ts >= @last_processed_timestamp
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY src_updated_ts DESC, extraction_ts DESC) = 1
) AS source
ON target.ID = source.ID
WHEN MATCHED AND source.operation = 'UPDATE' AND source.src_updated_ts > target.src_updated_ts THEN
  UPDATE SET target.x = source.x
WHEN MATCHED AND source.operation = 'DELETE' THEN DELETE
WHEN NOT MATCHED BY SOURCE THEN DELETE -- removes any target row absent from the source (see the discussion below)
WHEN NOT MATCHED AND source.operation != 'DELETE' THEN
  INSERT (ID, x, src_updated_ts) VALUES (source.ID, source.x, source.src_updated_ts);
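As a sketch of the granular-partitioning and dynamic-cutoff points above (assuming extraction_ts is a DATETIME; the table name is illustrative, and since a table's partitioning spec can't be changed in place, the hourly-partitioned table would have to be created and backfilled as a new table):

-- One-time setup: hourly partitions so each hourly MERGE scans roughly one partition.
-- Note the 10,000-partition limit per table (about 13 months of hourly partitions).
CREATE TABLE IF NOT EXISTS ODS_CDC_TABLE_HOURLY (
  id             INT64,
  x              STRING,
  src_updated_ts DATETIME,
  id_version     INT64,
  extraction_ts  DATETIME,
  operation      STRING
)
PARTITION BY DATETIME_TRUNC(extraction_ts, HOUR);

And in the hourly job itself, the cutoff can be derived from the snapshot (essentially what you're doing today, just without a hard-coded literal) by declaring it at the top of the script:

DECLARE last_processed_timestamp DATETIME DEFAULT (
  SELECT MAX(extraction_ts) FROM ODS_LATEST_SNAPSHOT_TABLE);
-- ...followed by the MERGE above, filtering on
--   WHERE extraction_ts >= last_processed_timestamp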

  2. BigQuery Streaming Inserts vs. External CDC Tool:

Utilizing BigQuery's streaming inserts can simplify your architecture by eliminating the need for an external tool, thus reducing maintenance and potential failure points.

BigQuery's Storage Write API can provide exactly-once delivery semantics (when you use application-created streams with offsets) and preserves record ordering within a stream, which helps with transactional data consistency.

 Although potentially more costly, the operational simplicity and reliability improvements from using BigQuery streaming inserts might outweigh the increased expenses, especially when considering system robustness.

If your existing system integrates smoothly and performs well, a switch may not be essential. However, consolidating services within the Google Cloud Platform could provide additional benefits in the future.

  3. Additional Considerations:

    • Error Handling and Monitoring: Develop mechanisms to promptly identify and resolve issues to maintain high data quality.
    • Backfilling and Historical Data: Combine batch loading capabilities with streaming inserts for optimal handling of large data volumes or historical backfills.
    • Security and Compliance: Ensure that your data handling practices comply with your organization's security standards and regulatory requirements.

Deciding on the best strategy will depend on evaluating the performance, cost, and maintenance trade-offs specific to your operations.


@ms4446 wrote:

WHEN NOT MATCHED BY SOURCE THEN DELETE


A clarifying question on this clause: if an ID isn't found in the source records quoted below, will it be deleted? I would not want that. If I don't get any CDC records for an ID, that doesn't mean it should be deleted.


@ms4446 wrote:

SELECT id, x, src_updated_ts, operation -- only the columns the MERGE needs
  FROM ODS_CDC_TABLE
  WHERE extraction_ts >= @last_processed_timestamp
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY src_updated_ts DESC, extraction_ts DESC) = 1
) AS source



The choice of whether to include or exclude the WHEN NOT MATCHED BY SOURCE THEN DELETE clause depends on what "the source" of your MERGE actually contains:

  • Include the clause: Only if the source represents the complete current state of the source system. In that case the clause removes records that no longer exist in the source, keeping the snapshot clean and aligned with it, at the cost of not preserving history for deleted records.

  • Omit the clause: In your incremental setup the source is just the hourly delta, so any ID with no CDC activity in that window would count as "not matched by source" and be deleted, which is exactly what you want to avoid. Since your CDC stream carries explicit DELETE operations, you can rely on the WHEN MATCHED ... source.operation = 'DELETE' branch and leave this clause out; the snapshot then retains every record until an explicit delete arrives. If you ever want source-driven deletes, reserve them for an occasional full-reconciliation run, as sketched below.
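For completeness, here's a sketch of the situation where the clause is appropriate: an occasional full-reconciliation MERGE whose source is the latest row per id over the entire CDC history, so it really does represent the current state of the source system. Names are the same illustrative ones used earlier:

MERGE INTO ODS_LATEST_SNAPSHOT_TABLE AS target
USING (
  -- Latest row per id over the WHOLE CDC history, not just an hourly delta.
  SELECT id, x, src_updated_ts, operation
  FROM ODS_CDC_TABLE
  WHERE TRUE  -- QUALIFY requires a WHERE, GROUP BY, or HAVING clause
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY src_updated_ts DESC, id_version DESC, extraction_ts DESC) = 1
) AS source
ON target.id = source.id
WHEN MATCHED AND source.operation = 'DELETE' THEN
  DELETE
WHEN MATCHED AND source.src_updated_ts > target.src_updated_ts THEN
  UPDATE SET x = source.x, src_updated_ts = source.src_updated_ts
WHEN NOT MATCHED AND source.operation != 'DELETE' THEN
  INSERT (id, x, src_updated_ts) VALUES (source.id, source.x, source.src_updated_ts)
-- Safe here only because the source covers every id:
WHEN NOT MATCHED BY SOURCE THEN
  DELETE;

Run this sparingly (it scans the full CDC history); the hourly job keeps using the delta MERGE without the source-driven delete.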