Our team has several data workflows where we first read the start date from one table in BigQuery, and then use that date to read the actual data from another table. The same process occurs at the end, when we write the processed data to BigQuery, and then extract a timestamp from the data and log it in a separate table.
When running on Dataflow, we typically start with ReadFromBigQuery to fetch the start date. We then use a ParDo that calls the Python BigQuery client library to read the actual data, filtering by that start date. At the end of the process, another ParDo writes the data to our sink table, followed by WriteToBigQuery to save the timestamp of the last processed record.
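To make the pattern concrete, the read side of one of these pipelines looks roughly like the sketch below (heavily simplified; the table and column names are just placeholders):

import apache_beam as beam
from google.cloud import bigquery

class ReadDataFromDate(beam.DoFn):
    """Simplified version of our current read ParDo (placeholder names)."""

    def setup(self):
        # One BigQuery client per worker
        self.client = bigquery.Client()

    def process(self, start_date):
        # Query the actual data table, filtered by the start date we just read
        query = f"SELECT * FROM `my_dataset.my_data_table` WHERE date_column >= '{start_date}'"
        for row in self.client.query(query).result():
            yield dict(row.items())

with beam.Pipeline() as pipeline:
    data = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="SELECT start_date FROM `my_dataset.my_control_table`",
            use_standard_sql=True)
        | "ExtractDate" >> beam.Map(lambda row: row['start_date'])
        | "ReadData" >> beam.ParDo(ReadDataFromDate())
    )
    # ... processing, then a ParDo that writes the sink table and
    # WriteToBigQuery for the last-processed timestamp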
I'm unsure if this is the best approach, as it gives the impression that we are using BigQuery connectors only to read/write the date, rather than fetching the actual data to be processed.
What would you suggest?
Using BigQuery connectors solely for reading/writing the date, while fetching actual data with the Python BigQuery SDK in a ParDo, might not be the most efficient approach in Google Cloud Dataflow.
Mixing BigQuery connectors with Python SDK calls in ParDo can add unnecessary complexity, create potential performance overhead, and bypass Dataflow’s parallel data processing optimizations. Fetching the date separately introduces an additional read step that may increase latency, and using ParDo with the SDK for main data reads forfeits BigQueryIO’s built-in efficiencies.
To streamline the pipeline, consider using Dataflow’s BigQueryIO connector for all data operations, making the process more efficient and maintainable.
1. Fetch the Start Date Using BigQueryIO: use a ReadFromBigQuery SQL query to retrieve the start date directly, eliminating the need for an additional ParDo read step.
2. Filter Data with BigQueryIO: rather than a ParDo with the Python SDK, put the date filter directly into your ReadFromBigQuery query as a WHERE clause. Filtering in BigQuery minimizes the data transferred to Dataflow, improving performance.
3. Write Processed Data: continue using WriteToBigQuery to load the processed data into the sink table.
4. Extract and Log the Timestamp: use ReadFromBigQuery to select the maximum timestamp from the processed data, then write that value to the log table with WriteToBigQuery.
Here’s an example code snippet:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    # 1. Fetch start date
    start_date = (
        pipeline
        | 'ReadStartDate' >> beam.io.ReadFromBigQuery(
            query="SELECT start_date FROM `your_project.your_dataset.your_date_table` LIMIT 1",
            use_standard_sql=True)
        | 'ExtractDate' >> beam.Map(lambda row: row['start_date']))

    # 2. Filter data with BigQueryIO, using start_date as side input
    data = (
        pipeline
        | 'ReadData' >> beam.io.ReadFromBigQuery(
            query=lambda date: f"SELECT * FROM `your_project.your_dataset.your_data_table` WHERE date_column >= '{date}'",
            use_standard_sql=True)
        .with_side_inputs(beam.pvalue.AsSingleton(start_date)))

    # 3. Process and write data
    processed_data = data | 'ProcessData' >> beam.Map(process_data_function)
    processed_data | 'WriteData' >> beam.io.WriteToBigQuery(
        table='your_project:your_dataset.your_output_table',
        schema='...',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    # 4. Log timestamp
    (pipeline
     | 'ReadTimestamp' >> beam.io.ReadFromBigQuery(
         query="SELECT MAX(timestamp_column) AS max_timestamp FROM `your_project.your_dataset.your_output_table`",
         use_standard_sql=True)
     | 'WriteTimestamp' >> beam.io.WriteToBigQuery(
         table='your_project:your_dataset.your_log_table',
         schema='max_timestamp:TIMESTAMP',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Hello ms4446!
I really liked your solution, but I'm having some problems applying it.
When I executed it like the sample below, I got the following error message:
with beam.Pipeline(options=pipeline_options) as pipeline:
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
                SELECT
                    MAX(dt_start_date),
                FROM
                    `my_dataset.my_table`
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            )
        )
        | "ExtractDate" >> beam.Map(lambda row: row['dt_start_date'])
    )

    get_events = (
        pipeline
        | "GetEvents" >> beam.io.ReadFromBigQuery(
            query=lambda date: f"""
                SELECT
                    *
                FROM
                    `my_dataset.my_table`
                WHERE
                    DATE(column) >= {date}
                LIMIT 1
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            ),
        ).with_side_inputs(beam.pvalue.AsSingleton(dt_start_date))
    )
2024-11-19 12:45:15.443 WARNING ('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_read_internal._PassThroughThenCleanup.expand.<locals>.RemoveExtractedFiles'>)
Traceback (most recent call last):
File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/home/pipeline_teste.py", line 77, in <module>
).with_side_inputs(beam.pvalue.AsSingleton(dt_start_date))
AttributeError: 'ReadFromBigQuery' object has no attribute 'with_side_inputs'. Did you mean: 'side_inputs'?
Then I fixed it by replacing "with_side_inputs()" with "side_inputs()", as suggested by the message, but I keep getting another error:
    get_events = (
        pipeline
        | "GetEvents" >> beam.io.ReadFromBigQuery(
            query=lambda date: f"""
                SELECT
                    *
                FROM
                    `my_dataset.my_table`
                WHERE
                    DATE(column) >= {date}
                LIMIT 1
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            ),
        ).side_inputs(beam.pvalue.AsSingleton(dt_start_date))
    )
2024-11-19 12:50:18.705 WARNING ('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_read_internal._PassThroughThenCleanup.expand.<locals>.RemoveExtractedFiles'>)
Traceback (most recent call last):
File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/home/pipeline_teste.py", line 77, in <module>
).side_inputs(beam.pvalue.AsSingleton(dt_start_date))
TypeError: 'tuple' object is not callable
I don't understand what is happening. Could you help me?
The version of apache-beam I am using is 2.60.0.
The issue comes from how ReadFromBigQuery and side inputs work in Apache Beam. ReadFromBigQuery does not accept side inputs at all: there is no with_side_inputs method, and the side_inputs name the error message suggests is just an internal tuple attribute on the transform, which is why calling it raises TypeError: 'tuple' object is not callable. Side inputs are used with ParDo or Map transforms to pass additional data to a processing function.
You need to fetch dt_start_date first, convert it to a singleton using beam.pvalue.AsSingleton, and then pass it to a ParDo or Map transform to dynamically create the query for ReadFromBigQuery.
import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as pipeline:
    # 1. Read the start date
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
                SELECT
                    MAX(dt_start_date) AS dt_start_date
                FROM
                    `my_dataset.my_table`
            """,
            use_standard_sql=True,
            temp_dataset="ti_materializacao_views_tmp",  # Replace with your temp dataset name
            temp_location="gs://your_temp_location/"  # Replace with your GCS temp location
        )
        | "ExtractDate" >> beam.Map(lambda row: row['dt_start_date'])
    )

    # 2. Convert the start date to a singleton side input
    start_date_as_singleton = beam.pvalue.AsSingleton(dt_start_date)

    # 3. Use the start date to fetch filtered events
    get_events = (
        pipeline
        | "TriggerReadEvents" >> beam.Create([None])  # Single dummy element to trigger the read
        | "GenerateQueryAndRead" >> beam.FlatMap(
            lambda _, date: beam.io.ReadFromBigQuery(
                query=f"""
                    SELECT
                        *
                    FROM
                        `my_dataset.my_table`
                    WHERE
                        DATE(column) >= '{date}'
                    LIMIT 1
                """,
                use_standard_sql=True,
                temp_dataset="ti_materializacao_views_tmp",  # Replace with your temp dataset name
                temp_location="gs://your_temp_location/"  # Replace with your GCS temp location
            ),
            start_date_as_singleton
        )
    )
Hello again, ms4446!
I've attempted this method before, but it seems that using a ReadFromBigQuery() operation inside a FlatMap PTransform isn't possible in Apache Beam. I kept encountering the following error message:
TypeError: 'ReadFromBigQuery' object is not iterable [while running 'GenerateQueryAndRead']
😕
Sorry for the confusion. You're correct that ReadFromBigQuery cannot be used inside a FlatMap or ParDo: it is a PTransform, which must be applied while the pipeline graph is being constructed, not an object you can call or iterate from inside a running transform (hence the "not iterable" error). To generate the query dynamically from a side input, you need to split the pipeline into separate steps: keep ReadFromBigQuery for the start date, build the query string in one DoFn, and then execute that query with the BigQuery client library in another DoFn.
Here's a revised approach that works within Apache Beam's constraints:
import apache_beam as beam
from google.cloud import bigquery

class GenerateQuery(beam.DoFn):
    def process(self, element, start_date):
        # Build the query string from the start date provided as a side input
        query = f"""
            SELECT
                *
            FROM
                `my_dataset.my_table`
            WHERE
                DATE(column) >= '{start_date}'
            LIMIT 1
        """
        yield query

class ReadBigQueryData(beam.DoFn):
    def setup(self):
        # One BigQuery client per worker
        self.client = bigquery.Client()

    def process(self, query):
        # ReadFromBigQuery cannot be applied inside a DoFn, so run the generated
        # query with the BigQuery client library and emit each row as a dict
        for row in self.client.query(query).result():
            yield dict(row.items())

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Step 1: Read the start date
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
                SELECT
                    MAX(dt_start_date) AS dt_start_date
                FROM
                    `my_dataset.my_table`
            """,
            use_standard_sql=True,
            temp_location="gs://your_temp_location/"  # Replace with your GCS temp location
        )
        | "ExtractStartDate" >> beam.Map(lambda row: row['dt_start_date'])
    )

    # Step 2: Use AsSingleton to make start_date accessible as a side input
    start_date_as_singleton = beam.pvalue.AsSingleton(dt_start_date)

    # Step 3: Dynamically generate the query based on the start date
    queries = (
        pipeline
        | "CreateDummyInput" >> beam.Create([None])  # Dummy element to initiate the side input
        | "GenerateQueries" >> beam.ParDo(GenerateQuery(), start_date=start_date_as_singleton)
    )

    # Step 4: Execute the generated queries
    results = (
        queries
        | "ReadDataFromBigQuery" >> beam.ParDo(ReadBigQueryData())
    )

    # Process and write results as needed
    processed_data = (
        results
        | "ProcessData" >> beam.Map(process_data_function)  # Replace with your processing logic
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            table="my_project:my_dataset.my_output_table",
            schema="...",  # Replace with your output schema
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )
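One more option worth mentioning: if you prefer to keep the dynamic read inside BigQueryIO rather than calling the BigQuery client from a DoFn, recent Beam releases (including 2.60.0) provide ReadAllFromBigQuery, which executes queries arriving as a PCollection of ReadFromBigQueryRequest objects. The sketch below is untested against your tables and reuses the placeholder names from above; ReadAllFromBigQuery stages query results through GCS, so make sure the pipeline has a temp_location set (or pass gcs_location explicitly):

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadAllFromBigQuery, ReadFromBigQueryRequest

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read the start date once
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="SELECT MAX(dt_start_date) AS dt_start_date FROM `my_dataset.my_table`",
            use_standard_sql=True)
        | "ExtractStartDate" >> beam.Map(lambda row: row['dt_start_date'])
    )

    # Turn the date into a read request and let BigQueryIO execute it
    events = (
        dt_start_date
        | "BuildReadRequest" >> beam.Map(
            lambda date: ReadFromBigQueryRequest(
                query=f"SELECT * FROM `my_dataset.my_table` WHERE DATE(column) >= '{date}'"))
        | "ReadEvents" >> ReadAllFromBigQuery()
    )

Because the read request is built from the first read's output, the date filter still runs inside BigQuery, which keeps the data transferred to Dataflow minimal, in line with the original recommendation.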