
Dataflow Flex Template - Multiple Custom PipelineOptions (Java)

Good day, I have the following flex template, which works perfectly when I have only one custom interface that extends PipelineOptions:

public class JobFlexTemplate {

    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();

        void setJobName(String value);

        // imagine more options, some shared by both jobs and some specific to each job
    }


    public static void main(String[] args) {
        
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);

        if (options.getJobName().equalsIgnoreCase("job1")) {
            // here we could also extract the necessary params from options and pass them to the Job1 constructor
            pipeline = new Job1(pipeline, options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            // here we could also extract the necessary params from options and pass them to the Job2 constructor
            pipeline = new Job2(pipeline, options).build();
        } else {
            // a default job with some options too
        }

        pipeline.run();

    }

}


Now, where I'm having trouble is a scenario where I want more interfaces, to better group and organize the PipelineOptions each job can have, rather than putting all of them into one interface even when some jobs don't use them:


public class JobFlexTemplate {

    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();

        void setJobName(String value);
    }

    // More interfaces that extend JobOptions and then have their own specific params

    public interface DummyOptions extends JobOptions {

        @Description("A dummy option")
        @Default.String("Dummy")
        String getDummyOption();

        void setDummyOption(String value);

    }

    public static void main(String[] args) {
        
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);

        if (options.getJobName().equalsIgnoreCase("job1")) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            // Arrays.asList returns a fixed-size list, so wrap it so add() works
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }

        pipeline.run();

    }

}


If I create a template with this last approach, with multiple interfaces (one per job), then the template fails to run with the following error (JobOptions does not have dummyOption):

Exception in thread "main" ; see consecutive INFO logs for details.
java.lang.IllegalArgumentException: Class interface com.example.dataflow.options.JobOptions missing a property named 'dummyOption'.

So yes, that is the problem. I'm trying to figure out a way to do this without having one massive interface containing the options of all my jobs, but I'm not sure whether this is possible or the correct approach. I also tried removing the .withValidation() method from the base parse:

var options = PipelineOptionsFactory.fromArgs(args).as(JobOptions.class);
 
and using it only when parsing the job-specific options:
 
var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
 
But no success. Another approach I was thinking about (but have not tried) is to create a single interface extending PipelineOptions that would look like this:

public interface DynamicOptions extends PipelineOptions {
    @Description("Job to execute")
    String getJobName();

    void setJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();

    void setDynamicOptions(Map<String, String> value);
}

but I would really prefer to have an interface for the options of each job.
 
Finally, I'm not sure how this would impact the metadata.json file. Maybe it would have to be as big as the union of all the options, or maybe I can have a different metadata.json for each job (metadatajob1.json, metadatajob2.json, ...) and build a separate template for each job with its own .json file. Any ideas or solutions for both of these things? Thank you.
1 ACCEPTED SOLUTION

To implement a solution that allows using multiple interfaces for different jobs, you can dynamically parse and validate the options for each specific job within the main method. This ensures that only the options each job actually needs are validated. See below for a step-by-step approach:

1. Define the Interfaces:

  • Define separate interfaces for each job's specific options.
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface JobOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getJobName();

    void setJobName(String value);
}

public interface DummyOptions extends JobOptions {
    @Description("A dummy option")
    @Default.String("Dummy")
    String getDummyOption();

    void setDummyOption(String value);
}

// Define other job-specific options interfaces...
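
For reference, the thread never shows the job-specific interfaces referenced below (Job1Options, Job2Options); a minimal hypothetical sketch for one of them, with illustrative option names only, could look like this:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

// Hypothetical job-specific options for Job1; names and defaults are examples only.
public interface Job1Options extends JobOptions {
    @Description("The BigQuery table Job1 reads from")
    String getBigqueryTable();

    void setBigqueryTable(String value);

    @Description("Comma-separated error codes treated as connection errors")
    @Default.String("")
    String getConnectionErrorCodes();

    void setConnectionErrorCodes(String value);
}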

2. Dynamic Parsing in Main Method:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JobFlexTemplate {

    public static void main(String[] args) {
        // Parse the base options leniently so job-specific arguments are not rejected
        var baseOptions = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(JobOptions.class);
        String jobName = baseOptions.getJobName();
        Pipeline pipeline = null;

        // Parse job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            pipeline = Pipeline.create(dummyOptions);
            // Arrays.asList returns a fixed-size list, so wrap it so add() works
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }

        pipeline.run();
    }
}

3. DynamicOptions Interface:

  • Use DynamicOptions to handle generic dynamic options.
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

import java.util.Map;

public interface DynamicOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getTestingJobName();

    void setTestingJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();

    void setDynamicOptions(Map<String, String> value);
}
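
As a quick sanity check: Beam deserializes non-String option types like this Map from the command-line value as JSON (via Jackson), so the value must be strict JSON with double quotes. A minimal local sketch, assuming the DynamicOptions interface above:

// Minimal check that a Map-typed option parses from a JSON value.
String[] testArgs = {
    "--testingJobName=job1",
    "--dynamicOptions={\"bigqueryTable\":\"my_table\",\"informDate\":\"2024-06-01\"}"
};
DynamicOptions opts =
        PipelineOptionsFactory.fromArgs(testArgs).withValidation().as(DynamicOptions.class);
System.out.println(opts.getDynamicOptions().get("bigqueryTable")); // prints my_table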

4. metadata.json:

{
  "name": "Test template flex",
  "description": "An example flex template for Java.",
  "parameters": [
    {
      "name": "testingJobName",
      "label": "Job to execute",
      "helpText": "the value of the job to execute.",
      "regexes": [
        ".*"
      ]
    },
    {
      "name": "dynamicOptions",
      "label": "Dynamic options",
      "helpText": "JSON string representing a map of option names to values.",
      "isOptional": false,
      "regexes": [
        ".*"
      ]
    }
  ]
}

5. Passing Parameters:

--testingJobName="thenameofthejobyouwanttoexecute" \
--dynamicOptions='{"bigqueryTable":"my_table","connectionErrorCodes":"error1,error2","informDate":"2024-06-01"}'

6. Usage Example in the Main Method:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class JobFlexTemplate {

    public static void main(String[] args) {
        // Parse the dynamic options (with validation)
        var dynamicOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicOptions.class);
        String jobName = dynamicOptions.getTestingJobName();
        Map<String, String> dynamicParams = dynamicOptions.getDynamicOptions();
        Pipeline pipeline = null;

        // Parse job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            // Default job driven by the dynamic options map
            pipeline = Pipeline.create(dynamicOptions);
            // Arrays.asList returns a fixed-size list, so wrap it so add() works
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dynamicParams.getOrDefault("bigqueryTable", "DefaultTable"));
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }

        pipeline.run();
    }
}

 

  • Dynamic Parsing: The code dynamically parses the job options using PipelineOptionsFactory based on the job name provided. This way, each job can have its own specific options interface.
  • DynamicOptions: This interface is used to capture generic dynamic options that can be passed in as a JSON string.
  • metadata.json: This configuration file specifies the required parameters for the template.
  • Parameter Passing: Parameters are passed to the template using a JSON string for dynamic options, allowing flexibility in specifying various options for different jobs.

 


6 REPLIES

The DynamicOptions approach works, but if someone knows how to make it work with the multiple-interfaces approach, or another way, I'd appreciate the help. I'm sharing the DynamicOptions approach:

 

Interface:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

import java.util.Map;

public interface DynamicOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getTestingJobName();

    void setTestingJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();

    void setDynamicOptions(Map<String, String> value);
}

metadata.json:

{
  "name": "Test template flex",
  "description": "An example flex template for Java.",
  "parameters": [
    {
      "name": "testingJobName",
      "label": "Job to execute",
      "helpText": "the value of the job to execute.",
      "regexes": [
        ".*"
      ]
    },
    {
      "name": "dynamicOptions",
      "label": "Dynamic options",
      "helpText": "JSON string representing a map of option names to values.",
      "isOptional": false,
      "regexes": [
        ".*"
      ]
    }
  ]
}

Use it:

        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicOptions.class);
        var pipeline = Pipeline.create(options);
        Map<String, String> dynamicOptions = options.getDynamicOptions();
        // get any option from the map
        String bigqueryTable = dynamicOptions.getOrDefault("bigqueryTable", "");
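
Since every value in the map arrives as a String, any type conversion happens at the edges. A small hypothetical helper for that (these method names are illustrative, not part of the original code):

// Hypothetical helpers for typed access to the dynamic options map.
static String requireOption(Map<String, String> opts, String key) {
    String value = opts.get(key);
    if (value == null || value.isEmpty()) {
        throw new IllegalArgumentException("Missing required dynamic option: " + key);
    }
    return value;
}

// e.g. "error1,error2" -> ["error1", "error2"]
static List<String> csvOption(Map<String, String> opts, String key) {
    return Arrays.asList(opts.getOrDefault(key, "").split(","));
}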

How do you pass the parameter? Example:

--testingJobName="thenameofthejobyouwanttoexecute" \
--dynamicOptions='{"bigqueryTable":"my_table","connectionErrorCodes":"error1,error2","informDate":"2024-06-01"}'


Hey @ms4446, thank you for your response. I'm now facing a different issue: I can't pass the JSON string parameter when manually running the flex template. The template works perfectly when called from a Python Cloud Function, because the function has no trouble sending proper JSON for the dynamic options, but when running it from Cloud Shell I can't figure out how to pass the --parameters flag properly. Here is what I'm trying (I also looked at this doc but could not find an example with a JSON parameter: https://cloud.google.com/sdk/gcloud/reference/topic/escaping):

I have a .env that contains the dynamicOptions string:

dynamicOptions='{"bigquery_table":"table_name","report_date":"2024-05-28"}'

then in a run.sh file I have:

#!/bin/bash

source ./.env

gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions=$dynamicOptions \
--region "us-east4" \
--subnetwork=asubnetwork

The problem is the --parameters flag: I cannot figure out how to pass the JSON string so that it is valid. I get errors like:

ERROR: (gcloud.dataflow.flex-template.run) unrecognized arguments:
"report_date":
"2024-05-28"

and so on for the rest of the JSON I pass.


I also tried defining it like this, following the documentation at https://cloud.google.com/sdk/gcloud/reference/topic/escaping:
dynamicOptions='bigquery_table="table_name",report_date="2024-05-28"'

but then, when the Dataflow template executes, the job fails because Jackson cannot parse the string bigquery_table="table_name",report_date="2024-05-28" into a valid Map.

So yes, I'm stuck on this part. As I said, this template works when called from the Cloud Function because Python has no problem passing well-formed JSON, but from bash, using the --parameters flag, I have not been able to pass valid JSON.
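
To see why both variants are rejected: Jackson's default (strict) JSON reader only accepts double-quoted keys and values. A minimal standalone sketch, assuming Beam parses the Map option with Jackson's default settings:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class JacksonQuoteCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        TypeReference<Map<String, String>> mapType = new TypeReference<>() {};

        // Strict JSON with double quotes parses fine:
        Map<String, String> ok = mapper.readValue(
                "{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}", mapType);
        System.out.println(ok.get("bigquery_table")); // table_name

        // Single quotes (or key=value pairs) are not valid JSON and throw,
        // matching the Jackson failure seen when the template runs:
        try {
            mapper.readValue("{'bigquery_table':'table_name'}", mapType);
        } catch (Exception e) {
            System.out.println("Rejected: " + e.getClass().getSimpleName());
        }
    }
}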

Also, regarding step 5 of your reply, another way that works is to run the pipeline directly (not as a flex template, which I still haven't figured out):

run.sh file:

#!/bin/bash

source ./.env

mvn clean compile exec:java -P dataflow-runner -Dexec.args=" --project=${project} \
--runner=DataflowRunner \
--region=${region} \
--tempLocation=${tempLocation} \
--serviceAccount=${serviceAccount} \
--testingJobName=${testingJobName} \
--dynamicOptionsJson='${dynamicOptionsJson}' \
"

.env file:

dynamicOptionsJson='{"bigquery_table": "table", "report_date": "2024-05-28"}'

But yes, that is just for clarity; I'm still stuck on the flex template --parameters flag.

When running Google Cloud Dataflow Flex Templates from the command line, passing a JSON string as a parameter can be tricky due to shell and gcloud escaping rules. Here’s how you can properly format and pass JSON parameters to a Flex Template using the --parameters flag.

Step-by-Step Solution

Escape JSON Properly:

Ensure that the JSON string is properly escaped so that it is interpreted correctly by gcloud. You can use single quotes to wrap the entire JSON string and escape inner quotes.

Example with .env and run.sh:

.env file:

 
dynamicOptions='{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}'

run.sh file:

 
#!/bin/bash

source ./.env

gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions="${dynamicOptions}" \
--region "us-east4" \
--subnetwork=asubnetwork

Detailed Steps:

  1. Escape the JSON String in .env File: In your .env file, escape the double quotes inside the JSON string. Here’s the corrected format:

     
    dynamicOptions='{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}'
    

    This escaping ensures that the JSON string remains intact when read by gcloud.

  2. Use the Correct Delimiter for --parameters: In the run.sh script, ensure the delimiter (^~^) matches the escaping style. In this case, use double quotes around ${dynamicOptions} to ensure the entire string is passed correctly:

     
    #!/bin/bash
    
    source ./.env
    
    gcloud dataflow flex-template run "my-manual-run" \
    --template-file-gcs-location "gs://bucket/my-template.json" \
    --parameters ^~^testingJobName="96.b"~dynamicOptions="${dynamicOptions}" \
    --region "us-east4" \
    --subnetwork=asubnetwork
    

Alternative: Directly in Shell

If you prefer to pass the JSON string directly in the shell, wrap it in single quotes and escape the inner double quotes. Here's how:

 
gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions='{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}' \
--region "us-east4" \
--subnetwork=asubnetwork

Explanation:

  • Escaping JSON in .env: The JSON string is escaped using backslashes to ensure that the inner quotes are not interpreted by the shell.
  • Using Quotes in run.sh: When referencing the dynamicOptions variable, use double quotes to preserve the full JSON string during execution.

Common Issues and Troubleshooting:

  • Double vs. Single Quotes: Ensure you use double quotes around the JSON string to correctly pass it through the shell.
  • Correct Delimiter: Ensure you are using the correct delimiter (^~^) to separate multiple parameters as per gcloud’s requirements.
  • Special Characters in JSON: Be mindful of any special characters in the JSON string that might require additional escaping.

@ms4446 Works, mate! The only thing is that in my case, with the first approach, I had to remove the single quotes around the JSON in the .env, so it looks like this:

dynamicOptions={\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}

Because Jackson would otherwise also receive those single quotes around the JSON and complain about them. Without the single quotes, as above, it works perfectly following your example. Thank you very much, mate; this is just perfect, and as flexible as I imagined it should be.

Hi @lawlesspattern ,

I'm glad to hear that it worked for you! Indeed, for your specific use case, removing the single quotes around the JSON string in the .env file makes sense to ensure it is correctly interpreted by Jackson when passed to Dataflow. Below is the refined approach for clarity:

Refined Approach

  1. .env File: In the .env file, define dynamicOptions without single quotes around the JSON string:

     
    dynamicOptions={\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}
    
  2. run.sh Script: In the run.sh script, source the .env file and pass the parameters to gcloud:

     
    #!/bin/bash
    
    source ./.env
    
    gcloud dataflow flex-template run "my-manual-run" \
    --template-file-gcs-location "gs://bucket/my-template.json" \
    --parameters ^~^testingJobName="96.b"~dynamicOptions="${dynamicOptions}" \
    --region "us-east4" \
    --subnetwork=asubnetwork
    
  • No Single Quotes in .env: Defining dynamicOptions without the surrounding single quotes ensures the JSON is interpreted correctly when injected into the script.
  • Proper Escaping: The escaped double quotes (\") within the JSON string are necessary to maintain the JSON structure when passed through the shell.