NeuroAgent

Clone MongoDB Documents Between Containers with ID Transformation

Learn how to clone MongoDB documents between containers while transforming ID formats using aggregation framework. Complete C# implementation with performance optimization.

How can I clone MongoDB documents from one container to another while updating ID fields in a specific format without retrieving them into application memory? I’m using MongoDB with C# and need to transform documents with ID format ‘OwnerId\ContainerId:MyDocId’ when cloning from one ContainerId to another. The documents are uniquely identified by a combination of OwnerId, MyDocId, and ContainerId fields. I want to use MongoDB’s aggregation framework to achieve this, similar to SQL string concatenation operations.

NeuroAgent

The MongoDB aggregation framework can clone and transform documents between containers entirely server-side, without retrieving data into application memory: pipeline stages such as $match, $project, and $set reformat the ID fields, and $out or $merge writes the results in bulk. You can produce the ‘OwnerId\ContainerId:MyDocId’ format by combining string concatenation operators in a $project stage and writing the transformed documents to the target container with $out or $merge, all within a single aggregation pipeline executed from C#.

Understanding MongoDB Aggregation Pipeline

The MongoDB aggregation framework provides a powerful way to process documents through a series of stages, each performing a specific transformation on the data [source]. An aggregation pipeline consists of one or more stages that process documents sequentially, where the output of one stage becomes the input to the next [source].

Each stage performs an operation on the input documents - stages can filter documents, group documents, calculate values, transform fields, and more [source]. The pipeline then performs successive transformations on the data until our goal is achieved, allowing us to break down complex operations into easier, manageable stages [source].

For document cloning and transformation, key stages include:

  • $match: Filters documents based on specified criteria
  • $project: Reshapes documents by including, excluding, or renaming fields
  • $set: Adds new fields or updates existing fields in documents
  • $addFields: Similar to $set, adds new fields to documents
  • $out: Writes the results of the aggregation to a collection
  • $merge: Writes results to a collection, potentially combining with existing data

“The input of the pipeline can be a single collection, where others can be merged later down the pipeline. The pipeline then performs successive transformations on the data until our goal is achieved.” [source]
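
For example, the same stage chaining looks like this with the C# driver’s fluent API (a minimal sketch; collection is assumed to be an IMongoCollection<BsonDocument> and the field values are illustrative):

csharp
// Each fluent call appends one pipeline stage; the output of $match
// becomes the input of $project, exactly as in the shell syntax.
var results = collection.Aggregate()
    .Match(Builders<BsonDocument>.Filter.Eq("ContainerId", "sourceContainer"))
    .Project<BsonDocument>(new BsonDocument { { "OwnerId", 1 }, { "MyDocId", 1 } })
    .ToList();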

Document Cloning Strategy

To clone documents from one container to another while transforming ID fields, you need a strategy that:

  1. Filters source documents using the unique identifier combination (OwnerId, MyDocId, ContainerId)
  2. Transforms the ID format from the original to the target format
  3. Writes transformed documents to the target collection in bulk

The key insight is that MongoDB aggregation pipelines can perform all these operations in a single database operation, eliminating the need to retrieve documents into application memory [source].

javascript
// Basic aggregation pipeline structure for document cloning
db.sourceCollection.aggregate([
  {
    $match: {
      "OwnerId": "owner123",
      "ContainerId": "sourceContainer",
      "MyDocId": { $exists: true }
    }
  },
  {
    $project: {
      // Build the new ID; "targetContainer" here is a literal string that
      // the caller substitutes, not a field reference
      _id: 0,
      newId: { $concat: ["$OwnerId", "\\", "targetContainer", ":", "$MyDocId"] },
      // Preserve other fields
      "OwnerId": 1,
      "MyDocId": 1,
      "ContainerId": "targetContainer",
      // Copy all other fields explicitly
      "field1": 1,
      "field2": 1
    }
  },
  {
    $out: "targetCollection"
  }
])

ID Field Transformation Techniques

The core challenge is transforming the ID format from ‘OwnerId\SourceContainer:MyDocId’ to ‘OwnerId\TargetContainer:MyDocId’. MongoDB provides several string aggregation operators to achieve this:

Using $concat for String Concatenation

The $concat operator joins multiple strings together, similar to SQL string concatenation [source]:

javascript
{
  $project: {
    transformedId: {
      $concat: [
        "$OwnerId",
        "\\",
        "targetContainer", // This could be a variable
        ":",
        "$MyDocId"
      ]
    }
  }
}
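
One caveat: in aggregation expressions, a plain string is treated as a literal only if it does not begin with ‘$’. If a container name could ever start with ‘$’, wrap it in $literal. A defensive sketch in C# (targetContainer is a variable from your own code):

csharp
// Defensive: $literal guarantees the value is treated as a string literal,
// never as a field path, even if it happens to start with '$'
var containerLiteral = new BsonDocument("$literal", targetContainer);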

Using Variables for Container Transformation

For dynamic container names, you can use aggregation variables and the $let operator:

javascript
{
  $let: {
    vars: {
      targetContainer: "newContainerName"
    },
    in: {
      $concat: [
        "$$ROOT.OwnerId",
        "\\",
        "$$targetContainer",
        ":",
        "$$ROOT.MyDocId"
      ]
    }
  }
}
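
The same expression can be built from C# as a BsonDocument (a sketch; “newContainerName” is a placeholder for your target container):

csharp
// Builds the $let expression above, binding the target container name
// as a pipeline variable referenced via $$targetContainer
var idExpression = new BsonDocument("$let", new BsonDocument
{
    { "vars", new BsonDocument("targetContainer", "newContainerName") },
    { "in", new BsonDocument("$concat", new BsonArray
        {
            "$$ROOT.OwnerId", "\\", "$$targetContainer", ":", "$$ROOT.MyDocId"
        })
    }
});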

Preserving Document Structure

When cloning documents, you need to maintain the original structure while updating specific fields:

javascript
{
  $project: {
    // Transform the ID format
    _id: { $concat: ["$$ROOT.OwnerId", "\\$$ROOT.targetContainer", ":", "$$ROOT.MyDocId"] },
    // Update ContainerId to target
    ContainerId: "$$ROOT.targetContainer",
    // Copy all other fields from source document
    "OwnerId": 1,
    "MyDocId": 1,
    // Preserve nested objects and arrays
    "nested.field": 1,
    "arrayField": 1
  }
}

C# Implementation with MongoDB Driver

The MongoDB C# driver provides comprehensive support for aggregation operations. Here’s how to implement the document cloning transformation:

Setting Up the Aggregation Pipeline

csharp
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class DocumentCloner
{
    private readonly IMongoCollection<BsonDocument> _sourceCollection;
    private readonly IMongoCollection<BsonDocument> _targetCollection;
    
    public DocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
    {
        _sourceCollection = database.GetCollection<BsonDocument>(sourceCollectionName);
        _targetCollection = database.GetCollection<BsonDocument>(targetCollectionName);
    }
    
    public void CloneDocumentsWithIdTransformation(
        string ownerId, 
        string sourceContainer, 
        string targetContainer)
    {
        // Define the aggregation pipeline
        var pipeline = new List<BsonDocument>
        {
            // Match documents to clone
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            
            // Transform document structure and ID format
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray 
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    }) 
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer },
                // Include every additional field to copy ("otherField" is a placeholder)
                { "otherField", 1 }
            }),
            
            // Write to target collection
            new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
        };
        
        // Execute the pipeline; $out writes the results server-side and
        // the returned cursor contains no documents
        _sourceCollection.Aggregate<BsonDocument>(pipeline).ToList();
    }
}
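
Typical usage would look like this (a sketch; the connection string, database, and collection names are placeholders):

csharp
// Hypothetical setup; substitute your own connection details and names
var client = new MongoClient("mongodb://localhost:27017");
var database = client.GetDatabase("appDb");

var cloner = new DocumentCloner(database, "sourceDocs", "targetDocs");
cloner.CloneDocumentsWithIdTransformation("owner123", "sourceContainer", "targetContainer");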

Using Strongly Typed Classes

For better type safety and maintainability:

csharp
[BsonIgnoreExtraElements]
public class SourceDocument
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public string Id { get; set; }
    
    public string OwnerId { get; set; }
    public string ContainerId { get; set; }
    public string MyDocId { get; set; }
    public string OtherField { get; set; }
    // Other fields...
}

[BsonIgnoreExtraElements]
public class TargetDocument
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public string Id { get; set; }
    
    public string OwnerId { get; set; }
    public string ContainerId { get; set; }
    public string MyDocId { get; set; }
    public string OtherField { get; set; }
    // Other fields...
}

public class TypedDocumentCloner
{
    private readonly IMongoCollection<SourceDocument> _sourceCollection;
    private readonly IMongoCollection<TargetDocument> _targetCollection;
    
    public TypedDocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
    {
        _sourceCollection = database.GetCollection<SourceDocument>(sourceCollectionName);
        _targetCollection = database.GetCollection<TargetDocument>(targetCollectionName);
    }
    
    public async Task CloneDocumentsWithIdTransformationAsync(
        string ownerId, 
        string sourceContainer, 
        string targetContainer)
    {
        // Note: plain '+' concatenation translates reliably to $concat;
        // support for interpolated strings varies by driver/LINQ provider.
        var pipeline = new EmptyPipelineDefinition<SourceDocument>()
            .Match(doc => doc.OwnerId == ownerId && doc.ContainerId == sourceContainer && doc.MyDocId != null)
            .Project(doc => new TargetDocument
            {
                Id = doc.OwnerId + "\\" + targetContainer + ":" + doc.MyDocId,
                OwnerId = doc.OwnerId,
                ContainerId = targetContainer,
                MyDocId = doc.MyDocId,
                OtherField = doc.OtherField
                // Map other fields as needed
            });
        
        // Unlike the $out pipeline above, this variant round-trips the
        // documents through the client before inserting them
        await _targetCollection.InsertManyAsync(
            await _sourceCollection.Aggregate(pipeline).ToListAsync());
    }
}

Bulk Operations for Performance

When dealing with large volumes of documents, performance optimization becomes crucial. MongoDB provides several strategies for efficient bulk operations:

Using $out Stage

The $out stage writes the results of the aggregation to a collection. This is highly efficient because the work happens entirely server-side, but note that $out replaces the target collection’s existing contents [source]:

csharp
var pipeline = new List<BsonDocument>
{
    // Match and transform stages...
    new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
};

_sourceCollection.Aggregate<BsonDocument>(pipeline);
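
Recent driver versions also expose AggregateToCollectionAsync, which makes the intent explicit for pipelines ending in $out or $merge and skips creating a result cursor (a sketch, assuming a driver version that includes this API, roughly 2.11+):

csharp
// Runs a $out/$merge pipeline server-side without returning a cursor
// (assumes a driver version that provides AggregateToCollectionAsync)
await _sourceCollection.AggregateToCollectionAsync<BsonDocument>(pipeline);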

Using $merge Stage for Partial Updates

If you need to merge with existing data instead of replacing:

csharp
var pipeline = new List<BsonDocument>
{
    // Match and transform stages...
    new BsonDocument("$merge", new BsonDocument
    {
        { "into", _targetCollection.CollectionNamespace.CollectionName },
        { "on", "_id" },
        { "whenMatched", "replace" },
        { "whenNotMatched", "insert" }
    })
};

_sourceCollection.Aggregate<BsonDocument>(pipeline);

Batch Processing for Large Datasets

For extremely large datasets, consider processing in batches:

csharp
public async Task CloneDocumentsInBatchesAsync(
    string ownerId, 
    string sourceContainer, 
    string targetContainer,
    int batchSize = 1000)
{
    // Build the filter as a BsonDocument so it can be embedded directly
    // in a $match stage (a FilterDefinition would need rendering first)
    var filter = new BsonDocument
    {
        { "OwnerId", ownerId },
        { "ContainerId", sourceContainer },
        { "MyDocId", new BsonDocument("$exists", true) }
    };
    
    var totalDocuments = await _sourceCollection.CountDocumentsAsync(filter);
    var batches = (int)Math.Ceiling(totalDocuments / (double)batchSize);
    
    for (int i = 0; i < batches; i++)
    {
        var skip = i * batchSize;
        var pipeline = new List<BsonDocument>
        {
            new BsonDocument("$match", filter),
            // A stable sort makes $skip/$limit paging deterministic
            new BsonDocument("$sort", new BsonDocument("_id", 1)),
            new BsonDocument("$skip", skip),
            new BsonDocument("$limit", batchSize),
            // GetProjection is assumed to build the $project stage shown earlier
            new BsonDocument("$project", GetProjection(targetContainer)),
            // $merge appends/updates incrementally; $out would replace the
            // entire target collection on every batch
            new BsonDocument("$merge", new BsonDocument
            {
                { "into", _targetCollection.CollectionNamespace.CollectionName },
                { "on", "_id" },
                { "whenMatched", "replace" },
                { "whenNotMatched", "insert" }
            })
        };
        
        await _sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
    }
}

Error Handling and Validation

Implementing proper error handling is crucial for production environments:

csharp
public async Task<bool> CloneDocumentsWithValidationAsync(
    string ownerId, 
    string sourceContainer, 
    string targetContainer)
{
    try
    {
        // Validate input parameters
        if (string.IsNullOrWhiteSpace(ownerId) || 
            string.IsNullOrWhiteSpace(sourceContainer) || 
            string.IsNullOrWhiteSpace(targetContainer))
        {
            throw new ArgumentException("Owner ID, source container, and target container must be provided.");
        }
        
        // Check whether the target collection exists ($out would create it
        // anyway, but an explicit check allows configuring it up front)
        var targetName = _targetCollection.CollectionNamespace.CollectionName;
        var existingNames = await _targetCollection.Database
            .ListCollectionNames()
            .ToListAsync();
        
        if (!existingNames.Contains(targetName))
        {
            await _targetCollection.Database.CreateCollectionAsync(targetName);
        }
        
        var pipeline = new List<BsonDocument>
        {
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray 
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    }) 
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer }
            }),
            
            new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
        };
        
        // A pipeline that ends in $out returns no documents, so success is
        // verified by counting what landed in the target collection
        await _sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
        
        var clonedCount = await _targetCollection.CountDocumentsAsync(new BsonDocument
        {
            { "OwnerId", ownerId },
            { "ContainerId", targetContainer }
        });
        
        return clonedCount > 0;
    }
    catch (MongoException ex)
    {
        // Log MongoDB-specific errors
        Console.WriteLine($"MongoDB error: {ex.Message}");
        throw;
    }
    catch (Exception ex)
    {
        // Log general errors
        Console.WriteLine($"General error: {ex.Message}");
        throw;
    }
}

Complete Example Solution

Here’s a complete, production-ready solution that demonstrates all the concepts:

csharp
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class DocumentMigrator
{
    private readonly IMongoDatabase _database;
    private readonly string _sourceCollectionName;
    private readonly string _targetCollectionName;
    
    public DocumentMigrator(IMongoClient client, string databaseName, 
                          string sourceCollectionName, string targetCollectionName)
    {
        _database = client.GetDatabase(databaseName);
        _sourceCollectionName = sourceCollectionName;
        _targetCollectionName = targetCollectionName;
    }
    
    /// <summary>
    /// Clones documents from source to target container with ID transformation
    /// </summary>
    public async Task CloneDocumentsAsync(
        string ownerId, 
        string sourceContainer, 
        string targetContainer,
        bool replaceExisting = false)
    {
        var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
        
        // Build aggregation pipeline
        var pipeline = new List<BsonDocument>
        {
            // Filter documents to clone
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            
            // Transform the document in place: $replaceRoot with $mergeObjects
            // keeps every existing field and overwrites only _id and
            // ContainerId, so fields never have to be enumerated by hand
            new BsonDocument("$replaceRoot", new BsonDocument
            {
                { "newRoot", new BsonDocument("$mergeObjects", new BsonArray
                    {
                        "$$ROOT",
                        new BsonDocument
                        {
                            { "_id", new BsonDocument("$concat", new BsonArray
                                {
                                    "$OwnerId",
                                    "\\",
                                    targetContainer,
                                    ":",
                                    "$MyDocId"
                                })
                            },
                            { "ContainerId", targetContainer }
                        }
                    })
                }
            })
        };
        
        // Choose output stage based on whether to replace existing data
        if (replaceExisting)
        {
            pipeline.Add(new BsonDocument("$out", _targetCollectionName));
        }
        else
        {
            pipeline.Add(new BsonDocument("$merge", new BsonDocument
            {
                { "into", _targetCollectionName },
                { "on", "_id" },
                { "whenMatched", "replace" },
                { "whenNotMatched", "insert" }
            }));
        }
        
        // Execute pipeline
        await sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
    }
    
    /// <summary>
    /// Validates that transformation was successful
    /// </summary>
    public async Task<bool> ValidateTransformationAsync(
        string ownerId, 
        string sourceContainer, 
        string targetContainer,
        int sampleSize = 10)
    {
        var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
        var targetCollection = _database.GetCollection<BsonDocument>(_targetCollectionName);
        
        // Get sample documents from source
        var sourceDocs = await sourceCollection
            .Find(new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer }
            })
            .Limit(sampleSize)
            .ToListAsync();
        
        foreach (var sourceDoc in sourceDocs)
        {
            var expectedId = $"{sourceDoc["OwnerId"]}\\{targetContainer}:{sourceDoc["MyDocId"]}";
            var targetDoc = await targetCollection
                .Find(new BsonDocument { { "_id", expectedId } })
                .FirstOrDefaultAsync();
            
            if (targetDoc == null)
            {
                return false;
            }
            
            // Verify other fields match
            foreach (var element in sourceDoc.Elements)
            {
                if (element.Name != "_id" && element.Name != "ContainerId")
                {
                    if (!targetDoc.Contains(element.Name) || 
                        element.Value != targetDoc[element.Name])
                    {
                        return false;
                    }
                }
            }
        }
        
        return true;
    }
}
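
A typical invocation of the migrator (a sketch; the connection string, database, and collection names are placeholders):

csharp
// Hypothetical usage; substitute real connection details and names
var client = new MongoClient("mongodb://localhost:27017");
var migrator = new DocumentMigrator(client, "appDb", "sourceDocs", "targetDocs");

await migrator.CloneDocumentsAsync("owner123", "sourceContainer", "targetContainer");

var ok = await migrator.ValidateTransformationAsync(
    "owner123", "sourceContainer", "targetContainer");
Console.WriteLine(ok ? "Migration validated." : "Validation failed.");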

Sources

  1. MongoDB Aggregation Pipeline - Database Manual
  2. MongoDB Aggregation: tutorial with examples and exercises | Studio 3T
  3. Updates with Aggregation Pipeline - Database Manual
  4. MongoDB Aggregation Pipeline | MongoDB
  5. Aggregation Operations - Database Manual
  6. How To Use Aggregations in MongoDB | DigitalOcean
  7. MongoDB Aggregation Framework: How to simplify complex logic into stages

Conclusion

  • MongoDB aggregation framework enables efficient document cloning without retrieving data into application memory by processing transformations server-side through pipeline stages
  • ID field transformation is achieved using string concatenation operators like $concat in the $project stage, allowing you to reformat ‘OwnerId\ContainerId:MyDocId’ patterns dynamically
  • C# implementation leverages the MongoDB driver’s aggregation support to build pipelines programmatically, providing type safety and performance for bulk operations
  • Bulk operations are optimized using $out or $merge stages to write transformed documents directly to target collections, eliminating the need for individual insert operations
  • Production-ready solutions should include proper error handling, validation, and batch processing for large datasets to ensure reliable document migration between containers