I am currently managing a Dataform project with numerous transformations. They are orchestrated through a combination of workflow configurations and release configurations, scheduled to run daily, weekly, or monthly depending on the data source and the tag (daily, weekly, or monthly) I have placed on each source.
In transitioning to Airflow, I aim to trigger the transformations upon completion of the Extraction and Loading processes. To achieve this, I intend for Airflow to execute the transformations with a designated tag, facilitating a unified view of daily, weekly, and monthly DAG runs. How can I implement this transition effectively within Airflow?
Also, can I create a Dataform repository inside my current Airflow repository so that all the ELT code lives in the same GitHub repository? Will that work?
@ms4446
Hi @francisatoyebi ,
Here's how you might implement this effectively using Airflow:
Airflow Operators:
Dedicated Dataform operators such as DataformCreateCompilationResultOperator and DataformWorkflowInvocationOperator would be ideal if they are available in your Airflow environment. If they are not, BashOperator or PythonOperator are flexible substitutes that let you execute shell commands or Python functions, respectively. These can be used to run Dataform CLI commands or interact with the Dataform API.
Workflow Structure:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    # ... your other default DAG arguments
}

with DAG(
    dag_id='dataform_workflow',
    default_args=default_args,
    schedule_interval='@daily',  # Adjust to your needs
    start_date=days_ago(1),
) as dag:

    # Verify that the upstream extraction-and-load step has finished
    check_elt_completion = BashOperator(
        task_id='check_elt_completion',
        bash_command='<insert your logic to verify ELT completion>',  # Placeholder
    )

    # Compile the Dataform project
    create_compilation_result = BashOperator(
        task_id='create_compilation_result',
        bash_command='gcloud dataform ...',  # Replace with your command
    )

    # Invoke the compiled workflow (e.g. restricted to a tag)
    trigger_workflow = BashOperator(
        task_id='trigger_workflow',
        bash_command='gcloud dataform ...',  # Replace with your command
    )

    check_elt_completion >> create_compilation_result >> trigger_workflow
Important: Replace the bash_command placeholders with your actual Dataform CLI calls or API interactions. This code serves as a template; customize it to fit your specific needs.
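If the Dataform operators from the Google Cloud provider package (apache-airflow-providers-google) are installed in your environment, the same flow could instead be expressed with them. The following is a minimal sketch under that assumption: PROJECT_ID, REGION, REPOSITORY_ID, and GIT_COMMITISH are placeholders you would supply, the daily/weekly/monthly tags are assumed to match the tags already defined in your Dataform project, and the upstream ELT-completion check from the template above would still sit in front of the compilation task.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

# Placeholders -- replace with your own values
PROJECT_ID = 'your-gcp-project'
REGION = 'your-dataform-region'
REPOSITORY_ID = 'your-dataform-repository'
GIT_COMMITISH = 'main'

# One DAG per cadence so daily, weekly, and monthly runs stay visible side by side
for tag, schedule in [('daily', '@daily'), ('weekly', '@weekly'), ('monthly', '@monthly')]:
    with DAG(
        dag_id=f'dataform_{tag}',
        schedule_interval=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        # Compile the Dataform repository at the chosen git commitish
        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id='create_compilation_result',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={'git_commitish': GIT_COMMITISH},
        )

        # Invoke only the actions carrying this DAG's tag (plus their dependencies)
        trigger_workflow = DataformCreateWorkflowInvocationOperator(
            task_id='trigger_workflow',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                'compilation_result': (
                    "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
                ),
                'invocation_config': {
                    'included_tags': [tag],
                    'transitive_dependencies_included': True,
                },
            },
        )

        create_compilation_result >> trigger_workflow

    globals()[dag.dag_id] = dag  # Register each generated DAG with Airflow

Generating the three DAGs from one loop keeps a single definition of the flow while giving you separate daily, weekly, and monthly runs in the Airflow UI, which matches the unified view you described.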
Repository Structure:
It's generally recommended to keep your Airflow and Dataform code in separate repositories. Keeping them separate avoids the complications that come from mixing different technologies, dependencies, and development practices in a single repository. However, if a single repository genuinely simplifies your workflow, it can work: keep the Dataform project and the Airflow DAGs in clearly separated directories, and document how each is deployed so their dependencies stay easy to manage.