Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Push Data from Big Query to Cloud SQL

Hi Team,

 

I want to push Data from Big Query to Cloud SQL in batch mode.

Please suggest me the simplest way to do the same. Please share some sample code in Python apache beam/ dataflow etc.

@ms4446 Need your help here.

Thanks !!

1 6 2,785
6 REPLIES 6

There are several approaches are suitable for this task:

  1. Cloud Dataflow (Apache Beam): A great choice for scalable and flexible batch data transfers.

  2. Cloud Functions & Cloud Scheduler: Ideal for smaller datasets or less frequent transfers. This offers a serverless, code-centric solution.

  3. Cloud Composer (Apache Airflow): Well-suited for orchestrating complex batch workflows that might involve multiple steps beyond just BigQuery to Cloud SQL transfer.

Below is a simplified example of how you can use Apache Beam with the Python SDK to read data from BigQuery and write it to Cloud SQL (PostgreSQL or MySQL) in batch mode. This example assumes you have a basic understanding of Python, Google Cloud Platform, and have both Google Cloud SDK and Apache Beam Python SDK set up in your environment.

Prerequisites

  • Google Cloud SDK installed and initialized.
  • Apache Beam Python SDK installed in your environment. You can install it using pip:
     
    pip install apache-beam[gcp]
  • A BigQuery dataset and table from which data will be exported.
  • A Cloud SQL instance (PostgreSQL or MySQL) and a database where data will be imported, including the necessary table(s) with appropriate schema(s).
 
import apache_beam as beam 
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import ReadFromBigQuery 

class WriteToCloudSql(beam.DoFn):
    def start_bundle(self): 
        # Use SQLAlchemy for database operations. Ensure proper dependencies.
        from sqlalchemy import create_engine 

        # Connection string (adjust for your SQL database type)
        db_engine = create_engine('your_connection_string_here') 
        self.connection = db_engine.connect() 

    def process(self, element):
        try:
            # Adjust SQL statement for your schema and type compatibility
            insert_statement = "INSERT INTO your_table_name (column1, column2) VALUES (%s, %s)" 
            self.connection.execute(insert_statement, (element['column1'], element['column2'])) 
        except Exception as e:
            print(f"Error inserting record: {e}") 

    def finish_bundle(self):
        self.connection.close() 

def run():
    options = PipelineOptions(
        project='your_project',
        runner='DataflowRunner',
        temp_location='gs://your_temp_location/temp',
        region='your_region', 
        # More options as needed
    ) 

    p = beam.Pipeline(options=options)
    query = 'SELECT column1, column2 FROM your_dataset.your_table' 

    (p | 'ReadFromBigQuery' >> ReadFromBigQuery(query=query, use_standard_sql=True) 
       | 'WriteToCloudSql' >> beam.ParDo(WriteToCloudSql()) 
    )

    result = p.run()
    result.wait_until_finish() 

if __name__ == '__main__':
    run() 

Key Considerations and Enhancements

  • Dependency Management:
    • Use pip or virtualenv to manage SQLAlchemy and database connectors (e.g., PyMySQL, pg8000).
    • This creates a reproducible environment.
  • Dataflow Execution:
     
    python bq_to_cloudsql.py \
        --runner=DataflowRunner \
        --project=your_project \
        --temp_location=gs://your_temp_location/temp \
        --region=your_region 
    
  • Schema Mapping:
    • Ensure data types are compatible between BigQuery and your Cloud SQL database. Implement transformations if needed.
  • Batching & Write Disposition:
    • For BigQuery writes, consider write_disposition (e.g., BigQueryDisposition.WRITE_APPEND).
  • Error Handling & Logging:
    • Robust error handling (try-except) and logging are essential for production pipelines.
  • Security & Connections:
    • Use secure connections! This could mean a Cloud SQL proxy when using Dataflow.
  • Efficiency:
    • Batch writes to Cloud SQL to improve performance.

Subject: Issue with Inserting Records into Cloud SQL from BigQuery using Apache Beam – "List argument must consist only of tuples or dictionaries"

Hello Community,
I am working on a data pipeline that reads data from BigQuery and writes it into Cloud SQL (MySQL) using Apache Beam. The pipeline seems to be running, but I'm encountering the following error when trying to insert records into Cloud SQL:

Error inserting record: List argument must consist only of tuples or dictionaries

Here’s a simplified version of my code:

 

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from sqlalchemy import create_engine
import logging

class WriteToCloudSql(beam.DoFn):
    def start_bundle(self):
        from sqlalchemy import create_engine

        # Replace with your actual MySQL instance details
        db_engine = create_engine('mysql+pymysql://<username>:<password>@<hostname>:<port>/<database>')
        self.connection = db_engine.connect()

    def process(self, element):
        try:
            logging.info(f"Processing element: {element}")
            insert_statement = "INSERT INTO <table_name> (emp_id, emp_name, emp_department) VALUES (%s, %s, %s)"
            self.connection.execute(insert_statement, (element['emp_id'], element['emp_name'], element['emp_department']))
            yield element
        except Exception as e:
            logging.error(f"Error inserting record: {e}")

    def finish_bundle(self):
        if hasattr(self, 'connection') and self.connection:
            self.connection.close()

def run():
    options = PipelineOptions(
        project='<your_project_id>',
        runner='DataflowRunner',
        temp_location='gs://<your_temp_bucket>/',
        region='<your_region>',
        network='projects/<your_project_id>/global/networks/<your_network>',
        subnetwork='regions/<your_region>/subnetworks/<your_subnet>',
        service_account_email='<your_service_account_email>',
        cloudsql_instances='<your_project_id>:<your_region>:<your_mysql_instance>'
    )

    p = beam.Pipeline(options=options)
    query = 'SELECT emp_id, emp_name, emp_department FROM <your_project_id>.<your_dataset>.<your_table>'

    (p | 'ReadFromBigQuery' >> ReadFromBigQuery(query=query, use_standard_sql=True)
       | 'WriteToCloudSql' >> beam.ParDo(WriteToCloudSql())
    )

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    run()

 

Issue:

The records are being processed correctly, but when trying to insert them into Cloud SQL, I’m seeing this error:

Error inserting record: List argument must consist only of tuples or dictionaries

Can you provide a similar solution in java apache beam? I am not able to find any solution which works with JAVA apache beam. 

Creating a data pipeline using Apache Beam in Java to transfer data from Google BigQuery to Cloud SQL involves similar steps as in Python, but with Java-specific implementations. Below is an example of how you can set up such a pipeline using Apache Beam's Java SDK.

Prerequisites

  • Apache Beam Java SDK (ensure it's added to your project dependencies)
  • JDBC driver for your specific Cloud SQL instance (MySQL or PostgreSQL)
  • Google Cloud SDK setup for authentication and permissions

Maven Dependencies

First, add the necessary dependencies in your pom.xml for Apache Beam and the JDBC driver:

 
<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.35.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>2.35.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

Here’s a basic example in Java, which reads data from BigQuery and writes it to a Cloud SQL instance using JDBC.

 
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import java.sql.PreparedStatement;

public class BigQueryToCloudSQL {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        String query = "SELECT column1, column2 FROM `your_project.your_dataset.your_table`";
        
        PCollection<KV<String, String>> bqData = p.apply("ReadFromBigQuery", BigQueryIO.readTableRows()
            .fromQuery(query)
            .usingStandardSql());

        String jdbcUrl = "jdbc:mysql://your-cloudsql-instance-ip:3306/database_name";
        String dbUsername = "your-db-username";
        String dbPassword = "your-db-password";

        bqData.apply("WriteToCloudSQL", JdbcIO.<KV<String, String>>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.cj.jdbc.Driver", jdbcUrl)
                .withUsername(dbUsername)
                .withPassword(dbPassword))
            .withStatement("insert into your_table (column1, column2) values (?, ?)")
            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<String, String>>() {
                public void setParameters(KV<String, String> element, PreparedStatement query) throws Exception {
                    query.setString(1, element.getKey());
                    query.setString(2, element.getValue());
                }
            }));

        p.run().waitUntilFinish();
    }
}

Key Points:

  • BigQuery IO: This code snippet reads from a BigQuery table using a query.
  • JDBC IO: It writes to a Cloud SQL database using JDBC. Make sure to replace the JDBC URL, username, and password with your actual Cloud SQL instance details.
  • Error Handling: Implement robust error handling and logging for production use.
  • Performance Optimization: Depending on the size of your data, consider tuning the pipeline options, such as the number of workers and their machine type.

This basic setup should get you started with transferring data from BigQuery to Cloud SQL using Java Apache Beam. Adjust the SQL query, table schema, and connection parameters according to your specific requirements.

The above code is resulting into below error - Cannot create PoolableConnectionFactory (Communications link failure)

I am using direct runner , trying to run this locally and using cloudsql - MYSQL instance with public IP. All the required APIs are enabled too. Not sure what's the cause of the issue! 

The error message "Cannot create PoolableConnectionFactory (Communications link failure)" usually indicates a connectivity issue between your application and the MySQL Cloud SQL instance. This can occur for several reasons, such as network issues, incorrect database credentials, or misconfigured JDBC connection settings. Here are some steps and considerations to troubleshoot and resolve this issue:

  1. Verify JDBC URL Ensure that the JDBC URL is correctly formatted and includes all necessary parameters. For a MySQL instance, it generally looks like this:
jdbc:mysql://<IP-ADDRESS>:3306/<DATABASE>?useSSL=false

Make sure to replace <IP-ADDRESS>, <DATABASE> with your actual Cloud SQL instance's public IP address and the database name. The useSSL=false parameter can be adjusted based on whether you are using SSL or not.

  1. Test Network Connectivity Since you're using a public IP, ensure that your local machine can reach the Cloud SQL instance's IP. You can test this by attempting to connect using a standard MySQL client tool like mysql from your command line:
mysql -h <IP-ADDRESS> -u <USER> -p

If you cannot connect, there might be a network issue or firewall settings blocking your connection.

  1. Firewall and IP Allowlisting Check the Cloud SQL instance's settings to ensure that your local machine's IP address is allowlisted. In Google Cloud SQL, you need to add your IP address to the list of authorized networks.

  2. Credentials and Permissions Double-check the username and password used in the connection string. Ensure the database user has the correct privileges for the operations your application is trying to perform.

  3. Use Cloud SQL Proxy For local development and to simplify connectivity without worrying about IP allowlisting, consider using the Cloud SQL Proxy. The proxy provides secure access to your Cloud SQL instance without the need for allowlisting IPs or configuring SSL. To use the proxy:

  • Download and start the Cloud SQL Proxy:
./cloud_sql_proxy -instances=your-gcp-project:your-region:your-instance=tcp:3306
  • Then, modify your JDBC URL to connect locally:
 
String jdbcUrl = "jdbc:mysql://localhost:3306/database_name";

This setup redirects the database calls to the local port where the proxy is listening, which tunnels the traffic securely to the Cloud SQL instance.

  1. Logging and Error Details Increase the logging level in your Java application to capture more detailed error information from the JDBC driver and Apache Beam. This might provide more insights into what might be going wrong.

Here's an example of how you might configure the logger (if using SLF4J with Logback for instance):

 
<logger name="org.apache.beam" level="DEBUG"/>
<logger name="com.mysql.cj.jdbc" level="DEBUG"/>

If the problem persists after these checks, it may be beneficial to examine the exact configurations and network settings further or consult Google Cloud support with specific error logs and details.