
Unable to Retrieve Metadata from XCom for Dataform Workflow Invocation in Airflow

 

I'm working on integrating a Dataform workflow in Airflow, using the guidelines provided in this Google Cloud documentation.

However, I've encountered an issue with the following segment of code:

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": {
            "included_tags": ["daily"],
            "transitive_dependencies_included": True,
        },
    },
    # start_date=days_ago(1),
    timeout=None,
)

The expression task_instance.xcom_pull('create_compilation_result') returns None. As a result, I'm unsure why there's no metadata in XCom for the create_compilation_result task_id.

I confirmed that create_compilation_result task was successful from Airflow. It is failing at the workflow invocation step.

Could someone guide me on how to correctly fetch the metadata from the compilation creation and use its compilation result ID to trigger the Dataform workflow?


 

The reason why the expression task_instance.xcom_pull('create_compilation_result') is returning None is likely because the create_compilation_result task has not yet finished running when the create_workflow_invocation task is trying to fetch its metadata.

To fix this, you need to ensure that the create_compilation_result task completes before the create_workflow_invocation task starts. This can be achieved by setting a task dependency:

 
create_compilation_result >> create_workflow_invocation

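Once the create_compilation_result task has finished running, Airflow stores the operator's return value (a dictionary describing the compilation result) in XCom under the default key return_value. The compilation result's resource name is the 'name' field of that dictionary. You can then fetch this value in the create_workflow_invocation task using the expression: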

"{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"

Here is the updated code:

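from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # compilation_result is required; GIT_COMMITISH is the branch, tag, or commit to compile
    compilation_result={
        "git_commitish": GIT_COMMITISH,
    },
)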

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": {
            "included_tags": ["daily"],
            "transitive_dependencies_included": True,
        },
    },
    # start_date=days_ago(1),
    timeout=None,
)

create_compilation_result >> create_workflow_invocation

This setup ensures that the create_workflow_invocation task only runs after the create_compilation_result task has completed, allowing it to successfully fetch the required data from XCom.
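
To make the difference between an XCom key and a field inside the stored value concrete, here is a minimal sketch in plain Python. It is not Airflow code: the dictionary-backed store and the two helper functions are stand-ins that only model the lookup semantics, and the resource name is a made-up placeholder.

```python
# Simplified stand-in for Airflow's XCom storage: (task_id, key) -> value.
# This is NOT Airflow code; it only models how a pull looks up a value.
xcom_store = {}


def xcom_push(task_id, value, key="return_value"):
    """Store a value for a task under the given key."""
    xcom_store[(task_id, key)] = value


def xcom_pull(task_ids, key="return_value"):
    """Return the stored value, or None when nothing was stored."""
    return xcom_store.get((task_ids, key))


# Nothing has been pushed yet, so the pull yields None and
# subscripting it with ['name'] could not work.
missing = xcom_pull("create_compilation_result")

# An operator's return value is stored under the default key "return_value".
# Here it is a dict, and "name" is a field inside that dict (placeholder value).
xcom_push(
    "create_compilation_result",
    {"name": "projects/p/locations/l/repositories/r/compilationResults/abc"},
)
result_name = xcom_pull("create_compilation_result")["name"]
```

In this model, a `None` from the pull means that no value exists for that task ID and key, which is a separate question from whether the task dependency is set.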

Hello @ms4446.
Thanks for looking into my question! I already have `create_compilation_result >> create_workflow_invocation`, and the dependency shows up in the graph, yet I am still getting `None` for the expression `task_instance.xcom_pull('create_compilation_result')`.

Here is what I have. I have left out the lines that set up the variables since they are specific to my work.

with DAG(
    dag_id="test_dataform",
    max_active_runs=1,
    catchup=False,
    max_active_tasks=1,
    schedule="@daily",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": {
                "included_tags": ["daily", "listing_mart"],
                "transitive_dependencies_included": True,
            },
        },
        # start_date=days_ago(1),
        timeout=None,
    )

create_compilation_result >> create_workflow_invocation




 
 

I see. In that case, it's possible that the create_compilation_result task is not actually pushing any data to XCom. You can verify this by checking the logs for the create_compilation_result task. If you don't see any output related to XCom, then you'll need to modify the create_compilation_result task to push the required data to XCom.

Here is an example of how you can push the compilation result ID to XCom in the create_compilation_result task:

 
def _push_compilation_result_to_xcom(context):
    """Pushes the compilation result ID to XCom."""
    ti = context["task_instance"]
    compilation_result = ti.xcom_pull(task_ids="create_compilation_result")
    if compilation_result:
        # "name" holds the full resource name of the compilation result
        ti.xcom_push(key="compilation_result_id", value=compilation_result["name"])

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
    },
    # Register the callback through the operator's on_success_callback argument
    on_success_callback=_push_compilation_result_to_xcom,
)
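
Because the callback stores the value under the custom key compilation_result_id, the downstream template has to ask for that key explicitly. A minimal sketch of what the workflow_invocation payload could then look like is below; the helper function build_workflow_invocation is hypothetical and only assembles the dictionary, while `xcom_pull(task_ids=..., key=...)` is the pull call used inside the template.

```python
def build_workflow_invocation(task_id, key, tags):
    """Return a workflow_invocation dict whose compilation_result is a Jinja
    template that pulls `key` from `task_id` when Airflow renders it."""
    template = (
        "{{ task_instance.xcom_pull("
        f"task_ids='{task_id}', key='{key}'"
        ") }}"
    )
    return {
        "compilation_result": template,
        "invocation_config": {
            "included_tags": list(tags),
            "transitive_dependencies_included": True,
        },
    }


# Hypothetical usage: pull the ID that the success callback pushed.
workflow_invocation = build_workflow_invocation(
    "create_compilation_result", "compilation_result_id", ["daily"]
)
```

The resulting dictionary can be passed as the workflow_invocation argument of DataformCreateWorkflowInvocationOperator. Since the pushed value is already the name string, no `['name']` subscript is needed in this template.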

Thanks so much @ms4446. It works for me! 

@ms4446 I have been facing a similar issue.

What I want to do is calculate a value from BigQuery and then pass that value to Dataform as a variable.

As I can see in the task's XCom, the value is being pushed successfully.

But when I try to pull it in create_compilation_result, the Jinja template for xcom_pull is not rendered, and the entire string is passed to Dataform as the value,

i.e. "{{ task_instance.xcom_pull('fetch_data')['result'] }}"

I have been trying to render this Jinja template using a Python function, but I have not been able to resolve the issue yet.

Please let me know how we can resolve this.

 

Thanks & Regards,

Akshay Bhandare

The issue you're facing is related to the rendering of Jinja templates in Airflow.

Here are a few steps to troubleshoot and potentially resolve the issue:

1. Ensure Templating is Supported: Check whether the field you're trying to template in the DataformCreateCompilationResultOperator is listed in the operator's template_fields attribute. Airflow only renders Jinja in the fields named there, so a template in any other field is passed through as a literal string. You can confirm this in the operator's documentation or its source code for your installed provider version.

2. Use Double Curly Braces: Ensure you're using double curly braces for Jinja templating, i.e., {{ ... }}.

3. Check for Typos: Ensure there are no typos in the task ID or key you're trying to pull from XCom.

4. Test Templating: You can test the templating separately by creating a simple PythonOperator that prints the templated value. This can help you verify if the issue is with the templating or with the DataformCreateCompilationResultOperator.

 

from airflow.operators.python import PythonOperator

def print_value(**kwargs):
    # "ti" is the running TaskInstance, supplied through the task context
    value = kwargs['ti'].xcom_pull(task_ids='fetch_data')['result']
    print(value)

print_value_task = PythonOperator(
    task_id='print_value',
    python_callable=print_value,
    dag=dag
)

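5. Make the Field Templatable: If compilation_result is not among the operator's template_fields in your provider version, the string is passed through unrendered. Passing a PythonOperator object as compilation_result does not help, because the argument must be a dictionary. Instead, you can subclass the operator, add the field to template_fields, and put the template inside the dictionary. In the example below, my_var is a placeholder for your Dataform variable name.

 

class TemplatedCompilationResultOperator(DataformCreateCompilationResultOperator):
    # Extend the parent's template_fields so Airflow renders Jinja inside compilation_result
    template_fields = (
        *DataformCreateCompilationResultOperator.template_fields,
        "compilation_result",
    )

create_compilation_result = TemplatedCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": {
            # my_var is a placeholder variable name
            "vars": {"my_var": "{{ task_instance.xcom_pull('fetch_data')['result'] }}"},
        },
    },
    # ... any other arguments
)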

 

6. Check Airflow Version: Ensure you're using a version of Airflow that supports the features you're trying to use. If you're using an older version, consider upgrading to a newer version.

7. Check for Custom Modifications: If you're using a custom version of the DataformCreateCompilationResultOperator or if there have been any modifications to the operator, ensure that these modifications aren't causing the issue.

If you've followed all of these steps and you're still having trouble, please reach out to the Airflow community for assistance.
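
To illustrate why a Jinja string can reach Dataform unrendered, here is a toy model in plain Python of the template_fields mechanism described in step 1. It is a simplified sketch, not Airflow's implementation: ToyOperator, its argument names, and the regex-based render function are all hypothetical stand-ins, and real Airflow uses Jinja rather than a regex.

```python
import re

# Matches a bare placeholder such as "{{ result }}" (toy syntax, not full Jinja).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Recursively substitute {{ name }} placeholders in strings, dicts, and lists."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, context) for v in value]
    return value


class ToyOperator:
    """Hypothetical operator: only attributes named in template_fields are rendered."""

    template_fields = ("rendered_arg",)

    def __init__(self, rendered_arg, plain_arg):
        self.rendered_arg = rendered_arg
        self.plain_arg = plain_arg

    def render_template_fields(self, context):
        # Fields not listed in template_fields are never touched.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


op = ToyOperator(
    rendered_arg={"vars": {"cutoff": "{{ result }}"}},
    plain_arg={"vars": {"cutoff": "{{ result }}"}},
)
op.render_template_fields({"result": "2024-01-01"})

# The listed field was rendered; the unlisted one still holds the raw template.
still_raw = "{{" in op.plain_arg["vars"]["cutoff"]
```

In this model the same template string is rendered in one argument and left as-is in the other, purely because of which attribute names appear in template_fields. That matches the symptom of the entire template string being passed along as the value.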