How can I run multiple queries in a Dataflow template?

I am building an MSSQL-to-BigQuery data pipeline using the JDBC to BigQuery template in Dataflow.

Because I'm working with an old MSSQL Server version, I'm building the Change Data Capture (CDC) logic myself: it captures changes and stores them in a separate table.

After that, I do an incremental load to BigQuery. After each batch load, I want to insert the last batch run timestamp into a source (MSSQL) table, so that my CDC logic can check for new changes since the last batch run.

How can I perform both SELECT and INSERT queries in a Dataflow template?


In Dataflow, you can perform both SELECT and INSERT queries by combining transforms from the Apache Beam SDK, on which Dataflow is based. The JDBC IO connector (JdbcIO) is particularly useful for this. Below is a rough idea of how you might implement it:

  1. Read from the MSSQL database using JdbcIO.read(). This allows you to use a SELECT query to fetch data from the MSSQL database:

     
    PCollection<YourDataType> data = pipeline.apply("Read from MSSQL",
        JdbcIO.<YourDataType>read()
            // create() takes the driver class name and the JDBC URL; credentials are set separately
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
                        "jdbc:sqlserver://[your-server];databaseName=[your-database]")
                    .withUsername("[username]")
                    .withPassword("[password]"))
            // last_batch_timestamp is a placeholder for the value recorded by the previous run
            .withQuery("SELECT * FROM your_table WHERE timestamp > last_batch_timestamp")
            .withRowMapper(new JdbcIO.RowMapper<YourDataType>() {
                @Override
                public YourDataType mapRow(ResultSet resultSet) throws Exception {
                    // map the result set to your data type
                }
            })
            .withCoder(AvroCoder.of(YourDataType.class)));
  2. Write the data to BigQuery using BigQueryIO.write(). This will load the data you fetched from the MSSQL database into a BigQuery table:

     
    // writeTableRows() expects a PCollection<TableRow>; convert YourDataType to TableRow first if needed
    data.apply("Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to("your-project:your_dataset.your_table")
            .withJsonSchema("your table schema")
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
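  3. After writing to BigQuery, you can perform an INSERT back into the MSSQL database using JdbcIO.write() to record the last batch run timestamp. Note that, as sketched here, the write is applied to data directly, so it issues one INSERT per element and does not by itself wait for the BigQuery load to finish; for a single timestamp row, reduce data to one element first: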

     
    data.apply("Update last batch run timestamp",
        JdbcIO.<YourDataType>write()
            // create() takes the driver class name and the JDBC URL; credentials are set separately
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
                        "jdbc:sqlserver://[your-server];databaseName=[your-database]")
                    .withUsername("[username]")
                    .withPassword("[password]"))
            .withStatement("INSERT INTO your_table (last_batch_run_timestamp) VALUES (?)")
            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<YourDataType>() {
                @Override
                public void setParameters(YourDataType element, PreparedStatement query) throws Exception {
                    // set the parameters for your insert statement
                }
            }));

Please replace YourDataType with the actual class you're using to represent your data.
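
The last batch run timestamp is a single value per batch, so the rows usually need to be reduced to one timestamp before anything is written back to MSSQL. Here is a minimal plain-Java sketch of that reduction, assuming each loaded row carries a change timestamp. The BatchWatermark class and its method names are hypothetical, and storing the latest change timestamp (rather than the wall-clock run time) is an assumption made for illustration. In a Beam pipeline the same reduction would typically be done with a global combine ahead of JdbcIO.write().

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: picks the single watermark value to record for a batch. */
final class BatchWatermark {

    /** Latest change timestamp in the batch, or empty when the batch had no rows. */
    static Optional<Instant> latest(List<Instant> changeTimestamps) {
        return changeTimestamps.stream().max(Comparator.naturalOrder());
    }

    /** Watermark to store: the batch maximum, or the previous value if nothing was loaded. */
    static Instant next(Instant previous, List<Instant> changeTimestamps) {
        return latest(changeTimestamps).orElse(previous);
    }

    public static void main(String[] args) {
        Instant previous = Instant.parse("2023-06-01T00:00:00Z");
        List<Instant> batch = Arrays.asList(
            Instant.parse("2023-06-01T10:00:00Z"),
            Instant.parse("2023-06-01T12:30:00Z"));
        // The value printed here is what a single INSERT would record for this batch.
        System.out.println(next(previous, batch));
    }
}
```

Keeping the previous value when a batch is empty means the next run's SELECT starts from the same point instead of skipping ahead.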

Please note that this is a very simplified version of the code you would need to implement this pipeline, and you would need to handle errors, retries, and other considerations in a production environment.

Also, remember that using JdbcIO in Apache Beam requires the appropriate JDBC driver for MSSQL, which should be included in your classpath. The JDBC URL, username, and password should also be updated to match your actual database connection details.
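
For the connection details, a small helper along these lines can keep the URL format in one place. This is only a sketch: the SqlServerConnection class, the host, and the database name are hypothetical, and 1433 is simply SQL Server's default port.

```java
/** Hypothetical helper that assembles SQL Server connection settings. */
final class SqlServerConnection {

    // Driver class provided by Microsoft's JDBC driver for SQL Server.
    static final String DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";

    /** Builds a URL of the form jdbc:sqlserver://host:port;databaseName=db. */
    static String url(String host, int port, String database) {
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (database == null || database.trim().isEmpty()) {
            throw new IllegalArgumentException("database must not be empty");
        }
        return "jdbc:sqlserver://" + host.trim() + ":" + port
            + ";databaseName=" + database.trim();
    }

    public static void main(String[] args) {
        // Example values only; replace with your own server and database.
        System.out.println(url("db.example.internal", 1433, "sales"));
    }
}
```

DRIVER_CLASS and the result of url(...) are the two strings that JdbcIO.DataSourceConfiguration.create(...) expects.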

Hi @ms4446,

Thank you so much for your help and the detailed explanation. So, in order to run this, I have to create a custom template, right? My other question is: when I create the custom template, should it be a classic or a Flex template? What is your suggestion?

Regards

Yes, you are correct. In order to run the pipeline as described, you would need to create a custom template.

Dataflow supports two types of templates: Flex templates and classic templates.

Classic templates contain a JSON serialization of a Dataflow job graph and require runtime parameters to be wrapped in the ValueProvider interface. This interface allows users to specify parameter values when they deploy the template.

Flex templates, on the other hand, package the pipeline as a Docker image, along with a template specification file in Cloud Storage. When you run the template, the Dataflow service starts a launcher VM, pulls the Docker image, and runs the pipeline. The execution graph is dynamically built based on runtime parameters provided by the user.

There are several advantages of Flex templates over classic templates:

  • Flex templates do not require the ValueProvider interface for input parameters. Not all Dataflow sources and sinks support ValueProvider.
  • While classic templates have a static job graph, Flex templates can dynamically construct the job graph. For example, the template might select a different I/O connector based on input parameters.
  • Flex templates can perform preprocessing on a virtual machine (VM) during pipeline construction. For example, they might validate input parameter values.
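
To illustrate the last point, validation at pipeline construction time might look roughly like the plain-Java sketch below. The TemplateParams class and its parameter names are hypothetical, and the table-spec pattern is a deliberately simplified check for the project:dataset.table form, not a full implementation of BigQuery's naming rules.

```java
import java.util.regex.Pattern;

/** Hypothetical pre-flight checks run before the pipeline graph is built. */
final class TemplateParams {

    // Simplified pattern for "project:dataset.table"; real naming rules are broader.
    private static final Pattern TABLE_SPEC =
        Pattern.compile("[\\w-]+:\\w+\\.[\\w-]+");

    /** Throws IllegalArgumentException if a parameter is obviously malformed. */
    static void validate(String jdbcUrl, String outputTable) {
        if (jdbcUrl == null || !jdbcUrl.startsWith("jdbc:sqlserver://")) {
            throw new IllegalArgumentException(
                "jdbcUrl must start with jdbc:sqlserver://, got: " + jdbcUrl);
        }
        if (outputTable == null || !TABLE_SPEC.matcher(outputTable).matches()) {
            throw new IllegalArgumentException(
                "outputTable must look like project:dataset.table, got: " + outputTable);
        }
    }

    public static void main(String[] args) {
        // Passes silently; a malformed value would fail here, before any job is launched.
        validate("jdbc:sqlserver://db.example.internal:1433;databaseName=sales",
                 "my-project:my_dataset.my_table");
        System.out.println("parameters look valid");
    }
}
```

Failing at this stage surfaces a bad parameter when the template is launched rather than partway through the job.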

Given these advantages, if you are creating a new Dataflow template, it is recommended to create it as a Flex template.

For more details see:

https://cloud.google.com/dataflow/docs/concepts/dataflow-templates