
Dataform - Incremental Pipeline Issue: Unrecognized Variable Error in Dataform Using pre_operations

Hi,

I'm working on creating an incremental pipeline where I aim to update a table incrementally from a partitioned table. I've followed the documentation, but I'm encountering an issue. When I run the query in Dataform, I receive an error about an "unrecognized variable". This is surprising because the compiled query shows no errors, and the table seems to execute normally.

 

config {
  type: "incremental",
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(ingestionTime) FROM ${self()}`,
    `SELECT datetime("2000-01-01")`)}
  )
}

SELECT
  *
FROM
  ${ref("items_filtered")}
WHERE ingestionTime > event_timestamp_checkpoint

 

The error message I received:
Unrecognized name: event_timestamp_checkpoint at [8:23]

However, the compiled queries tab indicates the following for the incremental query:
This query will process 5.05 MB when run.

 

DECLARE event_timestamp_checkpoint DEFAULT (
    SELECT max(ingestionTime) FROM `...items_filtered`
  )
;
SELECT * 
FROM   `...items_filtered`
WHERE ingestionTime > event_timestamp_checkpoint

 

 Any guidance or suggestions would be greatly appreciated!

1 ACCEPTED SOLUTION

The error you're encountering is related to the scope of the variable event_timestamp_checkpoint. In BigQuery, a variable created with DECLARE is visible only to statements that run in the same script as that DECLARE. The "Unrecognized name" error suggests that the SELECT statement was executed without the DECLARE from your pre_operations block, even though the compiled query shows both.

To fix this, you can use the DECLARE and SET statements together in a single script, and then use the variable in the SELECT statement. Here's how you can modify your script:

DECLARE event_timestamp_checkpoint DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");

SET event_timestamp_checkpoint = (
${when(incremental(),
`SELECT max(ingestionTime) FROM ${self()}`,
`SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")`)}
);

SELECT
*
FROM
${ref("items_filtered")}
WHERE ingestionTime > event_timestamp_checkpoint;

Here's a breakdown of the changes:

  1. I've initialized the event_timestamp_checkpoint variable with a default value.
  2. I've used the SET statement to assign the value to the event_timestamp_checkpoint variable based on your incremental condition.
  3. The SELECT statement remains unchanged.
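
The ${when(incremental(), ...)} expression is resolved when Dataform compiles the file, so only one of the two SQL strings reaches BigQuery. Below is a minimal Python sketch of that selection. The when and compile_checkpoint helpers and the table name are hypothetical stand-ins written for illustration; they are not Dataform's implementation.

```python
def when(condition: bool, true_sql: str, false_sql: str = "") -> str:
    """Return one of two SQL fragments, chosen at compile time."""
    return true_sql if condition else false_sql


def compile_checkpoint(is_incremental: bool, target_table: str) -> str:
    """Build the SET statement for the checkpoint variable."""
    # target_table is a hypothetical stand-in for what self() resolves to.
    subquery = when(
        is_incremental,
        f"SELECT max(ingestionTime) FROM `{target_table}`",
        'SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")',
    )
    return f"SET event_timestamp_checkpoint = ({subquery});"


# Incremental run: the checkpoint comes from the target table.
print(compile_checkpoint(True, "my_dataset.items"))
# Full refresh: the checkpoint falls back to the fixed default.
print(compile_checkpoint(False, "my_dataset.items"))
```

Because the choice happens at compile time, the compiled query tab shows only the branch that applies to the selected run type.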


12 REPLIES

Thank you for your response. It works like this. Does this approach prevent the table from being fully scanned without using the pre_operations block?

Yes, this approach will still prevent a full table scan, assuming your items_filtered table is partitioned on the ingestionTime column and you're using the incremental feature of Dataform correctly.

Here's why:

  1. Partitioned Table: BigQuery is optimized to read only the necessary partitions of a table when you filter by the partitioned column. In your case, the ingestionTime column is likely the partitioned column. When you use the condition WHERE ingestionTime > event_timestamp_checkpoint, BigQuery will only read the partitions that have ingestionTime values greater than the event_timestamp_checkpoint value.

  2. Incremental Logic: The logic you've provided with the ${when(incremental(), ... )} function ensures that:

    • For incremental runs (i.e., when updating the table with new data), it sets the event_timestamp_checkpoint to the maximum ingestionTime value already present in the target table. This means only new data (with ingestionTime greater than this value) will be appended.
    • For full-refresh runs (i.e., when rebuilding the entire table), it sets the event_timestamp_checkpoint to a default old date (2000-01-01), effectively including all data.

By combining the partitioned table optimization of BigQuery with the incremental logic of Dataform, you ensure that only the necessary partitions are read during incremental updates, preventing a full table scan. The removal of the pre_operations block and using the DECLARE and SET statements instead doesn't change this behavior; it just changes how the event_timestamp_checkpoint variable is set.
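
As a rough illustration of the pruning described above, here is a toy Python model. It assumes daily partitions and a hypothetical partitions_to_scan helper; it only mimics the idea of skipping partitions older than the checkpoint and says nothing about how BigQuery implements pruning internally.

```python
from datetime import date, datetime


def partitions_to_scan(partitions: list[date], checkpoint: datetime) -> list[date]:
    """Return the daily partitions that can hold rows newer than the checkpoint."""
    # The checkpoint's own day is kept because later rows may exist in it.
    return [p for p in partitions if p >= checkpoint.date()]


partitions = [date(2024, 1, day) for day in range(1, 6)]

# Incremental run: the checkpoint is the newest ingestionTime already loaded.
incremental = partitions_to_scan(partitions, datetime(2024, 1, 3, 12, 0))
print(incremental)

# Full refresh: the 2000-01-01 default keeps every partition.
full_refresh = partitions_to_scan(partitions, datetime(2000, 1, 1))
print(len(full_refresh))
```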

This solution (without the pre_operations block) compiles and runs fine in the development workspace, but when I execute a release configuration with the same code I get this error:

"reason:"invalidQuery" location:"query" message:"Syntax error: Expected \"(\" or keyword SELECT or keyword WITH but got keyword DECLARE at [50:1]": invalid argument"

It does not seem to like using DECLARE or SET after the "BEGIN CREATE OR replace TABLE". 

How can I get past this?

BEGIN
  CREATE SCHEMA IF NOT EXISTS `XXXXXX.XXXXXX` OPTIONS(location="EU");
EXCEPTION WHEN ERROR THEN
  IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
    NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
    NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
  THEN
    RAISE USING MESSAGE = @@error.message;
  END IF;
END;
    BEGIN
      DECLARE dataform_table_type DEFAULT (
  SELECT ANY_VALUE(table_type)
  FROM `XXXXXX.XXXXXX.INFORMATION_SCHEMA.TABLES`
  WHERE table_name = 'XXXXXX'
);
      IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN
  IF dataform_table_type = 'BASE TABLE' THEN
    DROP TABLE IF EXISTS `XXXXXX`;
  ELSEIF dataform_table_type = "VIEW" THEN
    DROP VIEW IF EXISTS `XXXXXX`;
  ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN
    DROP MATERIALIZED VIEW IF EXISTS `XXXXXX`;
  END IF;
END IF;
      BEGIN
        
            CREATE OR REPLACE TABLE `XXXXXX.XXXXXX.XXXXXX`
    PARTITION BY DATE_TRUNC(Date,MONTH)
    
    OPTIONS()
    AS (
      
DECLARE event_timestamp_checkpoint DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");

SET event_timestamp_checkpoint = (
SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")
);


SELECT 
    Date,
    Field1,
    field2
FROM 
    `XXXXXX.XXXXXX.XXXXXX`
WHERE timestamp(Date) > event_timestamp_checkpoint



    );
        
      END;
    END;

 

 

The error you're encountering suggests that BigQuery doesn't support using the DECLARE and SET statements inside a CREATE OR REPLACE TABLE statement.

Key points to consider:

  1. Script Structure in BigQuery: BigQuery's scripting has its nuances. In particular, a WITH clause must be part of the query inside the AS (...) of the CREATE OR REPLACE TABLE statement. It cannot precede the statement, and DECLARE or SET cannot appear inside it.
  2. Data Type Consistency: The timestamp(Date) function transforms a date value into a timestamp. It's vital to ensure the "Date" column in the table XXXXXX.XXXXXX.XXXXXX is either of the DATE data type or can be easily converted to a timestamp. Furthermore, when comparing with the event_timestamp_checkpoint, ensure that data types match.
  3. Handling Existing Tables/Views: While CREATE OR REPLACE TABLE does overwrite an existing table of the same name, it doesn't do the same for views or materialized views with the same name. Therefore, if there's a chance that a view or materialized view might share the table name you intend to create, you might encounter errors. In such cases, the IF logic (that was removed) might be crucial.
  4. Code Formatting: A point on aesthetics: maintain a consistent code format. For instance, while you've indented the event_timestamp_checkpoint definition, the subsequent SELECT statement isn't indented. Uniform formatting simplifies code interpretation.

Here's your adjusted code:

BEGIN
  CREATE SCHEMA IF NOT EXISTS `XXXXXX.XXXXXX` OPTIONS(location="EU");
EXCEPTION WHEN ERROR THEN
  -- Re-raise only when the message matches none of the tolerated errors
  IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
     NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
     NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
  THEN
    RAISE USING MESSAGE = @@error.message;
  END IF;
END;

CREATE OR REPLACE TABLE `XXXXXX.XXXXXX.XXXXXX`
  PARTITION BY DATE_TRUNC(Date, MONTH)
  OPTIONS()
  AS (
    -- The WITH clause belongs to the query inside AS (...)
    WITH event_timestamp_checkpoint AS (
      SELECT TIMESTAMP("2000-01-01 00:00:00 UTC") AS checkpoint
    )
    SELECT 
      Date,
      Field1,
      field2
    FROM 
      `XXXXXX.XXXXXX.XXXXXX`
    WHERE timestamp(Date) > (SELECT checkpoint FROM event_timestamp_checkpoint)
  );
  • Performance: For extensive tables like XXXXXX.XXXXXX.XXXXXX, considering a materialized view might be beneficial. Materialized views are essentially pre-computed tables that bolster query speeds.
  • Security: If XXXXXX.XXXXXX.XXXXXX holds sensitive information, think about data encryption or implementing access restrictions.
  • Monitoring: Continuously monitoring both the performance and the usage metrics of the table XXXXXX.XXXXXX.XXXXXX is essential to guarantee its optimal utility.


How do I control how the SQLX files compile to DDL, so that DECLARE and SET are accepted in the workflow execution?

How do I deploy the solution in this thread as a workflow execution without getting the error regarding the DECLARE and SET statements?



To deploy the solution in this thread as a workflow execution without getting the error regarding the DECLARE and SET statements, you can use a data pipeline orchestration tool such as Prefect or Airflow.

These tools allow you to define and execute data pipelines as a series of tasks. You can use them to split your code into multiple SQL files, each of which contains a single task.

For example, you could create two SQL files:

  • create_schema.sql: This file would contain the code to create the schema and table.
  • load_data.sql: This file would contain the code to load the data into the table.

You could then use your data pipeline orchestration tool to execute these two files in sequence.

This would allow you to use the DECLARE and SET statements in the load_data.sql file, without getting the error.

Here is an example of how to deploy the solution in this thread as a workflow execution using Prefect:

 

from google.cloud import bigquery
from prefect import flow, task  # Prefect 2+ style API


@task
def create_schema():
    """Creates the schema and table."""

    sql = """
    BEGIN
      CREATE SCHEMA IF NOT EXISTS `XXXXXX.XXXXXX` OPTIONS(location="EU");
    EXCEPTION WHEN ERROR THEN
      IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
        NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
        NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
      THEN
        RAISE USING MESSAGE = @@error.message;
      END IF;
    END;

    -- No script variable exists at this point, so the initial load
    -- filters on the constant default checkpoint.
    CREATE OR REPLACE TABLE `XXXXXX.XXXXXX.XXXXXX`
      PARTITION BY DATE_TRUNC(Date, MONTH)
      AS (
        SELECT
          Date,
          Field1,
          field2
        FROM
          `XXXXXX.XXXXXX.XXXXXX`
        WHERE timestamp(Date) > TIMESTAMP("2000-01-01 00:00:00 UTC")
      );
    """

    client = bigquery.Client()
    client.query(sql).result()  # wait for the script to finish


@task
def load_data():
    """Loads the data into the table."""

    sql = """
    DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");

    -- Keep the default when the table is empty (MAX returns NULL)
    SET event_timestamp_checkpoint = (
      SELECT COALESCE(TIMESTAMP(MAX(Date)), event_timestamp_checkpoint)
      FROM `XXXXXX.XXXXXX.XXXXXX`
    );

    INSERT INTO `XXXXXX.XXXXXX.XXXXXX`
      SELECT
        Date,
        Field1,
        field2
      FROM
        `XXXXXX.XXXXXX.XXXXXX`
      WHERE timestamp(Date) > event_timestamp_checkpoint;
    """

    client = bigquery.Client()
    client.query(sql).result()  # wait for the script to finish


@flow(name="Load data into BigQuery")
def load_pipeline():
    # Tasks called inside a flow run in order: schema first, then data
    create_schema()
    load_data()


if __name__ == "__main__":
    load_pipeline()

This workflow will first create the schema and table. Then, it will load the data into the table, using the DECLARE and SET statements to prevent a full table scan.

You can deploy this workflow to a Prefect server or to a cloud-based platform such as Prefect Cloud. Once it is deployed, you can trigger it to run on a schedule or manually.

I believe this is not resolved. The original question was about Dataform. In BigQuery's Dataform there are two options: Run (incremental or full refresh) and Start Execution (incremental or full refresh). This question focuses on the former. The provided answer addresses it correctly, but it does not meet the needs of Start Execution with respect to BigQuery's partitioning. DECLARE and SET can only be used with the Run option, while the pre_operations snippet can only be used with the Start Execution option. Is there a user-friendly way to support both?

In Dataform, handling the execution of scripts, particularly for incremental updates versus full refreshes, requires a nuanced approach, especially when dealing with BigQuery's partitioned tables.

Understanding the Core Issue:

Dataform offers two primary options for script execution:

  1. Run Option: This allows the use of DECLARE and SET statements within SQL scripts. It's straightforward for script execution but doesn't align seamlessly with the "Start Execution" option.

  2. Start Execution Option: This typically utilizes pre-operations for initial setup tasks. However, BigQuery's scripting constraints can complicate the use of these operations.

Proposed Solutions:

  1. Dynamic SQL Generation: Utilize Dataform's JavaScript capabilities to dynamically generate SQL code. This method can incorporate logic for variable setting and is compatible with both execution options.

  2. Separation of Scripts: Develop distinct scripts or configurations for "Run" and "Start Execution" modes. Although this might lead to some code duplication, it's a practical method to address the differing requirements of each mode.

  3. Using Dataform's JavaScript API: Dataform's JavaScript API offers more control over SQL script execution. It can include conditional logic to adapt to either incremental runs or full refreshes.

Implementing a JavaScript-Based Solution:

Here's an illustrative example of how you might use JavaScript in a SQLX file in Dataform to address this:

 

-- Example SQLX file in Dataform
config {
  type: "incremental"
}

js {
  // Checkpoint used on full-refresh runs
  const fullRefreshCheckpoint = "2000-01-01 00:00:00 UTC";
}

-- when(), incremental(), ref() and self() are SQLX built-ins, so the
-- run-type check lives in the SQL body rather than in the js block.
-- "your_table" is a placeholder source; ingestionTime is assumed to be a TIMESTAMP.
SELECT *
FROM ${ref("your_table")}
WHERE ingestionTime > ${when(incremental(),
  `(SELECT MAX(ingestionTime) FROM ${self()})`,
  `TIMESTAMP("${fullRefreshCheckpoint}")`)}

Testing and Documentation:

  1. Thorough Testing: It's crucial to test both incremental and full-refresh scenarios in a controlled environment to ensure the logic functions correctly for both execution modes.

  2. Clear Documentation: Document any custom solutions or workarounds within your team or organization to maintain clarity and consistency.

While there's no direct, universal solution due to the distinct ways Dataform handles script execution for BigQuery, leveraging Dataform's JavaScript features to dynamically generate SQL scripts based on the execution context can be an effective strategy. This approach, however, necessitates careful testing and comprehensive documentation to ensure it fulfills the requirements of both incremental updates and full-refresh executions in your data pipeline.

Just my 2 cents. Should be resolved by now.

It seems the way the pre_operations block is assembled has changed: all blocks (pre, main, post) now share the same context, i.e. they sit within the same BEGIN...END block. Maybe pre/post had their own BEGIN/END when the OP first reported this.

I have tested with view, table and incremental. I declared a variable in pre_ops block and could reference it in the main block.

Remember that the main block has to start with WITH or SELECT, because at execution time (not Run) Dataform wraps it in INSERT INTO ... or CREATE TABLE ... AS.
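
To make the wrapping concrete, here is a small Python sketch of how a pre_operations block and a main query could end up in one BEGIN...END script. The assemble_script and starts_with_query helpers, the table name and the exact layout are assumptions made for illustration; the script Dataform really generates is longer and may differ.

```python
import re


def starts_with_query(main_sql: str) -> bool:
    """True if the main block begins with SELECT or WITH."""
    return re.match(r"\s*(SELECT|WITH)\b", main_sql, re.IGNORECASE) is not None


def assemble_script(pre_operations: str, main_sql: str, table: str) -> str:
    """Place pre_operations before the wrapped main query in one block."""
    if not starts_with_query(main_sql):
        # A DECLARE or SET here would land inside CREATE TABLE ... AS (...)
        raise ValueError("main block must start with SELECT or WITH")
    pre = pre_operations.strip().rstrip(";")
    return (
        "BEGIN\n"
        f"{pre};\n"
        f"CREATE OR REPLACE TABLE `{table}` AS (\n{main_sql.strip()}\n);\n"
        "END;"
    )


pre = 'DECLARE checkpoint DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC")'
main = "SELECT * FROM source WHERE ingestionTime > checkpoint"
script = assemble_script(pre, main, "my_dataset.my_table")
print(script)
```

In this layout the DECLARE sits ahead of the CREATE TABLE statement inside the same block, which is why a variable from pre_operations is visible to the main query, while a DECLARE placed in the main block is rejected.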

Thank you for the explanation.

I believe this is not resolved. Variables declared in pre_operations are not recognized; the error is "Unrecognized name: checkpointValue".