I have implemented a Pub/Sub model using Google Cloud Pub/Sub, employing ordering keys to ensure that messages with the same key are processed in order. Here's a simplified version of my publishing logic:
Publisher publisher = Publisher.newBuilder(topicName)
    .setEnableMessageOrdering(true) // Required when publishing with ordering keys
    .build();
String[] orderingKeys = {"OrderKey1", "OrderKey2", "OrderKey3"}; // Multiple ordering keys
for (String orderingKey : orderingKeys) {
    for (int i = 1; i <= 500; i++) { // Publish 500 messages for each ordering key
        String messageStr = "Message " + i + " for " + orderingKey;
        ByteString data = ByteString.copyFromUtf8(messageStr);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(data)
            .setOrderingKey(orderingKey)
            .build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        System.out.println("Published message ID: " + future.get());
    }
}
And my subscriber simply acknowledges the messages as they come in, printing out their content.
Although the ordering keys keep messages with the same key in order, messages from different ordering keys are not received in the sequence they were published. For example, I might receive messages in the following sequence:
Received message: Message 1 for OrderKey1
Received message: Message 2 for OrderKey1
Received message: Message 1 for OrderKey2
Received message: Message 3 for OrderKey1
Received message: Message 2 for OrderKey2
This is problematic for my use case as I need the messages to be processed exactly in the sequence they were published, regardless of the ordering key.
I am looking for a solution or design pattern that allows me to maintain the global ordering of messages across multiple ordering keys. Ideally, messages should be received and processed in the exact sequence they were published by the publisher, irrespective of the ordering key used.
Is there a way to achieve this in Google Cloud Pub/Sub, or should I consider an alternative approach or solution to meet this requirement?
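Pub/Sub prioritizes scalability and throughput, so it guarantees message ordering only among messages that share the same ordering key. Messages with different ordering keys belong to independent streams that Pub/Sub can deliver in parallel, so their relative order is not preserved; that is exactly the interleaving shown in the output above. This design decision matters when your application requires strict ordering across multiple keys.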
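To make the two guarantees concrete, here is a small local simulation (no Pub/Sub calls) that checks the sample output from the question against each one: per-key order holds, global order does not. The helpers `parse`, `per_key_in_order`, and `globally_in_order` are hypothetical names used only for illustration.

```python
from collections import defaultdict


def parse(line):
    """Split 'Message 3 for OrderKey1' into ('OrderKey1', 3)."""
    parts = line.split()
    return parts[3], int(parts[1])


def per_key_in_order(received):
    """True if message numbers only increase within each ordering key."""
    last_seen = defaultdict(int)
    for line in received:
        key, n = parse(line)
        if n <= last_seen[key]:
            return False
        last_seen[key] = n
    return True


def globally_in_order(received, published):
    """True if received is exactly the start of the publish order."""
    return list(received) == list(published)[: len(received)]


# Publish order from the question: all of OrderKey1, then OrderKey2, then OrderKey3
published = [f"Message {i} for {key}"
             for key in ("OrderKey1", "OrderKey2", "OrderKey3")
             for i in range(1, 501)]
# Sample output from the question
received = [
    "Message 1 for OrderKey1",
    "Message 2 for OrderKey1",
    "Message 1 for OrderKey2",
    "Message 3 for OrderKey1",
    "Message 2 for OrderKey2",
]
print(per_key_in_order(received))              # True
print(globally_in_order(received, published))  # False
```

The sample output satisfies the only guarantee Pub/Sub makes (per key) while violating the one the question needs (global).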
Potential Solutions
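Single Ordering Key: Publish every message with the same ordering key. All messages then form one ordered stream, so Pub/Sub delivers them in the order they were published. The cost is throughput: everything goes through one stream, which is subject to Pub/Sub's per-ordering-key publish limit (1 MBps) and cannot be processed in parallel the way multiple keys can.
Sequencing Layer: Have the publisher stamp each message with a global, monotonically increasing sequence number (for example, from a shared counter such as Redis INCR), and have the subscriber buffer incoming messages and release them strictly in sequence-number order. This restores a total order even when messages arrive interleaved across keys, at the cost of consumer-side state and added latency.
Hybrid Approach: Combine the two: publish with a single ordering key so Pub/Sub delivers in order, and also attach a sequence number so the subscriber can detect gaps, duplicates, and redeliveries. The example below follows this pattern.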
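In the hybrid approach, the sequence number is also a cheap integrity check on the consumer side. A minimal sketch, assuming the publisher numbers messages consecutively from 1 with no reuse (as a Redis INCR counter would); `audit_sequence` is a hypothetical helper, not part of any Pub/Sub library.

```python
def audit_sequence(sequence_ids, start=1):
    """Return (duplicates, missing) for a stream of received sequence numbers.

    Assumes the publisher numbers messages start, start + 1, ... with no reuse,
    as a shared counter such as Redis INCR would.
    """
    seen = set()
    duplicates = []
    for seq in sequence_ids:
        if seq in seen:
            duplicates.append(seq)  # Redelivery: Pub/Sub delivers at least once by default
        seen.add(seq)
    highest = max(seen, default=start - 1)
    missing = [n for n in range(start, highest + 1) if n not in seen]
    return duplicates, missing


print(audit_sequence([1, 2, 2, 4, 5]))  # ([2], [3])
```

A number reported as missing may simply not have arrived yet, so treat it as "not seen so far" rather than "lost".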
Choosing the Right Approach
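Consider the following factors: the throughput you need (a single ordering key is capped at 1 MBps), whether you truly need a total order or only per-entity order, how much latency and consumer-side buffering you can tolerate, and whether you can reliably operate a shared sequence counter such as Redis.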
Example
import redis  # For sequence number management
from google.cloud import pubsub_v1
# Set up Redis and Pub/Sub clients; ordering keys require message ordering on the publisher
redis_client = redis.Redis()
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path('your-project-id', 'your-topic-name')
def publish_with_sequence(message_data):
    # INCR is atomic, so every call gets a unique, increasing number
    global_sequence_id = redis_client.incr('global_message_sequence')
    # PublisherClient.publish takes the payload as bytes and attributes as keyword arguments
    future = publisher.publish(
        topic_path,
        message_data.encode('utf-8'),
        ordering_key='single_ordering_key',  # Unified ordering key
        sequence_id=str(global_sequence_id),  # Attribute values must be strings
    )
    return future.result()
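On the subscriber side, the `sequence_id` attribute lets you restore publish order even if messages arrive interleaved or are redelivered. A minimal sketch, assuming consecutive numbers starting at 1; `Resequencer` is a hypothetical class, not part of the Pub/Sub client library. In a real subscriber callback you might call `add(int(message.attributes["sequence_id"]), message.data)` and process whatever it returns.

```python
import heapq


class Resequencer:
    """Releases messages strictly in sequence-number order.

    Assumes numbers start at `next_expected` and increase by 1 with no gaps,
    e.g. values of the `sequence_id` attribute set from a Redis INCR counter.
    """

    def __init__(self, next_expected=1):
        self.next_expected = next_expected
        self._heap = []         # (sequence_id, payload) waiting for earlier numbers
        self._buffered = set()  # sequence_ids currently in the heap

    def add(self, sequence_id, payload):
        """Accept one message and return the payloads that are now in order."""
        if sequence_id < self.next_expected or sequence_id in self._buffered:
            return []  # Duplicate or redelivery of something already handled
        heapq.heappush(self._heap, (sequence_id, payload))
        self._buffered.add(sequence_id)
        released = []
        # Pop every message whose predecessors have all been released
        while self._heap and self._heap[0][0] == self.next_expected:
            seq, ready = heapq.heappop(self._heap)
            self._buffered.discard(seq)
            released.append(ready)
            self.next_expected += 1
        return released


r = Resequencer()
print(r.add(2, "b"))  # []
print(r.add(1, "a"))  # ['a', 'b']
print(r.add(1, "a"))  # [] (duplicate dropped)
```

As written, a sequence number that never arrives blocks release forever, and the choice of when to acknowledge buffered messages is left open; a production version would need a timeout or skip policy for gaps.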
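Important Considerations
Ordering must also be enabled on the subscription (enable_message_ordering); otherwise Pub/Sub may deliver messages out of order even when they carry an ordering key.
Incrementing the Redis counter and publishing are two separate steps. If a publish fails after INCR, the sequence has a gap, so a subscriber that waits for every number can stall; plan a timeout or skip policy. With several publisher processes, one process can take number 1 and another number 2, yet number 2 can reach Pub/Sub first, so subscriber-side reordering is still needed.
After a failed publish with an ordering key, the Python client pauses publishing for that key until you call publisher.resume_publish(topic_path, ordering_key).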
Is it possible to use both an ordering key and filtering, and if so, which takes precedence in the order of operations? I understand having multiple ordering keys, as in the example above, but what about having one ordering key with several attributes? Is that even possible?
Yes. In Pub/Sub you can use ordering keys and message filtering together. The ordering key ensures that messages with the same key are delivered in the order they were published, while filtering lets a subscription receive only the messages whose attributes match a specified condition.
The ordering key guarantees sequential delivery of messages that share the same key, provided the subscription has message ordering enabled. This is useful when the order of processing matters. Each ordering key effectively creates a separate ordered stream within the topic: messages in that stream are delivered in the order they were published.
Filtering enables subscribers to define conditions based on message attributes. Only messages that meet these conditions are delivered to the subscriber. This feature helps in reducing the load on subscribers by allowing them to process only relevant messages.
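When both filtering and ordering keys are used, the operations occur in this sequence:
1. Pub/Sub evaluates the subscription's filter against each message's attributes. Messages that don't match are acknowledged automatically and never delivered to that subscription.
2. Among the messages that match, Pub/Sub delivers those sharing an ordering key in the order they were published.
Publishing Messages with an Ordering Key and Attributes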
Publisher publisher = Publisher.newBuilder(topicName)
    .setEnableMessageOrdering(true) // Required when publishing with ordering keys
    .build();
String orderingKey = "OrderKey1";
for (int i = 1; i <= 500; i++) {
    String messageStr = "Message " + i;
    ByteString data = ByteString.copyFromUtf8(messageStr);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
        .setData(data)
        .putAttributes("attributeKey", "attributeValue" + i) // Attribute used by the subscription filter
        .setOrderingKey(orderingKey)
        .build();
    ApiFuture<String> future = publisher.publish(pubsubMessage);
    System.out.println("Published message ID: " + future.get());
}
publisher.shutdown(); // Publish any outstanding messages and release resources
Setting Up a Subscription with a Filter
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("your-project-id", "your-subscription-name");
    ProjectTopicName topicName = ProjectTopicName.of("your-project-id", "your-topic-name");
    String filter = "attributes.attributeKey=\"attributeValue100\""; // Define a filter
    // Filter and ordering are set in the Subscription request; no push config means a pull subscription
    Subscription subscription = subscriptionAdminClient.createSubscription(
        Subscription.newBuilder()
            .setName(subscriptionName.toString())
            .setTopic(topicName.toString())
            .setFilter(filter)
            .setEnableMessageOrdering(true) // Deliver messages with the same ordering key in order
            .build());
    System.out.println("Created subscription with filter: " + subscription.getFilter());
}
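The two steps (filter first, then per-key order) can be modeled locally. This is a sketch of the semantics only, not Pub/Sub's implementation; `simulate_delivery` is hypothetical, and the second accepted value (as an OR filter such as `attributes.attributeKey = "attributeValue100" OR attributes.attributeKey = "attributeValue200"` would allow) is added for illustration so that more than one message passes.

```python
from collections import defaultdict


def simulate_delivery(published, matches):
    """Model a filtered, ordered subscription locally (no Pub/Sub calls).

    published: (ordering_key, attributes, data) tuples in publish order.
    matches: predicate over the attributes dict, standing in for the filter.
    Returns {ordering_key: [data, ...]} in per-key delivery order.
    """
    streams = defaultdict(list)
    for ordering_key, attributes, data in published:
        if not matches(attributes):
            continue  # Step 1: filtered out (Pub/Sub acks these; never delivered)
        streams[ordering_key].append(data)  # Step 2: per-key publish order kept
    return dict(streams)


# Mirrors the publishing loop: one ordering key, attributeKey = "attributeValue" + i
published = [
    ("OrderKey1", {"attributeKey": f"attributeValue{i}"}, f"Message {i}")
    for i in range(1, 501)
]
# Hypothetical OR filter, so that two messages survive
wanted = {"attributeValue100", "attributeValue200"}
print(simulate_delivery(published, lambda attrs: attrs["attributeKey"] in wanted))
# {'OrderKey1': ['Message 100', 'Message 200']}
```

Filtering removes messages from a stream but never reorders what remains.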
Yes, one ordering key with several attributes is possible. A message has exactly one ordering key, a single string that determines its ordered stream, but it can carry multiple attributes (up to 100 per message). Filters can reference only attributes, so attributes decide which messages a subscription receives, while the ordering key alone decides their relative order within the same key.
Publisher publisher = Publisher.newBuilder(topicName)
    .setEnableMessageOrdering(true) // Required when publishing with ordering keys
    .build();
String orderingKey = "OrderKey1";
for (int i = 1; i <= 500; i++) {
    String messageStr = "Message " + i;
    ByteString data = ByteString.copyFromUtf8(messageStr);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
        .setData(data)
        .putAttributes("attributeKey1", "value1")     // Same for every message
        .putAttributes("attributeKey2", "value2" + i) // "value21", "value22", ...
        .setOrderingKey(orderingKey)
        .build();
    ApiFuture<String> future = publisher.publish(pubsubMessage);
    System.out.println("Published message ID: " + future.get());
}
publisher.shutdown(); // Publish any outstanding messages and release resources
Setting Up a Subscription with a Complex Filter
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("your-project-id", "your-subscription-name");
    ProjectTopicName topicName = ProjectTopicName.of("your-project-id", "your-topic-name");
    String filter = "attributes.attributeKey1=\"value1\" AND attributes.attributeKey2=\"value250\""; // Complex filter
    Subscription subscription = subscriptionAdminClient.createSubscription(
        Subscription.newBuilder()
            .setName(subscriptionName.toString())
            .setTopic(topicName.toString())
            .setFilter(filter)
            .setEnableMessageOrdering(true) // Deliver messages with the same ordering key in order
            .build());
    System.out.println("Created subscription with complex filter: " + subscription.getFilter());
}
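Because the publisher builds attributeKey2 by string concatenation ("value2" + i), the filter value "value250" corresponds to message 50, not message 250 (which gets "value2250"). A quick local check of which loop indices satisfy the AND filter; `matching_indices` is a hypothetical helper that only mirrors the filter's logic in plain Python, not Pub/Sub's filter engine.

```python
def matching_indices(count=500):
    """Return the loop indices i whose attributes satisfy the complex filter.

    Rebuilds the attributes from the publishing loop above:
    attributeKey1 = "value1" and attributeKey2 = "value2" + i.
    """
    hits = []
    for i in range(1, count + 1):
        attributes = {"attributeKey1": "value1", "attributeKey2": "value2" + str(i)}
        # Local stand-in for:
        # attributes.attributeKey1="value1" AND attributes.attributeKey2="value250"
        if attributes["attributeKey1"] == "value1" and attributes["attributeKey2"] == "value250":
            hits.append(i)
    return hits


print(matching_indices())  # [50]
```

If message 250 was the intended target, a separator in the attribute value (for example "value2-" + i) would avoid this kind of collision.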
In Pub/Sub, filtering and ordering keys can be effectively combined to control the flow and order of message processing. Filtering first narrows down the messages based on attributes, and then the ordering key ensures the correct sequence of delivery within each stream. While multiple attributes can be used for filtering, the ordering key remains singular to maintain order. This setup provides flexibility in handling message attributes and ensuring ordered delivery as required by the application logic.