Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IBD/Rewind] Optimize VotingManager for batched operations #933

Open
wants to merge 7 commits into
base: release/1.6.0.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Stratis.Bitcoin.Features.PoA.Tests/PoATestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class PoATestsBase
protected readonly ChainState chainState;
protected readonly IAsyncProvider asyncProvider;
protected readonly Mock<IFullNode> fullNode;
protected readonly Mock<BlockStore.IBlockRepository> blockRepository;
protected readonly Mock<IBlockStoreQueue> blockStoreQueue;

public PoATestsBase(TestPoANetwork network = null)
{
Expand All @@ -53,13 +53,13 @@ public PoATestsBase(TestPoANetwork network = null)
this.network = network ?? new TestPoANetwork();
this.consensusOptions = this.network.ConsensusOptions;
this.dBreezeSerializer = new DBreezeSerializer(this.network.Consensus.ConsensusFactory);
this.blockRepository = new Mock<BlockStore.IBlockRepository>();
this.blockStoreQueue = new Mock<IBlockStoreQueue>();

this.ChainIndexer = new ChainIndexer(this.network);
IDateTimeProvider dateTimeProvider = new DateTimeProvider();
this.consensusSettings = new ConsensusSettings(NodeSettings.Default(this.network));

(this.federationManager, this.federationHistory) = CreateFederationManager(this, this.network, this.loggerFactory, this.signals, this.blockRepository.Object);
(this.federationManager, this.federationHistory) = CreateFederationManager(this, this.network, this.loggerFactory, this.signals, this.blockStoreQueue.Object);

this.slotsManager = new SlotsManager(this.network, this.federationManager, this.federationHistory, this.ChainIndexer);

Expand All @@ -71,7 +71,7 @@ public PoATestsBase(TestPoANetwork network = null)
this.resultExecutorMock = new Mock<IPollResultExecutor>();

this.votingManager = new VotingManager(this.federationManager, this.resultExecutorMock.Object, new NodeStats(dateTimeProvider, NodeSettings.Default(this.network), new Mock<IVersionProvider>().Object), dataFolder,
this.dBreezeSerializer, this.signals, this.network, this.ChainIndexer, this.blockRepository.Object);
this.dBreezeSerializer, this.signals, this.network, this.ChainIndexer, this.blockStoreQueue.Object);

this.votingManager.Initialize(this.federationHistory);

Expand All @@ -87,7 +87,7 @@ public PoATestsBase(TestPoANetwork network = null)
this.currentHeader = headers.Last();
}

public static (IFederationManager federationManager, IFederationHistory federationHistory) CreateFederationManager(object caller, Network network, LoggerFactory loggerFactory, ISignals signals, BlockStore.IBlockRepository blockRepository)
public static (IFederationManager federationManager, IFederationHistory federationHistory) CreateFederationManager(object caller, Network network, LoggerFactory loggerFactory, ISignals signals, IBlockStoreQueue blockStoreQueue)
{
string dir = TestBase.CreateTestDir(caller);

Expand All @@ -114,7 +114,7 @@ public static (IFederationManager federationManager, IFederationHistory federati
var header = new BlockHeader();
chainIndexerMock.Setup(x => x.Tip).Returns(new ChainedHeader(header, header.GetHash(), 0));
var votingManager = new VotingManager(federationManager, new Mock<IPollResultExecutor>().Object,
new Mock<INodeStats>().Object, nodeSettings.DataFolder, dbreezeSerializer, signals, network, chainIndexerMock.Object, blockRepository, null);
new Mock<INodeStats>().Object, nodeSettings.DataFolder, dbreezeSerializer, signals, network, chainIndexerMock.Object, blockStoreQueue, null);

var federationHistory = new Mock<IFederationHistory>();
federationHistory.Setup(x => x.GetFederationMemberForBlock(It.IsAny<ChainedHeader>())).Returns<ChainedHeader>((chainedHeader) =>
Expand Down
10 changes: 5 additions & 5 deletions src/Stratis.Bitcoin.Features.PoA.Tests/VotingManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void CanVote()
ChainedHeaderBlock[] blocks = GetBlocksWithVotingData(votesRequired, votingData, votingRequest.ChainedHeader);

// Mock the blocks via the block repository.
this.blockRepository.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
this.blockStoreQueue.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
{
return (hash == votingRequest.ChainedHeader.HashBlock) ? votingRequest.Block : blocks.FirstOrDefault(b => b.ChainedHeader.HashBlock == hash)?.Block;
});
Expand Down Expand Up @@ -119,7 +119,7 @@ public void AddVoteAfterPollComplete()
ChainedHeaderBlock[] blocks = GetBlocksWithVotingData(votesRequired + 1, votingData, votingRequest.ChainedHeader);

// Mock the blocks via the block repository.
this.blockRepository.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
this.blockStoreQueue.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
{
return (hash == votingRequest.ChainedHeader.HashBlock) ? votingRequest.Block : blocks.FirstOrDefault(b => b.ChainedHeader.HashBlock == hash)?.Block;
});
Expand Down Expand Up @@ -164,7 +164,7 @@ public void CanCreateVotingRequest()
ChainedHeaderBlock votingRequest = GetBlockWithVotingRequest(memberPubKey);

// Mock the blocks via the block repository.
this.blockRepository.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
this.blockStoreQueue.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
{
return votingRequest.Block;
});
Expand All @@ -190,7 +190,7 @@ public void CanExpireAndUnExpirePollViaBlockDisconnected()
ChainedHeaderBlock[] blocks = GetBlocksWithVotingData(1, votingData, votingRequest.ChainedHeader);

// Mock the blocks via the block repository.
this.blockRepository.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
this.blockStoreQueue.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
{
return (hash == votingRequest.ChainedHeader.HashBlock) ? votingRequest.Block : blocks.FirstOrDefault(b => b.ChainedHeader.HashBlock == hash)?.Block;
});
Expand Down Expand Up @@ -239,7 +239,7 @@ public void CanExpireAndUnExpirePollViaNodeRewind()
ChainedHeaderBlock[] blocks = GetBlocksWithVotingData(1, votingData, votingRequest.ChainedHeader);

// Mock the blocks via the block repository.
this.blockRepository.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
this.blockStoreQueue.Setup(r => r.GetBlock(It.IsAny<uint256>())).Returns((uint256 hash) =>
{
return (hash == votingRequest.ChainedHeader.HashBlock) ? votingRequest.Block : blocks.FirstOrDefault(b => b.ChainedHeader.HashBlock == hash)?.Block;
});
Expand Down
2 changes: 1 addition & 1 deletion src/Stratis.Bitcoin.Features.PoA/PoAFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public override Task InitializeAsync()
this.federationManager.Initialize();
this.whitelistedHashesRepository.Initialize(this.votingManager);

if (!this.votingManager.Synchronize(this.chainIndexer.Tip))
if (!this.votingManager.Synchronize(this.chainIndexer.Tip, false))
throw new System.OperationCanceledException();

this.federationHistory.Initialize();
Expand Down
60 changes: 39 additions & 21 deletions src/Stratis.Bitcoin.Features.PoA/Voting/VotingManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Stratis.Bitcoin.Configuration.Logging;
using Stratis.Bitcoin.EventBus;
using Stratis.Bitcoin.EventBus.CoreEvents;
using Stratis.Bitcoin.Features.BlockStore;
using Stratis.Bitcoin.Interfaces;
using Stratis.Bitcoin.Primitives;
using Stratis.Bitcoin.Signals;
using Stratis.Bitcoin.Utilities;
Expand All @@ -21,7 +21,7 @@ namespace Stratis.Bitcoin.Features.PoA.Voting
public sealed class VotingManager : IDisposable
{
private readonly PoAConsensusOptions poaConsensusOptions;
private readonly IBlockRepository blockRepository;
private readonly IBlockStoreQueue blockStoreQueue;
private readonly ChainIndexer chainIndexer;
private readonly IFederationManager federationManager;

Expand All @@ -48,6 +48,7 @@ public sealed class VotingManager : IDisposable
private IIdleFederationMembersKicker idleFederationMembersKicker;
private readonly INodeLifetime nodeLifetime;
private readonly NodeDeployments nodeDeployments;
private readonly IInitialBlockDownloadState initialBlockDownloadState;

/// <summary>In-memory collection of pending polls.</summary>
/// <remarks>All access should be protected by <see cref="locker"/>.</remarks>
Expand Down Expand Up @@ -75,8 +76,9 @@ public VotingManager(
ISignals signals,
Network network,
ChainIndexer chainIndexer,
IBlockRepository blockRepository = null,
IBlockStoreQueue blockStoreQueue = null,
INodeLifetime nodeLifetime = null,
IInitialBlockDownloadState initialBlockDownloadState = null,
NodeDeployments nodeDeployments = null)
{
this.federationManager = Guard.NotNull(federationManager, nameof(federationManager));
Expand All @@ -94,10 +96,11 @@ public VotingManager(

Guard.Assert(this.poaConsensusOptions.PollExpiryBlocks != 0);

this.blockRepository = blockRepository;
this.blockStoreQueue = blockStoreQueue;
this.chainIndexer = chainIndexer;
this.nodeLifetime = nodeLifetime;
this.nodeDeployments = nodeDeployments;
this.initialBlockDownloadState = initialBlockDownloadState;

// Avoid hiding the above "Assert" errors by doing this last.
// Otherwise we will just see database file in-use error when this constructor is called on dispose.
Expand Down Expand Up @@ -346,7 +349,7 @@ public Poll CreatePendingPoll(PollsRepository.Transaction transaction, VotingDat

transaction.AddPolls(poll);

this.logger.LogInformation("Created poll {0} [{1}] at height {2}.",
this.logger.LogInformation("Created poll {0} [{1}] at height {2}.",
poll.Id, this.pollResultExecutor.ConvertToString(poll.VotingData), poll.PollStartBlockData.Height);
});

Expand Down Expand Up @@ -545,7 +548,9 @@ private void ProcessBlock(PollsRepository.Transaction transaction, ChainedHeader
else
{
this.logger.LogDebug("Applying poll '{0}'.", poll);
this.pollResultExecutor.ApplyChange(poll.VotingData, chBlock.ChainedHeader.Height);
// Hash-related polls have already been applied at the "voted in favor" height.
if (poll.VotingData.Key != VoteKey.WhitelistHash && poll.VotingData.Key != VoteKey.RemoveHash)
this.pollResultExecutor.ApplyChange(poll.VotingData, chBlock.ChainedHeader.Height);

this.polls.AdjustPoll(poll, poll => poll.PollExecutedBlockData = new HashHeightPair(chBlock.ChainedHeader));
transaction.UpdatePoll(poll);
Expand Down Expand Up @@ -665,6 +670,10 @@ private void ProcessBlock(PollsRepository.Transaction transaction, ChainedHeader

this.polls.AdjustPoll(poll, poll => poll.PollVotedInFavorBlockData = new HashHeightPair(chBlock.ChainedHeader));
transaction.UpdatePoll(poll);

// Update the whitelist hash repository early to allow for scenarios where the voting manager is lagging up to MaxReorgLength blocks.
if (poll.VotingData.Key == VoteKey.RemoveHash || poll.VotingData.Key == VoteKey.WhitelistHash)
this.pollResultExecutor.ApplyChange(poll.VotingData, (int)(poll.PollVotedInFavorBlockData.Height + this.network.Consensus.MaxReorgLength));
}
}

Expand Down Expand Up @@ -695,7 +704,8 @@ private void UnProcessBlock(PollsRepository.Transaction transaction, ChainedHead
.ToList())
{
this.logger.LogDebug("Reverting poll execution '{0}'.", poll);
this.pollResultExecutor.RevertChange(poll.VotingData, chBlock.ChainedHeader.Height);
if (poll.VotingData.Key != VoteKey.WhitelistHash && poll.VotingData.Key != VoteKey.RemoveHash)
this.pollResultExecutor.RevertChange(poll.VotingData, chBlock.ChainedHeader.Height);

this.polls.AdjustPoll(poll, poll => poll.PollExecutedBlockData = null);
transaction.UpdatePoll(poll);
Expand Down Expand Up @@ -756,6 +766,10 @@ private void UnProcessBlock(PollsRepository.Transaction transaction, ChainedHead
{
this.polls.AdjustPoll(targetPoll, poll => poll.PollVotedInFavorBlockData = null);
transaction.UpdatePoll(targetPoll);

// Update the whitelist hash repository early to allow for scenarios where the voting manager is lagging up to MaxReorgLength blocks.
if (targetPoll.VotingData.Key == VoteKey.RemoveHash || targetPoll.VotingData.Key == VoteKey.WhitelistHash)
this.pollResultExecutor.RevertChange(targetPoll.VotingData, (int)(chBlock.ChainedHeader.Height + this.network.Consensus.MaxReorgLength));
}

// Pub key of a fed member that created voting data.
Expand Down Expand Up @@ -808,8 +822,12 @@ public List<IFederationMember> GetLastKnownFederation()
return this.GetModifiedFederation(this.chainIndexer.Tip);
}

internal bool Synchronize(ChainedHeader newTip)
internal bool Synchronize(ChainedHeader newTip, bool allowLag = true)
{
// Allowing the voting manager to lag makes it more efficient due to multi-block processing.
if (allowLag && (LastKnownFederationHeight() - 1) > newTip?.Height && (this.initialBlockDownloadState?.IsInitialBlockDownload() ?? false))
return false;

if (newTip?.HashBlock == this.PollsRepository.CurrentTip?.Hash)
return true;

Expand All @@ -832,7 +850,7 @@ internal bool Synchronize(ChainedHeader newTip)

for (ChainedHeader header = repoTip; header.Height > fork.Height; header = header.Previous)
{
Block block = this.blockRepository.GetBlock(header.HashBlock);
Block block = this.blockStoreQueue.GetBlock(header.HashBlock);

this.UnProcessBlock(transaction, new ChainedHeaderBlock(block, header));
}
Expand All @@ -857,29 +875,29 @@ internal bool Synchronize(ChainedHeader newTip)
this.PollsRepository.WithTransaction((currentTransaction) =>
{
int i = 0;
foreach (Block block in this.blockRepository.GetBlocks(headers.Select(h => h.HashBlock).ToList()))
foreach (Block block in this.blockStoreQueue.GetBlocks(headers.Select(h => h.HashBlock).ToList()))
{
if (this.nodeLifetime.ApplicationStopping.IsCancellationRequested)
{
this.logger.LogTrace("(-)[NODE_DISPOSED]");
currentTransaction.Commit();
currentTransaction.Dispose();

bSuccess = false;
return;
}
bSuccess = false;
return;
}

ChainedHeader header = headers[i++];
ChainedHeader header = headers[i++];

this.ProcessBlock(currentTransaction, new ChainedHeaderBlock(block, header));
this.ProcessBlock(currentTransaction, new ChainedHeaderBlock(block, header));

if (header.Height % 10000 == 0)
{
var progress = (int)((decimal)header.Height / this.chainIndexer.Tip.Height * 100);
var progressString = $"Synchronizing voting data at height {header.Height} / {this.chainIndexer.Tip.Height} ({progress} %).";
if (header.Height % 10000 == 0)
{
var progress = (int)((decimal)header.Height / this.chainIndexer.Tip.Height * 100);
var progressString = $"Synchronizing voting data at height {header.Height} / {this.chainIndexer.Tip.Height} ({progress} %).";

this.logger.LogInformation(progressString);
this.signals.Publish(new FullNodeEvent() { Message = progressString, State = FullNodeState.Initializing.ToString() });
this.logger.LogInformation(progressString);
this.signals.Publish(new FullNodeEvent() { Message = progressString, State = FullNodeState.Initializing.ToString() });

currentTransaction.Flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class WhitelistedHashesRepository : IWhitelistedHashesRepository

private readonly ILogger logger;

private readonly Network network;
private readonly PoAConsensusOptions poaConsensusOptions;

// Dictionary of hash histories. Even list entries are additions and odd entries are removals.
Expand All @@ -24,6 +25,7 @@ public WhitelistedHashesRepository(ILoggerFactory loggerFactory, Network network
this.locker = new object();

this.logger = loggerFactory.CreateLogger(this.GetType().FullName);
this.network = network;
this.poaConsensusOptions = network.Consensus.Options as PoAConsensusOptions;
}

Expand All @@ -47,18 +49,19 @@ private void GetWhitelistedHashesFromExecutedPolls(VotingManager votingManager)
{
var federation = new List<IFederationMember>(this.poaConsensusOptions.GenesisFederationMembers);

IEnumerable<Poll> executedPolls = votingManager.GetExecutedPolls().WhitelistPolls();
foreach (Poll poll in executedPolls.OrderBy(a => (a.PollExecutedBlockData.Height, a.Id), pollComparer))
IEnumerable<Poll> approvedPolls = votingManager.GetApprovedPolls().WhitelistPolls();

foreach (Poll poll in approvedPolls.OrderBy(a => (a.PollExecutedBlockData.Height, a.Id), pollComparer))
{
var hash = new uint256(poll.VotingData.Data);

if (poll.VotingData.Key == VoteKey.WhitelistHash)
{
this.AddHash(hash, poll.PollExecutedBlockData.Height);
this.AddHash(hash, (int)(poll.PollVotedInFavorBlockData.Height + this.network.Consensus.MaxReorgLength));
}
else if (poll.VotingData.Key == VoteKey.RemoveHash)
{
this.RemoveHash(hash, poll.PollExecutedBlockData.Height);
this.RemoveHash(hash, (int)(poll.PollVotedInFavorBlockData.Height + this.network.Consensus.MaxReorgLength));
}
}
}
Expand Down