Please see my code for triggering Dataform. It is part of an Airflow DAG, and I am using a Composer environment. I trigger the Dataform pipeline every hour; it works for 10-11 runs but then throws an error. Below is the code in my DAG that triggers Dataform, followed by the error that occurs.
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.utils.task_group import TaskGroup

with TaskGroup("run_dataform") as run_dataform:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=pipeline_settings["dataform"]["dataform_gcp_project"],
        region=pipeline_settings["dataform"]["dataform_region"],
        repository_id=pipeline_settings["dataform"]["dataform_repository"],
        compilation_result={
            "git_commitish": pipeline_settings["dataform"]["dataform_branch"],
            "code_compilation_config": {
                "default_database": pipeline_settings["dataform"]["dataform_gcp_project"],
                "default_schema": pipeline_settings["datasets"]["common_dataset"],
            },
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        max_active_tis_per_dag=1,
        project_id=pipeline_settings["dataform"]["dataform_gcp_project"],
        region=pipeline_settings["dataform"]["dataform_region"],
        repository_id=pipeline_settings["dataform"]["dataform_repository"],
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('run_dataform.create_compilation_result')['name'] }}",
        },
    )
And below is the error message I receive after 9-10 runs:
[2023-11-07, 10:07:06 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-11-07, 10:07:06 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-11-07, 10:07:06 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
return self._descriptor(**value)
TypeError: bad argument type for built-in operation
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataform.py", line 248, in execute
result = hook.create_workflow_invocation(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
return func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataform.py", line 196, in create_workflow_invocation
return client.create_workflow_invocation(
File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataform_v1beta1/services/dataform/client.py", line 3558, in create_workflow_invocation
request = dataform.CreateWorkflowInvocationRequest(request)
File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 570, in __init__
pb_value = marshal.to_proto(pb_type, value)
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 228, in to_proto
pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
return self._wrapper(value)._pb
File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 604, in __init__
super().__setattr__("_pb", self._meta.pb(**params))
TypeError: bad argument type for built-in operation
[2023-11-07, 10:07:06 UTC] {taskinstance.py:1328} INFO - Marking task as FAILED. dag_id=shield_data_loading_and_analytics, task_id=run_dataform.create_workflow_invocation, execution_date=20231107T090000, start_date=20231107T100705, end_date=20231107T100706
[2023-11-07, 10:07:06 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 16489 for task run_dataform.create_workflow_invocation (bad argument type for built-in operation; 2304763)
[2023-11-07, 10:07:07 UTC] {local_task_job.py:212} INFO - Task exited with return code 1
[2023-11-07, 10:07:07 UTC] {taskinstance.py:2599} INFO - 0 downstream tasks scheduled from follow-on schedule check
Can anyone please help?
The error message "bad argument type for built-in operation" typically indicates a problem with the data being passed to the DataformCreateWorkflowInvocationOperator. That the error appears only after several successful runs suggests a potential issue with data consistency or system state that develops over time.

Enhanced Troubleshooting Steps:

- Logging and Debugging:
  - Enable Detailed Logging: Increase the verbosity of logging for both Airflow and Dataform tasks to capture more granular information about the error.
  - Analyze Log Messages: Review the logs for any specific error messages, codes, or warnings that could provide insights into the issue.
- Data Verification and XCom Evaluation: Verify that the data passed to the compilation_result and workflow_invocation parameters is correct and consistent with the expected schema (see the validation sketch after this list).
- Resource Management and Package Updates:
- Testing and Configuration Review: Check Airflow settings such as parallelism, dag_concurrency, and worker_concurrency for potential misconfigurations.
- Error Handling Enhancements:
- Incremental Changes and Backups:
- Additional Considerations:
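To make the XCom evaluation step concrete, here is a minimal sketch of a validation task that could sit between the two operators. The task name check_compilation_result and the wiring are illustrative assumptions, not part of the original DAG:

# Sketch (illustrative, Airflow 2.x): validate the compilation result
# pulled from XCom before it reaches DataformCreateWorkflowInvocationOperator.
from airflow.decorators import task


@task(task_id="check_compilation_result")
def check_compilation_result(compilation_result: dict) -> str:
    """Fail fast with a readable error if the XCom is malformed."""
    name = (compilation_result or {}).get("name")
    if not name or "/compilationResults/" not in name:
        raise ValueError(f"Unexpected compilation result XCom: {compilation_result!r}")
    return name


# Illustrative wiring inside the TaskGroup: pass the upstream task's
# return value in, then reference this task's XCom in workflow_invocation:
#   checked_name = check_compilation_result(create_compilation_result.output)

If the pulled dict ever arrives without a valid "name", the run then fails with a clear message instead of the opaque proto marshalling error.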
Hi, can you advise how to increase the verbosity of Airflow and Dataform tasks? A few questions about these points:

@ms4446 wrote: Enable Detailed Logging: Increase the verbosity of logging for both Airflow and Dataform tasks to capture more granular information about the error.

@ms4446 wrote: Analyze Log Messages: Review the logs for any specific error messages, codes, or warnings that could provide insights into the issue.

There are no warnings, and my Dataform workflow runs fine when I use "Compile Query" through the user interface.
Here are some additional troubleshooting steps for the "bad argument type for built-in operation" error in DataformCreateWorkflowInvocationOperator:

Increasing Log Verbosity:
- Airflow Logs: Set the logging_level parameter in the airflow.cfg file to DEBUG for the most detailed logs (see the example after this list).
- Dataform Logs: Check whether a DATAFORM_LOG_LEVEL environment variable is available through the official documentation or support channels; if so, set DATAFORM_LOG_LEVEL to DEBUG in the appropriate environment (e.g., container configurations, server settings).

Analyzing Log Messages:
Understanding the Significance of Retries:
Addressing No Warnings and Successful "Compile Query" Runs:

Additional Considerations:
- Compatibility Checks:
- Monitoring and Alerting:
- Testing and Version Control:
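As an illustration of the Airflow side of the first step: in Airflow 2.x the option lives in the [logging] section of airflow.cfg, and in Cloud Composer it is applied as an Airflow configuration override rather than by editing the file directly (exact key placement is version dependent):

# airflow.cfg excerpt (illustrative): raise log verbosity to DEBUG.
# In Cloud Composer, apply this as a configuration override
# (section "logging", key "logging_level") in the environment settings.
[logging]
logging_level = DEBUG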
Can you please tell me how to clear the XComs? Increasing the number of retries is helping, but I am noticing that with every run (every hour) the number of retries increases. That is, in the 5th run it took 3 retries to get a successful run, in the 7th run it took 8 retries, in the 12th run 11, and so on.
To manage and clear XComs in Cloud Composer there are a couple of options:

- Automated Cleanup via Airflow Configuration: Use the xcoms_cleanup DAG parameter or the core/xcom_enable_cleanup configuration in the airflow.cfg file. This process is managed by the Airflow scheduler and happens periodically. To enable it, edit the airflow.cfg file or set the appropriate DAG parameters in your DAG definition.
- Manual Cleanup via Airflow Webserver UI:
Please exercise caution when manually deleting XComs. Ensure that the XComs you are deleting are not required for any ongoing or future executions of your DAGs. Removing XComs that are still needed can disrupt the proper functioning of your workflows.
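For completeness, XComs can also be cleared programmatically from the Airflow metadata database. A minimal sketch, assuming Airflow 2.x; the helper name cleanup_xcom and the DAG id it is called with are illustrative:

# Sketch: delete all XCom rows for one DAG from the metadata database.
from airflow.models import XCom
from airflow.utils.session import provide_session


@provide_session
def cleanup_xcom(target_dag_id: str, session=None):
    """Remove every XCom entry belonging to the given DAG."""
    session.query(XCom).filter(XCom.dag_id == target_dag_id).delete()


# Illustrative usage, e.g. from a PythonOperator at the end of a run:
# cleanup_xcom("shield_data_loading_and_analytics")

The same caution as above applies here: only delete XComs that no running or future task still needs.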
If you are noticing that the number of retries is increasing with each run, this suggests that the root cause of the issue is not being addressed. While clearing XComs might temporarily alleviate symptoms, it's crucial to investigate the underlying cause of the failures to find a permanent solution. This might involve looking into resource constraints, network issues, or other environmental factors that could be affecting the stability of your DAG runs.
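For reference, retries are usually configured through the DAG's default_args (or per task). A minimal sketch with illustrative values, not a recommendation tuned to this DAG:

# Sketch: give tasks room to retry while the root cause is investigated.
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    "retries": 3,                         # illustrative value
    "retry_delay": timedelta(minutes=5),  # illustrative value
    "retry_exponential_backoff": True,    # wait longer between attempts
}

with DAG(
    dag_id="shield_data_loading_and_analytics",
    schedule_interval="@hourly",
    start_date=datetime(2023, 11, 1),
    catchup=False,
    default_args=default_args,
) as dag:
    ...  # TaskGroup("run_dataform") and the Dataform operators as above

That said, a retry count that grows run over run, as described, points to an accumulating underlying problem rather than something retries can fix.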
The failure occurs at this point. As you can see below, it happens immediately after:

[2023-11-07, 20:06:22 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.

Does this convey any clue?
[2023-11-07, 20:06:22 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-11-07, 20:06:22 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-11-07, 20:06:22 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
return self._descriptor(**value)