I think I am doing something wrong here, and I would be grateful if someone could guide me.
Please see this pseudocode for a Dataform pipeline:
config {
type: "incremental",
description: "Information from equipments",
}
WITH equipments_table AS (
SELECT
EPS.equip_timestamp AS equip_seen,
EPS.session_id,
EPS.num AS equip_no,
V.my_date,
V.etc_Date,
V.force_abc,
V.force_region,
V.xyz_status
FROM ${ref("EPS_LOGIC_GATES")} EPS
JOIN ${ref('XXX_monthly_report')} V ON EPS.num = V.num
AND DATE(EPS.equip_timestamp) >= DATE_ADD( V.etc_Date , INTERVAL -7 DAY )
AND (DATE(EPS.equip_timestamp) < V.my_date OR my_date IS NULL)
WHERE
TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 7 DAY)
)
SELECT DISTINCT * FROM equipments_table
As you can see above:
1) In the config, I have set the type as incremental
config {
type: "incremental",
description: "Information from equipments",
}
This (below) is what I want to achieve:
1) My table: EPS_LOGIC_GATES is being populated with new data every hour.
2) Every hour I am running a Dataform pipeline (after EPS_LOGIC_GATES is populated).
3) The above is one example of my Dataform table.
4) I want the transformation in Dataform to be applied ONLY TO RECORDS WHERE TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 7 DAY)
5) Records that do not satisfy condition 4 were already transformed in a previous pipeline run (1 hour ago or earlier), and I do not want to touch them
6) Within the 7-day interval I am redoing/overwriting the transformation for some records as a precautionary measure (I am happy with that)
Is the above pseudocode achieving my purpose?
@ms4446 , I shall be grateful if you can help
Based on your description and the pseudocode provided, it seems like you are on the right track with your Dataform pipeline.
Configuration:
config {
type: "incremental",
description: "Information from equipments",
}
Setting type to "incremental" instructs Dataform to optimize processing by focusing on new or updated data since the last pipeline run.
WITH Clause and Filtering:
WITH equipments_table AS (
SELECT
EPS.equip_timestamp AS equip_seen,
EPS.session_id,
EPS.num AS equip_no,
V.my_date,
V.etc_Date,
V.force_abc,
V.force_region,
V.xyz_status
FROM ${ref("EPS_LOGIC_GATES")} EPS
JOIN ${ref('XXX_monthly_report')} V
ON EPS.num = V.num
AND DATE(EPS.equip_timestamp) >= DATE_ADD( V.etc_Date , INTERVAL -7 DAY )
AND (DATE(EPS.equip_timestamp) < V.my_date OR my_date IS NULL)
WHERE TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 7 DAY)
)
The TIMESTAMP_TRUNC... condition is key. It ensures only records within the past 7 days are processed.
SELECT DISTINCT * FROM equipments_table: This selects the distinct rows from equipments_table after the transformations and filtering in the WITH clause.
Overall Assessment:
Your code should achieve the desired behavior: it processes only the records from EPS_LOGIC_GATES within the past 7 days when run hourly.
Key Assumptions and Enhancements:
This assumes EPS_LOGIC_GATES is populated accurately and equip_timestamp reliably reflects when a record was last transformed.
Consider adding a last_transformation_timestamp column to track when each record was last processed. This could further refine the incremental logic in future runs.
There seems to be a problem.
1) If I do this, the compiled incremental and non-incremental queries both report the same processed size, 281 GB. I had expected the incremental query to process much less data. Why is that?
2) Also, in the compiled query I expected to see a MERGE statement and an INSERT statement (because I think that is how Dataform handles this behind the scenes), but I see exactly the same query, with the project/dataset/table names in place of $ref.
Can you please advise on 1 and 2, @ms4446?
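Both observations would be consistent with how Dataform incremental tables work as far as I understand them, so here is a rough sketch rather than a definitive fix. Setting type: "incremental" does not filter anything by itself: the SELECT is identical on every run unless part of it is wrapped in ${when(incremental(), ...)}, which would explain why both compiled queries look the same and scan the same amount. BigQuery also bills for the columns it scans, so a WHERE on equip_timestamp only reduces bytes if EPS_LOGIC_GATES is partitioned (or clustered) on that column. The INSERT or MERGE wrapper is, I believe, generated when the workflow executes, so it should appear in the execution details rather than in the compiled query. A MERGE is produced only when uniqueKey is set; without it rows are appended, so the 7-day window would be inserted again every hour. The uniqueKey columns below are an assumption about what identifies one output row, not something taken from your schema.

```sql
config {
  type: "incremental",
  description: "Information from equipments",
  uniqueKey: ["session_id", "equip_no", "equip_seen"]
}

-- ASSUMPTION: the uniqueKey columns above are hypothetical. They must
-- identify exactly one row of this query's output, otherwise the MERGE
-- can fail or update the wrong rows. Replace them with your real key.
SELECT DISTINCT
  EPS.equip_timestamp AS equip_seen,
  EPS.session_id,
  EPS.num AS equip_no,
  V.my_date,
  V.etc_Date,
  V.force_abc,
  V.force_region,
  V.xyz_status
FROM ${ref("EPS_LOGIC_GATES")} EPS
JOIN ${ref("XXX_monthly_report")} V
  ON EPS.num = V.num
  AND DATE(EPS.equip_timestamp) >= DATE_ADD(V.etc_Date, INTERVAL -7 DAY)
  AND (DATE(EPS.equip_timestamp) < V.my_date OR V.my_date IS NULL)
-- The filter below is emitted on incremental runs only.
-- A full refresh omits it and rebuilds the whole table.
${when(incremental(),
  `WHERE TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)`)}
```

With this shape the compiled incremental and non-incremental queries should differ by the WHERE clause. Whether the bytes processed actually drop still depends on how EPS_LOGIC_GATES is partitioned, so it is worth checking the table details in BigQuery before expecting a smaller number.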