
Error while triggering Dataform via Airflow (Composer environment)

Please see my code for triggering Dataform. It is part of an Airflow DAG running in a Composer environment. I trigger the Dataform pipeline every hour, and it works for 10-11 runs before failing. Below is the part of my DAG that triggers Dataform, followed by the error it then raises.

    # Required imports:
    from airflow.utils.task_group import TaskGroup
    from airflow.providers.google.cloud.operators.dataform import (
        DataformCreateCompilationResultOperator,
        DataformCreateWorkflowInvocationOperator,
    )

    with TaskGroup("run_dataform") as run_dataform:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=pipeline_settings["dataform"]["dataform_gcp_project"],
            region=pipeline_settings["dataform"]["dataform_region"],
            repository_id=pipeline_settings["dataform"]["dataform_repository"],
            compilation_result={
                "git_commitish": pipeline_settings["dataform"]["dataform_branch"],
                "code_compilation_config": {
                    "default_database": pipeline_settings["dataform"]["dataform_gcp_project"],
                    "default_schema": pipeline_settings["datasets"]["common_dataset"],
                },
            },
        )

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id="create_workflow_invocation",
            max_active_tis_per_dag=1,
            project_id=pipeline_settings["dataform"]["dataform_gcp_project"],
            region=pipeline_settings["dataform"]["dataform_region"],
            repository_id=pipeline_settings["dataform"]["dataform_repository"],
            workflow_invocation={
                # Rendered at run time to the compilation result's full resource
                # name, pulled from the XCom pushed by create_compilation_result.
                "compilation_result": "{{ task_instance.xcom_pull('run_dataform.create_compilation_result')['name'] }}",
            },
        )

        create_compilation_result >> create_workflow_invocation

And this is the error message I receive once it starts failing.

[2023-11-07, 10:07:06 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-11-07, 10:07:06 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-11-07, 10:07:06 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)
TypeError: bad argument type for built-in operation

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataform.py", line 248, in execute
    result = hook.create_workflow_invocation(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataform.py", line 196, in create_workflow_invocation
    return client.create_workflow_invocation(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataform_v1beta1/services/dataform/client.py", line 3558, in create_workflow_invocation
    request = dataform.CreateWorkflowInvocationRequest(request)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 570, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 228, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
    return self._wrapper(value)._pb
  File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 604, in __init__
    super().__setattr__("_pb", self._meta.pb(**params))
TypeError: bad argument type for built-in operation
[2023-11-07, 10:07:06 UTC] {taskinstance.py:1328} INFO - Marking task as FAILED. dag_id=shield_data_loading_and_analytics, task_id=run_dataform.create_workflow_invocation, execution_date=20231107T090000, start_date=20231107T100705, end_date=20231107T100706
[2023-11-07, 10:07:06 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 16489 for task run_dataform.create_workflow_invocation (bad argument type for built-in operation; 2304763)
[2023-11-07, 10:07:07 UTC] {local_task_job.py:212} INFO - Task exited with return code 1
[2023-11-07, 10:07:07 UTC] {taskinstance.py:2599} INFO - 0 downstream tasks scheduled from follow-on schedule check
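If I read the traceback correctly, the failure happens while the client is building the CreateWorkflowInvocationRequest, so I suspect the templated compilation_result value is not the plain string it should be on those runs. For reference, this is what I understand the template should render to (placeholders below, not my real values):

    # Placeholders only - not my real project/region/repository values.
    # My understanding is that the Jinja template in workflow_invocation should
    # render to the compilation result's full resource name as a plain string:
    expected_compilation_result = (
        "projects/<dataform_gcp_project>/locations/<dataform_region>/"
        "repositories/<dataform_repository>/compilationResults/<generated_id>"
    )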

Can anyone please help?
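In the meantime, this is a minimal debug task I am considering adding alongside the two tasks above, just to log the raw XCom value on the failing runs (only the task group and task IDs match my snippet; the rest is a sketch):

    from airflow.operators.python import PythonOperator

    def _log_compilation_result_xcom(ti, **_):
        # Pull the same XCom that create_workflow_invocation templates against
        # and log its type and value, so failing runs can be compared with good ones.
        value = ti.xcom_pull(task_ids="run_dataform.create_compilation_result")
        print(type(value), value)

    log_compilation_result_xcom = PythonOperator(
        task_id="log_compilation_result_xcom",
        python_callable=_log_compilation_result_xcom,
    )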

 
