Dataform<>Airflow Run Using Tags

I am currently managing a Dataform project with numerous transformations. These transformations are orchestrated through a combination of workflow configurations and release configurations, scheduled to run daily, weekly, or monthly depending on the data source and the tag I have placed on each source (usually a daily, weekly, or monthly tag).

In transitioning to Airflow, I aim to trigger the transformations upon completion of the Extraction and Loading processes. To achieve this, I intend for Airflow to execute the transformations with a designated tag, facilitating a unified view of daily, weekly, and monthly DAG runs. How can I implement this transition effectively within Airflow?


Also, can I create a Dataform repository inside my current Airflow repository so that all the ELT code lives in the same GitHub repository? Will that work?

@ms4446 

Solved
1 ACCEPTED SOLUTION

Hi @francisatoyebi ,

Here's how you might implement this effectively using Airflow:

Airflow Operators:

  • Dataform Operators: The Google provider package (apache-airflow-providers-google) ships dedicated Dataform operators such as DataformCreateCompilationResultOperator and DataformCreateWorkflowInvocationOperator. If the provider is installed in your environment, these are the most direct way to compile your repository and invoke only the actions carrying a given tag (see the sketch below).
  • Standard Operators: If you prefer not to depend on the provider, the BashOperator or PythonOperator can execute shell commands or Python functions, respectively, to run Dataform CLI commands or call the Dataform API. The template under Workflow Structure takes this route.

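For reference, here is a minimal sketch of the provider-based route, filtering on a tag. It assumes apache-airflow-providers-google is installed; the project, region, repository ID, and the "daily" tag are placeholders to replace with your own values. The BashOperator template under Workflow Structure below remains a reasonable fallback if the provider is not available.

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.utils.dates import days_ago

# Placeholders -- replace with your own project, region, and Dataform repository ID.
PROJECT_ID = "my-gcp-project"
REGION = "us-central1"
REPOSITORY_ID = "my-dataform-repo"

with DAG(
    dag_id="dataform_daily_tag",
    schedule_interval="@daily",
    start_date=days_ago(1),
) as dag:
    # Compile the repository (here from the main branch; a release config could be used instead).
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={"git_commitish": "main"},
    )

    # Invoke only the actions tagged "daily"; the compilation result name comes from XCom.
    invoke_daily_actions = DataformCreateWorkflowInvocationOperator(
        task_id="invoke_daily_actions",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": {"included_tags": ["daily"]},
        },
    )

    create_compilation_result >> invoke_daily_actions

Separate DAGs (or tasks) using included_tags of ["weekly"] or ["monthly"] on their own schedules would give you the per-cadence view you described.
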
Workflow Structure:

 
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    # ... your other default DAG arguments
}

with DAG(
    dag_id='dataform_workflow',
    default_args=default_args,
    schedule_interval='@daily',  # Adjust to your needs
    start_date=days_ago(1),
) as dag:
    check_elt_completion = BashOperator(
        task_id='check_elt_completion',
        bash_command='<insert your logic to verify ETL completion>'  # Placeholder
    )
    create_compilation_result = BashOperator(
        task_id='create_compilation_result',
        bash_command='<Dataform API or CLI call to create a compilation result>'  # Placeholder
    )
    trigger_workflow = BashOperator(
        task_id='trigger_workflow',
        bash_command='<Dataform API or CLI call to invoke the compiled actions for your tag>'  # Placeholder
    )

    check_elt_completion >> create_compilation_result >> trigger_workflow

Important: Replace the bash_command placeholders with your actual Dataform CLI calls or API interactions. This code serves as a template—customize it to fit your specific needs.
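
If you would rather stay in Python than shell out, the google-cloud-dataform client library can make the same two API calls from a PythonOperator. This is only a rough sketch under the assumption that the google-cloud-dataform package is installed; the project, region, repository ID, and tag value are placeholders.

from google.cloud import dataform_v1beta1


def run_dataform_tag(tag: str) -> str:
    """Compile the Dataform repository and invoke only the actions carrying `tag`."""
    client = dataform_v1beta1.DataformClient()
    # Placeholder parent path -- substitute your project, region, and repository ID.
    parent = "projects/my-gcp-project/locations/us-central1/repositories/my-dataform-repo"

    # Step 1: create a compilation result from the main branch.
    compilation = client.create_compilation_result(
        request={
            "parent": parent,
            "compilation_result": {"git_commitish": "main"},
        }
    )

    # Step 2: invoke the compiled actions that carry the requested tag.
    invocation = client.create_workflow_invocation(
        request={
            "parent": parent,
            "workflow_invocation": {
                "compilation_result": compilation.name,
                "invocation_config": {"included_tags": [tag]},
            },
        }
    )
    return invocation.name

A callable like this can replace the two BashOperator tasks above, for example with PythonOperator(task_id='run_daily_tag', python_callable=run_dataform_tag, op_args=['daily']).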

Repository Structure

It's generally recommended to keep your Airflow and Dataform code in separate repositories. This promotes:

  • Separation of Concerns: Different tools, different codebases.
  • Version Control Clarity: Independent updates and history for each.
  • Maintainability: Easier to manage focused codebases.

Keeping separate repositories helps avoid complications that can arise from merging different technologies and development practices into a single repository. However, if integration simplifies your workflow, ensure clear documentation and organization within the repository to manage dependencies effectively.
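
If you do go the single-repository route, one layout that tends to keep things tidy (purely an illustration, not a requirement) is to give the Dataform project its own top-level directory next to the DAGs folder:

airflow-repo/
├── dags/            # Airflow DAG definitions
├── dataform/        # Dataform project (dataform.json, definitions/, includes/)
├── plugins/
└── requirements.txt

If the Dataform project ever ends up under the scheduler's DAGs folder, list its path in a .airflowignore file so Airflow does not try to parse it as DAG code.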
