Skip to content

Commit

Permalink
Refactor rpc module register subscription outside of subscription fac…
Browse files Browse the repository at this point in the history
…tory (#8066)
  • Loading branch information
StevenChongHuo authored Jan 17, 2025
1 parent 1ab5908 commit 0e03832
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 65 deletions.
18 changes: 9 additions & 9 deletions src/Nethermind/Nethermind.Init/Steps/RegisterRpcModules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
using Nethermind.Network.Config;
using Nethermind.JsonRpc.Modules.Rpc;
using Nethermind.Synchronization.FastBlocks;
using System.Transactions;
using Newtonsoft.Json.Schema;
using Nethermind.Blockchain;
using Nethermind.Core.Specs;
using Nethermind.Blockchain.Filters;

namespace Nethermind.Init.Steps;

Expand Down Expand Up @@ -172,15 +177,10 @@ public virtual async Task Execute(CancellationToken cancellationToken)

_api.JsonRpcLocalStats = jsonRpcLocalStats;

SubscriptionFactory subscriptionFactory = new(
_api.LogManager,
_api.BlockTree,
_api.TxPool,
_api.ReceiptMonitor,
_api.FilterStore,
_api.EthSyncingInfo!,
_api.SpecProvider,
rpcModuleProvider.Serializer);
SubscriptionFactory subscriptionFactory = new();

// Register the standard subscription types in the dictionary
subscriptionFactory.RegisterStandardSubscription(_api.BlockTree, _api.LogManager, _api.SpecProvider, _api.ReceiptMonitor, _api.FilterStore, _api.TxPool, _api.EthSyncingInfo);

_api.SubscriptionFactory = subscriptionFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Json;
using Google.Protobuf.WellKnownTypes;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Filters;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class SubscribeModuleTests
private IReceiptMonitor _receiptCanonicalityMonitor = null!;
private ISyncConfig _syncConfig = null!;
private ISyncProgressResolver _syncProgressResolver = null!;
private EthSyncingInfo _ethSyncingInfo;

[SetUp]
public void Setup()
Expand All @@ -71,23 +73,19 @@ public void Setup()
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager);
_syncConfig = new SyncConfig();
_syncProgressResolver = Substitute.For<ISyncProgressResolver>();
_ethSyncingInfo = new EthSyncingInfo(_blockTree, Substitute.For<ISyncPointers>(), _syncConfig,
new StaticSelector(SyncMode.All), _syncProgressResolver, _logManager);

IJsonSerializer jsonSerializer = new EthereumJsonSerializer();

SubscriptionFactory subscriptionFactory = new(
_logManager,
_blockTree,
_txPool,
_receiptCanonicalityMonitor,
_filterStore,
new EthSyncingInfo(_blockTree, Substitute.For<ISyncPointers>(), _syncConfig,
new StaticSelector(SyncMode.All), _syncProgressResolver, _logManager),
_specProvider,
jsonSerializer);
SubscriptionFactory subscriptionFactory = new();

// Register the standard subscription types in the dictionary
subscriptionFactory.RegisterStandardSubscription(_blockTree, _logManager, _specProvider, _receiptCanonicalityMonitor, _filterStore, _txPool, _ethSyncingInfo);

_subscriptionManager = new SubscriptionManager(
subscriptionFactory,
_logManager);
subscriptionFactory,
_logManager);

_subscribeRpcModule = new SubscribeRpcModule(_subscriptionManager);
_subscribeRpcModule.Context = new JsonRpcContext(RpcEndpoint.Ws, _jsonRpcDuplexClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Filters;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Facade.Eth;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.Logging;
using Nethermind.Serialization.Json;
using Nethermind.TxPool;
using System.Text.Json;

namespace Nethermind.JsonRpc.Modules.Subscribe;
Expand All @@ -28,46 +21,11 @@ namespace Nethermind.JsonRpc.Modules.Subscribe;
/// </remarks>
public class SubscriptionFactory : ISubscriptionFactory
{
private readonly IJsonSerializer _jsonSerializer;
private readonly ConcurrentDictionary<string, CustomSubscriptionType> _subscriptionConstructors;

public SubscriptionFactory(ILogManager? logManager,
IBlockTree? blockTree,
ITxPool? txPool,
IReceiptMonitor receiptCanonicalityMonitor,
IFilterStore? filterStore,
IEthSyncingInfo ethSyncingInfo,
ISpecProvider specProvider,
IJsonSerializer jsonSerializer)
public SubscriptionFactory()
{
_jsonSerializer = jsonSerializer;
logManager = logManager ?? throw new ArgumentNullException(nameof(logManager));
blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
txPool = txPool ?? throw new ArgumentNullException(nameof(txPool));
receiptCanonicalityMonitor = receiptCanonicalityMonitor ?? throw new ArgumentNullException(nameof(receiptCanonicalityMonitor));
filterStore = filterStore ?? throw new ArgumentNullException(nameof(filterStore));
ethSyncingInfo = ethSyncingInfo ?? throw new ArgumentNullException(nameof(ethSyncingInfo));
specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider));

_subscriptionConstructors = new ConcurrentDictionary<string, CustomSubscriptionType>
{

//Register the standard subscription types in the dictionary.
[SubscriptionType.NewHeads] = CreateSubscriptionType<TransactionsOption?>((jsonRpcDuplexClient, args) =>
new NewHeadSubscription(jsonRpcDuplexClient, blockTree, logManager, specProvider, args)),

[SubscriptionType.Logs] = CreateSubscriptionType<Filter?>((jsonRpcDuplexClient, filter) =>
new LogsSubscription(jsonRpcDuplexClient, receiptCanonicalityMonitor, filterStore, blockTree, logManager, filter)),

[SubscriptionType.NewPendingTransactions] = CreateSubscriptionType<TransactionsOption?>((jsonRpcDuplexClient, args) =>
new NewPendingTransactionsSubscription(jsonRpcDuplexClient, txPool, specProvider, logManager, args)),

[SubscriptionType.DroppedPendingTransactions] = CreateSubscriptionType(jsonRpcDuplexClient =>
new DroppedPendingTransactionsSubscription(jsonRpcDuplexClient, txPool, logManager)),

[SubscriptionType.Syncing] = CreateSubscriptionType(jsonRpcDuplexClient =>
new SyncingSubscription(jsonRpcDuplexClient, blockTree, ethSyncingInfo, logManager))
};
_subscriptionConstructors = new ConcurrentDictionary<string, CustomSubscriptionType>();
}

public Subscription CreateSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, string subscriptionType, string? args = null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Filters;
using Nethermind.Core.Specs;
using Nethermind.Facade.Eth;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.Logging;
using Nethermind.TxPool;

namespace Nethermind.JsonRpc.Modules.Subscribe;
public static class SubscriptionFactoryExtensions
{
public static void RegisterNewHeadSubscription(
this ISubscriptionFactory subscriptionFactory,
IBlockTree? blockTree,
ILogManager? logManager,
ISpecProvider specProvider
)
{
subscriptionFactory.RegisterSubscriptionType<TransactionsOption?>(
SubscriptionType.NewHeads,
(jsonRpcDuplexClient, args) =>
new NewHeadSubscription(jsonRpcDuplexClient, blockTree, logManager, specProvider, args)
);
}

public static void RegisterLogsSubscription(
this ISubscriptionFactory subscriptionFactory,
IReceiptMonitor receiptMonitor,
IFilterStore? filterStore,
IBlockTree? blockTree,
ILogManager? logManager
)
{
subscriptionFactory.RegisterSubscriptionType<Filter?>(
SubscriptionType.Logs,
(jsonRpcDuplexClient, filter) =>
new LogsSubscription(jsonRpcDuplexClient, receiptMonitor, filterStore, blockTree, logManager, filter)
);
}

public static void RegisterNewPendingTransactionsSubscription(
this ISubscriptionFactory subscriptionFactory,
ITxPool? txPool,
ISpecProvider? specProvider,
ILogManager? logManager
)
{
subscriptionFactory.RegisterSubscriptionType<TransactionsOption?>(
SubscriptionType.NewPendingTransactions,
(jsonRpcDuplexClient, args) =>
new NewPendingTransactionsSubscription(jsonRpcDuplexClient, txPool, specProvider, logManager, args)
);
}

public static void RegisterDroppedPendingTransactionsSubscription(
this ISubscriptionFactory subscriptionFactory,
ITxPool? txPool,
ILogManager? logManager
)
{
subscriptionFactory.RegisterSubscriptionType(
SubscriptionType.DroppedPendingTransactions,
(jsonRpcDuplexClient) =>
new DroppedPendingTransactionsSubscription(jsonRpcDuplexClient, txPool, logManager)
);
}

public static void RegisterSyncingSubscription(
this ISubscriptionFactory subscriptionFactory,
IBlockTree? blockTree,
IEthSyncingInfo ethSyncingInfo,
ILogManager? logManager
)
{
subscriptionFactory.RegisterSubscriptionType(
SubscriptionType.Syncing,
(jsonRpcDuplexClient) =>
new SyncingSubscription(jsonRpcDuplexClient, blockTree, ethSyncingInfo, logManager)
);
}

public static void RegisterStandardSubscription(
this ISubscriptionFactory subscriptionFactory,
IBlockTree? blockTree,
ILogManager? logManager,
ISpecProvider specProvider,
IReceiptMonitor receiptMonitor,
IFilterStore? filterStore,
ITxPool? txPool,
IEthSyncingInfo ethSyncingInfo
)
{
subscriptionFactory.RegisterNewHeadSubscription(blockTree, logManager, specProvider);
subscriptionFactory.RegisterLogsSubscription(receiptMonitor, filterStore, blockTree, logManager);
subscriptionFactory.RegisterNewPendingTransactionsSubscription(txPool, specProvider, logManager);
subscriptionFactory.RegisterDroppedPendingTransactionsSubscription(txPool, logManager);
subscriptionFactory.RegisterSyncingSubscription(blockTree, ethSyncingInfo, logManager);
}
}

0 comments on commit 0e03832

Please sign in to comment.