I want to store the messages received from a Pub/Sub topic via a pull subscription in a Firestore database in Firebase.
To do that, I have published messages from the GCP console and deployed a Node.js application for the pull subscription on Cloud Run. Now I want to store those messages in Firestore, ensuring that no duplicate messages are stored.
I'm stuck on the Node.js code for storing the messages in Firestore. I have also already created a new Firestore database named logged-user, so I need to use that instead of the default one.
Could you please provide the steps and code for this?
Solved! Go to Solution.
To reliably store messages received from a Google Cloud Pub/Sub topic into Firestore using a Node.js application on Cloud Run, follow these steps:
1. Prerequisites
Firestore Database
Ensure you have a Firestore database named logged-user created in your Firebase project.
Cloud Run Service
Your Node.js application should be deployed on Cloud Run.
Service Account
Create a Service Account:
Assign roles: roles/pubsub.subscriber and roles/datastore.user.
Download the JSON key file for this service account.
2. Initialize Firebase Admin SDK
Initialize the Firebase Admin SDK in your Node.js application to connect to Firestore using the downloaded service account key.
// index.js
const admin = require('firebase-admin');
const { getFirestore } = require('firebase-admin/firestore');
const { PubSub } = require('@google-cloud/pubsub');

// Initialize the Firebase Admin SDK with the downloaded service-account key.
const app = admin.initializeApp({
  credential: admin.credential.cert('/path/to/your/service-account-key.json'), // Replace with your actual path
});

// Connect to the *named* Firestore database "logged-user".
// admin.firestore() always targets the "(default)" database, so the database
// ID has to be passed explicitly (requires a firebase-admin release with
// multi-database support).
const db = getFirestore(app, 'logged-user');

// Pub/Sub client used to open the pull subscription.
const pubsub = new PubSub();
3. Pull Messages and Store in Firestore
Implement the function to pull messages from the Pub/Sub subscription and store them in Firestore. Ensure no duplicate messages are stored by using the message ID as the Firestore document ID.
// index.js (continued)
const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name

// Target collection inside whichever database `db` is bound to.
// Note: this is a collection name, not a database ID.
const collectionRef = db.collection('logged-user');

/**
 * Decodes a Pub/Sub payload into a plain object that Firestore can store.
 *
 * @param {Buffer} payload - Raw message bytes (`message.data`).
 * @returns {object} The parsed JSON object.
 * @throws {SyntaxError} If the payload is not valid JSON.
 * @throws {TypeError} If the JSON value is not an object (null, array, string, number, ...).
 */
function parseMessageData(payload) {
  // message.data is already a Buffer, so decode it directly instead of re-wrapping it.
  const parsed = JSON.parse(payload.toString('utf8'));
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new TypeError('Message payload must be a JSON object');
  }
  return parsed;
}

/**
 * Opens a streaming pull on the subscription and stores every message in
 * Firestore, using the Pub/Sub message ID as the document ID so that a
 * redelivered message overwrites its own document instead of creating a
 * duplicate.
 *
 * @returns {Promise<void>} Resolves once the listeners are registered.
 */
async function listenForMessages() {
  const subscription = pubsub.subscription(subscriptionName);

  subscription.on('message', async (message) => {
    // Optional: Validate message content type
    // if (message.attributes && message.attributes['Content-Type'] !== 'application/json') {
    //   throw new Error('Invalid message content type');
    // }

    let data;
    try {
      data = parseMessageData(message.data);
    } catch (error) {
      // A malformed payload can never succeed on retry. Ack it so it is not
      // redelivered forever; configure a dead-letter topic on the
      // subscription if such messages must be kept for inspection.
      console.error(`Discarding message ${message.id} with invalid payload:`, error);
      message.ack();
      return;
    }

    try {
      // Message ID as document ID makes the write idempotent across redeliveries.
      await collectionRef.doc(message.id).set(data, { merge: true }); // Store or merge data
      message.ack();
      console.log(`Message ${message.id} processed and stored.`);
    } catch (error) {
      console.error('Error processing message:', error);
      message.nack(); // Retry the message in case of a transient error
    }
  });

  subscription.on('error', (error) => {
    console.error('Error listening for messages:', error);
  });
}

listenForMessages();
4. Deploy to Cloud Run
Ensure that your Node.js application is correctly set up for Cloud Run deployment and that it uses the service account key for authentication.
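Add GOOGLE_APPLICATION_CREDENTIALS Environment Variable: Ensure your Cloud Run service has the environment variable GOOGLE_APPLICATION_CREDENTIALS set to the path of your service account key file. (If you instead attach the service account to the Cloud Run service as its service identity, the Google Cloud client libraries find its credentials automatically and this variable is not required.)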
Additional Considerations
Error Handling
Implement Robust Error Handling: Use retry mechanisms with exponential backoff to handle transient errors. Log errors for monitoring and debugging.
Data Validation
Validate Data: Add data validation to ensure the message data conforms to your expected schema before storing it in Firestore.
Alternative Deduplication
Track Processed Messages: For high-volume scenarios, consider using a separate Firestore collection to track processed message IDs for faster deduplication.
Scaling
Monitor and Scale: Monitor your Cloud Run service performance and configure auto-scaling to handle increased message load efficiently.
Security
Manage Keys and Permissions: Follow best practices for managing service account keys and securing access to cloud resources.
Here’s the complete example for your Node.js application:
// index.js
const admin = require('firebase-admin');
const { getFirestore } = require('firebase-admin/firestore');
const { PubSub } = require('@google-cloud/pubsub');

// Initialize the Firebase Admin SDK with the downloaded service-account key.
const app = admin.initializeApp({
  credential: admin.credential.cert('/path/to/your/service-account-key.json'), // Replace with your actual path
});

// Connect to the *named* Firestore database "logged-user".
// admin.firestore() always targets the "(default)" database, so the database
// ID has to be passed explicitly (requires a firebase-admin release with
// multi-database support).
const db = getFirestore(app, 'logged-user');
const pubsub = new PubSub();

const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name

// Collection (inside the "logged-user" database) that receives the messages.
const collectionRef = db.collection('logged-user');

/**
 * Decodes a Pub/Sub payload into a plain object that Firestore can store.
 *
 * @param {Buffer} payload - Raw message bytes (`message.data`).
 * @returns {object} The parsed JSON object.
 * @throws {SyntaxError} If the payload is not valid JSON.
 * @throws {TypeError} If the JSON value is not an object (null, array, string, number, ...).
 */
function parseMessageData(payload) {
  // message.data is already a Buffer, so decode it directly instead of re-wrapping it.
  const parsed = JSON.parse(payload.toString('utf8'));
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new TypeError('Message payload must be a JSON object');
  }
  return parsed;
}

/**
 * Opens a streaming pull on the subscription and stores every message in
 * Firestore, using the Pub/Sub message ID as the document ID so that a
 * redelivered message overwrites its own document instead of creating a
 * duplicate.
 *
 * @returns {Promise<void>} Resolves once the listeners are registered.
 */
async function listenForMessages() {
  const subscription = pubsub.subscription(subscriptionName);

  subscription.on('message', async (message) => {
    // Optional: Validate message content type
    // if (message.attributes && message.attributes['Content-Type'] !== 'application/json') {
    //   throw new Error('Invalid message content type');
    // }

    let data;
    try {
      data = parseMessageData(message.data);
    } catch (error) {
      // A malformed payload can never succeed on retry. Ack it so it is not
      // redelivered forever; configure a dead-letter topic on the
      // subscription if such messages must be kept for inspection.
      console.error(`Discarding message ${message.id} with invalid payload:`, error);
      message.ack();
      return;
    }

    try {
      // Message ID as document ID makes the write idempotent across redeliveries.
      await collectionRef.doc(message.id).set(data, { merge: true }); // Store or merge data
      message.ack();
      console.log(`Message ${message.id} processed and stored.`);
    } catch (error) {
      console.error('Error processing message:', error);
      message.nack(); // Retry the message in case of a transient error
    }
  });

  subscription.on('error', (error) => {
    console.error('Error listening for messages:', error);
  });
}

listenForMessages();
To reliably store messages received from a Google Cloud Pub/Sub topic into Firestore using a Node.js application on Cloud Run, follow these steps:
1. Prerequisites
Firestore Database
Ensure you have a Firestore database named logged-user created in your Firebase project.
Cloud Run Service
Your Node.js application should be deployed on Cloud Run.
Service Account
Create a Service Account:
Assign roles: roles/pubsub.subscriber and roles/datastore.user.
Download the JSON key file for this service account.
2. Initialize Firebase Admin SDK
Initialize the Firebase Admin SDK in your Node.js application to connect to Firestore using the downloaded service account key.
// index.js
const admin = require('firebase-admin');
const { getFirestore } = require('firebase-admin/firestore');
const { PubSub } = require('@google-cloud/pubsub');

// Initialize the Firebase Admin SDK with the downloaded service-account key.
const app = admin.initializeApp({
  credential: admin.credential.cert('/path/to/your/service-account-key.json'), // Replace with your actual path
});

// Connect to the *named* Firestore database "logged-user".
// admin.firestore() always targets the "(default)" database, so the database
// ID has to be passed explicitly (requires a firebase-admin release with
// multi-database support).
const db = getFirestore(app, 'logged-user');

// Pub/Sub client used to open the pull subscription.
const pubsub = new PubSub();
3. Pull Messages and Store in Firestore
Implement the function to pull messages from the Pub/Sub subscription and store them in Firestore. Ensure no duplicate messages are stored by using the message ID as the Firestore document ID.
// index.js (continued)
const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name

// Target collection inside whichever database `db` is bound to.
// Note: this is a collection name, not a database ID.
const collectionRef = db.collection('logged-user');

/**
 * Decodes a Pub/Sub payload into a plain object that Firestore can store.
 *
 * @param {Buffer} payload - Raw message bytes (`message.data`).
 * @returns {object} The parsed JSON object.
 * @throws {SyntaxError} If the payload is not valid JSON.
 * @throws {TypeError} If the JSON value is not an object (null, array, string, number, ...).
 */
function parseMessageData(payload) {
  // message.data is already a Buffer, so decode it directly instead of re-wrapping it.
  const parsed = JSON.parse(payload.toString('utf8'));
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new TypeError('Message payload must be a JSON object');
  }
  return parsed;
}

/**
 * Opens a streaming pull on the subscription and stores every message in
 * Firestore, using the Pub/Sub message ID as the document ID so that a
 * redelivered message overwrites its own document instead of creating a
 * duplicate.
 *
 * @returns {Promise<void>} Resolves once the listeners are registered.
 */
async function listenForMessages() {
  const subscription = pubsub.subscription(subscriptionName);

  subscription.on('message', async (message) => {
    // Optional: Validate message content type
    // if (message.attributes && message.attributes['Content-Type'] !== 'application/json') {
    //   throw new Error('Invalid message content type');
    // }

    let data;
    try {
      data = parseMessageData(message.data);
    } catch (error) {
      // A malformed payload can never succeed on retry. Ack it so it is not
      // redelivered forever; configure a dead-letter topic on the
      // subscription if such messages must be kept for inspection.
      console.error(`Discarding message ${message.id} with invalid payload:`, error);
      message.ack();
      return;
    }

    try {
      // Message ID as document ID makes the write idempotent across redeliveries.
      await collectionRef.doc(message.id).set(data, { merge: true }); // Store or merge data
      message.ack();
      console.log(`Message ${message.id} processed and stored.`);
    } catch (error) {
      console.error('Error processing message:', error);
      message.nack(); // Retry the message in case of a transient error
    }
  });

  subscription.on('error', (error) => {
    console.error('Error listening for messages:', error);
  });
}

listenForMessages();
4. Deploy to Cloud Run
Ensure that your Node.js application is correctly set up for Cloud Run deployment and that it uses the service account key for authentication.
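Add GOOGLE_APPLICATION_CREDENTIALS Environment Variable: Ensure your Cloud Run service has the environment variable GOOGLE_APPLICATION_CREDENTIALS set to the path of your service account key file. (If you instead attach the service account to the Cloud Run service as its service identity, the Google Cloud client libraries find its credentials automatically and this variable is not required.)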
Additional Considerations
Error Handling
Implement Robust Error Handling: Use retry mechanisms with exponential backoff to handle transient errors. Log errors for monitoring and debugging.
Data Validation
Validate Data: Add data validation to ensure the message data conforms to your expected schema before storing it in Firestore.
Alternative Deduplication
Track Processed Messages: For high-volume scenarios, consider using a separate Firestore collection to track processed message IDs for faster deduplication.
Scaling
Monitor and Scale: Monitor your Cloud Run service performance and configure auto-scaling to handle increased message load efficiently.
Security
Manage Keys and Permissions: Follow best practices for managing service account keys and securing access to cloud resources.
Here’s the complete example for your Node.js application:
// index.js
const admin = require('firebase-admin');
const { getFirestore } = require('firebase-admin/firestore');
const { PubSub } = require('@google-cloud/pubsub');

// Initialize the Firebase Admin SDK with the downloaded service-account key.
const app = admin.initializeApp({
  credential: admin.credential.cert('/path/to/your/service-account-key.json'), // Replace with your actual path
});

// Connect to the *named* Firestore database "logged-user".
// admin.firestore() always targets the "(default)" database, so the database
// ID has to be passed explicitly (requires a firebase-admin release with
// multi-database support).
const db = getFirestore(app, 'logged-user');
const pubsub = new PubSub();

const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name

// Collection (inside the "logged-user" database) that receives the messages.
const collectionRef = db.collection('logged-user');

/**
 * Decodes a Pub/Sub payload into a plain object that Firestore can store.
 *
 * @param {Buffer} payload - Raw message bytes (`message.data`).
 * @returns {object} The parsed JSON object.
 * @throws {SyntaxError} If the payload is not valid JSON.
 * @throws {TypeError} If the JSON value is not an object (null, array, string, number, ...).
 */
function parseMessageData(payload) {
  // message.data is already a Buffer, so decode it directly instead of re-wrapping it.
  const parsed = JSON.parse(payload.toString('utf8'));
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new TypeError('Message payload must be a JSON object');
  }
  return parsed;
}

/**
 * Opens a streaming pull on the subscription and stores every message in
 * Firestore, using the Pub/Sub message ID as the document ID so that a
 * redelivered message overwrites its own document instead of creating a
 * duplicate.
 *
 * @returns {Promise<void>} Resolves once the listeners are registered.
 */
async function listenForMessages() {
  const subscription = pubsub.subscription(subscriptionName);

  subscription.on('message', async (message) => {
    // Optional: Validate message content type
    // if (message.attributes && message.attributes['Content-Type'] !== 'application/json') {
    //   throw new Error('Invalid message content type');
    // }

    let data;
    try {
      data = parseMessageData(message.data);
    } catch (error) {
      // A malformed payload can never succeed on retry. Ack it so it is not
      // redelivered forever; configure a dead-letter topic on the
      // subscription if such messages must be kept for inspection.
      console.error(`Discarding message ${message.id} with invalid payload:`, error);
      message.ack();
      return;
    }

    try {
      // Message ID as document ID makes the write idempotent across redeliveries.
      await collectionRef.doc(message.id).set(data, { merge: true }); // Store or merge data
      message.ack();
      console.log(`Message ${message.id} processed and stored.`);
    } catch (error) {
      console.error('Error processing message:', error);
      message.nack(); // Retry the message in case of a transient error
    }
  });

  subscription.on('error', (error) => {
    console.error('Error listening for messages:', error);
  });
}

listenForMessages();