Skip to content

Commit

Permalink
Add a channel to node discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
richardgreg committed Feb 7, 2025
1 parent e31659e commit b1515f9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
11 changes: 6 additions & 5 deletions src/Nethermind/Nethermind.Config/NetworkNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ public NetworkNode(PublicKey publicKey, string ip, int port, long reputation = 0
Reputation = reputation;
}

public PublicKey NodeId => _enode.PublicKey;
public string Host => _enode.HostIp.ToString();
public int Port => _enode.Port;
public long Reputation { get; set; }
public NetworkNode(Enode enode)
{
_enode = enode;
_enode = enode;
}

public Enode Enode => _enode;

public PublicKey NodeId => _enode.PublicKey;
public string Host => _enode.HostIp.ToString();
public int Port => _enode.Port;
public long Reputation { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Config;
using Nethermind.Core.Crypto;
Expand All @@ -23,6 +24,12 @@ public class TrustedNodesManager : ITrustedNodesManager
private ConcurrentDictionary<PublicKey, NetworkNode> _nodes = new();
private readonly string _trustedNodesPath;
private readonly ILogger _logger;
private readonly Channel<Node> _nodeChannel = Channel.CreateBounded<Node>(
new BoundedChannelOptions(1 << 16) // capacity of 2^16 = 65536
{
// "Wait" to have writers wait until there is space.
FullMode = BoundedChannelFullMode.Wait
});

public TrustedNodesManager(string trustedNodesPath, ILogManager logManager)
{
Expand Down Expand Up @@ -75,23 +82,28 @@ public async Task InitAsync()


// ---- INodeSource requirement: IAsyncEnumerable<Node> ----
// C# requires 'async' for IAsyncEnumerable yield. We'll add a small 'await Task.Yield()'
// to avoid the warning about "no awaits".
public async IAsyncEnumerable<Node> DiscoverNodes([EnumeratorCancellation] CancellationToken cancellationToken)
{
// At least one 'await', so no compiler warnings
await Task.Yield();

// yield existing nodes.
foreach (NetworkNode netNode in _nodes.Values)
{
cancellationToken.ThrowIfCancellationRequested();
yield return new Node(netNode) { IsTrusted = true };
}

// continuously yield new nodes as they are added via the channel.
while (await _nodeChannel.Reader.WaitToReadAsync(cancellationToken))
{
while (_nodeChannel.Reader.TryRead(out Node node))
{
yield return node;
}
}
}

public async Task<bool> AddAsync(Enode enode, bool updateFile = true)
{
NetworkNode networkNode = new(enode);
NetworkNode networkNode = new NetworkNode(enode);
if (!_nodes.TryAdd(networkNode.NodeId, networkNode))
{
if (_logger.IsInfo)
Expand All @@ -106,6 +118,10 @@ public async Task<bool> AddAsync(Enode enode, bool updateFile = true)
_logger.Info($"Trusted node added: {enode}");
}

// Publish the newly added node to the channel so DiscoverNodes will yield it.
Node newNode = new Node(networkNode) { IsTrusted = true };
await _nodeChannel.Writer.WriteAsync(newNode);

if (updateFile)
{
await SaveFileAsync();
Expand Down

0 comments on commit b1515f9

Please sign in to comment.