-
Notifications
You must be signed in to change notification settings - Fork 0
/
MongoChangeStreamTest.cs
65 lines (55 loc) · 2.54 KB
/
MongoChangeStreamTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
using MongoDB.Bson;
using MongoDB.Driver;
using Moq;
using NUnit.Framework;
using System.Collections.Generic;
using System.Linq;
namespace ChangeStreamTests
{
public class ChangeStreamTest
{
private Mock<IMongoCollection<BsonDocument>> _mockCollection;
private Mock<IAsyncCursor<ChangeStreamDocument<BsonDocument>>> _mockCursor;
[SetUp]
public void SetUp()
{
_mockCollection = new Mock<IMongoCollection<BsonDocument>>();
_mockCursor = new Mock<IAsyncCursor<ChangeStreamDocument<BsonDocument>>>();
}
[Test]
public void ShouldDetectInsertOperationFromChangeStream()
{
// Arrange
var insertDocument = new ChangeStreamDocument<BsonDocument>(
new BsonDocument("key", "value"), // FullDocument
new BsonDocument("id", new ObjectId()), // ResumeToken
ChangeStreamOperationType.Insert, // OperationType
new BsonDocument("_id", new ObjectId()), // DocumentKey
"test.ns", // Namespace
new BsonDocument("updateDescription", "none"), // UpdateDescription
null); // ClusterTime
var mockResults = new List<ChangeStreamDocument<BsonDocument>> { insertDocument };
// Configure the mock cursor
_mockCursor.SetupSequence(c => c.MoveNext(It.IsAny<System.Threading.CancellationToken>()))
.Returns(true) // First call returns data
.Returns(false); // Subsequent call ends iteration
_mockCursor.Setup(c => c.Current).Returns(mockResults);
// Configure the collection's Watch method
_mockCollection.Setup(c => c.Watch(
It.IsAny<PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>>>(),
It.IsAny<ChangeStreamOptions>(),
It.IsAny<System.Threading.CancellationToken>()
)).Returns(_mockCursor.Object);
// Act
var changeStream = _mockCollection.Object.Watch();
var detectedOperations = new List<ChangeStreamOperationType>();
while (changeStream.MoveNext())
{
detectedOperations.AddRange(changeStream.Current.Select(doc => doc.OperationType));
}
// Assert
Assert.That(detectedOperations.Count, Is.EqualTo(1));
Assert.That(detectedOperations.First(), Is.EqualTo(ChangeStreamOperationType.Insert));
}
}
}