Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Cloud composer 2 fails to check dataflow job status

Hi 

I need help with this issue.

We recently migrated from Cloud Composer 1 to Cloud Composer 2 but we have issues when Composer tries to check the DataFlow job status.
Dataflow correctly starts but often, the Composer worker seems to be unable to check job status, here are some screenshots for an example (but this issue happens with almost all our flows)
screencapture-61295dff005c4e0e862ecfdccf276783-dot-europe-west1-composer-googleusercontent-task-2025-01-07-10_11_20.pngscreencapture-console-cloud-google-dataflow-jobs-europe-west1-2025-01-06-14-15-33-17641244977953642513-graphView-0-2025-01-07-10_12_35.png
Here the dag:

 

import datetime
import pendulum
from airflow.providers.google.cloud.operators.dataflow import (
  DataflowTemplatedJobStartOperator,
)

from airflow import models

default_dag_args = {
  # Setting start date as yesterday starts the DAG immediately when it is
  # detected in the Cloud Storage bucket.
  'start_date': pendulum.yesterday("Europe/Rome"),
  # To email on failure or retry set 'email' arg to your email and enable
  # emailing here.
  'email_on_failure': False,
  'email_on_retry': False,
  'depends_on_past': False,
  # If a task fails, retry it once after waiting at least 5 minutes
  'retries': 1,
  'retry_delay': datetime.timedelta(minutes=20),
  'project_id': 'my-project-id',
  'location': 'europe-west1',
  'dataflow_default_options': {
    'zone': 'europe-west1-b',
    'tempLocation': 'gs://...omissis.../temp/'
  }
}

with models.DAG(
    'run_agent_performance_report',
    schedule_interval=datetime.timedelta(hours=12),
    default_args=default_dag_args,
    max_active_runs=1) as dag:
  run_agent_performance_report = DataflowTemplatedJobStartOperator(
    task_id='run_agent_performance_report',
    template='gs://...omissis...-template',
    job_name='agent-performance-report')

  run_agent_performance_report

 

I think that the issues can be related to the configuration of the location/zone, composer 2 runs into an Autopilot GKE cluster which spawns in 3 zones but dataflow runs only into europe-west-1-b   How can we fix this?
Thanks

4 6 502
6 REPLIES 6

What (if any) errors are produced.  Is there anything written to Cloud Logging?  I'm wondering if we need more detail beyond "the Composer worker seems to be unable to check job status".   Does the the DAG just "hang"?

Hi
I can see some errors but I don't know how they can be useful, they report that the task is in a zombie state.
I paste the JSON export for the latest run

[
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  0.96s           2025-01-07T23:24:39                      12",
    "insertId": "p92e5nfoy0z60",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:02.295420004Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:25:08.492673157Z"
  },
  {
    "textPayload": "Removing file:///home/airflow/gcs/dags/__pycache__/agent-performance-report.cpython-311.pyc",
    "insertId": "qz6nrhfp1fmp2",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:10.575072696Z",
    "severity": "INFO",
    "labels": {
      "sub_service": "gcs-syncd",
      "pod_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/gcs-syncd",
    "receiveTimestamp": "2025-01-07T23:25:16.602699456Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  0.97s           2025-01-07T23:25:10                      12",
    "insertId": "18p8y8pfopbhur",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:32.675782957Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "dag-processor-manager"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "\t<TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [scheduled]>",
    "insertId": "18p8y8pfopbhvg",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:36.860384958Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "DAG run_agent_performance_report has 0/100 running and queued tasks",
    "insertId": "18p8y8pfopbhvi",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:36.860820048Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "scheduler_job_runner.py:495"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "\t<TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [scheduled]>",
    "insertId": "18p8y8pfopbhvn",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:36.869033678Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "Trying to enqueue tasks: [<TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [scheduled]>, <TaskInstance: run_enrich_rumor_aggregation.run_enrich_rumor_aggregation scheduled__2025-01-07T21:00:00+00:00 [scheduled]>] for executor: CeleryExecutor(parallelism=0)",
    "insertId": "18p8y8pfopbhvr",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:36.878613208Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "scheduler_job_runner.py:736"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "Sending TaskInstanceKey(dag_id='run_agent_performance_report', task_id='run_agent_performance_report', run_id='scheduled__2025-01-07T11:00:00+00:00', try_number=2, map_index=-1) to CeleryExecutor with priority 1 and queue default",
    "insertId": "18p8y8pfopbhvs",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:36.879006238Z",
    "severity": "INFO",
    "labels": {
      "process": "scheduler_job_runner.py:680",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "Adding to queue: ['airflow', 'tasks', 'run', 'run_agent_performance_report', 'run_agent_performance_report', 'scheduled__2025-01-07T11:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/agent-performance-report.py']",
    "insertId": "18p8y8pfopbhvt",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:36.879619568Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "base_executor.py:168"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "Received executor event with state queued for task instance TaskInstanceKey(dag_id='run_agent_performance_report', task_id='run_agent_performance_report', run_id='scheduled__2025-01-07T11:00:00+00:00', try_number=2, map_index=-1)",
    "insertId": "18p8y8pfopbhvw",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:36.964862767Z",
    "severity": "INFO",
    "labels": {
      "process": "scheduler_job_runner.py:764",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "Setting external_id for <TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [queued]> to d0e3bfe1-6f8a-4ada-aeea-0cc50b154239",
    "insertId": "18p8y8pfopbhvy",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:36.983462727Z",
    "severity": "INFO",
    "labels": {
      "process": "scheduler_job_runner.py:791",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:25:38.819248279Z"
  },
  {
    "textPayload": "[d0e3bfe1-6f8a-4ada-aeea-0cc50b154239] Executing command in Celery: ['airflow', 'tasks', 'run', 'run_agent_performance_report', 'run_agent_performance_report', 'scheduled__2025-01-07T11:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/agent-performance-report.py']",
    "insertId": "h8ro1gfqugns3",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:37.047988493Z",
    "severity": "INFO",
    "labels": {
      "worker_id": "airflow-worker-nkbtr",
      "process": "celery_executor_utils.py:135"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:39.394171369Z"
  },
  {
    "textPayload": "Filling up the DagBag from /home/airflow/gcs/dags/agent-performance-report.py",
    "insertId": "h8ro1gfqugns5",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:38.247751813Z",
    "severity": "INFO",
    "labels": {
      "process": "dagbag.py:588",
      "worker_id": "airflow-worker-nkbtr"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:39.394171369Z"
  },
  {
    "textPayload": "Running <TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [queued]> on host airflow-worker-nkbtr",
    "insertId": "1dz11lvfr13gdp",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:50.142213525Z",
    "severity": "INFO",
    "labels": {
      "process": "task_command.py:473",
      "worker_id": "airflow-worker-nkbtr"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:52.611081673Z"
  },
  {
    "textPayload": "Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [queued]>",
    "insertId": "bdw22ifokeppw",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:51.746862087Z",
    "severity": "INFO",
    "labels": {
      "process": "taskinstance.py:2614",
      "worker_id": "airflow-worker-nkbtr",
      "task-id": "run_agent_performance_report",
      "workflow": "run_agent_performance_report",
      "map-index": "-1",
      "try-number": "2",
      "execution-date": "2025-01-07T11:00:00+00:00"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [queued]>",
    "insertId": "bdw22ifokeppy",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:51.946384011Z",
    "severity": "INFO",
    "labels": {
      "process": "taskinstance.py:2614",
      "worker_id": "airflow-worker-nkbtr",
      "try-number": "2",
      "task-id": "run_agent_performance_report",
      "workflow": "run_agent_performance_report",
      "map-index": "-1",
      "execution-date": "2025-01-07T11:00:00+00:00"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "Executing <Task(DataflowTemplatedJobStartOperator): run_agent_performance_report> on 2025-01-07 11:00:00+00:00",
    "insertId": "bdw22ifokepq0",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:25:52.340949067Z",
    "severity": "INFO",
    "labels": {
      "map-index": "-1",
      "execution-date": "2025-01-07T11:00:00+00:00",
      "workflow": "run_agent_performance_report",
      "process": "taskinstance.py:2890",
      "try-number": "2",
      "task-id": "run_agent_performance_report",
      "worker_id": "airflow-worker-nkbtr"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "Running: ['airflow', 'tasks', 'run', 'run_agent_performance_report', 'run_agent_performance_report', 'scheduled__2025-01-07T11:00:00+00:00', '--job-id', '9685', '--raw', '--subdir', 'DAGS_FOLDER/agent-performance-report.py', '--cfg-path', '/tmp/tmpaozxtm89']",
    "insertId": "bdw22ifokepq3",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:52.738507542Z",
    "severity": "INFO",
    "labels": {
      "workflow": "run_agent_performance_report",
      "map-index": "-1",
      "execution-date": "2025-01-07T11:00:00+00:00",
      "process": "standard_task_runner.py:104",
      "worker_id": "airflow-worker-nkbtr",
      "try-number": "2",
      "task-id": "run_agent_performance_report"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "Job 9685: Subtask run_agent_performance_report",
    "insertId": "bdw22ifokepq4",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:52.738960173Z",
    "severity": "INFO",
    "labels": {
      "process": "standard_task_runner.py:105",
      "execution-date": "2025-01-07T11:00:00+00:00",
      "map-index": "-1",
      "try-number": "2",
      "worker_id": "airflow-worker-nkbtr",
      "workflow": "run_agent_performance_report",
      "task-id": "run_agent_performance_report"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "Running <TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2025-01-07T11:00:00+00:00 [running]> on host airflow-worker-nkbtr",
    "insertId": "bdw22ifokepqa",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:25:54.258972888Z",
    "severity": "INFO",
    "labels": {
      "task-id": "run_agent_performance_report",
      "process": "task_command.py:473",
      "map-index": "-1",
      "execution-date": "2025-01-07T11:00:00+00:00",
      "workflow": "run_agent_performance_report",
      "try-number": "2",
      "worker_id": "airflow-worker-nkbtr"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='run_agent_performance_report' AIRFLOW_CTX_TASK_ID='run_agent_performance_report' AIRFLOW_CTX_EXECUTION_DATE='2025-01-07T11:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2025-01-07T11:00:00+00:00'",
    "insertId": "bdw22ifokepqn",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:25:57.442632503Z",
    "severity": "INFO",
    "labels": {
      "execution-date": "2025-01-07T11:00:00+00:00",
      "try-number": "2",
      "workflow": "run_agent_performance_report",
      "task-id": "run_agent_performance_report",
      "worker_id": "airflow-worker-nkbtr",
      "map-index": "-1",
      "process": "taskinstance.py:3133"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:25:58.629214923Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  1.24s           2025-01-07T23:25:42                      12",
    "insertId": "1ojh31jfohix45",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:26:02.951690505Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:26:09.152737106Z"
  },
  {
    "textPayload": "Removing file:///home/airflow/gcs/dags/__pycache__/agent-performance-report.cpython-311.pyc",
    "insertId": "1q64y9mfopj32i",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:26:04.049253302Z",
    "severity": "INFO",
    "labels": {
      "pod_id": "airflow-worker-nkbtr",
      "sub_service": "gcs-syncd"
    },
    "logName": "projects/my-project/logs/gcs-syncd",
    "receiveTimestamp": "2025-01-07T23:26:10.604179156Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  0.99s           2025-01-07T23:26:13                      12",
    "insertId": "zu8902fp2dobx",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:26:32.961425825Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:26:39.075724313Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  0.92s           2025-01-07T23:26:45                      12",
    "insertId": "1jixdzgfowynbf",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:27:03.183624648Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:27:10.223307855Z"
  },
  {
    "textPayload": "[54f73db4-066b-4c5a-b79c-275a04096fd3] Executing command in Celery: ['airflow', 'tasks', 'run', 'run_agent_performance_report', 'run_agent_performance_report', 'scheduled__2024-12-31T11:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/agent-performance-report.py']",
    "insertId": "1u1na3qfo4hltv",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:27:22.550739369Z",
    "severity": "INFO",
    "labels": {
      "worker_id": "airflow-worker-nkbtr",
      "process": "celery_executor_utils.py:135"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:27:28.614670270Z"
  },
  {
    "textPayload": "Filling up the DagBag from /home/airflow/gcs/dags/agent-performance-report.py",
    "insertId": "1u1na3qfo4hltw",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:27:23.751857212Z",
    "severity": "INFO",
    "labels": {
      "worker_id": "airflow-worker-nkbtr",
      "process": "dagbag.py:588"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:27:28.614670270Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                        1           0  1.61s           2025-01-07T23:27:17                      12",
    "insertId": "1dyyj6gfiaiq1q",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:27:33.568590989Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:27:37.083397108Z"
  },
  {
    "textPayload": "Running <TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2024-12-31T11:00:00+00:00 [failed]> on host airflow-worker-nkbtr",
    "insertId": "vx1en4fqoxa6q",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:27:34.763062141Z",
    "severity": "INFO",
    "labels": {
      "worker_id": "airflow-worker-nkbtr",
      "process": "task_command.py:473"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:27:40.711887902Z"
  },
  {
    "textPayload": "Dependencies not met for <TaskInstance: run_agent_performance_report.run_agent_performance_report scheduled__2024-12-31T11:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.",
    "insertId": "vx1en4fqoxa6y",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:27:35.667615127Z",
    "severity": "INFO",
    "labels": {
      "execution-date": "2024-12-31T11:00:00+00:00",
      "process": "taskinstance.py:2604",
      "worker_id": "airflow-worker-nkbtr",
      "try-number": "2",
      "workflow": "run_agent_performance_report",
      "map-index": "-1",
      "task-id": "run_agent_performance_report"
    },
    "logName": "projects/my-project/logs/airflow-worker",
    "receiveTimestamp": "2025-01-07T23:27:40.711887902Z"
  },
  {
    "textPayload": "Removing file:///home/airflow/gcs/dags/__pycache__/agent-performance-report.cpython-311.pyc",
    "insertId": "1dyyj6gfiaiq1b",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:27:35.970219369Z",
    "severity": "INFO",
    "labels": {
      "pod_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "sub_service": "gcs-syncd"
    },
    "logName": "projects/my-project/logs/gcs-syncd",
    "receiveTimestamp": "2025-01-07T23:27:37.083397108Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  0.95s           2025-01-07T23:27:48                      12",
    "insertId": "xwzvhqfnxi2yv",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:28:03.665122957Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:28:10.357309337Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  1.61s           2025-01-07T23:28:20                      12",
    "insertId": "18fjs6qfo4q1n8",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:28:33.695408663Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:28:39.612637859Z"
  },
  {
    "textPayload": "Removing file:///home/airflow/gcs/dags/__pycache__/agent-performance-report.cpython-311.pyc",
    "insertId": "u2s4keg3kiz",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:28:47.032990653Z",
    "severity": "INFO",
    "labels": {
      "pod_id": "airflow-worker-nkbtr",
      "sub_service": "gcs-syncd"
    },
    "logName": "projects/my-project/logs/gcs-syncd",
    "receiveTimestamp": "2025-01-07T23:28:48.224520368Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                        1           0  1.12s           2025-01-07T23:28:51                      12",
    "insertId": "rhz937fqih0xt",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:29:03.872854396Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:29:09.880164293Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                        1           0  2.32s           2025-01-07T23:29:24                      12",
    "insertId": "behsl5fo9gd42",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:29:34.080030400Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:29:41.184853816Z"
  },
  {
    "textPayload": "Removing file:///home/airflow/gcs/dags/__pycache__/agent-performance-report.cpython-311.pyc",
    "insertId": "1pnino6fp3igt1",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1"
      }
    },
    "timestamp": "2025-01-07T23:29:54.562774304Z",
    "severity": "INFO",
    "labels": {
      "sub_service": "gcs-syncd",
      "pod_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/gcs-syncd",
    "receiveTimestamp": "2025-01-07T23:29:58.063280771Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  0.78s           2025-01-07T23:29:56                      12",
    "insertId": "qmx69yfoce30z",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:30:04.099306289Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:30:10.122371894Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                        1           0  1.79s           2025-01-07T23:30:28                      12",
    "insertId": "1yj44smfqkt8cc",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:30:34.965549371Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "dag-processor-manager"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:30:41.224268360Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                        1           0  0.77s           2025-01-07T23:31:00                      12",
    "insertId": "18g8phnfouhfzf",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "project_id": "my-project",
        "environment_name": "doveit-airflow2-production"
      }
    },
    "timestamp": "2025-01-07T23:31:05.208953517Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "dag-processor-manager"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:31:12.385787985Z"
  },
  {
    "textPayload": "Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/agent-performance-report.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': \"{'DAG Id': 'run_agent_performance_report', 'Task Id': 'run_agent_performance_report', 'Run Id': 'scheduled__2025-01-07T11:00:00+00:00', 'Hostname': 'airflow-worker-nkbtr', 'External Executor Id': 'd0e3bfe1-6f8a-4ada-aeea-0cc50b154239'}\", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x792cb48b6190>, 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)",
    "insertId": "18g8phnfouhfzm",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:31:07.285547132Z",
    "severity": "ERROR",
    "labels": {
      "process": "scheduler_job_runner.py:2001",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:31:12.385787985Z"
  },
  {
    "textPayload": "{'DAG Id': 'run_agent_performance_report', 'Task Id': 'run_agent_performance_report', 'Run Id': 'scheduled__2025-01-07T11:00:00+00:00', 'Hostname': 'airflow-worker-nkbtr', 'External Executor Id': 'd0e3bfe1-6f8a-4ada-aeea-0cc50b154239'}",
    "insertId": "18g8phnfouhfzp",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:31:07.661046187Z",
    "severity": "ERROR",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "taskinstance.py:3314"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:31:12.385787985Z"
  },
  {
    "textPayload": "Marking run <DagRun run_agent_performance_report @ 2025-01-07 11:00:00+00:00: scheduled__2025-01-07T11:00:00+00:00, state:running, queued_at: 2025-01-07 23:00:01.030730+00:00. externally triggered: False> failed",
    "insertId": "18g8phnfouhfzr",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:31:08.178502451Z",
    "severity": "ERROR",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "dagrun.py:830"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:31:12.385787985Z"
  },
  {
    "textPayload": "DagRun Finished: dag_id=run_agent_performance_report, execution_date=2025-01-07 11:00:00+00:00, run_id=scheduled__2025-01-07T11:00:00+00:00, run_start_date=2025-01-07 23:00:01.292170+00:00, run_end_date=2025-01-07 23:31:08.179170+00:00, run_duration=1866.887, state=failed, external_trigger=False, run_type=scheduled, data_interval_start=2025-01-07 11:00:00+00:00, data_interval_end=2025-01-07 23:00:00+00:00, dag_hash=afeb24bfc8ad8b6b3c919253dde7a9a3",
    "insertId": "18g8phnfouhfzs",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:31:08.180562341Z",
    "severity": "INFO",
    "labels": {
      "process": "dagrun.py:912",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:31:12.385787985Z"
  },
  {
    "textPayload": "Setting next_dagrun for run_agent_performance_report to 2025-01-07 23:00:00+00:00, run_after=2025-01-08 11:00:00+00:00",
    "insertId": "18g8phnfouhfzt",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "location": "europe-west1",
        "environment_name": "doveit-airflow2-production",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:31:08.272739920Z",
    "severity": "INFO",
    "labels": {
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7",
      "process": "dag.py:4196"
    },
    "logName": "projects/my-project/logs/airflow-scheduler",
    "receiveTimestamp": "2025-01-07T23:31:12.385787985Z"
  },
  {
    "textPayload": "/home/airflow/gcs/dags/agent-performance-report.py                                       1           0  1.56s           2025-01-07T23:31:08                      23",
    "insertId": "160i5ffono9gd",
    "resource": {
      "type": "cloud_composer_environment",
      "labels": {
        "environment_name": "doveit-airflow2-production",
        "location": "europe-west1",
        "project_id": "my-project"
      }
    },
    "timestamp": "2025-01-07T23:31:35.406497396Z",
    "severity": "INFO",
    "labels": {
      "process": "dag-processor-manager",
      "scheduler_id": "airflow-scheduler-6fdb58d699-5p8r7"
    },
    "logName": "projects/my-project/logs/dag-processor-manager",
    "receiveTimestamp": "2025-01-07T23:31:39.543840662Z"
  }
]

thanks a lot

sorry are the logs helpful? The issue is still there, dataflow job ends correctly (in dataflow jobs report is marked as successful ) but arflow reports it as unresponsive, sometimes, it seems that airflow does not even try to check for the job state. The DAG just "hang"...

any help please? We are stuck in this issue, some of our data started to be not so reliable due to this issue

The formatting of the text of the logs isn't easy to read in the forum format.  My thinking on looking at the logs was to try and get deeper into the underlying problem.  If this issue is critical, you may want to consider raising a formal support case with Google Support.  Either that or search on the Apache Airflow sites using the relevant log records as key word searches.  I don't think this forum is a replacement for general support but rather an attempt to try and resolve specific and focused questions and also put similar minded folks together.   

We continued to investigate the problem, the logs show this kind of error message:

Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/prospect-enricher-aggregation.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'run_enrich_prospect_and_property_report', 'Task Id': 'run_enrich_prospect_appointment_aggregation', 'Run Id': 'scheduled__2025-01-22T05:00:00+00:00', 'Hostname': 'airflow-worker-prv2n', 'External Executor Id': 'ac21aa0b-2a5e-44f8-9e38-f39ce3b3723e'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x781f8d63cc90>, 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)

Looking at the worker graph there are two more idle workers than the maximum workers:
Screenshot 2025-01-22 alle 12.21.59.pngScreenshot 2025-01-22 alle 12.22.09.png

 

I don't know if this applies to you, but recently when I tried to upgrade my Composer environment from 2.9.11 to 2.10.2, all my Dataflow jobs hanged when going into deferred state (I'm using deferrable operators). Later I found out that there's some bug where the Operator can create the Dataflow job but, when returning to check the job it threw a permission error. In my case I basically had to recreate the environment back in 2.9.11, so I couldn't really fix the issue... maybe this is your case?