Hi everyone,
I'm building a system where a worker node pushes ADK agent responses to a Google Pub/Sub topic. A relay node, which is subscribed to that topic, receives those messages and streams them to users via Server-Sent Events (SSE).
For the most part, everything works as expected. However, I'm hitting an intermittent issue: roughly 1 in 10 messages is delayed by several minutes before it reaches the relay node, even though the message is published to Pub/Sub successfully. This delay makes the system unreliable for production use.
I suspect there's something I'm doing wrong, but I haven’t been able to identify the issue.
API endpoint:
from fastapi import FastAPI, Depends, status
from pydantic import BaseModel, Field
from redis.asyncio import Redis
from fastapi.middleware.cors import CORSMiddleware
# Internal imports
from redis_conn import get_redis_connection
from tasks import process_message_task
# --- Pydantic Models ---
class ChatRequest(BaseModel):
"""Defines the structure for an incoming chat message."""
    user_id: int = Field(..., example=12345)
    # conversation_id mirrors the frontend payload.
    conversation_id: str = Field(..., example="conv-abc-123")
message: str = Field(..., example="Hello, tell me a joke.")
class QueueResponse(BaseModel):
"""Defines the structure for the API's response."""
status: str
task_id: str
    user_id: int
    conversation_id: str
# --- FastAPI Application ---
app = FastAPI(
title="Chatbot API",
description="Receives user messages and queues them for processing by a worker.",
version="1.1.0" # Version bump to reflect changes
)
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.post(
"/api/v1/message",
status_code=status.HTTP_202_ACCEPTED,
response_model=QueueResponse,
)
async def receive_message(
request: ChatRequest,
    # Dependency-injected Redis connection. Unused here (the taskiq broker
    # manages its own connection) but kept for endpoints that need direct
    # Redis access.
redis: Redis = Depends(get_redis_connection)
):
"""
Receives a message from a user, validates it, and places it into a
queue for the chatbot worker to process.
"""
print(f"API: Received message from user '{request.user_id}' in conversation '{request.conversation_id}'. Queuing task...")
    # Enqueue the task on the broker; .kiq() sends it asynchronously.
task = await process_message_task.kiq(
user_id=request.user_id,
conversation_id=request.conversation_id,
message=request.message
)
print(f"API: Task '{task.task_id}' for user '{request.user_id}' has been queued.")
    # Confirm to the caller that the task has been queued.
return QueueResponse(
status="Message received and queued for processing.",
task_id=task.task_id,
user_id=request.user_id,
conversation_id=request.conversation_id,
)
@app.get("/")
async def root():
"""A simple root endpoint to confirm the API is running."""
return {"message": "Chatbot API is running."}
Worker Node Code:
# ==============================================================================
# File: tasks.py (stream-enabled worker with non-blocking publish)
# Purpose: Consume user messages, run ADK agent with SSE streaming, and publish
# each token chunk to a Google Cloud Pub/Sub topic.
# ==============================================================================
import os
import logging
import time
import uuid
from datetime import datetime, timezone
from broker import broker
from db_queries import get_all_pet_details_by_user_id
# --- Google Cloud --------------------------------------------------------------
from google.cloud import pubsub_v1
from google.cloud import firestore
from functools import partial
# --- ADK / Vertex --------------------------------------------------------------
from adk_client import ensure_session, run_agent_stream, create_session
# ------------------------------------------------------------------------------
# Logging
# ------------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s][%(levelname)s][Worker] %(message)s",
)
logger = logging.getLogger(__name__)
def _log_publish_result(future: pubsub_v1.publisher.futures.Future,
                        ordering_key: str) -> None:
"""
Runs inside the Pub/Sub publisher thread when a publish completes.
Logs success or failure and unpauses the ordering key on error.
"""
try:
msg_id = future.result() # returns server-assigned ID
logger.info("✓ Publish OK [%s] msg_id=%s", ordering_key, msg_id)
except Exception as exc:
logger.error("✗ Publish FAILED [%s]: %s", ordering_key, exc)
        # A failed publish pauses this ordering key inside the client, so
        # resume it to let later messages on the key flow again.
        # NOTE: resume_publish() takes the topic path as its first argument.
        publisher.resume_publish(TOPIC_PATH, ordering_key)
# ------------------------------------------------------------------------------
# Firestore & Pub/Sub clients
# ------------------------------------------------------------------------------
db = firestore.AsyncClient()
publisher = pubsub_v1.PublisherClient(
publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
TOPIC_ID = "chatbot-stream-topic"
TOPIC_PATH = publisher.topic_path(GCP_PROJECT_ID, TOPIC_ID)
# ------------------------------------------------------------------------------
# Taskiq task
# ------------------------------------------------------------------------------
@broker.task
async def process_message_task(user_id: int, conversation_id: str, message: str) -> None:
"""Handle one inbound user message with performance logging."""
task_start_time = time.perf_counter()
logger.info("Processing message for user '%s' in conversation '%s'", user_id, conversation_id)
convo_doc_ref = db.collection("conversations").document(conversation_id)
session_id = None
@firestore.async_transactional
async def get_or_create_convo(transaction):
nonlocal session_id
convo_snapshot = await convo_doc_ref.get(transaction=transaction)
if convo_snapshot.exists:
session_id = convo_snapshot.get("agent_session_id")
# cheap no-op ping – prevents very rare stale-session errors
if session_id:
await ensure_session(session_id, str(user_id))
transaction.update(convo_doc_ref, {"last_message_at": datetime.now(timezone.utc)})
else:
try:
logger.info("New conversation '%s'. Creating session and Firestore doc.", conversation_id)
pet_data_start = time.perf_counter()
all_pets_info = await get_all_pet_details_by_user_id(user_id)
logger.info("Esta es all_pets_info %s", all_pets_info)
pet_data_end = time.perf_counter()
logger.info("[PERF] Pet data query took: %.4f seconds", pet_data_end - pet_data_start)
init_state = {"info_mascotas": all_pets_info or {}}
create_start = time.perf_counter()
session_id = (await create_session(str(user_id), state=init_state))["id"]
create_end = time.perf_counter()
logger.info("[PERF] Cloud-Run session creation took: %.4f s", create_end - create_start)
transaction.set(convo_doc_ref, {
"user_id": str(user_id), "agent_session_id": session_id,
"subject": "New Conversation", "created_at": datetime.now(timezone.utc),
"last_message_at": datetime.now(timezone.utc),
})
except Exception as e:
logger.critical(
"CRITICAL ERROR during new conversation creation for convo_id '%s': %s",
conversation_id, e, exc_info=True
)
raise
try:
get_convo_start = time.perf_counter()
await get_or_create_convo(db.transaction())
get_convo_end = time.perf_counter()
logger.info("[PERF] Get/Create conversation took: %.4f seconds", get_convo_end - get_convo_start)
except Exception as e:
logger.error("Transaction to get/create conversation failed. Task will not proceed: %s", e)
return
if not session_id:
logger.error("Failed to get or create a session ID. Aborting task.")
return
persist_user_msg_start = time.perf_counter()
try:
user_msg_ref = convo_doc_ref.collection("messages").document()
await user_msg_ref.set({"timestamp": datetime.now(timezone.utc), "role": "user", "content": message})
except Exception as exc:
logger.error("Firestore write (user msg) failed: %s", exc, exc_info=True)
persist_user_msg_end = time.perf_counter()
logger.info("[PERF] Persisting user message took: %.4f seconds", persist_user_msg_end - persist_user_msg_start)
stream_agent_start = time.perf_counter()
message_id = str(uuid.uuid4())
full_reply_parts: list[str] = []
first_token_time = None
try:
async for chunk, is_first in run_agent_stream(str(user_id), session_id, message):
if is_first:
first_token_time = time.perf_counter()
logger.info("[PERF] Time to first token: %.4f seconds", first_token_time - stream_agent_start)
await publish_non_blocking(
publisher, TOPIC_PATH, ordering_key=conversation_id,
user_id=str(user_id), conversation_id=conversation_id,
chunk=chunk, message_id=message_id,
)
full_reply_parts.append(chunk)
except Exception as exc:
logger.error("Streaming/publish failure: %s", exc, exc_info=True)
stream_agent_end = time.perf_counter()
logger.info("[PERF] Agent streaming and publishing took: %.4f seconds", stream_agent_end - stream_agent_start)
persist_bot_msg_start = time.perf_counter()
full_reply = "".join(full_reply_parts)
if full_reply:
try:
bot_msg_ref = convo_doc_ref.collection("messages").document()
await bot_msg_ref.set({"timestamp": datetime.now(timezone.utc), "role": "bot", "content": full_reply})
except Exception as exc:
logger.error("Firestore write (bot msg) failed: %s", exc, exc_info=True)
persist_bot_msg_end = time.perf_counter()
logger.info("[PERF] Persisting bot message took: %.4f seconds", persist_bot_msg_end - persist_bot_msg_start)
task_end_time = time.perf_counter()
logger.info("[PERF] TOTAL task duration for conversation '%s': %.4f seconds", conversation_id, task_end_time - task_start_time)
# ------------------------------------------------------------------------------
# Helper – publish chunks (Non-blocking)
# ------------------------------------------------------------------------------
async def publish_non_blocking(
publisher: pubsub_v1.PublisherClient, topic_path: str, ordering_key: str,
user_id: str, conversation_id: str, chunk: str, message_id: str,
) -> None:
"""
Publishes a single chunk without waiting for the result.
This is a "fire-and-forget" approach suitable for high-throughput streaming.
The Google Cloud client library handles batching and sending in the background.
"""
    # Don't await the result; hand the chunk to the client library, which
    # batches and sends it on a background thread.
    future = publisher.publish(  # fire-and-forget
topic_path, chunk.encode("utf-8"),
user_id=user_id,
conversation_id=conversation_id,
message_id=message_id,
ordering_key=ordering_key,
)
# 👉 when the publish finishes, log the outcome (success or error)
future.add_done_callback(
partial(_log_publish_result, ordering_key=ordering_key)
)
logger.debug("Queued chunk for publishing to convo '%s'", conversation_id)
Relay Endpoint Code:
# relay.py – SSE + Pub/Sub (user_id & conversation_id in the path)
import os, logging, threading, time, asyncio, janus
from typing import Dict, Tuple
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette import EventSourceResponse
from google.cloud import pubsub_v1
from google.api_core.exceptions import GoogleAPICallError
# ── CONFIG ─────────────────────────────────────────────────────────────
PROJECT_ID = os.getenv("GCP_PROJECT_ID", "appsantanafn94")
SUB_ID = os.getenv("SUB_ID", "chatbot-stream-sub")
PORT = int(os.getenv("PORT", 8001))
# ── LOGGING ────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO,
format="[%(asctime)s][%(levelname)s] %(message)s")
log = logging.getLogger("relay")
# ── FASTAPI APP ────────────────────────────────────────────────────────
app = FastAPI(title="Chat relay")
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_methods=["*"], allow_headers=["*"],
)
# ── PER-CONVERSATION REGISTRY ──────────────────────────────────────────
# conversation_id ➜ (janus.Queue, owning_event_loop)
CLIENTS: Dict[str, Tuple[janus.Queue, asyncio.AbstractEventLoop]] = {}
# ── SSE ENDPOINT ───────────────────────────────────────────────────────
@app.get("/stream/{user_id}/{conversation_id}")
async def stream(user_id: str, conversation_id: str):
q = janus.Queue()
loop = asyncio.get_running_loop() # the uvicorn loop
CLIENTS[conversation_id] = (q, loop)
log.info("👋 attach conv=%s user=%s", conversation_id, user_id)
async def event_gen():
try:
while True:
chunk = await q.async_q.get()
yield {"data": chunk}
finally:
CLIENTS.pop(conversation_id, None)
await q.aclose()
log.info("👋 detach conv=%s user=%s", conversation_id, user_id)
return EventSourceResponse(event_gen())
# ── PUB/SUB STREAMING-PULL ─────────────────────────────────────────────
subscriber = pubsub_v1.SubscriberClient()
SUB_PATH = subscriber.subscription_path(PROJECT_ID, SUB_ID)
def _callback(msg: pubsub_v1.subscriber.message.Message):
cid = msg.attributes.get("conversation_id")
data = msg.data.decode()
tpl = CLIENTS.get(cid)
if tpl is None:
log.info("NACK %s – no active queue", cid) # ← NEW
msg.nack()
return
q, _ = tpl
try:
q.sync_q.put_nowait(data)
log.info("ACK %s – chunk enqueued", cid) # ← NEW
msg.ack()
except Exception as exc:
log.error("enqueue failed: %s", exc)
msg.nack()
def _subscriber_runner():
while True:
future = subscriber.subscribe(SUB_PATH, callback=_callback)
log.info("🟢 streaming-pull connected to %s", SUB_PATH)
try:
future.result() # blocks until error / cancel
except GoogleAPICallError as err:
log.warning("stream error: %s – reconnecting", err)
except Exception as err:
log.exception("callback raised: %s – reconnecting", err)
time.sleep(2)
threading.Thread(target=_subscriber_runner,
name="pubsub-pull", daemon=True).start()
# ── RUN ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=PORT)
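In case the subscription config matters: ordered delivery requires `enable_message_ordering` on the subscription, not just on the publisher. A one-off provisioning script equivalent to my setup would look roughly like this (sketch; the project ID is a placeholder, resource names match the code above):

# provision.py – create the topic and an ordered subscription (one-off sketch)
from google.cloud import pubsub_v1

project_id = "your-project-id"  # placeholder

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, "chatbot-stream-topic")
sub_path = subscriber.subscription_path(project_id, "chatbot-stream-sub")

publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
    request={
        "name": sub_path,
        "topic": topic_path,
        # Without this flag the subscriber ignores ordering keys entirely.
        "enable_message_ordering": True,
    }
)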
Here’s an example illustrating the issue:
Publish timestamp (from worker node):
[2025-07-08 11:30:44,569][tasks][INFO ][worker-1] ✓ Publish OK [q2-1] msg_id=15495676146163822
[2025-07-08 11:30:44,587][tasks][INFO ][worker-1] [PERF] Agent streaming and publishing took: 2.3399 seconds
[2025-07-08 11:30:44,738][tasks][INFO ][worker-1] [PERF] Persisting bot message took: 0.1505 seconds
[2025-07-08 11:30:44,738][tasks][INFO ][worker-1] [PERF] TOTAL task duration for conversation 'q2-1': 4.1520 seconds
[2025-07-08 11:30:44,806][tasks][INFO ][worker-1] ✓ Publish OK [q2-1] msg_id=15495990913479959
Receive timestamp (at relay endpoint):
INFO: 127.0.0.1:54599 - "GET /stream/2/q-1 HTTP/1.1" 200 OK
[2025-07-08 11:30:37,531][INFO] 👋 attach conv=q2-1 user=2
INFO: 127.0.0.1:54601 - "GET /stream/2/q2-1 HTTP/1.1" 200 OK
[2025-07-08 11:38:24,451][INFO] ACK q2-1 – chunk enqueued
[2025-07-08 11:38:24,734][INFO] ACK q2-1 – chunk enqueued
[2025-07-08 11:38:24,816][INFO] NACK q1-1 – no active queue
[2025-07-08 11:38:24,845][INFO] ACK q2-1 – chunk enqueued
[2025-07-08 11:38:25,128][INFO] ACK q2-1 – chunk enqueued
[2025-07-08 11:38:25,554][INFO] ACK q2-1 – chunk enqueued
[2025-07-08 11:38:25,981][INFO] NACK q1-1 – no active queue
As you can see, the gap between the publish (11:30:44) and its receipt at the relay (11:38:24) is nearly eight minutes. The delayed message isn't being nacked and redelivered in the meantime: I log every ack/nack in the subscriber callback, and neither fires until the message finally shows up.
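For completeness, here is a minimal test client I attach to the stream with (sketch, assuming httpx; the IDs come from the logs above):

# sse_client.py – minimal SSE test client: print each data line as it arrives
import asyncio

import httpx

async def consume(user_id: str, conversation_id: str) -> None:
    url = f"http://localhost:8001/stream/{user_id}/{conversation_id}"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data:"):
                    print(line[len("data:"):].strip())

if __name__ == "__main__":
    asyncio.run(consume("2", "q2-1"))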
I'm not sure what else to try at this point. If anyone has experience with similar patterns or can spot something I'm missing, I’d really appreciate your input.
Thanks in advance!