Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Error while triggering Dataform via Airflow (Composer environment)

Please see my code for triggering Dataform. This is in my Airflow DAG and I am using the Composer Environment. I am triggering the Dataform pipeline every hour and it works for 10-11 runs but then it throws up the error. I have put the code in my DAG that triggers Dataform below as well as the error that occurs after 9-10 runs.

    with TaskGroup("run_dataform") as run_dataform:
            
            create_compilation_result = DataformCreateCompilationResultOperator(
            task_id = "create_compilation_result",
            project_id = pipel[2023-11-07, 10:07:06 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution. [2023-11-07, 10:07:06 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided. [2023-11-07, 10:07:06 UTC] {taskinstance.py:1778} ERROR - Task failed with exception Traceback (most recent call last): File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto return self._descriptor(**value) TypeError: bad argument type for built-in operation During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataform.py", line 248, in execute result = hook.create_workflow_invocation( File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper return func(self, *args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataform.py", line 196, in create_workflow_invocation return client.create_workflow_invocation( File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataform_v1beta1/services/dataform/client.py", line 3558, in create_workflow_invocation request = dataform.CreateWorkflowInvocationRequest(request) File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 570, in __init__ pb_value = marshal.to_proto(pb_type, value) File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 228, in to_proto pb_value = self.get_rule(proto_type=proto_type).to_proto(value) File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 41, in to_proto return self._wrapper(value)._pb File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 604, in __init__ super().__setattr__("_pb", self._meta.pb(**params)) TypeError: bad argument type for built-in operation [2023-11-07, 10:07:06 UTC] {taskinstance.py:1328} INFO - Marking task as FAILED. dag_id=shield_data_loading_and_analytics, task_id=run_dataform.create_workflow_invocation, execution_date=20231107T090000, start_date=20231107T100705, end_date=20231107T100706 [2023-11-07, 10:07:06 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 16489 for task run_dataform.create_workflow_invocation (bad argument type for built-in operation; 2304763) [2023-11-07, 10:07:07 UTC] {local_task_job.py:212} INFO - Task exited with return code 1 [2023-11-07, 10:07:07 UTC] {taskinstance.py:2599} INFO - 0 downstream tasks scheduled from follow-on schedule check ine_settings["dataform"]["dataform_gcp_project"],
            region = pipeline_settings["dataform"]["dataform_region"],
            repository_id = pipeline_settings["dataform"]["dataform_repository"],
            compilation_result={
                "git_commitish": pipeline_settings["dataform"]["dataform_branch"],
                "code_compilation_config":{
                    "default_database": pipeline_settings["dataform"]["dataform_gcp_project"],
                    "default_schema": pipeline_settings["datasets"]["common_dataset"],
                },
            },
            
            )
            
            create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id="create_workflow_invocation",
            max_active_tis_per_dag=1,
            project_id = pipeline_settings["dataform"]["dataform_gcp_project"],
            region = pipeline_settings["dataform"]["dataform_region"],
            repository_id = pipeline_settings["dataform"]["dataform_repository"],
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('run_dataform.create_compilation_result')['name'] }}",
                  
            },
            
  

 And is the error message I receive after 9-10 runs. 

[2023-11-07, 10:07:06 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-11-07, 10:07:06 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-11-07, 10:07:06 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)
TypeError: bad argument type for built-in operation

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataform.py", line 248, in execute
    result = hook.create_workflow_invocation(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataform.py", line 196, in create_workflow_invocation
    return client.create_workflow_invocation(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataform_v1beta1/services/dataform/client.py", line 3558, in create_workflow_invocation
    request = dataform.CreateWorkflowInvocationRequest(request)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 570, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 228, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
    return self._wrapper(value)._pb
  File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 604, in __init__
    super().__setattr__("_pb", self._meta.pb(**params))
TypeError: bad argument type for built-in operation
[2023-11-07, 10:07:06 UTC] {taskinstance.py:1328} INFO - Marking task as FAILED. dag_id=shield_data_loading_and_analytics, task_id=run_dataform.create_workflow_invocation, execution_date=20231107T090000, start_date=20231107T100705, end_date=20231107T100706
[2023-11-07, 10:07:06 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 16489 for task run_dataform.create_workflow_invocation (bad argument type for built-in operation; 2304763)
[2023-11-07, 10:07:07 UTC] {local_task_job.py:212} INFO - Task exited with return code 1
[2023-11-07, 10:07:07 UTC] {taskinstance.py:2599} INFO - 0 downstream tasks scheduled from follow-on schedule check

Can anyone please help?

 

0 6 2,821
6 REPLIES 6

The error message "bad argument type for built-in operation" typically indicates a problem with the data being passed to the DataformCreateWorkflowInvocationOperator. This error occurring after several successful runs suggests a potential issue with data consistency or system state that develops over time.

Enhanced Troubleshooting Steps:

  1. Logging and Debugging:

    • Enable Detailed Logging: Increase the verbosity of logging for both Airflow and Dataform tasks to capture more granular information about the error.
    • Analyze Log Messages: Review the logs for any specific error messages, codes, or warnings that could provide insights into the issue.
  2. Data Verification and XCom Evaluation:

    • Data Integrity: Confirm that the data passed to the compilation_result and workflow_invocation parameters is correct and consistent with the expected schema.
    • XCom Validity: Investigate if corrupted or stale XComs are causing the issue. Clearing XComs before each run may help ensure data freshness.
  3. Resource Management and Package Updates:

    • Resource Monitoring: Use monitoring tools to track resource usage and detect potential memory leaks or resource exhaustion in the Airflow workers.
    • Package Updates: Review the changelogs for updates to Airflow and Dataform packages that may address the issue. Upgrade cautiously and ensure you have a backup.
  4. Testing and Configuration Review:

    • Error Reproduction: Try to replicate the error in a staging environment that closely mirrors production to isolate the problem.
    • Configuration Review: Double-check Airflow configurations like parallelism, dag_concurrency, and worker_concurrency for potential misconfigurations.
  5. Error Handling Enhancements:

    • Implement Retries: Use retries with exponential backoff to manage intermittent issues.
    • Alerting Mechanisms: Set up alerts to notify you when errors occur for quicker resolution.
  6. Incremental Changes and Backups:

    • Incremental Changes: Apply changes one at a time and test thoroughly after each to understand their impact.
    • Create Backups: Always back up your environment and configurations before making significant changes.

Additional Considerations:

  • Version Control: Use version control for your DAGs to track changes and correlate them with the occurrence of issues.
  • Dependency Management: Ensure all dependencies are compatible and stable, reviewing Python packages and system libraries.
  • Monitoring and Observability: Implement monitoring and observability practices to detect issues proactively.
  • Best Practices: Follow best practices in DAG design to prevent hard-coded values and ensure tasks are idempotent.
  • Execution Context: Consider the execution context, such as time of day or system load, which might affect the error occurrence.
  • Dataform Diagnostic Tools: Utilize any diagnostic tools provided by Dataform to gain additional insights.
  • Airflow Upgrades: Be aware of any known issues with new Airflow versions that could affect your workflows.

  • Enable Detailed Logging: Increase the verbosity of logging for both Airflow and Dataform tasks to capture more granular

    @ms4446 wrote:

    Enable Detailed Logging: Increase the verbosity of logging for both Airflow and Dataform tasks to capture more granular information about the error.


    Hi, can you advice how to increase the verbosity of Airflow and Dataform tasks?

    @ms4446 wrote:

    Analyze Log Messages: Review the logs for any specific error messages, codes, or warnings that could provide insights into the issue.



    There are no warnings and my Dataform workflow runs using "Compile Query" through the user interface.

     

    Few questions:
  • What is the significance of increasing the number of retries? Nothing gets changed in data or my queries, yet retries helps sometimes, why? 

     

    information about the error.

Here are some additional Troubleshooting Steps for "bad argument type for built-in operation" Error in DataformCreateWorkflowInvocationOperator:

Increasing Log Verbosity:

  1. Airflow Logs:

    • Adjust the logging_level parameter in the airflow.cfg file to DEBUG for the most detailed logs.
    • Restart the necessary Airflow services, such as the web server, scheduler, and worker processes, to apply the changes.
  2. Dataform Logs:

    • Confirm whether Dataform supports the DATAFORM_LOG_LEVEL environment variable through the official documentation or support channels.
    • If supported, set DATAFORM_LOG_LEVEL to DEBUG in the appropriate environment (e.g., container configurations, server settings).

Analyzing Log Messages:

  • Review the detailed logs for specific error messages, codes, or warnings.
  • Look for patterns or correlations that may indicate the cause of the error.

Understanding the Significance of Retries:

  • Implementing retries can help overcome transient issues. Define a retry strategy with a maximum count and exponential backoff to prevent indefinite attempts.

Addressing No Warnings and Successful "Compile Query" Runs:

  • The absence of warnings and successful UI runs suggest the issue might be with Airflow's integration. Consider alternative invocation methods like the Dataform CLI or API as a diagnostic step.

Additional Considerations:

  1. Compatibility Checks:

    • Review compatibility between Airflow and Dataform versions. Consult the respective changelogs and documentation for any known issues.
  2. Monitoring and Alerting:

    • Set up monitoring tools to track the health of Airflow and Dataform services and configure alerts for immediate notification of failures.
  3. Testing and Version Control:

    • Test any changes in a staging or development environment before applying them to production.
    • Use version control for your DAGs and Dataform scripts to manage changes and facilitate rollbacks if necessary.

Can you please tell me how to clear the XCOMS? Increasing the number of retries is helping but I am noticing that with every run (every hour), the number of retries are increasing. That is, in the 5th run the number of retries to get a  success solution were 3, in 7th run the number of retries to get a  success solution is 8, in 12 th run its 11 and so on..

To manage and clear XComs in Cloud Composer there are a couple of options:

  1. Automated Cleanup via Airflow Configuration:

    • XComs can be automatically cleaned up based on the xcoms_cleanup DAG parameter or the core/xcom_enable_cleanup configuration in the airflow.cfg file. This process is managed by the Airflow scheduler and happens periodically.
    • To adjust the cleanup interval or enable/disable this feature, you will need to modify the airflow.cfg file or set the appropriate DAG parameters in your DAG definition.
  2. Manual Cleanup via Airflow Webserver UI:

    • The Airflow webserver UI provides an interface for managing XComs manually. Here are the steps to clear XComs using the Airflow webserver UI:
      • Access the Airflow webserver UI.
      • Navigate to the Admin dropdown menu and select XComs.
      • Use the filter boxes to locate the XComs you wish to delete. You can filter by DAG ID, task ID, key, etc.
      • Select the XComs you want to remove by checking the boxes next to them.
      • Scroll to the bottom of the page and click on the Delete button to remove the selected XComs.

Please exercise caution when manually deleting XComs. Ensure that the XComs you are deleting are not required for any ongoing or future executions of your DAGs. Removing XComs that are still needed can disrupt the proper functioning of your workflows.

If you are noticing that the number of retries is increasing with each run, this suggests that the root cause of the issue is not being addressed. While clearing XComs might temporarily alleviate symptoms, it's crucial to investigate the underlying cause of the failures to find a permanent solution. This might involve looking into resource constraints, network issues, or other environmental factors that could be affecting the stability of your DAG runs.

The failure occurs at this point:

As you can see below, it is immediately after :

2023-11-07, 20:06:22 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.

Does this convey anything/clue?

 

 

[2023-11-07, 20:06:22 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-11-07, 20:06:22 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-11-07, 20:06:22 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)