diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp index 70e0b86ebaa1c..114dd9d39c86a 100644 --- a/src/bench/checkqueue.cpp +++ b/src/bench/checkqueue.cpp @@ -37,10 +37,11 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench) return true; } }; - CCheckQueue queue {QUEUE_BATCH_SIZE}; + // The main thread should be counted to prevent thread oversubscription, and // to decrease the variance of benchmark results. - queue.StartWorkerThreads(GetNumCores() - 1); + int worker_threads_num{GetNumCores() - 1}; + CCheckQueue queue{QUEUE_BATCH_SIZE, worker_threads_num}; // create all the data once, then submit copies in the benchmark. FastRandomContext insecure_rand(true); @@ -61,7 +62,6 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench) // it is done explicitly here for clarity control.Wait(); }); - queue.StopWorkerThreads(); ECC_Stop(); } BENCHMARK(CCheckQueueSpeedPrevectorJob, benchmark::PriorityLevel::HIGH); diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index 995b4781fc29c..31edb86ce2e99 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -290,7 +290,6 @@ int main(int argc, char* argv[]) // dereferencing and UB. scheduler.stop(); if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join(); - StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); { diff --git a/src/checkqueue.h b/src/checkqueue.h index a3299fb3fe951..a1de000714d59 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -13,9 +13,6 @@ #include #include -template -class CCheckQueueControl; - /** * Queue for verifications that have to be performed. * The verifications are represented by a type T, which must provide an @@ -130,22 +127,11 @@ class CCheckQueue Mutex m_control_mutex; //! Create a new check queue - explicit CCheckQueue(unsigned int nBatchSizeIn) - : nBatchSize(nBatchSizeIn) + explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) + : nBatchSize(batch_size) { - } - - //! Create a pool of new worker threads. - void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) - { - { - LOCK(m_mutex); - nIdle = 0; - nTotal = 0; - fAllOk = true; - } - assert(m_worker_threads.empty()); - for (int n = 0; n < threads_num; ++n) { + m_worker_threads.reserve(worker_threads_num); + for (int n = 0; n < worker_threads_num; ++n) { m_worker_threads.emplace_back([this, n]() { util::ThreadRename(strprintf("scriptch.%i", n)); Loop(false /* worker thread */); @@ -153,6 +139,13 @@ class CCheckQueue } } + // Since this class manages its own resources, which is a thread + // pool `m_worker_threads`, copy and move operations are not appropriate. + CCheckQueue(const CCheckQueue&) = delete; + CCheckQueue& operator=(const CCheckQueue&) = delete; + CCheckQueue(CCheckQueue&&) = delete; + CCheckQueue& operator=(CCheckQueue&&) = delete; + //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { @@ -179,24 +172,16 @@ class CCheckQueue } } - //! Stop all of the worker threads. - void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + ~CCheckQueue() { WITH_LOCK(m_mutex, m_request_stop = true); m_worker_cv.notify_all(); for (std::thread& t : m_worker_threads) { t.join(); } - m_worker_threads.clear(); - WITH_LOCK(m_mutex, m_request_stop = false); } bool HasThreads() const { return !m_worker_threads.empty(); } - - ~CCheckQueue() - { - assert(m_worker_threads.empty()); - } }; /** diff --git a/src/init.cpp b/src/init.cpp index d6dc62f707ff4..57a94dc7c69c1 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -268,10 +268,9 @@ void Shutdown(NodeContext& node) StopTorControl(); // After everything has been shut down, but before things get flushed, stop the - // CScheduler/checkqueue, scheduler and load block thread. + // scheduler and load block thread. if (node.scheduler) node.scheduler->stop(); if (node.chainman && node.chainman->m_thread_load.joinable()) node.chainman->m_thread_load.join(); - StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, // destruct and reset all to nullptr. @@ -1114,24 +1113,6 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) return InitError(strprintf(_("Unable to allocate memory for -maxsigcachesize: '%s' MiB"), args.GetIntArg("-maxsigcachesize", DEFAULT_MAX_SIG_CACHE_BYTES >> 20))); } - int script_threads = args.GetIntArg("-par", DEFAULT_SCRIPTCHECK_THREADS); - if (script_threads <= 0) { - // -par=0 means autodetect (number of cores - 1 script threads) - // -par=-n means "leave n cores free" (number of cores - n - 1 script threads) - script_threads += GetNumCores(); - } - - // Subtract 1 because the main thread counts towards the par threads - script_threads = std::max(script_threads - 1, 0); - - // Number of script-checking threads <= MAX_SCRIPTCHECK_THREADS - script_threads = std::min(script_threads, MAX_SCRIPTCHECK_THREADS); - - LogPrintf("Script verification uses %d additional threads\n", script_threads); - if (script_threads >= 1) { - StartScriptCheckWorkerThreads(script_threads); - } - assert(!node.scheduler); node.scheduler = std::make_unique(); diff --git a/src/kernel/chainstatemanager_opts.h b/src/kernel/chainstatemanager_opts.h index 917f7d226c564..ee20eabd7951d 100644 --- a/src/kernel/chainstatemanager_opts.h +++ b/src/kernel/chainstatemanager_opts.h @@ -45,6 +45,8 @@ struct ChainstateManagerOpts { DBOptions coins_db{}; CoinsViewOptions coins_view{}; Notifications& notifications; + //! Number of script check worker threads. Zero means no parallel verification. + int worker_threads_num{0}; }; } // namespace kernel diff --git a/src/node/chainstatemanager_args.cpp b/src/node/chainstatemanager_args.cpp index 87d9238c1805b..1cc126cb051c3 100644 --- a/src/node/chainstatemanager_args.cpp +++ b/src/node/chainstatemanager_args.cpp @@ -6,7 +6,8 @@ #include #include -#include +#include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include +#include #include #include @@ -41,6 +43,16 @@ util::Result ApplyArgsManOptions(const ArgsManager& args, ChainstateManage ReadDatabaseArgs(args, opts.coins_db); ReadCoinsViewArgs(args, opts.coins_view); + int script_threads = args.GetIntArg("-par", DEFAULT_SCRIPTCHECK_THREADS); + if (script_threads <= 0) { + // -par=0 means autodetect (number of cores - 1 script threads) + // -par=-n means "leave n cores free" (number of cores - n - 1 script threads) + script_threads += GetNumCores(); + } + // Subtract 1 because the main thread counts towards the par threads. + opts.worker_threads_num = std::clamp(script_threads - 1, 0, MAX_SCRIPTCHECK_THREADS); + LogPrintf("Script verification uses %d additional threads\n", opts.worker_threads_num); + return {}; } } // namespace node diff --git a/src/node/chainstatemanager_args.h b/src/node/chainstatemanager_args.h index 701515953e8d8..b2cdba68b8fe9 100644 --- a/src/node/chainstatemanager_args.h +++ b/src/node/chainstatemanager_args.h @@ -10,6 +10,11 @@ class ArgsManager; +/** Maximum number of dedicated script-checking threads allowed */ +static constexpr int MAX_SCRIPTCHECK_THREADS{15}; +/** -par default (number of script-checking threads, 0 = auto) */ +static constexpr int DEFAULT_SCRIPTCHECK_THREADS{0}; + namespace node { [[nodiscard]] util::Result ApplyArgsManOptions(const ArgsManager& args, ChainstateManager::Options& opts); } // namespace node diff --git a/src/qt/optionsdialog.cpp b/src/qt/optionsdialog.cpp index 512fce473d20f..6e1d36effbe70 100644 --- a/src/qt/optionsdialog.cpp +++ b/src/qt/optionsdialog.cpp @@ -17,9 +17,9 @@ #include #include +#include #include #include -#include #include diff --git a/src/qt/optionsmodel.cpp b/src/qt/optionsmodel.cpp index c1563fe1e27bd..43564dad1673a 100644 --- a/src/qt/optionsmodel.cpp +++ b/src/qt/optionsmodel.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include // for -dbcache defaults #include #include // For DEFAULT_SCRIPTCHECK_THREADS diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index cb3831071a002..023a5e8e70dc7 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -158,8 +158,7 @@ typedef CCheckQueue FrozenCleanup_Queue; */ static void Correct_Queue_range(std::vector range) { - auto small_queue = std::make_unique(QUEUE_BATCH_SIZE); - small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); + auto small_queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; vChecks.reserve(9); @@ -176,7 +175,6 @@ static void Correct_Queue_range(std::vector range) BOOST_REQUIRE(control.Wait()); BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); } - small_queue->StopWorkerThreads(); } /** Test that 0 checks is correct @@ -218,9 +216,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) /** Test that failing checks are caught */ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { - auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE); - fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); - + auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1001; ++i) { CCheckQueueControl control(fail_queue.get()); size_t remaining = i; @@ -240,15 +236,12 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) BOOST_REQUIRE(success); } } - fail_queue->StopWorkerThreads(); } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { - auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE); - fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); - + auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); for (auto times = 0; times < 10; ++times) { for (const bool end_fails : {true, false}) { CCheckQueueControl control(fail_queue.get()); @@ -262,7 +255,6 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_REQUIRE(r != end_fails); } } - fail_queue->StopWorkerThreads(); } // Test that unique checks are actually all called individually, rather than @@ -270,9 +262,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) // more than once as well BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE); - queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); - + auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); size_t COUNT = 100000; size_t total = COUNT; { @@ -294,7 +284,6 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) } BOOST_REQUIRE(r); } - queue->StopWorkerThreads(); } @@ -305,8 +294,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) // time could leave the data hanging across a sequence of blocks. BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE); - queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); + auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1000; ++i) { size_t total = i; { @@ -325,16 +313,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } - queue->StopWorkerThreads(); } // Test that a new verification cannot occur until all checks // have been destructed BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE); + auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); bool fails = false; - queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); std::thread t0([&]() { CCheckQueueControl control(queue.get()); std::vector vChecks(1); @@ -361,14 +347,13 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) // Wait for control to finish t0.join(); BOOST_REQUIRE(!fails); - queue->StopWorkerThreads(); } /** Test that CCheckQueueControl is threadsafe */ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE); + auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); { std::vector tg; std::atomic nThreads {0}; diff --git a/src/test/fuzz/checkqueue.cpp b/src/test/fuzz/checkqueue.cpp index 429570526f118..6320b500b6f4b 100644 --- a/src/test/fuzz/checkqueue.cpp +++ b/src/test/fuzz/checkqueue.cpp @@ -31,8 +31,8 @@ FUZZ_TARGET(checkqueue) FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size()); const unsigned int batch_size = fuzzed_data_provider.ConsumeIntegralInRange(0, 1024); - CCheckQueue check_queue_1{batch_size}; - CCheckQueue check_queue_2{batch_size}; + CCheckQueue check_queue_1{batch_size, /*worker_threads_num=*/0}; + CCheckQueue check_queue_2{batch_size, /*worker_threads_num=*/0}; std::vector checks_1; std::vector checks_2; const int size = fuzzed_data_provider.ConsumeIntegralInRange(0, 1024); diff --git a/src/test/transaction_tests.cpp b/src/test/transaction_tests.cpp index 0a7ef3f7807f3..5329c6ac990d9 100644 --- a/src/test/transaction_tests.cpp +++ b/src/test/transaction_tests.cpp @@ -529,11 +529,9 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) // check all inputs concurrently, with the cache PrecomputedTransactionData txdata(tx); - CCheckQueue scriptcheckqueue(128); + CCheckQueue scriptcheckqueue(/*batch_size=*/128, /*worker_threads_num=*/20); CCheckQueueControl control(&scriptcheckqueue); - scriptcheckqueue.StartWorkerThreads(20); - std::vector coins; for(uint32_t i = 0; i < mtx.vin.size(); i++) { Coin coin; @@ -552,7 +550,6 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) bool controlCheck = control.Wait(); assert(controlCheck); - scriptcheckqueue.StopWorkerThreads(); } SignatureData CombineSignatures(const CMutableTransaction& input1, const CMutableTransaction& input2, const CTransactionRef tx) diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 5b67640e6e30f..bc639da4dde9b 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -187,6 +187,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto .adjusted_time_callback = GetAdjustedTime, .check_block_index = true, .notifications = *m_node.notifications, + .worker_threads_num = 2, }; const BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, @@ -198,15 +199,11 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto .path = m_args.GetDataDirNet() / "blocks" / "index", .cache_bytes = static_cast(m_cache_sizes.block_tree_db), .memory_only = true}); - - constexpr int script_check_threads = 2; - StartScriptCheckWorkerThreads(script_check_threads); } ChainTestingSetup::~ChainTestingSetup() { if (m_node.scheduler) m_node.scheduler->stop(); - StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); m_node.connman.reset(); diff --git a/src/validation.cpp b/src/validation.cpp index 6d791f53d13ef..8a5bb93ef8463 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -2079,18 +2079,6 @@ DisconnectResult Chainstate::DisconnectBlock(const CBlock& block, const CBlockIn return fClean ? DISCONNECT_OK : DISCONNECT_UNCLEAN; } -static CCheckQueue scriptcheckqueue(128); - -void StartScriptCheckWorkerThreads(int threads_num) -{ - scriptcheckqueue.StartWorkerThreads(threads_num); -} - -void StopScriptCheckWorkerThreads() -{ - scriptcheckqueue.StopWorkerThreads(); -} - /** * Threshold condition checker that triggers when unknown versionbits are seen on the network. */ @@ -2179,7 +2167,7 @@ bool Chainstate::ConnectBlock(const CBlock& block, BlockValidationState& state, uint256 block_hash{block.GetHash()}; assert(*pindex->phashBlock == block_hash); - const bool parallel_script_checks{scriptcheckqueue.HasThreads()}; + const bool parallel_script_checks{m_chainman.GetCheckQueue().HasThreads()}; const auto time_start{SteadyClock::now()}; const CChainParams& params{m_chainman.GetParams()}; @@ -2368,7 +2356,7 @@ bool Chainstate::ConnectBlock(const CBlock& block, BlockValidationState& state, // in multiple threads). Preallocate the vector size so a new allocation // doesn't invalidate pointers into the vector, and keep txsdata in scope // for as long as `control`. - CCheckQueueControl control(fScriptChecks && parallel_script_checks ? &scriptcheckqueue : nullptr); + CCheckQueueControl control(fScriptChecks && parallel_script_checks ? &m_chainman.GetCheckQueue() : nullptr); std::vector txsdata(block.vtx.size()); std::vector prevheights; @@ -5792,9 +5780,12 @@ static ChainstateManager::Options&& Flatten(ChainstateManager::Options&& opts) } ChainstateManager::ChainstateManager(const util::SignalInterrupt& interrupt, Options options, node::BlockManager::Options blockman_options) - : m_interrupt{interrupt}, + : m_script_check_queue{/*batch_size=*/128, options.worker_threads_num}, + m_interrupt{interrupt}, m_options{Flatten(std::move(options))}, - m_blockman{interrupt, std::move(blockman_options)} {} + m_blockman{interrupt, std::move(blockman_options)} +{ +} ChainstateManager::~ChainstateManager() { diff --git a/src/validation.h b/src/validation.h index 7473bcbc3ba6b..093cecfcd104b 100644 --- a/src/validation.h +++ b/src/validation.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -65,10 +66,6 @@ namespace util { class SignalInterrupt; } // namespace util -/** Maximum number of dedicated script-checking threads allowed */ -static const int MAX_SCRIPTCHECK_THREADS = 15; -/** -par default (number of script-checking threads, 0 = auto) */ -static const int DEFAULT_SCRIPTCHECK_THREADS = 0; /** Block files containing a block-height within MIN_BLOCKS_TO_KEEP of ActiveChain().Tip() will not be pruned. */ static const unsigned int MIN_BLOCKS_TO_KEEP = 288; static const signed int DEFAULT_CHECKBLOCKS = 6; @@ -98,11 +95,6 @@ extern uint256 g_best_block; /** Documentation for argument 'checklevel'. */ extern const std::vector CHECKLEVEL_DOC; -/** Run instances of script checking worker threads */ -void StartScriptCheckWorkerThreads(int threads_num); -/** Stop all of the script checking worker threads */ -void StopScriptCheckWorkerThreads(); - CAmount GetBlockSubsidy(int nHeight, const Consensus::Params& consensusParams); bool FatalError(kernel::Notifications& notifications, BlockValidationState& state, const std::string& strMessage, const bilingual_str& userMessage = {}); @@ -926,6 +918,9 @@ class ChainstateManager return cs && !cs->m_disabled; } + //! A queue for script verifications that have to be performed by worker threads. + CCheckQueue m_script_check_queue; + public: using Options = kernel::ChainstateManagerOpts; @@ -1276,6 +1271,8 @@ class ChainstateManager //! nullopt. std::optional GetSnapshotBaseHeight() const EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + CCheckQueue& GetCheckQueue() { return m_script_check_queue; } + ~ChainstateManager(); };