Issue with Pub/Sub Subscriber Connecting but Not Receiving Messages

Hello, I am facing an issue with the Google Cloud Pub/Sub subscriber in my Spring application, which is hosted on GKE. The subscriber connects successfully to the Pub/Sub service, but it does not receive any messages from the associated topic. I have tried several troubleshooting approaches but have not been able to resolve it. The exact same setup and code previously worked fine for an extended period. I have one publisher for my topic and one subscriber for the subscription. Here is the configuration for my subscription:

  • Delivery Type: Pull
  • Subscription Expiration: Expires in 31 days if there is no activity.
  • Acknowledgement Deadline: 60 seconds
  • Subscription Message Retention Duration: 7 days
  • Exactly Once Delivery: Enabled
  • Message Ordering: Enabled
  • Dead Lettering: Disabled
  • Retry Policy: Exponential backoff with a minimum backoff duration of 1 second and a maximum backoff duration of 60 seconds.
  • Retain Acknowledged Messages: No
  • Subscription Filter: None
  • Topic Message Retention Duration: Not set
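To get a feel for the retry policy above (minimum backoff 1 second, maximum 60 seconds), here is a small model of how long a nacked message might wait before each redelivery. This is a sketch only: it assumes the delay simply doubles on every attempt and is capped at the maximum, which is an assumption about Pub/Sub's schedule rather than its documented algorithm. `BackoffModel` and `delays` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the subscription's retry policy (min 1 s, max 60 s).
// ASSUMPTION: the delay doubles on each redelivery attempt and is capped at
// the maximum; Pub/Sub's real schedule is not guaranteed to follow this exactly.
final class BackoffModel {

   /** Returns the assumed redelivery delay, in seconds, for the first n attempts. */
   static List<Long> delays(long minSeconds, long maxSeconds, int attempts) {
      List<Long> result = new ArrayList<>();
      long delay = minSeconds;
      for (int i = 0; i < attempts; i++) {
         result.add(delay);
         // Double the delay for the next attempt, but never exceed the cap.
         delay = Math.min(delay * 2, maxSeconds);
      }
      return result;
   }

   public static void main(String[] args) {
      // Prints [1, 2, 4, 8, 16, 32, 60, 60]
      System.out.println(delays(1, 60, 8));
   }
}
```

Under this assumption, a message that keeps failing reaches the 60-second cap after six attempts and stays there.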

On Saturday 15/11-2024 at 21:35 CET, after having worked flawlessly for an extended period, my application logged the following:

  • "WARN LUcw [Subscriber-SE-1-1] com.google.cloud.pubsub.v1.StreamingSubscriberConnection: failed to send operations"
  • "com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed."
  • "INFO LUcw [Subscriber-SE-1-1] com.google.cloud.pubsub.v1.StreamingSubscriberConnection: Permanent error invalid ack id message, will not resend."
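The INVALID_ARGUMENT error above says the ack IDs "have expired" or "were malformed". The following is a simplified, hypothetical model of the first case, not the Pub/Sub implementation: it assumes that every (re)delivery of a message gets a new ack ID with its own deadline and that, with exactly-once delivery enabled, only the most recent ack ID for a message is accepted, and only before its deadline. `AckIdModel`, `deliver` and `ack` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of ack-ID validity under exactly-once delivery.
// ASSUMPTIONS: each (re)delivery issues a new ack ID with its own deadline, and
// only the most recent ack ID for a message is accepted, and only before that
// deadline. This is NOT the Pub/Sub client or service implementation.
final class AckIdModel {

   enum AckResult { SUCCESSFUL, INVALID }

   private final long ackDeadlineMillis;
   private final Map<String, String> latestAckIdByMessage = new HashMap<>();
   private final Map<String, Long> expiryByAckId = new HashMap<>();
   private int nextAckId = 0;

   AckIdModel(long ackDeadlineMillis) {
      this.ackDeadlineMillis = ackDeadlineMillis;
   }

   /** Simulates a (re)delivery of a message at time nowMillis; returns the new ack ID. */
   String deliver(String messageId, long nowMillis) {
      String ackId = "ack-" + (nextAckId++);
      latestAckIdByMessage.put(messageId, ackId);
      expiryByAckId.put(ackId, nowMillis + ackDeadlineMillis);
      return ackId;
   }

   /** Simulates acknowledging with the given ack ID at time nowMillis. */
   AckResult ack(String messageId, String ackId, long nowMillis) {
      boolean isLatest = ackId.equals(latestAckIdByMessage.get(messageId));
      Long expiry = expiryByAckId.get(ackId);
      if (!isLatest || expiry == null || nowMillis > expiry) {
         return AckResult.INVALID;  // superseded or expired ack ID
      }
      latestAckIdByMessage.remove(messageId);  // message is now acknowledged
      expiryByAckId.remove(ackId);
      return AckResult.SUCCESSFUL;
   }

   public static void main(String[] args) {
      AckIdModel model = new AckIdModel(60_000);  // 60 s ack deadline
      String first = model.deliver("m1", 0);
      System.out.println(model.ack("m1", first, 61_000));   // INVALID (deadline passed)
      String second = model.deliver("m1", 61_000);         // redelivery, new ack ID
      System.out.println(model.ack("m1", first, 62_000));   // INVALID (superseded)
      System.out.println(model.ack("m1", second, 62_000));  // SUCCESSFUL
   }
}
```

In this model, an ack that arrives late, or that uses the ack ID from an earlier delivery, is rejected permanently rather than retried, which mirrors the "will not resend" wording in the log.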

Afterwards, the subscriber stopped receiving messages altogether. Restarting the application did not resolve the issue. Since the number of unacked messages had grown to 55k, I then purged the subscription, but purging followed by another restart did not resolve the issue either. Because my application is hosted in the cloud, I scaled it down there and started debugging the issue locally. With the following setting enabled, the log confirms that the subscriber connects successfully:

logging.level.com.google.cloud.pubsub=DEBUG
INFO LUx7 [ Thread-8] c.g.c.p.v.StreamingSubscriberConnection: Starting subscriber.
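The `logging.level...` property above is Spring Boot configuration. For running the subscriber outside Spring Boot, a minimal sketch of the equivalent setting with plain `java.util.logging` could look like this. It assumes the client library logs through `java.util.logging` under the `com.google.cloud.pubsub` logger hierarchy; `PubSubDebugLogging` and `enable` are hypothetical names.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: enable verbose java.util.logging output for the Pub/Sub client
// outside of Spring Boot (where logging.level.* properties are not available).
// ASSUMPTION: the client library logs through java.util.logging under the
// "com.google.cloud.pubsub" logger hierarchy.
final class PubSubDebugLogging {

   // Keep a strong reference: java.util.logging may otherwise garbage-collect
   // the configured logger and silently drop the level change.
   private static final Logger PUBSUB_LOGGER = Logger.getLogger("com.google.cloud.pubsub");

   static Logger enable() {
      ConsoleHandler handler = new ConsoleHandler();
      handler.setLevel(Level.FINE);  // FINE is roughly what SLF4J calls DEBUG
      PUBSUB_LOGGER.setLevel(Level.FINE);
      PUBSUB_LOGGER.addHandler(handler);
      return PUBSUB_LOGGER;
   }

   public static void main(String[] args) {
      Logger logger = enable();
      logger.fine("Pub/Sub debug logging enabled");  // written to stderr by the handler
   }
}
```

`enable` adds a new handler on every call, so it is meant to be called once at startup.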

However, the subscriber does not receive any messages, even though there are more than 3k messages on the topic. To further troubleshoot the issue, I ran the following command:

gcloud pubsub subscriptions pull projects/[PROJECT_ID]/subscriptions/[SUBSCRIPTION_NAME] --limit=10 --auto-ack --verbosity debug

Despite multiple attempts, the command usually returned 0 items, only occasionally retrieving a single message from the subscription. This inconsistent behavior leads me to believe that there may be intermittent connectivity issues or problems with message delivery from the Pub/Sub service. What surprises me is that, even after multiple restarts and purging the subscription, the subscriber still does not receive messages. I have also extended the acknowledgement deadline to the maximum, but that did not help either. I have not tried deleting the subscription and topic and starting from a clean slate, because I want to find the cause of this issue so that I do not run into it again.
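Because message ordering is enabled on this subscription, it may help to keep in mind that messages sharing an ordering key are handed out one after another. The model below is a deliberate simplification under an assumed rule: a key stays blocked while one of its messages is outstanding (delivered but not yet acknowledged). The real client library and service semantics are more nuanced, and `OrderingModel` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of ordered delivery: for each ordering key, the next
// message is handed out only after the previous one for that key is acked.
// ASSUMPTION: a simplification for illustration, not Pub/Sub's implementation.
final class OrderingModel {

   private final Map<String, Deque<String>> pendingByKey = new LinkedHashMap<>();
   private final Map<String, String> outstandingByKey = new LinkedHashMap<>();

   void publish(String orderingKey, String messageId) {
      pendingByKey.computeIfAbsent(orderingKey, k -> new ArrayDeque<>()).addLast(messageId);
   }

   /** Returns the messages deliverable right now: at most one per ordering key. */
   List<String> pull() {
      List<String> delivered = new ArrayList<>();
      for (Map.Entry<String, Deque<String>> entry : pendingByKey.entrySet()) {
         String key = entry.getKey();
         if (outstandingByKey.containsKey(key) || entry.getValue().isEmpty()) {
            continue;  // key is blocked until its outstanding message is acked
         }
         String next = entry.getValue().pollFirst();
         outstandingByKey.put(key, next);
         delivered.add(next);
      }
      return delivered;
   }

   void ack(String orderingKey) {
      outstandingByKey.remove(orderingKey);
   }

   public static void main(String[] args) {
      OrderingModel model = new OrderingModel();
      model.publish("k", "m1");
      model.publish("k", "m2");
      model.publish("k", "m3");
      System.out.println(model.pull());  // [m1]
      System.out.println(model.pull());  // [] because m1 is not acked yet
      model.ack("k");
      System.out.println(model.pull());  // [m2]
   }
}
```

In this model, a backlog of thousands of messages under a single key can look "stuck" to a puller as long as one message for that key is never acknowledged.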

Here is my source code, with internal details redacted. (For context, earlier profiling showed that each message is processed in under 50 ms on average.)

package com.example.BMSubscriber;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class BMSubscriber {

   private static final Logger logger = Logger.getLogger(BMSubscriber.class.getName());

   @Value("${gcloud.PROJECT_NAME}")
   private String PROJECT_NAME;

   @Value("${gcloud.SUBSCRIPTION_ID}")
   private String SUBSCRIPTION_ID;

   public void subscriberClass() {
      try {
         ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_NAME, SUBSCRIPTION_ID);

         // Called once per delivered message, on the subscriber's executor threads.
         MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            try {
               String messageAsUTF8 = message.getData().toStringUtf8();
               // Internal processing of the message data here
               consumer.ack();
            } catch (Exception e) {
               // The Throwable overload logs the full stack trace.
               logger.log(Level.SEVERE, "Error processing message", e);
               consumer.nack(); // ask Pub/Sub to redeliver the message
            }
         };

         // Flow control: at most 100 messages / 100 MiB held by this client at once.
         Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
               .setFlowControlSettings(FlowControlSettings.newBuilder()
                     .setMaxOutstandingElementCount(100L)
                     .setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
                     .build())
               .build();
         subscriber.startAsync().awaitRunning();
      } catch (Throwable e) {
         logger.log(Level.SEVERE, "Could not start BMSubscriber", e);
      }
   }
}
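A quick back-of-the-envelope check ties the figures in this post together: at most 100 outstanding messages (the flow control setting in the code), about 50 ms of processing per message on average, and a 60-second ack deadline. The sketch assumes the worst case in which all 100 outstanding messages are processed strictly one after another; `AckDeadlineHeadroom` is a hypothetical name.

```java
// Back-of-the-envelope check, using figures from this post: up to 100
// outstanding messages (flow control), ~50 ms average processing time, and a
// 60 s ack deadline. ASSUMPTION: worst case is fully serial processing.
final class AckDeadlineHeadroom {

   /** Worst-case time (ms) until the last outstanding message has been processed. */
   static long worstCaseSerialMillis(long maxOutstanding, long avgProcessingMillis) {
      return maxOutstanding * avgProcessingMillis;
   }

   public static void main(String[] args) {
      long worstCase = worstCaseSerialMillis(100, 50);  // 5000 ms
      long ackDeadlineMillis = 60_000;
      System.out.println(worstCase + " ms of " + ackDeadlineMillis + " ms deadline");
      System.out.println("Headroom factor: " + (ackDeadlineMillis / worstCase));  // 12
   }
}
```

On averages alone there is roughly a 12x margin, so expired ack IDs would more plausibly come from occasional slow outliers or from acks that never reach the service in time than from typical processing.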

TL;DR:
My Google Cloud Pub/Sub subscriber in a Spring application connects successfully but does not receive any messages. The setup worked for a long time, but it recently stopped working after logging errors such as INVALID_ARGUMENT and "Permanent error invalid ack id message". Restarting the application and purging the subscription did not help. The subscription uses the Pull delivery type with exponential-backoff retries, exactly-once delivery and message ordering. Messages exist in the topic, but the subscriber does not receive them, and testing with gcloud pubsub subscriptions pull gave inconsistent results. The problem might be related to connectivity or to message delivery from Pub/Sub. I scaled the application down and debugged it locally, but have not found a solution yet.

Edit: 19/11-2024 08:32 CET

This morning I ran my application first locally and then in the cloud, and it worked flawlessly, receiving and processing messages just as it had before the issue arose. Why it failed remains an open question: nothing in the circumstances changed, and I made no changes to my code or to any configuration. Any help digging deeper into this is appreciated, since this will be a critical component of my system and I want to avoid running into this issue once the code is in production.
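Since a silent stall is the main production concern here, one hedged way to at least notice a recurrence quickly is a small stall detector. The receiver would call `recordMessage` for each delivery, and a periodic check would call `isStalled`; wiring that check into a GKE health probe or an alert is an assumption about the deployment. `StallWatchdog` and the five-minute threshold are hypothetical, and the check is only meaningful while the topic has steady traffic or a backlog.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stall detector: the message receiver calls recordMessage() on
// every delivery; a periodic health check calls isStalled() to flag when no
// message has arrived for longer than the threshold.
final class StallWatchdog {

   private final Duration threshold;
   // AtomicReference because receiver threads write and the health check reads.
   private final AtomicReference<Instant> lastMessage;

   StallWatchdog(Duration threshold, Instant start) {
      this.threshold = threshold;
      this.lastMessage = new AtomicReference<>(start);
   }

   void recordMessage(Instant now) {
      lastMessage.set(now);
   }

   boolean isStalled(Instant now) {
      return Duration.between(lastMessage.get(), now).compareTo(threshold) > 0;
   }

   public static void main(String[] args) {
      Instant t0 = Instant.parse("2024-11-19T08:00:00Z");
      StallWatchdog watchdog = new StallWatchdog(Duration.ofMinutes(5), t0);
      watchdog.recordMessage(t0.plusSeconds(30));
      System.out.println(watchdog.isStalled(t0.plusSeconds(120)));  // false
      System.out.println(watchdog.isStalled(t0.plusSeconds(400)));  // true
   }
}
```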

Edit: 19/11-2024 09:25 CET

This appeared in the logs once, at 09:02 CET, and the application seemed to recover from it. However, I thought it might give further context to the issue:

com.google.api.gax.rpc.InternalException: io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:110) ~[gax-2.50.0.jar!/:2.50.0]
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41) ~[gax-2.50.0.jar!/:2.50.0]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86) ~[gax-grpc-2.50.0.jar!/:2.50.0]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.50.0.jar!/:2.50.0]
at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82) ~[gax-grpc-2.50.0.jar!/:2.50.0]
at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84) ~[gax-2.50.0.jar!/:2.50.0]
at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148) ~[gax-grpc-2.50.0.jar!/:2.50.0]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.62.2.jar!/:1.62.2]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.62.2.jar!/:1.62.2]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.62.2.jar!/:1.62.2]
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570) ~[gax-grpc-2.50.0.jar!/:2.50.0]
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.62.2.jar!/:1.62.2]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.62.2.jar!/:1.62.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[na:na]
at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.62.2.jar!/:1.62.2]
... 20 common frames omitted

 
