I have a list of BigQuery queries:
("SELECT JSON_CONTENT FROM project_id.ds.TB_1",
"SELECT TRANSACTION_ID FROMproject_id.ds.TB_2",
"SELECT TRANSACTION_ID FROMproject_id.ds.TB_2", etc...)
This list is dynamic: sometimes it has 3 queries like the ones above, sometimes it can have 10, and so on.
So I need a way to iterate through the list (like a for-each), read from BigQuery passing the current element of the list (the SQL query), and save each result into a specific folder in a Cloud Storage bucket.
I was able to achieve this with the code below; however, each query runs as a parallel branch of the pipeline, and I need it to be sequential (read query --> save to file, then read the next query --> save to file, etc. -- see the sketches after the code). Only after all the queries have been read and each file has been created will I call another service (a Cloud Function) that depends on all those files already existing, to generate signed URLs for them and send emails to customers.
Could anyone help? I'm having a very hard time trying to achieve this... thanks!
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

if __name__ == '__main__':
    # Define the pipeline options
    pipeline_options = PipelineOptions(
        job_name='test-loop-4',
        runner='DataflowRunner',
        project='my-project-dev',
        region='us-central1',
        staging_location='gs://bucket_test/staging',
        temp_location='gs://bucket_test/temp',
        save_main_session=True
    )

    queries = [
        "SELECT JSON_CONTENT FROM project_id.ds.TB_1",
        "SELECT JSON_CONTENT FROM project_id.ds.TB_2"
    ]

    # Base output path for all the results
    base_output_prefix = 'gs://bucket_test_2/output_4/output'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        for i, query in enumerate(queries):
            output_prefix = f"{base_output_prefix}_{i}"

            # Read data from BigQuery
            bigquery_data = pipeline | f"ReadFromBigQuery_{i}" >> ReadFromBigQuery(
                query=query, use_standard_sql=True)

            # Write to Cloud Storage as CSV
            bigquery_data | f"WriteToGCS_{i}" >> WriteToText(
                file_path_prefix=output_prefix,
                file_name_suffix='.csv',
                header='WRITE_HEADER'
            )
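
To make the goal concrete, what I'm after is essentially the plain-Python equivalent of the sketch below (the client usage and all names like the folder path are just illustrative -- this is not code I'm actually running):

# Illustrative sketch only: the sequential read-then-save flow I want,
# written with the google-cloud-bigquery / google-cloud-storage clients
# instead of Beam. Names here are hypothetical.
from google.cloud import bigquery, storage

bq_client = bigquery.Client(project='my-project-dev')
gcs_client = storage.Client(project='my-project-dev')
bucket = gcs_client.bucket('bucket_test_2')

for i, query in enumerate(queries):
    # 1) Read: run the query and block until the results are ready
    rows = bq_client.query(query).result()

    # 2) Save: write the rows as CSV lines into the target folder
    csv_lines = [','.join(str(v) for v in row.values()) for row in rows]
    bucket.blob(f'output_4/output_{i}.csv').upload_from_string('\n'.join(csv_lines))

# 3) Only now, with every file in place, call the Cloud Function that
#    generates the signed URLs and emails the customers.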
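
One direction I've considered (untested) is dropping the single shared pipeline and running one blocking pipeline per query instead, roughly like below -- but I'm not sure whether launching a separate Dataflow job per query is a reasonable approach:

# Untested sketch: one blocking pipeline run per query, so each
# read --> write finishes before the next one starts.
for i, query in enumerate(queries):
    # NOTE: Dataflow job names must be unique among running jobs, so the
    # fixed job_name above would need an index suffix per iteration.
    pipeline = beam.Pipeline(options=pipeline_options)
    (
        pipeline
        | f"ReadFromBigQuery_{i}" >> ReadFromBigQuery(query=query, use_standard_sql=True)
        | f"WriteToGCS_{i}" >> WriteToText(
            file_path_prefix=f"{base_output_prefix}_{i}",
            file_name_suffix='.csv')
    )
    pipeline.run().wait_until_finish()  # block until this job completes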