Custom merge (incremental table) with Dataform

Hi, 

I'm creating an incremental table in Dataform, but I want to change how rows with a duplicate unique key are merged. By default, a new row overwrites the existing row with the same key. Instead, I would like to add the values in the new row to the existing row's values.

For example, consider the source table below. Here, sales may be added to the table after the date on which they occur (insert_date = date the sales were added to the table, transaction_date = date the sales occurred).

insert_date  transaction_date  sales_quantity
2023-12-01   2023-12-01        10
2023-12-02   2023-12-02        20
2023-12-03   2023-12-01        5

I would like to incrementally (based on insert_date) generate a table that shows the total sales_quantity on a given transaction_date, where transaction_date is the unique key. The result would be as follows.

transaction_date  sales_quantity
2023-12-01        15
2023-12-02        20

So I would like duplicate keys to sum sales_quantity rather than overwrite it. With the default behavior, however, if I run the sample code below incrementally on 2023-12-01, 2023-12-02, and 2023-12-03 (rows with a later insert_date do not yet exist in the source table at the time of each execution), I end up with the results below. During the last execution, the new row with a transaction_date of 2023-12-01 overwrote the previously added row instead of adding to it.

SQLX file:

config {
  type: "incremental",
  uniqueKey: ["transaction_date"]
}

pre_operations {
  DECLARE insert_date_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT MAX(insert_date) FROM ${self()}`,
    `SELECT DATE("2000-01-01")`)}
  )
}

SELECT
  transaction_date,
  SUM(sales_quantity) AS sales_quantity,
  -- Keep insert_date in the output so the next run's checkpoint query can read it.
  MAX(insert_date) AS insert_date
FROM
  ${ref("table")}
WHERE
  insert_date > insert_date_checkpoint
GROUP BY
  transaction_date

Results:

transaction_date  sales_quantity
2023-12-01        5
2023-12-02        20
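The three runs described above can be replayed in a small plain-Python model (a sketch, not Dataform code). It assumes dates are ISO strings, which compare chronologically, and `run_daily` is a hypothetical name; the `sum_duplicates` flag switches between the default overwrite-by-uniqueKey behavior and the desired summing behavior.

```python
# Plain-Python model of the three daily runs; not Dataform code.
# Rows are (insert_date, transaction_date, sales_quantity); ISO date strings
# compare chronologically, so string comparison stands in for DATE.
SOURCE = [
    ("2023-12-01", "2023-12-01", 10),
    ("2023-12-02", "2023-12-02", 20),
    ("2023-12-03", "2023-12-01", 5),
]


def run_daily(sum_duplicates):
    """Run one execution per insert_date and return the final target table."""
    target = {}                 # transaction_date -> sales_quantity
    checkpoint = "2000-01-01"   # the non-incremental default from pre_operations
    for run_day in sorted({row[0] for row in SOURCE}):
        # Rows with a later insert_date do not exist yet on run_day.
        visible = [row for row in SOURCE if row[0] <= run_day]
        # SELECT ... WHERE insert_date > checkpoint GROUP BY transaction_date
        new_totals = {}
        for insert_date, transaction_date, qty in visible:
            if insert_date > checkpoint:
                new_totals[transaction_date] = new_totals.get(transaction_date, 0) + qty
        for key, qty in new_totals.items():
            if sum_duplicates:
                target[key] = target.get(key, 0) + qty  # desired: add to the existing row
            else:
                target[key] = qty                        # default uniqueKey merge: overwrite
        # MAX(insert_date) of everything merged so far becomes the next checkpoint.
        checkpoint = max(row[0] for row in visible)
    return target


print(run_daily(sum_duplicates=False))
print(run_daily(sum_duplicates=True))
```

With `sum_duplicates=False` the model ends with 5 for 2023-12-01, matching the results above; with `sum_duplicates=True` it ends with 15, the desired total.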


How could I perform a custom merge on this, and what would it look like? Everything I've tried so far leads to the error 'Unexpected keyword MERGE at ...'

Thanks! 

Accepted solution:

To handle duplicate rows by summing sales_quantity instead of overwriting them, we use a MERGE statement, which lets us define explicitly how matching and non-matching rows are handled. The existing insert_date checkpoint ensures that only rows added since the last run are processed. When a row with the same transaction_date already exists, the MERGE statement updates it by adding the new sales_quantity to the current value. Here's the revised SQLX code:

config {
  type: "incremental",
  uniqueKey: ["transaction_date"]
}

pre_operations {
  DECLARE insert_date_checkpoint DEFAULT (
    ${when(incremental(),
      `SELECT MAX(insert_date) FROM ${self()}`,
      `SELECT DATE("2000-01-01")`
    )}
  );
}

MERGE INTO ${self()} AS target
USING (
  -- Aggregate only the rows added since the last run.
  SELECT
    transaction_date,
    SUM(sales_quantity) AS sales_quantity,
    MAX(insert_date) AS insert_date
  FROM ${ref("table")}
  WHERE insert_date > insert_date_checkpoint
  GROUP BY transaction_date
) AS source
ON target.transaction_date = source.transaction_date
WHEN MATCHED THEN
  -- Add to the stored total instead of overwriting it, and store insert_date
  -- so the next run's MAX(insert_date) checkpoint can be read from this table.
  UPDATE SET
    sales_quantity = target.sales_quantity + source.sales_quantity,
    insert_date = source.insert_date
WHEN NOT MATCHED THEN
  INSERT (transaction_date, sales_quantity, insert_date)
  VALUES (source.transaction_date, source.sales_quantity, source.insert_date);

  • Incremental Data: The pre_operations section determines the maximum insert_date processed so far. Only rows with a newer insert_date are considered.
  • Source Data Preparation: The USING clause aggregates the new data from your source table (table), calculating the summed sales_quantity for each unique transaction_date.
  • Merge Logic:
    • WHEN MATCHED: If a row in the target (your incremental table) matches the transaction_date of a row in the source, it updates the target's sales_quantity by adding the source's sales_quantity.
    • WHEN NOT MATCHED: If there's no match, the entire row from the source is inserted into the target.

Let's say your incremental table already has:

transaction_date  sales_quantity
2023-12-01        15
2023-12-02        20

And you have new data in your source table:

insert_date  transaction_date  sales_quantity
2023-12-04   2023-12-01        3
2023-12-04   2023-12-03        8

After running the revised Dataform code, your incremental table will become:

transaction_date  sales_quantity
2023-12-01        18
2023-12-02        20
2023-12-03        8
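The merge logic and the worked example can be checked with a plain-Python model of the MERGE (a sketch under assumptions, not Dataform or BigQuery code): `merge_add` is a hypothetical name, dates are ISO strings, and tables are dicts keyed by transaction_date.

```python
# Plain-Python model of the summing MERGE; not Dataform or BigQuery code.
# Rows are (insert_date, transaction_date, sales_quantity) with ISO date strings.


def merge_add(target, source_rows, checkpoint):
    """Return a new target table after the summing MERGE."""
    # USING (...): aggregate only rows inserted after the checkpoint.
    source = {}
    for insert_date, transaction_date, qty in source_rows:
        if insert_date > checkpoint:
            source[transaction_date] = source.get(transaction_date, 0) + qty
    merged = dict(target)  # copy so the caller's table is not modified
    for key, qty in source.items():
        if key in merged:
            merged[key] += qty   # WHEN MATCHED THEN UPDATE: add to the stored total
        else:
            merged[key] = qty    # WHEN NOT MATCHED THEN INSERT
    return merged


# The worked example: the target already holds 15 and 20; the source table
# also contains the earlier rows, which the checkpoint filters out.
existing = {"2023-12-01": 15, "2023-12-02": 20}
source_table = [
    ("2023-12-01", "2023-12-01", 10),
    ("2023-12-02", "2023-12-02", 20),
    ("2023-12-03", "2023-12-01", 5),
    ("2023-12-04", "2023-12-01", 3),
    ("2023-12-04", "2023-12-03", 8),
]
print(merge_add(existing, source_table, checkpoint="2023-12-03"))
```

Calling `merge_add` a second time with the same checkpoint would add 3 and 8 again (2023-12-01 would become 21), which is why the checkpoint has to advance between runs.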

Key Points

  • Dataform supports MERGE statements within its framework, provided the underlying data warehouse supports it (e.g., BigQuery).
  • Make sure the account that runs Dataform has permission to modify the target table.



Replies:

Hi @lb_431,

Welcome to Google Cloud Community!

It seems like you have a similar case here.

According to @ms4446, you have two options to sum values in duplicate rows:

  1. Modify the Incremental Table Definition:
    • Custom MERGE Statement: To manage duplicate rows, use a MERGE statement with custom logic. In your SELECT statement, include a subquery that identifies the existing row for each transaction_date, and in the MERGE statement use a WHEN MATCHED clause to update sales_quantity by adding the new row's value to the existing one.

    • Custom pre_operations: Implement a custom pre-operation that aggregates and temporarily stores the total sales_quantity for each transaction_date. Then join this temporary data with your source table and sum the sales_quantity values in your main SELECT statement.

  2. Create a Custom Operations SQLX File:
    • You can utilize Dataform functions like incremental(), self(), and previous_data() to manage the data flow. This approach allows for fully customized SQL logic to handle the incremental update as per your requirements.

Note: For handling future dates, UNION ALL can combine current data with a separate query for potential future transaction dates.
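One possible reading of the pre_operations and UNION ALL suggestions (an interpretation, not spelled out in the reply) is to re-add the stored totals for the affected transaction_date values before summing, so that the default uniqueKey merge overwrites each row with the correct sum. The plain-Python model below sketches that idea; `reaggregate_then_overwrite` is a hypothetical name and dates are ISO strings.

```python
# Plain-Python model of "re-aggregate, then let the default merge overwrite";
# not Dataform code. Rows are (insert_date, transaction_date, sales_quantity).


def reaggregate_then_overwrite(target, source_rows, checkpoint):
    """Sum stored totals and new rows per key, then overwrite matching keys."""
    new_rows = [(t, q) for i, t, q in source_rows if i > checkpoint]
    affected = {t for t, _ in new_rows}
    # UNION ALL of the stored totals for affected keys and the new rows.
    union_all = [(t, q) for t, q in target.items() if t in affected] + new_rows
    totals = {}
    for transaction_date, qty in union_all:
        totals[transaction_date] = totals.get(transaction_date, 0) + qty
    merged = dict(target)
    merged.update(totals)  # default uniqueKey merge: matching rows are replaced
    return merged


# State after the 2023-12-01 and 2023-12-02 runs, then the 2023-12-03 run.
after_two_runs = {"2023-12-01": 10, "2023-12-02": 20}
rows = [
    ("2023-12-01", "2023-12-01", 10),
    ("2023-12-02", "2023-12-02", 20),
    ("2023-12-03", "2023-12-01", 5),
]
print(reaggregate_then_overwrite(after_two_runs, rows, checkpoint="2023-12-02"))
```

In SQLX terms this would keep the incremental table's body a plain SELECT, with the stored rows pulled in from the target table through a UNION ALL only in incremental runs; that mapping is an assumption.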

I hope the above information is helpful.

Thank you @ms4446 for your answer. I think this possibility should be added to the documentation of Dataform for incremental tables: https://cloud.google.com/dataform/docs/incremental-tables#merge_rows_in_an_incremental_table

Hello again,
While your solution would work in the incremental case, it will not work if you run the query non-incrementally (for example, on a full refresh). I guess the most robust pipeline would use a custom operation with IF statements instead of an incremental table.
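That suggestion can be sketched as a plain-Python model (not Dataform code; `run_operation` and the `(totals, checkpoint)` state layout are hypothetical). The `if state is None` branch plays the role of the IF in a custom operation: when the table does not exist, as after a full refresh, every source row is treated as new; otherwise only rows newer than the stored checkpoint are added.

```python
# Plain-Python model of a custom operation with an IF branch; not Dataform code.
# Rows are (insert_date, transaction_date, sales_quantity) with ISO date strings.
DEFAULT_CHECKPOINT = "2000-01-01"


def run_operation(state, source_rows):
    """state is None when the table does not exist (first run or full refresh),
    otherwise (totals, checkpoint) read from the existing table."""
    if state is None:
        # IF the table is missing: start empty, so every source row is new.
        totals, checkpoint = {}, DEFAULT_CHECKPOINT
    else:
        # ELSE: continue from the stored totals and the last merged insert_date.
        totals, checkpoint = dict(state[0]), state[1]
    new_rows = [row for row in source_rows if row[0] > checkpoint]
    for _, transaction_date, qty in new_rows:
        totals[transaction_date] = totals.get(transaction_date, 0) + qty
    if new_rows:
        checkpoint = max(row[0] for row in new_rows)
    return totals, checkpoint


SOURCE = [
    ("2023-12-01", "2023-12-01", 10),
    ("2023-12-02", "2023-12-02", 20),
    ("2023-12-03", "2023-12-01", 5),
]

# Incremental: one run per day, each seeing only rows that exist by then.
state = None
for day in ["2023-12-01", "2023-12-02", "2023-12-03"]:
    state = run_operation(state, [row for row in SOURCE if row[0] <= day])

# Full refresh: start without a table and rebuild from the whole source.
rebuilt = run_operation(None, SOURCE)
print(state == rebuilt)
```

Replaying the daily runs and rebuilding in one pass give the same totals, and rerunning with no new rows changes nothing, which is the robustness the reply is after.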