
How to check if external source data is ready when scheduling a Dataform workflow?

Does GCP Dataform provide a mechanism to detect the completion of external upstream source data tasks? For instance, in Airflow I can use sensors to check that a source table is ready before executing a DAG. Is there a similar capability in Dataform to verify that the source data is ready? For example, I have some Airflow DAGs that load source data from MySQL into BigQuery on a scheduled cadence. I can only run my Dataform workflows once that source data is ready in BigQuery, and I do not want my Dataform workflows to run while the source data has not yet been fully loaded into BigQuery.

Solved
1 ACCEPTED SOLUTION

As of now, Dataform does not natively provide a mechanism similar to Airflow's sensors to detect the completion of external upstream source data tasks. However, you can use Airflow's own capabilities to achieve this.

In Airflow, you can use the BigQueryTableExistenceSensor to check that a source table is ready in BigQuery before your Dataform workflow is triggered. Here's an example:

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor

with DAG('dataform_source_check', start_date=datetime(2023, 8, 14), schedule_interval='@hourly') as dag:
    # Wait until the source table exists in BigQuery before anything else runs
    wait_for_source = BigQueryTableExistenceSensor(
        task_id='wait_for_source_table',
        project_id='YOUR_PROJECT_ID',
        dataset_id='YOUR_DATASET_NAME',
        table_id='my_source_table',
    )

With this approach, your Dataform workflow will only run once the source data is ready in BigQuery, ensuring that it doesn't execute while the source data is still being loaded into BigQuery.
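If you also want the same DAG to start the Dataform run once the sensor succeeds, recent versions of the Google provider package include Dataform operators. The sketch below is only an illustrative addition under assumptions, not part of the original answer: the region us-central1, the repository ID my_repository, and the git_commitish main are placeholders you would replace with your own setup.

from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

# Inside the same `with DAG(...)` block as the sensor above:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id='create_compilation_result',
    project_id='YOUR_PROJECT_ID',
    region='us-central1',              # placeholder region
    repository_id='my_repository',     # placeholder Dataform repository ID
    compilation_result={'git_commitish': 'main'},
)

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id='YOUR_PROJECT_ID',
    region='us-central1',
    repository_id='my_repository',
    workflow_invocation={
        # Reference the compilation result created by the previous task via XCom
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
    },
)

# The Dataform run starts only after the sensor confirms the source table exists
wait_for_source >> create_compilation_result >> create_workflow_invocation

Because the sensor blocks until the table exists (or the task times out), the compilation and invocation tasks never run against a missing source.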


3 REPLIES


Thanks @ms4446 for answering my question! So if I want to make sure that a certain date partition is present in "my_source_table", should I pass an additional parameter to the sensor?

 

To check if a specific date partition is in the BigQuery table `my_source_table`, you can use the `BigQueryTablePartitionExistenceSensor` sensor in Airflow. The following code shows how to use this sensor:

from airflow.providers.google.cloud.sensors.bigquery import BigQueryTablePartitionExistenceSensor
from airflow import DAG
from datetime import datetime

default_args = {
    'start_date': datetime(2023, 8, 14),
}

with DAG('check_partition_dag', default_args=default_args, schedule_interval='@hourly') as dag:

    check_partition = BigQueryTablePartitionExistenceSensor(
        task_id="check_table_partition_exists",
        project_id="YOUR_PROJECT_ID",
        dataset_id="YOUR_DATASET_NAME",
        table_id="my_source_table",
        # Partition IDs for a daily-partitioned table use the YYYYMMDD decorator format
        partition_id="20230814",
    )

    # Your Dataform workflow tasks would follow here

This DAG runs on an hourly schedule and checks whether the partition for the date 2023-08-14 (partition ID 20230814) exists in the table `my_source_table`. Once the sensor succeeds, subsequent tasks in the DAG (such as your Dataform workflow invocation) execute.
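One refinement worth considering (my addition, not part of the answer above): rather than hardcoding a date, you can template partition_id so each DAG run checks the partition matching its own logical date. This sketch assumes a daily-partitioned table and that partition_id is one of the sensor's templated fields, which it is in recent versions of the Google provider package.

check_partition = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id="YOUR_PROJECT_ID",
    dataset_id="YOUR_DATASET_NAME",
    table_id="my_source_table",
    # {{ ds_nodash }} renders the run's logical date as YYYYMMDD, matching the
    # partition decorator of a daily-partitioned table
    partition_id="{{ ds_nodash }}",
)

With this in place, each hourly run simply re-checks the current day's partition until the load job has written it, and the Dataform tasks downstream of the sensor run only after that.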