
Is Dataflow a good fit for bulk BigQuery to Cloud SQL sync?

Hi,

Right now we are deciding on the technology for a data sync feature we are developing.

We need to sync data from BigQuery to various Cloud SQL instances.

We have two services we can use:

1. Batch

We used Batch for a PoC, but the challenge we faced is that sync jobs don't arrive as parallel requests spread over time; they arrive on a schedule, so at any given moment we can have 10K Batch jobs/VMs running in our infrastructure.

2. Dataflow

I have limited experience with Dataflow, but could it be a good fit here? We could run a Dataflow job in streaming mode, and when it receives a task from Pub/Sub it would start syncing the BigQuery table to Cloud SQL.
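
For context, each sync task we publish to Pub/Sub would carry the table coordinates as JSON, roughly like this (a sketch; all values are placeholders):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic = publisher.topic_path('<project>', '<topic>')  # placeholders

# One sync task: which BigQuery table to copy into Cloud SQL.
message = {
    'bigquery_table_info': {
        'project_id': '<project>',
        'dataset_id': '<dataset>',
        'table_id': '<table>',
    }
}
publisher.publish(topic, json.dumps(message).encode('utf-8'))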

Can anyone help me evaluate the right stack for this?

 

Solved
ACCEPTED SOLUTION

The error you're seeing occurs because beam.io.ReadFromBigQuery is not designed to be used as a context manager with a with statement; it's a PTransform that must be applied to a pipeline.
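
For reference, a minimal sketch of the standard usage (the table name is a placeholder):

import apache_beam as beam

# ReadFromBigQuery is a PTransform applied to the pipeline,
# not a context manager opened inside a function.
with beam.Pipeline() as p:
  rows = p | 'ReadTable' >> beam.io.ReadFromBigQuery(
      table='<project>:<dataset>.<table>')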

To dynamically read from BigQuery based on the table ID from the Pub/Sub message, you'll need a different approach. Here's one way to do it:

  1. Use a ParDo transform to extract the table ID from the Pub/Sub message, then read that table with the BigQuery client library (Beam IO transforms such as ReadFromBigQuery can't be applied inside a DoFn).
  2. Return the rows from BigQuery as the output of the ParDo transform.

Here's a revised version of your code:

 
import apache_beam as beam
import json
from google.cloud import bigquery

class ExtractAndReadFromBigQuery(beam.DoFn):
  def setup(self):
    # Create one BigQuery client per worker, not per element.
    self.client = bigquery.Client()

  def process(self, element):
    # Pub/Sub messages arrive as bytes: decode, then parse the JSON payload.
    pub_data = json.loads(element.decode('utf-8'))
    info = pub_data['bigquery_table_info']
    table_id = f"{info['project_id']}.{info['dataset_id']}.{info['table_id']}"

    # Read the table directly with the BigQuery client library;
    # Beam sources cannot be constructed and read inside a DoFn.
    for row in self.client.list_rows(table_id):
      yield dict(row)

(pipeline
 | 'ReadPubSubMessage' >> beam.io.ReadFromPubSub(topic='<pubsub_topic>')
 | 'ExtractAndReadFromBigQuery' >> beam.ParDo(ExtractAndReadFromBigQuery())
 | 'ProcessRecords' >> ...)
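
The 'ProcessRecords' step is left open above. As a rough sketch (not part of the original answer), the rows could be written to Cloud SQL for PostgreSQL with a plain DoFn; the host, credentials, and target table below are all placeholders:

import apache_beam as beam

class WriteToCloudSQL(beam.DoFn):
  def setup(self):
    # Lazy import so the dependency is resolved on the worker.
    import psycopg2
    self.conn = psycopg2.connect(
        host='<cloudsql_host>',  # placeholder: instance IP or proxy address
        dbname='<database>',     # placeholder
        user='<user>',           # placeholder
        password='<password>')   # placeholder

  def process(self, row):
    # Assumes the destination table and matching columns already exist.
    cols = ', '.join(row.keys())
    marks = ', '.join(['%s'] * len(row))
    with self.conn.cursor() as cur:
      # '<target_table>' is a placeholder, not valid SQL as written.
      cur.execute(
          f'INSERT INTO <target_table> ({cols}) VALUES ({marks})',
          list(row.values()))
    self.conn.commit()

  def teardown(self):
    self.conn.close()

In practice you would batch the inserts and route each message to the right Cloud SQL instance, but the overall shape stays the same.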

