diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 84e6440..90f6073 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,3 @@ -#### 1.4.12 November 26 2020 #### +#### 1.4.14 January 13 2021 #### -* Bump Akka version to 1.4.12 -* Corrected `CurrentPersistentIds` query and `AllPersistentIds` queries to be more memory efficient and query entity ID data directly from Mongo -* Introduced `AllEvents` and `CurrentEvents` query to read the entire MongoDb journal -* Deprecated previous `GetMaxSeqNo` behavior - we no longer query the max sequence number directly from the journal AND the metadata collection. We only get that data directly from the metadata collection itself, which should make this query an O(1) operation rather than O(n) +* Bump [Akka.NET version to 1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14), which adds the `Timestamp` to the `EventEnvelope` data structure - so it can be used for sorting / ordering after the fact in an Akka.Persistence.Query diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj index cf68982..be992c1 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj +++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj @@ -13,8 +13,8 @@ - - + + diff --git a/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs b/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs index a20624d..79552f8 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs @@ -79,7 +79,8 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag() public void Bug80_CurrentEventsByTag_should_Recover_until_end() { var actor = Sys.ActorOf(TagActor.Props("y")); - var msgCount = 1200; + //increased this to test for non-collision with the generated timestamps + var msgCount = 5000; actor.Tell(msgCount); ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20)); @@ -96,7 +97,8 @@ public void Bug80_CurrentEventsByTag_should_Recover_until_end() public void Bug80_AllEventsByTag_should_Recover_all_messages() { var actor = Sys.ActorOf(TagActor.Props("y")); - var msgCount = 1200; + //increased this to test for non-collision with the generated timestamps + var msgCount = 5000; actor.Tell(msgCount); ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20)); @@ -176,7 +178,7 @@ public object ToJournal(object evt) private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) { var specString = @" - akka.test.single-expect-default = 3s + akka.test.single-expect-default = 10s akka.persistence { publish-plugin-commands = on journal { diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs index 2931b4f..4646402 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs @@ -16,7 +16,7 @@ namespace Akka.Persistence.MongoDb.Tests { [Collection("MongoDbSpec")] - public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture + public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture { public static readonly AtomicCounter Counter = new AtomicCounter(0); private readonly ITestOutputHelper _output; diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj index 5542eab..956d413 100644 --- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj +++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj @@ -11,6 +11,6 @@ - + \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs index 0a6cf0b..1cb7b1b 100644 --- a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs +++ b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs @@ -35,7 +35,6 @@ public class JournalEntry [BsonElement("Manifest")] public string Manifest { get; set; } - [BsonElement("Ordering")] public BsonTimestamp Ordering { get; set; } diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs index 84b7bbf..84a7fd2 100644 --- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs +++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs @@ -317,9 +317,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message) return new JournalEntry { Id = message.PersistenceId + "_" + message.SequenceNr, - //Ordering = _sequenceRepository.GetSequenceValue("journalentry"), Ordering = new BsonTimestamp(0), // Auto-populates with timestamp - //Timestamp = new BsonTimestamp(0), IsDeleted = message.IsDeleted, Payload = payload, PersistenceId = message.PersistenceId, @@ -338,9 +336,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message) return new JournalEntry { Id = message.PersistenceId + "_" + message.SequenceNr, - //Ordering = _sequenceRepository.GetSequenceValue("journalentry"), Ordering = new BsonTimestamp(0), // Auto-populates with timestamp - //Timestamp = new BsonTimestamp(0), IsDeleted = message.IsDeleted, Payload = binary, PersistenceId = message.PersistenceId, @@ -351,8 +347,25 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message) }; } + private static long ToTicks(BsonTimestamp bson) + { + + + // BSON Timestamps are stored natively as Unix epoch seconds + an ordinal value + + // need to use BsonTimestamp.Timestamp because the ordinal value doesn't actually have any + // bearing on the time - it's used to try to somewhat order the events that all occurred concurrently + // according to the MongoDb clock. No need to include that data in the EventEnvelope.Timestamp field + // which is used entirely for end-user purposes. + // + // See https://docs.mongodb.com/manual/reference/bson-types/#timestamps + + return DateTimeOffset.FromUnixTimeSeconds(bson.Timestamp).Ticks; + } + private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sender) { + if (_settings.LegacySerialization) { var manifest = string.IsNullOrEmpty(entry.Manifest) ? entry.Payload.GetType().TypeQualifiedName() : entry.Manifest; @@ -363,14 +376,24 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen entry.PersistenceId, manifest, entry.IsDeleted, - sender); + sender, + timestamp: ToTicks(entry.Ordering)); // MongoDb timestamps are stored as Unix Epoch } var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest); if (!legacy) { var ser = _serialization.FindSerializerForType(typeof(Persistent)); - return ser.FromBinary((byte[]) entry.Payload); + var output = ser.FromBinary((byte[])entry.Payload); + + // backwards compatibility for https://github.com/akkadotnet/akka.net/pull/4680 + // it the timestamp is not defined in the binary payload + if (output.Timestamp == 0L) + { + output = (Persistent)output.WithTimestamp(ToTicks(entry.Ordering)); + } + + return output; } int? serializerId = null; @@ -396,14 +419,14 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen } if (deserialized is Persistent p) - return p; + return (Persistent)p.WithTimestamp(ToTicks(entry.Ordering)); - return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender); + return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering)); } else // backwards compat for object serialization - Payload was already deserialized by BSON { return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest, - entry.IsDeleted, sender); + entry.IsDeleted, sender, timestamp: ToTicks(entry.Ordering)); } } diff --git a/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs b/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs index be94440..30d5c00 100644 --- a/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs +++ b/src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs @@ -113,6 +113,7 @@ protected bool Replaying(object message) offset: new Sequence(replayed.Offset), persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, + timestamp: replayed.Persistent.Timestamp, @event: replayed.Persistent.Payload)); CurrentOffset = replayed.Offset; diff --git a/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs b/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs index 9a23798..1d857ca 100644 --- a/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs +++ b/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs @@ -131,6 +131,7 @@ protected Receive Replaying(int limit) offset: new Sequence(seqNr), persistenceId: PersistenceId, sequenceNr: seqNr, + timestamp: replayed.Persistent.Timestamp, @event: replayed.Persistent.Payload)); CurrentSequenceNr = seqNr + 1; Buffer.DeliverBuffer(TotalDemand); diff --git a/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs b/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs index 533ab70..8d2829d 100644 --- a/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs +++ b/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs @@ -125,6 +125,7 @@ protected Receive Replaying(int limit) offset: new Sequence(replayed.Offset), persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, + timestamp: replayed.Persistent.Timestamp, @event: replayed.Persistent.Payload)); CurrentOffset = replayed.Offset; diff --git a/src/common.props b/src/common.props index cd662c8..e2844a7 100644 --- a/src/common.props +++ b/src/common.props @@ -1,20 +1,26 @@ - Copyright © 2013-2020 Akka.NET Project + Copyright © 2013-2021 Akka.NET Project Akka.NET Contrib - 1.4.1 + 1.4.14 http://getakka.net/images/akkalogo.png https://github.com/akkadotnet/Akka.Persistence.MongoDB https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/master/LICENSE.md - Bump Akka version to 1.4.1 + + Bump Akka version to 1.4.14 + Corrected `CurrentPersistentIds` query and `AllPersistentIds` queries to be more memory efficient and query entity ID data directly from Mongo + Introduced `AllEvents` and `CurrentEvents` query to read the entire MongoDb journal + Deprecated previous `GetMaxSeqNo` behavior - we no longer query the max sequence number directly from the journal AND the metadata collection. We only get that data directly from the metadata collection itself, which should make this query an O(1) operation rather than O(n) + true Akka Persistence journal and snapshot store backed by MongoDB database. $(NoWarn);CS1591 2.4.1 - 16.8.0 - 1.4.12 + 16.8.3 + 1.4.14 + 4.14.0