
GCPPub

Our application uses GCP Pub/Sub.
We have set up the consumer as:

spring.cloud.gcp.pubsub.function.definition= receiveMessage

I am receiving the messages, but the problem is that
it works like a synchronous flow:
the consumer and the producer both run on the same thread.

Also, the producer flow waits for the consumer method to complete before it finishes its own execution.
That is no different from a plain method call, in which case Pub/Sub is not even required.
So what do I need to do in my Spring Boot application for the Pub/Sub consumer to behave asynchronously instead of synchronously?
I have been stuck on this for a week and have not found a solution yet.

ACCEPTED SOLUTION

The issue lies with the consumer, not the producer. The streamBridge.send() method itself is not blocking, but if the consumer processes each message synchronously, the producer can end up waiting for the consumer to finish.
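
One way to make the consumer side asynchronous, sketched here under assumptions: wrap the handler so that the delivering thread only queues the work on an executor and returns. The class name AsyncHandoffSketch, the offload helper, and the worker-1 thread name are hypothetical, and a plain java.util.function.Consumer stands in for the receiveMessage bean so the snippet runs without Spring. Be aware that, depending on the binder's acknowledgement mode, returning early may acknowledge the message before the work has finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncHandoffSketch {

    /** Wraps a handler so that accept() only queues the work on the executor and returns. */
    static <T> Consumer<T> offload(Consumer<T> handler, Executor executor) {
        return message -> executor.execute(() -> handler.accept(message));
    }

    /** Delivers one message and returns the name of the thread that handled it. */
    static String handlerThreadFor(String payload) {
        // Hypothetical single worker; a real application would size and name its own pool.
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        CountDownLatch done = new CountDownLatch(1);
        String[] handledOn = new String[1];

        Consumer<String> receiveMessage = offload(msg -> {
            // Runs on the worker thread, not on the thread that delivered the message.
            handledOn[0] = Thread.currentThread().getName();
            done.countDown();
        }, workers);

        try {
            receiveMessage.accept(payload);      // returns once the task is queued
            done.await(5, TimeUnit.SECONDS);     // only this demo waits, to read the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return handledOn[0];
    }

    public static void main(String[] args) {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("handler: " + handlerThreadFor("hello"));
    }
}
```

In a Spring application the same idea would apply inside the function bean: the bean body submits the payload to an injected executor instead of doing the slow work inline.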

Revised triggerMessage method

public CompletableFuture<Void> triggerMessage(MyMessage msg) {
    // Send the message using StreamBridge
    boolean sent = streamBridge.send("polling-msg", msg);

    // Create a CompletableFuture that is already completed with the result of the send operation
    CompletableFuture<Void> future = sent ? CompletableFuture.completedFuture(null)
                                          : CompletableFuture.failedFuture(new RuntimeException("Failed to send message"));

    // Handle the result of the message send operation
    future.thenAccept(result -> {
        // Handle the successful send operation
    }).exceptionally(throwable -> {
        // Handle the failed send operation
        log.error("Failed to send message", throwable);
        // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
        return null;
    });

    return future;
}
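
Note that the method above calls streamBridge.send() on the caller's thread and only then wraps the result in a future. If the goal is for the caller not to wait on the send at all, one option is to run the send on an executor. A minimal sketch, where AsyncSendSketch, sendAsync, and the Predicate-based sender are illustrative stand-ins rather than Spring APIs:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

class AsyncSendSketch {

    /**
     * Runs a possibly blocking send on the given executor and fails the future
     * when the send reports false. 'sender' stands in for a call such as
     * msg -> streamBridge.send("polling-msg", msg).
     */
    static <T> CompletableFuture<Void> sendAsync(Predicate<T> sender, T msg, Executor executor) {
        return CompletableFuture.supplyAsync(() -> sender.test(msg), executor)
                .thenAccept(sent -> {
                    if (!sent) {
                        throw new IllegalStateException("Failed to send message");
                    }
                });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // The lambda is a fake sender that always succeeds.
        sendAsync(m -> true, "hello", pool)
                .thenRun(() -> System.out.println("sent"))
                .join();   // the demo waits here; a real caller would simply return the future
        pool.shutdown();
    }
}
```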

Code comment

I have changed the comment in the revised triggerMessage method to be more accurate and provide more guidance:

// Handle the result of the message send operation
future.thenAccept(result -> {
    // Handle the successful send operation
}).exceptionally(throwable -> {
    // Handle the failed send operation
    log.error("Failed to send message", throwable);
    // Take appropriate action based on the exception, such as retrying the send operation or sending the message to a dead-letter queue
    return null;
});

Error handling

I have added a note about the complexity of error handling in real-world applications, and provided some examples of appropriate actions to take:

In a real-world application, the error handling might be more complex, and could involve retrying the message, sending it to a dead-letter queue, or taking other actions based on the specific type of exception.
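
As a rough illustration of the retry and dead-letter actions mentioned above, here is a sketch with a bounded number of attempts and a fallback handler. The names RetryThenDeadLetterSketch, sendWithRetry, and deadLetter are hypothetical, the sender is a stand-in for the real send call, and a production version would normally add a backoff delay between attempts:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class RetryThenDeadLetterSketch {

    /**
     * Tries the send up to maxAttempts times. If every attempt fails (returns
     * false or throws), the message is handed to deadLetter instead.
     * Returns true when the message was delivered.
     */
    static <T> boolean sendWithRetry(Predicate<T> sender, T msg, int maxAttempts, Consumer<T> deadLetter) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (sender.test(msg)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Treated as a failed attempt; real code would log the exception here.
            }
        }
        deadLetter.accept(msg);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // The lambda is a fake sender that always fails, to show the fallback path.
        boolean delivered = sendWithRetry(m -> false, "hello", 3, deadLetters::add);
        System.out.println("delivered=" + delivered + ", deadLetters=" + deadLetters);
    }
}
```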


4 REPLIES