diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs
index c13df02079..f05d149c72 100644
--- a/crates/chain-listener/src/listener.rs
+++ b/crates/chain-listener/src/listener.rs
@@ -71,8 +71,6 @@ use crate::proof_tracker::ProofTracker;
 use crate::types::{CUGroups, PhysicalCoreGroups};
 
 const PROOF_POLL_LIMIT: usize = 50;
-// TODO: move to config
-const WS_PING_PERIOD_SEC: u64 = 10;
 
 #[derive(Clone)]
 struct OnChainWorker {
@@ -457,19 +455,17 @@ impl ChainListener {
         }
     }
 
-    pub async fn create_ws_client(ws_endpoint: &str) -> Result<WsClient, client::Error> {
+    pub async fn create_ws_client(config: &ChainListenerConfig) -> Result<WsClient, client::Error> {
         let ws_client = retry(ExponentialBackoff::default(), || async {
             let client = WsClientBuilder::default()
-                .enable_ws_ping(
-                    PingConfig::new().ping_interval(Duration::from_secs(WS_PING_PERIOD_SEC)),
-                )
-                .build(ws_endpoint)
+                .enable_ws_ping(PingConfig::new().ping_interval(config.ws_ping_period))
+                .build(config.ws_endpoint.clone())
                 .await
                 .map_err(|err| {
                     tracing::warn!(
                         target: "chain-listener",
                         "Error connecting to websocket endpoint {}, error: {}; Retrying...",
-                        ws_endpoint,
+                        config.ws_endpoint,
                         err
                     );
                     err
@@ -482,7 +478,7 @@
         tracing::info!(
             target: "chain-listener",
             "Successfully connected to websocket endpoint: {}",
-            ws_endpoint
+            config.ws_endpoint
         );
 
         Ok(ws_client)
@@ -506,8 +502,7 @@ impl ChainListener {
 
     async fn refresh_subscriptions(&mut self) -> Result<(), client::Error> {
         if !self.ws_client.is_connected() {
-            self.ws_client =
-                ChainListener::create_ws_client(&self.listener_config.ws_endpoint).await?;
+            self.ws_client = ChainListener::create_ws_client(&self.listener_config).await?;
         }
 
         // loop because subscriptions can fail and require reconnection, we can't proceed without them
@@ -532,8 +527,7 @@ impl ChainListener {
                     client::Error::RestartNeeded(_) => {
                         tracing::warn!(target: "chain-listener", "Failed to refresh subscriptions: {err}; Restart client...");
                         self.ws_client =
-                            ChainListener::create_ws_client(&self.listener_config.ws_endpoint)
-                                .await?;
+                            ChainListener::create_ws_client(&self.listener_config).await?;
                     }
                     _ => {
                         tracing::error!(target: "chain-listener", "Failed to refresh subscriptions: {err}; Retrying...");
@@ -551,6 +545,10 @@
 
         let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();
 
+        let current_units: Vec<CUID> = units.iter().map(|unit| CUID::new(unit.id.0)).collect();
+        self.core_distributor
+            .cleanup_cache(current_units.as_slice());
+
         self.cc_compute_units = units
            .into_iter()
            .map(|unit| (CUID::new(unit.id.0), unit))
diff --git a/crates/core-distributor/src/distributor.rs b/crates/core-distributor/src/distributor.rs
index a086aaaf86..1f4737197b 100644
--- a/crates/core-distributor/src/distributor.rs
+++ b/crates/core-distributor/src/distributor.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
 use cpu_utils::CPUTopology;
 use fxhash::FxBuildHasher;
+use fxhash::FxHashSet;
 use parking_lot::RwLock;
 use range_set_blaze::RangeSetBlaze;
 
@@ -44,6 +45,8 @@ pub trait CoreDistributor: Send + Sync {
     fn release_worker_cores(&self, unit_ids: &[CUID]);
 
     fn get_system_cpu_assignment(&self) -> SystemAssignment;
+
+    fn cleanup_cache(&self, allowed_cuids: &[CUID]);
 }
 
 /// `PersistentCoreDistributor` is a CPU core distributor responsible for allocating and releasing CPU cores
@@ -67,6 +70,7 @@ impl From<PersistentCoreDistributorState> for CoreDistributorState {
             available_cores: value.available_cores.into_iter().collect(),
             unit_id_mapping: value.unit_id_mapping.into_iter().collect(),
             work_type_mapping: value.work_type_mapping.into_iter().collect(),
+            cuid_cache: value.cuid_cache.into_iter().collect(),
         }
     }
 }
@@ -203,12 +207,16 @@
         let type_mapping =
            Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());
 
+        let cpu_cache =
+            Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());
+
         let inner_state = CoreDistributorState {
             cores_mapping,
             system_cores,
             available_cores,
             unit_id_mapping,
             work_type_mapping: type_mapping,
+            cuid_cache: cpu_cache,
         };
 
         let result = Self::make_instance_with_task(file_name, inner_state, acquire_strategy);
@@ -288,6 +296,13 @@ impl CoreDistributor for PersistentCoreDistributor {
         }
         SystemAssignment::new(lock.system_cores.clone(), logical_core_ids)
     }
+
+    fn cleanup_cache(&self, allowed_cuids: &[CUID]) {
+        let mut lock = self.state.write();
+        let allowed_unit_ids: FxHashSet<CUID> = allowed_cuids.iter().cloned().collect();
+        lock.cuid_cache
+            .retain(|cuid, _| allowed_unit_ids.contains(cuid))
+    }
 }
 
 pub(crate) struct CoreDistributorState {
@@ -302,6 +317,8 @@
     pub unit_id_mapping: BiMap<PhysicalCoreId, CUID>,
     // mapping between unit id and workload type
     pub work_type_mapping: Map<CUID, WorkType>,
+    // cache of the last physical core assigned to each CUID
+    pub cuid_cache: Map<CUID, PhysicalCoreId>,
 }
 
 #[cfg(test)]
@@ -431,6 +448,52 @@ mod tests {
         assert_eq!(assignment_1, assignment_3);
     }
 
+    #[test]
+    fn test_acquire_same_cuid_strict() {
+        let cpu_topology = mocked_topology();
+        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
+
+        let (distributor, _task) = PersistentCoreDistributor::from_path(
+            temp_dir.path().join("test.toml"),
+            2,
+            CoreRange::from_str("0-7").unwrap(),
+            AcquireStrategy::Strict,
+            &cpu_topology,
+        )
+        .unwrap();
+        let unit_id_1 =
+            <CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
+                .unwrap();
+        let unit_id_2 =
+            <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3a0")
+                .unwrap();
+        let unit_id_3 =
+            <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3d0")
+                .unwrap();
+        let all_unit_ids = vec![unit_id_1, unit_id_2, unit_id_3];
+
+        let unit_ids = vec![unit_id_1, unit_id_2];
+        let assignment_1 = distributor
+            .acquire_worker_cores(AcquireRequest {
+                unit_ids: unit_ids.clone(),
+                work_type: WorkType::CapacityCommitment,
+            })
+            .unwrap();
+        distributor.release_worker_cores(all_unit_ids.as_slice());
+        let unit_ids = vec![unit_id_2, unit_id_3];
+        let assignment_2 = distributor
+            .acquire_worker_cores(AcquireRequest {
+                unit_ids: unit_ids.clone(),
+                work_type: WorkType::CapacityCommitment,
+            })
+            .unwrap();
+
+        assert_eq!(
+            assignment_1.cuid_cores.get(&unit_id_2),
+            assignment_2.cuid_cores.get(&unit_id_2)
+        )
+    }
+
     #[test]
     fn test_acquire_and_release_strict() {
         let cpu_topology = mocked_topology();
@@ -535,6 +598,7 @@ mod tests {
             available_cores: vec![PhysicalCoreId::new(2)],
             unit_id_mapping: vec![(PhysicalCoreId::new(3), init_id_1)],
             work_type_mapping: vec![(init_id_1, WorkType::Deal)],
+            cuid_cache: Default::default(),
         };
         let (distributor, _task) = PersistentCoreDistributor::make_instance_with_task(
             temp_dir.into_path(),
diff --git a/crates/core-distributor/src/dummy.rs b/crates/core-distributor/src/dummy.rs
index 3046fbea64..b0fcc06001 100644
--- a/crates/core-distributor/src/dummy.rs
+++ b/crates/core-distributor/src/dummy.rs
@@ -62,4 +62,6 @@ impl CoreDistributor for DummyCoreDistibutor {
     fn get_system_cpu_assignment(&self) -> SystemAssignment {
         SystemAssignment::new(vec![PhysicalCoreId::new(0)], vec![LogicalCoreId::new(0)])
     }
+
+    fn cleanup_cache(&self, _allowed_cuids: &[CUID]) {}
 }
diff --git a/crates/core-distributor/src/persistence.rs b/crates/core-distributor/src/persistence.rs
index 716cef9765..fb8e0e5b01 100644
--- a/crates/core-distributor/src/persistence.rs
+++ b/crates/core-distributor/src/persistence.rs
@@ -95,6 +95,8 @@ pub struct PersistentCoreDistributorState {
     pub unit_id_mapping: Vec<(PhysicalCoreId, CUID)>,
     #[serde_as(as = "Vec<(Hex, _)>")]
     pub work_type_mapping: Vec<(CUID, WorkType)>,
+    #[serde_as(as = "Vec<(Hex, _)>")]
+    pub cuid_cache: Vec<(CUID, PhysicalCoreId)>,
 }
 
 impl PersistentCoreDistributorState {
@@ -124,6 +126,7 @@ impl From<&CoreDistributorState> for PersistentCoreDistributorState {
                 .iter()
                 .map(|(k, v)| ((*k), *v))
                 .collect(),
+            cuid_cache: value.cuid_cache.iter().map(|(k, v)| ((*k), *v)).collect(),
         }
     }
 }
@@ -137,7 +140,7 @@ mod tests {
 
     #[test]
     fn test_serde() {
-        let init_id_1 =
+        let unit_id_1 =
            <CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
                .unwrap();
         let persistent_state = PersistentCoreDistributorState {
@@ -153,15 +156,17 @@
             ],
             system_cores: vec![PhysicalCoreId::new(1)],
             available_cores: vec![PhysicalCoreId::new(2), PhysicalCoreId::new(3)],
-            unit_id_mapping: vec![(PhysicalCoreId::new(4), init_id_1)],
-            work_type_mapping: vec![(init_id_1, WorkType::Deal)],
+            unit_id_mapping: vec![(PhysicalCoreId::new(4), unit_id_1)],
+            work_type_mapping: vec![(unit_id_1, WorkType::Deal)],
+            cuid_cache: vec![(unit_id_1, PhysicalCoreId::new(1))],
         };
         let actual = toml::to_string(&persistent_state).unwrap();
         let expected = "cores_mapping = [[1, 1], [1, 2], [2, 3], [2, 4], [3, 5], [3, 6], [4, 7], [4, 8]]\n\
            system_cores = [1]\n\
            available_cores = [2, 3]\n\
            unit_id_mapping = [[4, \"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\"]]\n\
-           work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n";
+           work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n\
+           cuid_cache = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", 1]]\n";
         assert_eq!(expected, actual)
     }
 }
diff --git a/crates/core-distributor/src/strategy.rs b/crates/core-distributor/src/strategy.rs
index b17e9d78b4..556c127f3d 100644
--- a/crates/core-distributor/src/strategy.rs
+++ b/crates/core-distributor/src/strategy.rs
@@ -109,17 +109,45 @@ impl AcquireStrategyOperations for StrictAcquireStrategy {
             });
         }
 
+        let mut new_core_allocation = Vec::with_capacity(core_allocation.len());
         for (unit_id, physical_core_id) in core_allocation {
+            match state.cuid_cache.get(&unit_id) {
+                None => new_core_allocation.push((unit_id, physical_core_id)),
+                Some(cached_physical_core_id) => {
+                    let position = state
+                        .available_cores
+                        .iter()
+                        .position(|core_id| core_id == cached_physical_core_id);
+                    match position {
+                        None => new_core_allocation.push((unit_id, physical_core_id)),
+                        Some(index) => {
+                            // SAFETY: this should never happen because we already found the position in the previous step
+                            let physical_core_id = state
+                                .available_cores
+                                .remove(index)
+                                .expect("Unexpected state. Should not be empty never");
+
+                            state.unit_id_mapping.insert(physical_core_id, unit_id);
+                            state.work_type_mapping.insert(unit_id, *worker_unit_type);
+                            Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores);
+                        }
+                    }
+                }
+            }
+        }
+
+        for (unit_id, physical_core_id) in new_core_allocation {
             let physical_core_id = match physical_core_id {
                 None => {
                     // SAFETY: this should never happen because we already checked the availability of cores
-                    let core_id = state
+                    let physical_core_id = state
                         .available_cores
                         .pop_back()
                         .expect("Unexpected state. Should not be empty never");
-                    state.unit_id_mapping.insert(core_id, unit_id);
+                    state.unit_id_mapping.insert(physical_core_id, unit_id);
                     state.work_type_mapping.insert(unit_id, *worker_unit_type);
-                    core_id
+                    state.cuid_cache.insert(unit_id, physical_core_id);
+                    physical_core_id
                 }
                 Some(core_id) => {
                     state.work_type_mapping.insert(unit_id, *worker_unit_type);
@@ -127,21 +155,7 @@ impl AcquireStrategyOperations for StrictAcquireStrategy {
                     core_id
                 }
             };
 
-            // SAFETY: The physical core always has corresponding logical ids,
-            // unit_id_mapping can't have a wrong physical_core_id
-            let logical_core_ids = state
-                .cores_mapping
-                .get_vec(&physical_core_id)
-                .cloned()
-                .expect("Unexpected state. Should not be empty never");
-
-            cuid_cores.insert(
-                unit_id,
-                Cores {
-                    physical_core_id,
-                    logical_core_ids,
-                },
-            );
+            Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores);
         }
 
         Ok(Assignment::new(cuid_cores))
@@ -157,6 +171,31 @@ impl AcquireStrategyOperations for StrictAcquireStrategy {
     }
 }
 
+impl StrictAcquireStrategy {
+    fn add_cuid_cores(
+        state: &CoreDistributorState,
+        unit_id: CUID,
+        physical_core_id: PhysicalCoreId,
+        cuid_cores: &mut Map<CUID, Cores>,
+    ) {
+        // SAFETY: The physical core always has corresponding logical ids,
+        // unit_id_mapping can't have a wrong physical_core_id
+        let logical_core_ids = state
+            .cores_mapping
+            .get_vec(&physical_core_id)
+            .cloned()
+            .expect("Unexpected state. Should not be empty never");
+
+        cuid_cores.insert(
+            unit_id,
+            Cores {
+                physical_core_id,
+                logical_core_ids,
+            },
+        );
+    }
+}
+
 pub(crate) struct RoundRobinAcquireStrategy;
 
 impl AcquireStrategyOperations for RoundRobinAcquireStrategy {
diff --git a/crates/nox-tests/tests/chain_listener/tests.rs b/crates/nox-tests/tests/chain_listener/tests.rs
index 4991d13b77..f7ebe8a14f 100644
--- a/crates/nox-tests/tests/chain_listener/tests.rs
+++ b/crates/nox-tests/tests/chain_listener/tests.rs
@@ -85,6 +85,7 @@ impl ChainListenerTestEntities {
             max_batch_count: 0,
             max_proof_batch_size: 0,
             epoch_end_window: Default::default(),
+            ws_ping_period: Duration::from_secs(5),
         });
 
         cfg.cc_events_dir = Some(cc_events_dir.clone());
@@ -393,8 +394,8 @@ async fn test_deal_insufficient_funds_flow() {
     assert_allocation(
         result_2.cu_allocation,
         vec![
-            (PhysicalCoreId::new(125), cu_id_2),
-            (PhysicalCoreId::new(126), cu_id_3),
+            (PhysicalCoreId::new(125), cu_id_3),
+            (PhysicalCoreId::new(126), cu_id_2),
         ],
     );
 }
diff --git a/crates/server-config/src/defaults.rs b/crates/server-config/src/defaults.rs
index 59cab61ec1..fd4ab81333 100644
--- a/crates/server-config/src/defaults.rs
+++ b/crates/server-config/src/defaults.rs
@@ -291,3 +291,7 @@ pub fn default_max_proof_batch_size() -> usize {
 pub fn default_epoch_end_window() -> Duration {
     Duration::from_secs(300)
 }
+
+pub fn default_ws_ping_period() -> Duration {
+    Duration::from_secs(10)
+}
diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs
index dc6ae668ea..8c44a209cc 100644
--- a/crates/server-config/src/node_config.rs
+++ b/crates/server-config/src/node_config.rs
@@ -570,6 +570,9 @@ pub struct ChainListenerConfig {
     #[serde(default = "default_epoch_end_window")]
     #[serde(with = "humantime_serde")]
     pub epoch_end_window: Duration,
+    #[serde(default = "default_ws_ping_period")]
+    #[serde(with = "humantime_serde")]
+    pub ws_ping_period: Duration, // TODO: must be >0
 }
 
 /// Name of the effector module
diff --git a/nox/src/node.rs b/nox/src/node.rs
index f114459a36..3c2b12381c 100644
--- a/nox/src/node.rs
+++ b/nox/src/node.rs
@@ -142,7 +142,7 @@ async fn setup_listener(
         None
     };
 
-    let ws_client = ChainListener::create_ws_client(&listener_config.ws_endpoint).await?;
+    let ws_client = ChainListener::create_ws_client(&listener_config).await?;
 
     let cc_events_dir = config.dir_config.cc_events_dir.clone();
     let host_id = config.root_key_pair.get_peer_id();
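Two cache behaviors worth calling out for review. First, cache entries survive `release_worker_cores` and are only evicted by the new `cleanup_cache`, which the chain-listener now drives with the current on-chain unit set. A minimal sketch of that lifecycle, using plain std collections and hypothetical `UnitId`/`CoreId` aliases in place of the crate's `CUID`/`PhysicalCoreId` (the real cache is the fxhash-backed `Map` added to `CoreDistributorState`):

```rust
use std::collections::HashMap;

// Hypothetical aliases standing in for the crate's CUID and PhysicalCoreId.
type UnitId = u64;
type CoreId = u32;

fn main() {
    let mut cuid_cache: HashMap<UnitId, CoreId> = HashMap::new();

    // Acquire: the strict strategy records which core each unit got.
    cuid_cache.insert(1, 125);
    cuid_cache.insert(2, 126);

    // Release: `release_worker_cores` frees the cores but leaves the cache
    // untouched, so a re-acquired unit can be pinned back to its old core.

    // Cleanup: only units still present on chain keep their entries,
    // mirroring the `retain` in `PersistentCoreDistributor::cleanup_cache`.
    let current_units = [2u64];
    cuid_cache.retain(|unit_id, _| current_units.contains(unit_id));

    assert_eq!(cuid_cache.get(&1), None);
    assert_eq!(cuid_cache.get(&2), Some(&126));
}
```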
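Second, the `strategy.rs` change is effectively a two-pass allocation: the first pass hands each unit its previously cached core if that core is still free, and everything else falls through to the old `pop_back` path, which now also records its pick in `cuid_cache`. A standalone distillation of the first-pass lookup under the same hypothetical aliases:

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical aliases standing in for the crate's CUID and PhysicalCoreId.
type UnitId = u64;
type CoreId = u32;

/// If the unit's previously pinned core is still free, take it out of
/// `available`; otherwise return `None` and let the caller allocate a
/// fresh core exactly as before this patch.
fn take_cached_core(
    available: &mut VecDeque<CoreId>,
    cuid_cache: &HashMap<UnitId, CoreId>,
    unit_id: &UnitId,
) -> Option<CoreId> {
    let cached = cuid_cache.get(unit_id)?;
    let index = available.iter().position(|core| core == cached)?;
    available.remove(index)
}

fn main() {
    let mut available: VecDeque<CoreId> = VecDeque::from([125, 126, 127]);
    let cuid_cache = HashMap::from([(1u64, 126u32)]);

    // Unit 1 was pinned to core 126 before, so it gets the same core back.
    assert_eq!(take_cached_core(&mut available, &cuid_cache, &1), Some(126));
    // Unit 2 has no cache entry; the caller falls back to fresh allocation.
    assert_eq!(take_cached_core(&mut available, &cuid_cache, &2), None);
}
```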
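Finally, because `ws_ping_period` is declared with `#[serde(with = "humantime_serde")]`, node configs can spell it as a humantime string. A sketch of the expected round-trip; `WsPingSlice` is a hypothetical stand-in for the relevant slice of `ChainListenerConfig`, with the serde attributes copied from the diff:

```rust
use std::time::Duration;

use serde::Deserialize;

#[derive(Deserialize)]
struct WsPingSlice {
    #[serde(default = "default_ws_ping_period")]
    #[serde(with = "humantime_serde")]
    ws_ping_period: Duration,
}

// Same value as the new default in `defaults.rs`.
fn default_ws_ping_period() -> Duration {
    Duration::from_secs(10)
}

fn main() {
    // humantime strings such as "5s", "500ms" or "1m 30s" are accepted.
    let explicit: WsPingSlice = toml::from_str(r#"ws_ping_period = "5s""#).unwrap();
    assert_eq!(explicit.ws_ping_period, Duration::from_secs(5));

    // Omitting the key falls back to the 10s default.
    let defaulted: WsPingSlice = toml::from_str("").unwrap();
    assert_eq!(defaulted.ws_ping_period, Duration::from_secs(10));
}
```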