
GCP Pub/Sub async publish, long delays in running the callback

We are running microservices on GKE that subscribe to and publish Pub/Sub messages asynchronously. We are seeing huge delays in executing the publish callbacks: the p99 is on the order of 6-10 s irrespective of load.

Each of our GKE nodes is provisioned with 2 CPUs and 2 GB of memory. CPU and memory usage stay below 25% even under high load.

My publish code

 

  private void publish(
      final ProcessingContext context,
      final PubsubMessage message,
      final AckReplyConsumer consumer,
      final TopicName targetTopic,
      ExecutorService callbackExecutor,
      PublisherFactory publisherFactory) {

    // Get Publisher from Cache
    // This will be a persistent publisher
    Supplier<ApiFuture<String>> futureSupplier =
        () -> publisherFactory.createPublisher(String.valueOf(targetTopic)).publish(message);

    long ts = System.nanoTime();
    ApiFuture<String> future = context.execPublishCallForMessage(futureSupplier);
    long publishCallLatency = System.nanoTime() - ts;
    log.info("Success publishCallLatency: {}", publishCallLatency);

    final long publishBeginTs = System.nanoTime();
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<>() {

          @Override
          public void onSuccess(String result) {
            long publishAndCallbackLatency = System.nanoTime() - publishBeginTs;
            log.info("Success publishAndCallbackLatency: {}", publishAndCallbackLatency);
            callbackExecutor.execute(consumer::ack);
          }

          @Override
          public void onFailure(Throwable t) {
            long publishAndCallbackLatency = System.nanoTime() - publishBeginTs;
            log.info("Failed publishAndCallbackLatency: {}", publishAndCallbackLatency);
            callbackExecutor.execute(consumer::nack);
          }
        },
        MoreExecutors.directExecutor());
  }

 

 

The publishCallLatency is low, on the order of 5-10 ms, but the publishAndCallbackLatency is very high, on the order of 6-10 s.

I have flow control and batching enabled, but I'm unable to figure out why the callback takes so long to execute. I tried increasing the publisher executor threads and ran a profiler, but most of the executor threads are idle. I'm trying to reduce this latency and can't explain it.

Any ideas?

 


Pub/Sub's asynchronous design prioritizes high throughput and decouples the publish call from the server's acknowledgment of that publish. When you publish a message, it is added to a local batch queue, and the acknowledgment that completes the future arrives separately, which can lead to perceived delays in the callbacks.

Network latency, Pub/Sub's internal processing, and client-side queuing all contribute to the delay before that acknowledgment reaches your callback.

Even if your publisher executor threads are underutilized, the callbackExecutor might be a bottleneck if it has insufficient threads or if it's handling other high-latency tasks. Proper configuration of the executor is crucial to handle the volume of callbacks efficiently.

Here are steps to diagnose and reduce the latency:

Increase Acknowledgement Deadline:

  • Pub/Sub subscriptions have a default acknowledgment deadline; if it is too short, messages can be redelivered prematurely while processing is still in progress. Adjust the deadline, or the client library's automatic extension cap, based on your processing time so messages are acknowledged within the appropriate window (see the sketch below).
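On the subscriber side with StreamingPull, the knob that usually matters is the client library's cap on automatic deadline extension rather than the subscription's raw deadline. A minimal sketch, assuming placeholder project and subscription names (the Duration here comes from org.threeten.bp in most client versions):

import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import org.threeten.bp.Duration;

// Placeholder project/subscription IDs; substitute your own.
ProjectSubscriptionName subscription =
    ProjectSubscriptionName.of("my-project", "my-subscription");

MessageReceiver receiver = (message, consumer) -> {
  // ... publish to the target topic and ack/nack in the publish callback ...
};

Subscriber subscriber =
    Subscriber.newBuilder(subscription, receiver)
        // With StreamingPull the client library keeps extending the ack deadline of
        // outstanding messages up to this cap, so a slow downstream publish does not
        // trigger premature redelivery.
        .setMaxAckExtensionPeriod(Duration.ofMinutes(10))
        .build();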

Optimize Callback Executor:

  • Thread Pool Sizing: Ensure the callbackExecutor has a sufficient number of threads to handle the volume of callbacks promptly. If your callbacks involve CPU-bound tasks, increasing the thread count might improve performance.
  • Dedicated Executor: For complex or time-consuming callback logic, use a separate, dedicated executor pool to avoid contention with other tasks and to handle the workload more efficiently.

Flow Control and Batching:

  • Fine-Tune Settings: Adjust flow control and batching settings to balance throughput and latency. Excessive batching can increase latency, so it's essential to find an optimal configuration. For example:
Publisher publisher = Publisher.newBuilder(targetTopic)
    .setBatchingSettings(
        BatchingSettings.newBuilder()
            .setElementCountThreshold(100L)
            .setRequestByteThreshold(1_000_000L)
            .setDelayThreshold(Duration.ofMillis(10))
            // Publisher flow control is configured on BatchingSettings.Builder,
            // not directly on Publisher.Builder.
            .setFlowControlSettings(
                FlowControlSettings.newBuilder()
                    .setMaxOutstandingElementCount(1000L)
                    .setMaxOutstandingRequestBytes(10_000_000L)
                    .build())
            .build())
    .build();

Message Ordering:

  • Ordered Delivery: Be aware that enforcing strict message ordering can introduce additional latency compared to unordered delivery. If ordering is not critical, leave it disabled; the sketch below shows what ordered publishing looks like, so you can confirm it is not in play.
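For reference, ordered publishing in the Java client is opt-in and requires both a builder flag and a per-message ordering key; a minimal sketch with placeholder project/topic names follows. If neither is set, delivery is already unordered and ordering is not the source of the latency.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

// Ordering only takes effect when BOTH the builder flag and a per-message
// ordering key are set; without them the publisher batches and parallelizes freely.
Publisher orderedPublisher =
    Publisher.newBuilder(TopicName.of("my-project", "my-topic")) // placeholder names
        .setEnableMessageOrdering(true)
        .build();

PubsubMessage orderedMessage =
    PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8("payload"))
        .setOrderingKey("customer-123") // messages sharing a key are serialized
        .build();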

Monitoring and Profiling:

  • Pub/Sub Metrics: Monitor key Pub/Sub metrics like backlog and delivery latency to gain insights into system behavior and identify potential bottlenecks.
  • Code Profiling: Profile your callback code to pinpoint any performance issues within your application logic that could be contributing to delays.

Alternative Acknowledgement Strategies:

  • Batch Acknowledgement: If your processing logic permits, acknowledge messages in batches to improve throughput. This can reduce the overhead associated with individual message acknowledgments.

Code Refinement Suggestions

Explicit Executor Configuration:

  • Configure thread pools explicitly for both the publisher and callback executors to match the application's concurrency needs and avoid relying on default settings, which may not be optimal for your workload.

Error Handling and Retries:

  • Implement robust error handling and retry mechanisms in your callback logic to manage transient failures gracefully and ensure reliability in message processing.
// Dedicated thread pool for callbacks
ExecutorService callbackExecutor = Executors.newFixedThreadPool(10); // Adjust size based on workload

// ... rest of your publish code ...

ApiFutures.addCallback(
    future,
    new ApiFutureCallback<>() {
        @Override
        public void onSuccess(String messageId) {
            // Publish confirmed by the server; ack the source message.
            consumer.ack();
        }

        @Override
        public void onFailure(Throwable t) {
            // Publish failed after the client library's retries; nack so the
            // source message is redelivered.
            consumer.nack();
        }
    },
    callbackExecutor); // Run the callback on the dedicated executor instead of directExecutor()

Important Considerations

Pub/Sub Limits:

  • Be aware of Pub/Sub's service limits, such as the maximum number of outstanding messages and message size. These limits can affect the performance and scalability of your Pub/Sub usage.

Resource Constraints:

  • If your GKE nodes are underutilized, consider adjusting resource allocations or exploring autoscaling options to ensure resources are effectively used. Use monitoring tools to verify actual constraints and adjust accordingly.

 

Hi @ms4446 

Thanks a lot for your detailed response.

Our service reads from Pub/Sub using StreamingPull and publishes to another target Pub/Sub topic. To ensure delivery, we ack the message only in onSuccess in the publish callback.

I'm assuming that when I call ack() on the AckReplyConsumer, the Google Pub/Sub client library internally batches and sends the acks to Pub/Sub, so there is no need for me to batch. Am I right in my assumption?

The reason I use MoreExecutors.directExecutor() for the callback is to log the time it takes for the response to reach the callback function. Once I log it, I immediately delegate the acking to the callbackExecutor, so there is no CPU-intensive operation within the callback. The delay I see is in reaching the callback. I checked the publish latency to the topic in Pub/Sub:

[Screenshot: Pub/Sub topic publish latency chart]

The p99 here is around 50 ms, so I think the response reaches my service but gets stuck in the grpc/netty layer before it reaches the callback. I tried printing the thread name and stack trace inside the callback function, and I see that it is using the Publisher executor threads:

13:16:06.448 [pubsub-connect-system-3] INFO  c.f.d.d.load.pubsub.PubsubPublisher - Published message with id = 10192072744586459, threadName = pubsub-connect-system-3, stackTrace = java.lang.Throwable
	at com.f.d.d.load.pubsub.PubsubPublisher$1.onSuccess(PubsubPublisher.java:55)
	at com.f.d.d.load.pubsub.PubsubPublisher$1.onSuccess(PubsubPublisher.java:35)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
	at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
	at com.google.api.core.SettableApiFuture.set(SettableApiFuture.java:46)
	at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onSuccess(Publisher.java:581)
	at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1900(Publisher.java:537)
	at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:483)
	at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:470)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
	at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

I have tried the following

  1. To trace the response from PubSub. This doesn't seem to be possible as I can't add a tracer to the Publisher. 
  2. To add interceptors. But I see that you cannot intercept the response on the client side.

Really appreciate your response

Thanks

 

A note on the sample code in the "Flow Control and Batching" section above:

  • The method setFlowControlSettings(FlowControlSettings) is not defined on Publisher.Builder. It has to be set on BatchingSettings.Builder, i.e. nested inside the batching settings as shown in that snippet; calling it directly on the Publisher builder will not compile.

Thanks a lot for your response.

Our service reads from a topic using StreamingPull and publishes to a PubSub topic.  So we ack the message on successful publish to the target topic. 

As for using MoreExecutors.directExecutor() for the ApiFutureCallback, the reason I do it is only to log the duration from the time I published to the time the onSuccess method is called. I immediately delegate the acking to the callbackExecutor. There is no other CPU-intensive operation in the onSuccess method.

I have different executors for my Publisher and callbackExecutor. The callbackExecutor, I believe, is sufficiently sized (at least from the profiling point of view), as I see most of its threads idle, but I will still take a look. Meanwhile, I logged a thread stack trace in the onSuccess method and this is what I see:

	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
	at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
	at com.google.api.core.SettableApiFuture.set(SettableApiFuture.java:46)
	at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onSuccess(Publisher.java:581)
	at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1900(Publisher.java:537)
	at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:483)
	at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:470)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
	at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
	at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Could it be possible that the responses are stuck somewhere in the grpc/netty call stack?

Also, I verified the publish latency in GCP Monitoring, and the latencies are quite low:

[Screenshot: Pub/Sub topic publish latency chart]

So, as per my understanding, it looks like the response is coming from Pub/Sub but my service is taking time to process it. Is my observation correct?

 

Thanks again for the very detailed response.

 

 

 

Hi @kvajjala59 ,

The Pub/Sub client libraries handle internal batching of acknowledgments (ACKs) based on internal thresholds and optimizations. You do not need to implement additional batching for ACKs in your code. 

While using MoreExecutors.directExecutor() for immediate logging is efficient, the subsequent delegation to callbackExecutor might introduce contention if the latter isn't properly configured for high concurrency. If callbackExecutor has a limited number of threads, incoming responses may queue up, causing delays in reaching the callbacks.

Your logs show that callbacks are running on pubsub-connect-system threads, which belong to the publisher's internal thread pool. If this pool is saturated due to a high publishing rate, it could delay callback execution. Even if the callback processing isn't CPU-intensive, insufficient thread availability can still introduce delays.
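If profiling suggests that pool is the constraint, the publisher's internal executor can be sized explicitly instead of relying on the default. A minimal sketch; the topic name is a placeholder and the thread count is purely illustrative:

import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;

Publisher publisher =
    Publisher.newBuilder(TopicName.of("my-project", "my-topic")) // placeholder names
        // Explicitly sized executor backing the publisher's batching, retries,
        // and response handling.
        .setExecutorProvider(
            InstantiatingExecutorProvider.newBuilder()
                .setExecutorThreadCount(8) // tune against publish rate and profiling
                .build())
        .build();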

Despite the p99 publish latency being around 50ms, occasional spikes or network variations could affect individual message delivery, leading to delays in triggering callbacks. The observed latency spike could be related to transient network issues or service load variations.

Potential bottlenecks in your client-side code, such as resource contention or inefficient logging mechanisms, could contribute to delays. Inefficient handling of response data or blocking operations within the callback might exacerbate the issue.

Here is how to troubleshoot:

Analyze the callbackExecutor:

Ensure it's a thread pool executor with an appropriate number of threads to handle expected concurrency. Adjust the thread pool size and monitor the impact on callback delays.

Example configuration:

 
ExecutorService callbackExecutor = Executors.newFixedThreadPool(10); // Adjust size as needed

Tune Publisher Settings:

Experiment with the publisher flow control settings, setMaxOutstandingElementCount() and setMaxOutstandingRequestBytes(), in your Publisher configuration to reduce the load on the publisher's internal thread pool.

Example:

 
Publisher publisher = Publisher.newBuilder(targetTopic)
    .setBatchingSettings(
        BatchingSettings.newBuilder()
            // Flow control is nested under BatchingSettings, not set on Publisher.Builder.
            .setFlowControlSettings(
                FlowControlSettings.newBuilder()
                    .setMaxOutstandingElementCount(1000L)
                    .setMaxOutstandingRequestBytes(10_000_000L)
                    .build())
            .build())
    .build();

Profile Your Application:

Use profiling tools to identify performance bottlenecks in your application, particularly within the callback and logging operations. If logging is identified as a blocking operation, consider using asynchronous logging mechanisms.

Enable Additional Logging:

Log detailed timing information at various stages within the callback execution, including when the response is received, when logging completes, and when the ACK is sent. This will help isolate where the delay occurs.

Monitor Network and Pub/Sub Metrics:

Continuously monitor Pub/Sub publish latency metrics, as well as network latency and throughput metrics between your service and Pub/Sub. Look for correlations between latency spikes and callback delays to identify patterns or issues.

If optimizing callback execution does not resolve the issue, you might explore an alternative ACK strategy. Consider acknowledging the message immediately after publishing and handling potential failures through a dead-letter topic or other error-handling mechanisms. This approach may reduce latency at the cost of added complexity in error management.
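A minimal sketch of that alternative, reusing the types from the publish method posted above; deadLetterPublisher is a hypothetical second Publisher pointed at a dead-letter topic:

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;

// Hypothetical variant: ack as soon as the publish is handed to the client library,
// and route publish failures to a dead-letter topic instead of nacking. This trades
// redelivery-based safety on the source subscription for lower ack latency.
private void publishAndAckEagerly(
    PubsubMessage message,
    AckReplyConsumer consumer,
    Publisher targetPublisher,
    Publisher deadLetterPublisher) {

  ApiFuture<String> future = targetPublisher.publish(message);
  consumer.ack(); // ack immediately; redelivery no longer covers publish failures

  ApiFutures.addCallback(
      future,
      new ApiFutureCallback<>() {
        @Override
        public void onSuccess(String messageId) {
          // Nothing left to do; the source message was already acked.
        }

        @Override
        public void onFailure(Throwable t) {
          // The source message is already acked, so the payload must be preserved
          // here, e.g. by publishing it to a dead-letter topic.
          deadLetterPublisher.publish(message);
        }
      },
      MoreExecutors.directExecutor());
}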

Please note: while you cannot directly add a tracer to the Publisher, you can use tools like OpenTelemetry or similar to trace interactions with Pub/Sub. These tools provide insights into request/response timings and can help diagnose performance issues.
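If a full tracing setup is more than you need right now, one lower-level option is to attach a plain gRPC client interceptor to the publisher's channel and time each RPC from its start to onClose, which is roughly where your delayed callbacks are triggered. This is only a sketch under a few assumptions: the variable names and project/topic IDs are illustrative, and a production configuration would start from the library's default channel-provider settings (keepalive, message size limits, etc.) rather than a bare InstantiatingGrpcChannelProvider builder.

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.Collections;

// Logs the wall-clock duration of every outgoing RPC (including Publish) from
// call start to onClose, the point at which the publish future is completed.
ClientInterceptor timingInterceptor =
    new ClientInterceptor() {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        long startNanos = System.nanoTime();
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
          @Override
          public void start(Listener<RespT> responseListener, Metadata headers) {
            super.start(
                new SimpleForwardingClientCallListener<RespT>(responseListener) {
                  @Override
                  public void onClose(Status status, Metadata trailers) {
                    long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
                    System.out.printf(
                        "%s closed with %s after %d ms%n",
                        method.getFullMethodName(), status.getCode(), elapsedMs);
                    super.onClose(status, trailers);
                  }
                },
                headers);
          }
        };
      }
    };

GrpcInterceptorProvider interceptorProvider =
    () -> Collections.singletonList(timingInterceptor);

Publisher publisher =
    Publisher.newBuilder(TopicName.of("my-project", "my-topic")) // placeholder names
        .setChannelProvider(
            InstantiatingGrpcChannelProvider.newBuilder()
                .setInterceptorProvider(interceptorProvider)
                .build())
        .build();

Comparing these per-RPC timings with your publishAndCallbackLatency should show whether the time is spent on the wire or in local dispatching after the response has already arrived.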