Hi Team,
I have a source table that contains a change stream, and I'm trying to replicate the table in BigQuery using Dataform. To do that, I want to read the records from the change stream table and insert or update records in the destination table.
When I tried creating two files, one with an insert publish function and the other with an update publish function, I got this error:
Duplicate action name detected. Names within a schema must be unique across tables, declarations, assertions, and operations
Insert.js:
publish("car_table", {type: "incremental"})
  .query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src WHERE src._metadata_spanner_mod_type = "INSERT"`);
Update.js:
publish("car_table", {type: "incremental"})
  .query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src WHERE src._metadata_spanner_mod_type = "UPDATE"`);
How can I achieve it? Can I combine both the SQL statements into one publish function?
Yes, you can combine both SQL statements into one publish function. One way to do this is to use a CASE expression to label the type of operation (INSERT or UPDATE). Here's an example:
publish("car_table", {type: "incremental"}).query(ctx => `
SELECT
src.id,
src.brand,
src.count,
CASE src._metadata_spanner_mod_type
WHEN 'INSERT' THEN 'insert'
WHEN 'UPDATE' THEN 'update'
ELSE 'unknown'
END AS action
FROM ${ctx.ref("car_table_changelog")} AS src
`);
This will produce a table with an additional action column that indicates whether each record is an insert or an update. However, note that this action column is informational only. With {type: "incremental"} alone, each run appends the selected rows to car_table; Dataform only merges rows (updating existing ones instead of appending) when the incremental table also has a uniqueKey, for example {type: "incremental", uniqueKey: ["id"]}, and even then each id must appear only once in the query results.
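The difference between appending and upserting can be sketched in plain JavaScript. This is a hypothetical in-memory model, not Dataform's implementation, and the function names are made up for illustration. One deliberate simplification: `mergeRows` applies rows one at a time so the last row for a key wins, whereas a BigQuery MERGE raises an error when several source rows match the same target row.

```javascript
// Plain JavaScript model (not Dataform code); rows are objects keyed by `id`.

// Append-only: every run adds the selected rows, so repeated ids pile up.
function appendRows(table, newRows) {
  return [...table, ...newRows];
}

// Keyed merge: a row replaces any existing row with the same key.
// Rows are applied one at a time, so the last row for a key wins.
function mergeRows(table, newRows, key) {
  const byKey = new Map(table.map((row) => [row[key], row]));
  for (const row of newRows) {
    byKey.set(row[key], row);
  }
  return [...byKey.values()];
}

// Hypothetical changelog rows for one car: an INSERT followed by an UPDATE.
const changes = [
  { id: 1, brand: 'Audi', count: 1 },
  { id: 1, brand: 'Audi', count: 5 },
];

console.log(appendRows([], changes).length); // 2
console.log(mergeRows([], changes, 'id')); // [ { id: 1, brand: 'Audi', count: 5 } ]
```

The append model reproduces the duplicate rows described in the reply below; the keyed model shows the result an upsert is meant to produce.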
Thanks for the response.
I checked what you said, and Dataform didn't handle inserts or updates. Instead, it appended all the values, creating duplicates.
Publish function:
publish("car_table", {type: "incremental"})
.query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src ORDER BY src._metadata_spanner_commit_timestamp`);
I understand. With only {type: "incremental"}, Dataform's publish function creates the destination table or appends the query results to it; it does not upsert (insert or update) rows. To achieve upserts in BigQuery, you'll need a different approach.
One approach is to use BigQuery's MERGE statement:
MERGE INTO car_table AS target
USING car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
UPDATE SET
target.brand = source.brand,
target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
INSERT (id, brand, count)
VALUES (source.id, source.brand, source.count);
This MERGE statement handles both inserts and updates based on the _metadata_spanner_mod_type column, as long as each id appears at most once in car_table_changelog.
In Dataform, you can create an operations script (a SQLX file with type: "operations", rather than a publish definition) that runs this MERGE statement. This script can be scheduled to run periodically to apply changes from car_table_changelog to car_table.
Remember, the exact solution might need adjustments based on your specific dataset and requirements.
Thanks for the reply. Can you give me, or point me to, a sample script that runs the MERGE statement? When you say script, do you mean JavaScript or SQLX?
A sample script to run the MERGE statement in BigQuery can be written in either SQL or a programming language such as JavaScript.
Here is a sample SQL script:
MERGE INTO car_table AS target
USING car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
UPDATE SET
target.brand = source.brand,
target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
INSERT (id, brand, count)
VALUES (source.id, source.brand, source.count);
This script can be run in the BigQuery console or using the BigQuery API.
Here is a sample JavaScript script to run the MERGE statement in BigQuery:
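// Requires the client library: npm install @google-cloud/bigquery
const { BigQuery } = require('@google-cloud/bigquery');

// Uses Application Default Credentials and the default project.
const bigquery = new BigQuery();

// BigQuery needs dataset-qualified table names here; "dataform" is assumed
// for both tables (it is the schema used elsewhere in this thread).
const query = `
MERGE INTO dataform.car_table AS target
USING dataform.car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
UPDATE SET
target.brand = source.brand,
target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
INSERT (id, brand, count)
VALUES (source.id, source.brand, source.count);
`;

async function runQuery() {
  // createQueryJob resolves to [job, apiResponse], so destructure the job.
  const [job] = await bigquery.createQueryJob({
    query,
    location: 'US', // must match the location of the dataset
  });
  // Wait for the MERGE to finish; DML statements return no result rows.
  await job.getQueryResults();
  console.log(`Job ${job.id} completed.`);
}

runQuery().catch(console.error);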
This script can be run using Node.js.
Which type of script you choose will depend on your specific needs and preferences.
I plan to run it in Dataform (https://cloud.google.com/dataform).
As per https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement:
"If, as part of a MERGE, a new row is inserted in the target table, the newly inserted row is not eligible for a match with rows from the source table. Matching is based on the state the tables are in when the query is started."
The MERGE won't work because my source table contains both inserts and updates for the same primary key.
I tried a different approach by separating the two operations into two files.
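The limitation quoted above can be modeled in a few lines of JavaScript. This is a simplified, hypothetical model of the MERGE from earlier in the thread, not BigQuery's implementation; the field mod_type stands in for _metadata_spanner_mod_type, and the sample rows are made up.

```javascript
// Simplified model (an assumption, not BigQuery's implementation) of
//   MERGE ... ON target.id = source.id
//   WHEN MATCHED AND mod_type = 'UPDATE' THEN UPDATE
//   WHEN NOT MATCHED AND mod_type = 'INSERT' THEN INSERT
function simulateMerge(target, source) {
  // Matching is evaluated against the target as it was when the statement started.
  const startIds = new Set(target.map((row) => row.id));
  const rows = target.map((row) => ({ ...row }));
  const updatedIds = new Set();

  for (const change of source) {
    const matched = startIds.has(change.id);
    if (matched && change.mod_type === 'UPDATE') {
      // Sketch of BigQuery's rule: one existing row may be updated by only one source row.
      if (updatedIds.has(change.id)) {
        throw new Error('UPDATE/MERGE must match at most one source row for each target row');
      }
      updatedIds.add(change.id);
      const row = rows.find((r) => r.id === change.id);
      row.brand = change.brand;
      row.count = change.count;
    } else if (!matched && change.mod_type === 'INSERT') {
      // Newly inserted rows are never matched by later source rows.
      rows.push({ id: change.id, brand: change.brand, count: change.count });
    }
    // No clause applies otherwise, so the change is silently skipped.
  }
  return rows;
}

// Hypothetical changelog: id 7 is inserted and then updated in the same batch.
const changelog = [
  { id: 7, brand: 'Fiat', count: 1, mod_type: 'INSERT' },
  { id: 7, brand: 'Fiat', count: 4, mod_type: 'UPDATE' },
];

console.log(simulateMerge([], changelog)); // [ { id: 7, brand: 'Fiat', count: 1 } ]
```

In this model the UPDATE for id 7 is skipped because the row did not exist when the statement started, and two UPDATEs for an existing id raise the same error reported later in this thread.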
File: insert_tables.sqlx
config {
type: "operations",
schema: "dataform",
hasOutput: true
}
INSERT INTO dataform.car_table (id, brand, count)
SELECT src.id, src.brand, src.count
FROM ${ref("car_table_changelog")} as src
WHERE src._metadata_spanner_mod_type like 'INSERT'
ORDER BY src._metadata_spanner_commit_timestamp
File: update_tables.sqlx
config {
type: "operations",
schema: "dataform",
hasOutput: true,
dependencies: ["insert_tables"]
}
UPDATE dataform.car_table target
SET
target.brand = source.brand,
target.count = source.count
FROM ${ref("car_table_changelog")} as source
WHERE target.id = source.id AND source._metadata_spanner_mod_type like 'UPDATE'
Even this fails with the error: UPDATE/MERGE must match at most one source row for each target row
I'm unable to find a solution.
If possible, can you share links to example projects (not specific to my use case) that I can refer to? Thanks in advance.
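One common way around that error is to collapse the changelog so each id appears only once before updating or merging. The sketch below models that step in memory; the function name latestChangePerKey is hypothetical, the ts field stands in for _metadata_spanner_commit_timestamp, and the rows are made up. In BigQuery SQL the equivalent filter is a ROW_NUMBER() OVER (PARTITION BY id ORDER BY _metadata_spanner_commit_timestamp DESC) = 1 subquery.

```javascript
// Hypothetical in-memory model of the fix: collapse the changelog so every id
// appears once, keeping the change with the latest commit timestamp.
function latestChangePerKey(changes, key, tsField) {
  const latest = new Map();
  for (const change of changes) {
    const current = latest.get(change[key]);
    // ISO-8601 UTC strings in the same format compare correctly as strings.
    if (current === undefined || change[tsField] > current[tsField]) {
      latest.set(change[key], change);
    }
  }
  return [...latest.values()];
}

// Sample rows (made up for illustration): id 1 is inserted and then updated twice.
const sampleChanges = [
  { id: 1, brand: 'Audi', count: 1, mod_type: 'INSERT', ts: '2023-08-01T10:00:00Z' },
  { id: 1, brand: 'Audi', count: 2, mod_type: 'UPDATE', ts: '2023-08-01T10:05:00Z' },
  { id: 2, brand: 'BMW', count: 9, mod_type: 'INSERT', ts: '2023-08-01T10:01:00Z' },
  { id: 1, brand: 'Audi', count: 3, mod_type: 'UPDATE', ts: '2023-08-01T10:10:00Z' },
];

const collapsed = latestChangePerKey(sampleChanges, 'id', 'ts');
console.log(collapsed.map((c) => `${c.id}:${c.count}`)); // [ '1:3', '2:9' ]
```

After collapsing, each target row can match at most one source row. Because the surviving change may be an UPDATE for an id that is not yet in the target, a MERGE applied to the collapsed rows would typically insert on NOT MATCHED regardless of the mod type.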
Below are some resources and example projects that can help you understand and work with Dataform:
Dataform's official documentation and guides: This is your best starting point as it provides comprehensive details, tutorials, and best practices.
Dataform's GitHub repository: This contains sample projects, integrations, and more. It's a useful resource to see real-world setups and understand potential advanced configurations.
Example Projects:
Dataform Web Tracking Example: This is a sample project that transforms raw Google Analytics and database logs into a set of tables suitable for analysis.
Dataform E-commerce Example: This project takes raw e-commerce data and turns it into a tidy set of tables.
Community Resources:
Dataform's Community Slack: This is a great place to ask questions, share experiences, and learn from other Dataform users.
Dataform Blog: Contains a variety of articles, from best practices to in-depth explanations of specific features.