I'm running a service on GAE that consumes messages from GCP Pub/Sub with the configuration below, and it takes only ~40 ms to ack the messages.
Java 8:
Spring boot : 2.1.5.RELEASE
GCP Pubsub : 1.82.0
- Total time taken by processing 3 out of 3 messages by 38ms
With Java 17 and the configuration below, we have observed an increase in message acknowledgment latency in GCP PubSub :
Spring boot : 2.7.9
GCP Pubsub : 1.82.0 or 1.129.0
- Total time taken by ack processing 3 out of 3 messages by 234ms
The main culprit method is :
public Boolean deleteMessages(final List<String> ackIds, final PubSubConfig pubSubConfig) throws Exception {
    final SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings
                    .defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20MB
                    .build()
            ).build();
    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
        final String subscriptionName = ProjectSubscriptionName.format(pubSubConfig.getProjectId(), pubSubConfig.getSubscriptionId());
        if (!ackIds.isEmpty()) {
            // acknowledge received messages
            final AcknowledgeRequest acknowledgeRequest =
                AcknowledgeRequest.newBuilder()
                    .setSubscription(subscriptionName)
                    .addAllAckIds(ackIds)
                    .build();
            subscriber.acknowledgeCallable().call(acknowledgeRequest);
        }
        return true;
    }
}
The increase in message acknowledgment latency when migrating from Java 8 to Java 17 in your Google Cloud Pub/Sub application can be attributed to several factors related to changes in the Java runtime, library versions, and configuration settings. Here's a revised approach to diagnose and address this issue:
- Review your SubscriberStubSettings. Ensure they are optimized for Java 17 and your specific workload. Pay close attention to keepAlive settings, message size limits, and thread pool configurations.
- Ensure that your deleteMessages method efficiently batches acknowledgments to minimize the number of RPC calls to the Pub/Sub service.
- Review the retry settings in SubscriberStubSettings. Ensure that retries are configured appropriately to handle transient errors without introducing excessive delays.
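The batching point can be sketched in plain Java. This is a minimal illustration, not code from this thread: AckBatcher, partition, and maxPerRequest are hypothetical names, and the chunk size is left to the caller because no per-request limit is stated here. Each resulting chunk would be passed to addAllAckIds on its own AcknowledgeRequest.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits ack IDs into bounded chunks, one chunk per request. */
final class AckBatcher {
    private AckBatcher() {}

    /**
     * Splits ackIds into consecutive chunks of at most maxPerRequest elements.
     * maxPerRequest is an assumed, caller-chosen bound.
     */
    static List<List<String>> partition(List<String> ackIds, int maxPerRequest) {
        if (maxPerRequest <= 0) {
            throw new IllegalArgumentException("maxPerRequest must be positive");
        }
        List<List<String>> chunks = new ArrayList<>();
        for (int start = 0; start < ackIds.size(); start += maxPerRequest) {
            int end = Math.min(start + maxPerRequest, ackIds.size());
            // Copy the sublist so each chunk is independent of the caller's list.
            chunks.add(new ArrayList<>(ackIds.subList(start, end)));
        }
        return chunks;
    }
}
```

When the list is smaller than maxPerRequest, as with the 3 messages in the question, this yields a single chunk, so a single AcknowledgeRequest carrying all ack IDs is already fully batched.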
Example Updated Code (with batching):
public Boolean deleteMessages(final List<String> ackIds, final PubSubConfig pubSubConfig) throws Exception {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20MB
                    .build())
            .build();
    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
        String subscriptionName = ProjectSubscriptionName.format(pubSubConfig.getProjectId(), pubSubConfig.getSubscriptionId());
        if (!ackIds.isEmpty()) {
            AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAllAckIds(ackIds)
                .build();
            subscriber.acknowledgeCallable().call(acknowledgeRequest);
        }
        return true;
    }
}
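One thing worth checking in the method above, although this thread does not confirm it as the cause: GrpcSubscriberStub.create is called on every invocation, which typically means a new gRPC channel is set up and torn down for each ack. A common way to avoid that cost is to create the stub once and reuse it. The sketch below shows the pattern in plain Java; ReusableClient is a hypothetical name, and in this thread's code T would be SubscriberStub with a factory of () -> GrpcSubscriberStub.create(subscriberStubSettings).

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical holder that creates an expensive client once and reuses it.
 * Assumption: the client is safe to share across calls and threads.
 */
final class ReusableClient<T extends AutoCloseable> implements AutoCloseable {
    private final Callable<T> factory;
    private T instance; // guarded by "this"

    ReusableClient(Callable<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it on first use only. */
    synchronized T get() throws Exception {
        if (instance == null) {
            instance = factory.call();
        }
        return instance;
    }

    /** Closes the shared instance, for example at application shutdown. */
    @Override
    public synchronized void close() throws Exception {
        if (instance != null) {
            instance.close();
            instance = null;
        }
    }
}
```

With a holder like this, deleteMessages would call get() instead of opening the stub in a try-with-resources block, and the holder itself would be closed once when the application stops.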
Thanks @ms4446, I found the issue.