
Storing received Pub/Sub messages in a Firestore database

I want to store the messages received from a Pub/Sub topic via a pull subscription in a Firestore database in Firebase.
So far, I have published messages from the GCP console and deployed a Node.js application for the pull subscription on Cloud Run. Now I want to store those messages in Firestore, ensuring that no duplicate messages are stored.
I'm stuck on the Node.js code for storing the messages in Firestore. I have also already created a new Firestore database named logged-user, so I need to use that instead of the default one.
Could you please provide the steps and code for this?

Accepted solution

To reliably store messages received from a Google Cloud Pub/Sub topic into Firestore using a Node.js application on Cloud Run, follow these steps:

1. Prerequisites

Firestore Database

Ensure the Firestore database named logged-user exists in your Firebase project. This is a separate database, not a collection, so the code has to connect to it by its database ID.

Cloud Run Service

Your Node.js application should be deployed on Cloud Run.

Service Account

  • Create a Service Account:

    • Assign roles: roles/pubsub.subscriber and roles/datastore.user.

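    • Attach it to your Cloud Run service (for example, gcloud run deploy SERVICE --service-account=SA_EMAIL). On Cloud Run the client libraries pick up this identity automatically, so you do not need to download a JSON key file.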

2. Initialize Firebase Admin SDK

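Initialize the Firebase Admin SDK in your Node.js application and connect it to the logged-user database rather than the default one. On Cloud Run, calling initializeApp() without a credential uses the service account attached to the service.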

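// index.js
const { initializeApp } = require('firebase-admin/app');
const { getFirestore } = require('firebase-admin/firestore');
const { PubSub } = require('@google-cloud/pubsub');

// Initialize the Firebase Admin SDK. On Cloud Run, initializeApp() without a
// credential uses the service account attached to the service.
const app = initializeApp();

// Connect to the named database "logged-user" instead of "(default)".
// Named databases require firebase-admin v11.10.0 or later.
const db = getFirestore(app, 'logged-user');
const pubsub = new PubSub();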

3. Pull Messages and Store in Firestore

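Implement a handler that receives messages from the pull subscription and writes each one to Firestore. Pub/Sub delivers messages at least once, so the same message can arrive more than once. Using the message ID as the Firestore document ID makes the write idempotent: a redelivery overwrites the existing document instead of adding a duplicate.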

// index.js (continued)

const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name
const collectionRef = db.collection('messages'); // Replace with your collection name inside the logged-user database

function listenForMessages() {
  const subscription = pubsub.subscription(subscriptionName);

  subscription.on('message', async (message) => {
    let data;
    try {
      // Optional: validate the declared content type first
      // if (message.attributes['Content-Type'] !== 'application/json') {
      //   throw new Error('Invalid message content type');
      // }

      // message.data is a Buffer containing the published payload.
      data = JSON.parse(message.data.toString('utf8'));
    } catch (error) {
      // A payload that cannot be parsed fails on every redelivery, so nacking
      // it would loop forever. Log it and ack it so it is not redelivered.
      console.error(`Message ${message.id} could not be parsed:`, error);
      message.ack();
      return;
    }

    try {
      // The message ID is the document ID, so a redelivered message
      // overwrites its own document instead of creating a duplicate.
      await collectionRef.doc(message.id).set(data, { merge: true });
      message.ack();
      console.log(`Message ${message.id} processed and stored.`);
    } catch (error) {
      console.error(`Error storing message ${message.id}:`, error);
      message.nack(); // Let Pub/Sub redeliver the message after a transient error
    }
  });

  subscription.on('error', (error) => {
    console.error('Error listening for messages:', error);
  });
}

listenForMessages();
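To see why the message ID works as a deduplication key, the sketch below simulates at-least-once redelivery with an in-memory Map standing in for the Firestore collection. This is an assumption for illustration only; FakeCollection, storeMessage and simulateDeliveries are hypothetical names, not part of any library. A redelivered message carries the same ID and overwrites its own document, while the same payload published a second time gets a new message ID and is stored again.

```javascript
// In-memory stand-in for a Firestore collection (hypothetical, for illustration only).
class FakeCollection {
  constructor() {
    this.docs = new Map();
  }

  // Mimics collectionRef.doc(id).set(data): writing the same ID twice overwrites.
  async set(docId, data) {
    this.docs.set(docId, { ...data });
  }
}

// Hypothetical helper mirroring the handler: decode the Buffer, then write under message.id.
async function storeMessage(collection, message) {
  const data = JSON.parse(message.data.toString('utf8'));
  await collection.set(message.id, data);
}

async function simulateDeliveries() {
  const collection = new FakeCollection();
  const payload = Buffer.from(JSON.stringify({ userId: 'user-1' }));

  const original = { id: '1001', data: payload };
  await storeMessage(collection, original);
  await storeMessage(collection, original); // redelivery: same message ID, no new document

  const republished = { id: '1002', data: payload }; // same content published again
  await storeMessage(collection, republished); // new message ID, so a second document

  return collection.docs;
}

simulateDeliveries().then((docs) => {
  console.log(docs.size); // 2
});
```

The result shows the limit of this approach: redeliveries collapse into one document, but a payload that is published twice produces two documents.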

4. Deploy to Cloud Run

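Deploy the application to Cloud Run with the service account from step 1 attached. Because a pull subscriber is a long-running background worker rather than a request handler, check the following:

  • Credentials: There is no need to set GOOGLE_APPLICATION_CREDENTIALS on Cloud Run; the client libraries use the attached service account automatically. Set it to the path of a key file only when running the code outside Google Cloud (for local development, gcloud auth application-default login is an alternative).

  • Listening port: A Cloud Run service must listen on the port given in the PORT environment variable, even if it serves no real traffic, or the revision fails to start.

  • CPU and instances: Deploy with --no-cpu-throttling and --min-instances=1. Otherwise CPU is only allocated while a request is being handled, and the subscriber can stall or be scaled to zero.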
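Because a pull subscriber does not serve real traffic, the container still needs something listening on the port Cloud Run passes in PORT. A minimal sketch using only Node's built-in http module, assuming the Pub/Sub listener runs in the same process; createHealthServer is a hypothetical name. The K_SERVICE check relies on an environment variable that Cloud Run sets, and only keeps the server from starting when you run the file elsewhere.

```javascript
const http = require('node:http');

// Answers every request with 200 "ok"; the real work happens in the Pub/Sub listener.
function createHealthServer() {
  return http.createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('ok');
  });
}

// K_SERVICE is set by Cloud Run, so the server only starts when deployed there.
if (process.env.K_SERVICE) {
  const port = Number(process.env.PORT) || 8080;
  createHealthServer().listen(port, () => {
    console.log(`Health server listening on port ${port}`);
  });
}
```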

Additional Considerations

Error Handling

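  • Implement Robust Error Handling: Nack only failures that are likely transient, and configure the subscription's retry policy (exponential backoff between a minimum and maximum delay) together with a dead-letter topic, so a message that can never be processed is not redelivered forever. Log errors for monitoring and debugging.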
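One way to act on this inside the message handler is to nack only errors that look transient and ack (after logging) everything else, leaving the backoff itself to the subscription's retry policy. A sketch, assuming Firestore errors expose a numeric gRPC status in error.code, as errors from the Google Cloud client libraries generally do; the chosen set of codes and the names isTransient and handleFailure are assumptions to adapt.

```javascript
// gRPC status codes that usually indicate a temporary condition worth retrying.
const TRANSIENT_GRPC_CODES = new Set([
  4,  // DEADLINE_EXCEEDED
  8,  // RESOURCE_EXHAUSTED
  10, // ABORTED
  13, // INTERNAL
  14, // UNAVAILABLE
]);

function isTransient(error) {
  return Boolean(error) && TRANSIENT_GRPC_CODES.has(error.code);
}

// Hypothetical helper: nack transient failures so Pub/Sub redelivers them
// (with backoff if the subscription has a retry policy), and ack the rest so
// a permanently bad message is not redelivered forever.
function handleFailure(message, error) {
  console.error(`Message ${message.id} failed:`, error);
  if (isTransient(error)) {
    message.nack();
    return 'nack';
  }
  message.ack();
  return 'ack';
}
```

If you configure a dead-letter topic, you may prefer to nack every failure instead and let the dead-letter policy move messages that exceed the maximum number of delivery attempts.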

Data Validation

  • Validate Data: Add data validation to ensure the message data conforms to your expected schema before storing it in Firestore.
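For a small schema a hand-rolled check is enough. The sketch below assumes a hypothetical payload shape with a non-empty string userId and a date string loginAt; validateLoginEvent and both field names are placeholders, so replace the rules with your real schema (or a schema library you already use).

```javascript
// Returns a list of problems; an empty list means the payload looks valid.
// The expected fields (userId, loginAt) are hypothetical placeholders.
function validateLoginEvent(data) {
  if (data === null || typeof data !== 'object' || Array.isArray(data)) {
    // Firestore documents must be objects, so reject arrays and primitives early.
    return ['payload must be a JSON object'];
  }
  const errors = [];
  if (typeof data.userId !== 'string' || data.userId.trim() === '') {
    errors.push('userId must be a non-empty string');
  }
  if (typeof data.loginAt !== 'string' || Number.isNaN(Date.parse(data.loginAt))) {
    errors.push('loginAt must be a parseable date string (e.g. ISO 8601)');
  }
  return errors;
}
```

In the handler, a non-empty result is a permanent failure: log it and ack the message (or let a dead-letter topic take it) rather than nacking it.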

Alternative Deduplication

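  • Deduplicate by Content When Needed: Using the message ID as the document ID only removes redeliveries of the same message. If a publisher can publish the same event twice, each copy gets a new message ID; in that case, derive the document ID from a unique business key or a hash of the payload. Pub/Sub's exactly-once delivery option for pull subscriptions can further reduce redeliveries, but the idempotent write remains a useful safeguard.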
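A content-derived document ID can be built by hashing a canonical form of the parsed payload with Node's built-in crypto module. This sketch assumes that two payloads with the same fields and values really are the same event; canonicalJson and contentDocId are hypothetical names. If your payload already carries a natural unique key, using that key directly is simpler.

```javascript
const crypto = require('node:crypto');

// Serializes parsed JSON with object keys sorted at every level, so payloads
// that differ only in key order produce the same string.
function canonicalJson(value) {
  if (Array.isArray(value)) {
    return `[${value.map(canonicalJson).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const keys = Object.keys(value).sort();
    return `{${keys.map((k) => `${JSON.stringify(k)}:${canonicalJson(value[k])}`).join(',')}}`;
  }
  return JSON.stringify(value);
}

// Hypothetical helper: hex SHA-256 of the canonical JSON, used as the Firestore document ID.
function contentDocId(data) {
  return crypto.createHash('sha256').update(canonicalJson(data)).digest('hex');
}
```

In the handler, pass contentDocId(data) to collectionRef.doc() in place of message.id.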

Scaling

  • Monitor and Scale: Cloud Run does not scale a service based on subscription backlog, so watch the subscription's undelivered-message count and adjust --min-instances or the subscriber's flowControl.maxMessages setting as load grows.

Security

  • Manage Keys and Permissions: Prefer a service account attached to the Cloud Run service over downloaded key files, grant only the roles the service needs, and if you must use a key file, store it securely and rotate it.

Here’s the complete example for your Node.js application:

// index.js
const { initializeApp } = require('firebase-admin/app');
const { getFirestore } = require('firebase-admin/firestore');
const { PubSub } = require('@google-cloud/pubsub');

// On Cloud Run, initializeApp() without a credential uses the attached service account.
const app = initializeApp();

// Connect to the named database "logged-user" (requires firebase-admin v11.10.0 or later).
const db = getFirestore(app, 'logged-user');
const pubsub = new PubSub();

const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name
const collectionRef = db.collection('messages'); // Replace with your collection name inside the logged-user database

function listenForMessages() {
  const subscription = pubsub.subscription(subscriptionName);

  subscription.on('message', async (message) => {
    let data;
    try {
      // Optional: validate the declared content type first
      // if (message.attributes['Content-Type'] !== 'application/json') {
      //   throw new Error('Invalid message content type');
      // }

      // message.data is a Buffer containing the published payload.
      data = JSON.parse(message.data.toString('utf8'));
    } catch (error) {
      // Unparseable payloads would fail on every redelivery: log and ack them.
      console.error(`Message ${message.id} could not be parsed:`, error);
      message.ack();
      return;
    }

    try {
      // Message ID as document ID: a redelivery overwrites the same document.
      await collectionRef.doc(message.id).set(data, { merge: true });
      message.ack();
      console.log(`Message ${message.id} processed and stored.`);
    } catch (error) {
      console.error(`Error storing message ${message.id}:`, error);
      message.nack(); // Let Pub/Sub redeliver the message after a transient error
    }
  });

  subscription.on('error', (error) => {
    console.error('Error listening for messages:', error);
  });
}

listenForMessages();