diff --git a/Cargo.lock b/Cargo.lock
index b1351203b..8c22cd713 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14055,6 +14055,7 @@ dependencies = [
  "frame-benchmarking",
  "frame-benchmarking-cli",
  "frame-system",
+ "frame-system-rpc-runtime-api",
  "futures 0.3.30",
  "hex",
  "hex-literal 0.4.1",
@@ -14065,6 +14066,7 @@ dependencies = [
  "pallet-services-rpc",
  "pallet-transaction-payment",
  "pallet-transaction-payment-rpc",
+ "pallet-transaction-payment-rpc-runtime-api",
  "parity-scale-codec",
  "primitives-ext",
  "rand",
@@ -14100,11 +14102,15 @@ dependencies = [
  "sp-blockchain",
  "sp-consensus",
  "sp-consensus-babe",
+ "sp-consensus-grandpa",
  "sp-core",
  "sp-inherents",
+ "sp-io",
  "sp-keyring",
  "sp-keystore",
+ "sp-offchain",
  "sp-runtime",
+ "sp-session",
  "sp-timestamp",
  "sp-transaction-pool",
  "sp-transaction-storage-proof",
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 1cb2dd69f..da9709aca 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -60,9 +60,9 @@ sp-inherents = { workspace = true, features = ["std"] }
 sp-keystore = { workspace = true, features = ["std"] }
 sp-runtime = { workspace = true, features = ["std"] }
 sp-timestamp = { workspace = true, features = ["std"] }
-
 pallet-services-rpc = { workspace = true }
-
+sp-consensus-grandpa = { workspace = true }
+sp-offchain = { workspace = true }
 pallet-airdrop-claims = { workspace = true }
 sc-chain-spec = { workspace = true }
 sc-rpc = { workspace = true }
@@ -72,12 +72,16 @@ sp-block-builder = { workspace = true }
 sp-blockchain = { workspace = true }
 substrate-frame-rpc-system = { workspace = true }
 substrate-prometheus-endpoint = { workspace = true }
+sp-session = { workspace = true }
+frame-system-rpc-runtime-api = { workspace = true }
+sp-io = { workspace = true }
 
 # RPC related dependencies
 jsonrpsee = { workspace = true }
 pallet-im-online = { workspace = true }
 pallet-transaction-payment = { workspace = true }
 pallet-transaction-payment-rpc = { workspace = true }
+pallet-transaction-payment-rpc-runtime-api = { workspace = true }
 sc-transaction-pool-api = { workspace = true }
 
 # Frontier
#[cfg(feature = "manual-seal")] #[arg(long, value_enum, ignore_case = true)] - pub sealing: Option, + pub sealing: Sealing, } #[derive(Debug, clap::Subcommand)] diff --git a/node/src/command.rs b/node/src/command.rs index 50db3a4b4..952173b52 100644 --- a/node/src/command.rs +++ b/node/src/command.rs @@ -116,6 +116,7 @@ pub fn run() -> sc_cli::Result<()> { let runner = cli.create_runner(cmd)?; runner.sync_run(|config| cmd.run(config.chain_spec, config.network)) }, + #[cfg(not(feature = "manual-seal"))] Some(Subcommand::CheckBlock(cmd)) => { let runner = cli.create_runner(cmd)?; runner.async_run(|config| { @@ -124,6 +125,16 @@ pub fn run() -> sc_cli::Result<()> { Ok((cmd.run(client, import_queue), task_manager)) }) }, + #[cfg(feature = "manual-seal")] + Some(Subcommand::CheckBlock(cmd)) => { + let runner = cli.create_runner(cmd)?; + runner.async_run(|mut config| { + let (client, _, import_queue, task_manager, _) = + service::new_chain_ops(&mut config, &cli.eth)?; + Ok((cmd.run(client, import_queue), task_manager)) + }) + }, + #[cfg(not(feature = "manual-seal"))] Some(Subcommand::ExportBlocks(cmd)) => { let runner = cli.create_runner(cmd)?; runner.async_run(|config| { @@ -132,6 +143,16 @@ pub fn run() -> sc_cli::Result<()> { Ok((cmd.run(client, config.database), task_manager)) }) }, + #[cfg(feature = "manual-seal")] + Some(Subcommand::ExportBlocks(cmd)) => { + let runner = cli.create_runner(cmd)?; + runner.async_run(|mut config| { + let (client, _, import_queue, task_manager, _) = + service::new_chain_ops(&mut config, &cli.eth)?; + Ok((cmd.run(client, config.database), task_manager)) + }) + }, + #[cfg(not(feature = "manual-seal"))] Some(Subcommand::ExportState(cmd)) => { let runner = cli.create_runner(cmd)?; runner.async_run(|config| { @@ -140,6 +161,16 @@ pub fn run() -> sc_cli::Result<()> { Ok((cmd.run(client, config.chain_spec), task_manager)) }) }, + #[cfg(feature = "manual-seal")] + Some(Subcommand::ExportState(cmd)) => { + let runner = cli.create_runner(cmd)?; + runner.async_run(|mut config| { + let (client, _, import_queue, task_manager, _) = + service::new_chain_ops(&mut config, &cli.eth)?; + Ok((cmd.run(client, config.chain_spec), task_manager)) + }) + }, + #[cfg(not(feature = "manual-seal"))] Some(Subcommand::ImportBlocks(cmd)) => { let runner = cli.create_runner(cmd)?; runner.async_run(|config| { @@ -148,10 +179,20 @@ pub fn run() -> sc_cli::Result<()> { Ok((cmd.run(client, import_queue), task_manager)) }) }, + #[cfg(feature = "manual-seal")] + Some(Subcommand::ImportBlocks(cmd)) => { + let runner = cli.create_runner(cmd)?; + runner.async_run(|mut config| { + let (client, _, import_queue, task_manager, _) = + service::new_chain_ops(&mut config, &cli.eth)?; + Ok((cmd.run(client, import_queue), task_manager)) + }) + }, Some(Subcommand::PurgeChain(cmd)) => { let runner = cli.create_runner(cmd)?; runner.sync_run(|config| cmd.run(config.database)) }, + #[cfg(not(feature = "manual-seal"))] Some(Subcommand::Revert(cmd)) => { let runner = cli.create_runner(cmd)?; runner.async_run(|config| { @@ -164,6 +205,20 @@ pub fn run() -> sc_cli::Result<()> { Ok((cmd.run(client, backend, Some(aux_revert)), task_manager)) }) }, + #[cfg(feature = "manual-seal")] + Some(Subcommand::Revert(cmd)) => { + let runner = cli.create_runner(cmd)?; + runner.async_run(|mut config| { + let (client, backend, import_queue, task_manager, _) = + service::new_chain_ops(&mut config, &cli.eth)?; + let aux_revert = Box::new(|client, _, blocks| { + sc_consensus_grandpa::revert(client, blocks)?; + Ok(()) + }); + 
diff --git a/node/src/command.rs b/node/src/command.rs
index 50db3a4b4..952173b52 100644
--- a/node/src/command.rs
+++ b/node/src/command.rs
@@ -116,6 +116,7 @@ pub fn run() -> sc_cli::Result<()> {
			let runner = cli.create_runner(cmd)?;
			runner.sync_run(|config| cmd.run(config.chain_spec, config.network))
		},
+		#[cfg(not(feature = "manual-seal"))]
		Some(Subcommand::CheckBlock(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.async_run(|config| {
@@ -124,6 +125,16 @@ pub fn run() -> sc_cli::Result<()> {
				Ok((cmd.run(client, import_queue), task_manager))
			})
		},
+		#[cfg(feature = "manual-seal")]
+		Some(Subcommand::CheckBlock(cmd)) => {
+			let runner = cli.create_runner(cmd)?;
+			runner.async_run(|mut config| {
+				let (client, _, import_queue, task_manager, _) =
+					service::new_chain_ops(&mut config, &cli.eth)?;
+				Ok((cmd.run(client, import_queue), task_manager))
+			})
+		},
+		#[cfg(not(feature = "manual-seal"))]
		Some(Subcommand::ExportBlocks(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.async_run(|config| {
@@ -132,6 +143,16 @@ pub fn run() -> sc_cli::Result<()> {
				Ok((cmd.run(client, config.database), task_manager))
			})
		},
+		#[cfg(feature = "manual-seal")]
+		Some(Subcommand::ExportBlocks(cmd)) => {
+			let runner = cli.create_runner(cmd)?;
+			runner.async_run(|mut config| {
+				let (client, _, import_queue, task_manager, _) =
+					service::new_chain_ops(&mut config, &cli.eth)?;
+				Ok((cmd.run(client, config.database), task_manager))
+			})
+		},
+		#[cfg(not(feature = "manual-seal"))]
		Some(Subcommand::ExportState(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.async_run(|config| {
@@ -140,6 +161,16 @@ pub fn run() -> sc_cli::Result<()> {
				Ok((cmd.run(client, config.chain_spec), task_manager))
			})
		},
+		#[cfg(feature = "manual-seal")]
+		Some(Subcommand::ExportState(cmd)) => {
+			let runner = cli.create_runner(cmd)?;
+			runner.async_run(|mut config| {
+				let (client, _, import_queue, task_manager, _) =
+					service::new_chain_ops(&mut config, &cli.eth)?;
+				Ok((cmd.run(client, config.chain_spec), task_manager))
+			})
+		},
+		#[cfg(not(feature = "manual-seal"))]
		Some(Subcommand::ImportBlocks(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.async_run(|config| {
@@ -148,10 +179,20 @@ pub fn run() -> sc_cli::Result<()> {
				Ok((cmd.run(client, import_queue), task_manager))
			})
		},
+		#[cfg(feature = "manual-seal")]
+		Some(Subcommand::ImportBlocks(cmd)) => {
+			let runner = cli.create_runner(cmd)?;
+			runner.async_run(|mut config| {
+				let (client, _, import_queue, task_manager, _) =
+					service::new_chain_ops(&mut config, &cli.eth)?;
+				Ok((cmd.run(client, import_queue), task_manager))
+			})
+		},
		Some(Subcommand::PurgeChain(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.sync_run(|config| cmd.run(config.database))
		},
+		#[cfg(not(feature = "manual-seal"))]
		Some(Subcommand::Revert(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.async_run(|config| {
@@ -164,6 +205,20 @@ pub fn run() -> sc_cli::Result<()> {
				Ok((cmd.run(client, backend, Some(aux_revert)), task_manager))
			})
		},
+		#[cfg(feature = "manual-seal")]
+		Some(Subcommand::Revert(cmd)) => {
+			let runner = cli.create_runner(cmd)?;
+			runner.async_run(|mut config| {
+				let (client, backend, import_queue, task_manager, _) =
+					service::new_chain_ops(&mut config, &cli.eth)?;
+				let aux_revert = Box::new(|client, _, blocks| {
+					sc_consensus_grandpa::revert(client, blocks)?;
+					Ok(())
+				});
+				Ok((cmd.run(client, backend, Some(aux_revert)), task_manager))
+			})
+		},
+		#[cfg(not(feature = "manual-seal"))]
		Some(Subcommand::Benchmark(cmd)) => {
			let runner = cli.create_runner(cmd)?;
@@ -212,6 +267,10 @@ pub fn run() -> sc_cli::Result<()> {
				}
			})
		},
+		#[cfg(feature = "manual-seal")]
+		Some(Subcommand::Benchmark(cmd)) => {
+			unimplemented!()
+		},
		Some(Subcommand::FrontierDb(cmd)) => {
			let runner = cli.create_runner(cmd)?;
			runner.sync_run(|mut config| {
@@ -268,6 +327,8 @@ pub fn run() -> sc_cli::Result<()> {
					eth_config: cli.eth,
					debug_output: cli.output_path,
					auto_insert_keys: cli.auto_insert_keys,
+					#[cfg(feature = "manual-seal")]
+					sealing: cli.sealing,
				})
				.map_err(Into::into)
				.await
diff --git a/node/src/manual_seal.rs b/node/src/manual_seal.rs
index 230366474..6b9a5c831 100644
--- a/node/src/manual_seal.rs
+++ b/node/src/manual_seal.rs
@@ -14,24 +14,33 @@
 // limitations under the License.
 
 //! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
+use crate::cli::Sealing;
 pub use crate::eth::{db_config_dir, EthConfiguration};
 use crate::eth::{
	new_frontier_partial, spawn_frontier_tasks, BackendType, EthApi, FrontierBackend,
-	FrontierBlockImport, FrontierPartialComponents, RpcConfig,
+	FrontierBlockImport, FrontierPartialComponents, RpcConfig, StorageOverride,
+	StorageOverrideHandler,
 };
+use futures::future;
 use futures::FutureExt;
+use futures::{channel::mpsc, prelude::*};
 use sc_client_api::{Backend, BlockBackend};
 use sc_consensus::BasicQueue;
 use sc_consensus_babe::{BabeWorkerHandle, SlotProportion};
 use sc_consensus_grandpa::SharedVoterState;
+#[allow(deprecated)]
 pub use sc_executor::NativeElseWasmExecutor;
 use sc_service::{error::Error as ServiceError, ChainType, Configuration, TaskManager};
+use sc_telemetry::TelemetryHandle;
 use sc_telemetry::{Telemetry, TelemetryWorker};
+use sc_transaction_pool::FullPool;
 use sc_transaction_pool_api::OffchainTransactionPoolFactory;
 use sp_core::U256;
-use tangle_primitives::Block;
-
+use sp_runtime::traits::Block as BlockT;
+use std::cell::RefCell;
 use std::{path::Path, sync::Arc, time::Duration};
+use substrate_prometheus_endpoint::Registry;
+use tangle_primitives::Block;
 
 #[cfg(not(feature = "testnet"))]
 use tangle_runtime::{self, RuntimeApi, TransactionConverter};
@@ -43,9 +52,6 @@ use tangle_testnet_runtime::{self, RuntimeApi, TransactionConverter};
 /// imported and generated.
 const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;
 
-pub type HostFunctions =
-	(frame_benchmarking::benchmarking::HostFunctions, primitives_ext::ext::HostFunctions);
-
 #[cfg(not(feature = "testnet"))]
 pub mod tangle {
	// Our native executor instance.
@@ -95,10 +101,12 @@ pub mod testnet {
 }
 
 #[cfg(not(feature = "testnet"))]
+#[allow(deprecated)]
 pub(crate) type FullClient =
	sc_service::TFullClient<Block, RuntimeApi, NativeElseWasmExecutor<tangle::ExecutorDispatch>>;
 
 #[cfg(feature = "testnet")]
+#[allow(deprecated)]
 pub(crate) type FullClient =
	sc_service::TFullClient<Block, RuntimeApi, NativeElseWasmExecutor<testnet::ExecutorDispatch>>;
@@ -107,11 +115,14 @@
 type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
 type GrandpaLinkHalf = sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>;
 type BoxBlockImport = sc_consensus::BoxBlockImport<Block>;
+type GrandpaBlockImport =
+	sc_consensus_grandpa::GrandpaBlockImport<FullBackend, Block, FullClient, FullSelectChain>;
 
 #[allow(clippy::type_complexity)]
-pub fn new_partial(
+pub fn new_partial<BIQ>(
	config: &Configuration,
	eth_config: &EthConfiguration,
+	build_import_queue: BIQ,
 ) -> Result<
	sc_service::PartialComponents<
		FullClient,
		FullBackend,
		FullSelectChain,
		BasicQueue<Block>,
		FullPool<Block, FullClient>,
		(
			Option<Telemetry>,
			BoxBlockImport,
			GrandpaLinkHalf,
			sc_consensus_babe::BabeLink<Block>,
			FrontierBackend,
-			Arc<OverrideHandle<Block>>,
-			BabeWorkerHandle<Block>,
+			Arc<dyn StorageOverride<Block>>,
+			Option<BabeWorkerHandle<Block>>,
		),
	>,
	ServiceError,
-> {
+>
+where
+	BIQ: FnOnce(
+		Arc<FullClient>,
+		&Configuration,
+		&EthConfiguration,
+		&TaskManager,
+		Option<TelemetryHandle>,
+		GrandpaBlockImport,
+	) -> Result<(BasicQueue<Block>, BoxBlockImport), ServiceError>,
+{
	println!("
	++++++++++++++++++++++++
	+++++++++++++++++++++++++++
	+++++++++++++++++++++++++++
@@ -157,6 +178,7 @@ pub fn new_partial(
		})
		.transpose()?;
 
+	#[allow(deprecated)]
	let executor = sc_service::new_native_or_wasm_executor(config);
 
	let (client, backend, keystore_container, task_manager) =
@@ -190,13 +212,13 @@ pub fn new_partial(
		telemetry.as_ref().map(|x| x.handle()),
	)?;
 
-	let overrides = crate::rpc::overrides_handle(client.clone());
+	let storage_override = Arc::new(StorageOverrideHandler::<Block, _, _>::new(client.clone()));
 
	let frontier_backend = match eth_config.frontier_backend_type {
-		BackendType::KeyValue => FrontierBackend::KeyValue(fc_db::kv::Backend::open(
+		BackendType::KeyValue => FrontierBackend::KeyValue(Arc::new(fc_db::kv::Backend::open(
			Arc::clone(&client),
			&config.database,
			&db_config_dir(config),
-		)?),
+		)?)),
		BackendType::Sql => {
			let db_path = db_config_dir(config).join("sql");
			std::fs::create_dir_all(&db_path).expect("failed creating sql db directory");
@@ -213,10 +235,10 @@ pub fn new_partial(
				}),
				eth_config.frontier_sql_backend_pool_size,
				std::num::NonZeroU32::new(eth_config.frontier_sql_backend_num_ops_timeout),
-				overrides.clone(),
+				storage_override.clone(),
			))
			.unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
-			FrontierBackend::Sql(backend)
+			FrontierBackend::Sql(Arc::new(backend))
		},
	};
 
@@ -226,43 +248,18 @@ pub fn new_partial(
		client.clone(),
	)?;
 
-	let import_queue = sc_consensus_manual_seal::import_queue(
-		Box::new(client.clone()),
-		&task_manager.spawn_essential_handle(),
-		config.prometheus_registry(),
-	);
-
	let slot_duration = babe_link.config().slot_duration();
	let target_gas_price = eth_config.target_gas_price;
 
-	let frontier_block_import = FrontierBlockImport::new(block_import.clone(), client.clone());
-
-	let (import_queue, babe_worker_handle) =
-		sc_consensus_babe::import_queue(sc_consensus_babe::ImportQueueParams {
-			link: babe_link.clone(),
-			block_import: frontier_block_import.clone(),
-			justification_import: Some(Box::new(grandpa_block_import.clone())),
-			client: client.clone(),
-			select_chain: select_chain.clone(),
-			create_inherent_data_providers: move |_, ()| async move {
-				let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
-
-				let slot =
-					sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
-						*timestamp,
-						slot_duration,
-					);
-
-				let _dynamic_fee =
-					fp_dynamic_fee::InherentDataProvider(U256::from(target_gas_price));
-				Ok((slot, timestamp))
-			},
-			spawner: &task_manager.spawn_essential_handle(),
-			registry: config.prometheus_registry(),
-			telemetry: telemetry.as_ref().map(|x| x.handle()),
-			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool.clone()),
-		})?;
+	let (import_queue, block_import) = build_import_queue(
+		client.clone(),
+		config,
+		eth_config,
+		&task_manager,
+		telemetry.as_ref().map(|x| x.handle()),
+		grandpa_block_import,
+	)?;
 
	Ok(sc_service::PartialComponents {
		client,
@@ -274,18 +271,55 @@ pub fn new_partial(
		transaction_pool,
		other: (
			telemetry,
-			Box::new(frontier_block_import),
+			block_import,
			grandpa_link,
			babe_link,
			frontier_backend,
-			overrides,
-			babe_worker_handle,
+			storage_override,
+			None,
		),
	})
 }
 
-pub async fn new_full(
-	RunFullParams { mut config, eth_config, rpc_config, debug_output: _, auto_insert_keys }: RunFullParams,
+#[allow(dead_code)]
+pub struct RunFullParams {
+	pub config: Configuration,
+	pub eth_config: EthConfiguration,
+	pub rpc_config: RpcConfig,
+	pub debug_output: Option<std::path::PathBuf>,
+	pub auto_insert_keys: bool,
+	pub sealing: Sealing,
+}
+
+pub fn build_manual_seal_import_queue(
+	client: Arc<FullClient>,
+	config: &Configuration,
+	_eth_config: &EthConfiguration,
+	task_manager: &TaskManager,
+	_telemetry: Option<TelemetryHandle>,
+	_grandpa_block_import: GrandpaBlockImport,
+) -> Result<(BasicQueue<Block>, BoxBlockImport), ServiceError> {
+	let frontier_block_import = FrontierBlockImport::new(client.clone(), client);
+	Ok((
+		sc_consensus_manual_seal::import_queue(
+			Box::new(frontier_block_import.clone()),
+			&task_manager.spawn_essential_handle(),
+			config.prometheus_registry(),
+		),
+		Box::new(frontier_block_import),
+	))
+}
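`build_manual_seal_import_queue` above is the manual-seal instance of the `BIQ` parameter that `new_partial` now accepts. Written out as a standalone type (a hypothetical alias for readability, not part of the diff), the shape every builder has to satisfy is:

```rust
// Equivalent to the `BIQ: FnOnce(...)` bound on `new_partial`.
type ImportQueueBuilder = dyn FnOnce(
	Arc<FullClient>,
	&Configuration,
	&EthConfiguration,
	&TaskManager,
	Option<TelemetryHandle>,
	GrandpaBlockImport,
) -> Result<(BasicQueue<Block>, BoxBlockImport), ServiceError>;
```

Returning the block import alongside the queue lets `new_partial` hand the same import pipeline both to the network (through the queue) and to the authorship task (through the boxed `BlockImport`).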
+
+/// Builds a new service for a full client.
+pub async fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
+	RunFullParams {
+		mut config,
+		eth_config,
+		rpc_config,
+		debug_output: _,
+		auto_insert_keys,
+		sealing,
+	}: RunFullParams,
 ) -> Result<TaskManager, ServiceError> {
	let sc_service::PartialComponents {
		client,
@@ -302,15 +336,67 @@ pub async fn new_full(
			grandpa_link,
			babe_link,
			frontier_backend,
-			overrides,
+			storage_override,
			babe_worker_handle,
		),
-	} = new_partial(&config, &eth_config)?;
+	} = new_partial(&config, &eth_config, build_manual_seal_import_queue)?;
+
+	if config.role.is_authority() {
+		if config.chain_spec.chain_type() == ChainType::Development
+			|| config.chain_spec.chain_type() == ChainType::Local
+		{
+			if auto_insert_keys {
+				crate::utils::insert_controller_account_keys_into_keystore(
+					&config,
+					Some(keystore_container.local_keystore()),
+				);
+			} else {
+				crate::utils::insert_dev_controller_account_keys_into_keystore(
+					&config,
+					Some(keystore_container.local_keystore()),
+				);
+			}
+		}
+
+		// finally check if keys are inserted correctly
+		if crate::utils::ensure_all_keys_exist_in_keystore(keystore_container.keystore()).is_err() {
+			if config.chain_spec.chain_type() == ChainType::Development
+				|| config.chain_spec.chain_type() == ChainType::Local
+			{
+				println!("
+	++++++++++++++++++++++++++++++++++++++++++++++++
+	Validator keys not found, validator keys are essential to run a validator on
+	Tangle Network, refer to https://docs.webb.tools/docs/ecosystem-roles/validator/required-keys/ on
+	how to generate and insert keys. OR start the node with --auto-insert-keys to automatically generate the keys in testnet.
+	++++++++++++++++++++++++++++++++++++++++++++++++
+	\n");
+				panic!("Keys not detected!")
+			} else {
+				println!("
+	++++++++++++++++++++++++++++++++++++++++++++++++
+	Validator keys not found, validator keys are essential to run a validator on
+	Tangle Network, refer to https://docs.webb.tools/docs/ecosystem-roles/validator/required-keys/ on
+	how to generate and insert keys.
+	++++++++++++++++++++++++++++++++++++++++++++++++
+	\n");
+				panic!("Keys not detected!")
+			}
+		}
+	}
 
	let FrontierPartialComponents { filter_pool, fee_history_cache, fee_history_cache_limit } =
		new_frontier_partial(&eth_config)?;
 
-	let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
+	let mut net_config = sc_network::config::FullNetworkConfiguration::<
+		Block,
+		<Block as BlockT>::Hash,
+		Network,
+	>::new(&config.network);
+
+	let peer_store_handle = net_config.peer_store_handle();
+	let metrics = Network::register_notification_metrics(
+		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
+	);
 
	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
@@ -318,7 +404,11 @@ pub async fn new_full(
	);
 
	let (grandpa_protocol_config, grandpa_notification_service) =
-		sc_consensus_grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
+		sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
+			grandpa_protocol_name.clone(),
+			metrics.clone(),
+			peer_store_handle,
+		);
 
	net_config.add_notification_protocol(grandpa_protocol_config);
 
@@ -339,6 +429,7 @@ pub async fn new_full(
		block_announce_validator_builder: None,
		warp_sync_params: Some(sc_service::WarpSyncParams::WithProvider(warp_sync)),
		block_relay: None,
+		metrics,
	})?;
 
	let role = config.role.clone();
@@ -358,7 +449,7 @@ pub async fn new_full(
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
-				network_provider: network.clone(),
+				network_provider: Arc::new(network.clone()),
				is_validator: role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
@@ -383,6 +474,7 @@ pub async fn new_full(
	let slot_duration = babe_link.config().slot_duration();
	let target_gas_price = eth_config.target_gas_price;
 
+	let frontier_backend = Arc::new(frontier_backend);
	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApi::Debug) || ethapi_cmd.contains(&EthApi::Trace) {
@@ -392,7 +484,7 @@ pub async fn new_full(
			client.clone(),
			backend.clone(),
			frontier_backend.clone(),
-			overrides.clone(),
+			storage_override.clone(),
			&rpc_config,
			prometheus_registry.clone(),
		)
@@ -413,6 +505,8 @@ pub async fn new_full(
		Ok((slot, timestamp, dynamic_fee))
	};
 
+	let network_clone = network.clone();
+
	let eth_rpc_params = crate::rpc::EthDeps {
		client: client.clone(),
		pool: transaction_pool.clone(),
@@ -420,16 +514,16 @@ pub async fn new_full(
		converter: Some(TransactionConverter),
		is_authority: config.role.is_authority(),
		enable_dev_signer: eth_config.enable_dev_signer,
-		network: network.clone(),
+		network: network_clone,
		sync: sync_service.clone(),
-		frontier_backend: match frontier_backend.clone() {
-			fc_db::Backend::KeyValue(b) => Arc::new(b),
-			fc_db::Backend::Sql(b) => Arc::new(b),
+		frontier_backend: match &*frontier_backend {
+			fc_db::Backend::KeyValue(b) => b.clone(),
+			fc_db::Backend::Sql(b) => b.clone(),
		},
-		overrides: overrides.clone(),
+		storage_override: storage_override.clone(),
		block_data_cache: Arc::new(fc_rpc::EthBlockDataCacheTask::new(
			task_manager.spawn_handle(),
-			overrides.clone(),
+			storage_override.clone(),
			eth_config.eth_log_block_cache,
			eth_config.eth_statuses_cache,
			prometheus_registry.clone(),
@@ -470,10 +564,7 @@ pub async fn new_full(
				pool: pool.clone(),
				deny_unsafe,
				eth: eth_rpc_params.clone(),
-				babe: crate::rpc::BabeDeps {
-					keystore: keystore.clone(),
-					babe_worker_handle: babe_worker_handle.clone(),
-				},
+				babe: None,
				select_chain: select_chain_clone.clone(),
				grandpa: crate::rpc::GrandpaDeps {
					shared_voter_state: shared_voter_state.clone(),
@@ -500,7 +591,7 @@ pub async fn new_full(
		backend.clone(),
		frontier_backend,
		filter_pool,
-		overrides,
+		storage_override.clone(),
		fee_history_cache,
		fee_history_cache_limit,
		sync_service.clone(),
@@ -524,34 +615,26 @@ pub async fn new_full(
	};
 
	let _rpc_handlers = sc_service::spawn_tasks(params)?;
 
-	if role.is_authority() {
-		let proposer = sc_basic_authorship::ProposerFactory::new(
-			task_manager.spawn_handle(),
-			client.clone(),
-			transaction_pool.clone(),
-			prometheus_registry.as_ref(),
-			telemetry.as_ref().map(|x| x.handle()),
-		);
+	// Channel for the rpc handler to communicate with the authorship task.
+	let (command_sink, commands_stream) = mpsc::channel(1000);
 
-		let params = sc_consensus_manual_seal::InstantSealParams {
-			block_import: client.clone(),
-			env: proposer,
+	if role.is_authority() {
+		run_manual_seal_authorship(
+			&eth_config,
+			sealing,
			client,
-			pool: transaction_pool.clone(),
+			transaction_pool,
			select_chain,
-			consensus_data_provider: None,
-			create_inherent_data_providers: move |_, ()| async move {
-				Ok(sp_timestamp::InherentDataProvider::from_system_time())
-			},
-		};
-
-		let authorship_future = sc_consensus_manual_seal::run_instant_seal(params);
+			block_import,
+			&task_manager,
+			prometheus_registry.as_ref(),
+			telemetry.as_ref(),
+			commands_stream,
+		)?;
 
-		task_manager.spawn_essential_handle().spawn_blocking(
-			"manual-seal",
-			None,
-			authorship_future,
-		);
+		network_starter.start_network();
+		log::info!("Manual Seal Ready");
+		return Ok(task_manager);
	}
 
	// if the node isn't actively participating in consensus then it doesn't
@@ -603,6 +686,96 @@ pub async fn new_full(
	Ok(task_manager)
 }
 
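`new_full` above creates the `(command_sink, commands_stream)` pair but only the stream is consumed in this file; the hunk does not show the sink being wired into an RPC module. A minimal sketch of how the sink is typically exposed, using the upstream `sc-consensus-manual-seal` RPC types (the helper name and exact trait bounds here are assumptions):

```rust
use futures::channel::mpsc;
use jsonrpsee::RpcModule;
use sc_consensus_manual_seal::rpc::{EngineCommand, ManualSeal, ManualSealApiServer};

/// Hypothetical helper: exposes `engine_createBlock` / `engine_finalizeBlock`,
/// which forward `EngineCommand`s into the authorship task spawned below.
fn merge_manual_seal_rpc<Hash>(
	io: &mut RpcModule<()>,
	command_sink: mpsc::Sender<EngineCommand<Hash>>,
) where
	Hash: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
{
	io.merge(ManualSeal::new(command_sink).into_rpc())
		.expect("manual-seal RPC methods do not collide with existing ones");
}
```

Under `Sealing::Manual` nothing is authored until a client calls `engine_createBlock`; under `Sealing::Instant` the sink is effectively unused, since blocks are sealed as soon as transactions arrive.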
+fn run_manual_seal_authorship(
+	eth_config: &EthConfiguration,
+	sealing: Sealing,
+	client: Arc<FullClient>,
+	transaction_pool: Arc<FullPool<Block, FullClient>>,
+	select_chain: FullSelectChain,
+	block_import: BoxBlockImport,
+	task_manager: &TaskManager,
+	prometheus_registry: Option<&Registry>,
+	telemetry: Option<&Telemetry>,
+	commands_stream: mpsc::Receiver<
+		sc_consensus_manual_seal::rpc::EngineCommand<<Block as BlockT>::Hash>,
+	>,
+) -> Result<(), ServiceError> {
+	let proposer_factory = sc_basic_authorship::ProposerFactory::new(
+		task_manager.spawn_handle(),
+		client.clone(),
+		transaction_pool.clone(),
+		prometheus_registry,
+		telemetry.as_ref().map(|x| x.handle()),
+	);
+
+	thread_local!(static TIMESTAMP: RefCell<u64> = const { RefCell::new(0) });
+
+	/// Provide a mock duration starting at 0 in milliseconds for the timestamp inherent.
+	/// Each call increments the timestamp by one slot duration, making BABE think time has passed.
+	struct MockTimestampInherentDataProvider;
+
+	#[async_trait::async_trait]
+	impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
+		async fn provide_inherent_data(
+			&self,
+			inherent_data: &mut sp_inherents::InherentData,
+		) -> Result<(), sp_inherents::Error> {
+			TIMESTAMP.with(|x| {
+				*x.borrow_mut() += tangle_testnet_runtime::SLOT_DURATION;
+				inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
+			})
+		}
+
+		async fn try_handle_error(
+			&self,
+			_identifier: &sp_inherents::InherentIdentifier,
+			_error: &[u8],
+		) -> Option<Result<(), sp_inherents::Error>> {
+			// The pallet never reports error.
+			None
+		}
+	}
+
+	let target_gas_price = eth_config.target_gas_price;
+	let create_inherent_data_providers = move |_, ()| async move {
+		let timestamp = MockTimestampInherentDataProvider;
+		let dynamic_fee = fp_dynamic_fee::InherentDataProvider(U256::from(target_gas_price));
+		Ok((timestamp, dynamic_fee))
+	};
+
+	let manual_seal = match sealing {
+		Sealing::Manual => future::Either::Left(sc_consensus_manual_seal::run_manual_seal(
+			sc_consensus_manual_seal::ManualSealParams {
+				block_import,
+				env: proposer_factory,
+				client,
+				pool: transaction_pool,
+				commands_stream,
+				select_chain,
+				consensus_data_provider: None,
+				create_inherent_data_providers,
+			},
+		)),
+		Sealing::Instant => future::Either::Right(sc_consensus_manual_seal::run_instant_seal(
+			sc_consensus_manual_seal::InstantSealParams {
+				block_import,
+				env: proposer_factory,
+				client,
+				pool: transaction_pool,
+				select_chain,
+				consensus_data_provider: None,
+				create_inherent_data_providers,
+			},
+		)),
+	};
+
+	// we spawn the future on a background thread managed by service.
+	task_manager
+		.spawn_essential_handle()
+		.spawn_blocking("manual-seal", None, manual_seal);
+	Ok(())
+}
+
 #[allow(clippy::type_complexity)]
 pub fn new_chain_ops(
	config: &mut Configuration,
@@ -614,6 +787,6 @@ pub fn new_chain_ops(
	config.keystore = sc_service::config::KeystoreConfig::InMemory;
	let sc_service::PartialComponents {
		client, backend, import_queue, task_manager, other, ..
-	} = new_partial(config, eth_config)?;
+	} = new_partial(config, eth_config, build_manual_seal_import_queue)?;
	Ok((client, backend, import_queue, task_manager, other.4))
 }
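The mock timestamp provider defined in `run_manual_seal_authorship` is what keeps BABE's slot arithmetic consistent when blocks are sealed on demand: each sealed block advances a process-local clock by exactly one slot instead of reading wall time. A sketch of that behavior as a test (hypothetical: it assumes the provider is lifted out of the function so a test can reach it, and that a tokio test runtime is available):

```rust
#[cfg(test)]
mod mock_clock_tests {
	use super::*;
	use sp_inherents::InherentDataProvider as _;

	#[tokio::test]
	async fn mock_timestamp_advances_one_slot_per_call() {
		// First "block": the mock clock moves from 0 to one slot.
		let mut data = sp_inherents::InherentData::new();
		MockTimestampInherentDataProvider.provide_inherent_data(&mut data).await.unwrap();
		let first: u64 =
			data.get_data(&sp_timestamp::INHERENT_IDENTIFIER).unwrap().expect("timestamp set");

		// Second "block": exactly one more slot, regardless of wall time.
		let mut data = sp_inherents::InherentData::new();
		MockTimestampInherentDataProvider.provide_inherent_data(&mut data).await.unwrap();
		let second: u64 =
			data.get_data(&sp_timestamp::INHERENT_IDENTIFIER).unwrap().expect("timestamp set");

		assert_eq!(second - first, tangle_testnet_runtime::SLOT_DURATION);
	}
}
```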
diff --git a/node/src/rpc/mod.rs b/node/src/rpc/mod.rs
index 9922ff7bc..58668c869 100644
--- a/node/src/rpc/mod.rs
+++ b/node/src/rpc/mod.rs
@@ -82,7 +82,7 @@ pub struct FullDeps {
	/// Ethereum-compatibility specific dependencies.
	pub eth: EthDeps,
	/// BABE specific dependencies.
-	pub babe: BabeDeps,
+	pub babe: Option<BabeDeps>,
	/// The SelectChain Strategy
	pub select_chain: SC,
	/// GRANDPA specific dependencies.
@@ -232,7 +232,13 @@ where
	let mut io = RpcModule::new(());
	let FullDeps { client, pool, deny_unsafe, eth, babe, select_chain, grandpa } = deps;
 
-	let BabeDeps { keystore, babe_worker_handle } = babe;
+	if let Some(babe) = babe {
+		let BabeDeps { babe_worker_handle, keystore } = babe;
+		io.merge(
+			Babe::new(client.clone(), babe_worker_handle, keystore, select_chain, deny_unsafe)
+				.into_rpc(),
+		)?;
+	}
 
	let GrandpaDeps {
		shared_voter_state,
@@ -245,11 +251,6 @@ where
	io.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?;
	io.merge(TransactionPayment::new(client.clone()).into_rpc())?;
 
-	io.merge(
-		Babe::new(client.clone(), babe_worker_handle.clone(), keystore, select_chain, deny_unsafe)
-			.into_rpc(),
-	)?;
-
	io.merge(
		Grandpa::new(
			subscription_executor,
diff --git a/node/src/service.rs b/node/src/service.rs
index 010e280bc..e68e2ee37 100644
--- a/node/src/service.rs
+++ b/node/src/service.rs
@@ -536,10 +536,10 @@ pub async fn new_full(
-) -> Result<TaskManager, ServiceError> {
-	let sc_service::PartialComponents {
-		client,
-		backend,
-		mut task_manager,
-		import_queue,
-		keystore_container,
-		select_chain,
-		transaction_pool,
-		other:
-			(
-				mut telemetry,
-				block_import,
-				grandpa_link,
-				babe_link,
-				frontier_backend,
-				overrides,
-				babe_worker_handle,
-			),
-	} = new_partial(&config, &eth_config)?;
-
-	let FrontierPartialComponents { filter_pool, fee_history_cache, fee_history_cache_limit } =
-		new_frontier_partial(&eth_config)?;
-
-	let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
-
-	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
-		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
-		&config.chain_spec,
-	);
-
-	let (grandpa_protocol_config, grandpa_notification_service) =
-		sc_consensus_grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
-
-	net_config.add_notification_protocol(grandpa_protocol_config);
-
-	let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
-		backend.clone(),
-		grandpa_link.shared_authority_set().clone(),
-		Vec::default(),
-	));
-
-	let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
-		sc_service::build_network(sc_service::BuildNetworkParams {
-			config: &config,
-			net_config,
-			client: client.clone(),
-			transaction_pool: transaction_pool.clone(),
-			spawn_handle: task_manager.spawn_handle(),
-			import_queue,
-			block_announce_validator_builder: None,
-			warp_sync_params: Some(sc_service::WarpSyncParams::WithProvider(warp_sync)),
-			block_relay: None,
-		})?;
-
-	let role = config.role.clone();
-	let force_authoring = config.force_authoring;
-	let name = config.network.node_name.clone();
-	let enable_grandpa = !config.disable_grandpa;
-	let prometheus_registry = config.prometheus_registry().cloned();
-
-	if config.offchain_worker.enabled {
-		task_manager.spawn_handle().spawn(
-			"offchain-workers-runner",
-			"offchain-work",
-			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
-				runtime_api_provider: client.clone(),
-				keystore: Some(keystore_container.keystore()),
-				offchain_db: backend.offchain_storage(),
-				transaction_pool: Some(OffchainTransactionPoolFactory::new(
-					transaction_pool.clone(),
-				)),
-				network_provider: network.clone(),
-				is_validator: role.is_authority(),
-				enable_http_requests: true,
-				custom_extensions: move |_| vec![],
-			})
-			.run(client.clone(), task_manager.spawn_handle())
-			.boxed(),
-		);
-	}
-
-	// Sinks for pubsub notifications.
-	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
-	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
-	// notification to the subscriber on receiving a message through this channel. This way we avoid
-	// race conditions when using native substrate block import notification stream.
-	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
-		fc_mapping_sync::EthereumBlockNotification<Block>,
-	> = Default::default();
-	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
-
-	// for ethereum-compatibility rpc.
-	config.rpc_id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
-
-	let slot_duration = babe_link.config().slot_duration();
-	let target_gas_price = eth_config.target_gas_price;
-
-	let ethapi_cmd = rpc_config.ethapi.clone();
-	let tracing_requesters =
-		if ethapi_cmd.contains(&EthApi::Debug) || ethapi_cmd.contains(&EthApi::Trace) {
-			crate::rpc::tracing::spawn_tracing_tasks(
-				&task_manager,
-				client.clone(),
-				backend.clone(),
-				frontier_backend.clone(),
-				overrides.clone(),
-				&rpc_config,
-				prometheus_registry.clone(),
-			)
-		} else {
-			crate::rpc::tracing::RpcRequesters { debug: None, trace: None }
-		};
-
-	let pending_create_inherent_data_providers = move |_, ()| async move {
-		let current = sp_timestamp::InherentDataProvider::from_system_time();
-		let next_slot = current.timestamp().as_millis() + slot_duration.as_millis();
-		let timestamp = sp_timestamp::InherentDataProvider::new(next_slot.into());
-		let slot =
-			sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
-				*timestamp,
-				slot_duration,
-			);
-		let dynamic_fee = fp_dynamic_fee::InherentDataProvider(U256::from(target_gas_price));
-		Ok((slot, timestamp, dynamic_fee))
-	};
-
-	let eth_rpc_params = crate::rpc::EthDeps {
-		client: client.clone(),
-		pool: transaction_pool.clone(),
-		graph: transaction_pool.pool().clone(),
-		converter: Some(TransactionConverter),
-		is_authority: config.role.is_authority(),
-		enable_dev_signer: eth_config.enable_dev_signer,
-		network: network.clone(),
-		sync: sync_service.clone(),
-		frontier_backend: match frontier_backend.clone() {
-			fc_db::Backend::KeyValue(b) => Arc::new(b),
-			fc_db::Backend::Sql(b) => Arc::new(b),
-		},
-		overrides: overrides.clone(),
-		block_data_cache: Arc::new(fc_rpc::EthBlockDataCacheTask::new(
-			task_manager.spawn_handle(),
-			overrides.clone(),
-			eth_config.eth_log_block_cache,
-			eth_config.eth_statuses_cache,
-			prometheus_registry.clone(),
-		)),
-		filter_pool: filter_pool.clone(),
-		max_past_logs: eth_config.max_past_logs,
-		fee_history_cache: fee_history_cache.clone(),
-		fee_history_cache_limit,
-		execute_gas_limit_multiplier: eth_config.execute_gas_limit_multiplier,
-		forced_parent_hashes: None,
-		tracing_config: Some(crate::rpc::eth::TracingConfig {
-			tracing_requesters: tracing_requesters.clone(),
-			trace_filter_max_count: rpc_config.ethapi_trace_max_count,
-		}),
-		pending_create_inherent_data_providers,
-	};
-
-	let keystore = keystore_container.keystore();
-	let select_chain_clone = select_chain.clone();
-	let rpc_builder = {
-		let client = client.clone();
-		let pool = transaction_pool.clone();
-		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
-		let justification_stream = grandpa_link.justification_stream();
-		let shared_authority_set = grandpa_link.shared_authority_set().clone();
-		let shared_voter_state = sc_consensus_grandpa::SharedVoterState::empty();
-		let _shared_voter_state2 =
-			shared_voter_state.clone();
-
-		let finality_proof_provider = sc_consensus_grandpa::FinalityProofProvider::new_for_service(
-			backend.clone(),
-			Some(shared_authority_set.clone()),
-		);
-
-		Box::new(
-			move |deny_unsafe, subscription_task_executor: sc_rpc::SubscriptionTaskExecutor| {
-				let deps = crate::rpc::FullDeps {
-					client: client.clone(),
-					pool: pool.clone(),
-					deny_unsafe,
-					eth: eth_rpc_params.clone(),
-					babe: crate::rpc::BabeDeps {
-						keystore: keystore.clone(),
-						babe_worker_handle: babe_worker_handle.clone(),
-					},
-					select_chain: select_chain_clone.clone(),
-					grandpa: crate::rpc::GrandpaDeps {
-						shared_voter_state: shared_voter_state.clone(),
-						shared_authority_set: shared_authority_set.clone(),
-						justification_stream: justification_stream.clone(),
-						subscription_executor: subscription_task_executor.clone(),
-						finality_provider: finality_proof_provider.clone(),
-					},
-				};
-
-				crate::rpc::create_full(
-					deps,
-					subscription_task_executor,
-					pubsub_notification_sinks.clone(),
-				)
-				.map_err(Into::into)
-			},
-		)
-	};
-
-	spawn_frontier_tasks(
-		&task_manager,
-		client.clone(),
-		backend.clone(),
-		frontier_backend,
-		filter_pool,
-		overrides,
-		fee_history_cache,
-		fee_history_cache_limit,
-		sync_service.clone(),
-		pubsub_notification_sinks,
-	)
-	.await;
-
-	let params = sc_service::SpawnTasksParams {
-		network: network.clone(),
-		client: client.clone(),
-		keystore: keystore_container.keystore(),
-		task_manager: &mut task_manager,
-		transaction_pool: transaction_pool.clone(),
-		rpc_builder,
-		backend: backend.clone(),
-		system_rpc_tx,
-		tx_handler_controller,
-		sync_service: sync_service.clone(),
-		config,
-		telemetry: telemetry.as_mut(),
-	};
-	let _rpc_handlers = sc_service::spawn_tasks(params)?;
-
-	if role.is_authority() {
-		let proposer = sc_basic_authorship::ProposerFactory::new(
-			task_manager.spawn_handle(),
-			client.clone(),
-			transaction_pool.clone(),
-			prometheus_registry.as_ref(),
-			telemetry.as_ref().map(|x| x.handle()),
-		);
-
-		let params = sc_consensus_manual_seal::InstantSealParams {
-			block_import: client.clone(),
-			env: proposer,
-			client,
-			pool: transaction_pool.clone(),
-			select_chain,
-			consensus_data_provider: None,
-			create_inherent_data_providers: move |_, ()| async move {
-				Ok(sp_timestamp::InherentDataProvider::from_system_time())
-			},
-		};
-
-		let authorship_future = sc_consensus_manual_seal::run_instant_seal(params);
-
-		task_manager.spawn_essential_handle().spawn_blocking(
-			"instant-seal",
-			None,
-			authorship_future,
-		);
-	}
-
-	// if the node isn't actively participating in consensus then it doesn't
-	// need a keystore, regardless of which protocol we use below.
-	let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None };
-
-	let grandpa_config = sc_consensus_grandpa::Config {
-		// FIXME #1578 make this available through chainspec
-		gossip_duration: Duration::from_millis(333),
-		justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
-		name: Some(name),
-		observer_enabled: false,
-		keystore,
-		local_role: role,
-		telemetry: telemetry.as_ref().map(|x| x.handle()),
-		protocol_name: grandpa_protocol_name,
-	};
-
-	if enable_grandpa {
-		// start the full GRANDPA voter
-		// NOTE: non-authorities could run the GRANDPA observer protocol, but at
-		// this point the full voter should provide better guarantees of block
-		// and vote data availability than the observer. The observer has not
-		// been tested extensively yet and having most nodes in a network run it
-		// could lead to finality stalls.
-		let grandpa_config = sc_consensus_grandpa::GrandpaParams {
-			config: grandpa_config,
-			link: grandpa_link,
-			network,
-			sync: Arc::new(sync_service),
-			notification_service: grandpa_notification_service,
-			voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
-			prometheus_registry,
-			shared_voter_state: SharedVoterState::empty(),
-			telemetry: telemetry.as_ref().map(|x| x.handle()),
-			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
-		};
-
-		// the GRANDPA voter task is considered infallible, i.e.
-		// if it fails we take down the service with it.
-		task_manager.spawn_essential_handle().spawn_blocking(
-			"grandpa-voter",
-			None,
-			sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
-		);
-	}
-
-	network_starter.start_network();
-	Ok(task_manager)
-}
-
 #[allow(clippy::type_complexity)]
 pub fn new_chain_ops(
	config: &mut Configuration,