
Merge daily snapshots to a table incrementally.

Hello everyone,

Here is the challenge I am facing.

I need to keep two BQ tables: a raw table that gets populated daily with the content of a .csv file, and a stage table that is merged daily with the raw table. Both tables have two timestamp columns: insert_date_time and update_date_time. Please consider this scenario:

1. The first .csv file arrives and is loaded into the raw table; both the insert_date_time and update_date_time columns get the CURRENT_TIMESTAMP() value. When the raw table is merged with the stage table, its content is inserted as is into the stage table, because the stage table is empty.

2. The next day, the .csv file arrives with the delta load (existing records updated with new values, plus a set of new records that did not exist before). All content of the .csv file is INSERTED into the raw table as is.

I want to merge the raw table into the stage table the "incremental" way, not with an "operation" file that does the "merge" with DML.

How I want to merge:

  • Today's records from the raw table that do not exist in the stage table are inserted into the stage table.
  • Today's records from the raw table that can be matched by primary key in the stage table should update the corresponding records in the stage table, except for insert_date_time, which must keep its original value from when the record was first inserted into the stage table.

Can this flow be implemented with the "incremental" config type?

 

1 ACCEPTED SOLUTION

You can implement this flow in BigQuery using the MERGE statement. Here's how to accomplish it, along with potential optimizations.

Core Concepts

  • Raw Table: Your daily CSV uploads populate this table, and both insert_date_time and update_date_time initially receive CURRENT_TIMESTAMP().
  • Stage Table: Functions as the consolidated, updated table incorporating deltas from the raw table.
  • MERGE Statement: Inserts new records from the raw table and updates existing ones in the stage table in a single statement.

Steps

  1. Data Load: You'll continue your existing process of loading daily CSV files into the raw table.

  2. MERGE Implementation: Here's a tailored MERGE statement to execute daily:

     
    MERGE stage_table AS target
    USING raw_table AS source
    ON target.primary_key = source.primary_key -- Match on your primary key
    
    WHEN MATCHED THEN
        UPDATE SET 
            column1 = source.column1, -- Update regular columns (SET columns are unqualified in BigQuery)
            column2 = source.column2, 
            -- ... other columns to update
            update_date_time = CURRENT_TIMESTAMP()  -- Refresh the update timestamp; insert_date_time is deliberately not listed
    
    WHEN NOT MATCHED THEN
        INSERT (primary_key, column1, column2, ..., insert_date_time, update_date_time) 
        VALUES (source.primary_key, source.column1, source.column2, ..., source.insert_date_time, source.update_date_time)
    

Explanation

  • The MERGE statement compares the stage (target) and raw (source) tables based on your primary key.
  • WHEN MATCHED: Existing records in the stage table get updated with values from the raw table, and the update_date_time is refreshed. The original insert_date_time remains unchanged.
  • WHEN NOT MATCHED: New records from the raw table are inserted into the stage table, preserving their original insert_date_time.
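
One caveat: because every daily CSV is INSERTED into the raw table as is, the raw table will eventually hold more than one row per primary key, and MERGE allows at most one matching source row per target row. Below is a minimal sketch of restricting the source to the latest row per key, assuming insert_date_time reflects the load time (primary_key and column1 are placeholder names):

    MERGE stage_table AS target
    USING (
        SELECT *
        FROM raw_table
        WHERE TRUE  -- satisfies BigQuery's rule that QUALIFY be paired with WHERE, GROUP BY, or HAVING
        -- keep only the most recently loaded row for each key
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY primary_key
            ORDER BY insert_date_time DESC
        ) = 1
    ) AS source
    ON target.primary_key = source.primary_key
    WHEN MATCHED THEN
        UPDATE SET
            column1 = source.column1,
            update_date_time = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (primary_key, column1, insert_date_time, update_date_time)
        VALUES (source.primary_key, source.column1, source.insert_date_time, source.update_date_time)

If your pipeline instead truncates the raw table (or filters it to the current day's load) before each merge, this deduplication step is unnecessary.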

Optimization (Optional)

If your tables become very large, consider adding partitioning on a suitable column (e.g., a date column) to both the raw and stage tables. This will limit the MERGE operation to scan only relevant partitions, significantly improving performance.
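
As a rough illustration only (my_dataset, the column names, and the choice of partition column are placeholders, not taken from the original post), the stage table could be created as a partitioned, clustered table:

    -- Sketch: day-partitioned, key-clustered stage table (placeholder names)
    CREATE TABLE IF NOT EXISTS my_dataset.stage_table
    (
        primary_key      STRING,
        column1          STRING,
        insert_date_time TIMESTAMP,
        update_date_time TIMESTAMP
    )
    PARTITION BY DATE(update_date_time)
    CLUSTER BY primary_key;

Clustering on the primary key also co-locates matching rows, which helps the join performed by the MERGE.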

Automation

Encapsulate this SQL and the load process in a scheduled Cloud Function, or orchestrate them with a tool like Cloud Composer (Airflow) for a managed workflow.
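
As one possible sketch (the procedure name my_dataset.merge_raw_into_stage and all table/column names are placeholders), the MERGE can be wrapped in a BigQuery stored procedure so that whichever scheduler you choose only needs to issue a single CALL each day:

    -- Sketch: wrap the daily upsert so a scheduler only has to CALL it
    CREATE OR REPLACE PROCEDURE my_dataset.merge_raw_into_stage()
    BEGIN
        MERGE my_dataset.stage_table AS target
        USING my_dataset.raw_table AS source
        ON target.primary_key = source.primary_key
        WHEN MATCHED THEN
            UPDATE SET
                column1 = source.column1,
                update_date_time = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN
            INSERT (primary_key, column1, insert_date_time, update_date_time)
            VALUES (source.primary_key, source.column1, source.insert_date_time, source.update_date_time);
    END;

A BigQuery scheduled query, a Cloud Function, or a Composer task can then run CALL my_dataset.merge_raw_into_stage(); on a daily schedule.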
