Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

GCPPub

Our application is using GCP PubSub.
We have setup the consumer as

spring.cloud.gcp.pubsub.function.definition= receiveMessage

I am getting the messages and all, but the problem is,
its working similar to the sync flow.
consumer and producer both are running on the same thread.

also, the producer flow waits for the consumer method to complete, and then completes its execution.
Its similar to a method call and in that case the pubsub is not even required.
So what do I need to do in my spring boot application for the pub/sub consumer to behave as async instead of sync.
I am stuck from a week and did not find any solution yet.

 

 

 

Solved Solved
0 4 1,234
1 ACCEPTED SOLUTION

The original issue is with the consumer being blocked, not the producer. The streamBridge.send() method is not blocking, but the consumer might be blocking the producer if it is not processing messages asynchronously.

Revised triggerMessage method

public CompletableFuture<Void> triggerMessage(MyMessage msg) {
    // Send the message using StreamBridge
    boolean sent = streamBridge.send("polling-msg", msg);

    // Create a CompletableFuture that is already completed with the result of the send operation
    CompletableFuture<Void> future = sent ? CompletableFuture.completedFuture(null)
                                          : CompletableFuture.failedFuture(new RuntimeException("Failed to send message"));

    // Handle the result of the message send operation
    future.thenAccept(result -> {
        // Handle the successful send operation
    }).exceptionally(throwable -> {
        // Handle the failed send operation
        log.error("Failed to send message", throwable);
        // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
        return null;
    });

    return future;
}

Code comment

I have changed the comment in the revised triggerMessage method to be more accurate and provide more guidance:

// Handle the result of the message send operation
future.thenAccept(result -> {
    // Handle the successful send operation
}).exceptionally(throwable -> {
    // Handle the failed send operation
    log.error("Failed to send message", throwable);
    // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
    return null;
});

Error handling

I have added a note about the complexity of error handling in real-world applications, and provided some examples of appropriate actions to take:

In a real-world application, the error handling might be more complex, and could involve retrying the message, sending it to a dead-letter queue, or taking other actions based on the specific type of exception.

View solution in original post

4 REPLIES 4

To make your Pub/Sub consumer behave as async, you should:

  1. Configure the subscriber executor threads. You can do this by setting the following property in your application.properties file:
spring.cloud.gcp.pubsub.subscriber.executor-threads=4

This sets the number of threads to be used by the subscriber.

  1. Make sure your message processing is non-blocking or handled asynchronously. This means that your message processing code should not block the main thread. If your message processing code is blocking, you can use a thread pool to handle it asynchronously.

  2. Optionally, set the acknowledgment mode to MANUAL if you need to control when to acknowledge the message. This is not required to make your consumer asynchronous, but it can be helpful in some scenarios.

Creating and managing the ExecutorService

The ExecutorService needs to be created and managed, preferably as a Spring bean. Here is an example of how to create and configure the ExecutorService bean

@Bean
public ExecutorService executorService() {
    return Executors.newFixedThreadPool(4);
}

This creates a thread pool with 4 threads. You can adjust the number of threads depending on your needs.

Alternative example for non-Spring Cloud Stream users

If you are not using Spring Cloud Stream, you can use the @ServiceActivator annotation to bind to a Pub/Sub subscription. Here is an example:

@Service
public class PubSubConsumer {

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void receiveMessage(PubsubMessage message) {
        // Process the message here
    }
}

The pubsubInputChannel channel must be configured to receive messages from the Pub/Sub subscription.

Testing

Make sure to test your application thoroughly to ensure it behaves as expected under load.

Additional notes

  • The @PubsubListenerannotation is part of the spring-cloud-gcp-pubsub-stream-binder module, which is an extension of the Spring Cloud Stream project. If you are not using Spring Cloud Stream, you can use the @ServiceActivatorannotation or another method to bind to a Pub/Sub subscription.
  • The ExecutorService that you pass to the PubSubConsumerconstructor needs to be created and managed, preferably as a Spring bean. This ensures that the ExecutorService is properly initialized and destroyed when your application starts and stops.

I hope this is helpful!

Hi,
Thanks for the faster response.
We are not using any Listeners but the function definition property.
I am adding some of the properties here for your reference.
Also, This was already added.

@Bean public ExecutorService executorService() { return Executors.newFixedThreadPool(4); }

Still The publisher is blocked on subscriber's response.

spring.cloud:
  gcp.pubsub:
    project-id: my-project
    credentials.encoded-key: "<MY_KEY>"
    publisher:
      batching:
        enabled: true
    subscriber:
      executor-threads: 4
    ack-deadline-seconds: 2
  function:
    definition: receiveMessage
  stream:
    default-binder: pubsub
    gcp:
    function:
      bindings:
        receiveMessage-in-0: polling-msg
    bindings:
      input:
        consumer:
          concurrency: 5
      polling-msg:
        binder: pubsub
        destination: topic
        group: topic-goup

This is the consumer:

@Bean
public Consumer receiveMessage() {

//Do something

}


And producer :

public boolean triggerMessage(MyMessage msg) {
return streamBridge.send("polling-msg", msg);
}

The original issue is with the consumer being blocked, not the producer. The streamBridge.send() method is not blocking, but the consumer might be blocking the producer if it is not processing messages asynchronously.

Revised triggerMessage method

public CompletableFuture<Void> triggerMessage(MyMessage msg) {
    // Send the message using StreamBridge
    boolean sent = streamBridge.send("polling-msg", msg);

    // Create a CompletableFuture that is already completed with the result of the send operation
    CompletableFuture<Void> future = sent ? CompletableFuture.completedFuture(null)
                                          : CompletableFuture.failedFuture(new RuntimeException("Failed to send message"));

    // Handle the result of the message send operation
    future.thenAccept(result -> {
        // Handle the successful send operation
    }).exceptionally(throwable -> {
        // Handle the failed send operation
        log.error("Failed to send message", throwable);
        // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
        return null;
    });

    return future;
}

Code comment

I have changed the comment in the revised triggerMessage method to be more accurate and provide more guidance:

// Handle the result of the message send operation
future.thenAccept(result -> {
    // Handle the successful send operation
}).exceptionally(throwable -> {
    // Handle the failed send operation
    log.error("Failed to send message", throwable);
    // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
    return null;
});

Error handling

I have added a note about the complexity of error handling in real-world applications, and provided some examples of appropriate actions to take:

In a real-world application, the error handling might be more complex, and could involve retrying the message, sending it to a dead-letter queue, or taking other actions based on the specific type of exception.

TSYM ms4446
The idea of making triggerMessage async helped and now I am able to consume the messages asynchronously.
This has saved us from a lot of blocking calls in between.
Appreciate the fast response. Thanks again