
Dataform: incremental() always returns false?

I'm having trouble using the incremental() function in my Dataform definition. I have a partitioned source table "api-calls", from which I only want to query the events that happened on or after the last day for which I have already created daily aggregates. My SQLX file looks roughly as follows:

 

config {
    type: "incremental",
    schema: "billing",
    description: "Daily active users by owner and client",
    tags: ["billing"],
    uniqueKey: ["triggered_on", "environment", "client_id"],
    bigquery: {
        partitionBy: "DATE(triggered_on)",
        updatePartitionFilter: "triggered_on >= event_timestamp_checkpoint",
        clusterBy: ["environment", "client_id"]
    }
}

SELECT
  TIMESTAMP_TRUNC(triggered_at, DAY) AS triggered_on,
  client_id,
  environment,
  SUM(...) AS api_call_count,
FROM
  ${ref("api-calls")}
WHERE
  stage = "production"
  AND client_id IS NOT NULL
  AND triggered_at >= event_timestamp_checkpoint
GROUP BY
  triggered_on,
  environment,
  client_id

pre_operations {
  DECLARE
    event_timestamp_checkpoint DEFAULT (
    ${
        when(incremental(),
            `SELECT MAX(triggered_on) FROM ${self()}`,
            `SELECT TIMESTAMP("2020-01-01")`
        )
    }
    )
}

 

The problem is that however I start an execution (either through the "Start Execution" button or through a scheduled workflow), event_timestamp_checkpoint is set to TIMESTAMP("2020-01-01"), resulting in a full table scan. This makes me think that the incremental() function always returns false.

Am I missing something here? I definitely did not check the "Run with full refresh" option.


OK, I had an in-depth look at the generated SQL code, and it seems I wasn't paying close enough attention. There are two code paths: one for an existing incremental table and one for the case where the table has to be created from scratch. The fixed timestamp is used only in the latter case, which is as expected. Nevertheless, the executions always bill as many bytes as a full table scan would (e.g. 200 GB), even though the source table is partitioned by day. I ran a test with a copy of the source table, and there the second execution was billed only in the megabytes range, which is what I would have expected in the first place. The only difference I can see is that the source table is used as the target of a Datastream stream. Could that be the reason it behaves differently?
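To compare the bytes billed across executions, as described above, one option is to query BigQuery's jobs metadata view. This is only a sketch: the `region-us` qualifier is an assumption and has to match the location where the jobs actually run.

```sql
-- Assumption: jobs run in the US multi-region; replace `region-us` with your own location.
-- Lists recent query jobs in the current project with the bytes they processed and billed.
SELECT
  job_id,
  creation_time,
  statement_type,
  total_bytes_processed,
  total_bytes_billed
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
  creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND job_type = "QUERY"
ORDER BY
  creation_time DESC
LIMIT 20;
```

An incremental run that prunes partitions should show a much smaller total_bytes_billed than the initial full build.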

Hi @mdomke

Welcome and thank you for reaching out to our community.

I'm not sure how to answer your query, but I believe this document can help you understand and configure an incremental table.

Dataform updates tables differently based on the table type. During each execution of a table or a view, Dataform rebuilds the whole table or view from scratch.

When you define an incremental table, Dataform builds the incremental table from scratch only for the first time. During subsequent executions, Dataform only inserts or merges new rows into the incremental table according to the conditions that you configure.

Dataform inserts new rows only into columns that already exist in the incremental table. If you make changes to the incremental table definition query — for example, add a new column — you must rebuild the table from scratch. To do so, the next time you trigger an execution of the table, select the Run with full refresh option.
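As a rough illustration of the "merges new rows" case, the statement below sketches the general shape of a merge for the uniqueKey and updatePartitionFilter from the question's config. It is not the exact SQL that Dataform generates. The target name `billing.daily_api_calls`, the temporary table `new_rows`, and the placeholder rows are hypothetical.

```sql
-- Illustrative only: NOT the exact SQL Dataform generates.
-- Hypothetical names: `billing.daily_api_calls` (target table) and
-- `new_rows` (stands in for the result of the SELECT in the SQLX file).
DECLARE event_timestamp_checkpoint TIMESTAMP DEFAULT TIMESTAMP("2020-01-01");

CREATE TEMP TABLE new_rows AS
SELECT
  TIMESTAMP("2020-01-02") AS triggered_on,  -- placeholder row for illustration
  "client-a" AS client_id,
  "prod" AS environment,
  1 AS api_call_count;

MERGE `billing.daily_api_calls` AS T
USING new_rows AS S
ON T.triggered_on = S.triggered_on
  AND T.environment = S.environment
  AND T.client_id = S.client_id
  -- The update partition filter restricts which target partitions are scanned.
  AND T.triggered_on >= event_timestamp_checkpoint
WHEN MATCHED THEN
  UPDATE SET api_call_count = S.api_call_count
WHEN NOT MATCHED THEN
  INSERT (triggered_on, client_id, environment, api_call_count)
  VALUES (S.triggered_on, S.client_id, S.environment, S.api_call_count);
```

Rows whose key already exists in the target are updated, and all others are inserted. Without a uniqueKey, new rows would simply be appended.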


You may also want to revisit this guide on how to create a Dataform table, so as not to miss anything.

Hope this helps.

Okay. I think I know now what has been the reason for this, and it seems to be completely unrelated to Dataform or the incremental() function. If you're still interested, read on!

The source table that I used for the Dataform operations is filled by Datastream. Datastream created the table initially, but it doesn't provide an option to create a partitioned table. I therefore went through the following procedure as hinted here

  1. Stop the Datastream stream and wait until all streaming buffers are empty
  2. Use CREATE TABLE <source-copy> PARTITION BY ... AS (SELECT * FROM <source-original>) to create a copy of the table partitioned by a specific timestamp field.
  3. Delete <source-original>
  4. Use bq cp <source-copy> <source-original> to recreate the stream target for Datastream
  5. Restart the Datastream stream
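Step 2 of the procedure above could look roughly like the following. Note that BigQuery's DDL keyword is PARTITION BY. The dataset name `my_dataset` and the copy's name `api-calls-partitioned` are hypothetical, and partitioning on `triggered_at` is an assumption based on the filter used in the SQLX file.

```sql
-- Hypothetical names: `my_dataset` and `api-calls-partitioned`.
-- Assumption: the table should be partitioned by day on `triggered_at`.
CREATE TABLE `my_dataset.api-calls-partitioned`
PARTITION BY DATE(triggered_at)
AS
SELECT
  *
FROM
  `my_dataset.api-calls`;
```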

What I observe is that for the table copied in step 4, the partitions are not used as expected, and each query "feels" like a complete table scan (judging by the number of bytes billed). I worked around this by using the SQL statement from step 2 to recreate the original table in a partitioned layout, and now the Dataform executions only consume the expected amount of bytes. I don't know enough about the internals to make sense of this, but this is what I could figure out.
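One way to check whether a recreated table really carries the expected partitioning is to look at the dataset's INFORMATION_SCHEMA views. This is a sketch, and the dataset name `my_dataset` is hypothetical.

```sql
-- Hypothetical dataset name: `my_dataset`.
-- 1) Which column, if any, is the partitioning column of the table?
SELECT
  table_name,
  column_name,
  is_partitioning_column
FROM
  my_dataset.INFORMATION_SCHEMA.COLUMNS
WHERE
  table_name = "api-calls"
  AND is_partitioning_column = "YES";

-- 2) Which partitions exist, and how large are they?
SELECT
  partition_id,
  total_rows,
  total_logical_bytes
FROM
  my_dataset.INFORMATION_SCHEMA.PARTITIONS
WHERE
  table_name = "api-calls"
ORDER BY
  partition_id DESC
LIMIT 10;
```

If the first query returns no rows, the table is not partitioned on any column, and a filter on the timestamp cannot prune anything.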