I have the following SQLX file and the corresponding table already created in my dataset:
config {
  type: "incremental",
  schema: "analytics_sanitized_test",
  description: "...",
  columns: { ... },
  bigquery: {
    partitionBy: "DATE_TRUNC(created_at, MONTH)",
    requirePartitionFilter: true,
    clusterBy: ["created_at", "user_id"]
  },
  tags: ["daily"],
  protected: true
}

SELECT
  src.userId AS user_id,
  TRIM(LOWER(src.market)) AS market,
  TRIM(LOWER(src.banReason)) AS ban_reason,
  src.createdAt AS created_at
FROM ${ref("analytics", "user_banned")} AS src
${ when(incremental(),
  `LEFT JOIN ${self()} AS trg
     ON (trg.user_id = src.userId
         AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))`) }
WHERE ${ when(incremental(),
  `trg.user_id IS NULL
   AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)`,
  `DATE(src.createdAt) != CURRENT_DATE()`) }
QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
Whether I run it manually from the UI or schedule it from the Workflow Configs, the logs always show the incremental version of the query commented out.
See below:
BEGIN
  CREATE SCHEMA IF NOT EXISTS `xxx.analytics_sanitized_test` OPTIONS(location="us-central1");
EXCEPTION WHEN ERROR THEN
  IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset")
    AND NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations")
    AND NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
  THEN
    RAISE USING MESSAGE = @@error.message;
  END IF;
END;

BEGIN
  DECLARE dataform_table_type DEFAULT (
    SELECT ANY_VALUE(table_type)
    FROM `xxx.analytics_sanitized_test.INFORMATION_SCHEMA.TABLES`
    WHERE table_name = 'user_banned'
  );

  IF dataform_table_type IS NOT NULL THEN
    IF dataform_table_type = 'VIEW' THEN
      DROP VIEW IF EXISTS `xxx.analytics_sanitized_test.user_banned`;
    ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN
      DROP MATERIALIZED VIEW IF EXISTS `xxx.analytics_sanitized_test.user_banned`;
    END IF;
  END IF;

  IF dataform_table_type IS NOT NULL THEN
    BEGIN
      DECLARE dataform_columns ARRAY<STRING>;
      DECLARE dataform_columns_list STRING;
      SET dataform_columns = (
        SELECT ARRAY_AGG(DISTINCT "`" || column_name || "`")
        FROM `xxx.analytics_sanitized_test.INFORMATION_SCHEMA.COLUMNS`
        WHERE table_name = 'user_banned'
      );
      SET dataform_columns_list = (
        SELECT STRING_AGG(column)
        FROM UNNEST(dataform_columns) AS column
      );
      EXECUTE IMMEDIATE """
        CREATE OR REPLACE PROCEDURE `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`()
        OPTIONS(strict_mode=false)
        BEGIN
          INSERT INTO `xxx.analytics_sanitized_test.user_banned`
          (""" || dataform_columns_list || """)
          SELECT """ || dataform_columns_list || """
          FROM (
            SELECT
              src.userId AS user_id,
              TRIM(LOWER(src.market)) AS market,
              TRIM(LOWER(src.banReason)) AS ban_reason,
              src.createdAt AS created_at
            FROM `xxx.analytics.user_banned` AS src
            LEFT JOIN `xxx.analytics_sanitized_test.user_banned` AS trg
              ON (trg.user_id = src.userId
                  AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))
            WHERE trg.user_id IS NULL
              AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
            QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
          );
        END;
      """;
      CALL `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`();
      DROP PROCEDURE IF EXISTS `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`;
    END;
  ELSE
    BEGIN
      CREATE TABLE IF NOT EXISTS `xxx.analytics_sanitized_test.user_banned`
      PARTITION BY DATE_TRUNC(created_at, MONTH)
      CLUSTER BY created_at, user_id
      OPTIONS(description='''Track user ban events''', require_partition_filter=true)
      AS (
        SELECT
          src.userId AS user_id,
          TRIM(LOWER(src.market)) AS market,
          TRIM(LOWER(src.banReason)) AS ban_reason,
          src.createdAt AS created_at
        FROM `xxx.analytics.user_banned` AS src
        WHERE DATE(src.createdAt) != CURRENT_DATE()
        QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
      );
    END;
  END IF;
END;
The desired outcome is that, once the incremental table is present in the target BQ dataset, Dataform renders/compiles the incremental version and not the non-incremental one, or at least uses the INSERT INTO statement and not the CREATE OR REPLACE TABLE statement.
Further information:
Read the documentation available on incremental tables (here)
Browsed the Google Cloud Community forum (here)
Created a copy of the source table without the stream attached
Moved the definition inside when(incremental(), ...) so that it sits at the bottom of the query
Ran with and without Full Refresh from both the UI and Workflow Configs
Removed protected: true
Thanks.
You're encountering an issue where Google Cloud Dataform isn't executing your incremental logic correctly within your SQLX file. Let's explore potential causes and solutions.
Potential Causes
Incorrect placement or syntax of when(incremental(), ...) in the SQLX body, or errors in the configuration block (type, bigquery.partitionBy, bigquery.requirePartitionFilter, bigquery.clusterBy).
Troubleshooting Steps
when(incremental(), ...) placement: double-check its syntax and position within your SQLX file. Example:
config {
type: "incremental",
schema: "analytics_sanitized_test",
description: "...",
columns: { ... },
bigquery: {
partitionBy: "DATE_TRUNC(created_at, MONTH)",
requirePartitionFilter: true,
clusterBy: ["created_at", "user_id"]
},
tags: ["daily"],
protected: true
}
SELECT
src.userId AS user_id,
TRIM(LOWER(src.market)) AS market,
TRIM(LOWER(src.banReason)) AS ban_reason,
src.createdAt AS created_at
FROM `xxx.analytics.user_banned` AS src
${when(incremental(),
  `LEFT JOIN ${self()} AS trg
     ON trg.user_id = src.userId
    AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)`)}
WHERE ${when(incremental(),
  `trg.user_id IS NULL
   AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)`,
  `DATE(src.createdAt) != CURRENT_DATE()`)}
QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
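To confirm which branch actually ran after a scheduled execution, one option is to look at what landed in the target table over the last few days. A rough sketch, assuming created_at is a TIMESTAMP column and using the target table name from your compiled log:

-- Check which dates were actually loaded into the target table recently.
-- The filter on created_at (the partitioning column) also satisfies
-- require_partition_filter; adjust if created_at is not a TIMESTAMP.
SELECT
  DATE(created_at) AS load_date,
  COUNT(*) AS row_count
FROM `xxx.analytics_sanitized_test.user_banned`
WHERE created_at >= TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))
GROUP BY load_date
ORDER BY load_date;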
Hi,
after some more debugging I realised that there was an issue with the Workflow Config schedule and its timezone.
Originally I set it to run at 00:15 AM CEST every day, so I could fetch all the records from the source tables that were generated on CURRENT_DATE() - 1.
However, the scheduler was probably running it somewhat earlier and no rows were captured. I moved the Workflow Config run to 04:00 AM CEST and it is working as expected.
I have no way to verify this since the scheduler always shows the correct times, so it is just empirical proof.
There is still the problem that the Workflow Config logs show the incremental part of the SQLX commented out, even though it performs an incremental load nonetheless. This is very confusing to debug.
Timezone discrepancies and scheduling nuances can indeed lead to unexpected behavior in data processing workflows, particularly when these workflows depend on data being available up to a specific point in time. Your empirical solution of adjusting the Workflow Config runs to 04:00 AM CEST to ensure all records from the previous day are captured is a practical and effective approach to addressing this challenge.
Understanding Timezone and Scheduling Issues
Timezone Handling: Data processing systems often default to UTC or adopt the timezone set at the project or environment level. A misalignment between the source data generation and the processing schedule can result in missing or incomplete data captures. It's essential to align these timezones to ensure data consistency.
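As an illustration (the timezone name below is only an example, not taken from your setup): in BigQuery, CURRENT_DATE() and DATE() default to UTC unless a timezone is passed explicitly, so the "previous day" boundary can shift depending on which timezone the comparison uses:

-- Sketch: compare the UTC day boundary with an explicit timezone.
-- Assumes createdAt is a TIMESTAMP; "Europe/Berlin" (CET/CEST) is illustrative.
SELECT
  CURRENT_DATE() AS run_date_utc,
  CURRENT_DATE("Europe/Berlin") AS run_date_cest,
  DATE(src.createdAt) AS created_date_utc,
  DATE(src.createdAt, "Europe/Berlin") AS created_date_cest
FROM `xxx.analytics.user_banned` AS src
LIMIT 10;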
Data Availability: Making sure that all data from the source tables for the previous day is available by the time the workflow runs is crucial. Adjusting the schedule, as you have done, accommodates any delays in data generation or availability, ensuring a more reliable data processing workflow.
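A quick way to check this at the scheduled run time is to count yesterday's rows in the source table; a sketch using the table and column names from your question:

-- How many source rows for "yesterday" are visible at run time?
SELECT COUNT(*) AS yesterday_rows
FROM `xxx.analytics.user_banned`
WHERE DATE(createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);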
Workflow Scheduling: It's important to consider the timezone of both the scheduler and the data source. Adjusting the schedule based on observed behavior is often necessary to guarantee the reliability of data processing operations.
Dealing with Logs Showing Incremental Part Commented Out
The issue of Workflow Configs logs showing the incremental part of the SQLX commented out, despite performing an incremental load, can indeed be perplexing and hinder effective debugging. Here are some steps and considerations to help clarify this situation:
Logs Interpretation: The presentation of logs might not always reflect the actual operations accurately, especially for compiled or transformed code. This could be due to the logging mechanism simplifying the process for readability or the specific logging level configuration.
Verify Incremental Behavior: To confirm that incremental loads are indeed occurring, you can inspect the BigQuery job history to see whether the run issued an INSERT (incremental branch) or a CREATE TABLE statement (full build), or compare the target table's row count and last-modified time before and after a run.
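For example, a sketch of such a job-history check against INFORMATION_SCHEMA (the region qualifier assumes the us-central1 location shown in your log; adjust it to your dataset's location):

-- Which statement types ran against the target table in the last day?
-- An INSERT indicates the incremental branch; CREATE TABLE indicates a (re)build.
SELECT
  creation_time,
  statement_type,
  destination_table.dataset_id AS dataset_id,
  destination_table.table_id AS table_id
FROM `region-us-central1`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND destination_table.table_id = 'user_banned'
ORDER BY creation_time DESC;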
Consult Documentation or Support: For deeper insights into how logging is handled for incremental loads, consulting Dataform documentation or reaching out to Google Cloud support can be beneficial.
Successfully addressing the scheduling issue is a significant step forward. However, the logging discrepancy still requires attention to ensure you can reliably debug and monitor your Dataform workflows moving forward. Your proactive approach to both identifying and solving these issues is commendable and contributes to a more robust data processing practice.