How can I clone MongoDB documents from one container to another while updating ID fields in a specific format without retrieving them into application memory? I’m using MongoDB with C# and need to transform documents with ID format ‘OwnerId\ContainerId:MyDocId’ when cloning from one ContainerId to another. The documents are uniquely identified by a combination of OwnerId, MyDocId, and ContainerId fields. I want to use MongoDB’s aggregation framework to achieve this, similar to SQL string concatenation operations.
The MongoDB aggregation framework enables efficient document cloning and transformation between containers without retrieving data into application memory: pipeline stages like $match, $project, and $set reformat ID fields server-side, and the output stages handle bulk writes for performance. You can achieve the ‘OwnerId\ContainerId:MyDocId’ format transformation by combining string concatenation operators in the $project stage and using the $out or $merge stage to write transformed documents to the target container, all within a single aggregation pipeline executed from C#.
Contents
- Understanding MongoDB Aggregation Pipeline
- Document Cloning Strategy
- ID Field Transformation Techniques
- C# Implementation with MongoDB Driver
- Bulk Operations for Performance
- Error Handling and Validation
- Complete Example Solution
Understanding MongoDB Aggregation Pipeline
The MongoDB aggregation framework provides a powerful way to process documents through a series of stages, each performing a specific transformation on the data [source]. An aggregation pipeline consists of one or more stages that process documents sequentially, where the output of one stage becomes the input to the next [source].
Each stage performs an operation on the input documents - stages can filter documents, group documents, calculate values, transform fields, and more [source]. The pipeline then performs successive transformations on the data until our goal is achieved, allowing us to break down complex operations into easier, manageable stages [source].
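As a minimal illustration of that stage chaining (the field and collection names here are made up for the example), a pipeline is just an ordered array of stage documents; each stage consumes the previous stage's output:

```javascript
// A pipeline is plain data: an ordered array of stage documents.
// Drivers send this array verbatim; the server evaluates stages in sequence.
const pipeline = [
  { $match: { status: "active" } },                        // 1. filter input docs
  { $set: { label: { $concat: ["$type", "-", "$region"] } } }, // 2. derive a new field
  { $out: "labeledDocs" }                                  // 3. write results server-side
];
console.log(pipeline.map(stage => Object.keys(stage)[0]).join(" -> "));
// "$match -> $set -> $out"
```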
For document cloning and transformation, key stages include:
- $match: Filters documents based on specified criteria
- $project: Reshapes documents by including, excluding, or renaming fields
- $set: Adds new fields or updates existing fields in documents
- $addFields: Similar to $set, adds new fields to documents
- $out: Writes the results of the aggregation to a collection
- $merge: Writes results to a collection, potentially combining with existing data
“The input of the pipeline can be a single collection, where others can be merged later down the pipeline. The pipeline then performs successive transformations on the data until our goal is achieved.” [source]
Document Cloning Strategy
To clone documents from one container to another while transforming ID fields, you need a strategy that:
- Filters source documents using the unique identifier combination (OwnerId, MyDocId, ContainerId)
- Transforms the ID format from the original to the target format
- Writes transformed documents to the target collection in bulk
The key insight is that MongoDB aggregation pipelines can perform all these operations in a single database operation, eliminating the need to retrieve documents into application memory [source].
// Basic aggregation pipeline structure for document cloning
db.sourceCollection.aggregate([
  {
    $match: {
      OwnerId: "owner123",
      ContainerId: "sourceContainer",
      MyDocId: { $exists: true }
    }
  },
  {
    $project: {
      // Build the new _id; inside $concat, strings that do not start
      // with "$" are literals, so "targetContainer" is the literal name
      _id: { $concat: ["$OwnerId", "\\", "targetContainer", ":", "$MyDocId"] },
      // Preserve other fields ($project keeps only the fields listed)
      OwnerId: 1,
      MyDocId: 1,
      // A string value not starting with "$" is treated as a literal
      ContainerId: "targetContainer",
      // Name every other field to copy
      field1: 1,
      field2: 1
    }
  },
  {
    $out: "targetCollection"
  }
])
ID Field Transformation Techniques
The core challenge is transforming the ID format from ‘OwnerId\SourceContainer:MyDocId’ to ‘OwnerId\TargetContainer:MyDocId’. MongoDB provides several string aggregation operators to achieve this:
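For reference, the same transformation expressed as a plain client-side function (a sketch; it assumes IDs always follow the 'OwnerId\ContainerId:MyDocId' shape) makes the target format easy to unit test, even though the aggregation stages below perform it server-side:

```javascript
// Rebuild an ID of the form "OwnerId\ContainerId:MyDocId" with a new container.
// Assumes the first "\" separates owner from container and the first ":"
// after it separates container from document id.
function rebuildId(id, targetContainer) {
  const slash = id.indexOf("\\");
  const colon = id.indexOf(":", slash + 1);
  if (slash < 0 || colon < 0) throw new Error(`unexpected id format: ${id}`);
  const ownerId = id.slice(0, slash);
  const myDocId = id.slice(colon + 1);
  return `${ownerId}\\${targetContainer}:${myDocId}`;
}

console.log(rebuildId("owner123\\sourceContainer:doc42", "archive"));
// "owner123\archive:doc42"
```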
Using $concat for String Concatenation
The $concat operator joins multiple strings together, similar to SQL string concatenation [source]:
{
  $project: {
    transformedId: {
      $concat: [
        "$OwnerId",
        "\\",
        "targetContainer", // literal; substitute your target container name
        ":",
        "$MyDocId"
      ]
    }
  }
}
Using Variables for Container Transformation
For dynamic container names, you can use aggregation variables and the $let operator:
{
  $let: {
    vars: {
      targetContainer: "newContainerName"
    },
    in: {
      $concat: [
        "$OwnerId",
        "\\",
        "$$targetContainer",
        ":",
        "$MyDocId"
      ]
    }
  }
}
Preserving Document Structure
When cloning documents, you need to maintain the original structure while updating specific fields:
{
  $project: {
    // Transform the ID format ("targetContainer" is a literal here)
    _id: { $concat: ["$OwnerId", "\\", "targetContainer", ":", "$MyDocId"] },
    // Update ContainerId to the target (literal string value)
    ContainerId: "targetContainer",
    // Copy the remaining fields from the source document
    OwnerId: 1,
    MyDocId: 1,
    // Preserve nested objects and arrays
    "nested.field": 1,
    arrayField: 1
  }
}
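Enumerating every field in $project is error-prone for documents with many or unpredictable fields. An alternative (a sketch; the collection and field names follow the examples above) is a $set stage, which keeps every existing field and overwrites only the ones listed. Expressed as plain data, exactly as a driver would send it:

```javascript
// Build the clone pipeline using $set, which preserves all fields not
// explicitly overwritten (unlike $project, which drops unlisted fields).
function buildClonePipeline(ownerId, sourceContainer, targetContainer) {
  return [
    { $match: { OwnerId: ownerId, ContainerId: sourceContainer, MyDocId: { $exists: true } } },
    {
      $set: {
        // Overwrite _id with the target-container format
        _id: { $concat: ["$OwnerId", "\\", targetContainer, ":", "$MyDocId"] },
        ContainerId: targetContainer
      }
    },
    { $merge: { into: "targetCollection", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
  ];
}

const clonePipeline = buildClonePipeline("owner123", "sourceContainer", "archive");
console.log(clonePipeline[1].$set.ContainerId); // "archive"
```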
C# Implementation with MongoDB Driver
The MongoDB C# driver provides comprehensive support for aggregation operations. Here’s how to implement the document cloning transformation:
Setting Up the Aggregation Pipeline
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
public class DocumentCloner
{
    private readonly IMongoCollection<BsonDocument> _sourceCollection;
    private readonly IMongoCollection<BsonDocument> _targetCollection;

    public DocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
    {
        _sourceCollection = database.GetCollection<BsonDocument>(sourceCollectionName);
        _targetCollection = database.GetCollection<BsonDocument>(targetCollectionName);
    }

    public void CloneDocumentsWithIdTransformation(
        string ownerId,
        string sourceContainer,
        string targetContainer)
    {
        // Define the aggregation pipeline
        var pipeline = new BsonDocument[]
        {
            // Match documents to clone
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            // Transform document structure and ID format
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    })
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer },
                // $project keeps only listed fields; name every field to copy
                { "otherField", 1 }
            }),
            // Write to the target collection
            new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
        };

        // Aggregate is lazy: enumerate the cursor to force server-side execution
        _sourceCollection.Aggregate<BsonDocument>(pipeline).ToList();
    }
}
Using Strongly Typed Classes
For better type safety and maintainability:
[BsonIgnoreExtraElements]
public class SourceDocument
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public string Id { get; set; }
    public string OwnerId { get; set; }
    public string ContainerId { get; set; }
    public string MyDocId { get; set; }
    public string OtherField { get; set; }
    // Other fields...
}

[BsonIgnoreExtraElements]
public class TargetDocument
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public string Id { get; set; }
    public string OwnerId { get; set; }
    public string ContainerId { get; set; }
    public string MyDocId { get; set; }
    public string OtherField { get; set; }
    // Other fields...
}

public class TypedDocumentCloner
{
    private readonly IMongoCollection<SourceDocument> _sourceCollection;
    private readonly IMongoCollection<TargetDocument> _targetCollection;

    public TypedDocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
    {
        _sourceCollection = database.GetCollection<SourceDocument>(sourceCollectionName);
        _targetCollection = database.GetCollection<TargetDocument>(targetCollectionName);
    }

    public async Task CloneDocumentsWithIdTransformationAsync(
        string ownerId,
        string sourceContainer,
        string targetContainer)
    {
        var pipeline = new EmptyPipelineDefinition<SourceDocument>()
            .Match(doc => doc.OwnerId == ownerId && doc.ContainerId == sourceContainer && doc.MyDocId != null)
            .Project(doc => new TargetDocument
            {
                // String concatenation is translated to $concat by the LINQ provider
                Id = doc.OwnerId + "\\" + targetContainer + ":" + doc.MyDocId,
                OwnerId = doc.OwnerId,
                ContainerId = targetContainer,
                MyDocId = doc.MyDocId,
                OtherField = doc.OtherField
                // Map other fields as needed
            });

        // Note: unlike $out/$merge, this variant materializes the results
        // client-side before re-inserting them into the target collection.
        await _targetCollection.InsertManyAsync(
            await _sourceCollection.Aggregate(pipeline).ToListAsync());
    }
}
Bulk Operations for Performance
When dealing with large volumes of documents, performance optimization becomes crucial. MongoDB provides several strategies for efficient bulk operations:
Using $out Stage
The $out stage writes the results of the aggregation to a collection. This is highly efficient as it performs the operation server-side [source]:
var pipeline = new BsonDocument[]
{
    // Match and transform stages...
    new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
};
// Enumerate the cursor to force execution; a $out pipeline returns no documents
_sourceCollection.Aggregate<BsonDocument>(pipeline).ToList();
Using $merge Stage for Partial Updates
If you need to merge with existing data instead of replacing:
var pipeline = new BsonDocument[]
{
    // Match and transform stages...
    new BsonDocument("$merge", new BsonDocument
    {
        { "into", _targetCollection.CollectionNamespace.CollectionName },
        { "on", "_id" },
        { "whenMatched", "replace" },
        { "whenNotMatched", "insert" }
    })
};
// Enumerate the cursor to force execution; a $merge pipeline returns no documents
_sourceCollection.Aggregate<BsonDocument>(pipeline).ToList();
Batch Processing for Large Datasets
For extremely large datasets, consider processing in batches:
public async Task CloneDocumentsInBatchesAsync(
    string ownerId,
    string sourceContainer,
    string targetContainer,
    int batchSize = 1000)
{
    var match = new BsonDocument
    {
        { "OwnerId", ownerId },
        { "ContainerId", sourceContainer },
        { "MyDocId", new BsonDocument("$exists", true) }
    };

    var totalDocuments = await _sourceCollection.CountDocumentsAsync(match);
    var batches = (int)Math.Ceiling(totalDocuments / (double)batchSize);

    for (int i = 0; i < batches; i++)
    {
        var pipeline = new BsonDocument[]
        {
            new BsonDocument("$match", match),
            // A deterministic sort is required for $skip/$limit paging
            new BsonDocument("$sort", new BsonDocument("_id", 1)),
            new BsonDocument("$skip", i * batchSize),
            new BsonDocument("$limit", batchSize),
            // GetProjection builds the $project document shown earlier
            new BsonDocument("$project", GetProjection(targetContainer)),
            // $merge writes each batch straight into the target collection,
            // so no intermediate batch collections are needed
            new BsonDocument("$merge", new BsonDocument
            {
                { "into", _targetCollection.CollectionNamespace.CollectionName },
                { "on", "_id" },
                { "whenMatched", "replace" },
                { "whenNotMatched", "insert" }
            })
        };

        await _sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
    }
}
Error Handling and Validation
Implementing proper error handling is crucial for production environments:
public async Task<bool> CloneDocumentsWithValidationAsync(
    string ownerId,
    string sourceContainer,
    string targetContainer)
{
    try
    {
        // Validate input parameters
        if (string.IsNullOrWhiteSpace(ownerId) ||
            string.IsNullOrWhiteSpace(sourceContainer) ||
            string.IsNullOrWhiteSpace(targetContainer))
        {
            throw new ArgumentException("Owner ID, source container, and target container must be provided.");
        }

        // $out creates the target collection if it does not exist,
        // so this check is optional; it is shown for explicitness
        var targetName = _targetCollection.CollectionNamespace.CollectionName;
        var collectionNames = await (await _targetCollection.Database.ListCollectionNamesAsync()).ToListAsync();
        if (!collectionNames.Contains(targetName))
        {
            await _targetCollection.Database.CreateCollectionAsync(targetName);
        }

        var pipeline = new BsonDocument[]
        {
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    })
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer }
            }),
            new BsonDocument("$out", targetName)
        };

        // $out pipelines return no documents; enumerating the cursor forces execution
        await _sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();

        // Verify that documents actually landed in the target collection
        var copied = await _targetCollection.CountDocumentsAsync(new BsonDocument
        {
            { "OwnerId", ownerId },
            { "ContainerId", targetContainer }
        });
        return copied > 0;
    }
    catch (MongoException ex)
    {
        // Log MongoDB-specific errors
        Console.WriteLine($"MongoDB error: {ex.Message}");
        throw;
    }
    catch (Exception ex)
    {
        // Log general errors
        Console.WriteLine($"General error: {ex.Message}");
        throw;
    }
}
Complete Example Solution
Here’s a complete, production-ready solution that demonstrates all the concepts:
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
public class DocumentMigrator
{
    private readonly IMongoDatabase _database;
    private readonly string _sourceCollectionName;
    private readonly string _targetCollectionName;

    public DocumentMigrator(IMongoClient client, string databaseName,
        string sourceCollectionName, string targetCollectionName)
    {
        _database = client.GetDatabase(databaseName);
        _sourceCollectionName = sourceCollectionName;
        _targetCollectionName = targetCollectionName;
    }

    /// <summary>
    /// Clones documents from source to target container with ID transformation
    /// </summary>
    public async Task CloneDocumentsAsync(
        string ownerId,
        string sourceContainer,
        string targetContainer,
        bool replaceExisting = false)
    {
        var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);

        // Build aggregation pipeline
        var pipeline = new List<BsonDocument>
        {
            // Filter documents to clone
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            // $set overwrites only _id and ContainerId; every other field
            // in the source document is carried through unchanged
            new BsonDocument("$set", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    })
                },
                { "ContainerId", targetContainer }
            })
        };

        // Choose output stage based on whether to replace existing data
        if (replaceExisting)
        {
            pipeline.Add(new BsonDocument("$out", _targetCollectionName));
        }
        else
        {
            pipeline.Add(new BsonDocument("$merge", new BsonDocument
            {
                { "into", _targetCollectionName },
                { "on", "_id" },
                { "whenMatched", "replace" },
                { "whenNotMatched", "insert" }
            }));
        }

        // Execute pipeline; $out/$merge return no documents, so enumerating
        // the cursor simply forces server-side execution
        await sourceCollection
            .Aggregate(PipelineDefinition<BsonDocument, BsonDocument>.Create(pipeline))
            .ToListAsync();
    }
    /// <summary>
    /// Validates that transformation was successful
    /// </summary>
    public async Task<bool> ValidateTransformationAsync(
        string ownerId,
        string sourceContainer,
        string targetContainer,
        int sampleSize = 10)
    {
        var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
        var targetCollection = _database.GetCollection<BsonDocument>(_targetCollectionName);

        // Get sample documents from source
        var sourceDocs = await sourceCollection
            .Find(new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer }
            })
            .Limit(sampleSize)
            .ToListAsync();

        foreach (var sourceDoc in sourceDocs)
        {
            var expectedId = $"{sourceDoc["OwnerId"]}\\{targetContainer}:{sourceDoc["MyDocId"]}";
            var targetDoc = await targetCollection
                .Find(new BsonDocument { { "_id", expectedId } })
                .FirstOrDefaultAsync();

            if (targetDoc == null)
            {
                return false;
            }

            // Verify other fields match (BsonValue.Equals compares by value)
            foreach (var element in sourceDoc.Elements)
            {
                if (element.Name != "_id" && element.Name != "ContainerId")
                {
                    if (!targetDoc.Contains(element.Name) ||
                        !element.Value.Equals(targetDoc[element.Name]))
                    {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
Sources
- MongoDB Aggregation Pipeline - Database Manual
- MongoDB Aggregation: tutorial with examples and exercises | Studio 3T
- Updates with Aggregation Pipeline - Database Manual
- MongoDB Aggregation Pipeline | MongoDB
- Aggregation Operations - Database Manual
- How To Use Aggregations in MongoDB | DigitalOcean
- MongoDB Aggregation Framework: How to simplify complex logic into stages
Conclusion
- MongoDB aggregation framework enables efficient document cloning without retrieving data into application memory by processing transformations server-side through pipeline stages
- ID field transformation is achieved using string concatenation operators like $concat in the $project stage, allowing you to reformat ‘OwnerId\ContainerId:MyDocId’ patterns dynamically
- C# implementation leverages the MongoDB driver’s aggregation support to build pipelines programmatically, providing type safety and performance for bulk operations
- Bulk operations are optimized using $out or $merge stages to write transformed documents directly to target collections, eliminating the need for individual insert operations
- Production-ready solutions should include proper error handling, validation, and batch processing for large datasets to ensure reliable document migration between containers