Dataform: Unable to run incremental workflow

I have the following SQLX file, and the corresponding table has already been created in my dataset:

config {
    type: "incremental",
    schema: "analytics_sanitized_test",
    description: "...",
    columns: {
        ...
    },
    bigquery: {
        partitionBy: "DATE_TRUNC(created_at, MONTH)",
        requirePartitionFilter: true,
        clusterBy: ["created_at", "user_id"]
    },
    tags: ["daily"],
    protected: true
}

SELECT
  src.userId AS user_id,
  TRIM(LOWER(src.market)) AS market,
  TRIM(LOWER(src.banReason)) AS ban_reason,
  src.createdAt AS created_at
FROM
  ${ref("analytics", "user_banned")} AS src
  ${
      when(incremental(),
          `LEFT JOIN ${self()} AS trg ON
                (trg.user_id = src.userId
                AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))`)
  }
WHERE
  ${
      when(incremental(),
          ` trg.user_id IS NULL
                  AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)`,
          ` DATE(src.createdAt) != CURRENT_DATE() `)
  }
  QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1

It doesn't matter whether I run it manually from the UI or schedule it from the Workflow Configs: the logs always show the incremental version of the query commented out.

See below:

BEGIN
  CREATE SCHEMA IF NOT EXISTS `xxx.analytics_sanitized_test` OPTIONS(location="us-central1");
EXCEPTION WHEN ERROR THEN
  IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
    NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
    NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
  THEN
    RAISE USING MESSAGE = @@error.message;
  END IF;
END;
    BEGIN
      DECLARE dataform_table_type DEFAULT (
  SELECT ANY_VALUE(table_type)
  FROM `xxx.analytics_sanitized_test.INFORMATION_SCHEMA.TABLES`
  WHERE table_name = 'user_banned'
);
          IF dataform_table_type IS NOT NULL THEN
      IF dataform_table_type = 'VIEW' THEN DROP VIEW IF EXISTS `xxx.analytics_sanitized_test.user_banned`;
ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN DROP MATERIALIZED VIEW IF EXISTS `xxx.analytics_sanitized_test.user_banned`;
END IF;
    END IF;
      IF dataform_table_type IS NOT NULL THEN
        BEGIN
          DECLARE dataform_columns ARRAY<STRING>;
          DECLARE dataform_columns_list STRING;
          SET dataform_columns = (
            SELECT
              ARRAY_AGG(DISTINCT "`" || column_name || "`")
            FROM `xxx.analytics_sanitized_test.INFORMATION_SCHEMA.COLUMNS`
            WHERE table_name = 'user_banned'
          );
          SET dataform_columns_list = (
            SELECT
              STRING_AGG(column)
            FROM UNNEST(dataform_columns) AS column);

          EXECUTE IMMEDIATE
          """
            CREATE OR REPLACE PROCEDURE `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`() OPTIONS(strict_mode=false)
            BEGIN
              

              INSERT INTO `xxx.analytics_sanitized_test.user_banned`
              (""" || dataform_columns_list || """)
              SELECT """ || dataform_columns_list || """
              FROM (
                

SELECT
  src.userId AS user_id,
  TRIM(LOWER(src.market)) AS market,
  TRIM(LOWER(src.banReason)) AS ban_reason,
  src.createdAt AS created_at
FROM
  `xxx.analytics.user_banned` AS src
  LEFT JOIN `xxx.analytics_sanitized_test.user_banned` AS trg ON
                (trg.user_id = src.userId
                AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))
WHERE
   trg.user_id IS NULL
                  AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1

              );

              
            END;
          """;
          CALL `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`();
          DROP PROCEDURE IF EXISTS `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`;
        END;
      ELSE
        BEGIN
          
              CREATE TABLE IF NOT EXISTS `xxx.analytics_sanitized_test.user_banned`
    PARTITION BY DATE_TRUNC(created_at, MONTH)
    CLUSTER BY created_at, user_id
    OPTIONS(description='''Track user ban events''', require_partition_filter=true)
    AS (
      

SELECT
  src.userId AS user_id,
  TRIM(LOWER(src.market)) AS market,
  TRIM(LOWER(src.banReason)) AS ban_reason,
  src.createdAt AS created_at
FROM
  `xxx.analytics.user_banned` AS src
  
WHERE
   DATE(src.createdAt) != CURRENT_DATE() 
  QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1

    );
          
        END;
      END IF;
    END;

 

The desired outcome is that, once the incremental table exists in the target BigQuery dataset, Dataform compiles and runs the incremental version of the query rather than the non-incremental one, or at least uses INSERT INTO rather than a CREATE OR REPLACE TABLE statement.

Further information:

  • I'm on Dataform 2.9.0, and this happens with all the tables in my project.
  • I read the documentation available on incremental tables (here).
  • I browsed the Google Cloud Community forum (here).
  • I created a copy of the source table without a stream attached.
  • I changed the definition inside when(incremental(), ...) to keep it at the bottom.
  • I ran it with and without Full Refresh, from both the UI and the Workflow Configs.
  • I removed protected: true.
  • I reverted from version 2.9.0 to 2.8.4.
  • I kept regenerating the same incremental table over and over again from the Workspace.

Thanks.

 

Accepted Solution

Timezone discrepancies and scheduling nuances can indeed lead to unexpected behavior in data processing workflows, particularly when these workflows depend on data being available up to a specific point in time. Your empirical solution of adjusting the Workflow Config runs to 04:00 AM CEST to ensure all records from the previous day are captured is a practical and effective approach to addressing this challenge.

Understanding Timezone and Scheduling Issues

  • Timezone Handling: Data processing systems often default to UTC or adopt the timezone set at the project or environment level. A misalignment between the source data generation and the processing schedule can result in missing or incomplete data captures. It's essential to align these timezones to ensure data consistency.

  • Data Availability: Making sure that all data from the source tables for the previous day is available by the time the workflow runs is crucial. Adjusting the schedule, as you have done, accommodates any delays in data generation or availability, ensuring a more reliable data processing workflow.

  • Workflow Scheduling: It's important to consider the timezone of both the scheduler and the data source. Adjusting the schedule based on observed behavior is often necessary to guarantee the reliability of data processing operations.
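
One concrete way a timezone mismatch can show up: in BigQuery, CURRENT_DATE() with no argument is evaluated in UTC. A run at 00:15 CEST is still 22:15 UTC on the previous day, so DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) points one day earlier than the local "yesterday" and may select a different day than intended. The query below is a minimal sketch of that arithmetic. The run timestamp and the Europe/Berlin zone are illustrative assumptions, not values taken from your project.

```sql
-- Simulate a run at 00:15 CEST on 2024-06-02, which is 22:15 UTC on 2024-06-01.
-- The timestamp and the time zone name are assumptions chosen for illustration.
WITH run AS (
  SELECT TIMESTAMP "2024-06-01 22:15:00+00" AS run_ts
)
SELECT
  DATE(run_ts) AS utc_date,                          -- 2024-06-01
  DATE(run_ts, "Europe/Berlin") AS local_date,       -- 2024-06-02
  DATE_SUB(DATE(run_ts), INTERVAL 1 DAY) AS utc_yesterday,                     -- 2024-05-31
  DATE_SUB(DATE(run_ts, "Europe/Berlin"), INTERVAL 1 DAY) AS local_yesterday   -- 2024-06-01
FROM run;
```

If the intent is "yesterday in local time", one possible adjustment is to pin the zone in the filter itself rather than rely on the schedule. This assumes createdAt is a TIMESTAMP column, which the thread does not confirm:

```sql
-- Fragment for the WHERE clause, not a standalone query.
DATE(src.createdAt, "Europe/Berlin") = DATE_SUB(CURRENT_DATE("Europe/Berlin"), INTERVAL 1 DAY)
```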

Dealing with Logs Showing Incremental Part Commented Out

The issue of Workflow Configs logs showing the incremental part of the SQLX commented out, despite performing an incremental load, can indeed be perplexing and hinder effective debugging. Here are some steps and considerations to help clarify this situation:

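  • Logs Interpretation: The logged SQL does not show only the statement that ran. The script you posted contains both versions of the query: an IF dataform_table_type IS NOT NULL branch that runs INSERT INTO with the incremental query, and an ELSE branch that runs CREATE TABLE IF NOT EXISTS with the non-incremental one. BigQuery decides which branch to execute at run time, depending on whether the table already exists, so seeing the non-incremental query in the log does not mean that it ran.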

  • Verify Incremental Behavior: To confirm that incremental loads are indeed occurring, you can:

    • Check the timestamps or row counts in the target table before and after runs to ensure that only new or updated records are being added.
    • Monitor query costs and performance in BigQuery, as incremental loads should typically consume fewer resources than full table loads.
  • Consult Documentation or Support: For deeper insights into how logging is handled for incremental loads, consulting Dataform documentation or reaching out to Google Cloud support can be beneficial.
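
The two checks in the list above can be sketched as follows. This is a hedged example, not an official procedure: the table name and the us-central1 region are taken from the log in the question, the 7-day and 1-day windows are arbitrary, and the second query assumes you have permission to list jobs in the project.

```sql
-- 1) Rows per day in the target table for the last week.
--    The filter on created_at is there because the table was created with
--    require_partition_filter=true. If BigQuery still rejects the query,
--    compare created_at directly against a value of its own type.
SELECT
  DATE(created_at) AS created_date,
  COUNT(*) AS row_count
FROM `xxx.analytics_sanitized_test.user_banned`
WHERE DATE(created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY created_date
ORDER BY created_date DESC;

-- 2) Statements that touched the table in the last day.
--    An incremental run should show up as INSERT; a first run or full
--    rebuild as CREATE_TABLE_AS_SELECT.
SELECT
  creation_time,
  statement_type,
  total_bytes_processed
FROM `region-us-central1`.INFORMATION_SCHEMA.JOBS
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND statement_type IN ('INSERT', 'CREATE_TABLE_AS_SELECT')
  AND query LIKE '%analytics_sanitized_test.user_banned%'
ORDER BY creation_time DESC;
```

Running the first query before and after a scheduled run shows whether only the expected day gained rows. The second shows which branch of the generated script was actually executed.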

Successfully addressing the scheduling issue is a significant step forward. However, the logging discrepancy still requires attention to ensure you can reliably debug and monitor your Dataform workflows moving forward. Your proactive approach to both identifying and solving these issues is commendable and contributes to a more robust data processing practice.

Replies

You're encountering an issue where Google Cloud Dataform isn't executing your incremental logic correctly within your SQLX file. Let's explore potential causes and solutions.

Potential Causes

  • Dataform Configuration Issue: Incorrect use of when(incremental(),...), errors in the configuration block (type, bigquery.partitionBy, bigquery.requirePartitionFilter, bigquery.clusterBy).
  • Dataform Bug: Specific to your version (2.9.0)
  • Source Data Characteristics: Not enough new or updated data within your lookback window.

Troubleshooting Steps

  1. Verify when(incremental(),...) Placement: Double-check its syntax and position within your SQLX file.
  2. Review Configuration:
    • Type: Set to "incremental".
    • PartitionBy: Matches your incremental logic.
    • RequirePartitionFilter: Set to 'true'.
    • ClusterBy: Experiment if needed.
  3. Introduce Test Data: Ensure there's modifiable data within your lookback window.
  4. Dataform Version: Consider an alternate version for testing.
  5. Dataform Support: Reach out with your SQLX files, logs, and troubleshooting steps for assistance.
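
For step 3, one way to check whether the source holds any rows that the incremental filter would pick up is to run the same conditions by hand. This is a sketch that reuses the table names and filters from the question; it assumes the source table does not itself require a partition filter.

```sql
-- Distinct users banned "yesterday" (UTC, as CURRENT_DATE() is evaluated),
-- and how many of them are missing from the target's 3-day lookback window.
SELECT
  COUNT(DISTINCT src.userId) AS source_users_yesterday,
  COUNT(DISTINCT IF(trg.user_id IS NULL, src.userId, NULL)) AS not_yet_in_target
FROM `xxx.analytics.user_banned` AS src
LEFT JOIN `xxx.analytics_sanitized_test.user_banned` AS trg
  ON trg.user_id = src.userId
  AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
WHERE DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);
```

If not_yet_in_target is 0, an incremental run at that moment would insert nothing even though the incremental branch executed.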

Example:

 
config {
  type: "incremental",
  schema: "analytics_sanitized_test",
  description: "...",
  columns: { ... },
  bigquery: {
    partitionBy: "DATE_TRUNC(created_at, MONTH)",
    requirePartitionFilter: true,
    clusterBy: ["created_at", "user_id"]
  },
  tags: ["daily"],
  protected: true
}

SELECT
  src.userId AS user_id,
  TRIM(LOWER(src.market)) AS market,
  TRIM(LOWER(src.banReason)) AS ban_reason,
  src.createdAt AS created_at
FROM `xxx.analytics.user_banned` AS src
-- The join belongs in the FROM clause, so it gets its own when() block.
-- Backticks are needed so that ${self()} is interpolated across lines.
${when(incremental(), `
LEFT JOIN ${self()} AS trg
  ON trg.user_id = src.userId
  AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)`)}
-- Incremental runs keep only yesterday's rows that are not yet in the target;
-- the first run loads everything before today.
WHERE ${when(incremental(),
  `trg.user_id IS NULL
  AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)`,
  `DATE(src.createdAt) != CURRENT_DATE()`)}
QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1

Additional Considerations

  • Permissions and Access: Verify Dataform service account permissions.
  • Debugging and Logs: Check Dataform and BigQuery logs.
  • Dataform Documentation and Community: Seek help and potential solutions

Hi,

After some more debugging, I realised that there was an issue with the Workflow Config schedule and its timezone.

Originally I set it to run at 00:15 AM CEST every day, so that I could fetch all the records from the source tables that were generated at CURRENT_DATE() - 1.

However, the scheduler was probably running it some time earlier, and no rows were captured. I moved the Workflow Config runs to 04:00 AM CEST and it is working as expected.

I have no way to verify this because the scheduler always shows the correct times, so this is only empirical evidence.

There is still the problem that the Workflow Configs logs show the incremental part of the SQLX commented out, even though an incremental load is performed. This is very confusing to debug.
