Title explains a lot.
I am using Dataform and Composer. Both instances are very simple; I just used the quickstart guides to get them up and running.
I have 2-3 workflows in Dataform, each with its own unique tag. The workflows deal with data and tables in my BigQuery instance.
I am trying to create a Composer DAG that triggers the execution of a workflow, just as if I were doing it manually from the UI. However, I can't get it to work, and the documentation looks very limited.
Can somebody share an example of how to do it?
Thanks in advance!
To trigger a Dataform workflow from Google Cloud Composer, you use Composer's Apache Airflow environment to programmatically execute a Dataform job. This typically involves using Airflow's HTTP operator to make a call to the Dataform API, which in turn triggers the workflow.
Here's a step-by-step guide on how to set this up:
Ensure that your Google Cloud Composer environment is up and running. You should have Apache Airflow installed as part of this environment.
To trigger a Dataform workflow, you need to authenticate with the Dataform API. Since Dataform is a Google Cloud API, this means obtaining an OAuth 2.0 access token (for example, for a service account) rather than a static API key. Refer to Dataform's documentation for details.
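As a minimal sketch, assuming your Composer environment can use Application Default Credentials (the google-auth library ships with Composer images), fetching such a token could look like this:

import google.auth
import google.auth.transport.requests

# Fetch Application Default Credentials with a scope that covers the Dataform API.
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform']
)

# Refresh the credentials to populate a short-lived access token.
credentials.refresh(google.auth.transport.requests.Request())
access_token = credentials.token  # send as 'Authorization: Bearer <token>'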
You will create a Directed Acyclic Graph (DAG) in Airflow to define the workflow. This DAG will include a task to trigger the Dataform job.
Here's a basic example of what the DAG might look like in Python:
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

# Default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'trigger_dataform_workflow',
    default_args=default_args,
    description='Trigger a Dataform workflow',
    schedule_interval=timedelta(days=1),
)

# Task to trigger the Dataform workflow over HTTP
trigger_dataform = SimpleHttpOperator(
    task_id='trigger_dataform',
    http_conn_id='dataform_api_connection',  # Replace with your connection ID
    endpoint='your/dataform/api/endpoint',  # Replace with your Dataform API endpoint
    method='POST',
    headers={
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_API_KEY',  # For Google APIs, an OAuth access token
    },
    data=json.dumps({"tag": "your_workflow_tag"}),  # Replace with your workflow tag
    dag=dag,
)
In the Airflow UI, set up a new HTTP connection (dataform_api_connection in the example) with the details of your Dataform API endpoint. This includes the API base URL (for the Dataform API, the host is https://dataform.googleapis.com) and any authentication headers required.
Deploy this DAG to your Airflow environment and test it to ensure it triggers the Dataform workflow as expected.
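Alternatively, recent versions of the Google provider package for Airflow ship dedicated Dataform operators, which let you skip hand-rolled HTTP calls. Here is a minimal sketch, assuming your Composer environment's apache-airflow-providers-google package is new enough to include them; the project, region, repository, and tag values are placeholders to replace with your own:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

with DAG(
    'trigger_dataform_with_operators',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Compile the repository at a given git commitish (e.g. 'main').
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id='create_compilation_result',
        project_id='your-project-id',
        region='us-central1',
        repository_id='your-dataform-repository',
        compilation_result={'git_commitish': 'main'},
    )

    # Invoke the compiled workflow, restricted to one of your tags.
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id='your-project-id',
        region='us-central1',
        repository_id='your-dataform-repository',
        workflow_invocation={
            'compilation_result': (
                "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
            ),
            'invocation_config': {'included_tags': ['your_workflow_tag']},
        },
    )

    create_compilation_result >> create_workflow_invocation

These operators authenticate with the environment's own service account through the standard Google connection, so there is no bearer-token header to manage.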
To authenticate and trigger a Dataform workflow from Google Cloud Composer, you typically need to set up an HTTP connection in Composer that includes the necessary authentication details to interact with Dataform's API. Here are the steps you can follow:
1. Obtain Dataform API credentials: acquire a credential authorized to call the Dataform API, such as an access token for the service account your Composer environment runs as.
2. Set up the HTTP connection in Composer: create an HTTP connection in the Airflow UI and put the authentication header in its extra field (e.g. {"Authorization": "Bearer YOUR_API_KEY"}).
3. Test the connection: confirm that requests through the connection actually reach the Dataform API before wiring it into a DAG.
4. Integrate it in the DAG: use SimpleHttpOperator or a similar operator in Airflow, where you specify the HTTP connection ID and the necessary parameters for the API request; see the sketch after this list for what those requests look like.
5. Execution context: the requests run from inside your Composer environment, so its service account and network must be allowed to reach the Dataform API.
6. Documentation and support: refer to the Dataform and Airflow documentation for the exact endpoints and operator options.
7. Troubleshooting: if the trigger fails, check the task logs in the Airflow UI for the HTTP status code and the response body.
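For reference, triggering a run through the raw REST API is a two-step call: create a compilation result, then create a workflow invocation that references it. Here is a minimal sketch of those calls (v1beta1 paths; the project, region, repository, and tag values are placeholders):

import google.auth
import google.auth.transport.requests
import requests

# Fetch an access token via Application Default Credentials.
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform']
)
credentials.refresh(google.auth.transport.requests.Request())
headers = {'Authorization': f'Bearer {credentials.token}'}

base = 'https://dataform.googleapis.com/v1beta1'
parent = 'projects/your-project-id/locations/us-central1/repositories/your-repo'

# Step 1: compile the repository at a git commitish.
compilation = requests.post(
    f'{base}/{parent}/compilationResults',
    headers=headers,
    json={'gitCommitish': 'main'},
).json()

# Step 2: invoke the compiled workflow, filtered to one tag.
invocation = requests.post(
    f'{base}/{parent}/workflowInvocations',
    headers=headers,
    json={
        'compilationResult': compilation['name'],
        'invocationConfig': {'includedTags': ['your_workflow_tag']},
    },
).json()
print(invocation['name'])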