Skip to content

Commit

Permalink
Merge pull request #189 from akkadotnet/dev
Browse files Browse the repository at this point in the history
Akka.Persistence.MongoDb v1.4.17 Release
  • Loading branch information
Aaronontheweb authored Mar 18, 2021
2 parents 226c2aa + 53326fc commit 6e3f31c
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 29 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Both journal and snapshot store share the same configuration keys (however they
```hocon
akka.persistence {
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
# qualified type name of the MongoDb persistence journal actor
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
Expand Down Expand Up @@ -58,6 +59,7 @@ akka.persistence {
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
# qualified type name of the MongoDB persistence snapshot actor
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
Expand Down
5 changes: 3 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#### 1.4.14 January 13 2021 ####
#### 1.4.17 March 17 2021 ####

* Bump [Akka.NET version to 1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14), which adds the `Timestamp` to the `EventEnvelope` data structure - so it can be used for sorting / ordering after the fact in an Akka.Persistence.Query
* Bump [Akka.NET version to 1.4.17](https://github.com/akkadotnet/akka.net/releases/tag/1.4.17)
* [Resolve MongoDb write atomicity issues on write by not updating metadata collection](https://github.com/akkadotnet/Akka.Persistence.MongoDB/pull/186) - this is an important change that makes all writes atomic for an individual persistentId in MongoDb. We don't update the meta-data collection on write anymore - it's only done when the most recent items in the journal are deleted, and thus we store the highest recorded sequence number in the meta-data collection during deletes. All of the Akka.Persistence.Sql plugins operate this way as well.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
</PackageReference>
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
<PackageReference Include="Mongo2Go" Version="2.2.16" />
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
<ItemGroup>
<PackageReference Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageReference Include="akka.streams" Version="$(AkkaVersion)" />
<PackageReference Include="MongoDB.Driver" Version="2.11.5" />
<PackageReference Include="MongoDB.Driver" Version="2.12.0" />
</ItemGroup>
</Project>
54 changes: 32 additions & 22 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,15 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

//Following the SqlJournal implementation
//I have tried MongoDb lookup query and that caused some deadlocks in some tests!

var metadataHighestSequenceNr = await _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr).FirstOrDefaultAsync();
var metadataHighestSequenceNrTask = _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr).FirstOrDefaultAsync();

//var journalHighestSequenceNr = await _journalCollection.Value.Find(Builders<JournalEntry>.Filter.Eq(x => x.PersistenceId, persistenceId)).Project(x => x.SequenceNr).FirstOrDefaultAsync();
var journalHighestSequenceNrTask = _journalCollection.Value.Find(Builders<JournalEntry>.Filter.Eq(x => x.PersistenceId, persistenceId)).Project(x => x.SequenceNr).FirstOrDefaultAsync();

//if (metadataHighestSequenceNr > journalHighestSequenceNr)
//return metadataHighestSequenceNr;
// journal data is usually good enough, except in cases when it's been deleted.
await Task.WhenAll(metadataHighestSequenceNrTask, journalHighestSequenceNrTask);

//return journalHighestSequenceNr;
return metadataHighestSequenceNr;
return Math.Max(journalHighestSequenceNrTask.Result, metadataHighestSequenceNrTask.Result);
}

protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
Expand Down Expand Up @@ -252,8 +249,6 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
persistentIds.Add(message.PersistenceId);
});

await SetHighSequenceId(messageList);

var result = await Task<IImmutableList<Exception>>
.Factory
.ContinueWhenAll(writeTasks.ToArray(),
Expand Down Expand Up @@ -289,15 +284,25 @@ private void NotifyNewEventAppended()
}
}
}
protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
{
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

// read highest sequence number before we start
var highestSeqNo = await ReadHighestSequenceNrAsync(persistenceId, 0L);

if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);

return _journalCollection.Value.DeleteManyAsync(filter);
// only update the sequence number of the top of the journal
// is about to be deleted.
if (highestSeqNo <= toSequenceNr)
{
await SetHighSequenceId(persistenceId, highestSeqNo);
}

await _journalCollection.Value.DeleteManyAsync(filter);
}

private JournalEntry ToJournalEntry(IPersistentRepresentation message)
Expand Down Expand Up @@ -431,18 +436,16 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen

}

private async Task SetHighSequenceId(IList<AtomicWrite> messages)
private async Task SetHighSequenceId(string persistenceId, long maxSeqNo)
{
var persistenceId = messages.Select(c => c.PersistenceId).First();
var highSequenceId = messages.Max(c => c.HighestSequenceNr);
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

var metadataEntry = new MetadataEntry
{
Id = persistenceId,
PersistenceId = persistenceId,
SequenceNr = highSequenceId
SequenceNr = maxSeqNo
};

await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new ReplaceOptions() { IsUpsert = true });
Expand Down Expand Up @@ -489,9 +492,9 @@ private void AddNewEventsSubscriber(IActorRef subscriber)
_newEventsSubscriber.Add(subscriber);
}
protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
{
{
var ids = await GetAllPersistenceIds(offset);
var lastOrdering = await GetHighestOrdering();
var ids = await GetAllPersistenceIds();
return (ids, lastOrdering);
}

Expand Down Expand Up @@ -556,17 +559,24 @@ private void AddTagSubscriber(IActorRef subscriber, string tag)
}
}

private async Task<IEnumerable<string>> GetAllPersistenceIds()
private async Task<IEnumerable<string>> GetAllPersistenceIds(long offset)
{
var ids = await _metadataCollection.Value.Find(_=> true).ToListAsync();
return ids.Distinct().Select(x => x.PersistenceId);
var ids = await _journalCollection.Value
.DistinctAsync(x => x.PersistenceId, entry => entry.Ordering > new BsonTimestamp(offset));

var hashset = new List<string>();
while (await ids.MoveNextAsync())
{
hashset.AddRange(ids.Current);
}
return hashset;
}

private async Task<long> GetHighestOrdering()
{
var max = await _journalCollection.Value.AsQueryable()
.Select(je => je.Ordering)
.Distinct().MaxAsync();
.MaxAsync();

return max.Value;
}
Expand Down
5 changes: 2 additions & 3 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
</PropertyGroup>
<PropertyGroup>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>16.8.3</TestSdkVersion>
<AkkaVersion>1.4.14</AkkaVersion>
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
<TestSdkVersion>16.9.1</TestSdkVersion>
<AkkaVersion>1.4.17</AkkaVersion>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down

0 comments on commit 6e3f31c

Please sign in to comment.