Hello Community,
I am building a validation process for checking whether a .csv file was successfully ingested into a BQ table.
I have a BQ table: ingestion_status(table_name, status, ingestion_date) that maintains the ingestion status of the incoming .csv files daily.
This is the flow:
1. A .csv file is added to a GCS bucket.
2. A Java cloud function is triggered, inserts the .csv file into a corresponding BQ table.
3. A validation is performed to verify the ingestion process. The result of this validation is written to the ingestion_status BQ table mentioned above.
This is the Java Code:
public TableResult updateCsvFileIngestionStatus(String tableName, Boolean status) {
    String query = buildUpdateStatusQuery(tableName, status);
    try {
        int randomNumber = getRandomNumber();
        logger.info("File: {} has waited: {} seconds to update status", tableName, randomNumber);
        return bigQueryProcessor.executeQuery(query);
    } catch (InterruptedException e) {
        logger.error(BQ002.name(), e, INTERRUPTED_EXCEPTION, query);
    } catch (BigQueryException e) {
        logger.error(BQ001.name(), e, BIG_QUERY_EXCEPTION, query);
    }
    return null;
}
Please note that every day I get over 30 .csv files that trigger 30 Cloud Functions, which simultaneously try to update the same ingestion_status BQ table.
In this scenario I got the following BQ exception:
Query error: Could not serialize access to table due to concurrent update
Please note that 30 concurrent updates exceeds both the limit of 2 concurrently running mutating DML statements per table and the queue size limit of 20 queued mutating DML statements.
I changed the code to this:
public TableResult updateCsvFileIngestionStatus(String tableName, Boolean status) {
    String query = buildUpdateStatusQuery(tableName, status);
    try {
        int randomNumber = getRandomNumber();
        Thread.sleep(MILLIS_MULTIPLICATION_FACTOR * randomNumber);
        logger.info("File: {} has waited: {} seconds to update status", tableName, randomNumber);
        return bigQueryProcessor.executeQuery(query);
    } catch (InterruptedException e) {
        logger.error(BQ002.name(), e, INTERRUPTED_EXCEPTION, query);
    } catch (BigQueryException e) {
        logger.error(BQ001.name(), e, BIG_QUERY_EXCEPTION, query);
    }
    return null;
}
The wait time is a random number between 0 and 600 seconds.
After this change the previous BQ error went away, BUT now I get these types of BigQuery errors:
com.google.cloud.bigquery.BigQueryException: Unexpected end of file from server
com.google.cloud.bigquery.BigQueryException: Error writing request body to server
com.google.cloud.bigquery.BigQueryException: Broken pipe
I also tried to replace UPDATES with INSERTS, but without success.
From documentation I got this:
Queued mutating DML statements per table | 20 statements | A table can have up to 20 mutating DML statements in the queue waiting to run. If you submit additional mutating DML statements for the table, then those statements fail.
and this:
Maximum rate of DML statements for each table | 25 statements every 10 seconds | Your project can run up to 25 DML statements every 10 seconds for each table. Both INSERT and mutating DML statements contribute to this limit.
I really need this validation to work, since I plan to use the ingestion_status BQ table in Dataform assertion scripts that will prevent downstream Dataform workflows (MERGE) from executing in case of a failed .csv file ingestion.
Do you have any idea how I could concurrently insert/update records of a single BQ table from concurrently running Cloud Functions?
Thank you,
Best regards,
Valentin
Hi @valentiniacov ,
Thank you for the comprehensive explanation of the challenges you're facing with concurrent updates to your BigQuery ingestion_status table. Ensuring efficient and reliable updates is crucial.
Here is how you can build on the strategies suggested:
1. Distributed Queue with Google Cloud Pub/Sub:
Pub/Sub is a powerful tool for managing concurrency. Here's how to enhance the subscriber logic:
Batching and Error Handling: instead of issuing one DML statement per file, batch multiple status updates from the Pub/Sub subscription into a single MERGE operation. This significantly reduces BigQuery interactions and improves efficiency.
Subscriber Code (with Batching and Error Handling):
public void processPubSubMessages(List<PubsubMessage> messages) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (PubsubMessage message : messages) {
        try {
            // Extract data from message (e.g., using JSON parsing)
            Map<String, Object> row = ...;
            rows.add(row);
        } catch (Exception e) {
            // Handle message parsing errors (log, send to a dead-letter topic, etc.)
            logger.error("Error parsing message: {}", e.getMessage());
        }
    }
    if (!rows.isEmpty()) {
        try {
            String query = buildBatchMergeQuery(rows);
            bigQueryProcessor.executeQuery(query);
        } catch (BigQueryException e) {
            // Handle BigQuery errors (log, retry with exponential backoff, etc.)
            logger.error("BigQuery error: {}", e.getMessage());
        }
    }
}
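The buildBatchMergeQuery helper is not shown above. A minimal sketch, assuming the ingestion_status schema from the question (table_name, status, ingestion_date) and a placeholder dataset name my_dataset, could look like this:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: builds one MERGE statement covering a whole batch of status
// updates, so 30 files cost one DML statement instead of 30.
// For production code, prefer parameterized queries over string concatenation.
public class MergeQueryBuilder {

    public static String buildBatchMergeQuery(List<Map<String, Object>> rows) {
        // Turn each row into a STRUCT literal for the USING clause.
        String structs = rows.stream()
            .map(r -> String.format(
                "STRUCT('%s' AS table_name, %s AS status, CURRENT_DATE() AS ingestion_date)",
                r.get("table_name"), r.get("status")))
            .collect(Collectors.joining(", "));
        return "MERGE my_dataset.ingestion_status T "
             + "USING UNNEST([" + structs + "]) S "
             + "ON T.table_name = S.table_name AND T.ingestion_date = S.ingestion_date "
             + "WHEN MATCHED THEN UPDATE SET T.status = S.status "
             + "WHEN NOT MATCHED THEN INSERT (table_name, status, ingestion_date) "
             + "VALUES (S.table_name, S.status, S.ingestion_date)";
    }
}
```

Because the whole batch arrives in one statement, only one mutating DML job contends for the table per subscriber invocation.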
2. Exponential Backoff and Retry Logic:
A retryBigQueryOperation helper with jitter is a solid foundation: on a serialization failure, wait an exponentially growing delay plus a random jitter before retrying, and cap the number of attempts so a persistent failure still surfaces as an error.
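As a minimal sketch of that pattern (the attempt limit and base delay are illustrative, not prescriptive):

```java
import java.util.Random;
import java.util.concurrent.Callable;

// Generic retry helper with exponential backoff and jitter.
public class RetryHelper {
    private static final Random RANDOM = new Random();

    public static <T> T retryWithBackoff(Callable<T> operation, int maxAttempts,
                                         long baseDelayMillis) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // Retries exhausted: surface the last error.
                }
                // Exponential backoff (base, 2x base, 4x base, ...) plus random
                // jitter so the 30 concurrent functions do not retry in lockstep.
                long delay = (baseDelayMillis << (attempt - 1))
                           + RANDOM.nextInt((int) baseDelayMillis + 1);
                Thread.sleep(delay);
            }
        }
    }
}
```

In your function, wrap bigQueryProcessor.executeQuery(query) in the Callable; ideally inspect the caught exception and retry only on the "Could not serialize access" error rather than on every failure.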
3. Batch Updates via Scheduled Jobs:
If real-time updates aren't critical, leverage a staging table and scheduled batch updates. A Cloud Scheduler job can trigger a Cloud Function or Cloud Run service to perform the update, streamlining the process and potentially reducing costs.
4. Partitioned and Clustered Tables:
Beyond partitioning by ingestion_date
and clustering by table_name
and status
, explore partitioning by other relevant fields like file_name
or source_system
. This further enhances query performance and data organization.
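As a concrete illustration (dataset name and extra columns are assumptions), such a table could be declared as:

```sql
CREATE TABLE my_dataset.ingestion_status (
  table_name STRING,
  status BOOL,
  ingestion_date DATE
)
PARTITION BY ingestion_date
CLUSTER BY table_name, status;
```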
5. Monitoring and Alerting:
As emphasized, comprehensive monitoring is key. Use Cloud Monitoring to track vital metrics such as Pub/Sub message latency, BigQuery query performance, and error rates. Configure alerts to promptly notify you of anomalies or critical issues, enabling proactive intervention.
Thank you for the answer.