
Custom MERGE (INCREMENTAL TABLE) with Dataform

Hello everyone,

I am trying to implement a custom MERGE (incremental table) with Dataform. The source table has the primary-key columns plus two metadata columns: LOAD_BATCH and DRIVER_ACTION_CODE.

My objective is an incremental SQLX table that, given a date, builds an intermediate view holding, for each PK, the row from the most recent LOAD_BATCH. It should then apply the MERGE in two passes: first delete the rows where DRIVER_ACTION_CODE is "D", then merge (update or insert) the rows where DRIVER_ACTION_CODE is "I" or "U".
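The intended behavior can be pinned down with a small in-memory model before translating it into SQL. This is a minimal sketch, not Dataform code: rows are plain objects, the target table is a `Map` keyed by primary key, and `pkOf`, `latestPerKey` and `applyRun` are hypothetical names. It assumes LOAD_BATCH values are ISO date strings (`"YYYY-MM-DD"`), which compare correctly as text.

```JavaScript
// Hypothetical in-memory model of the pipeline (not Dataform code).

// Builds a composite key string from the primary-key values of a row.
function pkOf(row, pkColumns) {
  return JSON.stringify(pkColumns.map((column) => row[column]));
}

// Keeps, for each primary key, the row with the highest LOAD_BATCH.
// Assumes ISO date strings, which sort correctly as text.
function latestPerKey(rows, pkColumns) {
  const latest = new Map();
  for (const row of rows) {
    const key = pkOf(row, pkColumns);
    const current = latest.get(key);
    if (current === undefined || row.LOAD_BATCH > current.LOAD_BATCH) {
      latest.set(key, row);
    }
  }
  return latest;
}

// Applies one run to the target: deletes ("D") first, then inserts or
// overwrites ("I"/"U"), mirroring the two MERGE statements.
function applyRun(target, bronzeRows, pkColumns, startDate) {
  const recent = bronzeRows.filter((row) => row.LOAD_BATCH >= startDate);
  const latest = latestPerKey(recent, pkColumns);
  for (const [key, row] of latest) {
    if (row.DRIVER_ACTION_CODE === "D") target.delete(key);
  }
  for (const [key, row] of latest) {
    if (row.DRIVER_ACTION_CODE === "I" || row.DRIVER_ACTION_CODE === "U") {
      target.set(key, row);
    }
  }
  return target;
}

// Example: key 1 is updated and then deleted; key 2 is inserted.
const silver = new Map([
  [pkOf({ id: 1 }, ["id"]), { id: 1, value: "old", LOAD_BATCH: "2024-01-01", DRIVER_ACTION_CODE: "I" }],
]);
const bronze = [
  { id: 1, value: "new", LOAD_BATCH: "2024-01-02", DRIVER_ACTION_CODE: "U" },
  { id: 1, value: null, LOAD_BATCH: "2024-01-03", DRIVER_ACTION_CODE: "D" },
  { id: 2, value: "a", LOAD_BATCH: "2024-01-03", DRIVER_ACTION_CODE: "I" },
];
applyRun(silver, bronze, ["id"], "2024-01-02");
console.log([...silver.keys()]); // [ '[2]' ]
```

Because only the latest row per key survives deduplication, each key carries a single action per run, so the "delete first, then upsert" order only matters if the two passes read different inputs.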

So far, I have managed to do this in JavaScript with type "operations", but I would like a more scalable approach that takes advantage of the features Dataform offers.

Does anyone have any recommendations or tips on how to best achieve this?

Here is my current code: 

includes/merge_helper.js

```JavaScript

function generateCreateTableQuery(context) {
  return `
      CREATE TABLE IF NOT EXISTS ${context.tabla_silver} (
        ${context.columns.filter(column => column).map(column => `${column} ${context.columnDataTypes[column]}`).join(", ")}
      )
      -- PARTITION BY ${context.partition_column}
      CLUSTER BY ${context.clustering_columns.join(", ")}`
}


function generateDeleteQuery(context) {
  return `
      MERGE ${context.tabla_silver} as t
      USING (
        SELECT ${context.columns.join(", ")}
        FROM ${context.tabla_bronce} as s
         WHERE 1=1
          and load_batch >= '${context.startDate}'
          and ${context.TRANSACTION_COL} IN ('D')
      ) s
      ON ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
      WHEN MATCHED AND s.${context.TRANSACTION_COL} = 'D' THEN
        DELETE
      `;
}


function generateUpsertQuery(context) {
  return `
        MERGE ${(context.tabla_silver)} as t
        USING (
          SELECT ${context.columns.join(", ")}
          FROM ${context.tabla_bronce} as s
          WHERE 1=1
          and load_batch >= '${context.startDate}'
          and ${context.TRANSACTION_COL} IN ('I', 'U')
        ) s
        ON ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
        WHEN MATCHED THEN
          UPDATE SET ${context.columns.map(column => `t.${column} = s.${column}`).join(", ")}
        WHEN NOT MATCHED THEN
          INSERT (${context.columns.join(", ")})
          VALUES (${context.columns.map(column => `s.${column}`).join(", ")})
      `;
}

function generateBaseQuery(context) {
  const pks = context.pk_columns.join(", ");
  return `
    WITH base AS (
      SELECT ${pks}, MAX(LOAD_BATCH) AS date_max
      FROM (
        SELECT ROW_NUMBER() OVER (PARTITION BY ${pks} ORDER BY LOAD_BATCH DESC) AS rn,
          ${pks}, LOAD_BATCH
        FROM ${context.tabla_bronce}
        WHERE load_batch >= '${context.startDate}'
      )
      WHERE rn = 1
      GROUP BY ${pks}
    ), query_fin AS (
      SELECT op.*
      FROM base rn
      LEFT JOIN (
        SELECT * FROM ${context.tabla_bronce} WHERE load_batch >= '${context.startDate}'
      ) op
        ON ${context.pk_columns.map(pk_column => `op.${pk_column} = rn.${pk_column}`).join(" AND ")}
        AND op.LOAD_BATCH = rn.date_max
      -- Each key column gets its own IS NOT NULL test.
      WHERE ${context.pk_columns.map(pk_column => `op.${pk_column} IS NOT NULL`).join(" AND ")}
    )
    SELECT * FROM query_fin
  `;
}

// SQLX files can only use what an include file exports.
module.exports = {
  generateCreateTableQuery,
  generateDeleteQuery,
  generateUpsertQuery,
  generateBaseQuery,
};
```
Next, I create a view from the base query in a SQLX file of type "view". Then, in a SQLX file of type "operations", I use the other three queries to create the table if needed, merge the "D" rows, and finally merge the "I" and "U" rows.

The idea is to do exactly the same steps, but with an incremental SQLX file or something similar.

Looking forward to your responses.

Thank you in advance!

Accepted solution

I will try to address each of your questions.

There is no `incremental_where()` function in Dataform, and `incremental_where` is not a config option, which is why it is not recognized. The filter for new data belongs in the query body instead: `incremental()` returns true when the run is incremental (the table already exists and no full refresh was requested), `when(condition, sql)` emits `sql` only if `condition` is true, and `self()` resolves to the target table. For example:

```sql
config {
  type: "incremental"
}

SELECT *
FROM ${ref("bronze_table")}
${when(incremental(), `WHERE load_batch >= (SELECT MAX(load_batch) FROM ${self()})`)}
```

Replace `bronze_table` with your source table (it must be declared in the project so that `ref()` can resolve it). On the first run, or on a full refresh, the `WHERE` clause is omitted and the whole source is read; on later runs, only batches at or after the latest `load_batch` already in the table are read.

  1. The error "Scalar subquery produced more than one element" occurs when a subquery that must return a single value returns several rows. This could be happening because your `intermediate_view` returns more than one row for some primary key, for example when two rows share both the key and the latest `LOAD_BATCH`, so the join back on `MAX(LOAD_BATCH)` matches both. Make sure `intermediate_view` returns exactly one row per primary key; a MERGE on those keys needs the same guarantee, since BigQuery rejects a MERGE in which one target row matches several source rows.
  2. There is no `incremental(source_table, keys)` macro: `incremental()` takes no arguments and only reports whether the current run is incremental. Merging new rows into the table by key is configured with the `uniqueKey` option instead:

```sql
config {
  type: "incremental",
  uniqueKey: ["primary_key"]
}
```

Replace `primary_key` with your primary-key column(s). With `uniqueKey` set, Dataform compiles each incremental run into a MERGE on those columns that updates matching rows and inserts the rest. It does not delete rows, so the "D" records need an extra statement, for example a `DELETE` in `post_operations`.
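One possible way to map your pipeline onto an incremental table, sketched under assumptions: the config sets `uniqueKey` to your primary-key columns, and since the MERGE generated from it never deletes, the query keeps the latest row per key *including* "D" rows, lets the MERGE write them, and a `post_operations` DELETE removes them afterwards. `latestRowsIncrementalQuery` and `deleteMarkedRowsStatement` are hypothetical helpers, and `context` is assumed to carry the same fields as in your `merge_helper.js`. Dataform's `incremental()` and `self()` are passed in as arguments so the helpers stay plain JavaScript.

```JavaScript
// Hypothetical include helpers for an incremental SQLX table whose config
// sets uniqueKey to the primary-key columns.

// Selects the latest row per PK, including "D" rows, so the generated MERGE
// sees at most one source row per key.
function latestRowsIncrementalQuery(context, isIncremental, selfTable) {
  const pks = context.pk_columns.join(", ");
  // Incremental runs re-read from the newest LOAD_BATCH already in the target;
  // full builds start from context.startDate, as in the original helpers.
  const filter = isIncremental
    ? `WHERE LOAD_BATCH >= (SELECT MAX(LOAD_BATCH) FROM ${selfTable})`
    : `WHERE LOAD_BATCH >= '${context.startDate}'`;
  // QUALIFY keeps exactly one row per key, even when LOAD_BATCH values tie.
  return `
SELECT *
FROM ${context.tabla_bronce}
${filter}
QUALIFY ROW_NUMBER() OVER (PARTITION BY ${pks} ORDER BY LOAD_BATCH DESC) = 1
`;
}

// The uniqueKey MERGE only updates and inserts, so keys whose latest action
// is "D" are removed afterwards (intended for post_operations).
function deleteMarkedRowsStatement(selfTable, transactionCol = "DRIVER_ACTION_CODE") {
  return `DELETE FROM ${selfTable} WHERE ${transactionCol} = 'D'`;
}
```

In the SQLX file, the body could then be `${merge_helper.latestRowsIncrementalQuery(context, incremental(), self())}` and `post_operations` could contain `${merge_helper.deleteMarkedRowsStatement(self())}`, with both functions added to `module.exports`. Partitioning and clustering would move into the `bigquery.partitionBy` and `bigquery.clusterBy` config options, taking over from `generateCreateTableQuery`. Re-reading from the newest `LOAD_BATCH` already in the table is safe because a MERGE by key followed by the DELETE gives the same result when repeated; the trade-off is that the target briefly holds "D" rows between the MERGE and the DELETE.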

  3. To grant access from `post_operations`, use BigQuery's `GRANT` statement, which takes an IAM role, a resource type and name, and a list of principals:

```sql
post_operations {
  GRANT `roles/bigquery.dataViewer` ON TABLE ${self()} TO "group:group_name@example.com"
}
```

Replace `group:group_name@example.com` with the group you want to grant access to. To grant on the whole dataset rather than this table, use `ON SCHEMA` followed by the dataset name. This only works if the account that runs Dataform has permission to change access on that resource.
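If several tables need the same grants, the statement can also be built in an include file. A minimal sketch: `grantStatement` is a hypothetical helper, not a Dataform API; the role, table and group in the example are placeholders, and limiting resource types to `SCHEMA`, `TABLE` and `VIEW` is a choice made for this sketch.

```JavaScript
// Hypothetical helper that builds a BigQuery DCL GRANT statement.
// Principals use BigQuery's prefixes such as "user:", "group:" or "serviceAccount:".
function grantStatement(role, resourceType, resourceName, principals) {
  const allowed = ["SCHEMA", "TABLE", "VIEW"];
  if (!allowed.includes(resourceType)) {
    throw new Error(`unsupported resource type: ${resourceType}`);
  }
  // Each principal is a double-quoted string; the role is backtick-quoted.
  const users = principals.map((p) => `"${p}"`).join(", ");
  return `GRANT \`${role}\` ON ${resourceType} ${resourceName} TO ${users}`;
}

console.log(
  grantStatement("roles/bigquery.dataViewer", "TABLE", "`my-project.silver.orders`", [
    "group:analysts@example.com",
  ])
);
// GRANT `roles/bigquery.dataViewer` ON TABLE `my-project.silver.orders` TO "group:analysts@example.com"
```

Inside `post_operations`, `self()` can be passed as `resourceName` so the grant always targets the table being built.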

