My design is to have a script run daily to stream records into BigQuery tables via the Write API. However, I need to UPSERT, because future updates will change records I've already inserted. This is where I am stuck right now, since the Write API only appends rows. How do I solve this?
To achieve UPSERT functionality with the BigQuery Write API, you can follow these steps:
1. Retrieve the existing records from BigQuery: Before performing the UPSERT operation, you need to fetch the existing records from the target table in BigQuery. You can use a SELECT query to retrieve the records you want to update (see the sketch after this list).
2. Compare the existing records with the new records: Once you have the existing records, you need to compare them with the new records you want to insert. You can use a unique identifier or a combination of columns as the basis for the comparison.
3. Identify the records to update: Determine which records need to be updated based on the comparison. This could involve checking for changes in specific columns or comparing timestamps to identify newer versions of records.
4. Perform the update: To update the existing records, you can use the BigQuery Write API's `appendRows` method. Instead of inserting new rows, you'll be appending the updated records to the existing table.
5. Insert new records: For the new records that don't exist in the target table, you can use the `appendRows` method to insert them as new rows.
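For step 1, here's a minimal sketch using the BigQuery Java client library; the id column is an assumed unique key, and the dataset/table names come from your own configuration:
// Requires the com.google.cloud.bigquery client classes (BigQuery, QueryJobConfiguration,
// FieldValueList) plus java.util.Set/HashSet.
// Collects the ids that already exist in the target table so they can be compared
// with the ids in the new daily batch (steps 2 and 3).
Set<String> fetchExistingIds(BigQuery bigquery, String datasetName, String tableName)
        throws InterruptedException {
    String sql = String.format("SELECT id FROM `%s.%s`", datasetName, tableName);
    QueryJobConfiguration config = QueryJobConfiguration.newBuilder(sql).build();
    Set<String> existingIds = new HashSet<>();
    for (FieldValueList row : bigquery.query(config).iterateAll()) {
        existingIds.add(row.get("id").getStringValue());
    }
    return existingIds;
}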
Hi @ms4446 , thank you for the detailed response! This is a great direction for me already. However, how do I approach the stale records? Do I delete them or do nothing? I researched about the MERGE statement and I was wondering if I can use that.
The approach to handling stale records in your UPSERT process depends on your specific use case and requirements. Here are two common approaches:
Update the stale records: If you want to keep the stale records in your target table and update them with the new values, you can include an UPDATE operation in your UPSERT process. By comparing the existing records with the new records, you can identify the stale records that need to be updated. Use the UPDATE statement to modify the existing records with the new values.
Delete the stale records: If you no longer need the stale records in your target table and want to remove them, you can include a DELETE operation in your UPSERT process. After comparing the existing records with the new records, identify the stale records that are no longer present in the new records. Use the DELETE statement to remove those stale records from the target table.
The choice between updating or deleting stale records depends on the nature of your data and the requirements of your application.
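If you go the delete route, a minimal sketch of the idea could look like this (my_dataset.target_table, my_dataset.new_records, and the id key are placeholders, not names from your setup). Note that this pattern only makes sense when the "new records" table is a full snapshot; with an incremental daily batch it would delete everything that simply wasn't touched that day:
DELETE FROM my_dataset.target_table
WHERE id NOT IN (SELECT id FROM my_dataset.new_records)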
The MERGE statement provides a more concise and efficient way to perform an UPSERT.
With the MERGE statement, you can specify the conditions for matching rows and define the actions to perform when a match is found or not found. Here's an example of how you can use the MERGE statement for UPSERT:
MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (id, value)
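If you also want the delete behavior described above, BigQuery's MERGE supports a WHEN NOT MATCHED BY SOURCE clause, so stale rows can be removed in the same statement. Extending the same example (again, only appropriate when table_changes holds a full snapshot of the current data):
MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (id, value)
WHEN NOT MATCHED BY SOURCE THEN
  DELETE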
Thank you! I apologize if my questions are newbie ones; I just started exploring BigQuery and SQL in general. Since the MERGE statement uses another "change" table, how do I create that, given that I can only read the source plus the objects in my script? I am planning to make a daily temporary table, append the new records to it, and run the merge from there. How does this sound? Or is there a better approach?
Sample code snippet:
//TODO: Retrieve the existing records from BigQuery
bigQueryReaderHelper.startRead();
//TODO: Create timed table to insert fresh rows
String tempTable = createTempTableQueryAndReturnTempTable(tableName);
//TODO: Populate temporary table with new daily records
// writeHistoricalAbsToPendingStream(tempTable, newDataList);
//TODO: Query MERGE / UPSERT statement
// Code snippet for the temp table
String destTable = String.format("%s.%s", datasetName, tableName);
String tempTable = String.format("%s.Temp_%s", datasetName, tableName);
String query = String.format("CREATE TABLE `%s`\n" +
"LIKE `%s`\n" +
"OPTIONS(\n" +
" expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)\n" +
")", tempTable, destTable);
You might want to create a temporary table to store new daily records and then use a MERGE statement to update the main table accordingly. You can achieve this by following these steps:
1. Create the temporary "change" table. You have already provided the code snippet for creating a temporary table with a 1-hour expiration, and the same pattern works here. Here's a sample query to create the "change" table:
String changeTable = String.format("%s.Change_%s", datasetName, tableName);
String query = String.format("CREATE TABLE `%s`\n" +
"LIKE `%s`\n" +
"OPTIONS(\n" +
" expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)\n" +
")", changeTable, destTable);
Execute the query with the BigQuery Java client library.
2. Populate the "change" table with the new daily records. How you do this depends on your specific use case: execute an appropriate INSERT or INSERT ... SELECT statement, or use the BigQuery Java client library to load the data from a file or a list of records.
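For illustration only, populating the "change" table with a parameterized INSERT could look like this (id, column1, and column2 are placeholders for your schema; for larger daily batches, loading from a file or writing to the change table with the Write API is usually more efficient):
// Builds and runs a parameterized INSERT against the change table created above.
String insertQuery = String.format(
        "INSERT INTO `%s` (id, column1, column2) VALUES (@id, @column1, @column2)", changeTable);
QueryJobConfiguration insertConfig = QueryJobConfiguration.newBuilder(insertQuery)
        .addNamedParameter("id", QueryParameterValue.string("123"))
        .addNamedParameter("column1", QueryParameterValue.string("value1"))
        .addNamedParameter("column2", QueryParameterValue.string("value2"))
        .build();
bigquery.query(insertConfig); // throws InterruptedException; handle or declare it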
3. Merge the "change" table into the main table. Here's a sample MERGE statement:
String mergeQuery = String.format("MERGE `%s` AS target\n" +
"USING `%s` AS source\n" +
"ON target.id = source.id\n" +
"WHEN MATCHED THEN\n" +
" UPDATE SET target.column1 = source.column1, target.column2 = source.column2\n" +
"WHEN NOT MATCHED THEN\n" +
" INSERT (id, column1, column2) VALUES (source.id, source.column1, source.column2)", destTable, changeTable);
Replace id, column1, and column2 with the appropriate columns from your tables. Execute the query with the BigQuery Java client library.
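In case it helps, executing any of these query strings (the CREATE TABLE statements and the MERGE) with the Java client library can be as simple as the following sketch; error handling is omitted:
// Runs a DDL/DML statement (CREATE TABLE, INSERT, MERGE, ...) as a query job and
// waits for it to complete.
void runQuery(BigQuery bigquery, String sql) throws InterruptedException {
    bigquery.query(QueryJobConfiguration.newBuilder(sql).build());
}
// Usage:
// runQuery(bigquery, query);      // create the change table
// runQuery(bigquery, mergeQuery); // merge it into the destination table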
Thank you! This is what I am doing now. However, I am facing a "Requested entity not found" error whenever I try to insert into the temporary change table; it happens about 8 out of 10 times. Is there a delay before BQ can see a new table?
Coming off of this, I think the better approach is to follow the first response you've written, since that one is straightforward. This recent MERGE approach is kinda clunky, unless I create a job. Maybe BigQuery can add an UPSERT API soon? 🙂
The "Requested entity was not found" error usually happens when the referred entity (in this case, a BigQuery table) does not exist at the time the request is made. This could occur due to the following reasons:
Propagation Delay: After you create a table in BigQuery, there might be a small propagation delay before the table is fully "visible" to all operations in the BigQuery service. If you're trying to write to the table immediately after creating it, this delay might cause the "entity not found" error.
Asynchronous Operations: If your code is running asynchronously, it's possible that the write operation is being called before the table creation operation has completed.
To resolve the issue, consider the following solutions:
Add a delay: You can add a short delay in your code between the creation of the table and the first operation that uses the table. This can give BigQuery time to complete the propagation of the table creation.
Check table existence: Before performing operations on the table, you can check if the table exists. BigQuery has APIs for checking if a table exists. Here's how you can do it in Java:
boolean doesTableExist(BigQuery bigquery, String datasetName, String tableName) {
TableId tableId = TableId.of(datasetName, tableName);
return bigquery.getTable(tableId) != null;
}
You can then use this function before attempting to insert into the table:
// Wait until the table exists before inserting data.
while (!doesTableExist(bigquery, datasetName, tempTable)) {
Thread.sleep(1000); // Wait for 1 second before checking again.
}
// Now you can insert data into the table.
In addition, please remember to handle any exceptions that might occur in your code (for example, the InterruptedException that might be thrown by the Thread.sleep() method).
Hello @ms4446,
I have a question: at point 4 in your response, you say the following:
4. Perform the update: To update the existing records, you can use the BigQuery Write API's `appendRows` method. Instead of inserting new rows, you'll be appending the updated records to the existing table.
So, can we really UPDATE a record in BQ by using the 'appendRows' method? How does the API distinguish an update from an insert, since the method signature is the same?
Thank you!
Sorry for the confusion. You cannot directly update existing rows in a BigQuery table using the appendRows method. The appendRows method of the BigQuery Write API is designed primarily to add new rows of data to a table.
To achieve the effect of updating existing records, the common strategy is to:
1. Prepare the updated versions of the affected records on the client side.
2. Use appendRows to append these updated rows to the BigQuery table.
BigQuery's Handling of Appended Rows:
When you use appendRows with data that has the same unique identifier(s) as existing rows, BigQuery does not treat them as updates. It simply stores them as additional rows, so you end up with multiple versions of the same logical record in the table.
Workarounds to Simulate Updates:
To address this and achieve the desired update behavior, you usually need to employ one of the following techniques:
1. Client-Side Deduplication and Overwrite: read the current data, apply the changes in your application, deduplicate, and write the result back by overwriting the table.
2. Using a MERGE Statement (for larger datasets or frequent updates): append the changed rows to a staging ("change") table and reconcile them into the main table with a MERGE statement. This statement allows you to match records based on a condition and conditionally perform updates or inserts. You can also automate this step by running the MERGE query as a BigQuery script.
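For completeness: if appended rows do leave multiple versions of the same id in a table, one common pattern is to read back only the latest version per id with a window function. This sketch assumes the table has an update timestamp column, here called updated_at, and uses a placeholder table name:
-- Keep only the most recent row per id.
SELECT * EXCEPT(row_num)
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS row_num
  FROM my_dataset.target_table
)
WHERE row_num = 1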