
Issue with latest version of Composer 2 environment

We recently created a Composer 2 environment on the latest version.

Here the task returns a non-zero exit code (which is expected), but the task keeps on running instead of failing.

What is the issue here?


@ms4446 can you please help here. 

@DamianS @knet can you please help here. 

Hello @chethan108225, welcome to the Google Cloud Community.

I would love to help, however I'm not that familiar with Cloud Composer. That aside, have you checked the Composer logs? Also, which exact version of Composer are you using? Maybe your version has some bugs?

DamianS_0-1721883422000.png


URL: https://cloud.google.com/composer/docs/concepts/logs

--
cheers,
DamianS
LinkedIn medium.com Cloudskillsboost

@DamianS  I'm using Composer-2.8.6-airflow-2.9.1 (latest version)

The logs also state that the task ended with a non-zero exit code, but the task keeps on running instead of failing.

PFB attached image of error.

IMG_20240725_105940.jpg

Several factors could lead to this behavior. First, the task might be configured with automatic retries. Airflow's retry mechanism allows tasks to retry a set number of times before ultimately failing. If retries are enabled, the task will keep running until it either succeeds or exhausts the retry limit. Second, the task's trigger rule might not be set to the default "all_success." If configured to a less restrictive rule, such as "all_done," the task might not recognize a non-zero exit code as a failure. Third, if custom operators are in use, their logic might not handle non-zero exit codes in the standard way, potentially overriding the default failure behavior. Lastly, timeout issues can cause a task to time out before Airflow registers the failure, especially if the task takes longer than expected or if the Airflow configuration has strict timeout settings.

To address these potential causes, you should first check the retries parameter in your task definition within the DAG. Setting retries to 0 will disable automatic retries. Next, verify the task's trigger_rule, ensuring it is set to "all_success" to maintain the default behavior where a task proceeds only if all upstream tasks have succeeded.

For custom operators, inspect their code to ensure they properly handle non-zero exit codes and raise exceptions when needed. Additionally, review timeout settings such as the task-level execution_timeout and the DAG-level dagrun_timeout, along with related settings in your Airflow configuration (airflow.cfg). If these timeouts are too restrictive, increase them to allow sufficient time for task completion and result processing.
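For instance, here is a minimal, hedged sketch (the task name and command are placeholders, not taken from your DAG) of attaching an execution_timeout directly to a task so Airflow fails it rather than letting it run indefinitely:

from datetime import timedelta
from airflow.operators.bash import BashOperator

# Hypothetical task: mark it failed if it runs longer than 30 minutes.
long_running_task = BashOperator(
    task_id='long_running_task',
    bash_command='some_long_command',          # placeholder command
    execution_timeout=timedelta(minutes=30),
)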

Here’s an example of how to disable retries in your task:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,  # Disable retries
}

with DAG(
    dag_id='my_dag',
    default_args=default_args,
    description='An example DAG',
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    failing_task = BashOperator(
        task_id='failing_task',
        bash_command='exit 1',  # This will intentionally fail
    )

Carefully review your Airflow logs for any additional clues about the task's behavior or error messages related to the non-zero exit code. If the issue persists, add more detailed logging or debugging statements to your task code to pinpoint the exact failure point. Additionally, the Airflow community, including forums and Slack channels, is an excellent resource for troubleshooting complex issues.
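As one hedged debugging sketch (the command is a placeholder), you can echo the exit code from within the bash command while still propagating it, so the value shows up directly in the task log:

debug_task = BashOperator(
    task_id='debug_task',
    # Placeholder command; rc captures its exit code, which is echoed and then re-raised.
    bash_command='set -x; your_command_here; rc=$?; echo "command exited with code $rc"; exit $rc',
)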

 

Hi @ms4446 ,

PFB parameters which I'm using.

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'verbose': True,
    'email_on_failure': True,
    'bigquery_conn_id': 'bigquery_default',
    'on_failure_callback': init_email,
    'retry_delay': timedelta(minutes=2),
    'max_active_runs': 1,
    'use_legacy_sql': False
}
 
dag = DAG(
    dag_id='my_dag_uat_api-v2',
    start_date=datetime(2023, 10, 4, tzinfo=caltz),
    schedule_interval='*/20 * * * *',
    catchup=False,
    default_args=default_args,
    concurrency=1,
    max_active_runs=1
)
The task I'm trying to run is as below.

perform_api_call = BashOperator(
    task_id='perform_api_call',
    bash_command="gcloud run jobs execute us-in-cr-uat-apis-jobname --region=us-east1 --wait",
    # retries=0,
    trigger_rule="all_success",
    dag=dag
)

I'm using the parameters as mentioned above. If I comment out dag=dag in the 'perform_api_call' task, the DAG fails as expected (since the Cloud Run job does not exist), but it does not send the mail alert defined via the 'init_email' function, because no default arguments are applied to the task. If I uncomment dag=dag, the task keeps on retrying without failing.
 
Can I know what is happening here?

The primary reasons for this behavior are likely related to task retries and how the task integrates with the DAG's context:

  • The default_args do not explicitly set retries, so the task falls back to the environment's configured default (default_task_retries, which Cloud Composer sets to a non-zero value out of the box). Including dag=dag applies these defaults, causing the task to keep retrying. When dag=dag is commented out, the task is not fully integrated with the DAG, so the retry mechanism is bypassed and the task fails immediately.
  • The init_email callback is not triggered when dag=dag is commented out, indicating that the task is not fully applying the default_args without the DAG context.

To resolve this, you should:

  1. Ensure the task does not retry by explicitly setting retries to 0 in both default_args and within the task itself.

  2. Ensure the task is fully integrated with the DAG to apply all default_args correctly, including the on_failure_callback.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'verbose': True,
    'email_on_failure': True,
    'bigquery_conn_id': 'bigquery_default',
    'on_failure_callback': init_email,
    'retry_delay': timedelta(minutes=2),
    'max_active_runs': 1,
    'use_legacy_sql': False,
    'retries': 0  # Ensure retries are disabled
}

dag = DAG(
    dag_id='my_dag_uat_api-v2',
    start_date=datetime(2023, 10, 4, tzinfo=caltz),
    schedule_interval='*/20 * * * *',
    catchup=False,
    default_args=default_args,
    concurrency=1,
    max_active_runs=1
)

perform_api_call = BashOperator(
    task_id='perform_api_call',
    bash_command="gcloud run jobs execute us-in-cr-uat-apis-jobname --region=us-east1 --wait",
    trigger_rule="all_success",
    retries=0,  # Explicitly disable retries here as well
    dag=dag
)

By explicitly setting retries to 0 in both default_args and the task definition, you ensure no retries occur. This change, along with maintaining the dag=dag context, ensures that all default arguments, including email alerts, are correctly applied.
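
If you want to confirm what was actually applied, a quick sketch (run in a Python shell or ad-hoc script that can import the DAG file; the names match the example above) is to inspect the operator's effective settings:

# After the DAG and task above are defined:
print(perform_api_call.retries)           # expect 0 if the override took effect
print(dag.default_args.get('retries'))    # expect 0 from default_args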

@ms4446, I have tried the above steps as well.

This time the DAG fails as expected when I set only 'email_on_failure': False in the default arguments.

Otherwise the DAG keeps on running, as described previously.

When I set 'email_on_failure'= False, below is the log

Screenshot (521).png

And when I set  'email_on_failure'= True, below is the log

Screenshot (522).png

The custom failure callback I'm using is as below.

def init_email(context, **kwargs):
    project_id = os.environ["GCP_PROJECT"]
    gcs_file_system = gcsfs.GCSFileSystem(project=project_id)
    gcs_json_path = config_path
    config_str = gcs_file_system.open(gcs_json_path)
    json_dict = json.load(config_str)
    dag_bucket_name = json_dict['dag_bucket_name']
    dag_id = context['dag_run'].dag_id
    task_id = context['task'].task_id
    smtp_email_from = json_dict['smtp_email_from']
    smtp_email_to = json_dict['smtp_email_to']
    subject = "Dag Failure - {project_id:" + project_id + ",dag_id:" + dag_id + "}"
    error_type = str(context.get("exception"))
    path = f"gs://{dag_bucket_name}/logs/dag_id={dag_id}/*/task_id={task_id}"
    process = subprocess.Popen(["gsutil", "ls", "-l", path], stdout=subprocess.PIPE)
    p1 = subprocess.Popen(["grep", "-v", "^TOTAL"], stdin=process.stdout, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(["sort", "-k", "2"], stdin=p1.stdout, stdout=subprocess.PIPE)
    p3 = subprocess.Popen(["tail", "-n", "1"], stdin=p2.stdout, stdout=subprocess.PIPE)
    output, error = p3.communicate()
    file_full_path = output.split()[2]
    p4 = subprocess.Popen(["gsutil", "cat", file_full_path], stdout=subprocess.PIPE)
    output, error = p4.communicate()
    output = output.decode('utf-8')
    error_log = output.replace('\n', '<br />')
    # for recipient in smtp_email_to:
    send_email_basic(json_dict, project_id, smtp_email_from, smtp_email_to, subject, dag_id, task_id, error_type, error_log)

The same code works as expected in Airflow 2.5.3, but I'm unable to understand what is wrong in Airflow 2.9.1.

Request you to please share your insight here.

Thanks

@ms4446  is there any update on this issue? 

You can try the following to ensure retries are explicitly disabled and simplify the email handling to isolate the problem:

import json
import os
import subprocess
from datetime import datetime, timedelta

import gcsfs

from airflow import DAG
from airflow.operators.bash import BashOperator

# config_path and send_email_basic come from your existing DAG module.

# Define the custom email callback
def init_email(context, **kwargs):
    project_id = os.environ["GCP_PROJECT"]
    gcs_file_system = gcsfs.GCSFileSystem(project=project_id)
    gcs_json_path = config_path
    with gcs_file_system.open(gcs_json_path) as f:
        json_dict = json.load(f)
    dag_bucket_name = json_dict['dag_bucket_name']
    dag_id = context['dag_run'].dag_id
    task_id = context['task'].task_id
    smtp_email_from = json_dict['smtp_email_from']
    smtp_email_to = json_dict['smtp_email_to']
    subject = f"Dag Failure - project_id: {project_id}, dag_id: {dag_id}"
    error_type = str(context.get("exception"))
    path = f"gs://{dag_bucket_name}/logs/dag_id={dag_id}/*/task_id={task_id}"
    process = subprocess.Popen(["gsutil", "ls", "-l", path], stdout=subprocess.PIPE)
    p1 = subprocess.Popen(["grep", "-v", "^TOTAL"], stdin=process.stdout, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(["sort", "-k", "2"], stdin=p1.stdout, stdout=subprocess.PIPE)
    p3 = subprocess.Popen(["tail", "-n", "1"], stdin=p2.stdout, stdout=subprocess.PIPE)
    output, error = p3.communicate()
    file_full_path = output.split()[2]
    p4 = subprocess.Popen(["gsutil", "cat", file_full_path], stdout=subprocess.PIPE)
    output, error = p4.communicate()
    output = output.decode('utf-8')
    error_log = output.replace('\n', '<br />')
    send_email_basic(json_dict, project_id, smtp_email_from, smtp_email_to, subject, dag_id, task_id, error_type, error_log)

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'verbose': True,
    'email_on_failure': True,
    'bigquery_conn_id': 'bigquery_default',
    'on_failure_callback': init_email,
    'retry_delay': timedelta(minutes=2),
    'max_active_runs': 1,
    'use_legacy_sql': False,
    'retries': 0  # Ensure retries are disabled
}

dag = DAG(
    dag_id='my_dag_uat_api-v2',
    start_date=datetime(2023, 10, 4, tzinfo=caltz),
    schedule_interval='*/20 * * * *',
    catchup=False,
    default_args=default_args,
    concurrency=1,
    max_active_runs=1
)

perform_api_call = BashOperator(
    task_id='perform_api_call',
    bash_command="gcloud run jobs execute us-in-cr-uat-apis-jobname --region=us-east1 --wait",
    trigger_rule="all_success",
    retries=0,  # Explicitly disable retries here as well
    dag=dag
)

Add more detailed logging inside the init_email function to ensure all steps are executing as expected. Simplify the function temporarily to isolate issues:

def init_email(context, **kwargs):
    try:
        project_id = os.environ["GCP_PROJECT"]
        dag_id = context['dag_run'].dag_id
        task_id = context['task'].task_id
        error_type = str(context.get("exception"))

        # Simplify the email body
        subject = f"Dag Failure - project_id: {project_id}, dag_id: {dag_id}"
        error_log = f"Error: {error_type}"

        # Log the email content to Airflow logs
        context['task_instance'].log.info(f"Sending email: {subject}\n{error_log}")

        # Send the email (simplified)
        send_email_basic(
            json_dict=None,  # Simplify or mock this for testing
            project_id=project_id,
            smtp_email_from="example@example.com",  # Use hardcoded values for testing
            smtp_email_to=["example@example.com"],
            subject=subject,
            dag_id=dag_id,
            task_id=task_id,
            error_type=error_type,
            error_log=error_log
        )
    except Exception as e:
        context['task_instance'].log.error(f"Failed to send email: {str(e)}")

Ensure your Airflow configuration (airflow.cfg) does not have any unexpected settings affecting task retries or email behavior. If the issue persists, I recommend contacting Google Cloud Support, who should be able to provide further insights.
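
As a minimal sketch (assuming you can run ad-hoc Python inside the environment, for example from a temporary task or a worker shell), you can read the retry default that is actually in effect:

from airflow.configuration import conf

# Environment-wide default applied when a task does not set retries explicitly
# (Cloud Composer may override this value).
print(conf.get('core', 'default_task_retries', fallback='0'))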