diff --git a/README.md b/README.md
index 22fcefb..fb465d1 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ Both journal and snapshot store share the same configuration keys (however they
```hocon
akka.persistence {
journal {
+ plugin = "akka.persistence.journal.mongodb"
mongodb {
# qualified type name of the MongoDb persistence journal actor
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
@@ -58,6 +59,7 @@ akka.persistence {
}
snapshot-store {
+ plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
# qualified type name of the MongoDB persistence snapshot actor
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 90f6073..9b00b6f 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,4 @@
-#### 1.4.14 January 13 2021 ####
+#### 1.4.17 March 17 2021 ####
-* Bump [Akka.NET version to 1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14), which adds the `Timestamp` to the `EventEnvelope` data structure - so it can be used for sorting / ordering after the fact in an Akka.Persistence.Query
+* Bump [Akka.NET version to 1.4.17](https://github.com/akkadotnet/akka.net/releases/tag/1.4.17)
+* [Resolve MongoDb write atomicity issues on write by not updating metadata collection](https://github.com/akkadotnet/Akka.Persistence.MongoDB/pull/186) - this is an important change that makes all writes atomic for an individual persistentId in MongoDb. We don't update the meta-data collection on write anymore - it's only done when the most recent items in the journal are deleted, and thus we store the highest recorded sequence number in the meta-data collection during deletes. All of the Akka.Persistence.Sql plugins operate this way as well.
diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
index be992c1..6f5af45 100644
--- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
+++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
@@ -13,7 +13,6 @@
-
diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
index 956d413..bd4378a 100644
--- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
+++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
@@ -11,6 +11,6 @@
-
+
\ No newline at end of file
diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
index 84a7fd2..2eed6ec 100644
--- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
+++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
@@ -210,18 +210,15 @@ public override async Task ReadHighestSequenceNrAsync(string persistenceId
var builder = Builders.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
- //Following the SqlJournal implementation
- //I have tried MongoDb lookup query and that caused some deadlocks in some tests!
- var metadataHighestSequenceNr = await _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr).FirstOrDefaultAsync();
+ var metadataHighestSequenceNrTask = _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr).FirstOrDefaultAsync();
- //var journalHighestSequenceNr = await _journalCollection.Value.Find(Builders.Filter.Eq(x => x.PersistenceId, persistenceId)).Project(x => x.SequenceNr).FirstOrDefaultAsync();
+ var journalHighestSequenceNrTask = _journalCollection.Value.Find(Builders.Filter.Eq(x => x.PersistenceId, persistenceId)).Project(x => x.SequenceNr).FirstOrDefaultAsync();
- //if (metadataHighestSequenceNr > journalHighestSequenceNr)
- //return metadataHighestSequenceNr;
+ // journal data is usually good enough, except in cases when it's been deleted.
+ await Task.WhenAll(metadataHighestSequenceNrTask, journalHighestSequenceNrTask);
- //return journalHighestSequenceNr;
- return metadataHighestSequenceNr;
+ return Math.Max(journalHighestSequenceNrTask.Result, metadataHighestSequenceNrTask.Result);
}
protected override async Task> WriteMessagesAsync(IEnumerable messages)
@@ -252,8 +249,6 @@ protected override async Task> WriteMessagesAsync(IEnu
persistentIds.Add(message.PersistenceId);
});
- await SetHighSequenceId(messageList);
-
var result = await Task>
.Factory
.ContinueWhenAll(writeTasks.ToArray(),
@@ -289,15 +284,25 @@ private void NotifyNewEventAppended()
}
}
}
- protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
+ protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
{
var builder = Builders.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
+ // read highest sequence number before we start
+ var highestSeqNo = await ReadHighestSequenceNrAsync(persistenceId, 0L);
+
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);
- return _journalCollection.Value.DeleteManyAsync(filter);
+ // only update the sequence number of the top of the journal
+ // is about to be deleted.
+ if (highestSeqNo <= toSequenceNr)
+ {
+ await SetHighSequenceId(persistenceId, highestSeqNo);
+ }
+
+ await _journalCollection.Value.DeleteManyAsync(filter);
}
private JournalEntry ToJournalEntry(IPersistentRepresentation message)
@@ -431,10 +436,8 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
}
- private async Task SetHighSequenceId(IList messages)
+ private async Task SetHighSequenceId(string persistenceId, long maxSeqNo)
{
- var persistenceId = messages.Select(c => c.PersistenceId).First();
- var highSequenceId = messages.Max(c => c.HighestSequenceNr);
var builder = Builders.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
@@ -442,7 +445,7 @@ private async Task SetHighSequenceId(IList messages)
{
Id = persistenceId,
PersistenceId = persistenceId,
- SequenceNr = highSequenceId
+ SequenceNr = maxSeqNo
};
await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new ReplaceOptions() { IsUpsert = true });
@@ -489,9 +492,9 @@ private void AddNewEventsSubscriber(IActorRef subscriber)
_newEventsSubscriber.Add(subscriber);
}
protected virtual async Task<(IEnumerable Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
- {
+ {
+ var ids = await GetAllPersistenceIds(offset);
var lastOrdering = await GetHighestOrdering();
- var ids = await GetAllPersistenceIds();
return (ids, lastOrdering);
}
@@ -556,17 +559,24 @@ private void AddTagSubscriber(IActorRef subscriber, string tag)
}
}
- private async Task> GetAllPersistenceIds()
+ private async Task> GetAllPersistenceIds(long offset)
{
- var ids = await _metadataCollection.Value.Find(_=> true).ToListAsync();
- return ids.Distinct().Select(x => x.PersistenceId);
+ var ids = await _journalCollection.Value
+ .DistinctAsync(x => x.PersistenceId, entry => entry.Ordering > new BsonTimestamp(offset));
+
+ var hashset = new List();
+ while (await ids.MoveNextAsync())
+ {
+ hashset.AddRange(ids.Current);
+ }
+ return hashset;
}
private async Task GetHighestOrdering()
{
var max = await _journalCollection.Value.AsQueryable()
.Select(je => je.Ordering)
- .Distinct().MaxAsync();
+ .MaxAsync();
return max.Value;
}
diff --git a/src/common.props b/src/common.props
index e2844a7..c41b98b 100644
--- a/src/common.props
+++ b/src/common.props
@@ -18,9 +18,8 @@
2.4.1
- 16.8.3
- 1.4.14
- 4.14.0
+ 16.9.1
+ 1.4.17