Datafusion Mongodb record duplicate record problem

Hello, 

I'm trying to read data from MongoDB and send it to a BigQuery sink.

I'm having issues reading the data because I have duplicate name records in different nodes (same problem when I have the same field name) see the pictures attached 

Anyone who has experienced this problem and resolve it?

exemple_record.png

exemple_field.png

ouss94_0-1693750939948.png

 



Thank you
Best regards 

0 10 788
10 REPLIES 10

If you're facing issues with duplicate field names within different nodes of a document, you can use the Rename transformation in Cloud Data Fusion. For instance, if two nodes in a MongoDB document both have a field named "address", you can rename one of them to "billing_address" and the other to "shipping_address" to avoid conflicts.

Additional Tips:

  • When dealing with duplicate field names, especially within nested structures of a MongoDB document, the Rename transformation is essential. It allows you to provide distinct names for fields that might conflict when ingested into systems like BigQuery.
  • If merging data from multiple MongoDB collections into a single BigQuery table, ensure that the schema is consistent and handle any field name overlaps appropriately.

Thank you for your quick reply, 
Do you have an example I can see? 
Is rename transform a component (didn't find it in hub)? or you mean rename the field in mongoDb input ? 

what should i do in case of 2 record with the same name ( see picture 1  customer.contact and shipTo.contact ) i've tried to rename in mongodb input but i have 0 output.

Thank you

Best Regards

Cloud Data Fusion does not currently have a standalone "Rename" transformation in the Hub. However, you can still rename fields in your data using the following methods:

  • Rename fields in the MongoDB source configuration. This is a valid approach to handle duplicate field names.
  • Use a scripting transformation to rename fields. This can be useful if you need to rename fields based on complex criteria.

Example

The following example shows how to rename the customer.contact and shipTo.contact fields in the MongoDB source configuration:

Source: MongoDB
Configuration:
  Rename fields:
    customer.contact: customer_contact
    shipTo.contact: shipTo_contact
Sink: BigQuery

This will rename the customer.contact field to customer_contact and the shipTo.contact field to shipTo_contact before the data is sent to BigQuery.

Troubleshooting Zero Output

If you are getting 0 output after renaming the fields, check the following:

  • Make sure that the fields are renamed correctly in the MongoDB source configuration or scripting transformation.
  • Make sure that the output of the MongoDB source connector or scripting transformation is connected to the sink connector.
  • Make sure that the sink connector is configured correctly.

Thank you for your replay, 
i've tried a lot of syntaxe possibility in mongodb input query but still have no output on my renamed field, note that when i try my query in mongodb client ( studio 3T / robot ) it works fine see picture ( in blue old field name in orange new field name ).

here is an exemple of a query : 
{
"$or": [
{
"objectInfos.creation.date": {
"$gte": {
"$date": "2023-08-04T00:11:16.847Z"
}
}
},
{
"objectInfos.lastUpdate.date": {
"$gte": {
"$date": "2023-08-04T00:11:16.847Z"
}
}
},
{
"objectInfos.lastModification.date": {
"$gte": {
"$date": "2023-08-04T00:11:16.847Z"
}
}
}
]
},
{
"$addFields": {
"shipTo.contact2": "$shipTo.contact"
}
}

can you correct ? or give me exemple of how i should write it in mongodb input query 

ouss94_0-1693814784072.png

 


Thank you 
Best Regards

Rename Fields in MongoDB Input

To rename fields in the MongoDB input query, use the $addFields operator. The syntax is as follows:

 

{"$addFields": {"new_field_name": "$old_field_name"}}

For example, to rename the shipTo.contact field to shipTo.contact2, you would use the following query:

 

{"$addFields": {"shipTo.contact2": "$shipTo.contact"}}

Aggregation Pipeline Structure

To combine a filter ($or) and renaming ($addFields) in MongoDB's aggregation pipeline, use the following structure:

 

[
  {"$match": {/* filter criteria */}},
  {"$addFields": {/* rename fields */}},
]

For example, the following aggregation pipeline will rename the shipTo.contact field to shipTo.contact2 for all documents where the objectInfos.creation.date, objectInfos.lastUpdate.date, or objectInfos.lastModification.date field is greater than or equal to 2023-08-04T00:11:16.847Z

[
  {"$match": {
    "$or": [
      {"objectInfos.creation.date": {"$gte": {"$date": "2023-08-04T00:11:16.847Z"}}},
      {"objectInfos.lastUpdate.date": {"$gte": {"$date": "2023-08-04T00:11:16.847Z"}}},
      {"objectInfos.lastModification.date": {"$gte": {"$date": "2023-08-04T00:11:16.847Z"}}},
    ]
  }},
  {"$addFields": {"shipTo.contact2": "$shipTo.contact"}}
]

Overwriting Fields with $addFields

The $addFields operator will create a new field in the document. If the new field already exists, the old field will be overwritten.

Example Cloud Data Fusion Pipeline

The following example shows a Cloud Data Fusion pipeline that uses the $addFields operator to rename the shipTo.contact field to shipTo.contact2:

 

Source: MongoDB
Input: {"$addFields": {"shipTo.contact2": "$shipTo.contact"}}
Sink: BigQuery

This pipeline will rename the shipTo.contact field to shipTo.contact2 before the data is sent to BigQuery.

Hello ms4446, 
I've tried multiple syntax possibility, but with no result, it seems that the mongodb input query understands the only operator function ($or / $and ...) i get this error ('unknown operator: $match' / $project .....)  every time i try to use $match / $project / $addFields / getField ... ,maybe i'm not seeing something or i'm writing the query falsely, it would be really helpful to have a real example that was being tested
ps : I've tried all possible syntax possibilities I could imagine including the one you specified
i have this exemple mongo query : include _id & rename alfa_id to alfa_id2


db.getCollection('alfa').aggregate([{$project: {'_id': 1,'alfa_id2': '$alfa_id'}}])


the extended json format would be :
[ { "$project": { "_id": 1, "alfa_id2": "alfa_id2" } } ]

when i run in data fusion i get this error : 'unknown operator: $project' 

if i run without $project : [ { "_id": 1, "alfa_id2": "$alfa_id" } ] 
i get no error but i have nothing in output 0 rows.

Thank you for your time

Best Regards

Based on your previous responses and the error messages you have provided, I believe the issue is that you are trying to use MongoDB aggregation pipeline operators in the Cloud Data Fusion MongoDB input query. The MongoDB input query only supports the following operators:

  • $match
  • $limit
  • $sample

If you need to use other aggregation pipeline operators, such as $project or $addFields, you can use the MongoDB source plugin to input queries that include these operators.

Example

The following example shows a Cloud Data Fusion pipeline that uses the MongoDB source plugin to rename the alfa_id field to alfa_id2 and include the _id field:

Source: MongoDB
Input: {"$project": {"_id": 1, "alfa_id2": "$alfa_id"}}
Sink: BigQuery

This pipeline will rename the alfa_id field to alfa_id2 and include the _id field before the data is sent to BigQuery.

the error message to : 

{"$project": {"_id": 1, "alfa_id2": "$alfa_id"}}
Exception reading next key/val from mongo: Query failed with error code 2 and error message 'unknown top level operator: $project.

Her is an example of the syntaxes i used :

{"$project": {"_id": 1, "alfa_id2": "$alfa_id"}} // error : unknown operator: $project
[{"$project": {"_id": 1, "alfa_id2": "$alfa_id"}}] error : unknown operator: $project
[ { "_id": 1, "alfa_id2": "$alfa_id" } ] no error but i have nothing in output 0 rows.
{ "_id": 1, "alfa_id2": "$alfa_id" } no error but i have nothing in output 0 rows.
db.getCollection('alfa').aggregate([{$project: {'_id': 1,'alfa_id2': '$alfa_id'}}]) error can't use db.getCollection('alfa') ...

 i still can't figure out how i may do project aggregation in mongodb Input query ....

Best Regards

The error messages you're receiving indicate that the MongoDB input query in Cloud Data Fusion doesn't support the $project operator directly. This is consistent with the information you've provided earlier.

To handle this:

  1. Use MongoDB Source Plugin: In Cloud Data Fusion, use the MongoDB source plugin to input queries. While direct input might have limitations, using the plugin could offer more flexibility with aggregation pipeline operators.

  2. Example Pipeline:

    • Source: MongoDB
    • Input Query:
      [ {"$project": {"_id": 1, "alfa_id2": "$alfa_id"}} ]
    • Sink: BigQuery

This pipeline aims to rename the alfa_id field to alfa_id2 and include the _id field before sending the data to BigQuery.

oh my god, this is absurd.
i can't be more clear then that.
thank you for your replay, but it's unfortunately useless I already tried :

{"$project": {"_id": 1, "alfa_id2": "$alfa_id"}} // error : unknown operator: $project
[{"$project": {"_id": 1, "alfa_id2": "$alfa_id"}}] error : unknown operator: $project
[ { "_id": 1, "alfa_id2": "$alfa_id" } ] no error but i have nothing in output 0 rows.
{ "_id": 1, "alfa_id2": "$alfa_id" } no error but i have nothing in output 0 rows.
db.getCollection('alfa').aggregate([{$project: {'_id': 1,'alfa_id2': '$alfa_id'}}]) error can't use db.getCollection('alfa') ...

as i specified it in my previous replay.

Anyone who  has an alternative way to use aggregation in mongodb source query pls ? 

Thank you !