Our application uses GCP Pub/Sub.
We have set up the consumer as:
spring.cloud.gcp.pubsub.function.definition= receiveMessage
I am receiving the messages, but the problem is that it behaves like a synchronous flow.
The consumer and the producer both run on the same thread.
Also, the producer flow waits for the consumer method to complete before it finishes its own execution.
That is no different from a plain method call, in which case Pub/Sub is not even needed.
So what do I need to do in my Spring Boot application to make the Pub/Sub consumer behave asynchronously instead of synchronously?
I have been stuck on this for a week and have not found a solution yet.
To make your Pub/Sub consumer behave asynchronously, you should:
1. Set the number of subscriber executor threads:
spring.cloud.gcp.pubsub.subscriber.executor-threads=4
This sets the number of threads used by the subscriber.
2. Make sure your message processing is non-blocking or handled asynchronously. Your message processing code should not block the thread it is called on. If the processing is blocking, you can hand it to a thread pool so that it runs asynchronously.
3. Optionally, set the acknowledgment mode to MANUAL if you need to control when a message is acknowledged. This is not required to make your consumer asynchronous, but it can be helpful in some scenarios.
Creating and managing the ExecutorService
The ExecutorService needs to be created and managed, preferably as a Spring bean. Here is an example of how to create and configure the ExecutorService bean:
@Bean
public ExecutorService executorService() {
    return Executors.newFixedThreadPool(4);
}
This creates a thread pool with 4 threads. You can adjust the number of threads depending on your needs.
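As a rough illustration of handing blocking work to such a pool, here is a minimal sketch in plain Java. The `offload` helper and the class name are hypothetical and not part of Spring Cloud GCP; in the real application the wrapped `Consumer` would be the one returned from the `receiveMessage` bean, and the pool would be the `ExecutorService` bean above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncConsumerSketch {

    // Hypothetical helper: wraps a (possibly slow) handler so that each message
    // is handed to the pool and the calling thread returns right away.
    static <T> Consumer<T> offload(ExecutorService pool, Consumer<T> handler) {
        return message -> pool.execute(() -> handler.accept(message));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        Consumer<String> receiveMessage = offload(pool, msg ->
                System.out.println(Thread.currentThread().getName() + " handled " + msg));

        // Returns without waiting for the handler to finish.
        receiveMessage.accept("hello");

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

One trade-off to keep in mind: if messages are acknowledged automatically when the consumer returns, a failure inside the offloaded work will most likely not cause a redelivery, so this pattern is usually paired with explicit error handling or manual acknowledgment.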
Alternative example for non-Spring Cloud Stream users
If you are not using Spring Cloud Stream, you can use the @ServiceActivator annotation to bind to a Pub/Sub subscription. Here is an example:
@Service
public class PubSubConsumer {
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void receiveMessage(PubsubMessage message) {
        // Process the message here
    }
}
The pubsubInputChannel channel must be configured to receive messages from the Pub/Sub subscription.
Testing
Make sure to test your application thoroughly to ensure it behaves as expected under load.
Additional notes
The @PubsubListener annotation is part of the spring-cloud-gcp-pubsub-stream-binder module, which is an extension of the Spring Cloud Stream project. If you are not using Spring Cloud Stream, you can use the @ServiceActivator annotation or another method to bind to a Pub/Sub subscription.
The ExecutorService that you pass to the PubSubConsumer constructor needs to be created and managed, preferably as a Spring bean. This ensures that the ExecutorService is properly initialized and destroyed when your application starts and stops.
I hope this is helpful!
Hi,
Thanks for the fast response.
We are not using any listeners, only the function definition property.
I am adding some of our properties here for your reference.
Also, this bean was already in place:
@Bean
public ExecutorService executorService() {
    return Executors.newFixedThreadPool(4);
}
Still, the publisher is blocked waiting on the subscriber's response.
spring.cloud:
  gcp.pubsub:
    project-id: my-project
    credentials.encoded-key: "<MY_KEY>"
    publisher:
      batching:
        enabled: true
    subscriber:
      executor-threads: 4
      ack-deadline-seconds: 2
  function:
    definition: receiveMessage
  stream:
    default-binder: pubsub
    gcp:
    function:
      bindings:
        receiveMessage-in-0: polling-msg
    bindings:
      input:
        consumer:
          concurrency: 5
      polling-msg:
        binder: pubsub
        destination: topic
        group: topic-goup
This is the consumer:
@Bean
public Consumer<MyMessage> receiveMessage() {
    // Payload type assumed to match what the producer sends
    return msg -> {
        // Do something
    };
}
And the producer:
public boolean triggerMessage(MyMessage msg) {
    return streamBridge.send("polling-msg", msg);
}
The original issue is with the consumer being blocked, not the producer. The streamBridge.send() method is not blocking, but the consumer might be blocking the producer if it is not processing messages asynchronously.
Revised triggerMessage method
public CompletableFuture<Void> triggerMessage(MyMessage msg) {
    // Send the message using StreamBridge
    boolean sent = streamBridge.send("polling-msg", msg);
    // Create a CompletableFuture that is already completed with the result of the send operation
    CompletableFuture<Void> future = sent
            ? CompletableFuture.completedFuture(null)
            : CompletableFuture.failedFuture(new RuntimeException("Failed to send message"));
    // Handle the result of the message send operation
    future.thenAccept(result -> {
        // Handle the successful send operation
    }).exceptionally(throwable -> {
        // Handle the failed send operation
        log.error("Failed to send message", throwable);
        // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
        return null;
    });
    return future;
}
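Note that in the method above the send itself still runs on the caller's thread, and the returned future is already complete by the time the caller receives it. If the goal is for the caller not to wait on the send at all, one option is to dispatch it to an executor with CompletableFuture.supplyAsync. The following is only a sketch under assumptions: the `BiPredicate` field stands in for `streamBridge.send(bindingName, payload)` so that the example runs without Spring, and the class name is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;

public class AsyncSendSketch {

    // Stand-in for streamBridge.send(bindingName, payload), which returns a boolean.
    private final BiPredicate<String, Object> sender;
    private final ExecutorService pool;

    AsyncSendSketch(BiPredicate<String, Object> sender, ExecutorService pool) {
        this.sender = sender;
        this.pool = pool;
    }

    // Runs the send on the pool; the caller gets a pending future back immediately.
    CompletableFuture<Void> triggerMessage(Object msg) {
        return CompletableFuture
                .supplyAsync(() -> sender.test("polling-msg", msg), pool)
                .thenAccept(sent -> {
                    if (!sent) {
                        // Completes the returned future exceptionally
                        throw new IllegalStateException("Failed to send message");
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // In the real application the sender would delegate to the StreamBridge bean.
        AsyncSendSketch producer = new AsyncSendSketch((binding, payload) -> true, pool);

        producer.triggerMessage("hello")
                .whenComplete((ok, error) ->
                        System.out.println(error == null ? "sent" : "failed: " + error))
                .get(5, TimeUnit.SECONDS);

        pool.shutdown();
    }
}
```

With this shape, whatever happens inside the send (including a consumer that is invoked on the sending thread) occupies a pool thread instead of the caller's thread.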
Code comment
I have changed the comment in the revised triggerMessage method to be more accurate and provide more guidance:
// Handle the result of the message send operation
future.thenAccept(result -> {
    // Handle the successful send operation
}).exceptionally(throwable -> {
    // Handle the failed send operation
    log.error("Failed to send message", throwable);
    // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
    return null;
});
Error handling
I have added a note about the complexity of error handling in real-world applications, and provided some examples of appropriate actions to take:
In a real-world application, the error handling might be more complex, and could involve retrying the message, sending it to a dead-letter queue, or taking other actions based on the specific type of exception.
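To make the retry and dead-letter idea a little more concrete, here is a minimal sketch in plain Java. Everything in it is an assumption for illustration: `sendWithRetry`, the `Predicate` standing in for the send call, and the `Consumer` standing in for a dead-letter destination are hypothetical names, not part of Spring Cloud Stream or Pub/Sub.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class RetrySendSketch {

    // Tries the send up to maxAttempts times. If every attempt fails (returns
    // false or throws), the message is handed to deadLetter and false is returned.
    static <T> boolean sendWithRetry(T msg, Predicate<T> send, int maxAttempts,
                                     Consumer<T> deadLetter) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (send.test(msg)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Treated the same as a failed attempt; a real application
                // would probably log this and back off before retrying.
            }
        }
        deadLetter.accept(msg);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // A sender that always fails, to exercise the dead-letter path.
        boolean delivered = sendWithRetry("hello", m -> false, 3, deadLetters::add);
        System.out.println("delivered=" + delivered + ", deadLetters=" + deadLetters);
    }
}
```

A production version would likely add a delay between attempts and distinguish retryable from non-retryable exceptions.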
Thank you so much, ms4446.
The idea of making triggerMessage async helped, and I am now able to consume the messages asynchronously.
This has saved us from a lot of blocking calls in between.
I appreciate the fast response. Thanks again.