I have two tables: a raw log table and condensed_log_table.
I want to insert new records into condensed_log_table, but to condense the logs into a smaller set I need to read the latest logs from the raw table and UNION them with condensed_log_table, since the algorithm that decides which logs to drop depends on the previous logs.
Question:
How would I write this in Dataform? This is one of my many attempts.
The .sqlx file where I do the inserts into condensed_log_table:
config {
type: "operations",
hasOutput: true,
}
pre_operations {
DECLARE timestamp_checkpoint TIMESTAMP;
...logic for getting latest timestamp
};
WITH old_and_new_states AS (
SELECT
id, inserted_at, state
FROM ${ref("raw_logs")} AS state_log
WHERE inserted_at > timestamp_checkpoint
UNION ALL
SELECT
id, inserted_at, state
FROM ${ref("condensed_log_table")}
)
SELECT
*
FROM
old_and_new_states
QUALIFY state != LAG(state) OVER (PARTITION BY id ORDER BY inserted_at ASC)
OR LAG(state) OVER (PARTITION BY id ORDER BY inserted_at ASC) IS NULL
The .sqlx file where I declare condensed_log_table:
config {
type: "operations",
name: "condensed_log_table",
hasOutput: true,
tags: [
"my_tag",
]
}
CREATE TABLE IF NOT EXISTS ${self()} (
state STRING,
inserted_at TIMESTAMP,
id STRING
)
One of the many errors I get is
To create an incremental table in Dataform that depends on itself, you can't express it as a single type: "table" action (those compile to a single SELECT statement). Instead, split the work into two actions and follow a structured approach:
1. Define condensed_log_table: use an operations configuration with hasOutput: true and a CREATE TABLE IF NOT EXISTS statement, so the table exists with the required schema.
2. Perform the incremental logic in a separate SQLX file: use an operations configuration that reads from both the raw and condensed tables, performs the UNION, and inserts the condensed result.
Here's an example of how you can structure your Dataform project:
1. Define condensed_log_table
// definitions/condensed_log_table.sqlx
config {
type: "operations",
hasOutput: true,
description: "Condensed log table with incremental updates.",
tags: ["my_tag"],
}
CREATE TABLE IF NOT EXISTS ${self()} (
id STRING,
insert_time TIMESTAMP,
state STRING
);
2. Perform Incremental Updates
// definitions/update_condensed_log_table.sqlx
config {
type: "operations",
description: "Inserts new records into condensed_log_table by condensing logs.",
hasOutput: true,
}
pre_operations {
DECLARE timestamp_checkpoint TIMESTAMP DEFAULT (
SELECT MAX(insert_time)
FROM ${ref("condensed_log_table")}
);
};
WITH latest_logs AS (
SELECT
id, insert_time, state
FROM
${ref("raw_logs_table")}
WHERE
insert_time > timestamp_checkpoint
),
condensed_with_latest AS (
SELECT
id, insert_time, state
FROM
${ref("condensed_log_table")}
UNION ALL
SELECT
id, insert_time, state
FROM
latest_logs
),
final_result AS (
SELECT
id, insert_time, state
FROM
condensed_with_latest
WHERE TRUE -- BigQuery requires a WHERE/GROUP BY/HAVING clause alongside QUALIFY
QUALIFY
state != LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC)
OR LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC) IS NULL
)
INSERT INTO ${ref("condensed_log_table")}
SELECT * FROM final_result
-- only insert rows newer than the checkpoint; older rows are already in the table
WHERE insert_time > timestamp_checkpoint;
A few further enhancements can improve error handling, performance, and backfilling. An ASSERT statement at the top of the script can verify that raw_logs_table exists and contains data, failing fast instead of silently inserting nothing. To keep queries efficient as the tables grow, consider BigQuery partitioning and clustering: partitioning by insert_time and clustering by id lets each incremental run scan only recent partitions and keeps rows for the same id co-located. For historical backfills, modify the timestamp_checkpoint logic or create a separate backfill script. Finally, apply the same partitioning and clustering in the condensed_log_table definition itself to maintain optimal performance.
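As a concrete illustration, the table definition from step 1 can carry the partitioning and clustering in its DDL. This is a minimal sketch, assuming day-granularity partitions on insert_time suit your query patterns:
// definitions/condensed_log_table.sqlx
config {
type: "operations",
hasOutput: true,
tags: ["my_tag"],
}
CREATE TABLE IF NOT EXISTS ${self()} (
id STRING,
insert_time TIMESTAMP,
state STRING
)
-- daily partitions keep incremental scans cheap; clustering by id helps the
-- LAG(...) OVER (PARTITION BY id ...) window read co-located blocks
PARTITION BY DATE(insert_time)
CLUSTER BY id;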
How do you handle the case when the table doesn't exist yet?
In a normal incremental table you don't have to think about the cold start issue, but here I assume you would have to run the first job and then run the second job after that?
To effectively manage the creation and incremental updates of a self-referencing table in Dataform, it is important to address the cold start issue. This involves checking for the existence of the table and applying different logic depending on whether it already exists. The following approach outlines one way to achieve this.
Step 1: Define the Table
Firstly, ensure that the condensed_log_table is defined with the required schema. This is done with an operations action containing a CREATE TABLE IF NOT EXISTS statement, which guarantees that the table will be created if it does not already exist.
// definitions/condensed_log_table.sqlx
config {
type: "operations",
hasOutput: true,
description: "Condensed log table with incremental updates.",
tags: ["my_tag"],
}
CREATE TABLE IF NOT EXISTS ${self()} (
id STRING,
insert_time TIMESTAMP,
state STRING
);
Step 2: Handle Incremental Updates with Table Existence Check
The incremental update logic is implemented in a separate operations configuration file. This includes pre_operations to check for the existence of condensed_log_table and conditional logic to set the appropriate timestamp_checkpoint.
// definitions/update_condensed_log_table.sqlx
config {
type: "operations",
description: "Inserts new records into condensed_log_table by condensing logs.",
hasOutput: true,
}
pre_operations {
DECLARE timestamp_checkpoint TIMESTAMP;
DECLARE table_exists BOOL;
-- Check if condensed_log_table exists; INFORMATION_SCHEMA must be qualified
-- with the dataset that holds the table (replace my_dataset accordingly)
SET table_exists = (
SELECT COUNT(*)
FROM my_dataset.INFORMATION_SCHEMA.TABLES
WHERE table_name = 'condensed_log_table'
) > 0;
-- Set timestamp_checkpoint based on table existence
IF table_exists THEN
SET timestamp_checkpoint = (
SELECT MAX(insert_time)
FROM ${ref("condensed_log_table")}
);
ELSE
SET timestamp_checkpoint = TIMESTAMP '1970-01-01 00:00:00 UTC';
END IF;
};
-- Incremental update logic
WITH latest_logs AS (
SELECT
id, insert_time, state
FROM
${ref("raw_logs_table")}
WHERE
insert_time > timestamp_checkpoint
),
condensed_with_latest AS (
SELECT
id, insert_time, state
FROM
${ref("condensed_log_table")}
UNION ALL
SELECT
id, insert_time, state
FROM
latest_logs
),
final_result AS (
SELECT
id, insert_time, state
FROM
condensed_with_latest
WHERE TRUE -- QUALIFY requires an accompanying WHERE/GROUP BY/HAVING in BigQuery
QUALIFY
state != LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC)
OR LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC) IS NULL
)
-- Insert only rows newer than the checkpoint; older rows are already present
INSERT INTO ${ref("condensed_log_table")}
SELECT * FROM final_result
WHERE insert_time > timestamp_checkpoint;
The pre_operations section checks for the existence of condensed_log_table by querying information_schema.tables. Depending on whether the table exists, it sets the timestamp_checkpoint to either the maximum insert_time from the table or a very old date (e.g., 1970-01-01). This ensures that the first run of the incremental update initializes the table with data from raw_logs_table.
The main body of the script retrieves new logs from raw_logs_table that have an insert_time greater than timestamp_checkpoint. These new logs are then combined with existing data from condensed_log_table. The combined data set is filtered to remove rows where the state has not changed, using the QUALIFY clause with the LAG function.
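To make the QUALIFY step concrete, here is a small self-contained query; the sample rows and the inline UNNEST construction are purely illustrative:
-- three events for one id: open, open again (no state change), then closed
WITH sample AS (
SELECT * FROM UNNEST([
STRUCT('a' AS id, TIMESTAMP '2024-01-01 00:00:00 UTC' AS insert_time, 'open' AS state),
('a', TIMESTAMP '2024-01-02 00:00:00 UTC', 'open'),
('a', TIMESTAMP '2024-01-03 00:00:00 UTC', 'closed')
])
)
SELECT *
FROM sample
WHERE TRUE -- BigQuery requires a WHERE/GROUP BY/HAVING clause alongside QUALIFY
QUALIFY state != LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC)
OR LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC) IS NULL
This returns the first 'open' row and the 'closed' row; the repeated 'open' on 2024-01-02 is condensed away.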
Finally, the script inserts the result back into condensed_log_table, ensuring that the table is incrementally updated with new and condensed data.
This approach effectively handles the cold start issue and ensures that the condensed_log_table is properly initialized and incrementally updated in Dataform. By checking for the table's existence and applying conditional logic, you can maintain a streamlined and efficient data processing workflow.
Ah ok that makes sense.
Although are you sure that I can use a pre_operations block in a type: "operations" config?
I get this compilation error
Actions may only include pre_operations if they create a dataset.
and this is even despite me having hasOutput: true set.
In Dataform, handling the initialization and incremental updates of a self-referencing table requires a careful approach, because operations actions cannot include pre_operations blocks. To work around this, the logic can be split into two separate actions: one that initializes the table if it is still empty, and another that performs the incremental updates.
Step 1: Define the Table
The condensed_log_table must first be defined to ensure it exists with the required schema. This is achieved with an operations action containing a CREATE TABLE IF NOT EXISTS statement:
// definitions/condensed_log_table.sqlx
config {
type: "operations",
hasOutput: true,
description: "Condensed log table with incremental updates.",
tags: ["my_tag"],
}
CREATE TABLE IF NOT EXISTS ${self()} (
id STRING,
insert_time TIMESTAMP,
state STRING
);
Step 2: Initialize the Table if it Is Empty
A separate operations action checks whether condensed_log_table has any rows yet (the CREATE TABLE IF NOT EXISTS in Step 1 guarantees it exists) and seeds it with condensed data from raw_logs_table on the first run:
// definitions/initialize_condensed_log_table.sqlx
config {
type: "operations",
description: "Initializes condensed_log_table with data if it doesn't exist.",
hasOutput: true,
}
DECLARE table_is_empty BOOL;
-- The table always exists at this point (Step 1 creates it), so check
-- whether it has been seeded with data yet
SET table_is_empty = (
SELECT COUNT(*)
FROM ${ref("condensed_log_table")}
) = 0;
-- Seed the table with an already-condensed initial load
IF table_is_empty THEN
INSERT INTO ${ref("condensed_log_table")}
SELECT
id, insert_time, state
FROM
${ref("raw_logs_table")}
WHERE TRUE -- QUALIFY requires an accompanying WHERE/GROUP BY/HAVING
QUALIFY state != LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC)
OR LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC) IS NULL;
END IF;
Step 3: Perform Incremental Updates
Another operations action handles the incremental update logic. It sets the timestamp_checkpoint based on the existing data in condensed_log_table and performs the necessary updates:
// definitions/update_condensed_log_table.sqlx
config {
type: "operations",
description: "Inserts new records into condensed_log_table by condensing logs.",
hasOutput: true,
}
DECLARE timestamp_checkpoint TIMESTAMP;
-- Set timestamp_checkpoint to the max insert_time in condensed_log_table;
-- fall back to a very old date in case the table is still empty
SET timestamp_checkpoint = (
SELECT IFNULL(MAX(insert_time), TIMESTAMP '1970-01-01 00:00:00 UTC')
FROM ${ref("condensed_log_table")}
);
-- Incremental update logic
WITH latest_logs AS (
SELECT
id, insert_time, state
FROM
${ref("raw_logs_table")}
WHERE
insert_time > timestamp_checkpoint
),
condensed_with_latest AS (
SELECT
id, insert_time, state
FROM
${ref("condensed_log_table")}
UNION ALL
SELECT
id, insert_time, state
FROM
latest_logs
),
final_result AS (
SELECT
id, insert_time, state
FROM
condensed_with_latest
WHERE TRUE -- QUALIFY requires an accompanying WHERE/GROUP BY/HAVING
QUALIFY
state != LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC)
OR LAG(state) OVER (PARTITION BY id ORDER BY insert_time ASC) IS NULL
)
-- Insert only rows newer than the checkpoint to avoid duplicating
-- rows that are already in the condensed table
INSERT INTO ${ref("condensed_log_table")}
SELECT * FROM final_result
WHERE insert_time > timestamp_checkpoint;
By separating the initialization and incremental update logic into distinct actions, the restriction on pre_operations in operations actions is avoided entirely. This handles the cold start issue and ensures that condensed_log_table is properly initialized and then incrementally updated on every run.
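One caveat worth adding: Dataform infers execution order from ref() calls, and both actions above reference the same tables, so nothing guarantees that the initialization action runs before the update action. An explicit dependencies entry enforces the order. A minimal sketch, assuming the initialization action is named initialize_condensed_log_table as above:
// definitions/update_condensed_log_table.sqlx
config {
type: "operations",
description: "Inserts new records into condensed_log_table by condensing logs.",
hasOutput: true,
// force this action to run after the one-time initialization action
dependencies: ["initialize_condensed_log_table"],
}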