I am streaming data from Pub/Sub to BigQuery using Dataflow templates via this code [1]. Data typically shows up in the table preview after about 40 seconds. Unfortunately, most of the time it only becomes queryable after 90 minutes (as in the documentation). I've also tried the STREAMING_INSERTS and STORAGE_WRITE_API methods with the same latency.
But when using the BigQuery API directly [2], data stays in the streaming buffer for only 3-4 minutes and is then available for querying. Rows even enter the buffer and become available before the rows from case [1].
This makes me think that the default behaviour of the Storage Write API is what I need to make the streaming faster. There should be some setting in Apache Beam that makes it behave the same way as direct usage of the Storage Write API.
A quick fix I've found is to add "OR _PARTITIONTIME IS NULL" to the query, which also includes the rows from the streaming buffer in the results. But it still does not help when you need the exact partition time (e.g. in a materialized view for which you need to know the update time, which is only updated when the main table is updated).
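For illustration, a minimal sketch of that query tweak using the Java BigQuery client (the project, dataset, table and partition filter are placeholders, not the real ones):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class QueryIncludingBufferRows {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Rows still sitting in the streaming buffer have a NULL _PARTITIONTIME,
    // so the extra OR predicate pulls them into the result as well.
    String sql =
        "SELECT * FROM `my-project.my_dataset.my_table` "
            + "WHERE DATE(_PARTITIONTIME) = CURRENT_DATE() "
            + "OR _PARTITIONTIME IS NULL";
    TableResult result = bigquery.query(QueryJobConfiguration.newBuilder(sql).build());
    result.iterateAll().forEach(System.out::println);
  }
}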
Any ideas what could be tweaked in this pipeline [1] or in other Dataflow settings to make it work? It is probably the Type.PENDING that is passed to the write client, but it cannot be set via the Apache Beam library in Dataflow.
[1]
WriteResult writeResult =
    convertedTableRows
        .get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withExtendedErrorInfo()
                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                .withNumStorageWriteApiStreams(0)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .to(
                    input ->
                        getTableDestination(
                            input, tableNameAttr, datasetNameAttr, outputTableProject)));
[2]
com.google.cloud.bigquery.storage.v1.BigQueryWriteClient + Type.PENDING
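For reference, [2] roughly follows the documented pending-type write pattern; a minimal sketch, with placeholder project/dataset/table names and a hypothetical col1 column:

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

public class PendingStreamWrite {
  public static void main(String[] args) throws Exception {
    // Placeholder identifiers -- replace with your own project/dataset/table.
    TableName parentTable = TableName.of("my-project", "my_dataset", "my_table");

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Create a write stream of type PENDING: appended rows are buffered
      // server-side and only become visible after the batch commit below.
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
      WriteStream writeStream =
          client.createWriteStream(
              CreateWriteStreamRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .setWriteStream(stream)
                  .build());

      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        JSONArray rows = new JSONArray();
        rows.put(new JSONObject().put("col1", "value")); // hypothetical column
        ApiFuture<AppendRowsResponse> future = writer.append(rows, 0);
        future.get(); // wait for the append to be acknowledged
      }

      // Finalize the stream and commit it, which makes the rows queryable
      // atomically instead of leaving them in the streaming buffer.
      client.finalizeWriteStream(writeStream.getName());
      BatchCommitWriteStreamsResponse commitResponse =
          client.batchCommitWriteStreams(
              BatchCommitWriteStreamsRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .addWriteStreams(writeStream.getName())
                  .build());
      System.out.println("Committed at: " + commitResponse.getCommitTime());
    }
  }
}

With a PENDING stream the rows only become visible after the batch commit, which may explain why this path behaves differently from the Beam connector.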
The latency you're experiencing with the Dataflow approach is a challenge. Here are a few considerations and potential solutions:

1. Data Ingestion and Latency: Use BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE with WriteDisposition.WRITE_APPEND. This method optimizes the write process, ensuring data is appended promptly upon arrival, which can help reduce latency.

2. Partition Time Visibility: _PARTITIONTIME visibility is a common challenge with streaming data. BigQueryIO.Write.Method.STREAMING_INSERTS with IgnoreUnknownValues can help in some scenarios; also consider implementing a ParDo transform in Dataflow to manage and store partition times more effectively (see the sketch after this list). This approach can provide better control over the data, especially for updates in materialized views.

3. Write Client Configuration: Type.PENDING as used with the BigQuery Write API is not available in Dataflow's BigQueryIO. Stick with BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE and experiment with the numStorageWriteApiStreams setting. While setting it to 0 allows the service to choose the number of streams, fine-tuning this number might offer better latency optimization. Additionally, explore BigQueryIO.Write.WriteOption configurations like IGNORE_PARTITION_ERRORS and IGNORE_UNKNOWN_VALUES for tailored data handling.

4. Additional Strategies:
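To make the ParDo suggestion in point 2 more concrete, here is a minimal sketch of such a transform; the ingest_time TIMESTAMP column is an assumption and would need to exist on the target table:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Hypothetical DoFn that stamps each row with its own ingestion timestamp in an
// assumed "ingest_time" TIMESTAMP column, so queries and materialized views do
// not have to rely on _PARTITIONTIME, which stays NULL while rows sit in the
// streaming buffer.
public class StampIngestTimeFn extends DoFn<TableRow, TableRow> {
  @ProcessElement
  public void processElement(@Element TableRow row, OutputReceiver<TableRow> out) {
    TableRow copy = row.clone();
    copy.set("ingest_time", Instant.now().toString());
    out.output(copy);
  }
}

You would apply it just before the write in [1], e.g. .apply("StampIngestTime", ParDo.of(new StampIngestTimeFn())), and base the materialized view's update time on ingest_time instead of _PARTITIONTIME.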
Hello and thank you for the detailed response.
I am still experimenting with the pipelines without any luck.
Here is what I've tried:
- 1,
- 2 with STREAMING_INSERTS + IgnoreUnknownValues,
- 3 (except the WriteOption configuration - it seems I am using SDK version 2.49.0);
All results are similar and only affect the time it takes the data to go into the streaming buffer (whether that is 10 or 40 seconds). After that, each entry stays in the buffer for exactly 90 minutes, no matter whether it is 1, 10, or 1,000 rows.
I think there is something strange in the logic that exports data from the streaming buffer to the table. It looks to me like a cron job picks up only the entries that have been there for more than 90 minutes, and that's all. There is no option to flush it.
What I still need to try is implementing a ParDo transform. I will investigate this option further.
The consistent 90-minute latency you're observing, regardless of the method used, is indeed unusual and suggests that the issue might be related to how BigQuery processes and makes the streamed data available for querying, rather than the Dataflow pipeline itself.
Here are a few additional considerations and steps you might explore:
BigQuery's Streaming Buffer Behavior: The behavior of BigQuery's streaming buffer is not fully transparent, and it's possible that there are internal mechanisms or thresholds that determine when data moves from the streaming buffer to a queryable state. This might explain the consistent 90-minute latency you're observing.
BigQuery Support and Documentation: Since this behavior is consistent and not clearly documented, reaching out to Google Cloud's support team or consulting the latest BigQuery documentation might provide more insights. There could be recent changes or known issues that are affecting the streaming buffer's behavior.
Implementing a ParDo Transform: While exploring the ParDo transform in your pipeline, consider using it to add additional logging or monitoring to your data as it's processed (see the sketch after this list). This might help you identify whether there are specific patterns or characteristics of the data that influence how it's handled by BigQuery.
Reviewing Pipeline Metrics: If you haven't already, review the detailed metrics and logs of your Dataflow pipeline to see if there are any clues about the data processing and writing behavior. Sometimes, subtle issues in the pipeline can affect how data is ingested into BigQuery.
Experimenting with Different Data Volumes: You mentioned trying different row counts, but it might also be worth experimenting with different data sizes or types to see if that influences the latency. Sometimes, the nature of the data itself can impact how it's processed by BigQuery.
Exploring Alternative Approaches: If the latency remains a critical issue and you have some flexibility in your architecture, you might consider alternative approaches for real-time data processing and querying, such as using a different database or data store that offers lower latency for your specific use case.
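As one way to do the logging/monitoring mentioned above, a minimal pass-through DoFn sketch (the metric name and log format are arbitrary choices, not from your pipeline):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical pass-through DoFn: counts rows and logs the element timestamp
// next to the wall-clock time, so the moment a row leaves Dataflow can be
// compared with the moment it becomes queryable in BigQuery.
public class MonitorRowsFn extends DoFn<TableRow, TableRow> {
  private static final Logger LOG = LoggerFactory.getLogger(MonitorRowsFn.class);
  private final Counter rowsSeen = Metrics.counter(MonitorRowsFn.class, "rows_seen");

  @ProcessElement
  public void processElement(
      @Element TableRow row, @Timestamp Instant elementTime, OutputReceiver<TableRow> out) {
    rowsSeen.inc();
    LOG.info("Row with element time {} passed to the BigQuery write at {}", elementTime, Instant.now());
    out.output(row);
  }
}

Inserting it right before the BigQueryIO write keeps the pipeline behaviour unchanged while exposing a counter and per-row timestamps in the Dataflow job metrics and worker logs.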
Hi,
did you manage to find the root cause of the issue? We have a very similar problem where data can take up to 90 minutes to leave the streaming buffer, and we use _PARTITIONTIME to partition the tables.
I wonder which of the bullet points solved your problem.