
Airflow task fails after sending notification to PubSub via Airflow Plugin

General description

I'm encountering an issue with my custom Apache Airflow plugin, which sends notifications to Google Cloud Pub/Sub on various task events. After the Pub/Sub notification is sent successfully, Airflow runs into a problem marking the task as successful and progressing to the subsequent tasks.

I am on Google Cloud Composer on GCP.

Environment details

  • GCP Composer Environment Name: ocean-composer-plugins-test
  • Region: europe-west3
  • Composer Version: 2.4.1
  • Airflow Version: 2.5.3

This is the most recent Composer version, but the issue also occurs on all the previous versions I have tried.

Issue

The task appears to complete its primary function and sends a notification to PubSub, but then Airflow's internal mechanisms (possibly the scheduler) are unable to mark the task as successful and proceed.

Logs

[2023-08-28, 12:46:25 UTC] {ocean_events.py:74} INFO - Attempting to publish event: entity='task' event_type='task_instance_running' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='running' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=None
[2023-08-28, 12:46:25 UTC] {ocean_events.py:76} INFO - Waiting for the publish future...
[2023-08-28, 12:46:25 UTC] {ocean_events.py:78} INFO - Published event: entity='task' event_type='task_instance_running' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='running' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=None. Response: 8265762839788956
...
...
...
...
[2023-08-28, 12:49:29 UTC] {operators.py:46} INFO - >>> END OF CLOUD BUILD OUTPUT LOGS <<<
[2023-08-28, 12:49:29 UTC] {operators.py:199} INFO - save_stats is False, skipping getting test results
[2023-08-28, 12:49:30 UTC] {taskinstance.py:1328} INFO - Marking task as SUCCESS. dag_id=quality_funnel, task_id=dbt_contract_0lan, execution_date=20230828T100843, start_date=20230828T124624, end_date=20230828T124930
[2023-08-28, 12:49:30 UTC] {ocean_events.py:74} INFO - Attempting to publish event: entity='task' event_type='task_instance_success' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='success' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=datetime.datetime(2023, 8, 28, 12, 49, 30, 103604, tzinfo=Timezone('UTC'))
[2023-08-28, 12:49:30 UTC] {ocean_events.py:76} INFO - Waiting for the publish future...
[2023-08-28, 12:50:15 UTC] {ocean_events.py:78} INFO - Published event: entity='task' event_type='task_instance_success' dag_id='quality_funnel' dag_status='running' dag_start_date=datetime.datetime(2023, 8, 28, 12, 46, 8, 505237, tzinfo=Timezone('UTC')) dag_end_date=None dag_tags=['pipeline_v3', 'quality_funnel', 'airbyte', 'full_refresh', 'dbt'] task_id='dbt_contract_0lan' task_status='success' task_start_date=datetime.datetime(2023, 8, 28, 12, 46, 24, 729776, tzinfo=Timezone('UTC')) task_end_date=datetime.datetime(2023, 8, 28, 12, 49, 30, 103604, tzinfo=Timezone('UTC')). Response: 8265668564688067
[2023-08-28, 12:50:15 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 693 for task dbt_contract_0lan ((psycopg2.errors.ProtocolViolation) invalid frontend message type 23
server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

(Background on this error at: https://sqlalche.me/e/14/e3q8); 458)

At the beginning and at the bottom, we can see from the logging statements that the plugin ocean_events.py is able to publish. However, the error happens once the task is complete and, I think, the scheduler tries to mark the task as SUCCESS.

Plugin Code

import logging
import sys
import typing as t
from datetime import datetime

import google.api_core.exceptions as google_exceptions
from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import TaskInstanceState
from google.cloud import pubsub_v1
from pydantic import BaseModel
from sqlalchemy.orm.session import Session

from airflow import models

# Configuration
ENV = models.Variable.get("ENV")
PROJECT = models.Variable.get("PROJECT_ID")
TOPIC_NAME = models.Variable.get("EVENTS_TOPIC", default_var="composer-all-events")
TOPIC = f"projects/{PROJECT}/topics/{TOPIC_NAME}-{ENV}"

# Initialize publisher
publisher = pubsub_v1.PublisherClient()

# Logger setup
logger = logging.getLogger(__name__)


def create_topic(topic_name):
    try:
        publisher.create_topic(name=topic_name)
        logger.info(f"Topic {topic_name} created")
    except google_exceptions.AlreadyExists:
        logger.info(f"Topic {topic_name} already exists")


create_topic(TOPIC)


# Event Models
class BaseEvent(BaseModel):
    entity: str
    event_type: str
    dag_id: str
    dag_status: str
    dag_start_date: datetime
    dag_end_date: t.Optional[datetime]
    dag_tags: t.List[str] = []

    def as_json_encoded(self) -> bytes:
        return self.json().encode("utf-8")


class DagEvent(BaseEvent):
    entity: str = "dag"


class TaskEvent(BaseEvent):
    entity: str = "task"
    task_id: str
    task_status: str
    task_start_date: datetime
    task_end_date: t.Optional[datetime]


# Publish Function with additional logging
def publish_event(event: BaseEvent) -> None:
    """
    Publishes the given event to the configured Pub/Sub topic.
    """
    try:
        logger.info(f"Attempting to publish event: {event}")
        future = publisher.publish(TOPIC, event.as_json_encoded())
        logger.info("Waiting for the publish future...")
        response = future.result()
        logger.info(f"Published event: {event}. Response: {response}")
    except Exception as e:
        logger.error(f"Failed to publish event: {e}")


# Event Builders
def build_dag_event(dag_run: DagRun, event_type: str, entity: str = "dag") -> DagEvent:
    """
    Builds a DagEvent from a DagRun.
    """

    return DagEvent(
        entity=entity,
        event_type=event_type,
        dag_id=dag_run.dag_id,
        dag_status=dag_run.state,
        dag_start_date=dag_run.start_date,
        dag_end_date=dag_run.end_date,
        dag_tags=dag_run.dag.tags,
    )


def handle_dag_event(dag_run: DagRun, event_type: str):
    """
    This method is called when dag run state changes.
    """
    publish_event(build_dag_event(dag_run, event_type))


def handle_task_event(task_instance: TaskInstance, event_type: str):
    """
    This method is called when dag run state changes.
    """
    dag_run = task_instance.dag_run

    if dag_run.dag is None:
        dag_run.dag = task_instance.task.dag

    dag_event = build_dag_event(dag_run, event_type, entity="task")
    task_event = TaskEvent(
        task_id=task_instance.task_id,
        task_status=task_instance.state,
        task_start_date=task_instance.start_date,
        task_end_date=task_instance.end_date,
        **dag_event.dict(),
    )
    publish_event(task_event)


# Hook Implementations
@hookimpl
def on_task_instance_running(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: t.Optional[Session]
):
    """
    This method is called when task state changes to RUNNING.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that is running its dag_run,
    task and dag information.
    """
    handle_task_event(task_instance, "task_instance_running")


@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: t.Optional[Session]
):
    """
    This method is called when task state changes to SUCCESS.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has succeeded its
    dag_run, task and dag information.
    """
    handle_task_event(task_instance, "task_instance_succcess")


@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, session: t.Optional[Session]
):
    """
    This method is called when task state changes to FAILED.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has failed its dag_run,
    task and dag information.
    """
    handle_task_event(task_instance, "task_instance_failed")


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    handle_dag_event(dag_run, "dag_run_success")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    handle_dag_event(dag_run, "dag_run_failed")


@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    handle_dag_event(dag_run, "dag_run_running")


# Plugin Registration
class OceanEventsPlugin(AirflowPlugin):
    """This is the plugin class required to register with Airflow the Ocean Events plugin."""

    name = "OceanEventsPlugin"
    listeners = [sys.modules[__name__]]

Hypothesis

My suspicion is that this might be related to interference between the libraries used by Airflow's PostgreSQL backend and those used by the Pub/Sub client library. The error message suggests a protocol violation in psycopg2. It could be that a thread left open during the Pub/Sub publish phase interferes with Airflow recording the task state in its internal PostgreSQL database.
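
If that suspicion is correct, one mitigation worth trying is to stop creating the PublisherClient at module import time and instead create it lazily in the process that actually publishes. This is only a minimal sketch, under the assumption that the gRPC channels owned by a client created in the parent process do not survive Airflow's process forking; get_publisher is a hypothetical helper, not part of the plugin above:

import os
import typing as t

from google.cloud import pubsub_v1

_publisher: t.Optional[pubsub_v1.PublisherClient] = None
_publisher_pid: t.Optional[int] = None


def get_publisher() -> pubsub_v1.PublisherClient:
    """Return a PublisherClient owned by the current process.

    gRPC channels are not fork-safe: a client created at import time in a
    parent process can be inherited in a broken state by forked task
    runners. Re-creating the client after a fork avoids sharing channels.
    """
    global _publisher, _publisher_pid
    if _publisher is None or _publisher_pid != os.getpid():
        _publisher = pubsub_v1.PublisherClient()
        _publisher_pid = os.getpid()
    return _publisher

publish_event would then call get_publisher().publish(TOPIC, event.as_json_encoded()) instead of using the module-level publisher.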

Questions

  • Has anyone faced a similar issue with Airflow in conjunction with PubSub notifications?
  • Could there be a conflict or compatibility issue with the psycopg2 library when the PubSub client is also in use?
  • Any recommended approaches or fixes for this problem?
Replies

Hi @dadadima

Welcome and thank you for reaching out to our community for help.

I understand that you are having challenges with Airflow waiting for a task to complete before proceeding to the next action item. Although Apache Airflow is not a Google-owned product and my knowledge of it may be limited, I will try to assist you in looking for answers.

A couple of possible causes relate to Airflow's configuration of callbacks and the scheduler:

  • You may need to set a callback that waits for the Pub/Sub notification to be sent before the task is marked as complete. Consider using on_success_callback; it is None by default, in which case the task is immediately marked as successful and Airflow proceeds (see the sketch after this list).
  • As you mentioned, check whether your scheduler is set to retry tasks that fail during the Pub/Sub notification. By default, the scheduler will only retry a task if the failure is caused by a network error; consider using retry_on_failure to let the scheduler attempt the failed tasks again.
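
For illustration, here is a minimal sketch of the callback approach; the DAG and the notify_success helper below are hypothetical, not taken from the plugin in the question:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def notify_success(context):
    # Hypothetical: publish the success notification here (e.g. to Pub/Sub),
    # so it runs as part of the task's own success handling rather than in a
    # listener. The task instance is available via the callback context.
    ti = context["task_instance"]
    print(f"Task {ti.task_id} in DAG {ti.dag_id} succeeded")


with DAG(
    dag_id="callback_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    default_args={"on_success_callback": notify_success},
) as dag:
    EmptyOperator(task_id="noop")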

Your thoughts about the client libraries can also be considered a probable cause: psycopg2 wraps the native libpq library, the Pub/Sub client runs background gRPC threads, and native components like these can interact badly inside Airflow's forked task processes. Consider using pg8000, a pure-Python driver, as an alternative for connecting to PostgreSQL; an example follows below.
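
For example, the metadata database URL in airflow.cfg (or the equivalent AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable) would switch drivers like this; the host, credentials, and database name are placeholders, and note that managed Composer environments may not allow overriding this setting:

[database]
sql_alchemy_conn = postgresql+pg8000://airflow_user:PASSWORD@db-host:5432/airflow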


I am also facing the same issue while using a custom plugin to send notifications to an MS Teams channel for on_success_callback, on_failure_callback, and on_retry_callback.

The DAG used to work prior to upgrading to composer-2.4.6-airflow-2.6.3.

[2024-01-02, 11:47:25 IST] {base.py:73} INFO - Using connection ID 'ms_teams' for task execution.
[2024-01-02, 11:47:26 IST] {standard_task_runner.py:104} ERROR - Failed to execute job 382012 for task t3_load_data ((psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

(Background on this error at: https://sqlalche.me/e/14/e3q8); 127943)

@dadadima : Were you able to get a solution to this issue?