I am new to Dataform and I have to implement a working Dataform workflow that I trigger from Airflow.
I have tried to implement many of the approaches described in the official Google documentation, but I ran into several issues:
1. Inline JS is not possible.
2. Triggering Dataform DAGs with specific tags and with vars.
3. Cannot import the DataformWorkflowInvocationStateSensor operator in my DAG code.
4. When I trigger the Airflow DAG for Dataform, it should ideally run the entire compiled workspace. My code is inside a folder in my workspace, and it is not getting triggered at all from Airflow.
There are many such issues.
Please guide me to a reliable tech forum/article that I can use for implementing these scenarios.
Also, would it be better if I used Dataform core instead of Dataform for the same?
I understand your concerns with integrating Dataform and Airflow. Based on your requirements, you can certainly check in here or use Stack Overflow.
Regarding Dataform core vs. Dataform: Dataform core is the open-source SQLX/JavaScript framework itself, which you compile and run yourself, so it suits teams comfortable working directly with code and their own orchestration. Dataform on Google Cloud wraps Dataform core in a managed service with a web UI and built-in scheduling via release and workflow configurations. Your choice should align with your specific needs and familiarity with the tools.
@ms4446 All the above-mentioned issues are resolved now.
I am facing an issue with another scenario: I need to store a value that comes from a BigQuery SQL query in a SQLX file, assign that value to a JS variable, and then export it.
What can I do to achieve this result?
@AkshayBhandare5, glad to hear the previous issues have been resolved.
As for the latest issue you are facing: in Dataform, you cannot directly assign the result of a SQL query to a JavaScript variable within a SQLX file, because the JavaScript is evaluated when the project is compiled, while the SQL only runs later in BigQuery. However, you can structure your Dataform project in a way that allows you to use the results of SQL queries in downstream transformations. Here's a more accurate approach:
Define your SQLX File: Start by defining your SQLX file, which will contain the SQL code.
Run the BigQuery SQL: Use the SQLX syntax to run your BigQuery SQL and store the result in a table or view.
config { type: "view" }

SELECT my_value
FROM `my_dataset.my_table`
WHERE my_column = 'my_value'
Reference the Result in Downstream Transformations: Once you've created a view or table with the SQL result, you can use it in other SQLX files or JS scripts in Dataform by referencing it.
// In a definitions/*.js file, capture the name of the dataset created above
const tableName = "your_sqlx_table_or_view_created_above";
Export the Table or View: Use the publish function to export the table or view.
publish("your_output_table_or_view")
  .query(ctx => `
    SELECT *
    FROM ${ctx.ref(tableName)}
  `);
The assign() function and the method you described for it do not exist in Dataform. Instead, you should think in terms of creating datasets with SQL and then referencing those datasets in subsequent transformations or logic.
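Putting the pieces together, here is a minimal consolidated sketch of how the two files could fit (the file names and the my_dataset.my_table source are placeholders, not required names):

definitions/my_value_view.sqlx

config { type: "view" }

SELECT my_value
FROM `my_dataset.my_table`
WHERE my_column = 'my_value'

definitions/export_value.js

// Reference the view declared above by its file name and publish a
// downstream table built from it.
const tableName = "my_value_view";

publish("your_output_table_or_view")
  .type("table")
  .query(ctx => `
    SELECT *
    FROM ${ctx.ref(tableName)}
  `);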
Hello, here's the basic structure for invoking Dataform from Airflow:
from datetime import datetime, timedelta
from airflow import DAG
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "YOUR_PROJECT_ID"
REPOSITORY_ID = "YOUR_DATAFORM_REPOSITORY"
REGION = "europe-west1"
GIT_COMMITISH = "YOUR_GIT_BRANCH"
with DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
        gcp_conn_id="YOUR_PROJECT_CONN_default",
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
        gcp_conn_id="YOUR_PROJECT_CONN_default",
    )
    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
        ),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
        gcp_conn_id="YOUR_PROJECT_CONN_default",
    )
    create_compilation_result >> create_workflow_invocation >> is_workflow_invocation_done
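If you also need to pass compilation variables (the vars mentioned earlier in this thread), the compilation_result should, as far as I know, also accept a code_compilation_config; here is a sketch, assuming your SQLX reads the value via dataform.projectConfig.vars.my_var (my_var and my_value are hypothetical names):

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result_with_vars",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        # Hypothetical variable; read it in SQLX as dataform.projectConfig.vars.my_var
        "code_compilation_config": {"vars": {"my_var": "my_value"}},
    },
    gcp_conn_id="YOUR_PROJECT_CONN_default",
)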
Hoping this snippet helps you!
Best,
Ricardo
What does the DAG_ID stand for? Is it the same as a tag defined in the config file? Currently I have a Dataform project with multiple transformations, which I run using workflow configurations and release configurations for orchestration, scheduled for daily, weekly and monthly runs depending on the data source. Now I am migrating to Airflow so that the transformations are triggered once the extraction and loading are done. I want Airflow to run the transformations with a particular tag, so I can keep the same daily, weekly and monthly view of the DAG runs. How will I do that?
Hello @francisatoyebi.
The `dag_id` is just a unique identifier for your Airflow DAG; it is not related to the tags you define in your Dataform config blocks.
https://docs.astronomer.io/learn/dags
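As for running only the transformations that carry a particular tag: the workflow_invocation in the example above can, to my knowledge, carry an invocation_config that filters by tag. Here is a sketch, assuming your SQLX config blocks use a tag named "daily" (the tag name is a placeholder):

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation_daily",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        # Run only the actions tagged "daily" in their SQLX config blocks.
        "invocation_config": {"included_tags": ["daily"]},
    },
    gcp_conn_id="YOUR_PROJECT_CONN_default",
)

You could then define one DAG (or task group) per tag for your daily, weekly and monthly runs, each triggered once the corresponding extract-and-load finishes.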
Best,