Good day, I have the following flex template that works perfectly when I have only one custom interface that extends from PipelineOptions:
public class JobFlexTemplate {
    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();
        void setJobName(String value);
        // imagine more options, some shared by both jobs and some specific to each job
    }

    public static void main(String[] args) {
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);
        if (options.getJobName().equalsIgnoreCase("job1")) {
            // could also extract the necessary params from options and pass them to the Job1 constructor
            pipeline = new Job1(pipeline, options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            // could also extract the necessary params from options and pass them to the Job2 constructor
            pipeline = new Job2(pipeline, options).build();
        } else {
            // a default job with some options too
        }
        pipeline.run();
    }
}
The trouble starts when I want several interfaces, to group the PipelineOptions that each job needs, instead of putting all of them into one interface even when some jobs do not use them:
public class JobFlexTemplate {
    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();
        void setJobName(String value);
    }

    // More interfaces that extend JobOptions and then have their own specific params
    public interface DummyOptions extends JobOptions {
        @Description("A dummy option")
        @Default.String("Dummy")
        String getDummyOption();
        void setDummyOption(String value);
    }

    public static void main(String[] args) {
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);
        if (options.getJobName().equalsIgnoreCase("job1")) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            // Arrays.asList alone returns a fixed-size list, so copy it before adding
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                .apply("Log Words", ParDo.of(new LogElements()));
        }
        pipeline.run();
    }
}
If I create a template with this last approach, with a separate interface for each job, the template fails to run with the following error (JobOptions does not have dummyOption):
Exception in thread "main" ; see consecutive INFO logs for details.
java.lang.IllegalArgumentException: Class interface com.example.dataflow.options.JobOptions missing a property named 'dummyOption'.
So that is the problem. I'm trying to find a way to do this without one massive interface holding all the options of all my jobs, but I'm not sure whether this is possible or the correct approach. I also tried removing the .withValidation() method from the:
public interface DynamicOptions extends PipelineOptions {
    @Description("Job to execute")
    String getJobName();
    void setJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();
    void setDynamicOptions(Map<String, String> value);
}
To use multiple interfaces for different jobs, you can parse and validate the options for each specific job inside the main method, so that each job only sees the options it needs. See below for a step-by-step approach:
1. Define the Interfaces:
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface JobOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getJobName();
    void setJobName(String value);
}

public interface DummyOptions extends JobOptions {
    @Description("A dummy option")
    @Default.String("Dummy")
    String getDummyOption();
    void setDummyOption(String value);
}
// Define other job-specific options interfaces...
2. Dynamic Parsing in Main Method:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JobFlexTemplate {
    public static void main(String[] args) {
        // Parse only the base options first; withoutStrictParsing() ignores
        // job-specific flags that JobOptions does not declare
        var baseOptions = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(JobOptions.class);
        String jobName = baseOptions.getJobName();
        Pipeline pipeline;
        // Parse and validate the job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            pipeline = Pipeline.create(dummyOptions);
            // Arrays.asList alone returns a fixed-size list, so copy it before adding
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                .apply("Log Words", ParDo.of(new LogElements()));
        }
        pipeline.run();
    }
}
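The first parse in step 2 uses withoutStrictParsing() because the arguments also carry job-specific flags that JobOptions does not declare, and strict parsing is what raised the "missing a property named 'dummyOption'" error in the question. The sketch below is a simplified model of that behaviour using only the JDK. It is not how Beam implements it, and StrictParsingSketch, propertyNames and parse are hypothetical names made up for illustration.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class StrictParsingSketch {
    // Stand-ins for the option interfaces in this thread (no Beam dependency).
    public interface JobOptions {
        String getJobName();
        void setJobName(String value);
    }

    public interface DummyOptions extends JobOptions {
        String getDummyOption();
        void setDummyOption(String value);
    }

    // Derive property names from getters, e.g. getDummyOption -> dummyOption.
    public static Set<String> propertyNames(Class<?> iface) {
        Set<String> names = new TreeSet<>();
        for (Method m : iface.getMethods()) {
            String n = m.getName();
            if (n.startsWith("get") && n.length() > 3 && m.getParameterCount() == 0) {
                names.add(Character.toLowerCase(n.charAt(3)) + n.substring(4));
            }
        }
        return names;
    }

    // Parse --name=value arguments. In strict mode an undeclared name is an
    // error; in lenient mode it is skipped.
    public static Map<String, String> parse(String[] args, Class<?> iface, boolean strict) {
        Set<String> known = propertyNames(iface);
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (!arg.startsWith("--") || eq < 0) {
                throw new IllegalArgumentException("Expected --name=value but got: " + arg);
            }
            String name = arg.substring(2, eq);
            if (known.contains(name)) {
                parsed.put(name, arg.substring(eq + 1));
            } else if (strict) {
                throw new IllegalArgumentException(
                    iface.getSimpleName() + " missing a property named '" + name + "'.");
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        String[] sample = {"--jobName=other", "--dummyOption=hello"};
        // Lenient parse against the base interface: the unknown flag is ignored.
        System.out.println(parse(sample, JobOptions.class, false));
        // Strict parse against the job-specific interface: both flags are declared.
        System.out.println(parse(sample, DummyOptions.class, true));
        // Strict parse against the base interface fails, as in the question.
        try {
            parse(sample, JobOptions.class, true);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the lenient parse is only used to read jobName, and the strict, validated parse happens once per job against the interface that actually declares that job's flags.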
3. DynamicOptions Interface:
Define a DynamicOptions interface to handle generic dynamic options:
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
import java.util.Map;

public interface DynamicOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getTestingJobName();
    void setTestingJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();
    void setDynamicOptions(Map<String, String> value);
}
4. metadata.json:
{
  "name": "Test template flex",
  "description": "An example flex template for Java.",
  "parameters": [
    {
      "name": "testingJobName",
      "label": "Job to execute",
      "helpText": "the value of the job to execute.",
      "regexes": [
        ".*"
      ]
    },
    {
      "name": "dynamicOptions",
      "label": "Dynamic options",
      "helpText": "JSON string representing a map of option names to values.",
      "isOptional": false,
      "regexes": [
        ".*"
      ]
    }
  ]
}
5. Passing Parameters:
--testingJobName="thenameofthejobyouwanttoexecute" \
--dynamicOptions="{'bigqueryTable':'my_table','connectionErrorCodes':'error1,error2','informDate':'2024-06-01'}"
6. Usage Example in the Main Method:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class JobFlexTemplate {
    public static void main(String[] args) {
        // Parse and validate the shared options: the job name and the dynamic options map
        var dynamicOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicOptions.class);
        String jobName = dynamicOptions.getTestingJobName();
        Map<String, String> dynamicParams = dynamicOptions.getDynamicOptions();
        Pipeline pipeline;
        // Parse job-specific options based on the job name.
        // Assumption: Job1Options and Job2Options extend DynamicOptions, so the
        // shared flags are still recognized when the args are parsed again.
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            // Default job: read its values from the dynamic options map
            pipeline = Pipeline.create(dynamicOptions);
            // Arrays.asList alone returns a fixed-size list, so copy it before adding
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dynamicParams.getOrDefault("bigqueryTable", "DefaultTable"));
            pipeline.apply(Create.of(wordsList))
                .apply("Log Words", ParDo.of(new LogElements()));
        }
        pipeline.run();
    }
}
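Every value in the dynamic options map arrives as a String, so a job that needs a list or a date has to convert it itself. Here is a minimal sketch of that conversion for the three keys from the parameter example in step 5. DynamicParamsSketch and Job1Params are hypothetical names, and treating bigqueryTable and informDate as required is an assumption made for illustration.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DynamicParamsSketch {
    // Hypothetical typed view of the keys used in the parameter example.
    public static final class Job1Params {
        public final String bigqueryTable;
        public final List<String> connectionErrorCodes;
        public final LocalDate informDate;

        Job1Params(String bigqueryTable, List<String> connectionErrorCodes, LocalDate informDate) {
            this.bigqueryTable = bigqueryTable;
            this.connectionErrorCodes = connectionErrorCodes;
            this.informDate = informDate;
        }
    }

    // Convert the raw string map into typed values, failing early on missing keys.
    public static Job1Params fromMap(Map<String, String> params) {
        String table = require(params, "bigqueryTable");
        // Comma-separated codes become a list; a missing key yields an empty list.
        List<String> codes = Arrays.stream(params.getOrDefault("connectionErrorCodes", "").split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
        // LocalDate.parse expects ISO-8601 dates such as 2024-06-01.
        LocalDate date = LocalDate.parse(require(params, "informDate"));
        return new Job1Params(table, codes, date);
    }

    private static String require(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing dynamic option '" + key + "'");
        }
        return value;
    }

    public static void main(String[] args) {
        Job1Params p = fromMap(Map.of(
            "bigqueryTable", "my_table",
            "connectionErrorCodes", "error1,error2",
            "informDate", "2024-06-01"));
        System.out.println(p.bigqueryTable + " " + p.connectionErrorCodes + " " + p.informDate);
    }
}
```

Doing the conversion once, right after the options are parsed, keeps the checks that @Validation.Required would otherwise give you on a typed interface, which the map-based approach gives up.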
Both approaches parse the options with PipelineOptionsFactory based on the job name provided. This way, each job can have its own specific options interface.
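If the number of jobs grows, the if/else chain on the job name could be replaced with a lookup table. This is only a sketch of that idea using the JDK alone: JobRegistrySketch, registry and dispatch are hypothetical names, and the lambdas return a String where the real code would parse the job's own options interface and build its pipeline.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class JobRegistrySketch {
    // Fallback used when the job name is missing or not registered.
    private static final Function<String[], String> DEFAULT_JOB =
        args -> "default job, " + args.length + " args";

    // Job name -> builder. The case-insensitive ordering mirrors the
    // equalsIgnoreCase checks in the main method.
    public static Map<String, Function<String[], String>> registry() {
        Map<String, Function<String[], String>> jobs = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        // In the real template each entry would parse its own options
        // interface from args and return the built pipeline.
        jobs.put("job1", args -> "job1, " + args.length + " args");
        jobs.put("job2", args -> "job2, " + args.length + " args");
        return jobs;
    }

    public static String dispatch(String jobName, String[] args) {
        Function<String[], String> builder =
            jobName == null ? DEFAULT_JOB : registry().getOrDefault(jobName, DEFAULT_JOB);
        return builder.apply(args);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("JOB1", new String[] {"--jobName=JOB1"}));
        System.out.println(dispatch("unknown", new String[] {}));
    }
}
```

Adding a job then means adding one entry to the registry instead of another branch in main.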