Hello, maybe a silly question, but I'm trying to edit an existing SQLX file that is already in the pipeline. It loads data from a source table, applies some transformations, and saves the result into the destination table.
I would like to change the logic so that it first checks whether data for a specific date is already in the destination table. If not, the process continues; if yes, the workflow finishes without any data processing.
I tried declaring dependencies with custom SQL operations, but wasn't able to get it working:
https://cloud.google.com/dataform/docs/dependencies
I would be really grateful for any advice on how to solve this.
This is the SQLX file:
config {
  type: "incremental",
  schema: "reporting",
  tags: ["daily"],
  bigquery: {
    partitionBy: "date"
  },
}
WITH
  source AS (
    SELECT * FROM
      (SELECT
        SUM(total) AS sumTotal,
        `type`,
        podcastId,
        podcastTitle,
        episodeId,
        episodeTitle,
        DATE(start) as date,
      FROM ${ref(`sourcetable`)}
      WHERE ${when(incremental(), `DATE(start) = DATE_SUB('${constants.PROCESSING_CURRENT_DATE}', INTERVAL ${constants.PROCESSING_DAYS_OFFSET} DAY) AND`)}
        country IS NULL
      GROUP BY ALL
      )
  )
SELECT * FROM source
Regards,
M
To get this behaviour in your Dataform SQLX file, you can check whether the destination table already contains data for the specific date before processing new data. Dataform's pre_operations, which run before the main query, are a good place for that check. If data exists, further processing is skipped. Here's an example of how to implement this:
Create a check for existing data: use a pre_operations statement to check whether data for the specific date already exists in the destination table.
Modify the transformation logic: run the main transformation only if no data exists for that date.
Here's how you can modify your SQLX file:
config {
  type: "incremental",
  schema: "reporting",
  tags: ["daily"],
  bigquery: {
    partitionBy: "date"
  },
}

pre_operations {
  -- pre_operations is its own block, not a key inside config.
  -- record_exists is TRUE when the destination table already holds rows for the target date.
  -- On a non-incremental run the table may not exist yet, so the check falls back to FALSE.
  DECLARE record_exists BOOL DEFAULT (
    ${when(incremental(),
      `SELECT COUNT(*) > 0 FROM ${self()} WHERE date = DATE_SUB('${constants.PROCESSING_CURRENT_DATE}', INTERVAL ${constants.PROCESSING_DAYS_OFFSET} DAY)`,
      `SELECT FALSE`)}
  )
}

-- The body of an incremental table must be a single SELECT, so IF ... THEN cannot wrap it.
-- Instead, the filter below returns no rows when record_exists is TRUE, and nothing is inserted.
WITH source AS (
  SELECT * FROM
    (SELECT
      SUM(total) AS sumTotal,
      `type`,
      podcastId,
      podcastTitle,
      episodeId,
      episodeTitle,
      DATE(start) as date,
    FROM ${ref(`sourcetable`)}
    WHERE ${when(incremental(), `NOT record_exists AND DATE(start) = DATE_SUB('${constants.PROCESSING_CURRENT_DATE}', INTERVAL ${constants.PROCESSING_DAYS_OFFSET} DAY) AND`)}
      country IS NULL
    GROUP BY ALL
    )
)
SELECT * FROM source