
ReadFromBigQuery dynamically in Apache Beam - Dataflow

I have a list of BigQuery queries:

("SELECT JSON_CONTENT FROM project_id.ds.TB_1",
"SELECT TRANSACTION_ID FROM project_id.ds.TB_2",
"SELECT TRANSACTION_ID FROM project_id.ds.TB_2", etc...)
 
This list is dynamic, which means that sometimes it has 3 queries like the ones above, sometimes it has 10, and so on.
 
So I need a way to iterate through the list, like a for-each, read from BigQuery using the current element of the list (a SQL query), and save each result into a specific folder in a Cloud Storage bucket.
 
I was able to achieve this with the code below. However, the queries run as parallel branches in the pipeline, and I need them to run sequentially (read query --> save to file, then read the next query --> save to file, etc.). Only after all queries have been read and every file has been created will I call another service (a Cloud Function), which depends on all of those files already existing in order to generate signed URLs for them and send emails to customers.


Could anyone help? I'm having a very hard time trying to achieve this. Thanks!

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


if __name__ == '__main__':

    # Define your pipeline options
    pipeline_options = PipelineOptions(
        job_name="test-loop-4",
        runner='DataflowRunner',
        project='my-project-dev',
        region='us-central1',
        staging_location='gs://bucket_test/staging',
        temp_location='gs://bucket_test/temp',
        save_main_session=True
    )

    queries = [
        "SELECT JSON_CONTENT FROM project_id.ds.TB_1",
        "SELECT JSON_CONTENT FROM project_id.ds.TB_2"
    ]

    # Define a base output path for all the results
    base_output_prefix = 'gs://bucket_test_2/output_4/output'

    with beam.Pipeline(options=pipeline_options) as pipeline:

        # Each loop iteration adds an independent read/write branch to the pipeline
        for i, query in enumerate(queries):
            output_prefix = f"{base_output_prefix}_{i}"

            # Read data from BigQuery
            bigquery_data = pipeline | f"ReadFromBigQuery_{i}" >> ReadFromBigQuery(query=query, use_standard_sql=True)

            # Write to Cloud Storage as CSV
            bigquery_data | f"WriteToGCS_{i}" >> WriteToText(
                file_path_prefix=output_prefix,
                file_name_suffix='.csv',
                header='WRITE_HEADER'
            )




Hi @rafaelpazsud,

Welcome to Google Cloud Community!

You're correct, the for loop in your current code generates multiple parallel branches within your Dataflow pipeline, leading to the concurrent execution of all queries.

Here are potential ways that might help with your use case:

  • Configuring the beam.DoFn for ReadAndWriteQuery: You may implement a configurable beam.DoFn called ReadAndWriteQuery. Its process method constructs an output file path from an index, runs the provided query against BigQuery, and writes the results to Cloud Storage. Because ReadFromBigQuery is a PTransform, it cannot be called from inside process; the DoFn would need a BigQuery client (for example the google-cloud-bigquery library) instead. The method can also yield an element, such as the output path, to signal that the query has finished.
  • Enumerating Queries with beam.Create: You may want to transform your list of SQL queries into a PCollection of (index, query) tuples, so that each query carries the index used for its output path.
  • Applying ReadAndWriteQuery with beam.ParDo: A ParDo transform applies the custom ReadAndWriteQuery function to the PCollection. Note that Beam may process separate elements in parallel, so if the queries must run strictly one after another, place the whole list of (index, query) tuples in a single element and loop over it inside process.
  • Error Handling: You may want to incorporate error handling within your ReadAndWriteQuery method. Catching and logging failures per query can keep one failed query from failing the entire pipeline and makes issues easier to diagnose.
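
As a rough illustration of the points above, here is a minimal sketch, not a tested Dataflow job. It assumes the google-cloud-bigquery package is installed on the workers and that each query result fits in a single worker's memory. The names rows_to_csv, run_queries_in_order, build_pipeline and ReadAndWriteQueries are hypothetical and only used for this example. The whole list of queries is passed as one element so that a single process call loops over them in order, and the output paths follow the base_output_prefix pattern from the question.

```python
import csv
import io
from typing import Callable, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, object]


def rows_to_csv(rows: Sequence[Row]) -> str:
    """Render rows (dicts with the same keys) as CSV text with a header line."""
    if not rows:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=list(rows[0].keys()), lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


def run_queries_in_order(
    indexed_queries: Iterable[Tuple[int, str]],
    run_query: Callable[[str], List[Row]],
    write_text: Callable[[str, str], None],
    base_output_prefix: str,
) -> List[str]:
    """Run each query and write its file before starting the next query."""
    written = []
    for index, query in indexed_queries:
        path = f"{base_output_prefix}_{index}.csv"
        write_text(path, rows_to_csv(run_query(query)))
        written.append(path)
    return written


def build_pipeline(queries, base_output_prefix, pipeline_options=None):
    """Wire the loop into Beam (hypothetical helper, imports kept local)."""
    import apache_beam as beam
    from apache_beam.io.filesystems import FileSystems
    from google.cloud import bigquery  # assumption: available on the workers

    class ReadAndWriteQueries(beam.DoFn):
        def setup(self):
            # One BigQuery client per DoFn instance.
            self.client = bigquery.Client()

        def _run_query(self, query):
            # Loads the full result into memory; only suitable for small results.
            return [dict(row.items()) for row in self.client.query(query).result()]

        @staticmethod
        def _write_text(path, text):
            handle = FileSystems.create(path)
            try:
                handle.write(text.encode("utf-8"))
            finally:
                handle.close()

        def process(self, indexed_queries):
            # Yields each output path once its file has been written.
            yield from run_queries_in_order(
                indexed_queries, self._run_query, self._write_text,
                base_output_prefix,
            )

    pipeline = beam.Pipeline(options=pipeline_options)
    (
        pipeline
        # A single element holding every (index, query) pair.
        | "AllQueriesAsOneElement" >> beam.Create([list(enumerate(queries))])
        | "ReadAndWriteInOrder" >> beam.ParDo(ReadAndWriteQueries())
    )
    return pipeline
```

With this shape, build_pipeline(queries, base_output_prefix, pipeline_options).run().wait_until_finish() would block until every file has been written, which is the point at which the follow-up Cloud Function could be called. The trade-off is that all queries run on one worker, so this gives up Dataflow's parallelism in exchange for ordering.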

You may refer to the documentation below, which provides essential information for understanding Apache Beam and Google Cloud Dataflow. These resources are fundamental for grasping the concepts and implementation details of both technologies:

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

Hi Marvin! Thanks for sharing, but would you have some code to share? I usually don't ask for code; however, I already tried the approach above and got the error "ReadFromBigQuery is not iterable", which is why I asked you for help.