
Execute a dynamic query to read a BigQuery table using the BigQueryIO connector in Dataflow

Hi,
I am trying to build a Dataflow pipeline. The requirement is to read an event from Pub/Sub, extract the table name from the Pub/Sub message, and then read the data for that table from BigQuery.

Each Pub/Sub message will carry a different table name to read from BigQuery. I have written some sample code, but I have an issue passing the dynamically created query to the BigQuery I/O connector:

// Read messages from Pub/Sub
PCollection<String> messages = pipeline.apply("Read from Pubsub Topic " + configOption.getPubsubSubscriptions(),
        PubsubIO.readStrings()
                .fromSubscription("projects/xyz/subscriptions/topic-sub"));

PCollectionTuple events = messages.apply("Validate Json Message",
        ParDo.of(new GenerateEventFromPubsubMsg(validEvents, InvalidEvents))
                .withOutputTags(validEvents, TupleTagList.of(InvalidEvents)));

PCollection<PubsubEvent> validEventPCollection = events.get(validEvents);
PCollection<KV<String, PubsubEvent>> invalidEventPCollection = events.get(InvalidEvents);

PCollection<String> tableToReadBQ = validEventPCollection.apply("extract table name",
        ParDo.of(new DoFn<PubsubEvent, String>() {
            @ProcessElement
            public void processEvent(ProcessContext context) throws RuntimeException {
                PubsubEvent event = context.element();
                String bqProjectId = event.getBqProjectId();
                String bqDataSet = event.getBqDataSet();
                String bqTable = event.getBqTable();
                String tableName = bqProjectId + ":" + bqDataSet + "." + bqTable;
                context.output(tableName);
            }
        }));

// With the code below I was trying to read the table, but it looks invalid
PCollection<TableRow> tableRows = tableToReadBQ
        .apply("ReadFromBigQuery", ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String tableName = c.element();
                String query = "SELECT * FROM `" + tableName + "`";

                // Read data from BigQuery using the dynamically constructed query
                PCollection<TableRow> tableRows = c.element().apply(BigQueryIO.readTableRows()
                        .fromQuery(query)
                        .usingStandardSql());

                // Output the result to the downstream PCollection
                c.output(tableRows);
            }
        }));
 
I have tried other approaches as well, but I am not getting a PCollection<TableRow> that I can process further. Can someone please suggest an optimized and correct way to do this?
Solved

8 REPLIES

Hello, which program did you use to write the code?

I think there is an error in this line: PCollection<PubsubEvent> validEventPCollection = event.get(validEvents);

    PCollection<KV<String, PubsubEvent>>

The issue in your code stems from attempting to nest a Dataflow transform (BigQueryIO.readTableRows()) inside a DoFn. Apache Beam doesn't allow nested transforms within a DoFn.

Here's a revised Java-based solution:

  1. Extract table names from the Pub/Sub messages.
  2. Use a reshuffle or a dummy grouping to consolidate the collection.
  3. Use a ParDo that reads from BigQuery for each query.

// Read messages from Pub/Sub
PCollection<String> messages = pipeline.apply("Read from Pubsub Topic", PubsubIO.readStrings()
  .fromSubscription("projects/xyz/subscriptions/topic-sub"));

// Extract and construct table names
PCollection<String> tableNames = messages.apply("Extract Table Names", ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Extract the table name from the message and output it
    String tableName = extractTableNameFromMessage(c.element());
    c.output(tableName);
  }
}));

// Reshuffle if needed
PCollection<String> reshuffledTableNames = tableNames.apply(Reshuffle.viaRandomKey());

// Read data from BigQuery based on table names
PCollection<TableRow> tableRows = reshuffledTableNames.apply("Read From BigQuery", ParDo.of(new DoFn<String, TableRow>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String tableName = c.element();
    String query = "SELECT * FROM `" + tableName + "`";

    // Implement a method to run the BigQuery query and obtain results
    List<TableRow> results = runBigQueryQuery(query);

    for (TableRow row : results) {
      c.output(row);
    }
  }
}));

// Implement the runBigQueryQuery function to execute the query and retrieve results from BigQuery.
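For completeness, here is one minimal sketch of what a runBigQueryQuery helper could look like, using the google-cloud-bigquery client directly. This is illustrative only and not part of the Beam connector: error handling, retries, and pagination tuning are omitted, and it assumes the query result fits in worker memory.

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the runBigQueryQuery helper referenced above.
static List<TableRow> runBigQueryQuery(String query) {
  BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
  QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
      .setUseLegacySql(false) // standard SQL
      .build();

  List<TableRow> rows = new ArrayList<>();
  try {
    // Runs the query job and blocks until it completes.
    TableResult result = bigquery.query(queryConfig);
    FieldList fields = result.getSchema().getFields();
    for (FieldValueList values : result.iterateAll()) {
      TableRow row = new TableRow();
      for (int i = 0; i < fields.size(); i++) {
        row.set(fields.get(i).getName(), values.get(i).getValue());
      }
      rows.add(row);
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("BigQuery query was interrupted", e);
  }
  return rows;
}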

Remember, if you're reading data from many different tables, this approach might not be the most efficient due to initiating multiple BigQuery jobs. It's essential to understand the implications of this design based on your expected workload.

Hi,

Thank you for the proposed solution and for pointing out the cost of initiating multiple BigQuery jobs. We have already taken this into consideration and are throttling the input (the messages from Pub/Sub): only some tables are processed at a time, and after they complete, new messages are published from the source to read the next set of BigQuery tables.

I still have questions about the solution you proposed: how would I execute the query to read the BigQuery table using BigQueryIO.readTableRows()? Inside the transform you applied we have the query/table name, but we do not have a PCollection as input on which to apply a new BigQueryIO transform. If we had a PCollection, then we could apply a transform to read data from the table like below:

pcollection.apply("Read from BigQuery table",
        BigQueryIO.readTableRows().from(tablename).withMethod(Method.DIRECT_READ))
    .apply(MapElements.into(TypeDescriptor.of(TableRow.class))
        // Use TableRow to access individual fields in the row.
        .via((row) -> {
            var tableRow = new TableRow();
            var col1 = (String) row.get("col1");
            var col2 = (String) row.get("col2");
            tableRow.set("col1", col1);
            tableRow.set("col2", col2);
            return tableRow;
        }));

That's why I had sent the sample code with the nested transform: to describe the problem.

The problem would be solved if I could use the PCollection<String> tableNames as input to the next transform, BigQueryIO.readTableRows(), and use the table name as shown in the code snippet above.

Can we use this unbounded collection (the PCollection from the Pub/Sub read) directly when reading with BigQueryIO.readTableRows()? Please suggest if there is any way.

https://stackoverflow.com/questions/38746021/writing-to-bigquery-from-cloud-dataflow-unable-to-creat... https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html... These suggest we can use an unbounded collection with BigQueryIO.write but not with readTableRows.

Also, in the solution you proposed earlier, are you suggesting using the BigQuery Storage API directly to read data from the table instead of the BigQueryIO connector? I want to use the BigQueryIO connector to read data. Please suggest a possible solution to this problem.

Addressing your questions one by one.

  1. Reading from BigQuery with Dynamic Table Names:

    Apache Beam doesn't allow nested transformations. Hence, you can't apply a BigQueryIO.readTableRows() inside a ParDo. However, if you're okay with starting separate jobs for each table, a solution could be to launch separate Dataflow pipelines for each table name. This would involve considerably more overhead and might not be ideal. This approach can be thought of as a "meta-pipeline", where the main pipeline's job is to launch other pipelines.

  2. Using Unbounded PCollections with BigQueryIO:

    For the BigQueryIO.readTableRows(), the Apache Beam documentation does indicate that you cannot use an unbounded PCollection. The read from BigQuery is typically designed for bounded data sets. However, since you're reading an unbounded source from Pub/Sub, the design becomes challenging.

    In cases where you have unbounded sources and you want to execute operations that typically expect bounded sources, you may need to introduce a windowing strategy. For example, you can batch the table names into fixed windows and then process each window.

  3. Using BigQuery Storage API vs BigQueryIO:

    The solution I provided earlier is hinting towards directly interfacing with the BigQuery client, not necessarily just the BigQuery Storage API. If you want to stick with Beam's connectors, then this direct interfacing wouldn't be appropriate. However, in scenarios where the Beam's IO connectors don't provide the required functionality, sometimes you have to go with direct API calls.

Proposed Solution:

Considering your requirements, the challenge here is dealing with an unbounded source and wanting to dynamically read from BigQuery.

  1. Windowing: You could use a windowing strategy to group your unbounded Pub/Sub messages into bounded windows. This way, each window can be processed as a separate bounded input.

     
    PCollection<String> windowedTableNames = tableNames.apply( Window.into(FixedWindows.of(Duration.standardMinutes(5))));
  2. Side Inputs: Once windowed, you can turn the PCollection of table names into a PCollectionView, which can be used as a side input for another transform. You'd still have to use the BigQuery client directly within a ParDo, but you'd be able to access the list of table names from the side input.

  3. BigQuery Client in ParDo: While it's not ideal, given Beam's constraints, you can use the BigQuery client directly in a ParDo to run the required query and return the results.

If dynamically starting separate pipelines for each table isn't feasible due to overhead, then the approach mentioned above (using windowing and side inputs) might be your best bet. However, you'd be mixing direct BigQuery client calls with Apache Beam, which might not be as clean as using Beam's IO connectors but could solve the issue.
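To make the shape of this concrete, here is a minimal, illustrative sketch that combines windowing (point 1) with a direct BigQuery client call inside a ParDo (point 3); the side-input variant from point 2 would instead turn windowedTableNames into a PCollectionView (for example with View.asList()) and pass it to a downstream transform. The window size, the table-name format, and the per-element query are assumptions, and the imports are the same Beam and BigQuery client classes used elsewhere in this thread.

// Illustrative sketch only, not a drop-in implementation.
PCollection<String> windowedTableNames = tableNames.apply("Fixed windows",
    Window.into(FixedWindows.of(Duration.standardMinutes(5))));

PCollection<TableRow> tableRows = windowedTableNames.apply("Query BigQuery per table",
    ParDo.of(new DoFn<String, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Assumes the element is in a form that is valid inside a query, e.g. project.dataset.table
        String tableName = c.element();
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        QueryJobConfiguration config = QueryJobConfiguration
            .newBuilder("SELECT * FROM `" + tableName + "`")
            .setUseLegacySql(false)
            .build();

        // Direct BigQuery client call (point 3), since BigQueryIO cannot be nested here.
        TableResult result = bigquery.query(config);
        FieldList fields = result.getSchema().getFields();
        for (FieldValueList values : result.iterateAll()) {
          TableRow row = new TableRow();
          for (int i = 0; i < fields.size(); i++) {
            row.set(fields.get(i).getName(), values.get(i).getValue());
          }
          c.output(row);
        }
      }
    }));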

Thank you so much @ms4446 for your time and for proposing a solution. I'll implement it. I was actually trying to avoid direct BigQuery client calls, which is why I was thinking of implementing the BigQuery read using BigQueryIO, but that looks infeasible here given BigQueryIO's limitations.

With the solution we discussed, for this use case we need to use either the BigQuery Storage API or the BigQuery client to retrieve data, and because I can extract the details from the Pub/Sub message within a step/transform, there is no need to use a side input.

From a performance point of view, which method is better for reading and copying large tables of 2-5 GB: BigQueryIO or the BigQuery client?

In our use case we need to copy data from a large number of BigQuery tables on a daily basis; the total might be 40,000-50,000 tables. At any one time this Dataflow job will be processing/migrating the data for 2,500-5,000 tables, and a large table will have 2-5 GB of data.

Right now we are using the BigQuery client to read data. Is Dataflow the right technology to process such a huge amount of data?

We are thinking of exploring another Google service like Google Batch, but the number of jobs in Google Batch would be very high, and there could be other issues as well that we are not aware of as of now.

Please suggest which technology is better for this use case.


BigQueryIO does not inherently struggle with reading data from unbounded PCollections. Its challenge lies in dynamic table reads where table names aren't determined before pipeline execution begins.

Additionally, while BigQueryIO efficiently interfaces with the BigQuery Storage API for many use cases, it's worth noting that there could be specific advanced features or nuances that the connector might not directly handle.

Dataflow:

Pros:

  • A robust solution designed for processing large datasets in both streaming and batch modes.
  • Particularly suitable for tasks like transferring data from BigQuery to alternative systems.
  • Offers an autoscaling feature to optimize costs.
  • Supports a broad spectrum of functionalities.

Cons:

  • Depending on the size and intricacy of the jobs, it might be on the pricier side.
  • There could be a steeper learning curve, especially for intricate operations.

Batch vs Streaming in Dataflow:

Apache Beam's Dataflow runner supports both batch and streaming modes.

Batch Mode:

  • Usually more cost-efficient for finite datasets or operations that aren't time-sensitive.
  • Provides a more straightforward setup for jobs that don't necessitate real-time data processing.

Streaming Mode:

  • Specifically architected for real-time data processing tasks and handling unbounded datasets.
  • Capable of managing late-arriving data and facilitates intricate time-based windowing and triggering setups.

Given the specificities of your requirements, especially considering the dynamic nature of reading from tables and the sheer volume of data, Dataflow emerges as a more apt choice. Your choice between batch or streaming will largely be governed by your immediate data processing requirements. A hands-on evaluation considering both modes within the scope of your project will guide you to a more informed decision.

Thanks a lot @ms4446 for your inputs. This is helpful!

I have a similar setup where I am trying to run a BigQuery query from within a Dataflow streaming job, over an unbounded collection, with positional arguments.

As such, I had to write a BigQuery ParDo which runs the BigQuery query:

static class BigQueryParDo extends DoFn<Long, TableRow> {

    private final PCollectionView mySideInput;

    // Way to access a side input in a non-anonymous DoFn
    // https://stackoverflow.com/questions/45463061/access-side-input-in-a-non-anonymous-dofn
    BigQueryParDo(PCollectionView mySideInput) {
        this.mySideInput = mySideInput;
    }

    @ProcessElement
    public void processElement(DoFn<Long, TableRow>.ProcessContext c) throws Exception {

        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        LocalDateTime queryingMoment = LocalDateTime.now();
        int minuteOfHour = queryingMoment.getMinuteOfHour();

        // The value below can be used to form the BQ query while streaming in a parameterized way
        // (giving positional parameters)
        c.sideInput(mySideInput);

        QueryJobConfiguration queryConfig =
            QueryJobConfiguration.newBuilder(
                    "select JSON_EXTRACT_SCALAR(data, '$._id') as prid, " +
                        " TIMESTAMP_SECONDS(SAFE_CAST(SAFE_CAST(JSON_EXTRACT_SCALAR(data, '$.eka_ingested_at') as INT64) / 1000 AS INT64)) as eka_ingested_at," +
                        " SAFE_CAST(JSON_EXTRACT_SCALAR(data, '$.created_at') AS TIMESTAMP FORMAT 'DD:MM:YYYY,HH24:MI:SS' AT TIME ZONE 'UTC') as created_at," +
                        " JSON_EXTRACT_SCALAR(data, '$.prescription_url') as prescription_url," +
                        " JSON_EXTRACT_SCALAR(data, '$.docid') as docid," +
                        " JSON_EXTRACT_SCALAR(data, '$.flavour') as flavour," +
                        " JSON_EXTRACT_SCALAR(data, '$.dob') as dob," +
                        " JSON_EXTRACT_SCALAR(data, '$.patient_id') as patient_id," +
                        " JSON_EXTRACT_SCALAR(data, '$.dob_timestamp') as dob_timestamp," +
                        " JSON_EXTRACT_SCALAR(data, '$.patient_gender') as patient_gender," +
                        " JSON_EXTRACT_SCALAR(data, '$.patient_name') as patient_name," +
                        " JSON_EXTRACT_SCALAR(data, '$.patient_phone') as patient_phone," +
                        " JSON_EXTRACT_SCALAR(data, '$.op') as op " +
                        " from `mydataset.pubsub.prescription_latest`")
                .setUseLegacySql(false) // Use standard SQL syntax for queries.
                .build();

        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        // Wait for the query to complete.
        queryJob = queryJob.waitFor();

        // Check for errors
        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }

        // Get the results.
        QueryResponse response = bigquery.getQueryResults(jobId);
        TableResult result = queryJob.getQueryResults();
        List<TableRow> results = new ArrayList<TableRow>();
        for (FieldValueList row : result.iterateAll()) {
            TableRow tableRow = new TableRow();
            for (int i = 0; i < row.size(); i++) {
                FieldValue value = row.get(i);
                tableRow.set(value.toString(), value.getValue());
            }
            results.add(tableRow);
        }
        assert results != null;
        for (TableRow tableRows : results) {
            c.output(tableRows);
        }
    }
}
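Since the post mentions positional arguments but the query above does not actually bind any, here is a hedged, illustrative sketch of how a positional parameter could be attached to the QueryJobConfiguration built inside processElement. The WHERE clause and the JSON field it filters on are hypothetical; only the addPositionalParameter / QueryParameterValue calls are the real client API.

// Illustrative only: would replace the QueryJobConfiguration construction above.
// '$.minute_of_hour' is a hypothetical field used purely to show the binding.
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
            "select data from `mydataset.pubsub.prescription_latest` "
                + "where SAFE_CAST(JSON_EXTRACT_SCALAR(data, '$.minute_of_hour') AS INT64) = ?")
        .addPositionalParameter(QueryParameterValue.int64((long) minuteOfHour))
        .setUseLegacySql(false) // positional parameters require standard SQL
        .build();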

...

Called as below 


//....mainData comes from above windowed to make bounded from unbounded collection 
//....

PCollection<TableRow> bqQueryData = mainData.apply(ParDo.of(new BigQueryParDo(newRandomNumber)).withSideInput("Side Input to BQ Query", newRandomNumber));

// Do further processing on bqQueryData
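For reference, the newRandomNumber view passed above has to be built somewhere earlier in the pipeline. A purely illustrative way to produce such a periodically refreshing singleton side input (the tick rate and window size are assumptions, and the windowing must be compatible with mainData for c.sideInput() to resolve) is:

// Hypothetical sketch: a Long singleton side input that refreshes roughly once per minute.
PCollectionView<Long> newRandomNumber = p
    .apply("Generate ticks", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
    .apply("Window ticks", Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply("As singleton view", View.<Long>asSingleton());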



List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("prid").setType("STRING"));
fields.add(new TableFieldSchema().setName("created_at").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("eka_ingested_at").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("prescription_url").setType("STRING"));
fields.add(new TableFieldSchema().setName("doc_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("flavour").setType("STRING"));
fields.add(new TableFieldSchema().setName("dob").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("dob_timestamp").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_gender").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_name").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_phone").setType("STRING"));
fields.add(new TableFieldSchema().setName("op").setType("STRING"));

TableSchema schema = new TableSchema().setFields(fields);



bqQueryData.apply("Write To BQ", BigQueryIO
.writeTableRows()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withSchema(schema)
.to(options.getTable())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED )
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run();

I am getting the error below (the dataset [mydataset] is anonymised):

Another important observation: if I replace mydataset with a SQL query against any public dataset, it is able to get data from BigQuery.

I am also providing a service account while running the Dataflow job.


Error message from worker: com.google.cloud.bigquery.BigQueryException: Not found: Job mydataset:0694cb64-3977-406f-a207-6ee9f215a972
com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:114)
com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:702)
com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1441)
com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1436)
com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1435)
com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1419)
orbi.prescriptionwindow.window.RXMonetize$BigQueryParDo.processElement(RXMonetize.java:85)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found GET https://www.googleapis.com/bigquery/v2/projects/mydataset/queries/0694cb64-3977-406f-a207-6ee9f215a972?prettyPrint=false
{ "code": 404,
"errors":
[
{
"domain": "global",
"message": "Not found: Job mydataset:0694cb64-3977-406f-a207-6ee9f215a972",
"reason": "notFound"
}
],
"message": "Not found: Job mydataset:0694cb64-3977-406f-a207-6ee9f215a972",
"status": "NOT_FOUND"