Hi,
We are currently deciding on a technology for a Data Sync feature we are developing.
We need to sync data from BigQuery to various Cloud SQL instances.
We have two services we could use:
1. Batch
We used Batch for a PoC. The challenge we faced is that sync jobs don't arrive as parallel requests; they come in on a schedule, so at any given time we can have around 10K Batch jobs/VMs running in our infrastructure.
2. Dataflow
I have limited experience with Dataflow, but could it be a good fit here? We could run a Dataflow job in streaming mode, and when it receives a task from Pub/Sub it would start syncing the BigQuery table to Cloud SQL.
Can anyone help me evaluate which stack is right for this?
The error you're seeing is because beam.io.ReadFromBigQuery is not designed to be used as a context manager with the with statement in the way you're trying to use it. Instead, it's a PTransform that should be applied to a pipeline.
To dynamically read from BigQuery based on the table ID from the Pub/Sub message, you'll need a different approach. Here's one way to do it:
1. Use a ParDo transform to extract the table ID from the Pub/Sub message.
2. Use that table ID to read from BigQuery inside the same ParDo transform.
Here's a revised version of your code:
import apache_beam as beam
import json

from google.cloud import bigquery


class ExtractAndReadFromBigQuery(beam.DoFn):
    def setup(self):
        # One BigQuery client per worker instance.
        self.client = bigquery.Client()

    def process(self, element):
        pub_data = json.loads(element.decode('utf-8'))
        info = pub_data['bigquery_table_info']
        table_id = '{}.{}.{}'.format(
            info['project_id'], info['dataset_id'], info['table_id'])
        # Read the table directly with the BigQuery client; the table ID
        # is only known at runtime, so beam.io.ReadFromBigQuery can't be used here.
        for row in self.client.list_rows(table_id):
            yield dict(row)


(pipeline
 | 'ReadPubSubMessage' >> beam.io.ReadFromPubSub(topic='<pubsub_topic>')
 | 'ExtractAndReadFromBigQuery' >> beam.ParDo(ExtractAndReadFromBigQuery())
 | 'ProcessRecords' >> ...)
The key point is that the read from BigQuery happens inside the ParDo transform. This allows us to dynamically specify the table ID based on the Pub/Sub message.