Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Facing issue while publishing Instant.now() (in java code) on google pubsub topic

 

 

Hi Team,

Please let me know workaround for this or let me know if I am missing anything.

 

 

{
    "eventDate": "2023-06-28T18:22:54.998Z"
}

 

 

When I am trying to publish above pubsub message onto pubsub topic having below AVRO schema  getting error (added inline stacktrace)

 

{
    "name": "eventDate",
    "type": {
    "type": "long",
  "logicalType": "timestamp-millis"
}

 


Getting below error while publishing message in above avro schema.

 

[2023-06-28 18:11:17,903] [ERROR] Exception occurred while publishing pubsubMessage projects/clover-dev-managed/topics/ephemeral-na-boarding-service-main-topic topic: java.util.concurrent.ExecutionException: com.google.cloud.spring.pubsub.core.PubSubDeliveryException: Publishing to projects/clover-dev-managed/topics/ephemeral-na-boarding-service-main-topic topic failed.; nested exception is com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.


complete stacktrace:-

 

com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
        at com.google.common.util.concurrent.Futures$4.run(Futures.java:1126)
        at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:902)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:813)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:677)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
        at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
        at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
        at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
        at io.grpc.Status.asRuntimeException(Status.java:539)

 



1 3 1,638
3 REPLIES 3

The error message indicates that the data you're trying to publish to Pub/Sub is not compatible with the schema you have defined.

You're trying to publish an ISO 8601 formatted timestamp (e.g., "2023-06-28T18:22:54.998Z") as a string, but your Avro schema specifies the "eventDate" field as a long type with a logical type of "timestamp-millis".

The "timestamp-millis" logical type represents the number of milliseconds since the epoch (1970-01-01T00:00:00Z), not a string formatted timestamp. When you're trying to publish the message, the schema validation is failing because the string formatted timestamp is not a valid long.

To resolve this issue, you could do one of the following:

  1. Change the Schema: If you want to keep using the ISO 8601 formatted timestamps, you could change the Avro schema to expect a string for the "eventDate" field.

  2. Change the Message: If you want to keep the current Avro schema, you should convert the timestamp to the number of milliseconds since the epoch before publishing it. In Java, you can do this with the Instant.now().toEpochMilli() method.

Here's an example:

Instant now = Instant.now();

long millis = now.toEpochMilli();

// Now you can publish `millis` to Pub/Sub

Remember to convert the long to a String if the data you are sending to Pub/Sub is a String.

 

String millisString = String.valueOf(millis);

When I use below library to serialize then it converts it to long and also at subscriber side I deserialized message using same library :- 

https://avro.apache.org/docs/1.11.1/getting-started-java/#creating-users-1

Below schema is working fine with google pubsub :-

{
    "name": "eventDate",
    "type": {
    "type": "long",
  "logicalType": "timestamp-millis"
}

 

Thanks for your help.

This one also resolve when I serialised message before publish it to google pubsub.

https://www.googlecloudcommunity.com/gc/Data-Analytics/Not-supporting-AVRO-schema-with-default-null-...