
Issue with Dataflow template "Text files on GCS to Pub/Sub"

I got an error message with SDK version 2.48.0, while everything works normally in 2.47.0:

Error message from worker: javax.naming.SizeLimitExceededException: Pubsub message of length 264 exceeds maximum of 100 bytes, when considering the payload and attributes. See https://cloud.google.com/pubsub/quotas#resource_limits
    org.apache.beam.sdk.io.gcp.pubsub.PreparePubsubWriteDoFn.validatePubsubMessageSize(PreparePubsubWriteDoFn.java:101)
    org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.java:1413)

 

4 REPLIES

It appears that you're encountering an error due to a size limit in Pub/Sub. The error message states that a Pub/Sub message of length 264 exceeds the maximum of 100 bytes. This limit considers both the payload and the attributes of the message. Here are some steps you can take to address the issue:

  1. Implement a Custom Transform: Before the PubsubIO.write() step, implement a custom transform that checks the size of the PubsubMessage. This transform should only allow messages with payloads less than the maximum allowed size (for example, 7 MB) to proceed to the PubsubIO.write() step. Here's an example of how to implement this in Java:

 

PCollectionList<PubsubMessage> limitedPayloads =
    input.apply(
        "Limit payload size",
        Partition.of(
            2,
            new PartitionFn<PubsubMessage>() {
              @Override
              public int partitionFor(PubsubMessage message, int numPartitions) {
                // Partition 0: payloads under ~7 MB; partition 1: everything larger.
                return message.getPayload().length < 7 * 1000 * 1000 ? 0 : 1;
              }
            }));

// Messages that fit are written to Pub/Sub.
limitedPayloads.get(0).apply(PubsubIO.writeMessages().to("your-topic-name"));

// Oversized messages are written to files in GCS instead.
limitedPayloads.get(1).apply(
    FileIO.<PubsubMessage>write()
        .via(Contextful.fn((PubsubMessage message, Context context) -> message.getPayload()))
        .withDestinationCoder(ByteArrayCoder.of())
        .to("your-path-in-gcs"));

In this code, the Partition transform is used to split the incoming messages into two collections based on their size. Messages with a payload less than 7 MB are placed in one collection, and the rest are placed in another. The smaller messages are then written to Pub/Sub, while the larger ones are written to files in Google Cloud Storage (GCS).

  2. Modify Your Data: If the message size limitation is due to the nature of your data, you might need to consider ways to reduce the size of your data. This could involve compressing your data, splitting larger messages into smaller ones, or removing unnecessary information from your messages (a compression sketch follows below).
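If compression is an option for your data, a minimal sketch of that idea could look like the following. This is only illustrative: the class name and the "compression" attribute are made up for this example, and subscribers would need to gunzip the payload on their side.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.zip.GZIPOutputStream;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical helper: gzips each payload before it reaches the Pub/Sub write step.
class CompressPayloadFn extends DoFn<PubsubMessage, PubsubMessage> {
  @ProcessElement
  public void processElement(@Element PubsubMessage message,
                             OutputReceiver<PubsubMessage> out) throws IOException {
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
      gzip.write(message.getPayload());
    }
    // Emit a new message with the compressed payload and a marker attribute
    // so consumers know they have to decompress it.
    out.output(new PubsubMessage(compressed.toByteArray(),
        Collections.singletonMap("compression", "gzip")));
  }
}

You would apply it with something like input.apply(ParDo.of(new CompressPayloadFn())) before the PubsubIO write step; it only helps if the payload actually compresses well.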

Hi, can you explain to me why the maximum allowed size is 7 MB, but the error message says it is only 100 bytes?

Sorry for the confusion. Yes, the error message you're receiving is unusual: the documented maximum message size for Pub/Sub is 10 MB, far larger than the 100 bytes mentioned in the error.

The discrepancy may stem from the specific operation or transform being performed in your Dataflow pipeline.

For instance, the error message mentions PreparePubsubWriteDoFn.validatePubsubMessageSize, a validation function in the Apache Beam SDK that appears to be imposing a smaller limit for some reason.

Without more context about your specific pipeline and the operations being performed, it's hard to definitively explain why you're seeing a 100-byte limit.

It would be beneficial to review the code or configuration related to PreparePubsubWriteDoFn.validatePubsubMessageSize and the usage of PubsubIO in your pipeline. You may need to examine whether there is any part of your pipeline that is setting a size limit of 100 bytes, or if the error is perhaps misreported.
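If it helps with that investigation, a rough diagnostic along these lines could show which elements come close to the limit quoted in the error. This is only a sketch: the 100-byte threshold is simply the number from your error message, and the byte accounting inside PreparePubsubWriteDoFn may differ slightly from this approximation.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

// Rough diagnostic: approximates the "payload and attributes" size the error refers to,
// prints any element above a chosen threshold, then passes everything through unchanged.
class LogOversizedMessagesFn extends DoFn<PubsubMessage, PubsubMessage> {
  private static final int THRESHOLD_BYTES = 100; // the value quoted in the error

  @ProcessElement
  public void processElement(@Element PubsubMessage message,
                             OutputReceiver<PubsubMessage> out) {
    int size = message.getPayload().length;
    Map<String, String> attributes = message.getAttributeMap();
    if (attributes != null) {
      for (Map.Entry<String, String> attribute : attributes.entrySet()) {
        size += attribute.getKey().getBytes(StandardCharsets.UTF_8).length;
        size += attribute.getValue().getBytes(StandardCharsets.UTF_8).length;
      }
    }
    if (size > THRESHOLD_BYTES) {
      System.out.println("Approximate message size " + size + " bytes exceeds " + THRESHOLD_BYTES);
    }
    out.output(message);
  }
}

Inserting it with ParDo.of(new LogOversizedMessagesFn()) just before the PubsubIO write step should make it easier to see whether specific elements, or all of them, are tripping the check.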

Hi,

I faced the same issue and fixed it by changing the Dataflow template version: I was using the latest version (gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub), and we are now using the penultimate version (gs://dataflow-templates/2023-04-18-00_RC00/GCS_Text_to_Cloud_PubSub).

With this change, our job now executes under SDK version 2.46.0 instead of 2.48.0.
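For anyone else hitting this, pinning the template version just means launching from the dated template path instead of latest. A rough example with placeholder values (the job name, region, bucket path, and topic below are illustrative, and the parameter names assume the classic GCS_Text_to_Cloud_PubSub template):

gcloud dataflow jobs run text-to-pubsub-pinned \
    --gcs-location=gs://dataflow-templates/2023-04-18-00_RC00/GCS_Text_to_Cloud_PubSub \
    --region=us-central1 \
    --parameters=inputFilePattern=gs://your-bucket/input/*.txt,outputTopic=projects/your-project/topics/your-topic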