Good day, I have the following flex template that works perfectly when I have only one custom interface that extends PipelineOptions:
public class JobFlexTemplate {

    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();
        void setJobName(String value);
        // imagine more options, some shared by both jobs and some specific to each job
    }

    public static void main(String[] args) {
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);
        if (options.getJobName().equalsIgnoreCase("job1")) {
            // here we could also extract the necessary params from options and pass them to the Job1 constructor
            pipeline = new Job1(pipeline, options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            // here we could also extract the necessary params from options and pass them to the Job2 constructor
            pipeline = new Job2(pipeline, options).build();
        } else {
            // a default job with some options too
        }
        pipeline.run();
    }
}
Now, where I'm having trouble is a scenario where I want to have more interfaces to better group and organize the PipelineOptions each job can have, instead of putting all of them into one interface even when some jobs don't use them:
public class JobFlexTemplate {

    public interface JobOptions extends PipelineOptions {
        @Description("The job to execute")
        @Validation.Required
        String getJobName();
        void setJobName(String value);
    }

    // More interfaces that extend JobOptions and then have their own specific params
    public interface DummyOptions extends JobOptions {
        @Description("A dummy option")
        @Default.String("Dummy")
        String getDummyOption();
        void setDummyOption(String value);
    }

    public static void main(String[] args) {
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JobOptions.class);
        var pipeline = Pipeline.create(options);
        if (options.getJobName().equalsIgnoreCase("job1")) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if (options.getJobName().equalsIgnoreCase("job2")) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            // use a mutable list; Arrays.asList returns a fixed-size list that cannot be added to
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }
        pipeline.run();
    }
}
If I create a template with this last approach of having multiple interfaces, one per job, then the template fails to run with the following error (JobOptions does not have dummyOption):
Exception in thread "main" ; see consecutive INFO logs for details.
java.lang.IllegalArgumentException: Class interface com.example.dataflow.options.JobOptions missing a property named 'dummyOption'.
So yes, that is the problem. I'm trying to figure out a way to do this without having one massive interface with all the options of all my jobs, but I'm not sure if this is possible or the correct approach. I also tried removing the .withValidation() call and using a generic interface like this:
public interface DynamicOptions extends PipelineOptions {
    @Description("Job to execute")
    String getJobName();
    void setJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();
    void setDynamicOptions(Map<String, String> value);
}
The DynamicOptions approach works, but if someone knows how to make the multiple-interfaces approach work, or knows another way, I'd appreciate the help. I'm sharing the DynamicOptions approach:
Interface:
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

import java.util.Map;

public interface DynamicOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getTestingJobName();
    void setTestingJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();
    void setDynamicOptions(Map<String, String> value);
}
metadata.json:
{
  "name": "Test template flex",
  "description": "An example flex template for Java.",
  "parameters": [
    {
      "name": "testingJobName",
      "label": "Job to execute",
      "helpText": "the value of the job to execute.",
      "regexes": [".*"]
    },
    {
      "name": "dynamicOptions",
      "label": "Dynamic options",
      "helpText": "JSON string representing a map of option names to values.",
      "isOptional": false,
      "regexes": [".*"]
    }
  ]
}
Use it:
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicOptions.class);
var pipeline = Pipeline.create(options);
Map<String, String> dynamicOptions = options.getDynamicOptions();
// get any option from the map
String bigqueryTable = dynamicOptions.getOrDefault("bigqueryTable","");
How could you pass the parameter? Example:
--testingJobName="thenameofthejobyouwanttoexecute" \
--dynamicOptions="{'bigqueryTable':'my_table','connectionErrorCodes':'error1,error2','informDate':'2024-06-01'}"
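For illustration only, here is a minimal sketch (the keys simply mirror the example values above, and the helper class itself is hypothetical, not part of the template) of how the raw string values from the map could be converted into typed values inside a job:

import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper that turns the raw dynamicOptions strings into typed values
public class DynamicOptionValues {

    // e.g. "error1,error2" -> ["error1", "error2"]
    public static List<String> connectionErrorCodes(Map<String, String> opts) {
        return Arrays.asList(opts.getOrDefault("connectionErrorCodes", "").split(","));
    }

    // expects an ISO date such as "2024-06-01"
    public static LocalDate informDate(Map<String, String> opts) {
        return LocalDate.parse(opts.getOrDefault("informDate", "1970-01-01"));
    }
}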
To implement a solution that allows using multiple interfaces for different jobs, you can dynamically parse and validate the options for each specific job within the main method. This ensures that only the necessary options for each job are used. See below for a step-by-step approach:
1. Define the Interfaces:
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface JobOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getJobName();
    void setJobName(String value);
}

public interface DummyOptions extends JobOptions {
    @Description("A dummy option")
    @Default.String("Dummy")
    String getDummyOption();
    void setDummyOption(String value);
}

// Define other job-specific options interfaces...
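Job1Options and Job2Options are referenced below but not shown in the thread; as a purely illustrative sketch (the option name here is made up), such an interface could look like:

public interface Job1Options extends JobOptions {
    @Description("BigQuery table used by job1 (hypothetical option)")
    @Validation.Required
    String getBigqueryTable();
    void setBigqueryTable(String value);
}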
2. Dynamic Parsing in Main Method:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JobFlexTemplate {

    public static void main(String[] args) {
        // First pass: parse only the job name, without strict parsing or validation
        var baseOptions = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(JobOptions.class);
        String jobName = baseOptions.getJobName();

        Pipeline pipeline = null;

        // Second pass: parse and validate job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            var dummyOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DummyOptions.class);
            pipeline = Pipeline.create(dummyOptions);
            // use a mutable list; Arrays.asList returns a fixed-size list that cannot be added to
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dummyOptions.getDummyOption());
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }
        pipeline.run();
    }
}
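LogElements is used above but never defined in the thread; here is a minimal sketch of what such a DoFn might look like, assuming it just logs each element and passes it through (using SLF4J for logging):

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Simple pass-through DoFn that logs every element it receives
public class LogElements extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(LogElements.class);

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
        LOG.info("Element: {}", element);
        out.output(element);
    }
}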
3. DynamicOptions Interface: Optionally, define a DynamicOptions interface to handle generic dynamic options.

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

import java.util.Map;

public interface DynamicOptions extends PipelineOptions {
    @Description("The job to execute")
    @Validation.Required
    String getTestingJobName();
    void setTestingJobName(String value);

    @Description("Dynamic options")
    Map<String, String> getDynamicOptions();
    void setDynamicOptions(Map<String, String> value);
}
4. metadata.json:
{
  "name": "Test template flex",
  "description": "An example flex template for Java.",
  "parameters": [
    {
      "name": "testingJobName",
      "label": "Job to execute",
      "helpText": "the value of the job to execute.",
      "regexes": [".*"]
    },
    {
      "name": "dynamicOptions",
      "label": "Dynamic options",
      "helpText": "JSON string representing a map of option names to values.",
      "isOptional": false,
      "regexes": [".*"]
    }
  ]
}
5. Passing Parameters:
--testingJobName="thenameofthejobyouwanttoexecute" \
--dynamicOptions="{'bigqueryTable':'my_table','connectionErrorCodes':'error1,error2','informDate':'2024-06-01'}"
6. Usage Example in the Main Method:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class JobFlexTemplate {

    public static void main(String[] args) {
        // Parse the dynamic options (validated, since testingJobName is required)
        var dynamicOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicOptions.class);
        String jobName = dynamicOptions.getTestingJobName();
        Map<String, String> dynamicParams = dynamicOptions.getDynamicOptions();

        Pipeline pipeline = null;

        // Parse job-specific options based on the job name
        if ("job1".equalsIgnoreCase(jobName)) {
            var job1Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job1Options.class);
            pipeline = Pipeline.create(job1Options);
            pipeline = new Job1(pipeline, job1Options).build();
        } else if ("job2".equalsIgnoreCase(jobName)) {
            var job2Options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Job2Options.class);
            pipeline = Pipeline.create(job2Options);
            pipeline = new Job2(pipeline, job2Options).build();
        } else {
            // Default job driven by the dynamic options map
            pipeline = Pipeline.create(dynamicOptions);
            // use a mutable list; Arrays.asList returns a fixed-size list that cannot be added to
            final List<String> wordsList = new ArrayList<>(Arrays.asList("1", "2", "3", "4"));
            wordsList.add(dynamicParams.getOrDefault("bigqueryTable", "DefaultTable"));
            pipeline.apply(Create.of(wordsList))
                    .apply("Log Words", ParDo.of(new LogElements()));
        }
        pipeline.run();
    }
}
This approach parses and validates options with PipelineOptionsFactory based on the job name provided. This way, each job can have its own specific options interface.
Hey @ms4446, thank you for your response. I am facing a different issue: I am not able to pass the JSON string parameter when manually running the flex template. (This flex template works perfectly when called from a Python Cloud Function, because the Cloud Function has no trouble sending proper JSON for the dynamic options.) But when trying to run it from Cloud Shell I cannot figure out how to pass the parameters flag properly. Here is how I'm trying (I also looked at this doc but could not find an example with a JSON parameter: https://cloud.google.com/sdk/gcloud/reference/topic/escaping):
I have a .env that contains the dynamicOptions string:
dynamicOptions='{"bigquery_table":"table_name","report_date":"2024-05-28"}'
then at a run.sh file I have:
#!/bin/bash
source ./.env
gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions=$dynamicOptions \
--region "us-east4" \
--subnetwork=asubnetwork
The problem is the --parameters flag: I cannot figure out how to properly pass the JSON string so that it is valid. I get errors like:
ERROR: (gcloud.dataflow.flex-template.run) unrecognized arguments:
"report_date":
"2024-05-28"
and so on for the rest of the JSON I pass.
I also tried to define it like this, following this documentation https://cloud.google.com/sdk/gcloud/reference/topic/escaping:
dynamicOptions='bigquery_table="table_name",report_date="2024-05-28"'
but then when the Dataflow template executes, the job fails because Jackson cannot parse the string bigquery_table="table_name",report_date="2024-05-28" into a valid Map.
So yes, I'm stuck on this part. As I said, this template works when called from a Cloud Function because Python has no problem passing good JSON, but in bash, using the --parameters flag, I have not been able to pass valid JSON.
Also, regarding step 5 of your reply, another way that works to run it directly (not the flex template, which I haven't figured out) is:
run.sh file:
#!/bin/bash
source ./.env
mvn clean compile exec:java -P dataflow-runner -Dexec.args=" --project=${project} \
--runner=DataflowRunner \
--region=${region} \
--tempLocation=${tempLocation} \
--serviceAccount=${serviceAccount} \
--testingJobName=${testingJobName} \
--dynamicOptionsJson='${dynamicOptionsJson}' \
"
.env file:
dynamicOptionsJson='{"bigquery_table": "table", "report_date": "2024-05-28"}'
But yeah, that is just for clarity; I'm still stuck on the problem with the flex template --parameters flag.
When running Google Cloud Dataflow Flex Templates from the command line, passing a JSON string as a parameter can be tricky due to shell and gcloud escaping rules. Here’s how you can properly format and pass JSON parameters to a Flex Template using the --parameters flag.
Step-by-Step Solution
Escape JSON Properly:
Ensure that the JSON string is properly escaped so that it is interpreted correctly by gcloud. You can use single quotes to wrap the entire JSON string and escape inner quotes.
Example with .env and run.sh:
.env file:
dynamicOptions='{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}'
run.sh file:
#!/bin/bash
source ./.env
gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions="${dynamicOptions}" \
--region "us-east4" \
--subnetwork=asubnetwork
Detailed Steps:
Escape the JSON String in .env File: In your .env file, escape the double quotes inside the JSON string. Here’s the corrected format:
dynamicOptions='{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}'
This escaping ensures that the JSON string remains intact when read by gcloud.
Use the Correct Delimiter for --parameters: In the run.sh script, ensure the delimiter (^~^) matches the escaping style. In this case, use double quotes around ${dynamicOptions} to ensure the entire string is passed correctly:
#!/bin/bash
source ./.env
gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions="${dynamicOptions}" \
--region "us-east4" \
--subnetwork=asubnetwork
Alternative: Directly in Shell
If you prefer to pass the JSON string directly in the shell, you can wrap it in single quotes and escape the inner double quotes. Here's how:
gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions='{\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}' \
--region "us-east4" \
--subnetwork=asubnetwork
@ms4446 Works, mate! The only thing is that in my case, with the first approach, I had to remove the single quotes from the JSON in the .env, so it looks like this:
dynamicOptions={\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}
Otherwise Jackson would also receive those single quotes around the JSON and complain about them. Without the single quotes, as above, it works perfectly following your example. Thank you very much, mate, this is just perfect as I imagined it should be, and as flexible as it should be.
Hi @lawlesspattern,
I'm glad to hear that it worked for you! Indeed, for your specific use case, removing the single quotes around the JSON string in the .env file makes sense to ensure it is correctly interpreted by Jackson when passed to Dataflow. Below is the refined approach for clarity:
Refined Approach
.env File: In the .env file, define dynamicOptions without single quotes around the JSON string (keeping the escaped double quotes):
dynamicOptions={\"bigquery_table\":\"table_name\",\"report_date\":\"2024-05-28\"}
run.sh Script: In the run.sh script, source the .env file and pass the parameters to gcloud:
#!/bin/bash
source ./.env
gcloud dataflow flex-template run "my-manual-run" \
--template-file-gcs-location "gs://bucket/my-template.json" \
--parameters ^~^testingJobName="96.b"~dynamicOptions="${dynamicOptions}" \
--region "us-east4" \
--subnetwork=asubnetwork
Defining dynamicOptions without the surrounding single quotes ensures the JSON is interpreted correctly when injected into the script. The escaped double quotes (\") within the JSON string are necessary to maintain the JSON structure when passed through the shell.