We have recently created a Cloud Composer 2 environment on a later version.
The task is returning a non-zero exit code (which is expected), but the task keeps running instead of failing.
What is the issue here?
@ms4446 can you please help here.
Hello @chethan108225, welcome to the Google Cloud Community.
I would love to help; however, I'm not that familiar with Cloud Composer. That said, have you checked the Composer logs? Also, which exact version of Composer are you using? Your version might have some bugs.
URL: https://cloud.google.com/composer/docs/concepts/logs
--
cheers,
DamianS
@DamianS I'm using Composer-2.8.6-airflow-2.9.1 (latest version)
The logs also mention that the task ended with a non-zero exit code, but the task keeps running instead of failing.
PFB the attached error image.
Several factors could lead to this behavior. First, the task might be configured with automatic retries. Airflow's retry mechanism allows tasks to retry a set number of times before ultimately failing. If retries are enabled, the task will keep running until it either succeeds or exhausts the retry limit. Second, the task's trigger rule might not be set to the default "all_success." If configured to a less restrictive rule, such as "all_done," the task might not recognize a non-zero exit code as a failure. Third, if custom operators are in use, their logic might not handle non-zero exit codes in the standard way, potentially overriding the default failure behavior. Lastly, timeout issues can cause a task to time out before Airflow registers the failure, especially if the task takes longer than expected or if the Airflow configuration has strict timeout settings.
To address these potential causes, you should first check the retries parameter in your task definition within the DAG. Setting retries to 0 will disable automatic retries. Next, verify the task's trigger_rule, ensuring it is set to "all_success" to maintain the default behavior where a task proceeds only if all upstream tasks have succeeded.
For custom operators, inspect their code to ensure they properly handle non-zero exit codes and raise exceptions when needed. Additionally, review timeout settings such as execution_timeout (a task parameter) and dagrun_timeout (a DAG parameter), along with any related defaults in your Airflow configuration. If these timeouts are too restrictive, increase them to allow sufficient time for task completion and result processing.
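For illustration, here is a minimal sketch (a hypothetical custom operator, not your actual code) of surfacing a non-zero exit code as a failure by raising AirflowException, with execution_timeout available as a per-task bound:

from datetime import timedelta
import subprocess

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator

class RunCommandOperator(BaseOperator):
    """Hypothetical operator: fails the task when the command exits non-zero."""

    def __init__(self, command, **kwargs):
        super().__init__(**kwargs)
        self.command = command

    def execute(self, context):
        # Run the shell command and check its exit status
        result = subprocess.run(self.command, shell=True)
        if result.returncode != 0:
            # Raising an exception is what marks the task instance as failed
            raise AirflowException(f"Command exited with code {result.returncode}")

# Example usage inside a DAG (execution_timeout bounds how long the task may run):
# run_cmd = RunCommandOperator(
#     task_id='run_cmd',
#     command='exit 1',
#     execution_timeout=timedelta(minutes=30),
#     retries=0,
# )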
Here’s an example of how to disable retries in your task:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,  # Disable retries
}

with DAG(
    dag_id='my_dag',
    default_args=default_args,
    description='An example DAG',
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    failing_task = BashOperator(
        task_id='failing_task',
        bash_command='exit 1',  # This will intentionally fail
    )
Carefully review your Airflow logs for any additional clues about the task's behavior or error messages related to the non-zero exit code. If the issue persists, add more detailed logging or debugging statements to your task code to pinpoint the exact failure point. Additionally, the Airflow community, including forums and Slack channels, is an excellent resource for troubleshooting complex issues.
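For example, a minimal, self-contained sketch (hypothetical dag_id; './my_script.sh' stands in for your real command) that turns on bash tracing so each command and its exit status appear in the task log:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='debug_exit_code',        # hypothetical DAG used only to isolate the failure
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    debug_task = BashOperator(
        task_id='debug_task',
        # 'set -x' echoes each command into the task log; the script path is a placeholder
        bash_command='set -euxo pipefail; ./my_script.sh',
        retries=0,
    )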
Hi @ms4446,
PFB the parameters I'm using.
The primary reasons for this behavior are likely related to task retries and how the task integrates with the DAG's context:
To resolve this, you should:
Ensure the task does not retry by explicitly setting retries to 0 in both default_args and within the task itself.
Ensure the task is fully integrated with the DAG to apply all default_args correctly, including the on_failure_callback.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# init_email and caltz are defined elsewhere in the DAG file

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'verbose': True,
    'email_on_failure': True,
    'bigquery_conn_id': 'bigquery_default',
    'on_failure_callback': init_email,
    'retry_delay': timedelta(minutes=2),
    'max_active_runs': 1,
    'use_legacy_sql': False,
    'retries': 0  # Ensure retries are disabled
}

dag = DAG(
    dag_id='my_dag_uat_api-v2',
    start_date=datetime(2023, 10, 4, tzinfo=caltz),
    schedule_interval='*/20 * * * *',
    catchup=False,
    default_args=default_args,
    concurrency=1,
    max_active_runs=1
)

perform_api_call = BashOperator(
    task_id='perform_api_call',
    bash_command="gcloud run jobs execute us-in-cr-uat-apis-jobname --region=us-east1 --wait",
    trigger_rule="all_success",
    retries=0,  # Explicitly disable retries here as well
    dag=dag
)
By explicitly setting retries to 0 in both default_args and the task definition, you ensure no retries occur. This change, along with maintaining the dag=dag context, ensures that all default arguments, including email alerts, are correctly applied.
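If you prefer, the same attachment can be expressed with the context-manager pattern from the earlier example, so every task defined inside the block picks up the DAG and its default_args automatically. A minimal sketch, reusing the default_args, caltz, and command from above:

with DAG(
    dag_id='my_dag_uat_api-v2',
    start_date=datetime(2023, 10, 4, tzinfo=caltz),
    schedule_interval='*/20 * * * *',
    catchup=False,
    default_args=default_args,
    concurrency=1,
    max_active_runs=1
) as dag:
    perform_api_call = BashOperator(
        task_id='perform_api_call',
        bash_command="gcloud run jobs execute us-in-cr-uat-apis-jobname --region=us-east1 --wait",
        trigger_rule="all_success",
        retries=0,  # no explicit dag=dag needed inside the context manager
    )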
@ms4446, I tried the above steps as well.
The DAG now fails as expected, but only when I set 'email_on_failure' = False in the default arguments.
Otherwise the DAG keeps running, as described previously.
When I set 'email_on_failure' = False, below is the log:
And when I set 'email_on_failure' = True, below is the log:
The custom failure callback I'm using is as below.
import json
import os
import subprocess

import gcsfs

# config_path and send_email_basic are defined elsewhere in the DAG file

def init_email(context, **kwargs):
    project_id = os.environ["GCP_PROJECT"]
    gcs_file_system = gcsfs.GCSFileSystem(project=project_id)
    gcs_json_path = config_path
    config_str = gcs_file_system.open(gcs_json_path)
    json_dict = json.load(config_str)
    dag_bucket_name = json_dict['dag_bucket_name']
    dag_id = context['dag_run'].dag_id
    task_id = context['task'].task_id
    smtp_email_from = json_dict['smtp_email_from']
    smtp_email_to = json_dict['smtp_email_to']
    subject = "Dag Failure - {project_id:" + project_id + ",dag_id:" + dag_id + "}"
    error_type = str(context.get("exception"))
    path = f"gs://{dag_bucket_name}/logs/dag_id={dag_id}/*/task_id={task_id}"
    # Find the most recent log file for this task in the DAG bucket
    process = subprocess.Popen(["gsutil", "ls", "-l", path], stdout=subprocess.PIPE)
    p1 = subprocess.Popen(["grep", "-v", "^TOTAL"], stdin=process.stdout, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(["sort", "-k", "2"], stdin=p1.stdout, stdout=subprocess.PIPE)
    p3 = subprocess.Popen(["tail", "-n", "1"], stdin=p2.stdout, stdout=subprocess.PIPE)
    output, error = p3.communicate()
    file_full_path = output.split()[2]
    p4 = subprocess.Popen(["gsutil", "cat", file_full_path], stdout=subprocess.PIPE)
    output, error = p4.communicate()
    output = output.decode('utf-8')
    error_log = output.replace('\n', '<br />')
    # for recipient in smtp_email_to:
    send_email_basic(json_dict, project_id, smtp_email_from, smtp_email_to, subject, dag_id, task_id, error_type, error_log)
The same code works as expected on Airflow 2.5.3, but I'm unable to understand what is wrong on Airflow 2.9.1.
Request you to please share your insight here.
Thanks
@ms4446 is there any update on this issue?
You can try the following to ensure retries are explicitly disabled and to simplify the email handling so the problem can be isolated:
import json
import os
import subprocess

import gcsfs

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# config_path, caltz, and send_email_basic are assumed to be defined elsewhere in the DAG file

# Define the custom email callback
def init_email(context, **kwargs):
    project_id = os.environ["GCP_PROJECT"]
    gcs_file_system = gcsfs.GCSFileSystem(project=project_id)
    gcs_json_path = config_path
    with gcs_file_system.open(gcs_json_path) as f:
        json_dict = json.load(f)
    dag_bucket_name = json_dict['dag_bucket_name']
    dag_id = context['dag_run'].dag_id
    task_id = context['task'].task_id
    smtp_email_from = json_dict['smtp_email_from']
    smtp_email_to = json_dict['smtp_email_to']
    subject = f"Dag Failure - project_id: {project_id}, dag_id: {dag_id}"
    error_type = str(context.get("exception"))
    path = f"gs://{dag_bucket_name}/logs/dag_id={dag_id}/*/task_id={task_id}"
    # Find the most recent log file for this task and read its contents
    process = subprocess.Popen(["gsutil", "ls", "-l", path], stdout=subprocess.PIPE)
    p1 = subprocess.Popen(["grep", "-v", "^TOTAL"], stdin=process.stdout, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(["sort", "-k", "2"], stdin=p1.stdout, stdout=subprocess.PIPE)
    p3 = subprocess.Popen(["tail", "-n", "1"], stdin=p2.stdout, stdout=subprocess.PIPE)
    output, error = p3.communicate()
    file_full_path = output.split()[2]
    p4 = subprocess.Popen(["gsutil", "cat", file_full_path], stdout=subprocess.PIPE)
    output, error = p4.communicate()
    output = output.decode('utf-8')
    error_log = output.replace('\n', '<br />')
    send_email_basic(json_dict, project_id, smtp_email_from, smtp_email_to, subject, dag_id, task_id, error_type, error_log)

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'verbose': True,
    'email_on_failure': True,
    'bigquery_conn_id': 'bigquery_default',
    'on_failure_callback': init_email,
    'retry_delay': timedelta(minutes=2),
    'max_active_runs': 1,
    'use_legacy_sql': False,
    'retries': 0  # Ensure retries are disabled
}

dag = DAG(
    dag_id='my_dag_uat_api-v2',
    start_date=datetime(2023, 10, 4, tzinfo=caltz),
    schedule_interval='*/20 * * * *',
    catchup=False,
    default_args=default_args,
    concurrency=1,
    max_active_runs=1
)

perform_api_call = BashOperator(
    task_id='perform_api_call',
    bash_command="gcloud run jobs execute us-in-cr-uat-apis-jobname --region=us-east1 --wait",
    trigger_rule="all_success",
    retries=0,  # Explicitly disable retries here as well
    dag=dag
)
Add more detailed logging inside the init_email function to ensure all steps are executing as expected. Simplify the function temporarily to isolate issues:
def init_email(context, **kwargs):
    try:
        project_id = os.environ["GCP_PROJECT"]
        dag_id = context['dag_run'].dag_id
        task_id = context['task'].task_id
        error_type = str(context.get("exception"))
        # Simplify the email body
        subject = f"Dag Failure - project_id: {project_id}, dag_id: {dag_id}"
        error_log = f"Error: {error_type}"
        # Log the email content to Airflow logs
        context['task_instance'].log.info(f"Sending email: {subject}\n{error_log}")
        # Send the email (simplified)
        send_email_basic(
            json_dict=None,  # Simplify or mock this for testing
            project_id=project_id,
            smtp_email_from="example@example.com",  # Use hardcoded values for testing
            smtp_email_to=["example@example.com"],
            subject=subject,
            dag_id=dag_id,
            task_id=task_id,
            error_type=error_type,
            error_log=error_log
        )
    except Exception as e:
        context['task_instance'].log.error(f"Failed to send email: {str(e)}")
Ensure your Airflow configuration (airflow.cfg) does not have any unexpected settings affecting task retries or email behavior. If the issue persists, I recommend contacting Google Cloud Support, who can provide further insights.
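As a quick sanity check (a sketch using standard Airflow option names; adapt it to whatever overrides your environment actually sets), you can print the effective values of a few retry- and email-related settings from inside a DAG file or an ad-hoc task:

from airflow.configuration import conf

# Effective values after any Composer-level Airflow configuration overrides are applied
print(conf.get('core', 'default_task_retries', fallback='0'))
print(conf.get('email', 'email_backend', fallback=''))
print(conf.get('smtp', 'smtp_host', fallback=''))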