Hi,
I'm creating an incremental table in Dataform, but want to change how duplicate rows are merged. The default behavior is for duplicate rows to overwrite existing rows in the table. Instead, I would like to add the values in the new duplicate rows to the existing rows' values.
For example, consider the source table below. Here, sales may be added to the table after the date on which they occur (insert_date = date the sales were added to the table, transaction_date = date the sales occurred).
insert_date | transaction_date | sales_quantity |
---|---|---|
2023-12-01 | 2023-12-01 | 10 |
2023-12-02 | 2023-12-02 | 20 |
2023-12-03 | 2023-12-01 | 5 |
I would like to incrementally (based on insert_date) generate a table that shows the total sales_quantity on a given transaction_date, where transaction_date is the unique key. The result would be as follows.
transaction_date | sales_quantity |
---|---|
2023-12-01 | 15 |
2023-12-02 | 20 |
So I would like to sum the sales_quantity for duplicate rows rather than overwrite it. But with the default behavior, if I run the sample code below incrementally on 2023-12-01, 2023-12-02, and 2023-12-03 (where rows with a later insert_date don't yet exist in the source table on the date of the execution), I end up with the results shown after the code. During the last execution, the new row with a transaction_date of 2023-12-01 overwrote the previously added row instead of adding to it.
SQLX file:
config {
type: "incremental",
uniqueKey: ["transaction_date"]
}
pre_operations {
DECLARE insert_date_checkpoint DEFAULT (
${when(incremental(),
`SELECT MAX(insert_date) FROM ${self()}`,
`SELECT DATE("2000-01-01")`
)}
)
}
SELECT transaction_date, SUM(sales_quantity) AS sales_quantity
FROM ${ref("table")}
WHERE insert_date > insert_date_checkpoint
GROUP BY transaction_date
Results:
transaction_date | sales_quantity |
---|---|
2023-12-01 | 5 |
2023-12-02 | 20 |
How could I perform a custom merge here, and what would it look like? Everything I've tried so far leads to the error 'Unexpected keyword MERGE at ...'
Thanks!
To handle duplicate rows by summing sales_quantity instead of overwriting them in Dataform, we use a MERGE statement. This allows us to explicitly define how to handle both matching and non-matching rows. The existing incremental logic using insert_date ensures that only new or updated data is processed. When a matching row (same transaction_date) is found, we instruct the MERGE statement to update the existing row by adding the new sales_quantity value to the current value. Here’s the revised SQLX code snippet to implement this behavior:
config {
type: "incremental",
uniqueKey: ["transaction_date"]
}
pre_operations {
DECLARE insert_date_checkpoint DEFAULT (
${when(incremental(),
`SELECT MAX(insert_date) FROM ${self()}`,
`SELECT DATE("2000-01-01")`
)}
);
}
MERGE INTO ${self()} AS target
USING (
SELECT transaction_date, SUM(sales_quantity) AS sales_quantity
FROM ${ref("table")}
WHERE insert_date > insert_date_checkpoint
GROUP BY transaction_date
) AS source
ON target.transaction_date = source.transaction_date
WHEN MATCHED THEN
UPDATE SET sales_quantity = target.sales_quantity + source.sales_quantity
WHEN NOT MATCHED THEN
INSERT (transaction_date, sales_quantity) VALUES (source.transaction_date, source.sales_quantity);
Let's say your incremental table already has:
transaction_date | sales_quantity |
---|---|
2023-12-01 | 15 |
2023-12-02 | 20 |
And you have new data in your source table:
insert_date | transaction_date | sales_quantity |
---|---|---|
2023-12-04 | 2023-12-01 | 3 |
2023-12-04 | 2023-12-03 | 8 |
After running the revised Dataform code, your incremental table will become:
transaction_date | sales_quantity |
---|---|
2023-12-01 | 18 |
2023-12-02 | 20 |
2023-12-03 | 8 |
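The arithmetic of this example can be checked without touching a real table. The standalone BigQuery query below is a sketch that mimics the MERGE with a FULL OUTER JOIN over inline copies of the two example tables (the CTE names existing_rows, new_source_rows and incoming are hypothetical): matched dates are summed, unmatched existing rows are kept, and unmatched incoming rows are added. The insert_date filter is left out because both example source rows count as new.

```sql
-- Inline copies of the example data; no real tables are read or written.
WITH existing_rows AS (
  SELECT DATE '2023-12-01' AS transaction_date, 15 AS sales_quantity
  UNION ALL
  SELECT DATE '2023-12-02', 20
),
new_source_rows AS (
  SELECT DATE '2023-12-04' AS insert_date, DATE '2023-12-01' AS transaction_date, 3 AS sales_quantity
  UNION ALL
  SELECT DATE '2023-12-04', DATE '2023-12-03', 8
),
-- Same aggregation as the USING subquery of the MERGE.
incoming AS (
  SELECT transaction_date, SUM(sales_quantity) AS sales_quantity
  FROM new_source_rows
  GROUP BY transaction_date
)
SELECT
  COALESCE(e.transaction_date, i.transaction_date) AS transaction_date,
  -- Matched: existing + incoming. Unmatched: whichever side exists.
  COALESCE(e.sales_quantity, 0) + COALESCE(i.sales_quantity, 0) AS sales_quantity
FROM existing_rows AS e
FULL OUTER JOIN incoming AS i
  ON e.transaction_date = i.transaction_date
ORDER BY 1;
```

It returns the same three rows as the table above: 2023-12-01 with 18, 2023-12-02 with 20, and 2023-12-03 with 8.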
Key Points
Hi @lb_431,
Welcome to Google Cloud Community!
It seems like you have a similar case here.
According to @ms4446, you have two options to sum values in duplicate rows:
1. MERGE statement: use transaction_date as the unique key. In your MERGE statement, use a WHEN MATCHED clause to update sales_quantity by adding the new row's value to the existing one.
2. Custom SQL: aggregate the sales_quantity values in your main SELECT statement, and use Dataform's built-in functions such as incremental() and self() to manage the data flow. This approach allows for fully customized SQL logic to handle the incremental update as per your requirements.
Note: For handling future dates, UNION ALL will help you combine current data with a separate query for potential future transaction dates.
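Option 2 could look roughly like the SQLX below. This is a minimal, untested sketch assuming BigQuery and the table and column names from the question. Since the target table as written stores only transaction_date and sales_quantity, a checkpoint query of SELECT MAX(insert_date) FROM ${self()} has no insert_date column to read, so the sketch keeps a hypothetical last_insert_date column in the output. The idea is to leave Dataform's default uniqueKey merge in place and make the SELECT itself return the new running total (new rows plus the stored value), so the overwrite writes the sum.

```sql
config {
  type: "incremental",
  uniqueKey: ["transaction_date"]
}

pre_operations {
  -- Checkpoint read from the hypothetical last_insert_date column.
  DECLARE insert_date_checkpoint DEFAULT (
    ${when(incremental(),
      `SELECT MAX(last_insert_date) FROM ${self()}`,
      `SELECT DATE("2000-01-01")`)}
  )
}

-- Aggregate only the source rows that arrived after the checkpoint.
WITH new_rows AS (
  SELECT
    transaction_date,
    SUM(sales_quantity) AS sales_quantity,
    MAX(insert_date) AS last_insert_date
  FROM ${ref("table")}
  WHERE insert_date > insert_date_checkpoint
  GROUP BY transaction_date
)
SELECT
  n.transaction_date,
  -- Incremental runs add the total already stored for this date;
  -- a full refresh aggregates the whole source, so nothing is added.
  n.sales_quantity ${when(incremental(), `+ COALESCE(t.sales_quantity, 0)`, ``)} AS sales_quantity,
  n.last_insert_date
FROM new_rows AS n
${when(incremental(), `LEFT JOIN ${self()} AS t ON t.transaction_date = n.transaction_date`, ``)}
```

On a non-incremental run the when() branches drop the self-join and the checkpoint falls back to 2000-01-01, so the query simply aggregates every source row, which yields the same totals.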
I hope the above information is helpful.
Thank you @ms4446 for your answer. I think this possibility should be added to the documentation of Dataform for incremental tables: https://cloud.google.com/dataform/docs/incremental-tables#merge_rows_in_an_incremental_table
Hello again,
While your solution would work in the incremental case, it will not work in the non-incremental case (a full refresh, for example). I guess the most robust pipeline would use a custom operation with IF statements instead of an incremental table.
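A rough sketch of that custom-operation route follows, assuming BigQuery, that Dataform executes the file as a single BigQuery script, the source table ${ref("table")} from the question, and a hypothetical last_insert_date column used as the checkpoint. In place of an explicit IF, CREATE TABLE IF NOT EXISTS lets the first run and later runs share one path: on the first run the table is empty, the checkpoint falls back to 2000-01-01, and the MERGE inserts everything. Treat it as a starting point, not a verified implementation.

```sql
config {
  type: "operations",
  hasOutput: true
}

-- First run creates an empty table; later runs leave it untouched.
CREATE TABLE IF NOT EXISTS ${self()} (
  transaction_date DATE,
  sales_quantity INT64,
  last_insert_date DATE  -- hypothetical checkpoint column
);

MERGE ${self()} AS target
USING (
  SELECT
    transaction_date,
    SUM(sales_quantity) AS sales_quantity,
    MAX(insert_date) AS last_insert_date
  FROM ${ref("table")}
  -- Checkpoint is computed inline because BigQuery requires DECLARE
  -- to come first in a script, before the CREATE TABLE above.
  WHERE insert_date > (
    SELECT COALESCE(MAX(last_insert_date), DATE "2000-01-01") FROM ${self()}
  )
  GROUP BY transaction_date
) AS source
ON target.transaction_date = source.transaction_date
WHEN MATCHED THEN
  UPDATE SET
    sales_quantity = target.sales_quantity + source.sales_quantity,
    last_insert_date = source.last_insert_date
WHEN NOT MATCHED THEN
  INSERT (transaction_date, sales_quantity, last_insert_date)
  VALUES (source.transaction_date, source.sales_quantity, source.last_insert_date);
```

The trade-off is that Dataform no longer manages the table's lifecycle: a full rebuild would mean dropping the table yourself before rerunning the operation.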