From fbd4f6dabbfce6c501c32fb3853621d935e11bce Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 17 Sep 2024 23:06:38 +0300 Subject: [PATCH] feat(core-distributor): add cuid cache --- crates/chain-listener/src/listener.rs | 4 ++ crates/core-distributor/src/distributor.rs | 64 ++++++++++++++++++ crates/core-distributor/src/dummy.rs | 2 + crates/core-distributor/src/persistence.rs | 8 ++- crates/core-distributor/src/strategy.rs | 75 ++++++++++++++++------ 5 files changed, 134 insertions(+), 19 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 73c193819c..08f8d313c0 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -546,6 +546,10 @@ impl ChainListener { let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect(); + let current_units: Vec<CUID> = units.iter().map(|unit| CUID::new(unit.id.0)).collect(); + self.core_distributor + .cleanup_cache(current_units.as_slice()); + self.cc_compute_units = units .into_iter() .map(|unit| (CUID::new(unit.id.0), unit)) diff --git a/crates/core-distributor/src/distributor.rs b/crates/core-distributor/src/distributor.rs index a086aaaf86..ec14cf8244 100644 --- a/crates/core-distributor/src/distributor.rs +++ b/crates/core-distributor/src/distributor.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID}; use cpu_utils::CPUTopology; use fxhash::FxBuildHasher; +use fxhash::FxHashSet; use parking_lot::RwLock; use range_set_blaze::RangeSetBlaze; @@ -44,6 +45,8 @@ pub trait CoreDistributor: Send + Sync { fn release_worker_cores(&self, unit_ids: &[CUID]); fn get_system_cpu_assignment(&self) -> SystemAssignment; + + fn cleanup_cache(&self, allowed_cuids: &[CUID]); } /// `PersistentCoreDistributor` is a CPU core distributor responsible for allocating and releasing CPU cores @@ -67,6 +70,7 @@ impl From<PersistentCoreDistributorState> for CoreDistributorState { available_cores: value.available_cores.into_iter().collect(),
unit_id_mapping: value.unit_id_mapping.into_iter().collect(), work_type_mapping: value.work_type_mapping.into_iter().collect(), + cuid_cache: value.cpu_cache.into_iter().collect(), } } } @@ -203,12 +207,16 @@ impl PersistentCoreDistributor { let type_mapping = Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default()); + let cpu_cache = + Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default()); + let inner_state = CoreDistributorState { cores_mapping, system_cores, available_cores, unit_id_mapping, work_type_mapping: type_mapping, + cuid_cache: cpu_cache, }; let result = Self::make_instance_with_task(file_name, inner_state, acquire_strategy); @@ -288,6 +296,13 @@ impl CoreDistributor for PersistentCoreDistributor { } SystemAssignment::new(lock.system_cores.clone(), logical_core_ids) } + + fn cleanup_cache(&self, allowed_cuids: &[CUID]) { + let mut lock = self.state.write(); + let allowed_unit_ids: FxHashSet<CUID> = allowed_cuids.iter().cloned().collect(); + lock.cuid_cache + .retain(|cuid, _| allowed_unit_ids.contains(cuid)) + } } pub(crate) struct CoreDistributorState { @@ -302,6 +317,8 @@ pub(crate) struct CoreDistributorState { pub unit_id_mapping: BiMap<PhysicalCoreId, CUID>, // mapping between unit id and workload type pub work_type_mapping: Map<CUID, WorkType>, + // cache + pub cuid_cache: Map<CUID, PhysicalCoreId>, } #[cfg(test)] @@ -431,6 +448,52 @@ mod tests { assert_eq!(assignment_1, assignment_3); } + #[test] + fn test_acquire_same_cuid_strict() { + let cpu_topology = mocked_topology(); + let temp_dir = tempfile::tempdir().expect("Failed to create temp dir"); + + let (distributor, _task) = PersistentCoreDistributor::from_path( + temp_dir.path().join("test.toml"), + 2, + CoreRange::from_str("0-7").unwrap(), + AcquireStrategy::Strict, + &cpu_topology, + ) + .unwrap(); + let unit_id_1 = + <CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea") + .unwrap(); + let unit_id_2 = + <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3a0") + 
.unwrap(); + let unit_id_3 = + <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3d0") + .unwrap(); + let all_unit_ids = vec![unit_id_1, unit_id_2, unit_id_3]; + + let unit_ids = vec![unit_id_1, unit_id_2]; + let assignment_1 = distributor + .acquire_worker_cores(AcquireRequest { + unit_ids: unit_ids.clone(), + work_type: WorkType::CapacityCommitment, + }) + .unwrap(); + distributor.release_worker_cores(all_unit_ids.as_slice()); + let unit_ids = vec![unit_id_2, unit_id_3]; + let assignment_2 = distributor + .acquire_worker_cores(AcquireRequest { + unit_ids: unit_ids.clone(), + work_type: WorkType::CapacityCommitment, + }) + .unwrap(); + + assert_eq!( + assignment_1.cuid_cores.get(&unit_id_2), + assignment_2.cuid_cores.get(&unit_id_2) + ) + } + #[test] fn test_acquire_and_release_strict() { let cpu_topology = mocked_topology(); @@ -535,6 +598,7 @@ mod tests { available_cores: vec![PhysicalCoreId::new(2)], unit_id_mapping: vec![(PhysicalCoreId::new(3), init_id_1)], work_type_mapping: vec![(init_id_1, WorkType::Deal)], + cpu_cache: Default::default(), }; let (distributor, _task) = PersistentCoreDistributor::make_instance_with_task( temp_dir.into_path(), diff --git a/crates/core-distributor/src/dummy.rs b/crates/core-distributor/src/dummy.rs index 3046fbea64..b0fcc06001 100644 --- a/crates/core-distributor/src/dummy.rs +++ b/crates/core-distributor/src/dummy.rs @@ -62,4 +62,6 @@ impl CoreDistributor for DummyCoreDistibutor { fn get_system_cpu_assignment(&self) -> SystemAssignment { SystemAssignment::new(vec![PhysicalCoreId::new(0)], vec![LogicalCoreId::new(0)]) } + + fn cleanup_cache(&self, _allowed_cuids: &[CUID]) {} } diff --git a/crates/core-distributor/src/persistence.rs b/crates/core-distributor/src/persistence.rs index 716cef9765..2f8fce53c6 100644 --- a/crates/core-distributor/src/persistence.rs +++ b/crates/core-distributor/src/persistence.rs @@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::errors::PersistError;
use crate::types::WorkType; +use crate::Map; pub(crate) trait StatePersister: Send + Sync { fn persist(&self) -> Result<(), PersistError>; @@ -95,6 +96,8 @@ pub struct PersistentCoreDistributorState { pub unit_id_mapping: Vec<(PhysicalCoreId, CUID)>, #[serde_as(as = "Vec<(Hex, _)>")] pub work_type_mapping: Vec<(CUID, WorkType)>, + #[serde_as(as = "Vec<(Hex, _)>")] + pub cpu_cache: Map<CUID, PhysicalCoreId>, } impl PersistentCoreDistributorState { @@ -124,6 +127,7 @@ impl From<&CoreDistributorState> for PersistentCoreDistributorState { .iter() .map(|(k, v)| ((*k), *v)) .collect(), + cpu_cache: value.cuid_cache.iter().map(|(k, v)| ((*k), *v)).collect(), } } } @@ -155,13 +159,15 @@ mod tests { available_cores: vec![PhysicalCoreId::new(2), PhysicalCoreId::new(3)], unit_id_mapping: vec![(PhysicalCoreId::new(4), init_id_1)], work_type_mapping: vec![(init_id_1, WorkType::Deal)], + cpu_cache: Default::default(), }; let actual = toml::to_string(&persistent_state).unwrap(); let expected = "cores_mapping = [[1, 1], [1, 2], [2, 3], [2, 4], [3, 5], [3, 6], [4, 7], [4, 8]]\n\ system_cores = [1]\n\ available_cores = [2, 3]\n\ unit_id_mapping = [[4, \"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\"]]\n\ - work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n"; + work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n\ + cpu_cache = []\n"; assert_eq!(expected, actual) } } diff --git a/crates/core-distributor/src/strategy.rs b/crates/core-distributor/src/strategy.rs index b17e9d78b4..556c127f3d 100644 --- a/crates/core-distributor/src/strategy.rs +++ b/crates/core-distributor/src/strategy.rs @@ -109,17 +109,45 @@ impl AcquireStrategyOperations for StrictAcquireStrategy { }); } + let mut new_core_allocation = Vec::with_capacity(core_allocation.len()); for (unit_id, physical_core_id) in core_allocation { + match state.cuid_cache.get(&unit_id) { + None => new_core_allocation.push((unit_id, 
physical_core_id)), + Some(cached_physical_core_id) => { + let position = state + .available_cores + .iter() + .position(|core_id| core_id == cached_physical_core_id); + match position { + None => new_core_allocation.push((unit_id, physical_core_id)), + Some(index) => { + // SAFETY: this should never happen because we already found position in the previous step + let physical_core_id = state + .available_cores + .remove(index) + .expect("Unexpected state. Should not be empty never"); + + state.unit_id_mapping.insert(physical_core_id, unit_id); + state.work_type_mapping.insert(unit_id, *worker_unit_type); + Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores); + } + } + } + } + } + + for (unit_id, physical_core_id) in new_core_allocation { let physical_core_id = match physical_core_id { None => { // SAFETY: this should never happen because we already checked the availability of cores - let core_id = state + let physical_core_id = state .available_cores .pop_back() .expect("Unexpected state. Should not be empty never"); - state.unit_id_mapping.insert(core_id, unit_id); + state.unit_id_mapping.insert(physical_core_id, unit_id); state.work_type_mapping.insert(unit_id, *worker_unit_type); - core_id + state.cuid_cache.insert(unit_id, physical_core_id); + physical_core_id } Some(core_id) => { state.work_type_mapping.insert(unit_id, *worker_unit_type); @@ -127,21 +155,7 @@ impl AcquireStrategyOperations for StrictAcquireStrategy { } }; - // SAFETY: The physical core always has corresponding logical ids, - // unit_id_mapping can't have a wrong physical_core_id - let logical_core_ids = state - .cores_mapping - .get_vec(&physical_core_id) - .cloned() - .expect("Unexpected state. 
Should not be empty never"); - - cuid_cores.insert( - unit_id, - Cores { - physical_core_id, - logical_core_ids, - }, - ); + Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores); } Ok(Assignment::new(cuid_cores)) @@ -157,6 +171,31 @@ impl AcquireStrategyOperations for StrictAcquireStrategy { } } +impl StrictAcquireStrategy { + fn add_cuid_cores( + state: &CoreDistributorState, + unit_id: CUID, + physical_core_id: PhysicalCoreId, + cuid_cores: &mut Map<CUID, Cores>, + ) { + // SAFETY: The physical core always has corresponding logical ids, + // unit_id_mapping can't have a wrong physical_core_id + let logical_core_ids = state + .cores_mapping + .get_vec(&physical_core_id) + .cloned() + .expect("Unexpected state. Should not be empty never"); + + cuid_cores.insert( + unit_id, + Cores { + physical_core_id, + logical_core_ids, + }, + ); + } +} + pub(crate) struct RoundRobinAcquireStrategy; impl AcquireStrategyOperations for RoundRobinAcquireStrategy {