Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Is Dataflow Good for bulk BigQuery to Cloud SQL Sync

Hi,

Right now we are in discussion to decide a Tech for our Data Sync feature that we are trying to develop

We need to sync data from bigquery to various Cloud SQL instance. 

We have two services which we can use 

1. Batch

We used Batch to do poc test but the kind of challenge that we face was that we don't receive the sync job in a parallel request we get it on a schedule basis so at any time we can have 10K batch jobs/vm running in infrastructure.

2. Dataflow

I have limited experience of using dataflow but can it be a good service to sync it we can have dataflow job in streaming mode and when its receives a task from pub sub  it can start syncing the bigquery table to cloudsql.

Can anyone help me how should i evalulate my stack for this?

 

Solved Solved
0 11 3,420
1 ACCEPTED SOLUTION

The error you're seeing is because beam.io.ReadFromBigQuery is not designed to be used as a context manager with the with statement in the way you're trying to use it. Instead, it's a PTransform that should be applied to a pipeline.

To dynamically read from BigQuery based on the table ID from the Pub/Sub message, you'll need a different approach. Here's one way to do it:

  1. Use a ParDo transform to extract the table ID from the Pub/Sub message and then use that table ID to read from BigQuery.
  2. Return the rows from BigQuery as the output of the ParDotransform.

Here's a revised version of your code:

 
import apache_beam as beam
import json

class ExtractAndReadFromBigQuery(beam.DoFn):
  def process(self, element):
    pub_data = element.decode('utf-8')
    pub_data = json.loads(pub_data)
    table_id = pub_data['bigquery_table_info']['project_id'] + ':' + pub_data['bigquery_table_info']['dataset_id'] + '.' + pub_data['bigquery_table_info']['table_id']
     
    # Use a direct read from BigQuery
    bq_rows = beam.io.BigQuerySource(table=table_id).read_records(None, None)
    for row in bq_rows:
      yield row

(pipeline
 | 'ReadPubSubMessage' >> beam.io.ReadFromPubSub(topic='<pubsub_topic>')
 | 'ExtractAndReadFromBigQuery' >> beam.ParDo(ExtractAndReadFromBigQuery())
 | 'ProcessRecords' >> ...)

View solution in original post

11 REPLIES 11

Here are some factors to consider when evaluating your stack for bulk BigQuery to Cloud SQL sync:

Data volume and velocity:

  • How much data do you need to sync and how often? If you have a large dataset and need to sync it frequently, then Dataflow, which can process both batch and streaming data, may be a better option. Batch jobs can be efficient for large datasets but introduce latency, especially if data needs to be synced in near real-time.

Complexity of the sync:

  • How complex is the sync process? If you need to perform complex transformations on the data, then Dataflow, with its flexible Apache Beam-based programming model, may be a better choice. Batch jobs are typically more straightforward and might be better suited for simpler sync tasks.

Cost:

  • Dataflow can be more expensive than Batch, especially if you're processing a significant amount of data. However, the cost also depends on the resources used by the Dataflow job. Batch processing might have hidden costs, especially if there's a need for manual intervention or processing delays.

Scalability and reliability:

  • Both Dataflow and Batch are scalable. However, Dataflow offers auto-scaling based on data volume, while Batch might require manual scaling or adjustments, depending on the implementation.

Your team's expertise:

  • Consider your team's expertise with both Dataflow and Batch. Familiarity with a service can significantly impact the speed and quality of development.

Pros and Cons:

Dataflow:

  • Pros:
    • Can process both batch and streaming data.
    • Offers a flexible programming model.
    • Provides built-in monitoring and logging through Stackdriver.
  • Cons:
    • Potentially more expensive than Batch, depending on resource usage.

Batch:

  • Pros:
    • Might be more cost-effective for certain use cases.
    • Typically simpler to use for straightforward tasks.
  • Cons:
    • Not ideal for real-time or near real-time syncing.
    • Less flexible programming model.

Based on the information you provided, I would recommend using Dataflow to sync bulk data from BigQuery to Cloud SQL. Dataflow is better suited for both streaming data and complex sync tasks, and it can handle a large number of concurrent requests efficiently.

Here's a sample Dataflow pipeline for syncing BigQuery to Cloud SQL:

  1. Read the BigQuery table using a BigQueryIO.Read() transform.
  2. Perform any necessary transformations on the data using a ParDo() transform.
  3. Write the data to Cloud SQL. While JDBCIO.Write() can be used, ensure you have the right configurations and optimizations for Cloud SQL to ensure efficient data transfer.

You can schedule the Dataflow pipeline to run regularly or trigger it using a Pub/Sub message.

Hi, 

Why batch is less flexible programmable model? having own container and free style code doesn't help ?

Right now we plan to sync once a day and down to 6 hours. Its a plain sync running UPSERT query and delete query its about maintaining same state of SQL as of BigQuery.

As per the team expertise team is well versed in python but the thread count for dataflow in python is only 12 thread per worker which is too less than java (300 threads per worker vm). And i don't think python dataflow has these logic to handle this.

I am also naïve regarding dataflow implementation to have concurrency handling for cloudsql how i can ensure that at any time only certain numbers of connections are made to a sql instance? since dataflow have workers and in workers there are thread how can i sure the connection handling.

 

 

 

Summary of Batch vs. Dataflow

 

Feature Batch Dataflow
Programming model Sequential (can be parallelized with the right setup, such as using a Spark cluster) Unified (handles both batch and stream processing)
Flexibility Can be flexible but may require more custom coding Highly flexible with built-in connectors and transformations
Infrastructure management Typically simpler, especially with managed services Can be more complex, especially if customizing beyond default settings
Cost Generally more cost-effective for infrequent, large jobs Can vary based on factors like the number of vCPUs, memory, and the duration of job execution, but can be costlier, especially for streaming or frequent batch jobs

Concurrency Handling for Cloud SQL:

  • Use connection pooling to manage and limit the number of active connections. Connection pooling helps in reusing database connections rather than creating a new one every time a connection is requested, which can be resource-intensive and prevent overwhelming the database.
  • Implement a throttling mechanism to control the rate of requests. Choose a rate that is appropriate for your workload to ensure neither overloading nor underutilizing the database.
  • For replicated Cloud SQL setups, use load balancing to distribute read requests among replicas. This improves performance and scalability. Ensure a consistent load balancing algorithm for even distribution. Writes should go to the primary instance to ensure data consistency.

For syncing tasks that occur once a day or every 6 hours, with straightforward UPSERT and DELETE operations, a well-implemented batch process is likely sufficient. However, if you anticipate the need for near real-time syncing or handling large data volumes in the future, consider Dataflow. If using Batch, optimize your Batch jobs for performance by partitioning the data, using a parallel processing framework, or offloading some of the work to services like Cloud Data Fusion or other ETL tools. If considering Dataflow, evaluate its cost for your specific workload.

Choosing between Batch and Dataflow depends on specific requirements, data volume, sync frequency, and team expertise. Both options have their advantages, and the optimal choice will best match the project's needs. There is no one-size-fits-all answer.

Hi @ms4446 ,

My workload is this I will be receiving a ProjectId and a tableName of BigQuery and the destination CloudSQL server details and IP address (can be upto 100K events per day for 100's of CloudSQL instance/db) I will running some query on BigQuery to find out the incremental and Deleted record and then will be syncing this record to CloudSQL.
Goal is to have CloudSql same as BigQuery table.

In batch world I am achieving this by first accessing the secret manager to get db username password creating a Postgres connection.

Running the bigquery to find out the incremental data change by some watermark or hash value and then extract  batch of records says(10K rows) and  creating a dynamic UPSERT query and then firing it over SQL to perform the update/insert.
Similarly i am handling the record deletion by BigQuery time travel query to find out the record deleted and then syncing the same to batch and then dynamic writing a deleted query .

Now how should I do this in Dataflow to achieve the same thing that I am doing in batch world? and morever is Dataflow even good at this stuff?

1. Dataflow for incremental and deleted record sync from BigQuery to CloudSQL

To implement incremental and deleted record sync from BigQuery to CloudSQL in Dataflow, you can use the following steps:

  1. Read the BigQuery table using a BigQueryIO.Read() transform.
  2. Filter the data to only include incremental and deleted records.This can be done using a ParDo() transform and a custom filter function.
  3. Write the incremental and deleted records to CloudSQL using a JDBCIO.Write() transform. This transform can be configured to use a dynamic UPSERT query for incremental records and a dynamic DELETE query for deleted records.

Here is a sample Dataflow pipeline for incremental and deleted record sync from BigQuery to CloudSQL:

import apache_beam as beam
from google.cloud import secretmanager

# Define a method to retrieve secrets from Google Cloud Secret Manager
def get_secret(secret_name):
  client = secretmanager.SecretManagerServiceClient()
  name = client.secret_version_path(secret_name, "latest")
  response = client.access_secret_version(name)
  return response.payload.data.decode("utf-8")

class IncrementalAndDeletedRecordFilter(beam.DoFn):
  def process(self, element):
    # Filter the data to only include incremental and deleted records.
    # This can be done by checking the watermark or hash value of the record.
    if element['watermark'] > self.last_watermark:
      self.last_watermark = element['watermark']
      yield element
    elif element['is_deleted']:
      yield element

class BigQueryToCloudSQLTransform(beam.PTransform):
  def __init__(self, cloud_sql_instance, cloud_sql_database, cloud_sql_table):
    self.cloud_sql_instance = cloud_sql_instance
    self.cloud_sql_database = cloud_sql_database
    self.cloud_sql_table = cloud_sql_table

  def expand(self, pcoll):
    # Write the incremental and deleted records to CloudSQL using a JDBCIO.Write() transform.
    # This transform can be configured to use a dynamic UPSERT query for incremental records and a dynamic DELETE query for deleted records.
    return (pcoll | 'FilterIncrementalAndDeletedRecords' >> beam.ParDo(IncrementalAndDeletedRecordFilter())
            | 'WriteIncrementalAndDeletedRecordsToCloudSQL' >> beam.io.WriteToJDBC(
                jdbc_url='jdbc:postgresql://' + self.cloud_sql_instance + '/' + self.cloud_sql_database,
                driver_name='postgresql',
                username=get_secret('CLOUDSQL_USERNAME'),
                password=get_secret('CLOUDSQL_PASSWORD'),
                table=self.cloud_sql_table,
                write_disposition=beam.io.WriteToJDBC.WriteDisposition.WRITE_APPEND
            ))

# Create a Dataflow pipeline.
pipeline = beam.Pipeline()

# Read the BigQuery table.
(pipeline
 | 'ReadBigQueryTable' >> beam.io.ReadFromBigQuery(
     table='<project_id>.<dataset_id>.<table_id>'
 ))

# Sync the incremental and deleted records to CloudSQL.
(pipeline
 | 'SyncIncrementalAndDeletedRecordsToCloudSQL' >> BigQueryToCloudSQLTransform(
     cloud_sql_instance='<cloud_sql_instance>',
     cloud_sql_database='<cloud_sql_database>',
     cloud_sql_table='<cloud_sql_table>'
 ))

# Run the pipeline.
pipeline.run()

Yes, Dataflow is a good choice for incremental and deleted record sync from BigQuery to CloudSQL. It is a highly scalable and reliable service that can handle large volumes of data. Dataflow also provides a unified programming model for batch and stream processing, which makes it easy to implement complex data processing pipelines.

In addition, Dataflow provides a number of features that are useful for incremental and deleted record sync, such as:

  • Support for parameterized queries: This allows you to construct dynamic SQL queries at runtime, in a safe way that avoids SQL injection vulnerabilities.
  • Support for batch processing:Dataflow can be used to process batch data in a highly scalable and efficient way.
  • Support for credentials rotation:Dataflow can be configured to rotate credentials at runtime,which improves security.
  • Support for error handling:Dataflow provides a number of built-in error handling features,such as retries and logging. This can help to ensure that the pipeline continues to operate even if there are errors.
  • Support for optimization:Dataflow provides a number of features that can be used to optimize pipelines, such as batching writes and using different methods to detect changes in BigQuery.

Overall, Dataflow is a powerful and flexible tool that can be used to implement incremental and deleted record sync from BigQuery to CloudSQL in an efficient and secure way.

Hi @ms4446 ,

Can you help me to to acheive this
In my case the cloud sql instance ip will be part of pub sub message and then by reading message i need to get the secret.
Moreover i also need to get the records from bigquery in a page manner and than create a query to write a upsert query.

Since we are using dataflow how it will effect the postgres connection since dataflow will spawn/thread this connection will it not increase the number of connections on sql server?

  • Getting the Cloud SQL instance IP address from Pub/Sub message and using it to get the secret:
  • Read the Pub/Sub message: Use the beam.io.ReadFromPubSub transform to read the Pub/Sub message.
  • Extract the Cloud SQL instance IP address: If the entire Pub/Sub message is the IP address, you can decode it directly. If the message contains other data, you'll need to parse the message to extract the IP address.
class ExtractCloudSQLInstanceIP(beam.DoFn):
  def process(self, element):
    # Extract the Cloud SQL instance IP address from the Pub/Sub message.
    cloud_sql_instance_ip = element.decode('utf-8')

    # Yield the Cloud SQL instance IP address.
    yield cloud_sql_instance_ip

Hi ,

How to pass the dynamic table_Id(received from pubsub) to ReadFromBigQuery.

I am also getting WritetoJDBC module not found i am on apache-beam==2.50.0 version

To pass the dynamic table ID received from Pub/Sub to ReadFromBigQuery, you can use the following steps:

  1. Use the beam.io.ReadFromPubSub transform to read the message from Pub/Sub.
  2. Extract the table ID from the message.
  3. Use a custom transform to read from BigQuery based on the dynamic table ID.
import apache_beam as beam

class ExtractAndReadFromBigQuery(beam.DoFn):
  def process(self, element):
    table_id = element.decode('utf-8')
    with beam.io.BigQuerySource(table=table_id).reader() as reader:
      for row in reader:
        yield row

(pipeline
 | 'ReadPubSubMessage' >> beam.io.ReadFromPubSub(topic='<pubsub_topic>')
 | 'ReadFromBigQuery' >> beam.ParDo(ExtractAndReadFromBigQuery())
 | 'ProcessRecords' >> ...)

Hi,

class ExtractAndReadFromBigQuery(beam.DoFn):
    def process(self, element):
        pub_data = element.decode('utf-8')
        pub_data = json.loads(pub_data)
        table_id = pub_data['bigquery_table_info']['project_id'] + ':' + pub_data['bigquery_table_info']['dataset_id'] + '.' + pub_data['bigquery_table_info']['table_id']
        with beam.io.ReadFromBigQuery(table=table_id) as reader:
            for row in reader:
                yield row

I am getting error

 File "c:\Users\nitish\dfbatchsyncer.py", line 19, in process
    with beam.io.ReadFromBigQuery(table=table_id) as reader:
AttributeError: __enter__ [while running 'ReadFromBigQuery']

 

The error you're seeing is because beam.io.ReadFromBigQuery is not designed to be used as a context manager with the with statement in the way you're trying to use it. Instead, it's a PTransform that should be applied to a pipeline.

To dynamically read from BigQuery based on the table ID from the Pub/Sub message, you'll need a different approach. Here's one way to do it:

  1. Use a ParDo transform to extract the table ID from the Pub/Sub message and then use that table ID to read from BigQuery.
  2. Return the rows from BigQuery as the output of the ParDotransform.

Here's a revised version of your code:

 
import apache_beam as beam
import json

class ExtractAndReadFromBigQuery(beam.DoFn):
  def process(self, element):
    pub_data = element.decode('utf-8')
    pub_data = json.loads(pub_data)
    table_id = pub_data['bigquery_table_info']['project_id'] + ':' + pub_data['bigquery_table_info']['dataset_id'] + '.' + pub_data['bigquery_table_info']['table_id']
     
    # Use a direct read from BigQuery
    bq_rows = beam.io.BigQuerySource(table=table_id).read_records(None, None)
    for row in bq_rows:
      yield row

(pipeline
 | 'ReadPubSubMessage' >> beam.io.ReadFromPubSub(topic='<pubsub_topic>')
 | 'ExtractAndReadFromBigQuery' >> beam.ParDo(ExtractAndReadFromBigQuery())
 | 'ProcessRecords' >> ...)