Hello,
I am experiencing weird behavior when fetching data from BigQuery: performance is worse with the Storage Read API than with a regular query.
The throughput is around:
- 9,000 rows per second with a regular BigQuery query
- 30 to 1,500 rows per second with the BigQuery Storage Read API
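For context, the "regular query" number comes from the standard BigQuery client's row iterator, roughly like the sketch below (simplified; the table, fields, and client setup are illustrative, not the actual code):

// Rough sketch of the regular-query baseline with the standard client.
// Assumed imports: "context", "cloud.google.com/go/bigquery",
// "google.golang.org/api/iterator"; bqClient is a *bigquery.Client.
q := bqClient.Query(`SELECT jsonPayload.field_1, timestamp, insertId
    FROM dataset.table
    WHERE DATE(timestamp) = "2023-01-01"`)
it, err := q.Read(ctx)
if err != nil {
	log.Fatalf("query failed: %v", err)
}
for {
	var row map[string]bigquery.Value
	err := it.Next(&row)
	if err == iterator.Done {
		break
	}
	if err != nil {
		log.Fatalf("row iteration failed: %v", err)
	}
	// each row read here counts toward the ~9,000 rows/s figure
}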
This is for one table in particular.
We use Go, and here is the Storage Read API code (with some omissions):
package yo

func yo() {
	readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
		projectId,
		datasetId,
		tableId,
	)
	tableReadOptions := &bqStoragepb.ReadSession_TableReadOptions{
		SelectedFields: []string{
			"jsonPayload.field_1",
			"jsonPayload.field_2",
			"jsonPayload.field_3",
			"jsonPayload.field_4",
			"jsonPayload.field_5",
			"jsonPayload.field_6",
			"jsonPayload.field_7",
			"jsonPayload.field_8",
			"jsonPayload.field_9",
			"jsonPayload.field_10",
			"jsonPayload.field_11",
			"jsonPayload.field_12",
			"jsonPayload.field_13",
			"jsonPayload.field_14",
			"jsonPayload.field_15",
			"jsonPayload.field_16",
			"jsonPayload.field_17",
			"jsonPayload.field_18",
			"jsonPayload.field_19",
			"timestamp",
			"insertId",
		},
		// start and end usually cover 1 day
		RowRestriction: fmt.Sprintf(`DATE(timestamp) >= "%s" AND DATE(timestamp) <= "%s"`,
			start.Format("2006-01-02"),
			end.Format("2006-01-02"),
		),
	}
	dataFormat := bqStoragepb.DataFormat_AVRO
	createReadSessionRequest := &bqStoragepb.CreateReadSessionRequest{
		Parent: fmt.Sprintf("projects/%s", projectId),
		ReadSession: &bqStoragepb.ReadSession{
			Table:       readTable,
			DataFormat:  dataFormat,
			ReadOptions: tableReadOptions,
		},
		MaxStreamCount: 4,
	}
	// session comes from bqReadClient.CreateReadSession(ctx, createReadSessionRequest)
	// (the call and its error handling are among the omissions).

	// channel used to pass the raw row responses from the readers to the decoder
	ch := make(chan *bqStoragepb.ReadRowsResponse)
	// Use a waitgroup to coordinate the reading and decoding goroutines.
	var wg sync.WaitGroup
	// We use multiple streams to parallelize the reading.
	for _, stream := range session.GetStreams() {
		readStream := stream.Name
		log.Debugln("readStream", readStream)
		// Start the reading in one goroutine per stream.
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
				log.Errorf("processStream failure: %v", err)
				return
			}
		}()
	}
	// kLog.Debug.Println(session.GetAvroSchema().GetSchema())
	// Start Avro processing and decoding in another goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var err error
		switch format {
		case AVRO_FORMAT:
			err = processAvro(ctx, session.GetAvroSchema().GetSchema(), ch, events)
		}
		if err != nil {
			log.Errorf("error processing %s: %v", format, err)
			return
		}
	}()
	// Wait until both the reading and decoding goroutines complete.
	wg.Wait()
	close(ch)
}
processStream() follows the implementation from the official documentation, and so does processAvro().
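For reference, processStream() is essentially the loop from the official quickstart sample, minus its retry/backoff handling. A simplified sketch (the import aliases and paths are assumptions and depend on the client library version):

// Simplified sketch of processStream, following the official quickstart.
// Assumed imports: "context", "fmt", "io",
//   bqStorage "cloud.google.com/go/bigquery/storage/apiv1",
//   bqStoragepb "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
func processStream(ctx context.Context, client *bqStorage.BigQueryReadClient, st string, ch chan<- *bqStoragepb.ReadRowsResponse) error {
	var offset int64
	// Open the stream at the current offset.
	rowStream, err := client.ReadRows(ctx, &bqStoragepb.ReadRowsRequest{
		ReadStream: st,
		Offset:     offset,
	})
	if err != nil {
		return fmt.Errorf("couldn't invoke ReadRows: %w", err)
	}
	for {
		r, err := rowStream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error reading rows from stream: %w", err)
		}
		if rc := r.GetRowCount(); rc > 0 {
			offset += rc
			// Hand the raw response block off to the decoding goroutine.
			select {
			case ch <- r:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}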
Each time processAvro() decodes a row, it increments a progress bar.
The progress bar usually shows between 30 and 1,000 iterations per second (i.e. rows per second).
I tried measuring inside both processAvro() and processStream(), and the rate is about the same in each. However, if I run the same read several times in a row, it tends to get faster (up to around 20k rows per second).
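The measurement itself is nothing sophisticated: a shared counter incremented per decoded row and sampled once per second, roughly like the sketch below (the names are illustrative, not the actual code):

// Illustrative throughput probe: rowsDecoded is incremented from processAvro
// for every decoded row, and this goroutine logs the rate once per second.
// Assumed imports: "context", "sync/atomic" (Go 1.19+ for atomic.Int64), "time";
// log is the same logger used in the snippet above.
var rowsDecoded atomic.Int64

func reportThroughput(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	var last int64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cur := rowsDecoded.Load()
			log.Debugf("decoded %d rows/s", cur-last)
			last = cur
		}
	}
}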
Any idea?
Why is the Storage Read API slower than a regular query? Should I modify MaxStreamCount?
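One thing I still want to try for MaxStreamCount is leaving it at zero, since, as far as I understand the API docs, a zero/unset value lets the service choose the number of streams, and the loop above already starts one goroutine per returned stream:

createReadSessionRequest := &bqStoragepb.CreateReadSessionRequest{
	Parent: fmt.Sprintf("projects/%s", projectId),
	ReadSession: &bqStoragepb.ReadSession{
		Table:       readTable,
		DataFormat:  dataFormat,
		ReadOptions: tableReadOptions,
	},
	// 0 (or unset) asks the service to pick how many streams to return;
	// the loop over session.GetStreams() then fans out accordingly.
	MaxStreamCount: 0,
}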
If I find something I will keep you posted 🙂
Thanks,