I'm encountering an issue with my custom Apache Airflow plugin, which sends notifications to Google Cloud Pub/Sub on various task events. After the Pub/Sub notification is sent successfully, Airflow fails to mark the task as successful and to progress to the subsequent tasks.
I am on Google Cloud Composer on GCP.
It is the most recent Composer version, but the same thing also happened on all the previous ones.
The task appears to complete its primary function and send its notification to Pub/Sub, but Airflow's internal mechanisms (possibly the scheduler) are then unable to mark the task as successful and proceed.
[2023-08-28, 12:46:25 UTC] {ocean_events.py:74} INFO - Attempting to publish event: entity='task' event_type='task_instance_running' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='running' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=None
[2023-08-28, 12:46:25 UTC] {ocean_events.py:76} INFO - Waiting for the publish future...
[2023-08-28, 12:46:25 UTC] {ocean_events.py:78} INFO - Published event: entity='task' event_type='task_instance_running' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='running' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=None. Response: 8265762839788956
...
[2023-08-28, 12:49:29 UTC] {operators.py:46} INFO - >>> END OF CLOUD BUILD OUTPUT LOGS <<<
[2023-08-28, 12:49:29 UTC] {operators.py:199} INFO - save_stats is False, skipping getting test results
[2023-08-28, 12:49:30 UTC] {taskinstance.py:1328} INFO - Marking task as SUCCESS. dag_id=quality_funnel, task_id=dbt_contract_0lan, execution_date=20230828T100843, start_date=20230828T124624, end_date=20230828T124930
[2023-08-28, 12:49:30 UTC] {ocean_events.py:74} INFO - Attempting to publish event: entity='task' event_type='task_instance_succcess' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='success' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=datetime.datetime(2023, 8, 28, 12, 49, 30, 103604, tzinfo=Timezone('UTC'))
[2023-08-28, 12:49:30 UTC] {ocean_events.py:76} INFO - Waiting for the publish future...
[2023-08-28, 12:50:15 UTC] {ocean_events.py:78} INFO - Published event: entity='task' event_type='task_instance_succcess' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='success' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=datetime.datetime(2023, 8, 28, 12, 49, 30, 103604, tzinfo=Timezone('UTC')). Response: 8265668564688067
[2023-08-28, 12:50:15 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 693 for task dbt_contract_0lan ((psycopg2.errors.ProtocolViolation) invalid frontend message type 23
server closed the connection unexpectedly
This probably means the server terminated abnormally before or while processing the request.
(Background on this error at: https://sqlalche.me/e/14/e3q8); 458)
At the beginning and at the bottom, the logging statements show that the plugin ocean_events.py is able to publish. However, the error happens once the task is complete and, I believe, when the scheduler tries to mark the task as SUCCESS. Here is the plugin code (ocean_events.py):
import logging
import sys
import typing as t
from datetime import datetime

import google.api_core.exceptions as google_exceptions
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import TaskInstanceState
from google.cloud import pubsub_v1
from pydantic import BaseModel
from sqlalchemy.orm.session import Session

from airflow import models

# Configuration
ENV = models.Variable.get("ENV")
PROJECT = models.Variable.get("PROJECT_ID")
TOPIC_NAME = models.Variable.get("EVENTS_TOPIC", default_var="composer-all-events")
TOPIC = f"projects/{PROJECT}/topics/{TOPIC_NAME}-{ENV}"

# Initialize publisher
publisher = pubsub_v1.PublisherClient()

# Logger setup
logger = logging.getLogger(__name__)


def create_topic(topic_name):
    try:
        publisher.create_topic(name=topic_name)
        logger.info(f"Topic {topic_name} created")
    except google_exceptions.AlreadyExists:
        logger.info(f"Topic {topic_name} already exists")


create_topic(TOPIC)


# Event Models
class BaseEvent(BaseModel):
    entity: str
    event_type: str
    dag_id: str
    dag_status: str
    dag_start_date: datetime
    dag_end_date: t.Optional[datetime]
    dag_tags: t.List[str] = []

    def as_json_encoded(self) -> bytes:
        return self.json().encode("utf-8")


class DagEvent(BaseEvent):
    entity: str = "dag"


class TaskEvent(BaseEvent):
    entity: str = "task"
    task_id: str
    task_status: str
    task_start_date: datetime
    task_end_date: t.Optional[datetime]


# Publish Function with additional logging
def publish_event(event: BaseEvent) -> None:
    """Publishes the given event to the configured Pub/Sub topic."""
    try:
        logger.info(f"Attempting to publish event: {event}")
        future = publisher.publish(TOPIC, event.as_json_encoded())
        logger.info("Waiting for the publish future...")
        response = future.result()
        logger.info(f"Published event: {event}. Response: {response}")
    except Exception as e:
        logger.error(f"Failed to publish event: {e}")


# Event Builders
def build_dag_event(dag_run: DagRun, event_type: str, entity: str = "dag") -> DagEvent:
    """Builds a DagEvent from a DagRun."""
    return DagEvent(
        entity=entity,
        event_type=event_type,
        dag_id=dag_run.dag_id,
        dag_status=dag_run.state,
        dag_start_date=dag_run.start_date,
        dag_end_date=dag_run.end_date,
        dag_tags=dag_run.dag.tags,
    )


def handle_dag_event(dag_run: DagRun, event_type: str):
    """Called when a DAG run's state changes."""
    publish_event(build_dag_event(dag_run, event_type))


def handle_task_event(task_instance: TaskInstance, event_type: str):
    """Called when a task instance's state changes."""
    dag_run = task_instance.dag_run
    if dag_run.dag is None:
        dag_run.dag = task_instance.task.dag
    dag_event = build_dag_event(dag_run, event_type, entity="task")
    task_event = TaskEvent(
        task_id=task_instance.task_id,
        task_status=task_instance.state,
        task_start_date=task_instance.start_date,
        task_end_date=task_instance.end_date,
        **dag_event.dict(),
    )
    publish_event(task_event)


# Hook Implementations
@hookimpl
def on_task_instance_running(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: t.Optional[Session]
):
    """Called when the task instance state changes to RUNNING.

    The callback receives the previous state and the task_instance object,
    which exposes the current task instance's dag_run, task and dag information.
    """
    handle_task_event(task_instance, "task_instance_running")


@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: t.Optional[Session]
):
    """Called when the task instance state changes to SUCCESS.

    The callback receives the previous state and the task_instance object,
    which exposes the current task instance's dag_run, task and dag information.
    """
    handle_task_event(task_instance, "task_instance_succcess")


@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: t.Optional[Session]
):
    """Called when the task instance state changes to FAILED.

    The callback receives the previous state and the task_instance object,
    which exposes the current task instance's dag_run, task and dag information.
    """
    handle_task_event(task_instance, "task_instance_failed")


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """Called when the DAG run state changes to SUCCESS."""
    handle_dag_event(dag_run, "dag_run_success")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """Called when the DAG run state changes to FAILED."""
    handle_dag_event(dag_run, "dag_run_failed")


@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """Called when the DAG run state changes to RUNNING."""
    handle_dag_event(dag_run, "dag_run_running")


# Plugin Registration
class OceanEventsPlugin(AirflowPlugin):
    """Plugin class required to register the Ocean Events listeners with Airflow."""

    name = "OceanEventsPlugin"
    listeners = [sys.modules[__name__]]
My suspicion is that this might be interference between the libraries used by Airflow's PostgreSQL backend and those used by the Pub/Sub client library. The error message points to a ProtocolViolation from psycopg2. It could be a background thread or connection opened during the Pub/Sub publish that interferes with the way Airflow's internals record the task state in PostgreSQL.
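If the cause really is the publisher client being created at import time (i.e. before Airflow forks the task-runner process), one mitigation I am considering is creating the client lazily, per process, so a forked worker never reuses a gRPC channel opened in its parent. This is only a rough sketch against the plugin above (it would replace the module-level publisher and publish_event; the _get_publisher helper and the 30-second timeout are just names/values I picked), not a confirmed fix:

import os
from google.cloud import pubsub_v1

_publisher = None
_publisher_pid = None


def _get_publisher() -> pubsub_v1.PublisherClient:
    """Return a Pub/Sub publisher client owned by the current process."""
    global _publisher, _publisher_pid
    if _publisher is None or _publisher_pid != os.getpid():
        # (Re)create the client after a fork so we never reuse a gRPC channel
        # that was opened in the parent process.
        _publisher = pubsub_v1.PublisherClient()
        _publisher_pid = os.getpid()
    return _publisher


def publish_event(event: BaseEvent) -> None:
    """Same logic as publish_event above, but using the per-process client."""
    try:
        logger.info(f"Attempting to publish event: {event}")
        future = _get_publisher().publish(TOPIC, event.as_json_encoded())
        # Bound the wait so a slow publish cannot hold the task open indefinitely.
        response = future.result(timeout=30)
        logger.info(f"Published event: {event}. Response: {response}")
    except Exception as e:
        logger.error(f"Failed to publish event: {e}")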
Hi @dadadima,
Welcome and thank you for reaching out to our community for help.
I understand that you are having challenges with Airflow marking a task as complete before proceeding to the next one. Although Apache Airflow is not a Google-owned product and my knowledge of it is limited, I will try to assist you in looking for answers.
A couple of possible causes might be related to Airflow's configuration with regard to callbacks or the scheduler.
Your thoughts about the client libraries can also be considered a probable cause: psycopg2 depends on native libraries (libpq), and the Pub/Sub client's gRPC layer runs background threads in the same process, so the two could conflict. Consider using pg8000, a pure-Python driver, as an alternative for connecting to PostgreSQL (see the sketch below).
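For reference, switching drivers would roughly mean pointing Airflow's metadata connection string at the pg8000 dialect instead of the default psycopg2 one. This is only a sketch: the host, user, password and database name below are placeholders, and on Cloud Composer the metadata connection is managed for you, so it may not be directly applicable to your environment.

# airflow.cfg -- metadata DB connection using the pg8000 dialect
# (requires `pip install pg8000`); all credentials below are placeholders.
[database]
sql_alchemy_conn = postgresql+pg8000://airflow_user:PASSWORD@db-host:5432/airflow_db

# Or, equivalently, via environment variable:
# AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+pg8000://airflow_user:PASSWORD@db-host:5432/airflow_db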
Here are some other resources that can be of help:
I am also facing the same issue while using a custom plugin to send notifications to an MS Teams channel for on_success_callback, on_failure_callback and on_retry_callback.
The DAG used to work prior to upgrading to composer-2.4.6-airflow-2.6.3.
[2024-01-02, 11:47:25 IST] {base.py:73} INFO - Using connection ID 'ms_teams' for task execution.
[2024-01-02, 11:47:26 IST] {standard_task_runner.py:104} ERROR - Failed to execute job 382012 for task t3_load_data ((psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally before or while processing the request.
(Background on this error at: https://sqlalche.me/e/14/e3q8); 127943)
@dadadima : Were you able to get a solution to this issue?