Create one publisher and two subscribers that share a single pull subscription. Pull a message with the first listener and pull the other messages with the second listener. Make sure that the first message pulled by Listener-1 is not pulled by the other listener, i.e. the first message stays locked by Listener-1.
Can we achieve this:
After Listener-1 pulls one message, add a sleep of 60 seconds in Listener-1 and then pull messages using Listener-2.
Ensure that Listener-2 does not pull the message that is locked by Listener-1.
Hi @harshada2828 ,
Yes, you can achieve the behavior you're describing in Google Cloud Pub/Sub where two subscribers pull messages from the same subscription, and a message pulled by one listener (subscriber) is locked and not available to the other listener during the lock period. Here's how you can set this up and manage the message flow:
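Key Concepts:
Acknowledgement deadline: once a subscriber pulls a message, Pub/Sub normally does not deliver it to another subscriber on the same subscription until the message is acknowledged or the deadline expires.
Steps to Implement:
1. Create a Subscription with an Appropriate Acknowledgement Deadline: Set a deadline longer than your expected processing time for Listener-1 (e.g., >60 seconds).
2. Subscriber 1 (Listener-1) Configuration: Pull a message, simulate the long processing with time.sleep(60), then acknowledge it.
3. Subscriber 2 (Listener-2) Configuration: Pull from the same subscription and acknowledge right away. It receives only messages that are not currently held by Listener-1.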
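Creating the subscription with a longer acknowledgement deadline might look like the sketch below. It assumes the google-cloud-pubsub Python client and an existing topic; the helper names `subscription_request` and `create_pull_subscription`, the `topic_id` parameter, and the 120-second default are illustrative assumptions, not part of the original answer.

```python
def subscription_request(project_id, topic_id, subscription_id, ack_deadline_seconds=120):
    """Build the request dict passed to SubscriberClient.create_subscription."""
    # Pub/Sub accepts subscription acknowledgement deadlines of 10 to 600 seconds.
    if not 10 <= ack_deadline_seconds <= 600:
        raise ValueError("ack deadline must be between 10 and 600 seconds")
    return {
        "name": f"projects/{project_id}/subscriptions/{subscription_id}",
        "topic": f"projects/{project_id}/topics/{topic_id}",
        "ack_deadline_seconds": ack_deadline_seconds,
    }


def create_pull_subscription(project_id, topic_id, subscription_id, ack_deadline_seconds=120):
    """Create a pull subscription (no push config) with the given deadline."""
    # Imported here so the request helper works without the client library installed.
    from google.cloud import pubsub_v1

    request = subscription_request(project_id, topic_id, subscription_id, ack_deadline_seconds)
    subscriber = pubsub_v1.SubscriberClient()
    with subscriber:  # closes the underlying channel when done
        return subscriber.create_subscription(request=request)
```

Calling `create_pull_subscription("your-project-id", "your-topic-id", "your-subscription-id")` needs valid credentials and an existing topic; both listeners would then attach to that one subscription.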
Code Example:
from google.cloud import pubsub_v1
import os
import time

project_id = "your-project-id"
subscription_id = "your-subscription-id"
# Run this script once per listener, e.g. with LISTENER_ID=1 and LISTENER_ID=2.
# The environment variable name is only an illustration.
listener_id = os.environ.get("LISTENER_ID", "1")

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Listener-{listener_id} received message: {message.data}")
    if listener_id == "1":
        print("Processing with Listener-1...")
        time.sleep(60)  # simulate long processing
        message.ack()
        print("Message acknowledged by Listener-1")
    else:
        print("Processing with Listener-2...")
        message.ack()

# max_messages=1 makes each listener hold only one message at a time
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
print(f"Listening for messages on {subscription_path}...")

# result() blocks the main thread while callbacks run in background threads
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()  # stop pulling
    streaming_pull_future.result()  # wait for the shutdown to finish
Notes:
The try-except block should catch specific exceptions rather than everything. You can customize this based on your application's needs.
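To see why the acknowledgement deadline has to exceed Listener-1's 60 seconds of processing, here is a toy in-memory model of the lease rule. It is not the Pub/Sub client API: `ToySubscription` and `run_scenario` are hypothetical names, time is passed in as plain numbers, and the model ignores the automatic lease extension that the streaming-pull client performs.

```python
class ToySubscription:
    """Toy model: a pulled message is hidden from other pulls until it is
    acknowledged or its acknowledgement deadline has passed."""

    def __init__(self, messages, ack_deadline_seconds):
        self.pending = list(messages)   # unacknowledged messages, in publish order
        self.leased_until = {}          # message -> time at which its lease expires
        self.ack_deadline = ack_deadline_seconds

    def pull(self, now):
        """Lease and return the first message not currently leased, or None."""
        for msg in self.pending:
            if self.leased_until.get(msg, float("-inf")) <= now:
                self.leased_until[msg] = now + self.ack_deadline
                return msg
        return None

    def ack(self, msg):
        """Remove the message for good; a late ack of a gone message is ignored."""
        if msg in self.pending:
            self.pending.remove(msg)
        self.leased_until.pop(msg, None)


def run_scenario(ack_deadline_seconds):
    """Return the messages Listener-2 receives while Listener-1 works for 60 s."""
    sub = ToySubscription(["m1", "m2", "m3"], ack_deadline_seconds)
    held_by_listener1 = sub.pull(now=0)   # Listener-1 pulls, then "sleeps" 60 s
    listener2_received = []
    for t in (1, 30):                     # Listener-2 pulls at t=1 and t=30
        msg = sub.pull(now=t)
        if msg is not None:
            listener2_received.append(msg)
            sub.ack(msg)                  # Listener-2 acknowledges immediately
    sub.ack(held_by_listener1)            # Listener-1 acknowledges at t=60
    return listener2_received


if __name__ == "__main__":
    print(run_scenario(90))  # deadline longer than the 60 s of work
    print(run_scenario(10))  # deadline shorter than the 60 s of work
```

With a 90-second deadline Listener-2 only ever sees the other messages; with a 10-second deadline the lease on the first message lapses and Listener-2 receives it as well.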
I want this in Node.js, and I want to add two endpoints so that the two subscribers can receive the messages.
To set up two endpoints for subscribers using Google Cloud Pub/Sub in Node.js, you can use Express to create a simple server with two different endpoints. Each endpoint will handle message processing for one of the subscribers. Below are the necessary steps:
Setup
Install the required packages:
@google-cloud/pubsub: Google Cloud Pub/Sub client library for Node.js
express: Web framework for building Node.js servers
axios: HTTP client used to forward messages to the endpoints
npm install @google-cloud/pubsub express axios
Example
const express = require('express');
const axios = require('axios');
const { PubSub } = require('@google-cloud/pubsub');

const app = express();
const pubsub = new PubSub();
const PORT = process.env.PORT || 8080;

// Use the same subscription name for both listeners if they should share
// messages; two different subscriptions each receive every message.
const subscriptionName1 = 'your-subscription-name-1'; // Subscription for Listener 1
const subscriptionName2 = 'your-subscription-name-2'; // Subscription for Listener 2

app.use(express.json()); // Parse incoming JSON data

// Endpoint for Listener 1
app.post('/listener1', (req, res) => {
  const message = Buffer.from(req.body.message.data, 'base64').toString('utf-8');
  console.log(`Listener 1 received message: ${message}`);
  // Simulate processing time
  setTimeout(() => {
    console.log('Listener 1 processing complete.');
    res.status(204).send(); // Respond once processing is done
  }, 60000); // 60 seconds delay
});

// Endpoint for Listener 2
app.post('/listener2', (req, res) => {
  const message = Buffer.from(req.body.message.data, 'base64').toString('utf-8');
  console.log(`Listener 2 received message: ${message}`);
  console.log('Listener 2 processing complete.');
  res.status(204).send(); // Respond immediately
});

app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

// Function to subscribe to the subscription and forward messages to the appropriate endpoint
function subscribeAndForward(subscriptionName, endpoint) {
  const subscription = pubsub.subscription(subscriptionName);
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`Data: ${message.data.toString()}`);
    // Send plain fields only; the endpoints expect base64-encoded data
    const payload = {
      message: { messageId: message.id, data: message.data.toString('base64') },
    };
    axios.post(`http://localhost:${PORT}${endpoint}`, payload)
      .then(() => {
        message.ack(); // Acknowledge in Pub/Sub only after the endpoint has finished
        console.log(`Message forwarded to ${endpoint} and processed`);
      })
      .catch(err => {
        message.nack(); // Let Pub/Sub redeliver the message
        console.error(`Failed to forward message: ${err}`);
      });
  };
  subscription.on('message', messageHandler);
  subscription.on('error', err => console.error(`Subscription error: ${err}`));
}

// Start subscription listeners
subscribeAndForward(subscriptionName1, '/listener1');
subscribeAndForward(subscriptionName2, '/listener2');
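Please Note:
The code creates an Express app (app).
It defines two POST endpoints, /listener1 and /listener2.
It uses the express.json() middleware to parse incoming JSON request bodies.
It creates a PubSub client (pubsub).
The subscribeAndForward function attaches a messageHandler to a subscription that logs each message, forwards it to the given endpoint with axios.post(), and acknowledges it in Pub/Sub.
/listener1 simulates a 60-second processing delay using setTimeout().
/listener2 processes the message immediately.
The subscription listeners are started by calling subscribeAndForward().
Key Points
Replace your-subscription-name-1 and your-subscription-name-2 with your actual Pub/Sub subscription names.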
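Whether the two names point at one subscription or at two matters for the original question, because every subscription on a topic receives every message, while listeners attached to the same subscription split the messages between them. The toy model below illustrates that difference in plain Python; `deliver` is a hypothetical helper, and the round-robin split is an assumption for readability, since real Pub/Sub makes no promise about how messages are balanced.

```python
from itertools import cycle


def deliver(messages, listeners_by_subscription):
    """Toy fan-out model.

    Every subscription gets a copy of every message. Listeners attached to
    the same subscription share that copy (round-robin here, as a
    simplification of Pub/Sub's unspecified balancing).
    """
    received = {}
    for listeners in listeners_by_subscription.values():
        for listener in listeners:
            received.setdefault(listener, [])
        turn = cycle(listeners)
        for msg in messages:
            received[next(turn)].append(msg)
    return received


messages = ["m1", "m2", "m3", "m4"]

# One shared subscription: each message goes to exactly one listener.
shared = deliver(messages, {"sub-a": ["listener1", "listener2"]})

# Two subscriptions: both listeners receive every message.
separate = deliver(messages, {"sub-a": ["listener1"], "sub-b": ["listener2"]})

if __name__ == "__main__":
    print(shared)
    print(separate)
```

For the behavior asked about in this thread, where a message held by Listener-1 is not seen by Listener-2, the shared-subscription case is the relevant one.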