I'm integrating a Dataform workflow into Airflow, following the guidelines in this Google Cloud documentation.
However, I've encountered an issue with the following segment of code:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": {
            "included_tags": ["daily"],
            "transitive_dependencies_included": True,
        },
    },
    # start_date=days_ago(1),
    timeout=None,
)
The expression task_instance.xcom_pull('create_compilation_result') returns None, and I'm unsure why there's no metadata in XCom for the create_compilation_result task_id.
I confirmed in Airflow that the create_compilation_result task succeeded. It is failing at the workflow invocation step.
Could someone guide me on how to correctly fetch the metadata from the compilation creation and use its compilation result ID to trigger the Dataform workflow?
The expression task_instance.xcom_pull('create_compilation_result') is most likely returning None because the create_compilation_result task has not yet finished running when the create_workflow_invocation task tries to fetch its metadata.
To fix this, you need to ensure that the create_compilation_result task completes before the create_workflow_invocation task starts. This can be achieved by setting a task dependency:
create_compilation_result >> create_workflow_invocation
Once the create_compilation_result task has finished running, it should push its return value (the compilation result as a dictionary) to XCom under the default return_value key. The 'name' field of that dictionary holds the compilation result's full resource name. You can then fetch this value in the create_workflow_invocation task using the expression:
"{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
Here is the updated code:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
    },
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": {
            "included_tags": ["daily"],
            "transitive_dependencies_included": True,
        },
    },
    # start_date=days_ago(1),
    timeout=None,
)
create_compilation_result >> create_workflow_invocation
This setup ensures that the create_workflow_invocation task only runs after the create_compilation_result task has completed, allowing it to successfully fetch the required data from XCom.
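The 'name' value pulled by the template is the compilation result's full resource name, which is the form the workflow invocation's compilation_result field expects. If you also want the trailing compilation result ID on its own (for logging, say), here is a minimal sketch; parse_compilation_result_name is a hypothetical helper, and it assumes the standard projects/*/locations/*/repositories/*/compilationResults/* name format.

```python
import re

# Assumed format of a Dataform compilation result resource name.
_NAME_RE = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/repositories/(?P<repository>[^/]+)"
    r"/compilationResults/(?P<compilation_result_id>[^/]+)$"
)


def parse_compilation_result_name(name: str) -> dict:
    """Split a compilation result resource name into its components.

    Hypothetical helper: raises ValueError if `name` does not match the
    assumed projects/*/locations/*/repositories/*/compilationResults/* format.
    """
    match = _NAME_RE.match(name)
    if match is None:
        raise ValueError(f"Unexpected compilation result name: {name!r}")
    return match.groupdict()


# Example with made-up identifiers.
parts = parse_compilation_result_name(
    "projects/my-project/locations/us-central1/repositories/my-repo/compilationResults/abc123"
)
print(parts["compilation_result_id"])  # abc123
```

Pass the full name, not just the ID, to the workflow invocation; the parsed parts are only for display or checks.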
Hello @ms4446.
Thanks for looking into my question! I already have `create_compilation_result >> create_workflow_invocation`, the dependency shows up in the graph, and yet I'm still getting `None` for the expression `task_instance.xcom_pull('create_compilation_result')`.
Here is what I have. I've left out the lines that set up the variables because they're work-related.
with DAG(
    dag_id="test_dataform",
    max_active_runs=1,
    catchup=False,
    max_active_tasks=1,
    schedule="@daily",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": {
                "included_tags": ["daily", "listing_mart"],
                "transitive_dependencies_included": True,
            },
        },
        # start_date=days_ago(1),
        timeout=None,
    )
    create_compilation_result >> create_workflow_invocation
I see. In that case, it's possible that the create_compilation_result task is not actually pushing any data to XCom. You can verify this by checking the XCom tab of the create_compilation_result task instance in the Airflow UI, or the task's logs. If no XCom entry shows up, then you'll need to modify the create_compilation_result task to push the required data to XCom.
Here is an example of how you can push the compilation result ID to XCom in the create_compilation_result task:
def _push_compilation_result_to_xcom(context):
    """Pushes the compilation result ID to XCom."""
    compilation_result = context["task_instance"].xcom_pull(task_ids="create_compilation_result")
    if compilation_result:
        compilation_result_id = compilation_result["name"]
        context["task_instance"].xcom_push(key="compilation_result_id", value=compilation_result_id)

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
    },
    # The callback is passed as a constructor argument of the operator.
    on_success_callback=_push_compilation_result_to_xcom,
)
This code defines a function called _push_compilation_result_to_xcom that pushes the compilation result ID to XCom. The create_compilation_result task then calls this function as its on_success_callback.
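Because the callback only uses xcom_pull and xcom_push, its logic can be checked without a running Airflow. A minimal sketch, assuming a hypothetical FakeTaskInstance that imitates just those two calls (it is not Airflow's TaskInstance). It also shows that the value pushed this way lives under the key compilation_result_id, so a downstream template would read it with something like "{{ task_instance.xcom_pull(task_ids='create_compilation_result', key='compilation_result_id') }}".

```python
from typing import Any, Optional


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, storing XComs in a dict.

    Only models the two calls the callback uses; it is not Airflow's API.
    """

    def __init__(self) -> None:
        self._xcoms: dict = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self._xcoms[key] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        # Single fake task, so task_ids is ignored; a missing key returns None,
        # like xcom_pull does when nothing was pushed.
        return self._xcoms.get(key)


def _push_compilation_result_to_xcom(context):
    """Same logic as the callback above."""
    compilation_result = context["task_instance"].xcom_pull(task_ids="create_compilation_result")
    if compilation_result:
        compilation_result_id = compilation_result["name"]
        context["task_instance"].xcom_push(key="compilation_result_id", value=compilation_result_id)


# Case 1: the operator's return value was pushed under the default key.
ti = FakeTaskInstance()
ti.xcom_push(
    key="return_value",
    value={"name": "projects/p/locations/l/repositories/r/compilationResults/c1"},
)
_push_compilation_result_to_xcom({"task_instance": ti})
print(ti.xcom_pull(task_ids="create_compilation_result", key="compilation_result_id"))

# Case 2: nothing was pushed upstream, so the callback pushes nothing either.
empty_ti = FakeTaskInstance()
_push_compilation_result_to_xcom({"task_instance": empty_ti})
print(empty_ti.xcom_pull(task_ids="create_compilation_result", key="compilation_result_id"))  # None
```

Case 2 is worth noting: if the operator's own return value never reaches XCom, the callback silently does nothing, so the downstream pull still yields None.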
Thanks so much @ms4446. It works for me!
@ms4446 I have been facing a similar issue.
What I want to do is calculate a value from BigQuery and then pass that value to Dataform as a variable.
As I can see in the task's XCom, the value is being pushed successfully.
But when I try to pull it in create_compilation_result, the Jinja template for xcom_pull is not rendered and the entire string is passed to Dataform as the value,
i.e. "{{ task_instance.xcom_pull('fetch_data')['result'] }}"
I have been trying to render this Jinja template using a Python function, but I haven't been able to resolve the issue yet.
Please let me know how we can resolve this.
Thanks & Regards,
Akshay Bhandare
The issue you're facing is related to the rendering of Jinja templates in Airflow.
Here are a few steps to troubleshoot and potentially resolve the issue:
1. Ensure Templating is Supported: Check whether the field you're trying to template in the DataformCreateCompilationResultOperator is listed in the operator's template_fields attribute. Airflow renders Jinja only in those fields; any other argument reaches the operator as the literal string, which matches what you're seeing. Check the operator's documentation or its source code for your installed provider version to confirm this.
2. Use Double Curly Braces: Ensure you're using double curly braces for Jinja templating, i.e., {{ ... }}.
3. Check for Typos: Ensure there are no typos in the task ID or key you're trying to pull from XCom.
4. Test Templating: You can test the templating separately by creating a simple PythonOperator that prints the templated value. This can help you verify if the issue is with the templating or with the DataformCreateCompilationResultOperator.
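from airflow.operators.python import PythonOperator

def print_value(value):
    # Receives the already-rendered template result.
    print(value)

print_value_task = PythonOperator(
    task_id='print_value',
    python_callable=print_value,
    # op_kwargs is a templated field of PythonOperator, so the Jinja
    # expression is rendered before print_value is called.
    op_kwargs={'value': "{{ ti.xcom_pull(task_ids='fetch_data')['result'] }}"},
    dag=dag
)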
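5. Use a Python Function: If direct templating isn't working, you can pull the value from XCom inside a Python function and create the compilation result from there with DataformHook, which avoids Jinja templating entirely. Dataform compilation variables are string-to-string, so the value is converted with str(); my_var is a placeholder for your Dataform variable name.
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.dataform import DataformHook

def create_compilation_result_with_vars(ti):
    # Pull the value computed by the upstream BigQuery task.
    value = ti.xcom_pull(task_ids='fetch_data')['result']
    hook = DataformHook()
    result = hook.create_compilation_result(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {"vars": {"my_var": str(value)}},
        },
    )
    # Keep the ['name'] shape so downstream templates still work.
    return {"name": result.name}

create_compilation_result = PythonOperator(
    task_id="create_compilation_result",
    python_callable=create_compilation_result_with_vars,
    dag=dag
)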
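6. Check Airflow Version: Ensure you're using versions of Airflow and of the apache-airflow-providers-google package that support the features you're trying to use; the set of templated fields on an operator can change between provider releases. If you're using an older version, consider upgrading.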
7. Check for Custom Modifications: If you're using a custom version of the DataformCreateCompilationResultOperator or if there have been any modifications to the operator, ensure that these modifications aren't causing the issue.
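Step 1 is a likely cause of the literal string: Airflow renders only the operator attributes named in template_fields, recursing into nested dicts and lists, and passes everything else through unchanged. The toy model below sketches that rule with the standard library only; ToyOperator, TemplatedToyOperator, and the simple {{ name }} substitution are hypothetical stand-ins, not Airflow or Jinja. In real Airflow, the analogous workaround (if the field isn't templated in your provider version) is to subclass DataformCreateCompilationResultOperator and add "compilation_result" to its template_fields.

```python
import re
from typing import Any

# Matches "{{ name }}" placeholders; a simplified stand-in for Jinja.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def _render(value: Any, context: dict) -> Any:
    """Recursively substitute placeholders in strings, dicts and lists."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: _render(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [_render(v, context) for v in value]
    return value


class ToyOperator:
    """Hypothetical operator: only attributes named in template_fields are rendered."""

    template_fields: tuple = ("repository_id",)

    def __init__(self, repository_id: str, compilation_result: dict) -> None:
        self.repository_id = repository_id
        self.compilation_result = compilation_result

    def render_template_fields(self, context: dict) -> None:
        for field in self.template_fields:
            setattr(self, field, _render(getattr(self, field), context))


class TemplatedToyOperator(ToyOperator):
    # Extending template_fields in a subclass makes compilation_result templated too.
    template_fields = (*ToyOperator.template_fields, "compilation_result")


context = {"result": 42}
args = dict(
    repository_id="repo",
    compilation_result={"code_compilation_config": {"vars": {"my_var": "{{ result }}"}}},
)

plain = ToyOperator(**args)
plain.render_template_fields(context)
print(plain.compilation_result["code_compilation_config"]["vars"]["my_var"])  # {{ result }}

templated = TemplatedToyOperator(**args)
templated.render_template_fields(context)
print(templated.compilation_result["code_compilation_config"]["vars"]["my_var"])  # 42
```

The plain operator reproduces the reported symptom (the raw template string survives), while the subclass renders it.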
If you've followed all of these steps and you're still having trouble, please reach out to the Airflow community for assistance.