Hi,
I'm working on creating an incremental pipeline where I aim to update a table incrementally from a partitioned table. I've followed the documentation, but I'm encountering an issue. When I run the query in Dataform, I receive an error about an "unrecognized variable". This is surprising because the compiled query shows no errors, and the table seems to execute normally.
config {
type: "incremental",
}
pre_operations {
DECLARE event_timestamp_checkpoint DEFAULT (
${when(incremental(),
`SELECT max(ingestionTime) FROM ${self()}`,
`SELECT datetime("2000-01-01")`)}
)
}
SELECT
*
FROM
${ref("items_filtered")}
WHERE ingestionTime > event_timestamp_checkpoint
The error message I received:
Unrecognized name: event_timestamp_checkpoint at [8:23]
However, the compiled queries tab indicates the following for the incremental query:
This query will process 5.05 MB when run.
DECLARE event_timestamp_checkpoint DEFAULT (
SELECT max(ingestionTime) FROM `...items_filtered`
)
;
SELECT *
FROM `...items_filtered`
WHERE ingestionTime > event_timestamp_checkpoint
Any guidance or suggestions would be greatly appreciated!
The error you're encountering is related to the scope of the variable event_timestamp_checkpoint. In BigQuery, a variable declared in a DECLARE statement is not available in a subsequent SELECT statement when the two are separated by a semicolon (;). This is why you're seeing the "Unrecognized name" error.
To fix this, you can use the DECLARE and SET statements together in a single script, and then use the variable in the SELECT statement. Here's how you can modify your script:
DECLARE event_timestamp_checkpoint DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");
SET event_timestamp_checkpoint = (
${when(incremental(),
`SELECT max(ingestionTime) FROM ${self()}`,
`SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")`)}
);
SELECT
*
FROM
${ref("items_filtered")}
WHERE ingestionTime > event_timestamp_checkpoint;
Here's a breakdown of the changes:
- Declared the event_timestamp_checkpoint variable with a default value.
- Used a SET statement to assign the value to event_timestamp_checkpoint based on your incremental condition.
- The SELECT statement remains unchanged.
Thank you for your response. It works like this. Does this approach prevent the table from being fully scanned without using the pre_operations block?
Yes, this approach will still prevent a full table scan, assuming the ingestionTime column in your items_filtered table is properly partitioned and you're using the incremental feature of Dataform correctly.
Here's why:
Partitioned Table: BigQuery is optimized to read only the necessary partitions of a table when you filter by the partitioning column. In your case, ingestionTime is likely the partitioning column. When you use the condition WHERE ingestionTime > event_timestamp_checkpoint, BigQuery will only read the partitions that have ingestionTime values greater than the event_timestamp_checkpoint value.
Incremental Logic: The logic you've provided with the ${when(incremental(), ... )} function ensures that:
- On an incremental run, it sets event_timestamp_checkpoint to the maximum ingestionTime value already present in the target table, so only new data (with ingestionTime greater than this value) is appended.
- On a full refresh, it sets event_timestamp_checkpoint to a default old date (2000-01-01), effectively including all data.
By combining the partitioned table optimization of BigQuery with the incremental logic of Dataform, you ensure that only the necessary partitions are read during incremental updates, preventing a full table scan. Removing the pre_operations block and using DECLARE and SET statements instead doesn't change this behavior; it only changes how the event_timestamp_checkpoint variable is set.
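The two branches of that ${when(incremental(), …)} call can be pictured as a small helper that just builds the checkpoint subquery. This is only an illustrative sketch; checkpointSql and the table names here are assumptions, not part of the Dataform API.

```javascript
// Hypothetical helper mirroring the when(incremental(), a, b) choice above.
// On an incremental run the checkpoint is the target table's high-water mark;
// on a full refresh it is a fixed early timestamp, so every row qualifies.
function checkpointSql(isIncremental, selfTable) {
  return isIncremental
    ? `SELECT max(ingestionTime) FROM ${selfTable}`
    : `SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")`;
}

console.log(checkpointSql(true, "`project.dataset.items_filtered`"));
// SELECT max(ingestionTime) FROM `project.dataset.items_filtered`
console.log(checkpointSql(false, "`project.dataset.items_filtered`"));
// SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")
```

Either branch produces a scalar subquery, which is why the WHERE clause can compare ingestionTime against it directly and still benefit from partition pruning.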
This solution (without the pre_operations block) compiles and runs fine in the development workspace, but when I execute a release configuration with the same code I get this error:
"reason:"invalidQuery" location:"query" message:"Syntax error: Expected \"(\" or keyword SELECT or keyword WITH but got keyword DECLARE at [50:1]": invalid argument"
It does not seem to like using DECLARE or SET after the "BEGIN CREATE OR REPLACE TABLE".
How can I get past this?
BEGIN
CREATE SCHEMA IF NOT EXISTS `XXXXXX.XXXXXX` OPTIONS(location="EU");
EXCEPTION WHEN ERROR THEN
IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
THEN
RAISE USING MESSAGE = @@error.message;
END IF;
END;
BEGIN
DECLARE dataform_table_type DEFAULT (
SELECT ANY_VALUE(table_type)
FROM `XXXXXX.XXXXXX.INFORMATION_SCHEMA.TABLES`
WHERE table_name = 'XXXXXX'
);
IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN
IF dataform_table_type = 'BASE TABLE' THEN
DROP TABLE IF EXISTS `XXXXXX`;
ELSEIF dataform_table_type = "VIEW" THEN
DROP VIEW IF EXISTS `XXXXXX`;
ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN
DROP MATERIALIZED VIEW IF EXISTS `XXXXXX`;
END IF;
END IF;
BEGIN
CREATE OR REPLACE TABLE `XXXXXX.XXXXXX.XXXXXX`
PARTITION BY DATE_TRUNC(Date,MONTH)
OPTIONS()
AS (
DECLARE event_timestamp_checkpoint DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");
SET event_timestamp_checkpoint = (
SELECT TIMESTAMP("2000-01-01 00:00:00 UTC")
);
SELECT
Date,
Field1,
field2
FROM
`XXXXXX.XXXXXX.XXXXXX`
WHERE timestamp(Date) > event_timestamp_checkpoint
);
END;
END;
The error you're encountering suggests that BigQuery doesn't support using the DECLARE and SET statements inside a CREATE OR REPLACE TABLE statement.
Here's your adjusted code:
BEGIN
CREATE SCHEMA IF NOT EXISTS `XXXXXX.XXXXXX` OPTIONS(location="EU");
EXCEPTION WHEN ERROR THEN
IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
THEN
RAISE USING MESSAGE = @@error.message;
END IF;
END;
CREATE OR REPLACE TABLE `XXXXXX.XXXXXX.XXXXXX`
PARTITION BY DATE_TRUNC(Date, MONTH)
OPTIONS()
AS (
  -- The CTE must live inside the AS (...) body: BigQuery does not accept a
  -- WITH clause before a CREATE TABLE statement.
  WITH event_timestamp_checkpoint AS (
    SELECT TIMESTAMP("2000-01-01 00:00:00 UTC") AS ts
  )
  SELECT
    Date,
    Field1,
    field2
  FROM
    `XXXXXX.XXXXXX.XXXXXX`
  WHERE timestamp(Date) > (SELECT ts FROM event_timestamp_checkpoint)
);
How do I control how the sqlx files compile to DDL so that DECLARE and SET are accepted in the workflow execution?
How do I deploy the solution in this thread as a workflow execution without getting the error regarding the DECLARE and SET statements?
To deploy the solution in this thread as a workflow execution without getting the error regarding the DECLARE and SET statements, you can use a data pipeline orchestration tool such as Prefect or Airflow.
These tools allow you to define and execute data pipelines as a series of tasks. You can use them to split your code into multiple SQL files, each of which contains a single task.
For example, you could create two SQL files:
- create_schema.sql: contains the code to create the schema and table.
- load_data.sql: contains the code to load the data into the table.
You could then use your data pipeline orchestration tool to execute these two files in sequence. This would allow you to use the DECLARE and SET statements in the load_data.sql file without getting the error.
Here is an example of how to deploy the solution in this thread as a workflow execution using Prefect:
from prefect import task, Flow  # Prefect 1.x API
from google.cloud import bigquery

@task
def create_schema():
    """Creates the schema and table."""
    sql = """
    BEGIN
      CREATE SCHEMA IF NOT EXISTS `XXXXXX.XXXXXX` OPTIONS(location="EU");
    EXCEPTION WHEN ERROR THEN
      IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
         NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
         NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
      THEN
        RAISE USING MESSAGE = @@error.message;
      END IF;
    END;
    CREATE OR REPLACE TABLE `XXXXXX.XXXXXX.XXXXXX`
    PARTITION BY DATE_TRUNC(Date, MONTH)
    OPTIONS()
    AS (
      SELECT
        Date,
        Field1,
        field2
      FROM
        `XXXXXX.XXXXXX.XXXXXX`
      -- Initial load: use a fixed early checkpoint so all rows qualify.
      WHERE timestamp(Date) > TIMESTAMP("2000-01-01 00:00:00 UTC")
    );
    """
    client = bigquery.Client()
    client.query(sql).result()

@task
def load_data():
    """Loads the data into the table."""
    sql = """
    DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");
    SET event_timestamp_checkpoint = (SELECT MAX(timestamp(Date)) FROM `XXXXXX.XXXXXX.XXXXXX`);
    INSERT INTO `XXXXXX.XXXXXX.XXXXXX`
    SELECT
      Date,
      Field1,
      field2
    FROM
      `XXXXXX.XXXXXX.XXXXXX`
    WHERE timestamp(Date) > event_timestamp_checkpoint;
    """
    client = bigquery.Client()
    client.query(sql).result()

with Flow("Load data into BigQuery") as flow:
    create_schema()
    load_data()

flow.run()
This workflow will first create the schema and table. Then, it will load the data into the table, using the DECLARE and SET statements to prevent a full table scan.
You can deploy this workflow to a Prefect server or to a cloud-based platform such as Prefect Cloud. Once it is deployed, you can trigger it to run on a schedule or manually.
This is not resolved, I believe. The original question was regarding Dataform. In BigQuery's Dataform there are two options: Run (incremental or full refresh) and Start Execution (incremental or full refresh). This question focuses on the former. The provided answer addresses it correctly, but fails to serve the fundamental needs of Start Execution, with respect to BigQuery's partitioning mechanisms. DECLARE and SET can only be used with the Run option, while the pre_operations snippet can only be used with the Start Execution option. Is there a user-friendly way to support both?
In Dataform, handling the execution of scripts, particularly for incremental updates versus full refreshes, requires a nuanced approach, especially when dealing with BigQuery's partitioned tables.
Understanding the Core Issue:
Dataform offers two primary options for script execution:
Run Option: This allows the use of DECLARE and SET statements within SQL scripts. It's straightforward for script execution but doesn't align seamlessly with the "Start Execution" option.
Start Execution Option: This typically utilizes pre-operations for initial setup tasks. However, BigQuery's scripting constraints can complicate the use of these operations.
Proposed Solutions:
Dynamic SQL Generation: Utilize Dataform's JavaScript capabilities to dynamically generate SQL code. This method can incorporate logic for variable setting and is compatible with both execution options.
Separation of Scripts: Develop distinct scripts or configurations for "Run" and "Start Execution" modes. Although this might lead to some code duplication, it's a practical method to address the differing requirements of each mode.
Using Dataform's JavaScript API: Dataform's JavaScript API offers more control over SQL script execution. It can include conditional logic to adapt to either incremental runs or full refreshes.
Implementing a JavaScript-Based Solution:
Here's an illustrative example of how you might use JavaScript in a SQLX file in Dataform to address this:
// Example SQLX file in Dataform (illustrative sketch; verify the helper
// functions against your Dataform version before relying on them)
js {
  // Build the checkpoint expression depending on the run type.
  const checkpointValue = incremental()
    ? `(SELECT MAX(ingestionTime) FROM ${resolve("your_table")})`
    : `TIMESTAMP("2000-01-01 00:00:00 UTC")`;
}
SELECT * FROM ${ref("your_table")}
WHERE ingestionTime > ${checkpointValue}
Testing and Documentation:
Thorough Testing: It's crucial to test both incremental and full-refresh scenarios in a controlled environment to ensure the logic functions correctly for both execution modes.
Clear Documentation: Document any custom solutions or workarounds within your team or organization to maintain clarity and consistency.
While there's no direct, universal solution due to the distinct ways Dataform handles script execution for BigQuery, leveraging Dataform's JavaScript features to dynamically generate SQL scripts based on the execution context can be an effective strategy. This approach, however, necessitates careful testing and comprehensive documentation to ensure it fulfills the requirements of both incremental updates and full-refresh executions in your data pipeline.
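The dynamic-generation idea above can be reduced to a plain helper function, sketched below under the assumption that you know, per execution mode, whether BigQuery scripting (DECLARE/SET) is allowed. buildIncrementalQuery and all names in it are illustrative, not a Dataform API.

```javascript
// Illustrative only: chooses between a scripted (DECLARE/SET) form and an
// inline-subquery form of the same incremental filter.
function buildIncrementalQuery(useScripting, checkpointSubquery, sourceTable) {
  if (useScripting) {
    return [
      `DECLARE checkpoint TIMESTAMP DEFAULT TIMESTAMP("2000-01-01 00:00:00 UTC");`,
      `SET checkpoint = (${checkpointSubquery});`,
      `SELECT * FROM ${sourceTable} WHERE ingestionTime > checkpoint;`,
    ].join("\n");
  }
  // Contexts that wrap the query in CREATE TABLE AS / INSERT INTO cannot
  // contain DECLARE/SET, so inline the checkpoint as a scalar subquery.
  return `SELECT * FROM ${sourceTable} WHERE ingestionTime > (${checkpointSubquery});`;
}
```

Both forms filter on the same partitioning column, so partition pruning applies either way; only the mechanism for obtaining the checkpoint differs.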
Just my 2 cents. Should be resolved by now.
It seems the construction of the pre_operations block has changed, as all blocks (pre, main, post) now share the same context, i.e. the same BEGIN...END block. Maybe pre/post had their own BEGIN/END when first reported by the OP.
I have tested with view, table and incremental. I declared a variable in pre_ops block and could reference it in the main block.
Remember that the main block has to start with WITH or SELECT, because at execution time (not Run) Dataform wraps it in an INSERT INTO ... or CREATE TABLE ... AS statement.
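That wrapping can be pictured with a toy helper. The exact SQL Dataform emits differs; this sketch only shows why a main block that begins with DECLARE would produce an invalid wrapped statement.

```javascript
// Simplified stand-in for Dataform's execution-time wrapping. If mainSql began
// with DECLARE, the wrapped result would be invalid BigQuery SQL, matching the
// "Expected \"(\" or keyword SELECT or keyword WITH" error reported earlier.
function wrapMainBlock(isIncremental, target, mainSql) {
  return isIncremental
    ? `INSERT INTO ${target}\n${mainSql}`
    : `CREATE OR REPLACE TABLE ${target} AS (\n${mainSql}\n)`;
}
```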
Thank you for the explanation.
This is not resolved, I believe. Variables declared in pre_operations are not recognized; the error is: Unrecognized name: checkpointValue