Hi everyone,
I'm working on a project where I need to aggregate events into session-level data on an hourly basis using Dataform and BigQuery. In our case, sessions can last up to 3 hours (that's the 99.99th percentile), so it's important for us to handle sessions that span multiple hours accurately.
To manage this, I'm implementing custom merge logic that updates session metrics whenever new events arrive for an existing session. Specifically, I want to use a hand-written MERGE statement rather than a plain SELECT, because it lets me define exactly how new event data is combined with the already-aggregated sessions.
However, I'm running into the issue that Dataform doesn't support MERGE statements directly in SQLX files. My question is:
Is it possible to use a custom MERGE statement within Dataform's SQLX files?
If yes, how can I properly implement it so that Dataform executes it correctly? If not, are there any workarounds or best practices to achieve similar functionality?
I understand that Dataform handles incremental queries and can merge data based on unique keys, but in this case, I need more control over the merge logic to ensure sessions are accurately updated with new events.
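For concreteness, the merge that Dataform generates from uniqueKey overwrites matched rows with the source values wholesale, roughly in the following shape (illustrative only; the actual generated SQL differs in its details), which is exactly what I can't use, since I need LEAST/GREATEST/sum semantics:

```sql
-- Rough shape of Dataform's auto-generated merge for uniqueKey
-- (illustrative; not the exact SQL Dataform emits)
MERGE sessions_table AS target
USING (/* the SELECT body of the .sqlx file */) AS source
ON target.session_id = source.session_id
WHEN MATCHED THEN UPDATE SET
  first_ts = source.first_ts,       -- plain overwrite, no LEAST()
  last_ts = source.last_ts,         -- plain overwrite, no GREATEST()
  event_count = source.event_count  -- overwrite, not a running sum
WHEN NOT MATCHED THEN INSERT ROW
```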
Any insights or suggestions would be greatly appreciated!
Thanks in advance.
Here is the simplified version of the script:
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"]
  },
  uniqueKey: ["session_id"]
}
pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
      `SELECT last_processed_timestamp FROM sessions_meta_table`,
      `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );
}
MERGE INTO ${self()} AS target
USING (
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table AS event
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
  GROUP BY session_id
) AS source
ON target.session_id = source.session_id
  -- Sessions last at most ~3 hours, so a 1-day lookback covers sessions that
  -- cross a date boundary (DATE_SUB does not accept an HOUR interval) and
  -- satisfies requirePartitionFilter on the target.
  AND DATE(target.first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
WHEN MATCHED THEN UPDATE SET
  first_ts = LEAST(target.first_ts, source.first_ts),
  last_ts = GREATEST(target.last_ts, source.last_ts),
  event_count = target.event_count + source.event_count
WHEN NOT MATCHED THEN
  INSERT (session_id, first_ts, last_ts, event_count)
  VALUES (source.session_id, source.first_ts, source.last_ts, source.event_count);
post_operations {
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (
    SELECT MAX(last_ts)
    FROM ${self()}
    -- requirePartitionFilter also applies when reading the target; recent
    -- partitions suffice because older sessions are already closed.
    WHERE DATE(first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  )
  WHERE TRUE; -- BigQuery rejects UPDATE statements without a WHERE clause
}
Dataform doesn't allow a MERGE statement as the main body of a SQLX file: for type "table" or "incremental", the body must be a SELECT that Dataform wraps in its own generated DDL/DML. However, pre_operations and post_operations blocks accept arbitrary SQL, so you can run your custom MERGE there, before or after the main query.
Two caveats: the file still needs a main SELECT body (a query that returns zero rows keeps Dataform's built-in incremental merge from writing anything, leaving your MERGE as the only write to the table), and on the very first run the target table doesn't exist yet, so you may want to guard the MERGE with ${when(incremental(), ...)} or create the table beforehand.
Example implementation: given your script, here's how to move the MERGE into the pre_operations block:
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"],
    // Scopes Dataform's own generated merge to recent partitions so that it
    // also passes requirePartitionFilter.
    updatePartitionFilter: "first_ts >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)"
  },
  uniqueKey: ["session_id"]
}
pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
      `SELECT last_processed_timestamp FROM sessions_meta_table`,
      `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );

  -- Execute the custom MERGE as part of pre_operations
  MERGE INTO ${self()} AS target
  USING (
    SELECT
      session_id,
      MIN(event_timestamp) AS first_ts,
      MAX(event_timestamp) AS last_ts,
      COUNT(*) AS event_count
    FROM events_table AS event
    WHERE session_id IS NOT NULL
      AND event_timestamp > event_timestamp_checkpoint
    GROUP BY session_id
  ) AS source
  ON target.session_id = source.session_id
    -- 1-day lookback: covers 3-hour sessions that cross a date boundary
    -- (DATE_SUB does not accept an HOUR interval) and satisfies
    -- requirePartitionFilter on the target.
    AND DATE(target.first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  WHEN MATCHED THEN UPDATE SET
    first_ts = LEAST(target.first_ts, source.first_ts),
    last_ts = GREATEST(target.last_ts, source.last_ts),
    event_count = target.event_count + source.event_count
  WHEN NOT MATCHED THEN
    INSERT (session_id, first_ts, last_ts, event_count)
    VALUES (source.session_id, source.first_ts, source.last_ts, source.event_count);
}

-- Dataform still requires a main SELECT body for an incremental table.
-- Selecting zero rows makes Dataform's own generated merge a no-op, so the
-- MERGE above remains the only write to the table.
SELECT
  CAST(NULL AS STRING) AS session_id,
  CAST(NULL AS TIMESTAMP) AS first_ts,
  CAST(NULL AS TIMESTAMP) AS last_ts,
  CAST(NULL AS INT64) AS event_count
FROM (SELECT 1)
WHERE FALSE
post_operations {
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (
    SELECT MAX(last_ts)
    FROM ${self()}
    -- requirePartitionFilter also applies when reading the target; recent
    -- partitions suffice because older sessions are already closed.
    WHERE DATE(first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  )
  WHERE TRUE; -- BigQuery rejects UPDATE statements without a WHERE clause
}
If pre_operations or post_operations prove too limiting, consider breaking the process into two separate Dataform actions:
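One common shape for this split (file and table names here are illustrative, not from your project): a staging action computes the per-session aggregates, and a second file of type "operations" runs the hand-written MERGE, which Dataform executes verbatim.

```sql
-- definitions/session_updates_staging.sqlx (hypothetical name):
-- stage the per-session aggregates for the current batch
config { type: "table" }
SELECT
  session_id,
  MIN(event_timestamp) AS first_ts,
  MAX(event_timestamp) AS last_ts,
  COUNT(*) AS event_count
FROM events_table
WHERE session_id IS NOT NULL
GROUP BY session_id

-- definitions/merge_sessions.sqlx (hypothetical name):
-- "operations" files may contain arbitrary SQL, including MERGE
config {
  type: "operations",
  dependencies: ["session_updates_staging"]
}
MERGE INTO sessions_table AS target
USING ${ref("session_updates_staging")} AS source
ON target.session_id = source.session_id
WHEN MATCHED THEN UPDATE SET
  first_ts = LEAST(target.first_ts, source.first_ts),
  last_ts = GREATEST(target.last_ts, source.last_ts),
  event_count = target.event_count + source.event_count
WHEN NOT MATCHED THEN
  INSERT (session_id, first_ts, last_ts, event_count)
  VALUES (source.session_id, source.first_ts, source.last_ts, source.event_count);
```

With this split, the checkpoint filtering from your original script would move into the staging query.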