Hi everyone,
I'm working on a project where I need to aggregate events into session-level data on an hourly basis using Dataform and BigQuery. In our case, sessions can last up to 3 hours (that's the 99.99th percentile), so it's important for us to handle sessions that span multiple hours accurately.
To manage this, I'm trying to implement custom merge logic that updates session metrics when new events arrive for existing sessions. I want a custom MERGE statement rather than a plain SELECT so that I can define exactly how new event data is combined with the already aggregated sessions.
However, I'm running into issues with Dataform not supporting MERGE statements directly in SQLX files. My question is:
Is it possible to use a custom MERGE statement within Dataform's SQLX files?
If yes, how can I properly implement it so that Dataform executes it correctly? If not, are there any workarounds or best practices to achieve similar functionality?
I understand that Dataform handles incremental queries and can merge data based on unique keys, but in this case, I need more control over the merge logic to ensure sessions are accurately updated with new events.
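To illustrate the limitation: as far as I understand, with uniqueKey set Dataform wraps my SELECT in something roughly like the statement below, where matched rows are overwritten rather than combined. This is only a sketch of my understanding, not Dataform's verbatim output; `sessions` is a hypothetical name for the target table, and `event_timestamp_checkpoint` is the variable declared in the pre_operations of my script.

```sql
-- Sketch of the default uniqueKey merge as I understand it
-- (assumed shape, not copied from Dataform's compiled SQL).
MERGE INTO sessions AS T  -- `sessions`: hypothetical target table name
USING (
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
  GROUP BY session_id
) AS S
ON T.session_id = S.session_id
WHEN MATCHED THEN
  -- Every column is replaced by the new value, so a session that already
  -- had 40 events and receives 5 more ends up with event_count = 5, not 45.
  UPDATE SET
    first_ts = S.first_ts,
    last_ts = S.last_ts,
    event_count = S.event_count
WHEN NOT MATCHED THEN
  INSERT (session_id, first_ts, last_ts, event_count)
  VALUES (S.session_id, S.first_ts, S.last_ts, S.event_count);
```

What I need instead is LEAST/GREATEST on the timestamps and an additive event_count in the matched branch, which is why the default behaviour is not enough for me.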
Any insights or suggestions would be greatly appreciated!
Thanks in advance.
Here is a simplified version of the script:
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"],
  },
  uniqueKey: ["session_id"]
}
pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
      `SELECT last_processed_timestamp FROM sessions_meta_table`,
      `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );
}
MERGE INTO ${self()} AS target
USING (
  -- Aggregate only the events that arrived after the last checkpoint
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table AS event
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
  GROUP BY session_id
) AS source
ON target.session_id = source.session_id
  -- Partition filter on the target with a 3-hour lookback.
  -- DATE_SUB on a DATE does not accept HOUR, so subtract from a TIMESTAMP first.
  AND DATE(target.first_ts) >= DATE(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR))
WHEN MATCHED THEN
  -- Combine the new aggregates with the existing session row
  UPDATE SET
    first_ts = LEAST(target.first_ts, source.first_ts),
    last_ts = GREATEST(target.last_ts, source.last_ts),
    event_count = target.event_count + source.event_count
WHEN NOT MATCHED THEN
  INSERT (
    session_id, first_ts, last_ts, event_count
  )
  VALUES (
    source.session_id, source.first_ts, source.last_ts, source.event_count
  );
post_operations {
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (SELECT MAX(last_ts) FROM ${self()})
  WHERE TRUE;  -- BigQuery requires a WHERE clause on every UPDATE
}