Announcements
This site is in read only until July 22 as we migrate to a new platform; refer to this community post for more details.
Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Google Push Sub messages delayed

Hi everyone,

I'm building a system where a worker node pushes ADK agent responses to a Google Pub/Sub topic. A relay node, which is subscribed to that topic, receives those messages and streams them to users via Server-Sent Events (SSE).

For the most part, everything works as expected. However, I'm encountering an intermittent issue: about 1 out of every 10 messages is delayed by several minutes before it reaches the relay node—even though the message is successfully published to Pub/Sub. This delay makes the system unreliable for production use.

I suspect there's something I'm doing wrong, but I haven’t been able to identify the issue.

Setup

  • Api endpoint:

 

from fastapi import FastAPI, Depends, status
from pydantic import BaseModel, Field
from redis.asyncio import Redis
from fastapi.middleware.cors import CORSMiddleware
# Internal imports
from redis_conn import get_redis_connection
from tasks import process_message_task

# --- Pydantic Models ---
class ChatRequest(BaseModel):
    """Defines the structure for an incoming chat message."""
    # The user_id is now correctly typed as an integer.
    user_id: int = Field(..., example=12345)
    # Added conversation_id to match the frontend payload
    conversation_id: str = Field(..., example="conv-abc-123")
    message: str = Field(..., example="Hello, tell me a joke.")

class QueueResponse(BaseModel):
    """Defines the structure for the API's response."""
    status: str
    task_id: str
    # This now correctly matches the integer type for user_id
    user_id: int
    # Added conversation_id to the response model
    conversation_id: str

# --- FastAPI Application ---
app = FastAPI(
    title="Chatbot API",
    description="Receives user messages and queues them for processing by a worker.",
    version="1.1.0" # Version bump to reflect changes
)


app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post(
    "/api/v1/message",
    status_code=status.HTTP_202_ACCEPTED,
    response_model=QueueResponse,
)
async def receive_message(
    request: ChatRequest,
    # This dependency injection is a clean way to get a Redis connection,
    # but we don't actually need it here since the broker handles the connection.
    # It's kept for demonstration purposes if you need direct Redis access in an endpoint.
    redis: Redis = Depends(get_redis_connection)
):
    """
    Receives a message from a user, validates it, and places it into a
    queue for the chatbot worker to process.
    """
    print(f"API: Received message from user '{request.user_id}' in conversation '{request.conversation_id}'. Queuing task...")

    # Asynchronously send the task to the broker.
    # .kiq() is the method to send a task.
    # Now passing the conversation_id to the worker task.
    task = await process_message_task.kiq(
        user_id=request.user_id, 
        conversation_id=request.conversation_id, 
        message=request.message
    )

    print(f"API: Task '{task.task_id}' for user '{request.user_id}' has been queued.")

    # Return a response confirming the task has been queued.
    # The conversation_id is now included in the response.
    return QueueResponse(
        status="Message received and queued for processing.",
        task_id=task.task_id,
        user_id=request.user_id,
        conversation_id=request.conversation_id,
    )

@app.get("/")
async def root():
    """A simple root endpoint to confirm the API is running."""
    return {"message": "Chatbot API is running."}

 

  • Worker Node Code:

 

# ==============================================================================
# File: tasks.py (stream‑enabled worker with non-blocking publish)
# Purpose: Consume user messages, run ADK agent with SSE streaming, and publish
#           each token chunk to a Google Cloud Pub/Sub topic.
# ==============================================================================
import os
import logging
import time

import uuid
from datetime import datetime, timezone


from broker import broker
from db_queries import get_all_pet_details_by_user_id

# --- Google Cloud --------------------------------------------------------------
from google.cloud import pubsub_v1
from google.cloud import firestore
# ⬇️ add just after the other imports
from functools import partial                      # NEW



# --- ADK / Vertex --------------------------------------------------------------
# from adk_client import (
#     runner,
#     run_config,
#     session_service,
# )

from adk_client import ensure_session, run_agent_stream, create_session



# ------------------------------------------------------------------------------
#  Logging
# ------------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s][%(levelname)s][Worker] %(message)s",
)
logger = logging.getLogger(__name__)
# ⬇️ add somewhere below the logger definition (keep it close for clarity)
def _log_publish_result(future: pubsub_v1.publisher.futures.Future,
                        ordering_key: str) -> None:
    """
    Runs inside the Pub/Sub publisher thread when a publish completes.
    Logs success or failure and unpauses the ordering key on error.
    """
    try:
        msg_id = future.result()                   # returns server-assigned ID
        logger.info("✓ Publish OK [%s] msg_id=%s", ordering_key, msg_id)
    except Exception as exc:
        logger.error("✗ Publish FAILED [%s]: %s", ordering_key, exc)
        # 🔑 If publish failed, unlock the key so later messages can flow
        publisher.resume_publish(ordering_key)
# ------------------------------------------------------------------------------
#  Firestore & Pub/Sub clients
# ------------------------------------------------------------------------------
db = firestore.AsyncClient()
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
TOPIC_ID       = "chatbot-stream-topic"
TOPIC_PATH     = publisher.topic_path(GCP_PROJECT_ID, TOPIC_ID)

# ------------------------------------------------------------------------------
#  Taskiq task
# ------------------------------------------------------------------------------
@broker.task
async def process_message_task(user_id: int, conversation_id: str, message: str) -> None:
    """Handle one inbound user message with performance logging."""
    task_start_time = time.perf_counter()
    logger.info("Processing message for user '%s' in conversation '%s'", user_id, conversation_id)
    
    convo_doc_ref = db.collection("conversations").document(conversation_id)
    
    session_id = None
    
    @firestore.async_transactional
    async def get_or_create_convo(transaction):
        nonlocal session_id
        convo_snapshot = await convo_doc_ref.get(transaction=transaction)
       

        if convo_snapshot.exists:
            session_id = convo_snapshot.get("agent_session_id")
            # cheap no-op ping – prevents very rare stale-session errors

            if session_id:
                await ensure_session(session_id, str(user_id))
            transaction.update(convo_doc_ref, {"last_message_at": datetime.now(timezone.utc)})
        else:
            try:
                logger.info("New conversation '%s'. Creating session and Firestore doc.", conversation_id)
                pet_data_start = time.perf_counter()
                all_pets_info = await get_all_pet_details_by_user_id(user_id)
                logger.info("Esta es all_pets_info %s", all_pets_info)
                pet_data_end = time.perf_counter()
                logger.info("[PERF] Pet data query took: %.4f seconds", pet_data_end - pet_data_start)

                init_state  = {"info_mascotas": all_pets_info or {}}
                create_start = time.perf_counter()
                session_id   = (await create_session(str(user_id), state=init_state))["id"]
                create_end   = time.perf_counter()
                logger.info("[PERF] Cloud-Run session creation took: %.4f s", create_end - create_start)



                transaction.set(convo_doc_ref, {
                    "user_id": str(user_id), "agent_session_id": session_id,
                    "subject": "New Conversation", "created_at": datetime.now(timezone.utc),
                    "last_message_at": datetime.now(timezone.utc),
                })
            except Exception as e:
                logger.critical(
                    "CRITICAL ERROR during new conversation creation for convo_id '%s': %s",
                    conversation_id, e, exc_info=True
                )
                raise
    
    try:
        get_convo_start = time.perf_counter()
        await get_or_create_convo(db.transaction())
        get_convo_end = time.perf_counter()
        logger.info("[PERF] Get/Create conversation took: %.4f seconds", get_convo_end - get_convo_start)
    except Exception as e:
        logger.error("Transaction to get/create conversation failed. Task will not proceed: %s", e)
        return

    if not session_id:
        logger.error("Failed to get or create a session ID. Aborting task.")
        return

    persist_user_msg_start = time.perf_counter()
    try:
        user_msg_ref = convo_doc_ref.collection("messages").document()
        await user_msg_ref.set({"timestamp": datetime.now(timezone.utc), "role": "user", "content": message})
    except Exception as exc:
        logger.error("Firestore write (user msg) failed: %s", exc, exc_info=True)
    persist_user_msg_end = time.perf_counter()
    logger.info("[PERF] Persisting user message took: %.4f seconds", persist_user_msg_end - persist_user_msg_start)

    stream_agent_start = time.perf_counter()
    message_id = str(uuid.uuid4())
    full_reply_parts: list[str] = []
    first_token_time = None

    try:
        async for chunk, is_first in run_agent_stream(str(user_id), session_id, message):
            if is_first:
                first_token_time = time.perf_counter()
                logger.info("[PERF] Time to first token: %.4f seconds", first_token_time - stream_agent_start)

            await publish_non_blocking(
                publisher, TOPIC_PATH, ordering_key=conversation_id,
                user_id=str(user_id), conversation_id=conversation_id,
                chunk=chunk, message_id=message_id,
            )
            full_reply_parts.append(chunk)

    except Exception as exc:
        logger.error("Streaming/publish failure: %s", exc, exc_info=True)
    
    stream_agent_end = time.perf_counter()
    logger.info("[PERF] Agent streaming and publishing took: %.4f seconds", stream_agent_end - stream_agent_start)

    persist_bot_msg_start = time.perf_counter()
    full_reply = "".join(full_reply_parts)
    if full_reply:
        try:
            bot_msg_ref = convo_doc_ref.collection("messages").document()
            await bot_msg_ref.set({"timestamp": datetime.now(timezone.utc), "role": "bot", "content": full_reply})
        except Exception as exc:
            logger.error("Firestore write (bot msg) failed: %s", exc, exc_info=True)
    persist_bot_msg_end = time.perf_counter()
    logger.info("[PERF] Persisting bot message took: %.4f seconds", persist_bot_msg_end - persist_bot_msg_start)

    task_end_time = time.perf_counter()
    logger.info("[PERF] TOTAL task duration for conversation '%s': %.4f seconds", conversation_id, task_end_time - task_start_time)

# ------------------------------------------------------------------------------
#  Helper – publish chunks (Non-blocking)
# ------------------------------------------------------------------------------
async def publish_non_blocking(
    publisher: pubsub_v1.PublisherClient, topic_path: str, ordering_key: str,
    user_id: str, conversation_id: str, chunk: str, message_id: str,
) -> None:
    """
    Publishes a single chunk without waiting for the result.
    This is a "fire-and-forget" approach suitable for high-throughput streaming.
    The Google Cloud client library handles batching and sending in the background.
    """
    # --- MODIFICATION ---
    # We no longer `await` the result. We just send the request and move on.
    future = publisher.publish(                 # fire-and-forget
        topic_path, chunk.encode("utf-8"),
         user_id=user_id,
         conversation_id=conversation_id,
         message_id=message_id,
         ordering_key=ordering_key,
     )
    # 👉 when the publish finishes, log the outcome (success or error)
    future.add_done_callback(
        partial(_log_publish_result, ordering_key=ordering_key)
    )
    logger.debug("Queued chunk for publishing to convo '%s'", conversation_id)

 

  • Relay Endpoint Code:

 

# relay.py  – SSE + Pub/Sub (user_id & conversation_id in the path)
import os, logging, threading, time, asyncio, janus
from typing import Dict, Tuple

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette import EventSourceResponse

from google.cloud import pubsub_v1
from google.api_core.exceptions import GoogleAPICallError

# ── CONFIG ─────────────────────────────────────────────────────────────
PROJECT_ID  = os.getenv("GCP_PROJECT_ID", "appsantanafn94")
SUB_ID      = os.getenv("SUB_ID", "chatbot-stream-sub")
PORT        = int(os.getenv("PORT", 8001))

# ── LOGGING ────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO,
                    format="[%(asctime)s][%(levelname)s] %(message)s")
log = logging.getLogger("relay")

# ── FASTAPI APP ────────────────────────────────────────────────────────
app = FastAPI(title="Chat relay")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_methods=["*"], allow_headers=["*"],
)

# ── PER-CONVERSATION REGISTRY ──────────────────────────────────────────
# conversation_id ➜ (janus.Queue, owning_event_loop)
CLIENTS: Dict[str, Tuple[janus.Queue, asyncio.AbstractEventLoop]] = {}

# ── SSE ENDPOINT ───────────────────────────────────────────────────────
@app.get("/stream/{user_id}/{conversation_id}")
async def stream(user_id: str, conversation_id: str):
    q    = janus.Queue()
    loop = asyncio.get_running_loop()                 # the uvicorn loop
    CLIENTS[conversation_id] = (q, loop)

    log.info("👋  attach  conv=%s  user=%s", conversation_id, user_id)

    async def event_gen():
        try:
            while True:
                chunk = await q.async_q.get()
                yield {"data": chunk}
        finally:
            CLIENTS.pop(conversation_id, None)
            await q.aclose()
            log.info("👋  detach  conv=%s  user=%s", conversation_id, user_id)

    return EventSourceResponse(event_gen())

# ── PUB/SUB STREAMING-PULL ─────────────────────────────────────────────
subscriber = pubsub_v1.SubscriberClient()
SUB_PATH   = subscriber.subscription_path(PROJECT_ID, SUB_ID)

def _callback(msg: pubsub_v1.subscriber.message.Message):
    cid  = msg.attributes.get("conversation_id")
    data = msg.data.decode()

    tpl = CLIENTS.get(cid)
    if tpl is None:
        log.info("NACK %s – no active queue", cid)    # ← NEW
        msg.nack()
        return

    q, _ = tpl
    try:
        q.sync_q.put_nowait(data)
        log.info("ACK  %s – chunk enqueued", cid)     # ← NEW
        msg.ack()
    except Exception as exc:
        log.error("enqueue failed: %s", exc)
        msg.nack()

def _subscriber_runner():
    while True:
        future = subscriber.subscribe(SUB_PATH, callback=_callback)
        log.info("🟢 streaming-pull connected to %s", SUB_PATH)
        try:
            future.result()         # blocks until error / cancel
        except GoogleAPICallError as err:
            log.warning("stream error: %s – reconnecting", err)
        except Exception as err:
            log.exception("callback raised: %s – reconnecting", err)
        time.sleep(2)

threading.Thread(target=_subscriber_runner,
                 name="pubsub-pull", daemon=True).start()

# ── RUN ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=PORT)

 

Behavior Example

Here’s an example illustrating the issue:

  • Publish timestamp (from worker node):

 

2025-07-08 11:30:44,569][tasks][INFO   ][worker-1] ✓ Publish OK [q2-1] msg_id=15495676146163822
[2025-07-08 11:30:44,587][tasks][INFO   ][worker-1] [PERF] Agent streaming and publishing took: 2.3399 seconds
[2025-07-08 11:30:44,738][tasks][INFO   ][worker-1] [PERF] Persisting bot message took: 0.1505 seconds
[2025-07-08 11:30:44,738][tasks][INFO   ][worker-1] [PERF] TOTAL task duration for conversation 'q2-1': 4.1520 seconds
[2025-07-08 11:30:44,806][tasks][INFO   ][worker-1] ✓ Publish OK [q2-1] msg_id=15495990913479959

 

  • Receive timestamp (at relay endpoint):

 

INFO:     127.0.0.1:54599 - "GET /stream/2/q-1 HTTP/1.1" 200 OK
[2025-07-08 11:30:37,531][INFO] 👋  attach  conv=q2-1  user=2
INFO:     127.0.0.1:54601 - "GET /stream/2/q2-1 HTTP/1.1" 200 OK
[2025-07-08 11:38:24,451][INFO] ACK  q2-1 – chunk enqueued
[2025-07-08 11:38:24,734][INFO] ACK  q2-1 – chunk enqueued
[2025-07-08 11:38:24,816][INFO] NACK q1-1 – no active queue
[2025-07-08 11:38:24,845][INFO] ACK  q2-1 – chunk enqueued
[2025-07-08 11:38:25,128][INFO] ACK  q2-1 – chunk enqueued
[2025-07-08 11:38:25,554][INFO] ACK  q2-1 – chunk enqueued
[2025-07-08 11:38:25,981][INFO] NACK q1-1 – no active queue

 

As you can see, the delay between publishing and receiving is unexpectedly long.

What I’ve Checked

  • The message isn’t being nacked or retried. I log acks/nacks, and neither occurs until the message finally shows up several minutes later.

I'm not sure what else to try at this point. If anyone has experience with similar patterns or can spot something I'm missing, I’d really appreciate your input.

Thanks in advance!

0 1 105
1 REPLY 1

Hi @fnavarro94,

Welcome to Google Cloud Community!

The delay you’re experiencing where messages are published promptly but take several minutes to arrive at the relay suggests a problem between the Pub/Sub service and your relay node. Since publishing succeeds without issue, the likely cause of the delay lies in the subscription setup or how the relay node processes incoming messages.

Here’s what you can try to resolve the issue:

  1. Improve Relay Logging: It’s important to add detailed timestamp logs at the beginning of the _callback function and just before calling msg.ack(). This will help determine whether the delay occurs before your callback starts or during its execution.
  2. Monitor Pub/Sub Metrics (GCP Console): Check the metrics for your chatbot-stream-sub subscription—specifically, the oldest unacknowledged message age and the number of undelivered messages. A high oldest unacked message age indicates that messages are delivered but not acknowledged promptly by your relay, which can block subsequent messages due to ordering. Additionally, if the count of undelivered messages grows, it may mean Pub/Sub is holding messages back.
  3. Investigate Message Ordering: Since you’re using ordering_key=conversation_id, if one message for a conversation isn’t acknowledged quickly—even if it gets processed later—Pub/Sub will wait and hold back all the following messages for that same conversation. This is likely causing the delays you’re seeing.

  4. Network or Pub/Sub Service Side: Sometimes, delays happen because of network problems or issues with Pub/Sub in your region, though it’s not very common. To check, look at the Google Cloud Status Dashboard for any Pub/Sub problems in your area. Also, make sure your worker, Pub/Sub topic and subscription, and relay are all in the same or nearby Google Cloud regions, since messages take longer if they have to travel between distant regions.

In addition, you can also refer to this documentation, which offers common troubleshooting tips for publishing messages to a standard Pub/Sub topic.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.