Custom MERGE (INCREMENTAL TABLE) with Dataform

Hello everyone,

I am trying to implement a custom MERGE (incremental table) with Dataform. My table has its primary keys plus two metadata columns: LOAD_BATCH and DRIVER_ACTION_CODE.

My objective is to implement an incremental SQLX file that, given a date, creates an intermediate view selecting the rows for each PK from the most recent LOAD_BATCH. It should then apply the MERGE in two steps: first delete the rows where DRIVER_ACTION_CODE equals "D", then merge the rows where DRIVER_ACTION_CODE is either "I" or "U".

So far, I have managed to accomplish this using JS with type="operation", but I am looking to do it in a more scalable manner and take advantage of the features that Dataform offers.

Does anyone have any recommendations or tips on how to best achieve this?

Here is my current code: 

includes/merge_helper.js

```JavaScript

function generateCreateTableQuery(context) {
  return `
      CREATE TABLE IF NOT EXISTS ${context.tabla_silver} (
        ${context.columns.filter(column => column).map(column => `${column} ${context.columnDataTypes[column]}`).join(", ")}
      )
      -- PARTITION BY ${context.partition_column}
      CLUSTER BY ${context.clustering_columns.join(", ")}`
}


function generateDeleteQuery(context) {
  return `
      MERGE ${context.tabla_silver} as t
      USING (
        SELECT ${context.columns.join(", ")}
        FROM ${context.tabla_bronce} as s
         WHERE 1=1
          and load_batch >= '${context.startDate}'
          and ${context.TRANSACTION_COL} IN ('D')
      ) s
      ON ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
      WHEN MATCHED AND s.${context.TRANSACTION_COL} = 'D' THEN
        DELETE
      `};


function generateUpsertQuery(context) {
  return `
        MERGE ${(context.tabla_silver)} as t
        USING (
          SELECT ${context.columns.join(", ")}
          FROM ${context.tabla_bronce} as s
          WHERE 1=1
          and load_batch >= '${context.startDate}'
          and ${context.TRANSACTION_COL} IN ('I', 'U')
        ) s
        ON ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
        WHEN MATCHED THEN
          UPDATE SET ${context.columns.map(column => `t.${column} = s.${column}`).join(", ")}
        WHEN NOT MATCHED THEN
          INSERT (${context.columns.join(", ")})
          VALUES (${context.columns.map(column => `s.${column}`).join(", ")})
      `;
}

function generateBaseQuery(context) {
  return `
      WITH base AS (
        -- Latest LOAD_BATCH per primary key within the load window.
        SELECT ${context.pk_columns.join(", ")}, MAX(LOAD_BATCH) AS date_max
        FROM (
          SELECT ROW_NUMBER() OVER (PARTITION BY ${context.pk_columns.join(", ")} ORDER BY LOAD_BATCH DESC) AS rn,
            ${context.pk_columns.join(", ")}, LOAD_BATCH
          FROM ${context.tabla_bronce}
          WHERE load_batch >= '${context.startDate}'
        )
        WHERE rn = 1
        GROUP BY ${context.pk_columns.join(", ")}
      ), query_fin AS (
        -- Full rows that belong to that latest batch.
        SELECT op.*
        FROM base rn
        LEFT JOIN (SELECT * FROM ${context.tabla_bronce} WHERE load_batch >= '${context.startDate}') op
          ON ${context.pk_columns.map(pk_column => `op.${pk_column} = rn.${pk_column}`).join(" AND ")} AND op.LOAD_BATCH = rn.date_max
        -- Each key column needs its own IS NOT NULL test.
        WHERE ${context.pk_columns.map(pk_column => `op.${pk_column} IS NOT NULL`).join(" AND ")}
      )
      SELECT * FROM query_fin
      `;
}

// Dataform only exposes what an includes file exports.
module.exports = {
  generateCreateTableQuery,
  generateDeleteQuery,
  generateUpsertQuery,
  generateBaseQuery,
};
```
Next, I create a view from the base query in a SQLX file of type view. After that, in a SQLX file with type='operations', I use the other three queries to create the table if needed, then merge the "D" rows, and finally merge the "I" and "U" rows.

The idea is to do exactly the same steps, but with an incremental SQLX file or something similar.

Looking forward to your responses.

Thank you in advance!

Here are some recommendations on how you can achieve your goal using Dataform's SQLX:

  • Intermediate View: Continue to use your generateBaseQuery function to create an intermediate view that selects occurrences of the PKs for the most recent LOAD_BATCH. This can be done using a standard SQLX view.

  • Incremental Table: Create an incremental table that will hold your final data. In the preOps and postOps sections of the incremental table, you can include your custom logic for handling the "D", "I", and "U" operations.
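For the intermediate view step, here is a minimal sketch of an alternative helper that keeps one row per primary key from the most recent LOAD_BATCH using BigQuery's QUALIFY clause. The function name generateLatestPerKeyQuery and the example values are hypothetical; the context fields (tabla_bronce, pk_columns, startDate) are the ones already used in includes/merge_helper.js.

```javascript
// Hypothetical helper: one row per primary key, taken from the most recent load_batch.
function generateLatestPerKeyQuery(context) {
  const pkList = context.pk_columns.join(", ");
  return `
    SELECT *
    FROM ${context.tabla_bronce}
    WHERE load_batch >= '${context.startDate}'
    -- Keep only the newest row of each key; no self-join is needed.
    QUALIFY ROW_NUMBER() OVER (PARTITION BY ${pkList} ORDER BY load_batch DESC) = 1`;
}

// Placeholder values for illustration only; they are not taken from the thread.
const exampleContext = {
  tabla_bronce: "`my_project.bronze.orders`",
  pk_columns: ["order_id", "line_id"],
  startDate: "2024-01-01",
};

console.log(generateLatestPerKeyQuery(exampleContext));

// In a Dataform includes file the function would also need to be exported,
// e.g. module.exports = { generateLatestPerKeyQuery };
```

Unlike a join on MAX(LOAD_BATCH), this keeps exactly one row per key even when the latest batch contains several rows for it. Which of those rows survives is arbitrary unless a tie-breaker column is added to the ORDER BY.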

Here's a simplified example of how you might structure your SQLX file:

```SQLX
config {
  type: "incremental",
  bigquery: {
    partitionBy: "date",
    clusterBy: ["column1", "column2"]
  },
  preOps: [
    "MERGE ${self()} as t USING (SELECT * FROM ${ref('intermediate_view')} WHERE DRIVER_ACTION_CODE = 'D') s ON t.PK = s.PK WHEN MATCHED THEN DELETE"
  ],
  postOps: [
    "MERGE ${self()} as t USING (SELECT * FROM ${ref('intermediate_view')} WHERE DRIVER_ACTION_CODE IN ('I', 'U')) s ON t.PK = s.PK WHEN MATCHED THEN UPDATE SET column1 = s.column1, column2 = s.column2 WHEN NOT MATCHED THEN INSERT (column1, column2) VALUES (s.column1, s.column2)"
  ]
}

SELECT *
FROM ${ref('intermediate_view')}
WHERE _PARTITIONTIME = TIMESTAMP(@{incremental_where()})
```

In this example, intermediate_view is the name of the view you created in step 1. The preOps and postOps sections contain your custom MERGE logic. The incremental_where() function is used to only select the new data since the last run.

Please note that this is a simplified example and you would need to adapt it to your specific use case.
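As one way to adapt this, the delete and the upsert can be folded into a single MERGE, because BigQuery evaluates the WHEN clauses in order and applies the first one that matches. The sketch below is only an illustration: generateSingleMergeQuery and the source field are hypothetical names, while tabla_silver, pk_columns, columns and TRANSACTION_COL follow the context object from includes/merge_helper.js. It assumes the source holds at most one row per primary key.

```javascript
// Hypothetical helper: handle 'D', 'I' and 'U' rows in one MERGE statement.
function generateSingleMergeQuery(context) {
  const on = context.pk_columns.map(pk => `t.${pk} = s.${pk}`).join(" AND ");
  const cols = context.columns.join(", ");
  const updates = context.columns.map(c => `${c} = s.${c}`).join(", ");
  const values = context.columns.map(c => `s.${c}`).join(", ");
  const action = `s.${context.TRANSACTION_COL}`;
  return `
    MERGE ${context.tabla_silver} AS t
    USING ${context.source} AS s
    ON ${on}
    -- Checked first, so matched 'D' rows are deleted rather than updated.
    WHEN MATCHED AND ${action} = 'D' THEN
      DELETE
    WHEN MATCHED THEN
      UPDATE SET ${updates}
    -- A 'D' row with no match in the target is ignored.
    WHEN NOT MATCHED AND ${action} IN ('I', 'U') THEN
      INSERT (${cols}) VALUES (${values})`;
}

// Placeholder values for illustration only.
console.log(generateSingleMergeQuery({
  tabla_silver: "`my_project.silver.orders`",
  source: "`my_project.staging.intermediate_view`",
  pk_columns: ["order_id"],
  columns: ["order_id", "status"],
  TRANSACTION_COL: "DRIVER_ACTION_CODE",
}));
```

The resulting string could be used in the same places as generateDeleteQuery and generateUpsertQuery. If the source can contain several rows for one key, BigQuery rejects the MERGE, so the deduplication in the intermediate view still matters.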

 

@ms4446 

Hello, thank you very much for the answer. I have a couple of questions about how to carry this out.
1. I don't understand what this code is for:

```SQLX
SELECT *
FROM ${ref('intermediate_view')}
WHERE _PARTITIONTIME = TIMESTAMP(@{incremental_where()})
```
I get an error at @{incremental_where()}:

error: Syntax error: Expected ")" but got "@" at [12:34].

If I change the code by deleting the "WHERE _PARTITIONTIME = TIMESTAMP(@{incremental_where()})" line, then I get:

error: reason:"invalidQuery" location:"query" message:"Query error: Scalar subquery produced more than one element at [28:45]": invalid argument

The code right now:

```SQLX

config
{
type: "incremental",
schema: "schema_silver",
name: "table_name",
tags: ["merge", "table_name"],
bigquery: {
partitionBy: "RANGE_BUCKET(Partition_key, GENERATE_ARRAY(0, 100000, 10))",
clusterBy: [cluster_keys...]
}
}

select * from FROM ${ref('intermediate')})

pre_operations {
DECLARE partition_pruning DEFAULT (select distinct ${context.partition_column} FROM ${ref('intermediate')});

MERGE ${self()} as t USING (SELECT * FROM ${ref('intermediate_view')}
WHERE DRIVER_S_ACTION_CODE = 'D') s
ON 1=1
and t.${context.partition_column} in (partition_pruning)
and ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
WHEN MATCHED THEN
DELETE
}

post_operations {
MERGE ${self()} as t USING (SELECT * FROM ${ref('intermediate_view')}
WHERE DRIVER_S_ACTION_CODE IN ('I', 'U')) s
ON 1=1
and t.${context.partition_column} in (partition_pruning)
and ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
WHEN MATCHED THEN
UPDATE SET ${context.columns.map(column => `t.${column} = s.${column}`).join(", ")}
WHEN NOT MATCHED THEN
INSERT (${context.columns.join(", ")})
VALUES (${context.columns.map(column => `s.${column}`).join(", ")})
}
```

But if I execute the compiled query manually, it works just fine 😞

I would love to use the incremental() macro, but I don't quite understand how to apply it in my context.

One additional question: how can I grant access at the dataset level using postOperations? I can see how to grant access to an email address, but not how to add an authorized dataset.
Thanks again for all the help

 

I will try to address each of your questions.

There is no built-in incremental_where() function in Dataform, which is why the compiled SQL fails with a syntax error at "@". In an incremental SQLX file, the filter for new rows goes in the SELECT itself, wrapped in when(incremental(), ...) so that it is applied only on incremental runs and skipped on the first run or a full refresh:

```SQLX
config {
  type: "incremental",
  ...
}

SELECT *
FROM ${ref("intermediate_view")}
${when(incremental(), `WHERE load_batch >= (SELECT MAX(load_batch) FROM ${self()})`)}
```

In this example, load_batch should be replaced with the name of the field in your table that contains the timestamp or date of each record, and >= with the comparison operator you want to use to filter the data.

  1. The error "Scalar subquery produced more than one element" occurs when a subquery that is expected to return a single value returns several rows. In your code the likely source is the DECLARE partition_pruning DEFAULT (select distinct ...) statement: as soon as the intermediate view holds more than one distinct partition value, the scalar subquery fails. Collecting the values into an array, with ARRAY_AGG(DISTINCT ...) in the DECLARE and IN UNNEST(partition_pruning) in the ON clause, should avoid this. It is also worth checking that intermediate_view returns only one row per primary key, because a MERGE fails when several source rows match the same target row.
  2. incremental() is not a macro that takes a source table. It takes no arguments and returns true when the table is being updated incrementally. The built-in way to get merge behaviour is to set uniqueKey in the config block, in which case Dataform generates a MERGE that updates the rows whose key already exists and inserts the rest:

```SQLX
config {
  type: "incremental",
  uniqueKey: ["primary_key"]
}
```

In this example, primary_key should be replaced with the name of your primary key field or fields. Deletes are not covered by uniqueKey, so the "D" rows still need their own MERGE ... DELETE in pre_operations or post_operations.
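For the "Scalar subquery produced more than one element" error, here is a minimal sketch, assuming the cause is a DECLARE ... DEFAULT (SELECT DISTINCT ...) that returns several partition values. It builds an array-valued variable and the matching predicate for the MERGE ON clause. The function name generatePartitionPruning and its parameters are hypothetical.

```javascript
// Hypothetical helper: array-valued DECLARE plus the predicate that reads it.
function generatePartitionPruning(partitionColumn, sourceTable, variableName = "partition_pruning") {
  return {
    // ARRAY_AGG folds every distinct value into one array, so the subquery
    // yields a single element however many partitions the source touches.
    // IGNORE NULLS is there because BigQuery arrays cannot hold NULL elements.
    declare: `DECLARE ${variableName} DEFAULT (SELECT ARRAY_AGG(DISTINCT ${partitionColumn} IGNORE NULLS) FROM ${sourceTable});`,
    // Goes into the MERGE ON clause next to the primary key comparisons.
    predicate: `t.${partitionColumn} IN UNNEST(${variableName})`,
  };
}

// Placeholder names for illustration only.
const pruning = generatePartitionPruning("Partition_key", "`my_project.staging.intermediate_view`");
console.log(pruning.declare);
console.log(pruning.predicate);
```

The declare string would replace the scalar DECLARE in pre_operations, and the predicate would replace the "in (partition_pruning)" comparison in both MERGE statements.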

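  • To grant access using post_operations, you can use BigQuery's GRANT statement, which takes an IAM role and a list of principals. Here's an example:

post_operations {
  GRANT `roles/bigquery.dataViewer` ON TABLE ${self()} TO "group:group_name@example.com"
}

In this example, group_name@example.com is a placeholder that should be replaced with the email address of the group you want to grant access to. Note that this will only work if you have the necessary permissions to grant access to other users or groups. As far as I know, an authorized dataset cannot be added with a GRANT statement; it is configured in the access settings of the source dataset (through the console, the bq command-line tool, or the API).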