My design is to have a script run daily to stream records into BigQuery tables via the Write API. However, I need to UPSERT, because records I've already inserted can receive later updates. This is where I'm stuck right now, since the Write API only appends rows. How do I solve this?
The approach to handling stale records in your UPSERT process depends on your specific use case and requirements. Here are two common approaches:
Update the stale records: If you want to keep the stale records in your target table and update them with the new values, you can include an UPDATE operation in your UPSERT process. By comparing the existing records with the new records, you can identify the stale records that need to be updated. Use the UPDATE statement to modify the existing records with the new values.
Delete the stale records: If you no longer need the stale records in your target table and want to remove them, you can include a DELETE operation in your UPSERT process. After comparing the existing records with the new records, identify the stale records that are no longer present in the new records. Use the DELETE statement to remove those stale records from the target table.
The choice between updating or deleting stale records depends on the nature of your data and the requirements of your application.
The MERGE statement provides a more concise and efficient way to perform an UPSERT than running separate UPDATE, INSERT, and DELETE statements.
With the MERGE statement, you can specify the conditions for matching rows and define the actions to perform when a match is found or not found. Here's an example of how you can use the MERGE statement for UPSERT:
MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (id, value)
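The update and delete approaches for stale records described earlier can both be folded into one MERGE. The following is a minimal sketch, not a definitive implementation: the class name UpsertSql and the method buildMerge are hypothetical helpers that only assemble the SQL text. The optional WHEN NOT MATCHED BY SOURCE THEN DELETE clause removes target rows that are absent from the source, so it is only safe when the source table holds a complete snapshot rather than just the day's changes.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a BigQuery MERGE that upserts and can delete stale rows. */
final class UpsertSql {
    private UpsertSql() {}

    /**
     * @param target      fully qualified target table, e.g. "merge_example.table_data"
     * @param source      fully qualified table holding the latest records
     * @param key         column that identifies a record in both tables
     * @param columns     non-key columns to copy from source to target
     * @param deleteStale when true, target rows missing from the source are deleted
     *                    (assumes the source is a full snapshot, not a daily delta)
     */
    static String buildMerge(String target, String source, String key,
                             List<String> columns, boolean deleteStale) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one non-key column is required");
        }
        // "value = S.value, other = S.other, ..."
        String assignments = columns.stream()
                .map(c -> c + " = S." + c)
                .collect(Collectors.joining(", "));
        String insertColumns = key + ", " + String.join(", ", columns);
        String insertValues = "S." + key + ", "
                + columns.stream().map(c -> "S." + c).collect(Collectors.joining(", "));

        StringBuilder sql = new StringBuilder()
                .append("MERGE `").append(target).append("` T\n")
                .append("USING `").append(source).append("` S\n")
                .append("ON T.").append(key).append(" = S.").append(key).append("\n")
                .append("WHEN MATCHED THEN\n")
                .append("  UPDATE SET ").append(assignments).append("\n")
                .append("WHEN NOT MATCHED THEN\n")
                .append("  INSERT (").append(insertColumns).append(") VALUES (")
                .append(insertValues).append(")");
        if (deleteStale) {
            // Rows that exist only in the target are treated as stale and removed.
            sql.append("\nWHEN NOT MATCHED BY SOURCE THEN\n  DELETE");
        }
        return sql.toString();
    }
}
```

Calling UpsertSql.buildMerge("merge_example.table_data", "merge_example.table_changes", "id", List.of("value"), false) yields a plain UPSERT; passing true as the last argument appends the DELETE clause.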
You might want to create a temporary table to store new daily records and then use a MERGE statement to update the main table accordingly. You can achieve this by following these steps:
Create the temporary table. You have already provided the code snippet that creates one with an expiration of 1 hour; make sure to execute that query with the BigQuery Java client library.
Insert new daily records into the temporary table. You can do this by executing an INSERT statement or using the BigQuery Java client library to load data from a file or a list of records.
Here's a sample query to create the "change" table:
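// Name of the short-lived "change" table, built from the dataset and table names
String changeTable = String.format("%s.Change_%s", datasetName, tableName);
// destTable is the fully qualified main table (defined elsewhere); LIKE copies its schema
String query = String.format("CREATE TABLE `%s`\n" +
        "LIKE `%s`\n" +
        "OPTIONS(\n" +
        " expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)\n" +
        ")", changeTable, destTable);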
Execute the query with the BigQuery Java client library.
How you populate the "change" table depends on your specific use case: execute an appropriate INSERT statement (for example INSERT ... SELECT) so that the table holds the day's new and updated records.
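One thing to watch when populating the "change" table: BigQuery rejects a MERGE at runtime if a target row matches more than one source row, which can happen when the same id arrives twice in one day. A possible workaround, sketched below, is to merge from a query that keeps only the newest row per key. The class LatestPerKey and the ordering column (for example an updated_at timestamp) are assumptions for illustration and are not part of the original tables.

```java
/** Hypothetical helper: builds a query that keeps only the newest row per key. */
final class LatestPerKey {
    private LatestPerKey() {}

    /**
     * @param changeTable fully qualified "change" table
     * @param key         column that identifies a record, e.g. "id"
     * @param orderColumn column that says which duplicate is newest
     *                    (assumed to exist, e.g. an "updated_at" timestamp)
     */
    static String buildQuery(String changeTable, String key, String orderColumn) {
        // ROW_NUMBER() ranks duplicates per key, newest first; only rank 1 survives.
        return String.format(
                "SELECT * EXCEPT (rn)\n"
                + "FROM (\n"
                + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY %2$s ORDER BY %3$s DESC) AS rn\n"
                + "  FROM `%1$s`\n"
                + ")\n"
                + "WHERE rn = 1",
                changeTable, key, orderColumn);
    }
}
```

The returned text can be used as the MERGE source by wrapping it in parentheses, as in USING (...) AS source, instead of naming the change table directly.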
Here's a sample MERGE statement:
// Upsert: rows that match on id are updated, source rows without a match are inserted
String mergeQuery = String.format("MERGE `%s` AS target\n" +
        "USING `%s` AS source\n" +
        "ON target.id = source.id\n" +
        "WHEN MATCHED THEN\n" +
        " UPDATE SET target.column1 = source.column1, target.column2 = source.column2\n" +
        "WHEN NOT MATCHED THEN\n" +
        " INSERT (id, column1, column2) VALUES (source.id, source.column1, source.column2)", destTable, changeTable);
Replace id, column1, and column2 with the appropriate columns from your tables. Execute the query with the BigQuery Java client library.
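The order of the steps matters: the "change" table has to exist and be populated before the MERGE runs. Here is a rough sketch of how the daily script might sequence them. DailyUpsert and SqlRunner are hypothetical names, and the runner is left abstract so the sketch does not depend on any particular client setup. With the google-cloud-bigquery library, a runner would typically wrap bigquery.query(QueryJobConfiguration.newBuilder(sql).build()) and handle its checked exceptions.

```java
/** Hypothetical orchestration of the daily job: create, load, then merge. */
final class DailyUpsert {

    /** Hypothetical hook that executes one SQL statement against BigQuery. */
    interface SqlRunner {
        void run(String sql);
    }

    private final SqlRunner runner;

    DailyUpsert(SqlRunner runner) {
        this.runner = runner;
    }

    /**
     * @param createChangeTableSql the CREATE TABLE ... LIKE statement for the change table
     * @param loadChanges          whatever populates the change table with the day's records
     * @param mergeSql             the MERGE statement from change table into main table
     */
    void execute(String createChangeTableSql, Runnable loadChanges, String mergeSql) {
        runner.run(createChangeTableSql); // 1. create the short-lived change table
        loadChanges.run();                // 2. fill it with the day's records
        runner.run(mergeSql);             // 3. upsert into the main table
    }
}
```

Keeping the runner behind an interface also makes the sequence easy to check with a stub before pointing it at a real dataset.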