
How to trigger a single Dataform Workflow from Composer

Title explains a lot.

I am using Dataform and Composer. Both instances are very simple and I just used the quickstart guides to get them up and running.

I have 2-3 workflows on Dataform. I added unique tags for all of them. The workflows deal with data and tables on my BigQuery instance.

I am trying to create a Composer DAG that will trigger the execution of a workflow, similar to running it manually from the UI. However, I can't get it to work, and the documentation looks very limited.

Can somebody share an example of how to do it?

Thanks in advance!

2 ACCEPTED SOLUTIONS

To trigger a Dataform workflow from Google Cloud Composer, you use Composer's Apache Airflow environment to programmatically execute a Dataform job. This typically involves using Airflow's HTTP operator to call the Dataform API, which in turn triggers the workflow.

Here's a step-by-step guide on how to set this up:

1. Set Up Google Cloud Composer

Ensure that your Google Cloud Composer environment is up and running. Apache Airflow comes installed as part of this environment.

2. Obtain Dataform API Credentials

To trigger a Dataform workflow, you need to authenticate with the Dataform API. This usually involves obtaining an API key or setting up OAuth credentials. Refer to Dataform's documentation to get these credentials.
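
If the Composer environment's service account already has the right Dataform permissions, one common way to produce the Bearer token is Application Default Credentials. This is only a minimal sketch, assuming the google-auth library (which Composer ships) and the cloud-platform scope:

 
# Sketch: obtain a short-lived OAuth access token via Application Default
# Credentials (e.g. the Composer service account) for use as the Bearer token.
import google.auth
from google.auth.transport.requests import Request

credentials, project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
credentials.refresh(Request())    # fetches/refreshes the access token
access_token = credentials.token  # send as "Authorization: Bearer <token>"
 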

3. Create an Airflow DAG

You will create a Directed Acyclic Graph (DAG) in Airflow to define the workflow. This DAG will include a task to trigger the Dataform job.

Here's a basic example of what the DAG might look like in Python:

 
import json
from datetime import datetime, timedelta

from airflow import DAG
# On Airflow 1.x this operator lives in airflow.operators.http_operator instead.
from airflow.providers.http.operators.http import SimpleHttpOperator

# Default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG('trigger_dataform_workflow',
          default_args=default_args,
          description='Trigger a Dataform workflow',
          schedule_interval=timedelta(days=1))

# Task to trigger Dataform workflow
trigger_dataform = SimpleHttpOperator(
    task_id='trigger_dataform',
    http_conn_id='dataform_api_connection',  # Replace with your connection ID
    endpoint='your/dataform/api/endpoint',  # Replace with your Dataform API endpoint
    method='POST',
    headers={"Content-Type": "application/json", "Authorization": "Bearer YOUR_API_KEY"},
    data=json.dumps({"tag": "your_workflow_tag"}),  # Replace with your workflow tag
    dag=dag,
)
 

4. Set Up Airflow Connections

In the Airflow UI, set up a new HTTP connection (dataform_api_connection in the example) with the details of your Dataform API endpoint. This includes the API URL and any authentication headers required.
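
For reference, here is a rough sketch of how that connection and the operator's endpoint could line up against the Dataform REST API (the workflowInvocations.create method). It assumes the connection's Host is https://dataform.googleapis.com; the project, location, repository and compilation-result IDs are placeholders, and an invocation needs a compilation result created beforehand:

 
# Sketch: SimpleHttpOperator call against the Dataform REST API.
# The "dataform_api_connection" connection is assumed to have
# Host = https://dataform.googleapis.com; all IDs below are placeholders.
import json
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG("dataform_invocation_example", start_date=datetime(2024, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    invoke_workflow = SimpleHttpOperator(
        task_id="invoke_workflow",
        http_conn_id="dataform_api_connection",
        method="POST",
        # workflowInvocations.create endpoint
        endpoint="v1beta1/projects/YOUR_PROJECT/locations/YOUR_LOCATION/"
                 "repositories/YOUR_REPOSITORY/workflowInvocations",
        headers={"Content-Type": "application/json",
                 "Authorization": "Bearer YOUR_ACCESS_TOKEN"},
        data=json.dumps({
            # A compilation result created beforehand (placeholder name).
            "compilationResult": "projects/YOUR_PROJECT/locations/YOUR_LOCATION/"
                                 "repositories/YOUR_REPOSITORY/"
                                 "compilationResults/YOUR_RESULT_ID",
            # Run only the actions carrying your tag.
            "invocationConfig": {"includedTags": ["your_workflow_tag"]},
        }),
    )
 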

5. Deploy and Test the DAG

Deploy this DAG to your Airflow environment and test it to ensure it triggers the Dataform workflow as expected.
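
As a side note, recent apache-airflow-providers-google versions bundled with Composer also include dedicated Dataform operators, which avoid raw HTTP calls. This is only a hedged sketch of that alternative, not the approach described above; the project, region, repository, branch and tag values are placeholders:

 
# Sketch using the Google provider's built-in Dataform operators instead of
# raw HTTP calls (requires a recent apache-airflow-providers-google package).
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

with DAG("trigger_dataform_workflow_native", start_date=datetime(2024, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    # Compile the repository (here the 'main' branch) into a compilation result.
    compile_repo = DataformCreateCompilationResultOperator(
        task_id="compile_repo",
        project_id="YOUR_PROJECT",
        region="YOUR_REGION",
        repository_id="YOUR_REPOSITORY",
        compilation_result={"git_commitish": "main"},
    )

    # Invoke only the actions carrying your tag, using the result compiled above.
    invoke_workflow = DataformCreateWorkflowInvocationOperator(
        task_id="invoke_workflow",
        project_id="YOUR_PROJECT",
        region="YOUR_REGION",
        repository_id="YOUR_REPOSITORY",
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('compile_repo')['name'] }}",
            "invocation_config": {"included_tags": ["your_workflow_tag"]},
        },
    )

    compile_repo >> invoke_workflow
 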


To authenticate and trigger a Dataform workflow from Google Cloud Composer, you typically need to set up an HTTP connection in Composer that includes the necessary authentication details to interact with Dataform's API. Here are the steps you can follow:

  1. Obtain Dataform API Credentials:

    • You need to have the appropriate credentials to authenticate with Dataform's API. This could be an API key or OAuth tokens, depending on Dataform's authentication mechanism.
    • The guide you followed for authentication should provide you with these credentials. Make sure you have followed all the steps correctly.
  2. Setting Up HTTP Connection in Composer:

    • In your Composer (Airflow) environment, you need to set up an HTTP connection that includes the Dataform API's base URL and the necessary authentication headers.
    • The connection setup typically involves specifying the API endpoint as the 'Host' and adding the authentication token or API key in the 'Extras' field in JSON format (e.g., {"Authorization": "Bearer YOUR_API_KEY"}); see the sketch after this list.
  3. Testing the Connection:

    • Before integrating this connection into your DAG, you can test it using tools like Postman.
    • In Postman, set up a request to the Dataform API endpoint and include the authentication headers. If you face issues, double-check the API endpoint and the credentials.
  4. Integration in DAG:

    • Once the connection is set up and tested, you can use it in your DAG to trigger Dataform workflows. This is usually done using the SimpleHttpOperator or a similar operator in Airflow, where you specify the HTTP connection ID and the necessary parameters for the API request.
  5. Execution Context:

    • You don't need to execute the authentication steps on the Composer machine. The key is to ensure that the HTTP connection in Composer has the correct configuration to authenticate with Dataform's API.
  6. Documentation and Support:

    • If the documentation you're following is unclear or you're encountering specific issues, consider reaching out to Dataform's support or community forums for more targeted assistance.
  7. Troubleshooting:

    • If you encounter errors, check the logs in your Composer environment for clues. Common issues include incorrect API endpoints, invalid credentials, or network-related problems.
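
The sketch referenced in item 2: registering that HTTP connection in code rather than through the UI. This assumes Airflow 2.x; the connection id, host and token value are placeholders, and the same settings can be entered in the Airflow UI instead:

 
# Sketch: create the Airflow HTTP connection programmatically (Airflow 2.x).
# Values are placeholders; run inside the Airflow/Composer environment.
import json

from airflow.models import Connection
from airflow.settings import Session

conn = Connection(
    conn_id="dataform_api_connection",
    conn_type="http",
    host="https://dataform.googleapis.com",  # Dataform API base URL
    extra=json.dumps({"Authorization": "Bearer YOUR_API_KEY"}),  # auth header
)

session = Session()
session.add(conn)   # writes the connection to the Airflow metadata database
session.commit()
 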

