Hello,
I am experiencing weird behavior when fetching data from BigQuery: performance is worse when using the Storage Read API than when using a regular query.
The throughput is roughly:
- 9,000 rows per second with a regular BigQuery query
- 30 to 1,500 rows per second with the BigQuery Storage Read API
This happens with one table in particular.
We use Go, and here is the code (with some omissions):
package yo

import (
    "fmt"
    "sync"

    // Import paths for the Storage Read API types depend on the client library
    // version; recent versions expose them under the apiv1/storagepb package,
    // older code uses google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1.
    bqStoragepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
)

func yo() {
    readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
        projectId,
        datasetId,
        tableId,
    )
    tableReadOptions := &bqStoragepb.ReadSession_TableReadOptions{
        SelectedFields: []string{
            "jsonPayload.field_1",
            "jsonPayload.field_2",
            "jsonPayload.field_3",
            "jsonPayload.field_4",
            "jsonPayload.field_5",
            "jsonPayload.field_6",
            "jsonPayload.field_7",
            "jsonPayload.field_8",
            "jsonPayload.field_9",
            "jsonPayload.field_10",
            "jsonPayload.field_11",
            "jsonPayload.field_12",
            "jsonPayload.field_13",
            "jsonPayload.field_14",
            "jsonPayload.field_15",
            "jsonPayload.field_16",
            "jsonPayload.field_17",
            "jsonPayload.field_18",
            "jsonPayload.field_19",
            "timestamp",
            "insertId",
        },
        // start and end usually cover 1 day
        RowRestriction: fmt.Sprintf(`DATE(timestamp) >= "%s" AND DATE(timestamp) <= "%s"`,
            start.Format("2006-01-02"),
            end.Format("2006-01-02"),
        ),
    }
    dataFormat := bqStoragepb.DataFormat_AVRO
    createReadSessionRequest := &bqStoragepb.CreateReadSessionRequest{
        Parent: fmt.Sprintf("projects/%s", projectId),
        ReadSession: &bqStoragepb.ReadSession{
            Table:       readTable,
            DataFormat:  dataFormat,
            ReadOptions: tableReadOptions,
        },
        MaxStreamCount: 4,
    }
    // Create the read session (this call was omitted in the original snippet;
    // it consumes the request built above).
    session, err := bqReadClient.CreateReadSession(ctx, createReadSessionRequest)
    if err != nil {
        log.Errorf("CreateReadSession: %v", err)
        return
    }
    // Channel used to pass the raw responses from the readers to the decoder.
    ch := make(chan *bqStoragepb.ReadRowsResponse)
    // Two waitgroups: one for the reading goroutines, one for the decoding
    // goroutine. The channel must be closed after the readers finish but
    // before waiting on the decoder, otherwise the decoder never returns.
    var readWg, decodeWg sync.WaitGroup
    // We use multiple streams to parallelize the reading
    for _, stream := range session.GetStreams() {
        readStream := stream.Name
        log.Debugln("readStream", readStream)
        // Start the reading in one goroutine per stream.
        readWg.Add(1)
        go func() {
            defer readWg.Done()
            if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
                log.Errorf("processStream failure: %v", err)
                return
            }
        }()
    }
    // kLog.Debug.Println(session.GetAvroSchema().GetSchema())
    // Start Avro processing and decoding in another goroutine.
    decodeWg.Add(1)
    go func() {
        defer decodeWg.Done()
        var err error
        switch format {
        case AVRO_FORMAT:
            err = processAvro(ctx, session.GetAvroSchema().GetSchema(), ch, events)
        }
        if err != nil {
            log.Errorf("error processing %s: %v", format, err)
            return
        }
    }()
    // Wait for the readers, close the channel so the decoder can drain it and
    // finish, then wait for the decoder.
    readWg.Wait()
    close(ch)
    decodeWg.Wait()
}
processStream() and processAvro() both follow the implementations from the official documentation.
Each time processAvro() processes a row, it increments a progress bar.
The progress bar usually displays between 30 and 1,000 iterations per second (i.e., rows per second).
I tried to measure inside both processAvro and processStream and it is about the same... But if I run the same query multiple times, it tends to go faster (up to 20k rows per second).
Any idea? Why is the Storage Read API slower than a regular query? Should I modify MaxStreamCount?
If I find something I will keep you posted 🙂
Thanks,
You are experiencing slower performance when using the BigQuery Storage Read API compared to the regular BigQuery query API. While the Storage Read API offers more granular control over data access and typically provides faster performance for large datasets in streaming scenarios, several factors can affect its efficiency.
Its fine-grained control over data filtering and projection also means it requires more careful configuration than the regular query API to achieve optimal performance. Striking a balance between flexibility and performance is crucial, and a deeper understanding of how BigQuery handles data is often beneficial.
Data Format:
The chosen data format significantly impacts performance. The Storage Read API returns rows as either Avro or Arrow. Avro's compact binary format and embedded schema keep serialization/deserialization overhead low, while Arrow's columnar layout can be faster to decode in some clients, so it is worth benchmarking both for your workload.
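To make this concrete, here is a minimal sketch of creating the read session in either format so the two can be benchmarked against each other. The helper name is hypothetical, and the import paths assume a recent version of the Cloud client library (older code imports these types from google.golang.org/genproto instead):
package example

import (
    "context"
    "fmt"

    bqStorage "cloud.google.com/go/bigquery/storage/apiv1"
    bqStoragepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
)

// newReadSession creates a read session in the requested format (AVRO or ARROW)
// so both formats can be tried on the same table.
func newReadSession(ctx context.Context, client *bqStorage.BigQueryReadClient,
    projectID, table string, format bqStoragepb.DataFormat) (*bqStoragepb.ReadSession, error) {
    req := &bqStoragepb.CreateReadSessionRequest{
        Parent: fmt.Sprintf("projects/%s", projectID),
        ReadSession: &bqStoragepb.ReadSession{
            Table:      table, // "projects/{p}/datasets/{d}/tables/{t}", as in the original post
            DataFormat: format,
        },
        MaxStreamCount: 4,
    }
    return client.CreateReadSession(ctx, req)
}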
MaxStreamCount:
Adjusting MaxStreamCount can improve performance by increasing parallelism. However, finding the optimal value requires careful consideration. Start with a moderate number and gradually increase it while monitoring performance and resource utilization. Avoid exceeding your available resources to prevent overhead and contention.
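One concrete note on this knob: MaxStreamCount is an upper bound rather than a target, the service may return fewer streams than requested, and leaving it at zero lets the service pick a count based on the table. A hedged fragment of the request from the original post (only this field changes; readSession stands for the session definition shown there):
createReadSessionRequest := &bqStoragepb.CreateReadSessionRequest{
    Parent:      fmt.Sprintf("projects/%s", projectId),
    ReadSession: readSession, // table, data format and read options as in the original post
    // 0 lets the service choose a stream count appropriate for the table;
    // otherwise something near the client's core count, e.g. int32(runtime.NumCPU()),
    // is a reasonable starting point when decoding happens client-side.
    MaxStreamCount: 0,
}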
Partition Filter:
Applying a filter on the partitioning column effectively reduces the amount of data scanned, significantly improving performance, especially for large partitioned tables. This can also help manage costs by reducing the processed data volume.
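For illustration, assuming your table is partitioned by day on the timestamp column (which may or may not be the case), a row restriction that compares the partitioning column directly against the day's bounds keeps the filter aligned with the partitions. The helper below is a hypothetical sketch, not part of your code:
package example

import (
    "fmt"
    "time"

    bqStoragepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
)

// dayRangeReadOptions restricts the read session to [startDay, endDay] on the
// assumed partitioning column. Both arguments are dates truncated to a day.
func dayRangeReadOptions(startDay, endDay time.Time) *bqStoragepb.ReadSession_TableReadOptions {
    return &bqStoragepb.ReadSession_TableReadOptions{
        RowRestriction: fmt.Sprintf(
            `timestamp >= TIMESTAMP("%s") AND timestamp < TIMESTAMP("%s")`,
            startDay.Format("2006-01-02"),
            endDay.AddDate(0, 0, 1).Format("2006-01-02"),
        ),
    }
}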
Client Library:
While different client libraries might offer slight performance variations, the underlying API capabilities and limitations remain consistent. Choose the library that best suits your project needs, considering factors like ease of integration, community support, documentation quality, and familiarity. Compare and evaluate different libraries based on these criteria before making a decision.
Troubleshooting Recommendations:
Optimizing BigQuery Storage Read API usage is an iterative process that may require multiple adjustments and tests to achieve the best performance. By adopting a holistic approach that considers both the technical aspects of the API and the specific characteristics of your data and use case, you can maximize its efficiency and achieve optimal results. Remember, experimentation and analysis are key to finding the optimal configuration.
Hi
Thanks for the answer!
> Optimizing BigQuery Storage Read API usage is an iterative process that may require multiple adjustments and tests to achieve the best performance. By adopting a holistic approach that considers both the technical aspects of the API and the specific characteristics of your data and use case
That is true; unfortunately, there are not many precise details about the technical aspects or rules of thumb I can start from.
Do you see anything specific that may apply to the information I provided in the OP?
Best regards
Here are some specific technical aspects and rules of thumb based on the information you provided. These can serve as starting points for optimizing your use of the BigQuery Storage Read API:
Data Format (Avro):
MaxStreamCount (4 Streams):
You have set MaxStreamCount to 4. This setting controls the level of parallelism. While increasing this number can improve throughput, it's crucial to balance it with your system's ability to process these streams concurrently. If your client system (CPU, memory, network bandwidth) is underutilized, you might benefit from increasing this number. However, if you notice increased latency or resource contention, reducing it might be more effective.
Selected Fields in TableReadOptions:
You are selecting a fairly long list of fields via SelectedFields. Ensure that you're only selecting the fields necessary for your analysis. Reducing the number of fields can decrease the amount of data transferred and processed, potentially improving performance.
RowRestriction for Date Filtering:
Your RowRestriction is set to filter rows by date. Make sure that the timestamp field is indexed or that the table is partitioned by this field. Efficient filtering can significantly reduce the amount of data read, which can improve performance.
Client-Side Processing:
The processing you do in processAvro() and processStream() is crucial. Ensure that these functions are optimized for efficiency. Profiling these functions to identify bottlenecks is essential. For instance, if the processing of each row is computationally intensive, this could be a limiting factor.
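A cheap way to do that profiling is the standard library's runtime/pprof: wrap one full read in a CPU profile and see whether the time goes to Avro decoding, the gRPC stream, or the per-row progress-bar updates. This is a generic sketch, not tied to your code; the file name is arbitrary:
package example

import (
    "log"
    "os"
    "runtime/pprof"
)

// profileRead wraps an arbitrary read function with a CPU profile so the
// processAvro/processStream hot spots show up in `go tool pprof`.
func profileRead(read func() error) error {
    f, err := os.Create("storage_read.pprof")
    if err != nil {
        return err
    }
    defer f.Close()

    if err := pprof.StartCPUProfile(f); err != nil {
        return err
    }
    defer pprof.StopCPUProfile()

    if err := read(); err != nil {
        log.Printf("read failed: %v", err)
        return err
    }
    return nil
}
The resulting file can then be inspected with go tool pprof storage_read.pprof.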
Caching Effects:
Network Considerations:
Error Handling and Retries:
Make sure the error handling in processStream() and processAvro() is efficient and does not inadvertently cause unnecessary retries or delays.
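If retries do turn out to be involved, the API makes a clean resume possible: ReadRowsRequest carries an Offset, so a stream can be reopened at the last row actually delivered instead of starting over. A hedged sketch (types follow the snippet in the original post; real code should also check that the error is actually retryable):
package example

import (
    "context"
    "io"

    bqStorage "cloud.google.com/go/bigquery/storage/apiv1"
    bqStoragepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
)

// readStreamWithResume forwards ReadRowsResponse messages to ch, reopening the
// stream at the last delivered offset after a transient error.
func readStreamWithResume(ctx context.Context, client *bqStorage.BigQueryReadClient,
    streamName string, ch chan<- *bqStoragepb.ReadRowsResponse) error {
    var offset int64
    const maxRetries = 3
    retries := 0
    for {
        stream, err := client.ReadRows(ctx, &bqStoragepb.ReadRowsRequest{
            ReadStream: streamName,
            Offset:     offset,
        })
        if err != nil {
            return err
        }
        for {
            rsp, err := stream.Recv()
            if err == io.EOF {
                return nil // stream exhausted
            }
            if err != nil {
                retries++
                if retries > maxRetries {
                    return err
                }
                break // reopen the stream at the current offset
            }
            offset += rsp.GetRowCount()
            ch <- rsp
        }
    }
}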
Experimentation:
Experiment with different MaxStreamCount values, compare performance with different data formats, and adjust the SelectedFields to include only the necessary fields.
Hi All,
I had a question on this one: "Make sure that the timestamp field is indexed or that the table is partitioned by this field".
I checked the BigQuery docs and the only field types that can be indexed with a Search Index are string, array, struct, and json. What type of index were you referring to?
Thanks!
G
Yes, you are correct in pointing out that BigQuery's search index capabilities are limited to specific field types like string, array, struct, and JSON, and do not apply to timestamp fields. My reference to indexing in the context of your timestamp field was imprecise. In BigQuery, the concept of indexing as it is known in traditional databases doesn't apply in the same way. Let me clarify:
Table Partitioning: In BigQuery, instead of indexing, you often use table partitioning to improve query performance. If your table is large and you frequently query based on date or timestamp, partitioning the table by the timestamp field can significantly improve performance. This is because partitioning limits the amount of data scanned during a query, which can reduce costs and increase speed.
Clustering: Another feature that can improve performance in BigQuery is clustering. If your table is already partitioned, you can further organize it by clustering on one or more columns, such as your timestamp field. Clustering sorts the data within each partition and can provide more efficient data retrieval for queries that filter on the clustered columns.
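For reference, here is a hedged sketch of what that setup looks like with the Go BigQuery client. The table, field, and clustering column names are placeholders, and since an existing table's partitioning cannot be changed in place, this would apply to a new table you copy or load the data into:
package example

import (
    "context"

    "cloud.google.com/go/bigquery"
)

// createPartitionedTable creates a table partitioned by day on the `timestamp`
// column and clustered on a frequently filtered column (placeholder below).
func createPartitionedTable(ctx context.Context, client *bigquery.Client,
    datasetID, tableID string, schema bigquery.Schema) error {
    md := &bigquery.TableMetadata{
        Schema: schema,
        TimePartitioning: &bigquery.TimePartitioning{
            Type:  bigquery.DayPartitioningType,
            Field: "timestamp", // column-based, daily partitions
        },
        Clustering: &bigquery.Clustering{
            Fields: []string{"insertId"}, // replace with the columns your filters use most
        },
    }
    return client.Dataset(datasetID).Table(tableID).Create(ctx, md)
}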
Data Skew: Be mindful of data skew in partitioned and clustered tables. If certain partitions or clusters are much larger than others, it can lead to uneven performance.