
Dataform incremental tables

I think I am doing something wrong here. I would be grateful if someone could guide me.

Please see this pseudo code of my Dataform pipeline:

config {
  type: "incremental",
  description: "Information from equipments"
}

WITH equipments_table AS (
  SELECT
    EPS.equip_timestamp AS equip_seen,
    EPS.session_id,
    EPS.num AS equip_no,
    V.my_date,
    V.etc_Date,
    V.force_abc,
    V.force_region,
    V.xyz_status
  FROM ${ref("EPS_LOGIC_GATES")} EPS
  JOIN ${ref("XXX_monthly_report")} V
    ON EPS.num = V.num
    AND DATE(EPS.equip_timestamp) >= DATE_ADD(V.etc_Date, INTERVAL -7 DAY)
    AND (DATE(EPS.equip_timestamp) < V.my_date OR V.my_date IS NULL)
  WHERE
    TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
)

SELECT DISTINCT * FROM equipments_table

 


As you can see above:

1) In the config, I have set the type as incremental:

config {
  type: "incremental",
  description: "Information from equipments"
}

This (below) is what I want to achieve:

1) My table EPS_LOGIC_GATES is being populated with new data every hour.
2) Every hour I am running a Dataform pipeline (after EPS_LOGIC_GATES is populated).
3) The above is one example of my Dataform tables.
4) I want the transformation in Dataform to be done ONLY ON RECORDS WHERE TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY).
5) For all records where condition 4 is not satisfied, the transformation was already carried out in a previous pipeline run (1 hour ago), and I do not want to touch those records.
6) Within the 7-day interval, I am redoing/overwriting the transformations on some records as a precautionary measure (I am happy with that).

Is the above pseudo code achieving my purpose?

@ms4446, I would be grateful if you can help.


Based on your description and the pseudo code provided, it seems like you are on the right track with your Dataform pipeline.

Configuration:

 
config {
  type: "incremental",
  description: "Information from equipments"
}
  • Correct: Setting type to "incremental" tells Dataform to write into the existing table (an insert or merge) rather than recreating it on each run. Note, though, that the config type by itself does not limit how much source data is scanned; that depends on the query body, as sketched below.
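
A minimal sketch of the usual pattern, using Dataform's built-in when() and incremental() functions (the table and column names here, events and event_ts, are illustrative, not from your project):

config {
  type: "incremental"
}

SELECT
  user_id,
  event_ts
FROM ${ref("events")}
-- Only applied on incremental runs; a full refresh rebuilds the whole table:
${when(incremental(),
  `WHERE event_ts > (SELECT MAX(event_ts) FROM ${self()})`
)}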

WITH Clause and Filtering:

 
WITH equipments_table AS (
  SELECT
    EPS.equip_timestamp AS equip_seen,
    EPS.session_id,
    EPS.num AS equip_no,

    V.my_date,
    V.etc_Date,
    V.force_abc,
    V.force_region,
    V.xyz_status

  FROM ${ref("EPS_LOGIC_GATES")} EPS

  JOIN ${ref("XXX_monthly_report")} V
    ON EPS.num = V.num
    AND DATE(EPS.equip_timestamp) >= DATE_ADD(V.etc_Date, INTERVAL -7 DAY)
    AND (DATE(EPS.equip_timestamp) < V.my_date OR V.my_date IS NULL)

  WHERE TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
)
  • Core Incremental Logic: The TIMESTAMP_TRUNC... condition is key. It ensures only records from the past 7 days are selected from the source (see the sketch below for gating this filter to incremental runs only).
  • Other Conditions: The additional join criteria and filters then apply only to records within that 7-day window.
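
As written, though, that WHERE clause is part of the query on every execution, including a full refresh. If you want the 7-day filter applied only on incremental runs, one option is to gate it with when(incremental(), ...). A hedged sketch applied to the query above, not a tested drop-in replacement:

  WHERE TRUE
  -- Filter is added only on incremental runs; a full refresh processes all history:
  ${when(incremental(),
    `AND TIMESTAMP_TRUNC(EPS.equip_timestamp, DAY) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)`
  )}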

SELECT DISTINCT * FROM equipments_table:

  • This statement outputs all unique records from equipments_table after the transformations and filtering in the WITH clause.

Overall Assessment:

Your code should achieve the desired behavior:

  1. Hourly Run: When run hourly, the pipeline will capture and process new/updated data in EPS_LOGIC_GATES from the past 7 days.
  2. Avoid Touching Older Records: Records older than 7 days should not be picked up, due to the filtering.
  3. Redoing Transformations (precaution): Records within the 7-day window will be re-processed to apply the latest logic.
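
One caveat on point 3: for re-processed rows to overwrite their earlier versions rather than accumulate as duplicates, an incremental table generally needs a uniqueKey in its config, which makes Dataform execute incremental runs as a MERGE instead of an INSERT. A sketch, assuming session_id plus equip_seen uniquely identify a row (which may not hold for your data):

config {
  type: "incremental",
  uniqueKey: ["session_id", "equip_seen"],
  description: "Information from equipments"
}

Without a uniqueKey, incremental runs append, and the SELECT DISTINCT only deduplicates within each batch, not against rows already in the target table.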

Key Assumptions and Enhancements:

  • Data Accuracy: Ensure EPS_LOGIC_GATES is populated accurately and that equip_timestamp reliably reflects when a record was created or updated in the source.
  • Tracking Transformations (Enhancement): Consider adding a last_transformation_timestamp column to track when each record was last processed. This could further refine the incremental logic in future runs; a sketch follows below.
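
A possible sketch of that enhancement, stamping each row with the run that wrote it (the column name follows the suggestion above and is illustrative only):

SELECT
  EPS.equip_timestamp AS equip_seen,
  EPS.session_id,
  EPS.num AS equip_no,
  -- Records when this row was last (re)written by the pipeline:
  CURRENT_TIMESTAMP() AS last_transformation_timestamp
FROM ${ref("EPS_LOGIC_GATES")} EPS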

Additional Considerations:

  • Efficiency and Performance Monitoring: As the dataset grows, monitor the pipeline's performance proactively to ensure it remains scalable and efficient; partitioning the output table can help, as sketched after this list.
  • Testing and Validation: Thoroughly test your Dataform code and logic to ensure it produces the expected results under various conditions.
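
On the performance point, one common lever in BigQuery is to partition the incremental table on its timestamp column so that both the incremental filter and downstream reads can prune partitions. A sketch, assuming equip_seen is the right partitioning column for your access patterns:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(equip_seen)"
  }
}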

There seems to be a problem.

1) If I do this, both my incremental and non-incremental compiled queries have the same processed size: 281 GB. I had thought the incremental query would be much smaller? Why is it so?

2) Also, in the compiled query I expected to see a MERGE statement and an INSERT statement (because I think that is the way Dataform handles this behind the scenes), but I see exactly the same query with the project/dataset/table names substituted for ${ref(...)}.

Can you please advise on 1 and 2, @ms4446?