
Sequential Read and Write to BigQuery in Dataflow

Our team has several data workflows in which we first read a start date from one table in BigQuery and then use that date to read the actual data from another table. A similar two-step process happens at the end: we write the processed data to BigQuery, then extract a timestamp from that data and log it in a separate table.

When running on Dataflow, we typically start by using ReadFromBigQuery to fetch the start date. Then, we use a ParDo class with the Python BigQuery client SDK to read the actual data, filtering by the start date. At the end of the process, we use another ParDo class to write data to our sink table, followed by WriteToBigQuery to save the timestamp of the last processed record.
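In rough outline, the current pipeline looks like this (table names, column names, and the DoFn body are simplified placeholders):

import apache_beam as beam
from google.cloud import bigquery

class ReadDataFromDate(beam.DoFn):
    # Simplified stand-in for our ParDo that queries the data table
    # with the BigQuery client SDK, filtered by the start date.
    def process(self, start_date):
        client = bigquery.Client()
        query = (
            "SELECT * FROM `project.dataset.data_table` "
            f"WHERE event_date >= '{start_date}'")
        for row in client.query(query).result():
            yield dict(row)

with beam.Pipeline() as pipeline:
    start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="SELECT start_date FROM `project.dataset.date_table` LIMIT 1",
            use_standard_sql=True)
        | "ExtractDate" >> beam.Map(lambda row: row["start_date"]))

    data = start_date | "ReadData" >> beam.ParDo(ReadDataFromDate())
    # ... processing, a ParDo that writes to the sink table, and finally
    # WriteToBigQuery to log the timestamp of the last processed record ...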

I'm unsure whether this is the best approach, since we end up using the BigQuery connectors only to read and write a date, rather than to fetch the actual data being processed.

What would you suggest?


Using BigQuery connectors solely for reading/writing the date, while fetching actual data with the Python BigQuery SDK in a ParDo, might not be the most efficient approach in Google Cloud Dataflow.

Mixing BigQuery connectors with Python SDK calls in ParDo can add unnecessary complexity, create potential performance overhead, and bypass Dataflow’s parallel data processing optimizations. Fetching the date separately introduces an additional read step that may increase latency, and using ParDo with the SDK for main data reads forfeits BigQueryIO’s built-in efficiencies.

To streamline the pipeline, consider using Dataflow’s BigQueryIO connector for all data operations, making the process more efficient and maintainable.

  1. Fetch the Start Date with ReadFromBigQuery: Use a ReadFromBigQuery query to retrieve the start date directly, eliminating the need for an additional client-SDK read step.

  2. Filter Data in the Read Query: Rather than filtering in a ParDo with the Python SDK, incorporate the date filter directly into the ReadFromBigQuery query with a WHERE clause. Filtering in BigQuery minimizes the data transferred to Dataflow and improves performance.

  3. Write Processed Data: Continue using WriteToBigQuery to load the processed data into the sink table.

  4. Extract and Log the Timestamp: Use ReadFromBigQuery to select the maximum timestamp from the processed data, then write that timestamp directly to the log table with WriteToBigQuery.

Here's an example code snippet:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    # 1. Fetch the start date
    start_date = (
        pipeline
        | 'ReadStartDate' >> beam.io.ReadFromBigQuery(
            query="SELECT start_date FROM `your_project.your_dataset.your_date_table` LIMIT 1",
            use_standard_sql=True)
        | 'ExtractDate' >> beam.Map(lambda row: row['start_date']))

    # 2. Filter data with BigQueryIO, using start_date as a side input
    data = (
        pipeline
        | 'ReadData' >> beam.io.ReadFromBigQuery(
            query=lambda date: f"SELECT * FROM `your_project.your_dataset.your_data_table` WHERE date_column >= '{date}'",
            use_standard_sql=True)
        .with_side_inputs(beam.pvalue.AsSingleton(start_date)))

    # 3. Process and write data
    processed_data = data | 'ProcessData' >> beam.Map(process_data_function)
    processed_data | 'WriteData' >> beam.io.WriteToBigQuery(
        table='your_project:your_dataset.your_output_table',
        schema='...',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    # 4. Log the timestamp of the last processed record
    (pipeline
     | 'ReadTimestamp' >> beam.io.ReadFromBigQuery(
         query="SELECT MAX(timestamp_column) AS max_timestamp FROM `your_project.your_dataset.your_output_table`",
         use_standard_sql=True)
     | 'WriteTimestamp' >> beam.io.WriteToBigQuery(
         table='your_project:your_dataset.your_log_table',
         schema='max_timestamp:TIMESTAMP',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

 

Hello ms4446!

I really liked your solution, but I am having some problems applying it:

When I executed it as in the sample below, I got the following error message:

with beam.Pipeline(options=pipeline_options) as pipeline:
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
            SELECT
                MAX(dt_start_date),
            FROM
                `my_dataset.my_table` 
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            )
        )
        | "ExtractDate" >> beam.Map(lambda row: row['dt_start_date'])

    )

    get_events = (
        pipeline
        | "GetEvents" >> beam.io.ReadFromBigQuery(
            query= lambda date: f"""
            SELECT 
               *
            FROM 
                `my_dataset.my_table` 
            WHERE 
                DATE(column) >= {date}
            LIMIT 1
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            ),
        ).with_side_inputs(beam.pvalue.AsSingleton(dt_start_date))
    )

 

2024-11-19 12:45:15.443 WARNING  ('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_read_internal._PassThroughThenCleanup.expand.<locals>.RemoveExtractedFiles'>)
Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/pipeline_teste.py", line 77, in <module>
    ).with_side_inputs(beam.pvalue.AsSingleton(dt_start_date))
AttributeError: 'ReadFromBigQuery' object has no attribute 'with_side_inputs'. Did you mean: 'side_inputs'?

Then I fixed it by replacing "with_side_inputs()" with "side_inputs()" as suggested by the message, but I keep getting another error:

get_events = (
        pipeline
        | "GetEvents" >> beam.io.ReadFromBigQuery(
            query= lambda date: f"""
            SELECT 
               *
            FROM 
                `my_dataset.my_table` 
            WHERE 
                DATE(column) >= {date}
            LIMIT 1
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            ),
        ).side_inputs(beam.pvalue.AsSingleton(dt_start_date))
    )
2024-11-19 12:50:18.705 WARNING  ('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_read_internal._PassThroughThenCleanup.expand.<locals>.RemoveExtractedFiles'>)
Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/pipeline_teste.py", line 77, in <module>
    ).side_inputs(beam.pvalue.AsSingleton(dt_start_date))
TypeError: 'tuple' object is not callable

I don't understand what is happening. Could you help me?


The version of apache-beam that I am using is 2.60.0

The issue is due to a misunderstanding of how ReadFromBigQuery and side inputs work in Apache Beam. The ReadFromBigQuery transform does not directly support the side_inputs or with_side_inputs methods. Instead, side inputs are used with ParDo or Map transforms to pass additional data to a processing function.

You need to fetch dt_start_date first, convert it to a singleton using beam.pvalue.AsSingleton, and then pass it to a ParDo or Map transform to dynamically create the query for ReadFromBigQuery.

import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as pipeline:
    # 1. Read the start date
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
            SELECT
                MAX(dt_start_date) AS dt_start_date
            FROM
                `my_dataset.my_table`
            """,
            use_standard_sql=True,
            temp_dataset="ti_materializacao_views_tmp",  # Replace with your temp dataset name
            temp_location="gs://your_temp_location/"    # Replace with your GCS temp location
        )
        | "ExtractDate" >> beam.Map(lambda row: row['dt_start_date'])
    )

    # 2. Convert the start date to a singleton
    start_date_as_singleton = beam.pvalue.AsSingleton(dt_start_date)

    # 3. Use the start date to fetch filtered events
    get_events = (
        pipeline
        | "TriggerReadEvents" >> beam.Create([None])  # Create an empty PCollection to trigger the read
        | "GenerateQueryAndRead" >> beam.FlatMap(
            lambda _, date: beam.io.ReadFromBigQuery(
                query=f"""
                SELECT 
                   *
                FROM 
                    `my_dataset.my_table`
                WHERE 
                    DATE(column) >= '{date}'
                LIMIT 1
                """,
                use_standard_sql=True,
                temp_dataset="ti_materializacao_views_tmp",  # Replace with your temp dataset name
                temp_location="gs://your_temp_location/"    # Replace with your GCS temp location
            ),
            start_date_as_singleton
        )
    )

 

Hello again, ms4446!

I've attempted this method before, but it seems that using a ReadFromBigQuery() operation inside a FlatMap PTransform isn't possible in Apache Beam. I kept encountering the following error message:

TypeError: 'ReadFromBigQuery' object is not iterable [while running 'GenerateQueryAndRead']

 😕

Sorry for the confusion. You're correct that ReadFromBigQuery cannot be used inside a FlatMap or ParDo: it is a PTransform that has to be applied to the pipeline, not a function you can call per element, which is why Beam reports that the object is not iterable. Instead, to generate the query dynamically from a side input, you need to split the pipeline into separate steps: build the query string in a ParDo, then execute it with a per-element read.

Here’s a revised approach that works with Apache Beam's constraints:

import apache_beam as beam
from google.cloud import bigquery  # BigQuery client library, used for the per-query read below

class GenerateQuery(beam.DoFn):
    def process(self, element, start_date):
        query = f"""
        SELECT 
            *
        FROM 
            `my_dataset.my_table`
        WHERE 
            DATE(column) >= '{start_date}'
        LIMIT 1
        """
        yield query

class ReadBigQueryData(beam.DoFn):
    # ReadFromBigQuery cannot be applied inside a DoFn, so run the generated
    # query with the BigQuery client library instead.
    def setup(self):
        # Create one client per worker.
        self._client = bigquery.Client()

    def process(self, query):
        for row in self._client.query(query).result():
            yield dict(row)

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Step 1: Read the start date
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
            SELECT
                MAX(dt_start_date) AS dt_start_date
            FROM
                `my_dataset.my_table`
            """,
            use_standard_sql=True,
            temp_location="gs://your_temp_location/"  # Replace with your GCS temp location
        )
        | "ExtractStartDate" >> beam.Map(lambda row: row['dt_start_date'])
    )

    # Step 2: Use AsSingleton to make start_date accessible as a side input
    start_date_as_singleton = beam.pvalue.AsSingleton(dt_start_date)

    # Step 3: Dynamically generate query based on the start date
    queries = (
        pipeline
        | "CreateDummyInput" >> beam.Create([None])  # Dummy PCollection to initiate side input
        | "GenerateQueries" >> beam.ParDo(GenerateQuery(), start_date=start_date_as_singleton)
    )

    # Step 4: Execute the generated queries
    results = (
        queries
        | "ReadDataFromBigQuery" >> beam.ParDo(ReadBigQueryData())
    )

    # Process and write results as needed
    processed_data = (
        results
        | "ProcessData" >> beam.Map(lambda row: process_data_function(row))
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            table="my_project:my_dataset.my_output_table",
            schema="...",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )
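 

If you would rather keep the dynamic read inside Beam's own BigQuery connector instead of calling the client library in a DoFn, recent Beam releases (including 2.60.0) also ship ReadAllFromBigQuery and ReadFromBigQueryRequest in apache_beam.io.gcp.bigquery, which read query requests from a PCollection. A minimal sketch under that assumption; table, column, and bucket names are placeholders:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadAllFromBigQuery, ReadFromBigQueryRequest

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Step 1: Read the start date once, as before.
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="SELECT MAX(dt_start_date) AS dt_start_date FROM `my_dataset.my_table`",
            use_standard_sql=True,
            temp_location="gs://your_temp_location/")  # placeholder GCS bucket
        | "ExtractStartDate" >> beam.Map(lambda row: row["dt_start_date"]))

    # Step 2: Build one read request from the side input and let the
    # connector execute the query itself.
    events = (
        pipeline
        | "Trigger" >> beam.Create([None])
        | "BuildReadRequest" >> beam.Map(
            lambda _, start_date: ReadFromBigQueryRequest(
                query=("SELECT * FROM `my_dataset.my_table` "
                       f"WHERE DATE(column) >= '{start_date}'"),
                use_standard_sql=True),
            beam.pvalue.AsSingleton(dt_start_date))
        | "ReadEvents" >> ReadAllFromBigQuery(
            gcs_location="gs://your_temp_location/"))  # placeholder bucket for export files

    # events is a PCollection of dicts, ready for ProcessData / WriteToBigQuery as above.

This keeps the heavy read inside the connector's parallel, export-based path rather than looping over client results on a single worker, while still letting the query depend on the start date.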