
Dataflow Flex Template - Multiple Custom PipelineOptions (Java)

Good day, I have the following Flex Template that works perfectly when I have only one custom interface that extends PipelineOptions:

public class JobFlexTemplate {

    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();

        void setJobName(String value);

        // imagine more options, some shared by both jobs and some specific to each job
    }

    public static void main(String[] args) {

        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);

        if (options.getJobName().equalsIgnoreCase("job1")) {
            // here we could also extract the necessary params from options and pass them to the Job1 constructor
            pipeline = new Job1(pipeline, options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            // here we could also extract the necessary params from options and pass them to the Job2 constructor
            pipeline = new Job2(pipeline, options).build();
        } else {
            // a default job with some options too
        }

        pipeline.run();
    }

}
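For context, Job1 and Job2 are never shown in the post; a minimal sketch of what such a builder class could look like, assuming it only attaches the job's transforms and returns the pipeline (everything beyond the constructor and build() referenced above is an assumption):

import org.apache.beam.sdk.Pipeline;

// Hypothetical sketch: the real Job1 is not included in the post.
public class Job1 {

    private final Pipeline pipeline;
    private final JobFlexTemplate.JobOptions options;

    public Job1(Pipeline pipeline, JobFlexTemplate.JobOptions options) {
        this.pipeline = pipeline;
        this.options = options;
    }

    // Attaches Job1's transforms to the pipeline and returns it,
    // matching the new Job1(pipeline, options).build() call above.
    public Pipeline build() {
        // pipeline.apply(...); // job-specific transforms would go here
        return pipeline;
    }
}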

 

 

The trouble starts when I want to have more interfaces to better group and organize the PipelineOptions each job can have, instead of putting all of them into one interface even if some jobs do not use them:

public class JobFlexTemplate {

    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();

        void setJobName(String value);
    }

    // More interfaces that extend JobOptions and then have their own specific params

    public interface DummyOptions extends JobOptions {

        @Description("A dummy option")
        @Default.String("Dummy")
        String getDummyOption();

        void setDummyOption(String value);
    }

    public static void main(String[] args) {

        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);

        if (options.getJobName().equalsIgnoreCase("job1")) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            // use a mutable list: Arrays.asList returns a fixed-size list that cannot be added to
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }

        pipeline.run();
    }

}
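LogElements is referenced here but not included in the post; a minimal sketch of such a DoFn, assuming it only logs and forwards each element (the class body and slf4j logging are assumptions):

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch: the real LogElements is not included in the post.
public class LogElements extends DoFn<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(LogElements.class);

    @ProcessElement
    public void processElement(@Element String word, OutputReceiver<String> out) {
        LOG.info("Element: {}", word);
        out.output(word);
    }
}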

 

 

If I build a template with this last approach of having a separate interface for each job, the template fails to run with the following error (JobOptions does not have dummyOption):

Exception in thread "main" ; see consecutive INFO logs for details.

java.lang.IllegalArgumentException: Class interface com.example.dataflow.options.JobOptions missing a property named 'dummyOption'.

So that is the problem. I'm trying to figure out a way to do this without having one massive interface with all the options of all my jobs, but I'm not sure whether this is possible or even the correct approach. I also tried removing the .withValidation() call from the base parse:

var options = PipelineOptionsFactory.fromArgs(args).as(JobOptions.class);

and using it only when parsing the job-specific options:

var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);

But no success. Another approach I was thinking about, but have not tried, is to create a single interface that extends PipelineOptions and would look like:
 

 

 

public interface DynamicOptions extends PipelineOptions {
    @Description("Job to execute")
    String getJobName();

    void setJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();

    void setDynamicOptions(Map<String, String> value);
}

 

 

 
but I would really prefer to have an interface for the options of each job.

Finally, I'm not sure how this would impact the metadata.json file. Maybe it will have to be as big as the total set of options, or maybe I can have a different metadata.json for each job (metadatajob1.json, metadatajob2.json, ...) and build each job's template with its own .json file. Any ideas or solutions for both of these things? Thank you.
1 ACCEPTED SOLUTION

To allow multiple interfaces for different jobs, you can dynamically parse and validate the options for each specific job within the main method. This ensures that only the options each job needs are used. See below for a step-by-step approach:

1. Define the Interfaces:

  • Define separate interfaces for each job's specific options.
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface JobOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getJobName();

    void setJobName(String value);
}

public interface DummyOptions extends JobOptions {
    @Description("A dummy option")
    @Default.String("Dummy")
    String getDummyOption();

    void setDummyOption(String value);
}

// Define other job-specific options interfaces...
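As an illustration of the job-specific interfaces mentioned above, a hypothetical Job1Options (the interface name is referenced in the main method below; its field is an assumption for the sketch) could look like:

// Hypothetical sketch: the real Job1Options is not included in the post.
public interface Job1Options extends JobOptions {
    @Description("The BigQuery table used by job1")
    @Validation.Required
    String getBigqueryTable();

    void setBigqueryTable(String value);
}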

2. Dynamic Parsing in Main Method:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JobFlexTemplate {

    public static void main(String[] args) {
        // Parse the base job options with non-strict parsing so that arguments
        // belonging to other interfaces (e.g. --dummyOption) are not rejected here
        var baseOptions = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(JobOptions.class);
        String jobName = baseOptions.getJobName();
        Pipeline pipeline = null;

        // Parse job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            pipeline = Pipeline.create(dummyOptions);
            // use a mutable list: Arrays.asList returns a fixed-size list that cannot be added to
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }

        pipeline.run();
    }
}

3. DynamicOptions Interface:

  • Use DynamicOptions to handle generic dynamic options.
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

import java.util.Map;

public interface DynamicOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getTestingJobName();

    void setTestingJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();

    void setDynamicOptions(Map<String, String> value);
}

4. metadata.json:

{
  "name": "Test template flex",
  "description": "An example flex template for Java.",
  "parameters": [
    {
      "name": "testingJobName",
      "label": "Job to execute",
      "helpText": "the value of the job to execute.",
      "regexes": [
        ".*"
      ]
    },
    {
      "name": "dynamicOptions",
      "label": "Dynamic options",
      "helpText": "JSON string representing a map of option names to values.",
      "isOptional": false,
      "regexes": [
        ".*"
      ]
    }
  ]
}
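Regarding the question about per-job metadata files: nothing in this thread confirms a specific setup, but one option is to build a separate Flex Template spec per job, each pointing at its own metadata file via --metadata-file. A hedged sketch, where the bucket, image path, jar, and main class are placeholders:

gcloud dataflow flex-template build gs://my-bucket/templates/job1-template.json \
  --image-gcr-path "us-central1-docker.pkg.dev/my-project/my-repo/job1-image:latest" \
  --sdk-language "JAVA" \
  --jar "target/job-flex-template-bundled.jar" \
  --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.example.dataflow.JobFlexTemplate" \
  --metadata-file "metadata-job1.json"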

5. Passing Parameters:

Note that since dynamicOptions is a Map-typed option, its value is parsed as JSON, so use double-quoted keys and values:

--testingJobName="thenameofthejobyouwanttoexecute" \
--dynamicOptions='{"bigqueryTable":"my_table","connectionErrorCodes":"error1,error2","informDate":"2024-06-01"}'

6. Usage Example in the Main Method:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class JobFlexTemplate {

    public static void main(String[] args) {
        // Parse the dynamic options (validated) and read the job name and the dynamic map
        var dynamicOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicOptions.class);
        String jobName = dynamicOptions.getTestingJobName();
        Map<String, String> dynamicParams = dynamicOptions.getDynamicOptions();
        Pipeline pipeline = null;

        // Parse job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            // Default job driven by the dynamic options map
            pipeline = Pipeline.create(dynamicOptions);
            // use a mutable list: Arrays.asList returns a fixed-size list that cannot be added to
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dynamicParams.getOrDefault("bigqueryTable", "DefaultTable"));
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }

        pipeline.run();
    }
}

 

  • Dynamic Parsing: The code dynamically parses the job options using PipelineOptionsFactory based on the job name provided. This way, each job can have its own specific options interface.
  • DynamicOptions: This interface is used to capture generic dynamic options that can be passed in as a JSON string.
  • metadata.json: This configuration file specifies the required parameters for the template.
  • Parameter Passing: Parameters are passed to the template using a JSON string for dynamic options, allowing flexibility in specifying various options for different jobs.

 
