
Composer - Trigger a Dataform Repository with GCP Cloud Composer

TA3

Hi All,

I am performing a PoC to trigger Dataform via GCP Composer. I am using Dataform to perform ETL for multiple clients, depending on the client name I pass as an argument. My requirement is to be able to create a DAG which will trigger Dataform for a client by sending the client_name.

For example, below is how my workflow_settings.yaml looks in the Dataform repo. It performs the ETL operations for the client that is set via the code variable; in the example below, that would be client A.
================================================

defaultProject: project_id
defaultLocation: europe-west1
defaultDataset: dataform_tm_poc
defaultAssertionDataset: dataform_assertions
dataformCoreVersion: 3.0.0-beta.4
vars:
  code: client A
===========================================

Can anybody please help me set up the Composer part where I can trigger Dataform for a particular client by sending the client code as an argument?
1 REPLY

Hi @TA3 

Welcome to Google Cloud Community!

To trigger a Dataform workflow from Google Cloud Composer, you can follow these steps:

  1. Set up your Cloud Composer environment: create (or reuse) an environment and confirm that the apache-airflow-providers-google package is installed, since it provides the Dataform operators used below.
  2. Locate the environment's Cloud Storage bucket: each Cloud Composer environment comes with an associated bucket, and DAG files are deployed to its dags/ folder.
  3. Write your DAG: Cloud Composer is based on Apache Airflow, so you define your workflow as a Python DAG.

Here's a basic Python example of what your DAG might resemble. Note that the Google provider does not ship a DataformRunOperator; the usual pattern is two operators per run: DataformCreateCompilationResultOperator compiles the repository (this is where you pass vars such as code, overriding the default value in workflow_settings.yaml), and DataformCreateWorkflowInvocationOperator executes the compiled result. Replace the project, region, repository ID, and Git commitish with your own values:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

PROJECT_ID = "your_project_id"        # replace with your project
REGION = "europe-west1"               # replace with your Dataform region
REPOSITORY_ID = "your_dataform_repo"  # replace with your repository ID
GIT_COMMITISH = "main"                # branch, tag, or commit SHA to compile

# Declare the DAG
with DAG(
    # adjust to your needs
    dag_id="dataform_client_dag",
    default_args={"owner": "airflow"},
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # manual trigger only
    catchup=False,
    tags=["dataform", "client"],
) as dag:

    # Create the compile + run tasks for a specific client
    def dataform_tasks_for_client(client_name):
        task_suffix = client_name.replace(" ", "_").lower()

        # Compile the repository, overriding the `code` variable
        # from workflow_settings.yaml with this client's name.
        compile_client = DataformCreateCompilationResultOperator(
            task_id=f"compile_{task_suffix}",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": {"vars": {"code": client_name}},
            },
        )

        # Run the compilation result produced by the task above
        # (its resource name is pulled from XCom).
        run_client = DataformCreateWorkflowInvocationOperator(
            task_id=f"run_{task_suffix}",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": (
                    "{{ task_instance.xcom_pull('compile_" + task_suffix + "')['name'] }}"
                ),
            },
        )

        compile_client >> run_client
        return compile_client, run_client

    # Example usage: one compile + run pair per client
    compile_a, run_a = dataform_tasks_for_client("client A")
    compile_b, run_b = dataform_tasks_for_client("client B")

    # Define a start and end task
    start_task = EmptyOperator(task_id="start")
    end_task = EmptyOperator(task_id="end")

    # Define task dependencies
    start_task >> compile_a
    start_task >> compile_b
    run_a >> end_task
    run_b >> end_task

  4. Upload the DAG file to the dags/ folder of your environment's Cloud Storage bucket so that Airflow picks it up, for example with the gcloud command below.
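
If you use the gcloud CLI, one way to do this (assuming a hypothetical environment name your-composer-env in europe-west1; adjust both to your setup) is:

gcloud composer environments storage dags import \
    --environment your-composer-env \
    --location europe-west1 \
    --source dataform_client_dag.py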

Note: 

  • Trigger your DAG: you can trigger it manually (optionally passing the client code at run time, as in the sketch after this list) or schedule it to run on a regular basis.
  • Ensure that your Composer environment's service account has the necessary permissions in your GCP project to run Dataform and interact with the required resources.
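
If you prefer to pass the client at trigger time instead of defining one task pair per client, a minimal sketch along these lines could work. It reads an assumed client_code key from dag_run.conf (e.g. trigger the DAG with the config {"client_code": "client B"}), and it assumes that compilation_result is a templated field in your provider version (check the operator's template_fields; if it is not, build the dict in an upstream PythonOperator instead):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

with DAG(
    dag_id="dataform_single_client_dag",  # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # triggered manually with a conf payload
    catchup=False,
) as dag:

    # Compile with the client taken from dag_run.conf,
    # falling back to "client A" when no conf is supplied.
    compile_client = DataformCreateCompilationResultOperator(
        task_id="compile_client",
        project_id="your_project_id",
        region="europe-west1",
        repository_id="your_dataform_repo",
        compilation_result={
            "git_commitish": "main",
            "code_compilation_config": {
                "vars": {"code": "{{ dag_run.conf.get('client_code', 'client A') }}"}
            },
        },
    )

    # Run the compiled result referenced via XCom.
    run_client = DataformCreateWorkflowInvocationOperator(
        task_id="run_client",
        project_id="your_project_id",
        region="europe-west1",
        repository_id="your_dataform_repo",
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('compile_client')['name'] }}"
        },
    )

    compile_client >> run_client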

I hope the above information is helpful.