
BigQuery data movement from staging table to main table

Hi,

We are streaming data into a staging table, and it contains duplicate events. We now need to move the data into the main table so that it holds a single, latest event per record. In other words, the main table should reflect CDC changes. Could you suggest how to achieve this in BigQuery? The solution should be cost-effective and perform well.

Thanks


The error message indicates that the service account you are using does not have the required permissions to write to the bucket you are exporting to. Even though the service account has the storage.admin and storage.objectAdmin roles, these roles are not sufficient for exporting data from Cloud SQL to Cloud Storage.

Steps to move data from a staging table to a main table with a single and latest event in BigQuery:

  1. Create a partitioned table for the main table. This will allow you to efficiently query the latest data.
  2. Use a MERGE statement to insert or update the data in the main table. The MERGE statement compares the staging rows to the main table and inserts or updates a row only if it is new or has changed.
  3. Deduplicate the staging rows inside the MERGE source, for example with ROW_NUMBER() partitioned by the event key and ordered by event timestamp descending. This ensures that only the latest event for each key is inserted or updated in the main table.

Additional tips:

  • Use a streaming data ingestion pipeline to load the data into the staging table. This will ensure that the data is available for processing as soon as possible.
  • Use a data warehouse partition strategy that is appropriate for your data and workload. For example, you may want to partition the data by day, week, or month.
  • Use a data warehouse clustering strategy to improve the performance of your queries. For example, you may want to cluster the data by event timestamp.
  • Monitor the performance of your merge statement and adjust the schedule as needed.
  • Test the entire process in a non-production environment before implementing it in production to ensure data integrity and performance.
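
As a rough illustration of the partitioning and clustering tips, the sketch below builds a CREATE TABLE statement in Java, in the same style as the DML snippets in this thread. The column types, the dataset and table names, and the choice to cluster on event_id (because the MERGE joins on that column) are assumptions for illustration, not something stated in the thread. Adjust them to your schema.

```java
final class MainTableDdl {
    /**
     * Builds DDL for a day-partitioned, clustered main table.
     * Column names follow the MERGE example; the types are assumed.
     */
    public static String createMainTableSql(String dataset, String table) {
        return String.format(
            "CREATE TABLE IF NOT EXISTS `%s.%s` (\n"
                + "  event_id STRING NOT NULL,\n"
                + "  event_timestamp TIMESTAMP NOT NULL,\n"
                + "  event_data STRING\n"
                + ")\n"
                + "PARTITION BY DATE(event_timestamp)\n" // one partition per day
                + "CLUSTER BY event_id",                 // assumed clustering key
            dataset, table);
    }

    public static void main(String[] args) {
        System.out.println(createMainTableSql("my_dataset", "main_table"));
    }
}
```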

Example MERGE statement:

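-- Keep only the latest staging row per event_id; MERGE fails if several source rows match one target row.
MERGE INTO `main_table` AS T
USING (
  SELECT * EXCEPT (rn)
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp DESC) AS rn
    FROM `staging_table`
  )
  WHERE rn = 1
) AS S
ON T.event_id = S.event_id
WHEN MATCHED AND S.event_timestamp > T.event_timestamp THEN
  UPDATE SET
    event_timestamp = S.event_timestamp,
    event_data = S.event_data
WHEN NOT MATCHED THEN
  INSERT (event_id, event_timestamp, event_data)
  VALUES (S.event_id, S.event_timestamp, S.event_data);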
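
Because the staging table can hold several rows per event_id, and a MERGE fails when more than one source row matches the same target row, the source generally needs to be reduced to one row per key first. The following is a minimal sketch, not a definitive implementation, of a Java helper that builds such a statement with the table and column names passed in. The class name DedupMergeSql and its parameters are hypothetical. The names are inserted into the SQL text as-is, so they must come from trusted configuration, never from user input.

```java
final class DedupMergeSql {
    /**
     * Builds a MERGE whose source keeps only the newest staging row per key.
     * All arguments are identifiers and are NOT escaped (assumed trusted).
     */
    public static String build(String mainTable, String stagingTable,
                               String keyColumn, String timestampColumn, String dataColumn) {
        return String.format(
            "MERGE INTO `%1$s` AS T\n"
                + "USING (\n"
                + "  SELECT * EXCEPT (rn) FROM (\n"
                + "    SELECT *, ROW_NUMBER() OVER (\n"
                + "      PARTITION BY %3$s ORDER BY %4$s DESC) AS rn\n"
                + "    FROM `%2$s`\n"
                + "  )\n"
                + "  WHERE rn = 1\n"                          // newest row per key only
                + ") AS S\n"
                + "ON T.%3$s = S.%3$s\n"
                + "WHEN MATCHED AND S.%4$s > T.%4$s THEN\n"   // skip stale events
                + "  UPDATE SET %4$s = S.%4$s, %5$s = S.%5$s\n"
                + "WHEN NOT MATCHED THEN\n"
                + "  INSERT (%3$s, %4$s, %5$s)\n"
                + "  VALUES (S.%3$s, S.%4$s, S.%5$s)",
            mainTable, stagingTable, keyColumn, timestampColumn, dataColumn);
    }

    public static void main(String[] args) {
        System.out.println(build("main_table", "staging_table",
            "event_id", "event_timestamp", "event_data"));
    }
}
```

The returned string can be passed to QueryJobConfiguration.newBuilder(...) and run with bigquery.query(...), in the same way as the Java DML snippets in this thread.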

Streaming data into BigQuery may introduce a slight delay (a few seconds) before the data is available for querying. Additionally, streaming data may have associated costs.

Thanks for the detailed response. Since there is a delay before streamed data becomes available, can I insert, update, and delete rows in the main table directly by running BigQuery jobs programmatically through the REST API or client libraries? That would avoid duplicates in the main table and eliminate the staging table. I could perform the operations below on the main table:

For insert: use a native INSERT query and execute the job.

For update: first select the existing record by unique key, then update the record.

For delete: select by unique key and execute a DELETE query.

However, while running BigQuery jobs programmatically through the REST API or client libraries, I learned that there are some major limitations per table: we can perform 1500 DML operations (

 
For example, the code snippet below executes an update:

// Write a DML query to modify the UserSessions table.
// This DML query job masks the last octet in every row's ip_address column.
String dmlQuery =
    String.format(
        "UPDATE `%s.%s` \n"
            + "SET ip_address = REGEXP_REPLACE(ip_address, r\"(\\.[0-9]+)$\", \".0\")\n"
            + "WHERE TRUE",
        datasetName, tableName);

QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

// Execute the query.
TableResult result = bigquery.query(dmlQueryConfig);

Yes, you can insert, update, and delete data directly in your main table using BigQuery jobs, which eliminates the need for a staging table. However, as you mentioned, there are limits on the number of DML operations that can be performed per table.

The limitation of 1,500 DML operations is not about the number of rows affected, but the number of DML statements executed against a table per day. This means that breaking your updates into batches of 1,500 rows will not help if you are executing each batch as a separate DML statement.

To work around this limitation, you can:

  • Use a single DML statement to update multiple rows. For example, you could use a WHERE clause to filter the rows that you want to update.
  • Batch your changes into fewer, larger DML jobs. For example, collect pending updates and apply them in one statement per interval, rather than submitting a separate job for each row or small batch.
  • Use streaming inserts for updates. Streaming inserts allow you to update data in BigQuery in real time, without having to wait for a job to complete. However, streaming inserts are not currently supported for deletes.

Example:

The following example shows how to use a single DML statement to update multiple rows:

UPDATE `my_dataset.my_table`
SET ip_address = REGEXP_REPLACE(ip_address, r"(\.[0-9]+)$", ".0")
WHERE country = 'US';

This statement will update the ip_address column for all rows in the my_dataset.my_table table where the country column is equal to US.

Another limitation to be aware of is the concurrent jobs quota. BigQuery limits the number of jobs that can be running concurrently per project. If you are executing many jobs in rapid succession, you might hit the concurrent jobs quota.

To avoid hitting the concurrent jobs quota, you can implement some form of rate limiting or check job completion status before submitting the next job.
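
One simple way to cap how many jobs are in flight is to submit them through a fixed-size thread pool and wait for completion before moving on. The sketch below uses only the Java standard library. The class BoundedJobRunner and the limit you pass in are hypothetical, and each Callable stands in for a call such as bigquery.query(...). It is a sketch of client-side limiting only and does not read or enforce the actual project quota.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BoundedJobRunner {
    /**
     * Runs all jobs with at most maxConcurrent running at once and
     * returns their results in submission order after all have finished.
     */
    public static <T> List<T> runAll(List<Callable<T>> jobs, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<T> results = new ArrayList<>();
            // invokeAll blocks until every job has completed or failed.
            for (Future<T> future : pool.invokeAll(jobs)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for jobs", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A job failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```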

It is also important to have robust error handling in place, especially when working with batch operations. If one batch fails, you need mechanisms to retry or handle that failure without affecting the entire dataset.
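
A common pattern for the retry part is exponential backoff around each batch. The sketch below is a generic standard-library helper, not part of the BigQuery client. The class name Retry, the attempt count, and the delays are hypothetical. For brevity it retries on any RuntimeException, whereas a real caller would normally retry only failures it knows to be transient.

```java
import java.util.function.Supplier;

final class Retry {
    /**
     * Runs the action, retrying after a RuntimeException with a delay
     * that doubles each time. Rethrows the last failure when attempts run out.
     */
    public static <T> T withBackoff(Supplier<T> action, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // no attempts left, fall through to rethrow
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
                delay *= 2;
            }
        }
        throw last;
    }
}
```

Each batch can then be wrapped individually, for example Retry.withBackoff(() -> runBatch(batch), 5, 1000L), where runBatch is a hypothetical method that executes one DML job, so a failure in one batch does not abort the others.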

Each DML operation has associated costs. It is important to be aware of the costs when executing multiple DML operations, especially if they are affecting large datasets.

Directly inserting, updating, and deleting data in your main table using BigQuery jobs can be a good way to eliminate the need for a stage table. However, it is important to be aware of the limitations and costs associated with this approach.

Thanks. Could you share a Java example or code snippet for select and update using streaming inserts for update?

Here is a Java example/code snippet for select and update using Streaming inserts for update:

 
import com.google.cloud.bigquery.*;
import java.util.Map;
import java.util.HashMap;

public class StreamingInsertsForUpdate {

    public static void main(String[] args) throws InterruptedException, BigQueryException {
        // Create a BigQuery client library object.
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

        // Prepare to stream new versions of the records.
        TableId tableId = TableId.of("my_dataset", "my_table");

        // Select the data to update.
        String selectQuery = "SELECT * FROM `my_dataset.my_table` WHERE country = 'US'";
        QueryJobConfiguration selectQueryConfig = QueryJobConfiguration.newBuilder(selectQuery).build();
        TableResult selectResult = bigquery.query(selectQueryConfig);

        for (FieldValueList row : selectResult.iterateAll()) {
            // Create a new version of the record with updated ip_address.
            Map<String, Object> rowContent = new HashMap<>();
            rowContent.put("id", row.get("id").getStringValue());
            rowContent.put("ip_address", "192.168.1.1"); // Or any other logic to update ip_address

            // Stream the new version of the record.
            InsertAllRequest insertRequest = InsertAllRequest.newBuilder(tableId)
                .addRow(rowContent)
                .build();

            InsertAllResponse insertAllResponse = bigquery.insertAll(insertRequest);

            // Check for errors.
            if (insertAllResponse.hasErrors()) {
                System.err.println("Error streaming data: " + insertAllResponse.getInsertErrors());
                // TODO: Add more robust error handling, logging, and possibly retries for transient errors.
            }
        }
    }
} 

 

I verified this. Rather than updating the existing records, it inserts another set of records. My expectation is that it should update the existing records.

I apologize for the oversight. You're right; the code provided uses streaming inserts to add new rows to the table, rather than updating existing ones. Streaming inserts in BigQuery are designed for appending new rows to a table in real-time, and they cannot be used to directly update or delete existing rows. 

To update existing rows, you can use a BigQuery job to execute a DML statement such as UPDATE.

Here's an example of how to use a BigQuery job to update the ip_address column for all rows in the my_dataset.my_table table where the country column is equal to 'US':

// Create a BigQuery client library object.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

// Dataset and table to update.
String datasetName = "my_dataset";
String tableName = "my_table";

// Write a DML query to modify the table.
String dmlQuery =
    String.format(
        "UPDATE `%s.%s` \n"
            + "SET ip_address = '192.168.1.1' \n"
            + "WHERE country = 'US'",
        datasetName, tableName);

// Create a QueryJobConfiguration object.
QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

// Execute the query.
TableResult result = bigquery.query(dmlQueryConfig);

This code will update the ip_address column for all existing rows in the my_dataset.my_table table where the country column is equal to 'US'.

If you need to "update" existing rows using streaming inserts, the typical approach is:

  1. Stream the new version of the record to BigQuery.
  2. Schedule a query or batch process to run periodically.
  3. This query or batch process should merge the streamed data with the existing data in the table.
  4. Deduplicate the data to ensure that only the latest version of each record is retained.

This approach allows for near-real-time data ingestion with streaming, but the deduplication and merging process is not real-time and depends on the frequency of the scheduled query or batch process.
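
To make the deduplication rule in step 4 concrete, here is a plain-Java sketch of "keep only the latest version of each record". It runs in memory and does not call BigQuery. It only illustrates the rule that the scheduled query would apply in SQL. The Event class and its fields are hypothetical stand-ins for a streamed row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LatestEventPerKey {
    /** Hypothetical row shape: a unique key, an event time, and a payload. */
    public static final class Event {
        public final String id;
        public final long timestampMillis;
        public final String data;

        public Event(String id, long timestampMillis, String data) {
            this.id = id;
            this.timestampMillis = timestampMillis;
            this.data = data;
        }
    }

    /**
     * Returns one event per id, the one with the greatest timestamp.
     * On equal timestamps the event seen first is kept.
     */
    public static List<Event> latestPerId(List<Event> events) {
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event e : events) {
            latest.merge(e.id, e,
                (old, cur) -> cur.timestampMillis > old.timestampMillis ? cur : old);
        }
        return new ArrayList<>(latest.values());
    }
}
```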