
Ensuring Sequential Delivery with Multiple Ordering Keys in Pub/Sub

I'm working on a Pub/Sub system where I need to ensure that messages are received in the exact sequence they are published, even when using multiple ordering keys. My system publishes a large number of messages, potentially spanning various topics and ordering keys, but it's crucial for the application logic that these messages are processed in the order they were sent.

I have implemented a Pub/Sub model using Google Cloud Pub/Sub, employing ordering keys to ensure that messages with the same key are processed in order. Here's a simplified version of my publishing logic:

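import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

// Publishing with an ordering key fails unless message ordering is enabled on the publisher.
Publisher publisher = Publisher.newBuilder(topicName)
        .setEnableMessageOrdering(true)
        .build();
String[] orderingKeys = {"OrderKey1", "OrderKey2", "OrderKey3"}; // Multiple ordering keys
for (String orderingKey : orderingKeys) {
    for (int i = 1; i <= 500; i++) { // Publish 500 messages for each ordering key
        String messageStr = "Message " + i + " for " + orderingKey;
        ByteString data = ByteString.copyFromUtf8(messageStr);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                .setData(data)
                .setOrderingKey(orderingKey)
                .build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        System.out.println("Published message ID: " + future.get()); // Blocks until published
    }
}
publisher.shutdown();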

And my subscriber simply acknowledges the messages as they come in, printing out their content.

Despite the ordering keys ensuring that messages with the same key are ordered, I am facing an issue where messages from different ordering keys are not received in the sequence they were published. For example, I might receive messages in the following sequence:

Received message: Message 1 for OrderKey1
Received message: Message 2 for OrderKey1
Received message: Message 1 for OrderKey2
Received message: Message 3 for OrderKey1
Received message: Message 2 for OrderKey2

This is problematic for my use case as I need the messages to be processed exactly in the sequence they were published, regardless of the ordering key.

I am looking for a solution or design pattern that allows me to maintain the global ordering of messages across multiple ordering keys. Ideally, messages should be received and processed in the exact sequence they were published by the publisher, irrespective of the ordering key used.

Is there a way to achieve this in Google Cloud Pub/Sub, or should I consider an alternative approach or solution to meet this requirement?


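Pub/Sub prioritizes scalability and throughput. To achieve this, it guarantees ordering only for messages that share the same ordering key, are published in the same region, and are delivered through a subscription with message ordering enabled. Messages with different ordering keys are delivered independently, so the interleaving in your output is expected behavior. This design decision has implications when your application requires strict ordering across multiple keys.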
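To see why the observed interleaving is consistent with this guarantee, the sketch below checks the sequence from the question against two properties: per-key order, which Pub/Sub guarantees, and global order, which it does not. It is plain Python with hypothetical helper names; it models the rule and calls no Pub/Sub API.

```python
from collections import defaultdict

# Hypothetical helpers that model the ordering rule; they do not call Pub/Sub.

def is_ordered_per_key(received):
    """True if, within every ordering key, message numbers only increase."""
    last_seen = defaultdict(int)  # ordering key -> last message number seen
    for key, number in received:
        if number <= last_seen[key]:
            return False
        last_seen[key] = number
    return True

def is_globally_ordered(received, published):
    """True if received messages keep the relative order in which they were published."""
    position = {msg: i for i, msg in enumerate(published)}
    indices = [position[msg] for msg in received]
    return indices == sorted(indices)

# Publish order from the question: all of OrderKey1, then OrderKey2, then OrderKey3.
published = [(key, i) for key in ("OrderKey1", "OrderKey2", "OrderKey3")
             for i in range(1, 501)]
# The interleaving reported in the question.
received = [("OrderKey1", 1), ("OrderKey1", 2), ("OrderKey2", 1),
            ("OrderKey1", 3), ("OrderKey2", 2)]

print(is_ordered_per_key(received))              # True: the per-key guarantee holds
print(is_globally_ordered(received, published))  # False: there is no cross-key guarantee
```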

Potential Solutions

  • Single Ordering Key:

    • Ensures global ordering (in the order Pub/Sub receives the messages).
    • Limited by a 1 MBps throughput constraint per key.
    • Ideal for low-to-moderate message volumes where strict ordering is non-negotiable.
  • Sequencing Layer:

    • Introduces a system (potentially Redis) to assign globally unique, incrementing sequence numbers to messages, regardless of their ordering key.
    • Messages are published to Pub/Sub with a unified ordering key, carrying their sequence numbers as a message attribute.
    • Pros: Enforces global ordering.
    • Cons: Adds complexity, potential single point of failure (the sequencing layer), and may increase latency.
  • Hybrid Approach:

    • Employs a moderate number of ordering keys combined with a sequencing layer.
    • Seeks to balance load distribution and maintain some ordering control for larger sets of messages.
    • Pros: Enables some parallel processing for higher throughput without entirely sacrificing ordering.
    • Cons: Increased complexity, and preserving global order requires careful implementation.
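A minimal sketch of one way to realize the hybrid idea, assuming each message carries some entity identifier (the `entity_id` parameter, `NUM_KEYS`, and the `key-N` naming scheme are all hypothetical): a stable hash picks one of a fixed number of ordering keys, so all messages for one entity share an ordered stream, while a global counter still stamps a `sequence_id` attribute. Here `itertools.count` stands in for the Redis counter used in the example below and only works within a single process.

```python
import itertools
import zlib

NUM_KEYS = 8  # hypothetical: a "moderate number" of ordering keys

# Stand-in for the sequencing layer (Redis INCR in the example below);
# itertools.count is only unique within one process.
_sequence = itertools.count(1)

def ordering_key_for(entity_id: str, num_keys: int = NUM_KEYS) -> str:
    """Map an entity to one of num_keys ordering keys (hypothetical scheme).

    zlib.crc32 is used instead of hash() because Python randomizes str hashes
    per process, and every publisher must agree on the mapping.
    """
    return f"key-{zlib.crc32(entity_id.encode('utf-8')) % num_keys}"

def prepare_message(entity_id: str, payload: str) -> dict:
    """Build the fields a publisher would send: ordering key plus global sequence."""
    return {
        "data": payload.encode("utf-8"),
        "ordering_key": ordering_key_for(entity_id),
        "attributes": {"sequence_id": str(next(_sequence))},
    }
```

Messages for the same entity keep their relative order on one key, messages for different entities can flow in parallel on different keys, and only the `sequence_id` attribute tells a consumer their global order.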

Choosing the Right Approach

Consider the following factors:

  • Message Volume and Throughput: Are you likely to exceed per-ordering-key throughput limits?
  • Ordering Strictness: Is perfect ordering mandatory, or is approximate ordering with occasional reordering acceptable?
  • System Complexity Tolerance: Are you comfortable managing the additional elements required by some solutions?
  • Message Replay/Retry Logic: Custom sequencing or external state management necessitates robust error handling and replay management.
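For the throughput question, a back-of-the-envelope estimate helps. This sketch assumes the per-key publish limit of 1 MBps, treated here as 1,000,000 bytes per second (check the current Pub/Sub quota documentation for the exact figure), and roughly uniform message sizes; the function name is hypothetical.

```python
PER_KEY_LIMIT_BYTES_PER_SEC = 1_000_000  # assumption: 1 MBps per ordering key

def min_ordering_keys(messages_per_sec: int, avg_message_bytes: int,
                      limit: int = PER_KEY_LIMIT_BYTES_PER_SEC) -> int:
    """Lower bound on ordering keys needed to stay under the per-key limit.

    Uses integer ceiling division. A result of 1 means a single key (and
    therefore strict global ordering) is feasible at this average load.
    """
    total = messages_per_sec * avg_message_bytes
    return max(1, -(-total // limit))

print(min_ordering_keys(500, 1_000))    # 1  (0.5 MBps fits on one key)
print(min_ordering_keys(2_500, 1_000))  # 3  (2.5 MBps needs at least three keys)
```

The estimate uses average load only, so bursts need extra headroom; any result above 1 means giving up single-key global ordering or adding a sequencing layer.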

Example

 
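import redis  # For sequence number management
from google.cloud import pubsub_v1

# Setup Redis and Pub/Sub clients. Ordering keys require message ordering
# to be enabled on the publisher.
redis_client = redis.Redis()
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path('your-project-id', 'your-topic-name')

def publish_with_sequence(message_data):
    # INCR is atomic, so each message gets a unique, increasing number even with
    # several publishers (their arrival order at Pub/Sub may still differ).
    global_sequence_id = redis_client.incr('global_message_sequence')
    # publish() takes the topic, the payload bytes, an ordering key, and
    # attributes as keyword arguments (attribute values must be strings).
    future = publisher.publish(
        topic_path,
        message_data.encode('utf-8'),
        ordering_key='single_ordering_key',  # Unified ordering key
        sequence_id=str(global_sequence_id),
    )
    return future.result()  # Blocks until Pub/Sub returns the message ID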
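The publisher only stamps the numbers; restoring global order is the subscriber's job. Below is a minimal sketch of the consuming side, assuming `sequence_id` values start at 1 with no gaps (a Redis counter only guarantees this if no publish is abandoned after `INCR`). It buffers early arrivals, releases messages in sequence order, and drops redeliveries, since Pub/Sub delivers at least once. `ReorderBuffer` is a hypothetical name, not part of the client library.

```python
import heapq

class ReorderBuffer:
    """Hypothetical helper: releases messages in strictly increasing sequence_id order.

    Assumes sequence numbers start at first_expected and have no gaps; a missing
    number stalls delivery, so real code would need a timeout or gap policy.
    """

    def __init__(self, first_expected: int = 1):
        self._next = first_expected
        self._heap = []        # (sequence_id, payload) waiting for earlier ids
        self._pending = set()  # sequence ids currently buffered

    def add(self, sequence_id: int, payload):
        """Accept one message; return the payloads that are now ready, in order."""
        if sequence_id < self._next or sequence_id in self._pending:
            return []  # Redelivery: already released or already buffered
        heapq.heappush(self._heap, (sequence_id, payload))
        self._pending.add(sequence_id)
        ready = []
        while self._heap and self._heap[0][0] == self._next:
            seq, item = heapq.heappop(self._heap)
            self._pending.discard(seq)
            ready.append(item)
            self._next += 1
        return ready

buf = ReorderBuffer()
print(buf.add(2, "b"))  # []  (waiting for 1)
print(buf.add(1, "a"))  # ['a', 'b']
print(buf.add(1, "a"))  # []  (redelivery ignored)
print(buf.add(3, "c"))  # ['c']
```

In a real subscriber callback the sequence number would come from `int(message.attributes["sequence_id"])`. One reasonable policy is to acknowledge a message only after it has been released and processed, so buffered messages are redelivered rather than lost if the subscriber crashes.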

Important Considerations

  • Redis Configuration: Ensure high availability and durability.
  • Error Handling and Retry Logic: Implement for system resilience.
  • Performance Assessment: Test scalability thoroughly under your expected loads.
  • Message Size: Pub/Sub pricing is based on data volume, so be mindful of message size and cost.
  • "Good Enough" Ordering: Explore less complex solutions if slight ordering discrepancies are tolerable.

Is it possible to use both an ordering key and filtering, and if so, which takes precedence in the order of operations? I understand having multiple ordering keys, as in the example above, but what about having one ordering key with several attributes? Is this even possible?

In Pub/Sub, it is possible to use both ordering keys and message filtering to achieve the desired processing behavior for messages. The ordering key ensures that messages with the same key are delivered in the order they were published, while filtering allows subscribers to receive only those messages that match specified attributes.

The ordering key guarantees the sequential delivery of messages that share the same key. This feature is particularly useful for scenarios where the order of message processing is crucial. Each ordering key effectively creates a separate ordered stream within the topic, and messages within that stream are delivered in the order they were published.

Filtering enables subscribers to define conditions based on message attributes. Only messages that meet these conditions are delivered to the subscriber. This feature helps in reducing the load on subscribers by allowing them to process only relevant messages.

When both filtering and ordering keys are employed, the operations occur in a specific sequence:

  1. Filtering: First, messages are evaluated against the filter criteria defined by the subscription. If a message's attributes match the filter conditions, it is passed on to the subscriber; messages that don't match are acknowledged automatically by Pub/Sub and never delivered.
  2. Ordering Key: Among the messages that pass the filter, the ordering key ensures that they are delivered in the order they were published.
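The two steps can be modeled in a few lines of plain Python. This is a conceptual sketch, not the service's implementation: messages are `(ordering_key, number, attributes)` tuples, the `type` attribute is hypothetical, and a predicate stands in for the subscription filter. Unmatched messages are simply dropped, mirroring how Pub/Sub acknowledges them on your behalf.

```python
from collections import defaultdict

def deliver(published, matches):
    """Conceptual model only: filter first, then keep publish order per ordering key.

    published: list of (ordering_key, number, attributes) in publish order.
    matches:   predicate on attributes, standing in for the subscription filter.
    Returns the delivered message numbers grouped by ordering key.
    """
    streams = defaultdict(list)
    for key, number, attrs in published:
        if matches(attrs):                # Step 1: filter on attributes
            streams[key].append(number)   # Step 2: preserve publish order within the key
    return dict(streams)

published = [
    ("OrderKey1", 1, {"type": "a"}),
    ("OrderKey1", 2, {"type": "b"}),
    ("OrderKey2", 1, {"type": "a"}),
    ("OrderKey1", 3, {"type": "a"}),
]
print(deliver(published, lambda attrs: attrs.get("type") == "a"))
# {'OrderKey1': [1, 3], 'OrderKey2': [1]}
```

In this model the filtered-out message 2 on `OrderKey1` does not hold back message 3; the surviving messages simply keep their relative order within each key.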
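Publishing Messages with an Ordering Key and Attributes

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

// Message ordering must be enabled on the publisher to use ordering keys.
Publisher publisher = Publisher.newBuilder(topicName)
        .setEnableMessageOrdering(true)
        .build();
String orderingKey = "OrderKey1";

for (int i = 1; i <= 500; i++) {
    String messageStr = "Message " + i;
    ByteString data = ByteString.copyFromUtf8(messageStr);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(data)
            .putAttributes("attributeKey", "attributeValue" + i)  // Attribute used by the filter
            .setOrderingKey(orderingKey)
            .build();
    ApiFuture<String> future = publisher.publish(pubsubMessage);
    System.out.println("Published message ID: " + future.get());
}
publisher.shutdown();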

Setting Up a Subscription with a Filter

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;

String filter = "attributes.attributeKey=\"attributeValue100\"";  // Define a filter

try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of("your-project-id", "your-subscription-name");
    TopicName topicName = TopicName.of("your-project-id", "your-topic-name");

    // Filter and ordering are both properties of the subscription.
    Subscription request = Subscription.newBuilder()
            .setName(subscriptionName.toString())
            .setTopic(topicName.toString())
            .setFilter(filter)
            .setEnableMessageOrdering(true)  // Required for ordered delivery
            .build();
    Subscription subscription = subscriptionAdminClient.createSubscription(request);
    System.out.println("Created subscription with filter: " + subscription.getFilter());
}

While you can add multiple attributes to a message for filtering, the ordering key remains a single string that defines the order within its stream. The attributes can be used for filtering but do not affect the ordering of messages within the same ordering key.

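import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

// Message ordering must be enabled on the publisher to use ordering keys.
Publisher publisher = Publisher.newBuilder(topicName)
        .setEnableMessageOrdering(true)
        .build();
String orderingKey = "OrderKey1";

for (int i = 1; i <= 500; i++) {
    String messageStr = "Message " + i;
    ByteString data = ByteString.copyFromUtf8(messageStr);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(data)
            .putAttributes("attributeKey1", "value1")      // Same value on every message
            .putAttributes("attributeKey2", "value2" + i)  // "value21" ... "value2500"
            .setOrderingKey(orderingKey)                   // One ordering key, several attributes
            .build();
    ApiFuture<String> future = publisher.publish(pubsubMessage);
    System.out.println("Published message ID: " + future.get());
}
publisher.shutdown();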

Setting Up a Subscription with a Complex Filter

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;

// Complex filter: both conditions must hold.
String filter = "attributes.attributeKey1=\"value1\" AND attributes.attributeKey2=\"value250\"";

try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of("your-project-id", "your-subscription-name");
    TopicName topicName = TopicName.of("your-project-id", "your-topic-name");

    Subscription request = Subscription.newBuilder()
            .setName(subscriptionName.toString())
            .setTopic(topicName.toString())
            .setFilter(filter)
            .setEnableMessageOrdering(true)  // Required for ordered delivery
            .build();
    Subscription subscription = subscriptionAdminClient.createSubscription(request);
    System.out.println("Created subscription with complex filter: " + subscription.getFilter());
}
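Because the attribute values above are built by string concatenation, it is worth checking which messages these filters actually select. The sketch below re-creates the attributes from the Java publishing loops in plain Python and applies the two equality filters by hand; it does not parse Pub/Sub filter syntax.

```python
# Re-create the attributes produced by the two Java publishing loops (i = 1..500).
simple = {i: {"attributeKey": "attributeValue" + str(i)} for i in range(1, 501)}
multi = {
    i: {"attributeKey1": "value1", "attributeKey2": "value2" + str(i)}
    for i in range(1, 501)
}

# attributes.attributeKey="attributeValue100"
simple_hits = [i for i, a in simple.items() if a["attributeKey"] == "attributeValue100"]

# attributes.attributeKey1="value1" AND attributes.attributeKey2="value250"
multi_hits = [
    i for i, a in multi.items()
    if a["attributeKey1"] == "value1" and a["attributeKey2"] == "value250"
]

print(simple_hits)  # [100]
print(multi_hits)   # [50]  ("value2" + "50" == "value250")
```

So the complex filter delivers message 50, not message 250, which is easy to misread from the filter string. With a single matching message, ordering is trivially satisfied; a broader filter (for example on `attributeKey1` alone) would let the ordering key's guarantee come into play.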

In Pub/Sub, filtering and ordering keys can be effectively combined to control the flow and order of message processing. Filtering first narrows down the messages based on attributes, and then the ordering key ensures the correct sequence of delivery within each stream. While multiple attributes can be used for filtering, each message still carries a single ordering key, which determines the ordered stream it belongs to. This setup provides flexibility in handling message attributes and ensuring ordered delivery as required by the application logic.