
Using Custom MERGE Statements in Dataform for Hourly Session Aggregation

Hi everyone,

I'm working on a project where I need to aggregate events into session-level data on an hourly basis using Dataform and BigQuery. Sessions can last up to 3 hours (the 99.99th percentile of session duration), so a single session can span several hourly runs, and its metrics have to stay accurate across them.

To handle this, I'm trying to implement custom merge logic that updates session metrics when new events arrive for existing sessions. Specifically, I want to use a custom MERGE statement instead of a plain SELECT, because it lets me define exactly how new event data is combined with already-aggregated sessions (keep the earliest first_ts, keep the latest last_ts, and add up event_count).

However, I'm running into issues because Dataform doesn't seem to accept a MERGE statement as the main query of an SQLX file. My questions are:

  • Is it possible to use a custom MERGE statement within Dataform's SQLX files?

  • If yes, how can I properly implement it so that Dataform executes it correctly? If not, are there any workarounds or best practices to achieve similar functionality?

I understand that Dataform handles incremental tables and can merge data on a uniqueKey, but that merge simply overwrites a matched row with the new values. Here I need more control over the merge logic so that existing sessions are updated correctly when new events arrive.
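For comparison, one possible workaround keeps the incremental type and moves the combining into the SELECT itself: on incremental runs, union the new per-session aggregates with the existing rows for the same sessions and re-aggregate, so the row that Dataform's uniqueKey merge writes already holds the combined totals. This is an untested sketch. It assumes the same placeholder tables (events_table, and a single-row sessions_meta_table) as the script in this post, and it uses Dataform's updatePartitionFilter option to restrict which target partitions the generated MERGE scans, which is likely needed because of requirePartitionFilter.

```sql
config {
  type: "incremental",
  uniqueKey: ["session_id"],
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"],
    updatePartitionFilter: "first_ts >= TIMESTAMP_SUB(event_timestamp_checkpoint, INTERVAL 3 HOUR)"
  }
}

pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
    `SELECT last_processed_timestamp FROM sessions_meta_table`,
    `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );
}

-- Combine new events with each affected session's existing row, so the
-- result already holds the merged totals; the generated MERGE then just
-- overwrites the matched row with them.
SELECT
  session_id,
  MIN(first_ts) AS first_ts,
  MAX(last_ts) AS last_ts,
  SUM(event_count) AS event_count
FROM (
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
  GROUP BY session_id
  ${when(incremental(), `
  UNION ALL
  -- Existing rows of sessions that received new events; the 3-hour
  -- lookback also acts as the partition filter on the target.
  SELECT session_id, first_ts, last_ts, event_count
  FROM ${self()}
  WHERE first_ts >= TIMESTAMP_SUB(event_timestamp_checkpoint, INTERVAL 3 HOUR)
    AND session_id IN (
      SELECT session_id FROM events_table
      WHERE event_timestamp > event_timestamp_checkpoint
    )`)}
)
GROUP BY session_id

post_operations {
  -- UPDATE needs a WHERE clause in BigQuery; the read of the target needs a partition filter.
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (
    SELECT MAX(last_ts) FROM ${self()}
    WHERE first_ts >= TIMESTAMP_SUB(event_timestamp_checkpoint, INTERVAL 3 HOUR)
  )
  WHERE TRUE;
}
```

The trade-off is that affected sessions are read back from the target on every run; with the 3-hour lookback and clustering on session_id, that read should stay small.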

Any insights or suggestions would be greatly appreciated!

Thanks in advance.

Here is the simplified version of the script:

 

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"],
  },
  uniqueKey: ["session_id"]
}

pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
    `SELECT last_processed_timestamp FROM sessions_meta_table`,
    `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );
}

MERGE INTO ${self()} AS target
USING (
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table AS event
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
  GROUP BY session_id
) AS source
ON target.session_id = source.session_id
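-- Sessions that get new events started at most 3 hours before the checkpoint;
-- filtering on first_ts also serves as the target's partition filter.
AND target.first_ts >= TIMESTAMP_SUB(event_timestamp_checkpoint, INTERVAL 3 HOUR)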

WHEN MATCHED THEN
  UPDATE SET
    first_ts = LEAST(target.first_ts, source.first_ts),
    last_ts = GREATEST(target.last_ts, source.last_ts),
    event_count = target.event_count + source.event_count

WHEN NOT MATCHED THEN
  INSERT (
    session_id, first_ts, last_ts, event_count
  )
  VALUES (
    source.session_id, source.first_ts, source.last_ts, source.event_count
  );

post_operations {
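  -- Read only recent partitions of the target (requirePartitionFilter), and give UPDATE the WHERE clause BigQuery requires.
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (
    SELECT MAX(last_ts) FROM ${self()}
    WHERE first_ts >= TIMESTAMP_SUB(event_timestamp_checkpoint, INTERVAL 3 HOUR)
  )
  WHERE TRUE;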
}
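On the first question: as far as I know, an incremental SQLX file has to end in a SELECT, because Dataform wraps it in its own CREATE TABLE ... AS SELECT (first run) or MERGE (incremental runs), so a MERGE in the body cannot work there. A common workaround is an operations file with hasOutput: true: Dataform runs the SQL as written, and ${self()} still resolves to the output table named after the file. Below is a minimal, untested sketch of the same logic as a single BigQuery script. events_table and sessions_meta_table are the question's placeholder names, and storing checkpoints as an append-only log (reading MAX(last_processed_timestamp)) is a design choice of this sketch, not part of the original.

```sql
config {
  type: "operations",
  hasOutput: true
}

-- Runs as one BigQuery script. events_table and sessions_meta_table are
-- placeholders; qualify them (or use ref()) in a real project.
DECLARE event_timestamp_checkpoint TIMESTAMP;
DECLARE new_checkpoint TIMESTAMP;

-- Dataform does not create an operation's output, so create it on the first run.
CREATE TABLE IF NOT EXISTS ${self()} (
  session_id STRING,
  first_ts TIMESTAMP,
  last_ts TIMESTAMP,
  event_count INT64
)
PARTITION BY DATE(first_ts)
CLUSTER BY session_id
OPTIONS (require_partition_filter = TRUE);

CREATE TABLE IF NOT EXISTS sessions_meta_table (
  last_processed_timestamp TIMESTAMP
);

-- Append-only checkpoint log: the latest processed timestamp is the MAX.
SET event_timestamp_checkpoint = COALESCE(
  (SELECT MAX(last_processed_timestamp) FROM sessions_meta_table),
  TIMESTAMP '2000-01-01 00:00:00'
);

-- Fix the upper bound before merging, so events with later timestamps that
-- land while the script runs are left for the next run instead of skipped.
SET new_checkpoint = (
  SELECT MAX(event_timestamp)
  FROM events_table
  WHERE event_timestamp > event_timestamp_checkpoint
);

MERGE ${self()} AS target
USING (
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
    AND event_timestamp <= new_checkpoint
  GROUP BY session_id
) AS source
ON target.session_id = source.session_id
  -- Sessions that get new events started at most 3 hours before the
  -- checkpoint; this bound is also the partition filter on the target.
  AND target.first_ts >= TIMESTAMP_SUB(event_timestamp_checkpoint, INTERVAL 3 HOUR)
WHEN MATCHED THEN
  UPDATE SET
    first_ts = LEAST(target.first_ts, source.first_ts),
    last_ts = GREATEST(target.last_ts, source.last_ts),
    event_count = target.event_count + source.event_count
WHEN NOT MATCHED THEN
  INSERT (session_id, first_ts, last_ts, event_count)
  VALUES (source.session_id, source.first_ts, source.last_ts, source.event_count);

-- Advance the checkpoint only when this run actually saw new events.
IF new_checkpoint IS NOT NULL THEN
  INSERT INTO sessions_meta_table (last_processed_timestamp)
  VALUES (new_checkpoint);
END IF;
```

Other SQLX files can then depend on the table through ${ref()} with this file's name. Because the script only ever does CREATE TABLE IF NOT EXISTS, a full-refresh run behaves like any other run; rebuilding from scratch means dropping the table and the checkpoint rows yourself.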

 



