Hi,
we are streaming data to a staging (stg) table and it has duplicate events. Now, using some technique, the data has to move to a main table that should hold a single, latest version of each event. Basically, our main table should reflect the CDC changes. Could you suggest how we can achieve this in BigQuery? The solution should be cost-effective and performant.
Thanks
To move data from the staging table to the main table so that the main table keeps only a single, latest version of each event, deduplicate the staging rows (for example with ROW_NUMBER() over the event key) and upsert them into the main table with a MERGE statement:
MERGE INTO `main_table` AS T
USING (
  -- Deduplicate staging first so each target row matches at most one source row.
  SELECT * EXCEPT(row_num)
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp DESC) AS row_num
    FROM `staging_table`
  )
  WHERE row_num = 1
) AS S
ON T.event_id = S.event_id
WHEN MATCHED AND S.event_timestamp > T.event_timestamp THEN
  UPDATE SET
    event_timestamp = S.event_timestamp,
    event_data = S.event_data
WHEN NOT MATCHED THEN
  INSERT (event_id, event_timestamp, event_data)
  VALUES (S.event_id, S.event_timestamp, S.event_data);
Streaming data into BigQuery may introduce a slight delay (a few seconds) before the data is available for querying. Additionally, streaming data may have associated costs.
Thanks for the detailed response. Since there is a delay in getting streamed data, can I directly insert/update/delete using BigQuery jobs programmatically (via the REST API or client libraries) to avoid duplicates in the main table and eliminate the stage table? I can do the below operations on the main table:
for insert: use a native insert query and execute the job
for update: first select the existing record by unique key and update the record
for delete: select by unique key and execute a delete query (see the rough sketch after this list)
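This is roughly what I mean for the delete case; the table and column names are just placeholders:
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;

public class DeleteByKey {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Parameterized DELETE by unique key, so the key value is not concatenated into the SQL.
    String deleteQuery = "DELETE FROM `my_dataset.my_table` WHERE event_id = @event_id";
    QueryJobConfiguration deleteConfig =
        QueryJobConfiguration.newBuilder(deleteQuery)
            .addNamedParameter("event_id", QueryParameterValue.string("evt-123"))
            .build();

    // Runs the DML statement as a query job and waits for it to finish.
    bigquery.query(deleteConfig);
  }
}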
But while running BigQuery jobs programmatically via the REST API or client libraries, I got to know that there are some major limitations per table: we can perform only 1,500 DML operations. For reference, this is how I am executing DML as a query job:
// Initialize the client; datasetName/tableName identify the table to modify.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
String datasetName = "my_dataset"; // placeholder
String tableName = "UserSessions";

// Write a DML query to modify the UserSessions table:
// a DML query job that masks the last octet in every row's ip_address column.
String dmlQuery =
    String.format(
        "UPDATE `%s.%s` \n"
            + "SET ip_address = REGEXP_REPLACE(ip_address, r\"(\\.[0-9]+)$\", \".0\")\n"
            + "WHERE TRUE",
        datasetName, tableName);
QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

// Execute the query.
TableResult result = bigquery.query(dmlQueryConfig);
Yes, you can directly insert, update, and delete data in your main table using BigQuery jobs, eliminating the need for a stage table. However, as you mentioned, there are some limitations to the number of DML operations that can be performed per table in a single job.
The limitation of 1,500 DML operations is not about the number of rows affected, but the number of DML statements executed against a table per day. This means that breaking your updates into batches of 1,500 rows will not help if you are executing each batch as a separate DML statement.
To work around this limitation, you can batch your changes into fewer DML statements, for example by using a WHERE clause that covers all of the rows you want to update in a single statement.
The following example uses a single DML statement to update multiple rows:
UPDATE `my_dataset.my_table`
SET ip_address = REGEXP_REPLACE(ip_address, r"(\.[0-9]+)$", ".0")
WHERE country = 'US';
This statement updates the ip_address column for all rows in the my_dataset.my_table table where the country column is equal to 'US'.
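If you drive these changes from the Java client library, one way to fold many row-level changes into a single DML statement is to pass the affected keys as an array parameter and filter with UNNEST. This is only a sketch; the table, column, and key values below are placeholders:
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;

public class BatchedUpdate {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // All the keys to touch in this batch (placeholder values).
    String[] ids = {"evt-1", "evt-2", "evt-3"};

    // One UPDATE covers every key in the array, so it counts as a single DML statement.
    String updateQuery =
        "UPDATE `my_dataset.my_table` "
            + "SET ip_address = REGEXP_REPLACE(ip_address, r\"(\\.[0-9]+)$\", \".0\") "
            + "WHERE event_id IN UNNEST(@ids)";

    QueryJobConfiguration updateConfig =
        QueryJobConfiguration.newBuilder(updateQuery)
            .addNamedParameter("ids", QueryParameterValue.array(ids, String.class))
            .build();

    bigquery.query(updateConfig);
  }
}
The same pattern works for DELETE, and a MERGE statement can combine inserts and updates into one statement as well.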
Another limitation to be aware of is the concurrent jobs quota. BigQuery limits the number of jobs that can be running concurrently per project. If you are executing many jobs in rapid succession, you might hit the concurrent jobs quota.
To avoid hitting the concurrent jobs quota, you can implement some form of rate limiting or check job completion status before submitting the next job.
It is also important to have robust error handling in place, especially when working with batch operations. If one batch fails, you need mechanisms to retry or handle that failure without affecting the entire dataset.
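For example, here is a sketch of submitting one DML job at a time, waiting for it to complete, and retrying a bounded number of times; the statement, table name, and retry policy are placeholders:
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class RunDmlWithRetry {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder(
                "UPDATE `my_dataset.my_table` SET event_data = 'updated' WHERE event_id = 'evt-123'")
            .build();

    int maxAttempts = 3; // placeholder retry policy
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // Submit the job and block until it finishes, so only one job is in flight at a time.
      Job job = bigquery.create(JobInfo.of(config));
      job = job.waitFor();

      if (job != null && job.getStatus().getError() == null) {
        System.out.println("DML job succeeded on attempt " + attempt);
        break;
      }

      // A real implementation should distinguish transient errors (worth retrying)
      // from permanent ones, and log enough context to replay the failed batch.
      System.err.println("DML job failed on attempt " + attempt + ": "
          + (job == null ? "job no longer exists" : job.getStatus().getError()));
    }
  }
}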
Each DML operation has associated costs. It is important to be aware of the costs when executing multiple DML operations, especially if they are affecting large datasets.
Directly inserting, updating, and deleting data in your main table using BigQuery jobs can be a good way to eliminate the need for a stage table. However, it is important to be aware of the limitations and costs associated with this approach.
Thanks, could you share some Java example/code snippet for the select and update using streaming inserts?
Here is a Java example/code snippet for select and update using Streaming inserts for update:
import com.google.cloud.bigquery.*;
import java.util.HashMap;
import java.util.Map;

public class StreamingInsertsForUpdate {
  public static void main(String[] args) throws InterruptedException, BigQueryException {
    // Create a BigQuery client library object.
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Prepare to stream new versions of the records.
    TableId tableId = TableId.of("my_dataset", "my_table");

    // Select the data to update.
    String selectQuery = "SELECT * FROM `my_dataset.my_table` WHERE country = 'US'";
    QueryJobConfiguration selectQueryConfig = QueryJobConfiguration.newBuilder(selectQuery).build();
    TableResult selectResult = bigquery.query(selectQueryConfig);

    for (FieldValueList row : selectResult.iterateAll()) {
      // Create a new version of the record with an updated ip_address.
      Map<String, Object> rowContent = new HashMap<>();
      rowContent.put("id", row.get("id").getStringValue());
      rowContent.put("ip_address", "192.168.1.1"); // Or any other logic to update ip_address

      // Stream the new version of the record.
      InsertAllRequest insertRequest = InsertAllRequest.newBuilder(tableId)
          .addRow(rowContent)
          .build();
      InsertAllResponse insertAllResponse = bigquery.insertAll(insertRequest);

      // Check for errors.
      if (insertAllResponse.hasErrors()) {
        System.err.println("Error streaming data: " + insertAllResponse.getInsertErrors());
        // TODO: Add more robust error handling, logging, and possibly retries for transient errors.
      }
    }
  }
}
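Side note: if you stream many rows, it is usually better to batch them into a single InsertAllRequest rather than sending one request per row. A sketch of that variation of the loop above, reusing the bigquery, tableId, and selectResult variables from the example:
// Build one InsertAllRequest containing all of the new row versions.
InsertAllRequest.Builder batchBuilder = InsertAllRequest.newBuilder(tableId);
for (FieldValueList row : selectResult.iterateAll()) {
  Map<String, Object> rowContent = new HashMap<>();
  rowContent.put("id", row.get("id").getStringValue());
  rowContent.put("ip_address", "192.168.1.1");
  batchBuilder.addRow(rowContent);
}

// Send the whole batch in one call and check for per-row errors.
InsertAllResponse batchResponse = bigquery.insertAll(batchBuilder.build());
if (batchResponse.hasErrors()) {
  System.err.println("Error streaming data: " + batchResponse.getInsertErrors());
}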
I verified this; rather than updating the existing records, it is inserting another set of records. My expectation is that it updates the existing records.
I apologize for the oversight. You're right; the code provided uses streaming inserts to add new rows to the table, rather than updating existing ones. Streaming inserts in BigQuery are designed for appending new rows to a table in real-time, and they cannot be used to directly update or delete existing rows.
To update existing rows, you can use a BigQuery job to execute a DML statement such as UPDATE.
Here's an example of how to use a BigQuery job to update the ip_address column for all rows in the my_dataset.my_table table where the country column is equal to 'US':
// Create a BigQuery client library object.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

// Identify the table to update.
String datasetName = "my_dataset";
String tableName = "my_table";

// Write a DML query to modify the table.
String dmlQuery =
    String.format(
        "UPDATE `%s.%s` \n"
            + "SET ip_address = '192.168.1.1' \n"
            + "WHERE country = 'US'",
        datasetName, tableName);

// Create a QueryJobConfiguration object.
QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

// Execute the query.
TableResult result = bigquery.query(dmlQueryConfig);
This code will update the ip_address column for all existing rows in the my_dataset.my_table table where the country column is equal to 'US'.
If you need to "update" existing rows using streaming inserts, the typical approach is:
1. Stream every new version of a record into the table (or a staging table) as a plain insert.
2. Periodically run a scheduled query or batch process that deduplicates the rows, keeping only the latest version of each record, and merges them into the main table.
This approach allows for near-real-time data ingestion with streaming, but the deduplication and merging process is not real-time and depends on the frequency of the scheduled query or batch process.
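For completeness, here is a sketch of that periodic merge step driven from the Java client. It collapses the streamed rows to the latest version per event_id and upserts them into the main table; the dataset and table names (my_dataset.staging_table, my_dataset.main_table) and the schedule are assumptions on my side:
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class PeriodicDedupMerge {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Keep only the newest streamed row per event_id, then upsert into the main table.
    String mergeQuery =
        "MERGE `my_dataset.main_table` T "
            + "USING ( "
            + "  SELECT * EXCEPT(rn) FROM ( "
            + "    SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id "
            + "      ORDER BY event_timestamp DESC) AS rn "
            + "    FROM `my_dataset.staging_table`) "
            + "  WHERE rn = 1 "
            + ") S "
            + "ON T.event_id = S.event_id "
            + "WHEN MATCHED AND S.event_timestamp > T.event_timestamp THEN "
            + "  UPDATE SET event_timestamp = S.event_timestamp, event_data = S.event_data "
            + "WHEN NOT MATCHED THEN "
            + "  INSERT (event_id, event_timestamp, event_data) "
            + "  VALUES (S.event_id, S.event_timestamp, S.event_data)";

    // Run this job on whatever schedule matches your freshness requirements
    // (for example from Cloud Scheduler, Cloud Composer, or as a BigQuery scheduled query).
    bigquery.query(QueryJobConfiguration.newBuilder(mergeQuery).build());
  }
}
This keeps the streaming path append-only while the main table converges to a single, latest row per event.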