Hi,
I am trying to build a Dataflow pipeline. The requirement is to read an event from Pub/Sub, extract a table name from the Pub/Sub message, and then read data from BigQuery for that table.
Each Pub/Sub message will have a different table name to read from BQ. I have written some sample code, but I'm running into an issue passing the dynamically created query to the BigQuery IO connector.
Addressing your questions one by one.
Reading from BigQuery with Dynamic Table Names:
Apache Beam doesn't allow nested transforms, so you can't apply a BigQueryIO.readTableRows() inside a ParDo. However, if you're okay with starting a separate job for each table, one solution is to launch a separate Dataflow pipeline per table name. This would involve considerably more overhead and might not be ideal. This approach can be thought of as a "meta-pipeline", where the main pipeline's job is to launch other pipelines.
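A very rough sketch of that meta-pipeline idea is below. Everything here is illustrative: the class name, the project value, and the omitted staging/temp configuration are placeholders you'd need to fill in.

```java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch only: each incoming table name launches a separate batch Dataflow
// job that reads that one table. This mainly illustrates the overhead involved.
class LaunchBatchJobFn extends DoFn<String, Void> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String tableName = c.element(); // e.g. "project:dataset.table"

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setProject("my-project"); // illustrative; staging/temp options omitted

    Pipeline batchPipeline = Pipeline.create(options);
    batchPipeline.apply(BigQueryIO.readTableRows().from(tableName));
    // ... apply whatever per-table processing and sink the batch job needs ...

    batchPipeline.run(); // returns once the job is submitted; it runs independently
  }
}
```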
Using Unbounded PCollections with BigQueryIO:
For BigQueryIO.readTableRows(), the Apache Beam documentation does indicate that you cannot use an unbounded PCollection; the read from BigQuery is designed for bounded data sets. However, since you're reading an unbounded source from Pub/Sub, the design becomes challenging.
In cases where you have unbounded sources and you want to execute operations that typically expect bounded sources, you may need to introduce a windowing strategy. For example, you can batch the table names into fixed windows and then process each window.
Using BigQuery Storage API vs BigQueryIO:
The solution I provided earlier is hinting at interfacing directly with the BigQuery client, not necessarily just the BigQuery Storage API. If you want to stick with Beam's connectors, this direct interfacing wouldn't be appropriate. However, in scenarios where Beam's IO connectors don't provide the required functionality, sometimes you have to fall back to direct API calls.
Considering your requirements, the challenge here is dealing with an unbounded source and wanting to dynamically read from BigQuery.
Windowing: You could use a windowing strategy to group your unbounded Pub/Sub messages into bounded windows. This way, each window can be processed as a separate bounded input.
```java
PCollection<String> windowedTableNames = tableNames.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
```
Side Inputs: Once windowed, you can turn the PCollection of table names into a PCollectionView, which can be used as a side input for another transform. You'd still have to use the BigQuery client directly within a ParDo, but you'd be able to access the list of table names from the side input.
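Continuing from the windowed snippet above (variable names are illustrative), creating the view could look like this:

```java
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

// Turn each window's table names into a side-input view. Another transform
// can then attach it via ParDo.of(...).withSideInputs(tableNamesView) and
// read it with c.sideInput(tableNamesView) inside its DoFn.
PCollectionView<Iterable<String>> tableNamesView =
    windowedTableNames.apply(View.asIterable());
```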
BigQuery Client in ParDo: While it's not ideal given Beam's constraints, you can use the BigQuery client directly in a ParDo to run the required query and return the results.
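A minimal sketch of such a DoFn, assuming the google-cloud-bigquery client library is on the classpath; QueryTableFn, the SELECT * query, and emitting rows as strings are all illustrative choices:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn: receives a table name and emits each row of that table.
class QueryTableFn extends DoFn<String, String> {
  // The client isn't serializable, so create it once per worker instance.
  private transient BigQuery bigquery;

  @Setup
  public void setup() {
    bigquery = BigQueryOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String tableName = c.element(); // e.g. "my_dataset.my_table"
    QueryJobConfiguration query = QueryJobConfiguration.newBuilder(
        "SELECT * FROM `" + tableName + "`").build();
    for (FieldValueList row : bigquery.query(query).iterateAll()) {
      c.output(row.toString());
    }
  }
}
```

Whether the table name arrives as the main-input element (as above) or via the side input from the previous step, the query logic inside the DoFn is the same.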
If dynamically starting separate pipelines for each table isn't feasible due to overhead, then the approach mentioned above (using windowing and side inputs) might be your best bet. However, you'd be mixing direct BigQuery client calls with Apache Beam, which might not be as clean as using Beam's IO connectors but could solve the issue.