
In GCP BigQuery Dataform, how can I perform insert and update operations on the same incremental table?

pnr

Hi Team,

I have a source table that contains a change stream, and I'm trying to replicate it in BigQuery using Dataform. To do that, I want to read the records from the change stream table and insert or update the corresponding records in the destination table.

When I tried creating two output files, one containing the insert publish function and the other the update publish function, I get the error:

Duplicate action name detected. Names within a schema must be unique across tables, declarations, assertions, and operations 

Insert.js ->

publish("car_table", {type: "incremental"})
.query(ctx => {
    `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src WHERE src._metadata_spanner_mod_type="INSERT"`
});

Update.js ->

publish("car_table", {type: "incremental"})
.query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src WHERE src._metadata_spanner_mod_type="UPDATE"`);

How can I achieve this? Can I combine both SQL statements into one publish function?


Yes, you can combine both SQL statements into one publish function. One way to achieve this is to use a CASE expression to categorize the type of operation (INSERT or UPDATE). Here's an example:

publish("car_table", {type: "incremental"}).query(ctx => `
SELECT
src.id,
src.brand,
src.count,
CASE src._metadata_spanner_mod_type
WHEN 'INSERT' THEN 'insert'
WHEN 'UPDATE' THEN 'update'
ELSE 'unknown'
END AS action
FROM ${ctx.ref("car_table_changelog")} AS src
`);

This will produce a table with an additional action column that indicates whether the record is an insert or an update. Note, however, that the action column is informational only: Dataform handles the insertion and updating of records automatically when you use {type: "incremental"}, based on the table's unique key.
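For that merge behavior to take effect, the incremental config generally needs a unique key to merge on; a minimal sketch (assuming id uniquely identifies a car) could look like this:

// Sketch only: uniqueKey tells Dataform which column(s) to merge on.
// Without it, each incremental run simply appends the new rows.
publish("car_table", {
  type: "incremental",
  uniqueKey: ["id"]
}).query(ctx => `
  SELECT src.id, src.brand, src.count
  FROM ${ctx.ref("car_table_changelog")} AS src
`);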

pnr

Thanks for the response.

I tried what you suggested, but Dataform didn't handle the insert or update. Instead it appended all the values, creating duplicates.


Publish function

publish("car_table", {type: "incremental"})
.query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src ORDER BY src._metadata_spanner_commit_timestamp`);

Output:

(Screenshot of the output table, showing the duplicated rows.)

I understand. Dataform's publish function is designed to create or append data to a destination table, not to directly handle upserts (insert or update). To achieve upserts, you'll need a different approach in the context of BigQuery.

One approach is to use BigQuery's MERGE statement:

MERGE INTO car_table AS target
USING car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
  UPDATE SET
    target.brand = source.brand,
    target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
  INSERT (id, brand, count)
  VALUES (source.id, source.brand, source.count);

This MERGE statement will handle both inserts and updates based on the _metadata_spanner_mod_type column.

In Dataform, you can create an operations action (rather than a publish action) that runs this MERGE statement. It can be scheduled to run periodically to apply changes from car_table_changelog to car_table.
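As a rough sketch (assuming car_table lives in a dataset named dataform and car_table_changelog is declared as a Dataform source, as in the files later in this thread), such an operations file could look like this:

-- merge_car_table.sqlx (sketch only; dataset, table, and column names taken from this thread)
config {
  type: "operations"
}

MERGE dataform.car_table AS target
USING ${ref("car_table_changelog")} AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
  UPDATE SET brand = source.brand, count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
  INSERT (id, brand, count)
  VALUES (source.id, source.brand, source.count)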

Remember, the exact solution might need adjustments based on your specific dataset and requirements.

 

pnr

Thanks for the reply. Can you give me, or point me to, a sample script to run the MERGE statement? When you say script, do you mean JavaScript or SQLX?

 

A sample script to run the MERGE statement in BigQuery can be written in either SQL or a programming language such as JavaScript.

Here is a sample SQL script:

MERGE INTO car_table AS target
USING car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
  UPDATE SET
    target.brand = source.brand,
    target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
  INSERT (id, brand, count)
  VALUES (source.id, source.brand, source.count);

This script can be run in the BigQuery console or using the BigQuery API.

Here is a sample JavaScript script to run the MERGE statement in BigQuery:

const { BigQuery } = require('@google-cloud/bigquery');

const bigquery = new BigQuery();

const query = `
  MERGE INTO car_table AS target
  USING car_table_changelog AS source
  ON target.id = source.id
  WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
    UPDATE SET
      target.brand = source.brand,
      target.count = source.count
  WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
    INSERT (id, brand, count)
    VALUES (source.id, source.brand, source.count)
`;

async function runQuery() {
  // createQueryJob resolves to an array; the job object is the first element
  const [job] = await bigquery.createQueryJob({
    query,
    location: 'US',
  });

  // Waits for the DML job to complete; a MERGE returns no result rows
  const [rows] = await job.getQueryResults();

  console.log(rows);
}

runQuery();

This script can be run using Node.js.

Which type of script you choose will depend on your specific needs and preferences.

 

I plan to run it in Dataform (https://cloud.google.com/dataform).

As per https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement

If as part of a MERGE a new row is inserted in the target table, the newly inserted row is not eligible for a match with rows from the source table. Matching is based on the state the tables are in when the query is started.

The MERGE update DML won't work, because my source table contains both inserts and updates for the same primary key.
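One thing I haven't tried yet (an untested sketch, written in the same style as the SQLX files below) would be to collapse the changelog to the latest row per id first, so that MERGE only ever sees one source row per key and simply upserts the latest state:

-- Sketch: keep only the most recent change per id before merging
MERGE dataform.car_table AS target
USING (
  SELECT id, brand, count, _metadata_spanner_mod_type
  FROM ${ref("car_table_changelog")}
  WHERE TRUE  -- lets QUALIFY be used without any other filter
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY _metadata_spanner_commit_timestamp DESC
  ) = 1
) AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET brand = source.brand, count = source.count
WHEN NOT MATCHED THEN
  INSERT (id, brand, count)
  VALUES (source.id, source.brand, source.count)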

I tried a different approach by separating the two activities into two files.

File: insert_tables.sqlx

config { 
  type: "operations",
  schema: "dataform",
  hasOutput: true
}

INSERT INTO dataform.car_table (id, brand, count)
SELECT src.id, src.brand, src.count
FROM ${ref("car_table_changelog")} as src
WHERE src._metadata_spanner_mod_type like 'INSERT'
ORDER BY src._metadata_spanner_commit_timestamp

File: update_tables.sqlx 

config { 
  type: "operations",
  schema: "dataform",
  hasOutput: true,
  dependencies: ["insert_tables"]
}

UPDATE dataform.car_table target
SET 
target.brand = source.brand,
target.count = source.count
FROM ${ref("car_table_changelog")} as source
WHERE target.id = source.id AND source._metadata_spanner_mod_type like 'UPDATE'

Even this fails with the error: UPDATE/MERGE must match at most one source row for each target row.

I'm unable to find a solution.

pnr

If possible, can you share links to example projects (not specific to my use case) that I can refer to? Thanks in advance.

Below are some resources and example projects that can help you understand and work with Dataform:

  1. Dataform's official documentation and guides: This is your best starting point as it provides comprehensive details, tutorials, and best practices.

  2. Dataform's GitHub repository: This contains sample projects, integrations, and more. It's a useful resource to see real-world setups and understand potential advanced configurations.

  3. Example Projects:

  4. Community Resources:

    • Dataform's Community Slack: This is a great place to ask questions, share experiences, and learn from other Dataform users.

    • Dataform Blog: Contains a variety of articles, from best practices to in-depth explanations of specific features.