We are running microservices on GKE and we subscribe to and publish messages on Pub/Sub asynchronously. We are seeing huge delays executing the publish callbacks: the p99 is on the order of 6-10 s irrespective of load.
Each of our GKE nodes is provisioned with 2 CPUs and 2 GB of memory. CPU and memory usage is below 25% even under high load.
My publish code
private void publish(
    final ProcessingContext context,
    final PubsubMessage message,
    final AckReplyConsumer consumer,
    final TopicName targetTopic,
    ExecutorService callbackExecutor,
    PublisherFactory publisherFactory) {

  // Get Publisher from cache
  // This will be a persistent publisher
  Supplier<ApiFuture<String>> futureSupplier =
      () -> publisherFactory.createPublisher(String.valueOf(targetTopic)).publish(message);

  long ts = System.nanoTime();
  ApiFuture<String> future = context.execPublishCallForMessage(futureSupplier);
  long publishCallLatency = System.nanoTime() - ts;
  log.info("Success publishCallLatency: {}", publishCallLatency);

  final long publishBeginTs = System.nanoTime();
  ApiFutures.addCallback(
      future,
      new ApiFutureCallback<>() {
        @Override
        public void onSuccess(String result) {
          long publishAndCallbackLatency = System.nanoTime() - publishBeginTs;
          log.info("Success publishAndCallbackLatency: {}", publishAndCallbackLatency);
          callbackExecutor.execute(consumer::ack);
        }

        @Override
        public void onFailure(Throwable t) {
          long publishAndCallbackLatency = System.nanoTime() - publishBeginTs;
          log.info("Failed publishAndCallbackLatency: {}", publishAndCallbackLatency);
          callbackExecutor.execute(consumer::nack);
        }
      },
      MoreExecutors.directExecutor());
}
The publishCallLatency is low, on the order of 5-10 ms, but the publishAndCallbackLatency is very high, on the order of 6-10 s.
I have flow control and batching enabled, but I'm unable to figure out why it takes so long for the callback to execute. I tried increasing the publisher executor threads and ran a profiler, but most of the executor threads are idle. I'm trying to reduce this latency and can't explain it.
Any ideas?
Pub/Sub's asynchronous design prioritizes high throughput and decouples the publish path from acknowledgment. When you publish a message it is added to a queue, and the acknowledgment of the incoming message is separate from the publish operation, which can lead to perceived delays in callbacks.
Network latency, Pub/Sub's internal message processing, and client-side queuing all contribute to the time before the publish response reaches your callback.
Even if your publisher executor threads are underutilized, the callbackExecutor might be a bottleneck if it has too few threads or is handling other high-latency tasks. Proper configuration of this executor is crucial to handle the volume of callbacks efficiently.
Here are steps to diagnose and reduce the latency:
1. Increase Acknowledgement Deadline (see the subscription sketch after this list).
2. Optimize Callback Executor: ensure the callbackExecutor has a sufficient number of threads to handle the volume of callbacks promptly. If your callbacks involve CPU-bound tasks, increasing the thread count might improve performance.
3. Flow Control and Batching:
Publisher publisher = Publisher.newBuilder(targetTopic)
.setBatchingSettings(
BatchingSettings.newBuilder()
.setElementCountThreshold(100L)
.setRequestByteThreshold(1_000_000L)
.setDelayThreshold(Duration.ofMillis(10))
.build())
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
.setMaxOutstandingRequestBytes(10_000_000L)
.build())
.build();
4. Message Ordering:
5. Monitoring and Profiling:
6. Alternative Acknowledgement Strategies:
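For item 1, a minimal sketch of raising the ack deadline on an existing subscription with SubscriptionAdminClient (the project, subscription name, and 60-second value are illustrative):

// Sketch only; classes come from com.google.cloud.pubsub.v1, com.google.pubsub.v1 and com.google.protobuf.
void raiseAckDeadline() throws Exception {
  try (SubscriptionAdminClient admin = SubscriptionAdminClient.create()) {
    Subscription update =
        Subscription.newBuilder()
            .setName(SubscriptionName.of("my-project", "my-subscription").toString())
            .setAckDeadlineSeconds(60) // default is 10 seconds
            .build();
    // Update only the ack_deadline_seconds field.
    admin.updateSubscription(
        update, FieldMask.newBuilder().addPaths("ack_deadline_seconds").build());
  }
}

Note that when consuming through the streaming-pull Subscriber, the client library also extends deadlines automatically up to its max ack extension period, so this mainly helps when deadlines expire before your ack is sent.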
Code Refinement Suggestions
- Explicit Executor Configuration (example below):
- Error Handling and Retries (see the retry sketch after the example):
// Dedicated thread pool for callbacks
ExecutorService callbackExecutor = Executors.newFixedThreadPool(10); // Adjust size based on workload

// ... rest of your publish code ...

ApiFutures.addCallback(
    future,
    new ApiFutureCallback<>() {
      @Override
      public void onSuccess(String result) {
        // Handle success
      }

      @Override
      public void onFailure(Throwable t) {
        // Handle failure
      }
    },
    callbackExecutor); // Use the dedicated executor
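For the error handling and retries point, a minimal sketch of attaching RetrySettings (from com.google.api.gax.retrying) to the publisher. Every value below is illustrative rather than a recommendation, and Duration is org.threeten.bp.Duration or java.time.Duration depending on your client version:

RetrySettings retrySettings =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(Duration.ofMillis(100))   // delay before the first retry
        .setRetryDelayMultiplier(2.0)                   // exponential backoff factor
        .setMaxRetryDelay(Duration.ofSeconds(5))
        .setInitialRpcTimeout(Duration.ofSeconds(10))
        .setRpcTimeoutMultiplier(1.0)
        .setMaxRpcTimeout(Duration.ofSeconds(10))
        .setTotalTimeout(Duration.ofSeconds(60))        // overall budget before giving up
        .build();

Publisher publisher =
    Publisher.newBuilder(targetTopic)
        .setRetrySettings(retrySettings)
        .build();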
Important Considerations
- Pub/Sub Limits:
- Resource Constraints:
Hi @ms4446
Thanks a lot for your detailed response.
Our service reads from Pub/Sub using StreamingPull and publishes to another target Pub/Sub topic. To ensure delivery, we ack the message only in onSuccess of the publish callback.
I'm assuming that when I call consumer.ack() on the AckReplyConsumer, the Google Pub/Sub client library internally batches and sends the acks to Pub/Sub, so there is no need for me to batch them. Am I right in that assumption?
The reason I use MoreExecutors.directExecutor() for the callback is to log the time for the response to reach the callback function. Once I log, I immediately delegate the acking to the callbackExecutor, so there is no CPU-intensive operation within the callback. The delay I see is in reaching the callback. I checked the publish latency to the topic in Pub/Sub.
The p99 there is around 50 ms, so I think the response reaches my service but gets stuck in the gRPC/Netty layer before it reaches the callback. I tried printing the thread name and stack trace inside the callback function, and I see that it is using the Publisher executor threads:
13:16:06.448 [pubsub-connect-system-3] INFO c.f.d.d.load.pubsub.PubsubPublisher - Published message with id = 10192072744586459, threadName = pubsub-connect-system-3, stackTrace = java.lang.Throwable
at com.f.d.d.load.pubsub.PubsubPublisher$1.onSuccess(PubsubPublisher.java:55)
at com.f.d.d.load.pubsub.PubsubPublisher$1.onSuccess(PubsubPublisher.java:35)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
at com.google.api.core.SettableApiFuture.set(SettableApiFuture.java:46)
at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onSuccess(Publisher.java:581)
at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1900(Publisher.java:537)
at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:483)
at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:470)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
I have tried the following
Really appreciate your response
Thanks
There is a syntax error in the sample code under the "Error Handling and Retries" section above:
Disregard the above posting about the syntax error: setFlowControlSettings(FlowControlSettings) should be set on BatchingSettings, not on Publisher.Builder.
Thanks a lot for your response.
Our service reads from a topic using StreamingPull and publishes to a Pub/Sub topic, so we ack the message on successful publish to the target topic.
As for using MoreExecutors.directExecutor() for the ApiFutureCallback, the reason I do it is only to log the duration from the time I published to the time the onSuccess method is called. I immediately delegate the acking to the callbackExecutor; there is no other CPU-intensive operation in the onSuccess method.
I have different executors for my Publisher and my callbackExecutor. The callbackExecutor is, I believe, sufficiently sized (at least from the profiling point of view), as I see most of its threads idle. But I will still take a look. Meanwhile, I logged a thread stack trace in the onSuccess method and this is what I see:
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
at com.google.api.core.SettableApiFuture.set(SettableApiFuture.java:46)
at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onSuccess(Publisher.java:581)
at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1900(Publisher.java:537)
at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:483)
at com.google.cloud.pubsub.v1.Publisher$3.onSuccess(Publisher.java:470)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1135)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:784)
at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Could it be possible that the responses are stuck somewhere in the gRPC/Netty call stack?
Also, I verified the publish latency in GCP monitoring and the latencies are quite low.
So, as per my understanding, it looks like the response is coming back from Pub/Sub but my service is taking time to process it. Is my observation correct?
Thanks again for the very detailed response.
Hi @kvajjala59,
The Pub/Sub client libraries handle internal batching of acknowledgments (ACKs) based on internal thresholds and optimizations. You do not need to implement additional batching for ACKs in your code.
While using MoreExecutors.directExecutor() for immediate logging is efficient, the subsequent delegation to callbackExecutor might introduce contention if that executor isn't configured for high concurrency. If callbackExecutor has a limited number of threads, incoming responses may queue up, causing delays in reaching the callbacks.
Your logs show that callbacks are running on pubsub-connect-system threads, which belong to the publisher's internal thread pool. If this pool is saturated by a high publishing rate, it can delay callback execution. Even if the callback processing isn't CPU-intensive, insufficient thread availability can still introduce delays.
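If the publisher's internal pool does turn out to be the constraint, one option is to give the Publisher a larger dedicated executor. A minimal sketch, assuming the Java client's InstantiatingExecutorProvider (the thread count is illustrative):

// Dedicated, larger executor for the Publisher's internal work
// (com.google.api.gax.core.InstantiatingExecutorProvider).
ExecutorProvider publisherExecutorProvider =
    InstantiatingExecutorProvider.newBuilder()
        .setExecutorThreadCount(8) // illustrative; size for your publish rate
        .build();

Publisher publisher =
    Publisher.newBuilder(targetTopic)
        .setExecutorProvider(publisherExecutorProvider)
        .build();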
Despite the p99 publish latency being around 50ms, occasional spikes or network variations could affect individual message delivery, leading to delays in triggering callbacks. The observed latency spike could be related to transient network issues or service load variations.
Potential bottlenecks in your client-side code, such as resource contention or inefficient logging mechanisms, could contribute to delays. Inefficient handling of response data or blocking operations within the callback might exacerbate the issue.
Here is how to troubleshoot:
Analyze the callbackExecutor:
Ensure it's a thread pool executor with an appropriate number of threads to handle expected concurrency. Adjust the thread pool size and monitor the impact on callback delays.
Example configuration:
ExecutorService callbackExecutor = Executors.newFixedThreadPool(10); // Adjust size as needed
Tune Publisher Settings:
Experiment with the setMaxOutstandingElementCount() and setMaxOutstandingRequestBytes() flow control settings in your Publisher configuration to reduce the load on the publisher's internal thread pool.
Example:
Publisher publisher =
    Publisher.newBuilder(targetTopic)
        .setBatchingSettings(
            BatchingSettings.newBuilder()
                .setElementCountThreshold(100L)
                .setRequestByteThreshold(1_000_000L)
                .setDelayThreshold(Duration.ofMillis(10))
                .setFlowControlSettings( // flow control is nested under BatchingSettings
                    FlowControlSettings.newBuilder()
                        .setMaxOutstandingElementCount(1000L)
                        .setMaxOutstandingRequestBytes(10_000_000L)
                        .build())
                .build())
        .build();
Profile Your Application:
Use profiling tools to identify performance bottlenecks in your application, particularly within the callback and logging operations. If logging is identified as a blocking operation, consider using asynchronous logging mechanisms.
Enable Additional Logging:
Log detailed timing information at various stages within the callback execution, including when the response is received, when logging completes, and when the ACK is sent. This will help isolate where the delay occurs.
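As a sketch of that kind of stage-by-stage timing, reusing the variables from your publish method (the stage breakdown itself is hypothetical):

final long publishBeginTs = System.nanoTime();
ApiFutures.addCallback(
    future,
    new ApiFutureCallback<String>() {
      @Override
      public void onSuccess(String result) {
        long callbackReachedTs = System.nanoTime(); // response reached the callback
        callbackExecutor.execute(() -> {
          long ackStartTs = System.nanoTime();      // ack task picked up by callbackExecutor
          consumer.ack();
          long ackDoneTs = System.nanoTime();       // ack handed to the client library
          log.info("publish->callback: {} ns, callback->ackStart: {} ns, ack(): {} ns",
              callbackReachedTs - publishBeginTs,
              ackStartTs - callbackReachedTs,
              ackDoneTs - ackStartTs);
        });
      }

      @Override
      public void onFailure(Throwable t) {
        callbackExecutor.execute(consumer::nack);
      }
    },
    MoreExecutors.directExecutor());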
Monitor Network and Pub/Sub Metrics:
Continuously monitor Pub/Sub publish latency metrics, as well as network latency and throughput metrics between your service and Pub/Sub. Look for correlations between latency spikes and callback delays to identify patterns or issues.
If optimizing callback execution does not resolve the issue, you might explore an alternative ACK strategy. Consider acknowledging the message immediately after publishing and handling potential failures through a dead-letter topic or other error-handling mechanisms. This approach may reduce latency at the cost of added complexity in error management.
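A minimal sketch of that alternative, assuming a separate, pre-created Publisher for a hypothetical dead-letter topic (deadLetterPublisher below is not part of your current code):

// Ack the incoming message as soon as the publish has been initiated.
consumer.ack();

ApiFuture<String> future = publisher.publish(message);
ApiFutures.addCallback(
    future,
    new ApiFutureCallback<String>() {
      @Override
      public void onSuccess(String messageId) {
        // Nothing to do; the incoming message was already acked.
      }

      @Override
      public void onFailure(Throwable t) {
        // The original message is already acked, so recover by publishing the
        // payload to a dead-letter topic instead of nacking.
        log.warn("Publish failed, routing to dead-letter topic", t);
        deadLetterPublisher.publish(message);
      }
    },
    callbackExecutor);

Keep in mind that if the process dies between the ack and a successful publish (including the dead-letter publish), the message is lost; that is the error-management trade-off mentioned above.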
Please Note: While you cannot directly add a tracer to the Publisher, you can use tools like OpenTelemetry or similar to trace interactions with Pub/Sub. These tools provide insights into request/response timings and can help diagnose performance issues.
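For example, a minimal sketch of wrapping the publish in an OpenTelemetry span, assuming an OpenTelemetry instance (openTelemetry below) has already been configured in your service:

// OpenTelemetry Java API (io.opentelemetry.api / io.opentelemetry.context); names are illustrative.
Tracer tracer = openTelemetry.getTracer("pubsub-publisher");

Span span = tracer.spanBuilder("pubsub.publish").startSpan();
ApiFuture<String> future;
try (Scope scope = span.makeCurrent()) {
  future = publisher.publish(message);
}
ApiFutures.addCallback(
    future,
    new ApiFutureCallback<String>() {
      @Override
      public void onSuccess(String messageId) {
        span.end(); // span covers the publish call through callback delivery
      }

      @Override
      public void onFailure(Throwable t) {
        span.recordException(t);
        span.end();
      }
    },
    MoreExecutors.directExecutor());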