
Dataflow template - Streaming - MongoDB CDC to BigQuery

Hey Team,

I want to synchronize MongoDB Atlas data to BigQuery in real time.
I tried to use this template to replay data into BigQuery by publishing MongoDB change stream events to Pub/Sub.
However, it seems that this template has no logic to handle the different operationType values when replaying.

a. Is any configuration required? Or do you have any suggestions?

b. How do I create the BigQuery table structure if userOption=json?

changestream event:

{
  "_id": {
    "_data": "82682C351E000000022B042C0100296E5A100475CDCE6DBD9749CFA93E3B18AD668597463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064682C351E2AC5890013E999FD000004"
  },
  "operationType": "insert",
  "clusterTime": {
    "$timestamp": {
      "t": 1747727646,
      "i": 2
    }
  },
  "wallTime": "2025-05-20T07:54:06.115Z",
  "fullDocument": {
    "_id": "682c351e2ac5890013e999fd",
    "receiptNumber": "857552503558460",
    "time": "2025-05-20T07:54:05.778Z",
    "type": "status_updated",
    "info": {
      "status": "paid"
    },
    "operatorId": "63204b95146ed80007008196",
    "operatorType": "employee",
    "createdTime": "2025-05-20T07:54:06.111Z",
    "modifiedTime": "2025-05-20T07:54:06.111Z",
    "__v": 0
  },
  "ns": {
    "db": "server",
    "coll": "orderlogs"
  },
  "documentKey": {
    "_id": "682c351e2ac5890013e999fd"
  }
}

Thank you.


Hi @dataflow,

Welcome to Google Cloud Community!

a. Is any configuration required? Or do you have any suggestions?

The MongoDB to BigQuery (Stream) template reads MongoDB change streams and loads the new data into BigQuery. It has no built-in logic to handle the different operationType values when replaying events, and it doesn't support BigQuery change data capture. One possible approach is to use a user-defined function (UDF) for more complex transformations, since a UDF can apply custom logic and reshape the input data to match the target schema.
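To make the kind of custom logic concrete, here is a minimal sketch in plain Python. It is not the template's UDF interface and not something the template does for you; it only illustrates one way to treat each change event as an append-only change-log row and then collapse those rows into current state. The function names (to_change_row, latest_state) and the operation_type column are hypothetical, and the sketch assumes rows are replayed in the order the changes happened.

```python
import json
from typing import Any, Dict, Iterable, Optional


def to_change_row(event: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Flatten one change stream event into a change-log row (hypothetical layout)."""
    # Delete events carry no fullDocument. Update events only carry one when
    # the change stream is opened with fullDocument="updateLookup".
    full_doc = event.get("fullDocument")
    return {
        "id": str(event["documentKey"]["_id"]),
        "operation_type": event["operationType"],
        "source_data": json.dumps(full_doc) if full_doc is not None else None,
        "timestamp": event.get("wallTime"),
    }


def latest_state(rows: Iterable[Dict[str, Optional[str]]]) -> Dict[str, Dict[str, Optional[str]]]:
    """Replay rows in order: keep the last version per id, drop deleted ids."""
    state: Dict[str, Dict[str, Optional[str]]] = {}
    for row in rows:
        if row["operation_type"] == "delete":
            state.pop(row["id"], None)
        else:
            state[row["id"]] = row
    return state
```

The same idea can be expressed on the BigQuery side by appending every change row to a log table and querying for the most recent row per id while excluding ids whose latest operation is a delete.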

b. How do I create the BigQuery table structure if userOption=json?

The userOption parameter controls the output format. If userOption=json, define the target table schema explicitly and create the BigQuery table manually so that it matches the structure you expect. It's best not to rely on BigQuery's auto-detect feature, as it can sometimes cause schema mismatches.
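As a rough illustration of defining the schema explicitly, the sketch below keeps the column list in one place and renders a CREATE TABLE statement from it. The column names (id, source_data, timestamp), the table name, and the helper create_table_ddl are assumptions for illustration only; check the template documentation for the columns it actually writes before creating the table.

```python
import json
from typing import Dict, List

# Hypothetical column layout; verify against what the pipeline really emits.
SCHEMA: List[Dict[str, str]] = [
    {"name": "id", "type": "STRING"},
    {"name": "source_data", "type": "JSON"},
    {"name": "timestamp", "type": "TIMESTAMP"},
]


def create_table_ddl(table: str, schema: List[Dict[str, str]]) -> str:
    """Render a BigQuery CREATE TABLE statement from a list of columns."""
    columns = ",\n".join(f"  {col['name']} {col['type']}" for col in schema)
    return f"CREATE TABLE `{table}` (\n{columns}\n)"


if __name__ == "__main__":
    # Placeholder dataset and table names.
    print(create_table_ddl("my_dataset.my_table", SCHEMA))
    # The same list, serialized, can serve as a JSON schema file.
    print(json.dumps(SCHEMA, indent=2))
```

Keeping the schema as data means the DDL and any schema file stay in sync if a column is added later.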

Just to add, the MongoDB to BigQuery (Stream) template is currently in Beta. It is subject to the "Pre-GA Offerings Terms": Pre-GA features are available "as is" and might have limited support.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

Hey marckevin,

Thank you for your reply.


a. When the MongoDB change stream event's operationType is update or delete, how do I use Dataflow streaming to update or delete the corresponding data in a BigQuery table?
I don't quite understand how to implement this logic. Any suggestions?

b. If userOption=json, should the BigQuery table look like this?

CREATE TABLE xxx.xxx (
    id STRING,
    source_data JSON,
    timestamp TIMESTAMP
)

Thank you.