
Using Custom MERGE Statements in Dataform for Hourly Session Aggregation

Hi everyone,

I'm working on a project where I need to aggregate events into session-level data on an hourly basis using Dataform and BigQuery. In our case, sessions can last up to 3 hours (that's the 99.99th percentile), so it's important for us to handle sessions that span multiple hours accurately.

To manage this, I'm trying to implement custom merge logic to properly update session metrics when new events come in for existing sessions. Specifically, I want to use a custom MERGE statement instead of a simple SELECT because it allows me to define how to merge new event data with existing aggregated sessions.

However, I'm running into issues with Dataform not supporting MERGE statements directly in SQLX files. My question is:

  • Is it possible to use a custom MERGE statement within Dataform's SQLX files?

  • If yes, how can I properly implement it so that Dataform executes it correctly? If not, are there any workarounds or best practices to achieve similar functionality?

I understand that Dataform handles incremental queries and can merge data based on unique keys, but in this case, I need more control over the merge logic to ensure sessions are accurately updated with new events.

Any insights or suggestions would be greatly appreciated!

Thanks in advance.

Here is the simplified version of the script:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"],
  },
  uniqueKey: ["session_id"]
}

pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
    `SELECT last_processed_timestamp FROM sessions_meta_table`,
    `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );
}

MERGE INTO ${self()} AS target
USING (
  SELECT
    session_id,
    MIN(event_timestamp) AS first_ts,
    MAX(event_timestamp) AS last_ts,
    COUNT(*) AS event_count
  FROM events_table AS event
  WHERE session_id IS NOT NULL
    AND event_timestamp > event_timestamp_checkpoint
  GROUP BY session_id
) AS source
ON target.session_id = source.session_id
AND DATE(target.first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) -- satisfies requirePartitionFilter
AND target.first_ts >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)

WHEN MATCHED THEN
  UPDATE SET
    first_ts = LEAST(target.first_ts, source.first_ts),
    last_ts = GREATEST(target.last_ts, source.last_ts),
    event_count = target.event_count + source.event_count

WHEN NOT MATCHED THEN
  INSERT (
    session_id, first_ts, last_ts, event_count
  )
  VALUES (
    source.session_id, source.first_ts, source.last_ts, source.event_count
  );

post_operations {
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (
    SELECT MAX(last_ts)
    FROM ${self()}
    WHERE DATE(first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) -- partition filter required by requirePartitionFilter
  )
  WHERE TRUE;
}


In Dataform, a MERGE statement can't be the main body of a SQLX file: the body of a table or incremental action must be a SELECT. However, you can still run your custom merge logic by executing the MERGE statement inside a pre_operations or post_operations block. Here's how to implement the custom MERGE statement so that Dataform executes it correctly:

  1. Use pre_operations or post_operations to execute the MERGE statement: place the MERGE inside one of these blocks in your SQLX file. Both blocks accept arbitrary SQL statements, so the custom MERGE can run before or after the main transformation logic.

  2. Example Implementation: Given your script, here’s how to modify it to include the MERGE statement within the pre_operations block:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(first_ts)",
    requirePartitionFilter: true,
    clusterBy: ["session_id"],
  },
  uniqueKey: ["session_id"]
}

pre_operations {
  DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT (
    ${when(incremental(),
    `SELECT last_processed_timestamp FROM sessions_meta_table`,
    `TIMESTAMP('2000-01-01 00:00:00')`
    )}
  );

  -- Execute the MERGE statement as part of the pre_operations
  MERGE INTO ${self()} AS target
  USING (
    SELECT
      session_id,
      MIN(event_timestamp) AS first_ts,
      MAX(event_timestamp) AS last_ts,
      COUNT(*) AS event_count
    FROM events_table AS event
    WHERE session_id IS NOT NULL
      AND event_timestamp > event_timestamp_checkpoint
    GROUP BY session_id
  ) AS source
  ON target.session_id = source.session_id
  AND DATE(target.first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) -- satisfies requirePartitionFilter
  AND target.first_ts >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)

  WHEN MATCHED THEN
    UPDATE SET
      first_ts = LEAST(target.first_ts, source.first_ts),
      last_ts = GREATEST(target.last_ts, source.last_ts),
      event_count = target.event_count + source.event_count

  WHEN NOT MATCHED THEN
    INSERT (
      session_id, first_ts, last_ts, event_count
    )
    VALUES (
      source.session_id, source.first_ts, source.last_ts, source.event_count
    );
}

post_operations {
  UPDATE sessions_meta_table
  SET last_processed_timestamp = (
    SELECT MAX(last_ts)
    FROM ${self()}
    WHERE DATE(first_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) -- partition filter required by requirePartitionFilter
  )
  WHERE TRUE;
}
  • The MERGE statement is placed inside the pre_operations block, ensuring that it runs before the main transformation. Alternatively, you could use the post_operations block if you prefer to run it after the main data processing.
  • Dataform will execute the custom MERGE as a standard BigQuery operation, which gives you full control over how you want to merge and update your session data.
  • Make sure to test the script thoroughly in a development environment to ensure that the custom merge logic behaves as expected.
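One gap worth flagging in the pattern above: an incremental SQLX action still needs a main query body after the pre_operations block, because Dataform compiles that query into the action itself. When the MERGE in pre_operations does all the real work, the main query can be a deliberate no-op that returns zero rows with the right schema, so Dataform's own uniqueKey merge has nothing to apply. A sketch, reusing the column names from the script above (the UNNEST trick is just one way to produce an empty, correctly typed result):

```sql
-- Main query body, placed after the closing brace of pre_operations.
-- Returns zero rows with the target schema; the MERGE in
-- pre_operations has already updated the table.
SELECT
  CAST(NULL AS STRING)    AS session_id,
  CAST(NULL AS TIMESTAMP) AS first_ts,
  CAST(NULL AS TIMESTAMP) AS last_ts,
  CAST(NULL AS INT64)     AS event_count
FROM UNNEST([1])
WHERE FALSE
```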

If you encounter any limitations using pre_operations or post_operations, consider breaking the process into two separate Dataform tables:

  1. Stage Table: Aggregate events and prepare the data for merging.
  2. Final Table: Use the MERGE statement in the pre_operations or post_operations to update this table based on the stage table's data.
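To make that split concrete, here's a rough sketch of the two files. Table and file names (sessions_stage, sessions_final, merge_sessions) are illustrative placeholders, not names from your project. The second file uses type: "operations", which Dataform allows to contain arbitrary SQL statements, so the MERGE doesn't have to be squeezed into a pre_operations block at all:

```sql
-- definitions/sessions_stage.sqlx: aggregate new events per session.
config { type: "table" }

SELECT
  session_id,
  MIN(event_timestamp) AS first_ts,
  MAX(event_timestamp) AS last_ts,
  COUNT(*) AS event_count
FROM events_table
WHERE session_id IS NOT NULL
  AND event_timestamp > (SELECT last_processed_timestamp FROM sessions_meta_table)
GROUP BY session_id

-- definitions/merge_sessions.sqlx: apply the custom MERGE to the final table.
config {
  type: "operations",
  dependencies: ["sessions_stage"]
}

MERGE INTO sessions_final AS target
USING ${ref("sessions_stage")} AS source
ON target.session_id = source.session_id
WHEN MATCHED THEN UPDATE SET
  first_ts = LEAST(target.first_ts, source.first_ts),
  last_ts = GREATEST(target.last_ts, source.last_ts),
  event_count = target.event_count + source.event_count
WHEN NOT MATCHED THEN
  INSERT (session_id, first_ts, last_ts, event_count)
  VALUES (source.session_id, source.first_ts, source.last_ts, source.event_count);
```

The operations file declares a dependency on the stage table, so Dataform sequences the two actions correctly within a run.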