
Execute dynamic query to read BigQuery table using BigQueryIO connector in Dataflow

Hi,
I am trying to build a Dataflow pipeline. The requirement is to read an event from Pub/Sub, extract a table name from the Pub/Sub message, and then read the data for that table from BigQuery.

Each Pub/Sub message will have a different table name to read from BQ. I have written some sample code, but the issue is passing the dynamically created query to the BigQuery IO connector:

// Read messages from Pub/Sub
PCollection<String> messages = pipeline.apply(
    "Read from Pubsub Topic " + configOption.getPubsubSubscriptions(),
    PubsubIO.readStrings().fromSubscription("projects/xyz/subscriptions/topic-sub"));

PCollectionTuple events = messages.apply(
    "Validate Json Message",
    ParDo.of(new GenerateEventFromPubsubMsg(validEvents, InvalidEvents))
        .withOutputTags(validEvents, TupleTagList.of(InvalidEvents)));

PCollection<PubsubEvent> validEventPCollection = events.get(validEvents);
PCollection<KV<String, PubsubEvent>> invalidEventPCollection = events.get(InvalidEvents);

// Extract the fully qualified table name from each valid event
PCollection<String> tableToReadBQ = validEventPCollection.apply(
    "extract table name",
    ParDo.of(new DoFn<PubsubEvent, String>() {
      @ProcessElement
      public void processEvent(ProcessContext context) throws RuntimeException {
        PubsubEvent event = context.element();
        String bqProjectId = event.getBqProjectId();
        String bqDataSet = event.getBqDataSet();
        String bqTable = event.getBqTable();
        String tableName = bqProjectId + ":" + bqDataSet + "." + bqTable;
        context.output(tableName);
      }
    }));
    
// With the code below I was trying to read from BigQuery, but it looks invalid
PCollection<TableRow> tableRows = tableToReadBQ.apply(
    "ReadFromBigQuery",
    ParDo.of(new DoFn<String, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String tableName = c.element();
        String query = "SELECT * FROM `" + tableName + "`";

        // Read data from BigQuery using the dynamically constructed query
        // (this does not compile: a transform cannot be applied to an
        // element inside a DoFn)
        PCollection<TableRow> rows = c.element().apply(BigQueryIO.readTableRows()
            .fromQuery(query)
            .usingStandardSql());

        // Output the result to the downstream PCollection
        c.output(rows);
      }
    }));
 
I have tried other ways as well, but I am not getting a PCollection<TableRow> that I can process further. Can someone please suggest an optimized and correct way?
Solved
1 ACCEPTED SOLUTION

Addressing your questions one by one.

  1. Reading from BigQuery with Dynamic Table Names:

    Apache Beam doesn't allow nested transformations, so you can't apply BigQueryIO.readTableRows() inside a ParDo. However, if you're okay with starting a separate job for each table, one option is to launch a separate Dataflow pipeline per table name. This involves considerably more overhead and might not be ideal. You can think of this approach as a "meta-pipeline", where the main program's job is to launch other pipelines (see the launcher sketch after this list).

  2. Using Unbounded PCollections with BigQueryIO:

    For BigQueryIO.readTableRows(), the Apache Beam documentation does indicate that you cannot use an unbounded PCollection: the read from BigQuery is designed for bounded data sets. Since you're reading an unbounded source from Pub/Sub, the design becomes challenging.

    In cases where you have unbounded sources and want to execute operations that expect bounded inputs, you may need to introduce a windowing strategy. For example, you can batch the table names into fixed windows and then process each window (see item 1 of the proposed solution below).

  3. Using BigQuery Storage API vs BigQueryIO:

    The solution I provided earlier hints at interfacing with the BigQuery client directly, not necessarily just the BigQuery Storage API. If you want to stick with Beam's connectors, this direct interfacing wouldn't be appropriate. However, in scenarios where Beam's IO connectors don't provide the required functionality, you sometimes have to fall back to direct API calls.
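
To make the "meta-pipeline" idea from point 1 concrete, here is a minimal sketch. It assumes a driver program that obtains table names outside of Beam (for example, via a Pub/Sub pull client) and submits one batch Dataflow job per table; launchPipelineForTable and ProcessRowsFn are hypothetical names used only for illustration.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.ParDo;

    // Hypothetical launcher: submits one batch pipeline per table name.
    static void launchPipelineForTable(String tableName, PipelineOptions options) {
        Pipeline p = Pipeline.create(options);
        p.apply("ReadTable", BigQueryIO.readTableRows().from(tableName)) // "project:dataset.table"
         .apply("ProcessRows", ParDo.of(new ProcessRowsFn()));           // your processing DoFn
        p.run(); // non-blocking; each call submits a separate Dataflow job
    }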

Proposed Solution:

Considering your requirements, the challenge here is dealing with an unbounded source and wanting to dynamically read from BigQuery.

  1. Windowing: You could use a windowing strategy to group your unbounded Pub/Sub messages into bounded windows. This way, each window can be processed as a separate bounded input.

     
    PCollection<String> windowedTableNames = tableNames
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));
  2. Side Inputs: Once windowed, you can turn the PCollection of table names into a PCollectionView, which can be used as a side input for another transform. You'd still have to use the BigQuery client directly within a ParDo, but you'd be able to access the list of table names from the side input.

  3. BigQuery Client in ParDo: While it's not ideal given Beam's constraints, you can use the BigQuery client directly in a ParDo to run the required query and emit the results (see the sketch after this list).
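
Here is a minimal sketch of point 3, assuming the windowedTableNames collection from point 1 and the google-cloud-bigquery client library available on the workers. For simplicity it processes the table names as the main input rather than through a side input, and emits rows as strings so the output has a default coder.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FieldValueList;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.TableResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<String> rows = windowedTableNames.apply(
        "QueryBigQueryPerTable",
        ParDo.of(new DoFn<String, String>() {
          private transient BigQuery bigquery;

          @Setup
          public void setup() {
            // One client per DoFn instance, created on the worker.
            bigquery = BigQueryOptions.getDefaultInstance().getService();
          }

          @ProcessElement
          public void processElement(ProcessContext c) throws InterruptedException {
            // Note: standard SQL expects "project.dataset.table",
            // not the "project:dataset.table" form used by BigQueryIO.
            String tableName = c.element().replace(':', '.');
            QueryJobConfiguration queryConfig =
                QueryJobConfiguration.newBuilder("SELECT * FROM `" + tableName + "`")
                    .setUseLegacySql(false)
                    .build();
            TableResult result = bigquery.query(queryConfig);
            for (FieldValueList row : result.iterateAll()) {
              c.output(row.toString()); // serialize however suits your downstream processing
            }
          }
        }));

If another main input needs to drive the processing instead, the windowed table names can be converted with View.asList() and passed to the ParDo via withSideInputs(...), as point 2 describes.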

If dynamically starting separate pipelines for each table isn't feasible due to overhead, then the approach mentioned above (using windowing and side inputs) might be your best bet. However, you'd be mixing direct BigQuery client calls with Apache Beam, which might not be as clean as using Beam's IO connectors but could solve the issue.

