Using a Shared Subscription with Multiple Pull Subscribers in Google Cloud Pub/Sub

Create one publisher and two subscribers that use the same pull subscription. Pull a message with the first listener and pull the other messages with the second listener. Make sure that the first message, pulled by Listener-1, is not pulled by the other listener: the first message is locked by Listener-1.

Can we achieve this:

After Listener-1 pulls one message, add a 60-second sleep in Listener-1 and then pull the messages with Listener-2.

Ensure that Listener-2 does not pull the message that is locked by Listener-1.

3 REPLIES

Hi @harshada2828 ,

Yes, you can achieve this. When two subscribers pull from the same subscription, Pub/Sub delivers each message to only one of them at a time: while Listener-1 holds a message that it has not yet acknowledged, and that message's acknowledgement deadline has not expired, Pub/Sub does not deliver it to Listener-2. Here's how you can set this up and manage the message flow:

Key Concepts:

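  • Acknowledgement Deadline: The time a subscriber has to acknowledge a message after pulling it. It is set per subscription (10 seconds by default, configurable from 10 to 600 seconds). If the message is not acknowledged within this time, it becomes available for redelivery to any subscriber on the subscription.
  • Message Locking: Pub/Sub has no explicit lock; the "lock" is the outstanding acknowledgement deadline. A pulled message is withheld from the other subscribers on the same subscription until it is acknowledged or the deadline expires. Delivery is at-least-once, so an occasional duplicate is still possible unless exactly-once delivery is enabled on the subscription.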
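
To make these two concepts concrete, here is a small in-memory model of a subscription with an acknowledgement deadline. It is only a sketch: ToySubscription and its methods are made up for illustration and are not part of the Pub/Sub client library, and time is passed in by hand as `now` instead of being read from a clock.

```python
import itertools


class ToySubscription:
    """In-memory stand-in for a pull subscription; not the Pub/Sub API."""

    def __init__(self, ack_deadline_seconds):
        self.ack_deadline = ack_deadline_seconds
        self._messages = {}      # message id -> payload, for unacknowledged messages
        self._lease_expiry = {}  # message id -> time at which the current lease runs out
        self._ids = itertools.count(1)

    def publish(self, payload):
        self._messages[next(self._ids)] = payload

    def pull(self, now):
        """Return (id, payload) for the first message that is not leased, else None."""
        for msg_id, payload in self._messages.items():
            if self._lease_expiry.get(msg_id, float("-inf")) <= now:
                self._lease_expiry[msg_id] = now + self.ack_deadline
                return msg_id, payload
        return None

    def ack(self, msg_id):
        """Remove the message so it is never delivered again."""
        self._messages.pop(msg_id, None)
        self._lease_expiry.pop(msg_id, None)


sub = ToySubscription(ack_deadline_seconds=90)
sub.publish("first")
sub.publish("second")

listener_1 = sub.pull(now=0)   # Listener-1 takes "first" and holds it
listener_2 = sub.pull(now=1)   # Listener-2 skips the held message
print(listener_1, listener_2)  # (1, 'first') (2, 'second')

sub.ack(listener_2[0])
print(sub.pull(now=60))        # None: "first" is still held at t=60
print(sub.pull(now=90))        # (1, 'first'): never acknowledged, so it is redelivered
```

The last line is the case to watch for: if Listener-1 sleeps past the deadline without acknowledging, the message goes back into circulation and Listener-2 can receive it.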

Steps to Implement:

  1. Create a Subscription with an Appropriate Acknowledgement Deadline: Set a deadline longer than your expected processing time for Listener-1 (e.g., 120 seconds for 60 seconds of processing).

  2. Subscriber 1 (Listener-1) Configuration:

    • Pull the message.
    • Process the message or simulate with time.sleep(60).
    • Acknowledge the message to prevent redelivery.

  3. Subscriber 2 (Listener-2) Configuration:

    • Run it as a separate subscriber client (for example, a second process) on the same subscription.
    • It won't receive Listener-1's message while Listener-1 is still holding it.
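
If you want to follow these steps literally (pull exactly one message, sleep, then acknowledge), synchronous pull is one way to do it. The sketch below is an assumption about how you might structure it: pull_one_and_hold is a hypothetical helper name, the 30-second margin is arbitrary, and the client is passed in as a parameter so the function can be tried with a stand-in object.

```python
import time


def pull_one_and_hold(subscriber, subscription_path, hold_seconds, sleep=time.sleep):
    """Pull a single message, hold it for hold_seconds, then acknowledge it.

    `subscriber` is expected to be a google.cloud.pubsub_v1.SubscriberClient.
    Returns the payload bytes, or None if no message was available.
    """
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 1}
    )
    if not response.received_messages:
        return None

    received = response.received_messages[0]
    # Synchronous pull does not extend the deadline for you, so ask for enough
    # time up front to cover the hold (the service maximum is 600 seconds).
    subscriber.modify_ack_deadline(
        request={
            "subscription": subscription_path,
            "ack_ids": [received.ack_id],
            "ack_deadline_seconds": min(hold_seconds + 30, 600),
        }
    )
    sleep(hold_seconds)  # stand-in for real processing
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": [received.ack_id]}
    )
    return received.message.data
```

Listener-1 would call this with hold_seconds=60 and Listener-2 with hold_seconds=0, each using its own SubscriberClient and the same path from subscriber.subscription_path(project_id, subscription_id).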

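Code Example:

import os
import time

from google.cloud import pubsub_v1

project_id = "your-project-id"
subscription_id = "your-subscription-id"
# Run this script twice against the same subscription, e.g. with LISTENER_ID=1
# in one terminal and LISTENER_ID=2 in another (the variable name is just for this example).
listener_id = os.environ.get("LISTENER_ID", "1")

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Listener-{listener_id} received message {message.message_id}: {message.data!r}")
    if listener_id == "1":
        # Simulate long processing. The streaming-pull client extends the
        # acknowledgement deadline in the background while the callback runs.
        time.sleep(60)
    message.ack()
    print(f"Message {message.message_id} acknowledged by Listener-{listener_id}")

# Hold only one unacknowledged message at a time so the rest stay available to the other listener
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
print(f"Listener-{listener_id} listening for messages on {subscription_path}...")

with subscriber:
    try:
        streaming_pull_future.result()  # block until interrupted or the stream fails
    except KeyboardInterrupt:
        streaming_pull_future.cancel()  # stop pulling
        streaming_pull_future.result()  # wait for the shutdown to finish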
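
To try this out you also need the publisher side from the question. A minimal sketch follows, assuming a topic that the subscription is already attached to; publish_numbered and the seq attribute are names invented for this example, and the client is passed in as a parameter.

```python
def publish_numbered(publisher, topic_path, count):
    """Publish `count` small messages and return their server-assigned IDs.

    `publisher` is expected to be a google.cloud.pubsub_v1.PublisherClient.
    """
    futures = [
        # Attribute values must be strings, hence str(i)
        publisher.publish(topic_path, f"message-{i}".encode("utf-8"), seq=str(i))
        for i in range(1, count + 1)
    ]
    # result() blocks until the publish has succeeded and returns the message ID
    return [future.result() for future in futures]
```

With the real client this would be called as publish_numbered(publisher, publisher.topic_path("your-project-id", "your-topic-id"), 5), where publisher is a pubsub_v1.PublisherClient() and "your-topic-id" is a placeholder for your own topic. Publishing several messages makes it easy to see Listener-2 working through the rest while Listener-1 is still sleeping on the first one.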

I want this in Node.js, and I want to add two endpoints so that each of the two subscribers receives messages.

To set up two endpoints for subscribers using Google Cloud Pub/Sub in Node.js, you can use Express to create a simple server with two different endpoints. Each endpoint handles message processing for one of the subscribers. The steps are below:

Setup

  • Install Node.js and npm: Ensure you have Node.js and npm (Node Package Manager) installed on your system.
  • Install Dependencies: You'll need the following packages:
    • @google-cloud/pubsub: Google Cloud Pub/Sub client library for Node.js
    • express: Web framework for building Node.js servers
    • axios: HTTP client used to forward messages to the endpoints
npm install @google-cloud/pubsub express axios

Example

const express = require('express');
const axios = require('axios');
const { PubSub } = require('@google-cloud/pubsub');

const app = express();
const pubsub = new PubSub();
const PORT = process.env.PORT || 8080;

// Set both names to the same subscription if the two listeners should share its messages
const subscriptionName1 = 'your-subscription-name-1'; // Subscription for Listener 1
const subscriptionName2 = 'your-subscription-name-2'; // Subscription for Listener 2

app.use(express.json()); // Parse incoming JSON data

// Endpoint for Listener 1
app.post('/listener1', (req, res) => {
    const message = Buffer.from(req.body.message.data, 'base64').toString('utf-8');
    console.log(`Listener 1 received message: ${message}`);

    // Simulate processing time
    setTimeout(() => {
        console.log("Listener 1 processing complete.");
        res.status(204).send(); // Success response; the forwarder acks when it sees this
    }, 60000); // 60 seconds delay
});

// Endpoint for Listener 2
app.post('/listener2', (req, res) => {
    const message = Buffer.from(req.body.message.data, 'base64').toString('utf-8');
    console.log(`Listener 2 received message: ${message}`);
    console.log("Listener 2 processing complete.");
    res.status(204).send(); // Success response, sent immediately
});

app.listen(PORT, () => {
    console.log(`Server running on port ${PORT}`);
});

// Function to subscribe to the subscription and forward messages to the appropriate endpoint
function subscribeAndForward(subscriptionName, endpoint) {
    // Keep at most one unacknowledged message per listener
    const subscription = pubsub.subscription(subscriptionName, {
        flowControl: { maxMessages: 1 },
    });

    const messageHandler = message => {
        console.log(`Received message ${message.id}:`);
        console.log(`Data: ${message.data.toString()}`);

        // Forward only plain fields; data is base64-encoded because the endpoints decode base64
        const body = {
            message: {
                messageId: message.id,
                data: message.data.toString('base64'),
                attributes: message.attributes,
            },
        };
        axios.post(`http://localhost:${PORT}${endpoint}`, body)
            .then(() => {
                message.ack(); // Acknowledge only after the endpoint has finished processing
                console.log(`Message forwarded to ${endpoint} and processed`);
            })
            .catch(err => {
                message.nack(); // Make the message available for redelivery
                console.error(`Failed to forward message: ${err}`);
            });
    };

    subscription.on('message', messageHandler);
    subscription.on('error', err => console.error(`Subscription error: ${err}`));
}

// Start subscription listeners
subscribeAndForward(subscriptionName1, '/listener1');
subscribeAndForward(subscriptionName2, '/listener2');

Please Note:

  • Express Server:
    • Creates an Express server (app).
    • Defines two POST endpoints: /listener1 and /listener2.
    • Uses express.json() middleware to parse incoming JSON request bodies.
  • Pub/Sub Subscriptions:
    • Creates a PubSub client (pubsub).
    • Defines subscription names for Listener 1 and Listener 2.
    • The subscribeAndForward function:
      • Subscribes to a Pub/Sub subscription, limited to one outstanding message at a time.
      • Defines a messageHandler that:
        • Logs received message details.
        • Forwards the message ID, base64-encoded data, and attributes to the corresponding endpoint using axios.post().
        • Acknowledges the message in Pub/Sub once the endpoint responds successfully (prevents redelivery), or nacks it if forwarding fails.
  • Message Processing:
    • /listener1 simulates a 60-second processing delay using setTimeout(). The message stays unacknowledged, and so withheld from other subscribers, during that time.
    • /listener2 processes the message immediately.
  • Running the Server:
    • Starts the server on the specified port (or 8080 by default).
    • Initiates subscription listeners using subscribeAndForward().

Key Points

  • Replace your-subscription-name-1 and your-subscription-name-2 with your actual Pub/Sub subscription names; use the same name for both if the listeners should share one subscription.
  • Ensure your Google Cloud project is set up correctly, and your service account has the necessary permissions.
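
One thing worth checking before you run this: how many subscriptions the listeners use decides who sees which message. The toy model below (plain Python, not the Pub/Sub API; deliver and the sub-a/sub-b names are invented for illustration) shows the difference between two listeners on one subscription and one listener on each of two subscriptions.

```python
def deliver(messages, subscriptions):
    """Toy delivery model; not the Pub/Sub API.

    `subscriptions` maps a subscription name to the listeners attached to it.
    Every subscription gets its own copy of each message. Within one
    subscription a message goes to a single listener (round-robin here for
    simplicity; the real service makes no such ordering promise).
    """
    received = {name: [] for listeners in subscriptions.values() for name in listeners}
    for listeners in subscriptions.values():
        for index, message in enumerate(messages):
            received[listeners[index % len(listeners)]].append(message)
    return received


messages = ["m1", "m2", "m3", "m4"]

shared = deliver(messages, {"sub-a": ["listener-1", "listener-2"]})
print(shared)    # {'listener-1': ['m1', 'm3'], 'listener-2': ['m2', 'm4']}

separate = deliver(messages, {"sub-a": ["listener-1"], "sub-b": ["listener-2"]})
print(separate)  # {'listener-1': ['m1', 'm2', 'm3', 'm4'], 'listener-2': ['m1', 'm2', 'm3', 'm4']}
```

The first case is the one described in the original question: the messages are split between the listeners. In the second case both listeners receive everything, so a message held by Listener 1 would still reach Listener 2.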