Hi,
I'm creating an incremental table in Dataform, but want to change how duplicate rows are merged. The default behavior is for duplicate rows to overwrite existing rows in the table. Instead, I would like to sum values in the new duplicate rows to the existing rows' values.
For example, consider the source table below. Here, sales can may be added to the table after the date that they occur (insert_date = date the sales were added to table, transaction_date = date the sales occurred).
insert_date | transaction_date | sales_quantity |
2023-12-01 | 2023-12-01 | 10 |
2023-12-02 | 2023-12-02 | 20 |
2023-12-03 | 2023-12-01 | 5 |
I would like to incrementally (based on insert_date) generate a table that shows the total sales_quantity on a given transaction_date, where transaction_date is the unique key. The result would be as follows.
transaction_date | sales_quantity |
2023-12-01 | 15 |
2023-12-02 | 20 |
So I would like to sum the sales_quantity for duplicate rows, rather than overwrite it. But with the default behavior running on the sample code below, if I ran the incremental execution on 2023-12-01, 2023-12-02, and 2023-12-03 (where future insert_date rows don't exist in the source table on the date of the execution), I would end up with the following results at the end. During the last execution, the new row with a transaction_date of 2023-12-01 overwrote the previously added row, instead of adding to it.
SQLX file:
config { type: "incremental", uniqueKey: ["transaction_date"] } pre_operations { DECLARE insert_date_checkpoint DEFAULT ( ${when(incremental(), `SELECT MAX(insert_date) FROM ${self()}`, `SELECT DATE("2000-01-01")`)} ) } SELECT transaction_date, SUM(sales_quantity) AS sales_quantity FROM ${ref("table")} WHERE insert_date > insert_date_checkpoint GROUP BY transaction_date
Results:
transaction_date | sales_quantity |
2023-12-01 | 5 |
2023-12-02 | 20 |
How could I perform a custom merge on this and how would this look like? Everything I've tried this far leads me to an error 'Unexpected keyword MERGE at ...'
Thanks!
Solved! Go to Solution.
To handle duplicate rows by summing sales_quantity instead of overwriting them in Dataform, we use a MERGE statement. This allows us to explicitly define how to handle both matching and non-matching rows. The existing incremental logic using insert_date ensures that only new or updated data is processed. When a matching row (same transaction_date) is found, we instruct the MERGE statement to update the existing row by adding the new sales_quantity value to the current value. Here’s the revised SQLX code snippet to implement this behavior:
config {
type: "incremental",
uniqueKey: ["transaction_date"]
}
pre_operations {
DECLARE insert_date_checkpoint DEFAULT (
${when(incremental(),
`SELECT MAX(insert_date) FROM ${self()}`,
`SELECT DATE("2000-01-01")`
)}
);
}
MERGE INTO ${self()} AS target
USING (
SELECT transaction_date, SUM(sales_quantity) AS sales_quantity
FROM ${ref("table")}
WHERE insert_date > insert_date_checkpoint
GROUP BY transaction_date
) AS source
ON target.transaction_date = source.transaction_date
WHEN MATCHED THEN
UPDATE SET sales_quantity = target.sales_quantity + source.sales_quantity
WHEN NOT MATCHED THEN
INSERT (transaction_date, sales_quantity) VALUES (source.transaction_date, source.sales_quantity);
Let's say your incremental table already has:
transaction_date | sales_quantity |
---|---|
2023-12-01 | 15 |
2023-12-02 | 20 |
And you have new data in your source table:
insert_date | transaction_date | sales_quantity |
---|---|---|
2023-12-04 | 2023-12-01 | 3 |
2023-12-04 | 2023-12-03 | 8 |
After running the revised Dataform code, your incremental table will become:
transaction_date | sales_quantity |
---|---|
2023-12-01 | 18 |
2023-12-02 | 20 |
2023-12-03 | 8 |
Key Points