
Cloud Composer: Managing cross-environment dependencies

sguruprasad

As organizations migrate to the cloud, they often adopt a multi-environment strategy for their data pipelines and workflows to improve isolation, security, and flexibility. This approach, however, introduces the challenge of managing dependencies between workflows that run in different environments.

Within Cloud Composer, Google Cloud's managed Apache Airflow service, this means orchestrating dependencies between Directed Acyclic Graphs (DAGs) that reside in separate Cloud Composer environments or even in distinct Google Cloud projects. This situation is common in a Data Mesh architecture, where multiple teams or business units operate in their own Google Cloud projects and Cloud Composer environments yet depend on one another's workflows.

This blog post explores the complexities of cross-environment dependencies in Cloud Composer and introduces the CloudComposerDAGRunSensor, an operator that streamlines their management.

A Composer Airflow sensor operator for cross-environment DAG dependencies

To address cross-environment dependencies, the Composer team recently released the DAG Run Sensor. This operator polls the Parent DAG's status from a different GCP project or Composer environment, verifies that the Parent DAG completed, and then lets the dependent DAG run proceed.
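At its core, the sensor only needs to know where the Parent DAG lives. Here is a minimal sketch; the project, region, environment, and DAG IDs below are placeholders, and a complete DAG using the sensor appears later in this post.

from airflow.providers.google.cloud.sensors.cloud_composer import (
    CloudComposerDAGRunSensor,
)

# Wait for the Parent DAG (possibly in another project or environment)
# to reach an allowed state before downstream tasks run.
wait_for_parent = CloudComposerDAGRunSensor(
    task_id="wait_for_parent",
    project_id="parent-project-id",      # project hosting the Parent DAG
    region="us-central1",                # region of the Parent environment
    environment_id="parent-env",         # Parent Composer environment name
    composer_dag_id="parent_dag_id",     # Parent DAG to poll
    allowed_states=["success"],          # states that satisfy the sensor
)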

The following sections describe the required permissions and walk through an example.

Permissions for Airflow job monitoring

Required permission: `composer.environments.executeAirflowCommand`

Why the permission is needed: This permission enables polling and retrieving Airflow job status and details from the audit logs. The audit logs serve as a comprehensive record of activity within the Airflow environment, and access to this information is crucial for monitoring job progress, identifying potential issues, and ensuring overall workflow health.
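To check whether a given caller already holds this permission on a project, one option is the Resource Manager testIamPermissions method. This is a minimal sketch, with the project ID as a placeholder.

from google.cloud import resourcemanager_v3
from google.iam.v1 import iam_policy_pb2

def can_poll_airflow_status(project_id: str) -> bool:
    """Returns True if the caller holds the Airflow-monitoring permission."""
    client = resourcemanager_v3.ProjectsClient()
    response = client.test_iam_permissions(
        request=iam_policy_pb2.TestIamPermissionsRequest(
            resource=f"projects/{project_id}",
            permissions=["composer.environments.executeAirflowCommand"],
        )
    )
    return "composer.environments.executeAirflowCommand" in response.permissions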

Current IAM Role with the permission: Composer Admin role

Alternative solution (recommended): custom role creation

To provide more granular access control and adhere to the principle of least privilege, it is recommended that a custom role be created. This custom role should specifically bind the `composer.environments.executeAirflowCommand` permission, granting only the necessary capabilities for Airflow job monitoring without the broader administrative privileges associated with the Composer Admin role.

Benefits of a custom role:

  • Enhanced security: By limiting the scope of permissions, the risk of accidental or intentional misuse of the `composer.environments.executeAirflowCommand` permission is significantly reduced.

  • Improved compliance: Creating a custom role aligns with best practices for access control and helps organizations meet regulatory requirements related to data security and user permissions.

  • Granular access control: The custom role can be tailored to the specific needs of users or groups, ensuring that they have the necessary permissions to perform their job functions without unnecessary access.

Steps to create a custom role:

  1. Define the role: Specify the name and description of the custom role.

  2. Bind the permission: Include the `composer.environments.executeAirflowCommand` permission within the role definition.

  3. Grant the role: Assign the custom role to the Service Account that requires access to Airflow job monitoring functionality.

By implementing a custom role, organizations can strike a balance between enabling essential Airflow job monitoring capabilities and maintaining a secure and well-governed Composer environment.
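For teams that script their IAM setup, these steps can be automated with the IAM admin client from the google-cloud-iam library. The sketch below creates the custom role; the role ID and title are illustrative, not prescribed names.

from google.cloud import iam_admin_v1
from google.cloud.iam_admin_v1 import types

def create_airflow_monitoring_role(project_id: str) -> types.Role:
    """Creates a custom role carrying only the Airflow-monitoring permission."""
    client = iam_admin_v1.IAMClient()
    role = types.Role(
        title="Composer Airflow Job Monitor",  # illustrative title
        description="Allows polling Airflow DAG run status across environments.",
        included_permissions=["composer.environments.executeAirflowCommand"],
        stage=types.Role.RoleLaunchStage.GA,
    )
    request = types.CreateRoleRequest(
        parent=f"projects/{project_id}",
        role_id="composerAirflowJobMonitor",  # illustrative role ID
        role=role,
    )
    return client.create_role(request=request)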

Suggested steps

  1. Create a custom role that includes `composer.environments.executeAirflowCommand` in the Parent DAG's project

  2. Dedicated Service Accounts:

    1. Create a dedicated Service Account in each of the Parent and Child projects and attach the roles below

      1. Composer Worker

      2. Custom role (required if there are intra-project DAG dependencies)

      3. Cloud Composer v2 API Service Agent Extension

  3. Grant the custom role to the Child project's dedicated SA in the Parent project's IAM (see the sketch after this list)

  4. Create the Composer environments in the Parent and Child projects with the created Service Accounts
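Step 3, granting the custom role to the Child project's SA on the Parent project, can also be scripted with the Resource Manager client. In this minimal sketch, the project ID, role ID, and service account email are placeholders.

from google.cloud import resourcemanager_v3
from google.iam.v1 import iam_policy_pb2, policy_pb2

def grant_role_to_child_sa(parent_project: str, role_id: str, child_sa: str) -> None:
    """Binds the Parent project's custom role to the Child project's SA."""
    client = resourcemanager_v3.ProjectsClient()
    resource = f"projects/{parent_project}"
    policy = client.get_iam_policy(
        request=iam_policy_pb2.GetIamPolicyRequest(resource=resource)
    )
    policy.bindings.append(
        policy_pb2.Binding(
            role=f"projects/{parent_project}/roles/{role_id}",
            members=[f"serviceAccount:{child_sa}"],
        )
    )
    client.set_iam_policy(
        request=iam_policy_pb2.SetIamPolicyRequest(resource=resource, policy=policy)
    )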

Example: steps to follow

Below is an example of cross-project DAG dependency management with CloudComposerDAGRunSensor.

  1. Custom role creation: Create custom roles in the Parent and Child projects, as shown in the screenshot below.

image1.png

2. Dedicated Service Account (SA) creation: Create a dedicated Service Account in each of the Parent and Child projects, attaching the roles as shown below.

image6.png

3. Attach the created custom role to the Child project's SA in the Parent project.

image7.png

4. Create the Composer environments in the Parent and Child projects with the created SAs.

Parent project:
image2.png

Child project:
image4.png
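The environments can also be created programmatically with the Cloud Composer API client (the google-cloud-orchestration-airflow package), attaching the dedicated SA at creation time. This is a minimal sketch; the project, region, environment, image version, and SA names are placeholders.

from google.cloud.orchestration.airflow import service_v1

def create_composer_environment(
    project_id: str, region: str, env_name: str, sa_email: str
) -> service_v1.Environment:
    """Creates a Composer environment that runs as the given service account."""
    client = service_v1.EnvironmentsClient()
    environment = service_v1.Environment(
        name=f"projects/{project_id}/locations/{region}/environments/{env_name}",
        config=service_v1.EnvironmentConfig(
            node_config=service_v1.NodeConfig(service_account=sa_email),
            software_config=service_v1.SoftwareConfig(
                image_version="composer-2-airflow-2",  # example version alias
            ),
        ),
    )
    # create_environment returns a long-running operation; wait for completion.
    operation = client.create_environment(
        parent=f"projects/{project_id}/locations/{region}",
        environment=environment,
    )
    return operation.result()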

DAG code

The following example illustrates a typical enterprise data lake scenario with Parent and Child DAG dependencies.

In this scenario, the Parent DAG, Company_cal_refresh, runs annually to reload the enterprise physical calendar into the system. The Child DAG, Product_catalog_refresh, runs monthly to refresh product catalog details and depends on Company_cal_refresh, because the monthly job relies on the yearly calendar details. The Child DAG monitors the Parent DAG using the GCP-provided Airflow operator CloudComposerDAGRunSensor, polling for the Parent DAG's successful completion.

Parent DAG - Yearly: Company_cal_refresh
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cloud Composer DAG to generate stock weekly snapshot update."""
from datetime import datetime, timedelta

import airflow
from airflow.operators.empty import EmptyOperator

default_dag_args = {
    "depends_on_past": False,
    "start_date": datetime(2023, 11, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}
with airflow.DAG('company_cal_refresh',
                 template_searchpath=['/home/airflow/gcs/data/bq_data_replication'],
                 default_args=default_dag_args,
                 max_active_runs=2,
                 schedule_interval="@yearly",
                 catchup=True
                 ) as dag:
    # Placeholder tasks for the yearly calendar reload; replace these with
    # the actual refresh logic.
    start_task = EmptyOperator(task_id="start")

    stop_task = EmptyOperator(task_id="stop")

    start_task >> stop_task
image3.png
Child DAG - Monthly: Product_catalog_refresh
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cloud Composer DAG to generate stock monthly snapshot update."""

from datetime import datetime, timedelta

import airflow
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.sensors.cloud_composer import (
    CloudComposerDAGRunSensor,
)

default_dag_args = {
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}

with airflow.DAG('product_catalog_refresh',
                 template_searchpath=['/home/airflow/gcs/data/bq_data_replication'],
                 default_args=default_dag_args,
                 max_active_runs=2,
                 schedule_interval="@monthly",
                 catchup=True
                 ) as dag:
    start_task = EmptyOperator(task_id="start")

    # Poll the Parent DAG in the other project/environment until one of its
    # runs reaches an allowed state, then let this DAG proceed.
    external_task_sensors = CloudComposerDAGRunSensor(
        task_id="external_task_sensors",
        project_id="api-sandbox-328207",
        region="us-central1",
        environment_id="env-a",
        composer_dag_id="company_cal_refresh",
        allowed_states=["success"],
    )

    stop_task = EmptyOperator(task_id="stop")

    start_task >> external_task_sensors >> stop_task

image5.png

 

In summary, the CloudComposerDAGRunSensor allows users to monitor the completion of a Parent DAG from a different GCP project or Composer environment, facilitating cross-project DAG dependency management. The provided example demonstrates the practical application of the CloudComposerDAGRunSensor in managing DAG dependencies across projects. With minor adjustments to permissions for your Composer Service Accounts, developers can quickly leverage this new operator to significantly simplify cross-instance DAG dependencies, making Data Mesh-style architectures much easier to implement.