Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

publisher.publish(message) is not processing messages

This code runs forever, testing this code in local

@GetMapping (value = "/callResourceByKey/{key}",produces={"application/json"})
public ResponseEntity<Response> getResponse(){
GcpPubSubConfig gcpPubSubConfig = new GcpPubSubConfig();
try {
gcpPubSubConfig.publishWithErrorHandler("hello", "project", "topic");
}
catch (Exception e){
System.out.println("error"+e.getMessage());
}
}

onSuccess and onFailure are not being called and it's waiting forever in publisher.shutdown()

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;


@Component
public class GcpPubSubConfig {

@Async
public void publishWithErrorHandler(String message, String projectId, String topicId) throws IOException, InterruptedException {

TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topicName).build();
System.out.println("hello1");
ByteString data = ByteString.copyFromUtf8(message);
System.out.println("hello2");

PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
System.out.println("hello3");

ApiFuture<String> future = publisher.publish(pubsubMessage);
System.out.println("hello4");


ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
public void onSuccess(String messageId) {
// Ignore for now
System.out.println("hellosuc");
}
public void onFailure(Throwable t) {
if (t instanceof ApiException) {
System.out.println("helloerr");

ApiException apiException = ((ApiException) t);
}
}
}, Executors.newSingleThreadExecutor());
System.out.println("hello5");
} finally {
System.out.println("hello6");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
System.out.println("hello7");
publisher.shutdown();
System.out.println("hello8");

publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
System.out.println("hello9");
}

}

 

:: Spring Boot :: (v2.7.2)

3408 --- [ main] DemoApplicationResource : No active profile set, falling back to 1 default profile: "default"
3408 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Datastore repositories in DEFAULT mode.
3408 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 18 ms. Found 0 Datastore repository interfaces.
3408 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
3408 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8086 (http)
3408 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
3408 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.58]
3408 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
3408 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2765 ms
3408 --- [ main] o.s.c.g.a.c.GcpContextAutoConfiguration : The default project ID is project
3408 --- [ main] c.g.a.oauth2.DefaultCredentialsProvider : Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/.
3408 --- [ main] o.s.c.g.core.DefaultCredentialsProvider : Default credentials provider for user .apps.googleusercontent.com
3408 --- [ main] o.s.c.g.core.DefaultCredentialsProvider : Scopes in use by default credentials: [https://www.googleapis.com/auth/pubsub, https://www.googleapis.com/auth/spanner.admin, https://www.googleapis.com/auth/spanner.data, https://www.googleapis.com/auth/datastore, https://www.googleapis.com/auth/sqlservice.admin, https://www.googleapis.com/auth/devstorage.read_only, https://www.googleapis.com/auth/devstorage.read_write, https://www.googleapis.com/auth/cloudruntimeconfig, https://www.googleapis.com/auth/trace.append, https://www.googleapis.com/auth/cloud-platform, https://www.googleapis.com/auth/cloud-vision, https://www.googleapis.com/auth/bigquery, https://www.googleapis.com/auth/monitoring.write]
3408 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 1 endpoint(s) beneath base path '/actuator'
3408 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8086 (http) with context path ''
3408 --- [ main] DemoApplicationResource : Started DemoApplicationResource in 16.485 seconds (JVM running for 17.372)
3408 --- [nio-8086-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
3408 --- [nio-8086-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
3408 --- [nio-8086-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms

0 1 910
1 REPLY 1

Aug 17, 2022 12:14:22 PM com.google.auth.oauth2.DefaultCredentialsProvider warnAboutProblematicCredentials
WARNING: Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/.
12:14:22.246 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
12:14:22.255 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
12:14:22.255 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 17
12:14:22.258 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
12:14:22.259 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
12:14:22.259 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.storeFence: available
12:14:22.260 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
12:14:22.261 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: unavailable: Reflective setAccessible(true) disabled
12:14:22.262 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
12:14:22.263 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable: class io.netty.util.internal.PlatformDependent0$7 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @4b2bac3f
12:14:22.265 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): unavailable
12:14:22.265 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
12:14:22.283 [main] DEBUG io.netty.util.internal.PlatformDependent - maxDirectMemory: 8510242816 bytes (maybe)
12:14:22.283 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\sxt415\AppData\Local\Temp\2 (java.io.tmpdir)
12:14:22.284 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
12:14:22.284 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
12:14:22.285 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: -1 bytes
12:14:22.285 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
12:14:22.287 [main] DEBUG io.netty.util.internal.CleanerJava9 - java.nio.ByteBuffer.cleaner(): available
12:14:22.287 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
12:14:22.492 [main] DEBUG io.netty.handler.ssl.OpenSsl - netty-tcnative not in the classpath; OpenSslEngine will be unavailable.
12:14:22.707 [main] DEBUG io.netty.handler.ssl.JdkSslContext - Default protocols (JDK): [TLSv1.3, TLSv1.2]
12:14:22.707 [main] DEBUG io.netty.handler.ssl.JdkSslContext - Default cipher suites (JDK): [TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384]
12:14:22.736 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
12:14:22.762 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
12:14:22.762 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
12:14:22.771 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
12:14:22.771 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
12:14:22.781 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
hello
hello
hello
hello4
hello5
hello6
hello7
12:14:23.259 [grpc-default-executor-0] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
12:14:23.259 [grpc-default-executor-0] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
12:14:23.267 [grpc-default-executor-0] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
12:14:23.268 [grpc-default-executor-0] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
12:14:23.269 [grpc-default-executor-0] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@72256005
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 9
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 4194304
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: false
12:14:23.320 [grpc-default-executor-0] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
12:14:23.339 [grpc-default-executor-0] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 3480 (auto-detected)
12:14:23.341 [grpc-default-executor-0] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
12:14:23.341 [grpc-default-executor-0] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
12:14:23.352 [grpc-default-executor-0] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
12:14:23.353 [grpc-default-executor-0] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
12:14:23.391 [grpc-default-executor-0] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:05:9a:ff:fe:3c:7a:00 (auto-detected)
12:14:23.408 [grpc-default-executor-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
12:14:23.408 [grpc-default-executor-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
12:14:23.408 [grpc-default-executor-0] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
12:14:23.426 [grpc-default-executor-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
12:14:23.426 [grpc-default-executor-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
12:14:23.426 [grpc-default-executor-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32
12:14:23.426 [grpc-default-executor-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false
12:14:23.761 [grpc-nio-worker-ELG-1-3] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] HANDSHAKEN: protocol:TLSv1.3 cipher suite:TLS_AES_128_GCM_SHA256
12:14:23.767 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
12:14:23.770 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
12:14:23.861 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=100, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=65536}
12:14:23.862 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] OUTBOUND SETTINGS: ack=true
12:14:23.863 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
12:14:23.864 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] INBOUND SETTINGS: ack=true
12:18:23.867 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] INBOUND GO_AWAY: lastStreamId=2147483647 errorCode=0 length=17 bytes=73657373696f6e5f74696d65645f6f7574
12:18:23.869 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] INBOUND PING: ack=false bytes=0
12:18:23.869 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] OUTBOUND PING: ack=true bytes=0
12:18:23.968 [grpc-nio-worker-ELG-1-3] DEBUG io.grpc.netty.NettyClientHandler - [id: 0x8c6318bd, L:/172.17.222.19:62755 - R:pubsub.googleapis.com/199.36.153.7:443] INBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=17 bytes=73657373696f6e5f74696d65645f6f7574