feat(core-distributor)!: add cuid cache #2381

Merged · 9 commits · Sep 23, 2024
Changes from 7 commits
24 changes: 11 additions & 13 deletions crates/chain-listener/src/listener.rs
@@ -71,8 +71,6 @@ use crate::proof_tracker::ProofTracker;
use crate::types::{CUGroups, PhysicalCoreGroups};

const PROOF_POLL_LIMIT: usize = 50;
- // TODO: move to config
- const WS_PING_PERIOD_SEC: u64 = 10;

#[derive(Clone)]
struct OnChainWorker {
@@ -457,19 +455,17 @@ impl ChainListener {
}
}

- pub async fn create_ws_client(ws_endpoint: &str) -> Result<WsClient, client::Error> {
+ pub async fn create_ws_client(config: &ChainListenerConfig) -> Result<WsClient, client::Error> {
let ws_client = retry(ExponentialBackoff::default(), || async {
let client = WsClientBuilder::default()
- .enable_ws_ping(
- PingConfig::new().ping_interval(Duration::from_secs(WS_PING_PERIOD_SEC)),
- )
- .build(ws_endpoint)
+ .enable_ws_ping(PingConfig::new().ping_interval(config.ws_ping_period))
+ .build(config.ws_endpoint.clone())
.await
.map_err(|err| {
tracing::warn!(
target: "chain-listener",
"Error connecting to websocket endpoint {}, error: {}; Retrying...",
- ws_endpoint,
+ config.ws_endpoint,
err
);
err
@@ -482,7 +478,7 @@ impl ChainListener {
tracing::info!(
target: "chain-listener",
"Successfully connected to websocket endpoint: {}",
- ws_endpoint
+ config.ws_endpoint
);

Ok(ws_client)
@@ -506,8 +502,7 @@ impl ChainListener {

async fn refresh_subscriptions(&mut self) -> Result<(), client::Error> {
if !self.ws_client.is_connected() {
- self.ws_client =
- ChainListener::create_ws_client(&self.listener_config.ws_endpoint).await?;
+ self.ws_client = ChainListener::create_ws_client(&self.listener_config).await?;
}

// loop because subscriptions can fail and require reconnection, we can't proceed without them
@@ -532,8 +527,7 @@ impl ChainListener {
client::Error::RestartNeeded(_) => {
tracing::warn!(target: "chain-listener", "Failed to refresh subscriptions: {err}; Restart client...");
self.ws_client =
- ChainListener::create_ws_client(&self.listener_config.ws_endpoint)
- .await?;
+ ChainListener::create_ws_client(&self.listener_config).await?;
}
_ => {
tracing::error!(target: "chain-listener", "Failed to refresh subscriptions: {err}; Retrying...");
@@ -551,6 +545,10 @@ impl ChainListener {

let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();

+ let current_units: Vec<CUID> = units.iter().map(|unit| CUID::new(unit.id.0)).collect();
+ self.core_distributor
+ .cleanup_cache(current_units.as_slice());

self.cc_compute_units = units
.into_iter()
.map(|unit| (CUID::new(unit.id.0), unit))
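For illustration (not from this PR): `create_ws_client` keeps its existing reconnect behavior, wrapping the connection attempt in `retry` with `ExponentialBackoff::default()` so transient failures are retried with growing delays. A minimal, self-contained sketch of that pattern, assuming the `backoff` crate (v0.4) with async support enabled and a hypothetical `connect_once` standing in for `WsClientBuilder`:

```rust
use backoff::ExponentialBackoff;

// Hypothetical stand-in for a single websocket connection attempt.
async fn connect_once(endpoint: &str) -> Result<String, String> {
    Ok(format!("connected to {endpoint}"))
}

// Retry the attempt with exponential backoff; errors marked `transient`
// trigger another attempt until the backoff policy gives up.
async fn connect_with_retry(endpoint: &str) -> Result<String, String> {
    backoff::future::retry(ExponentialBackoff::default(), || async {
        connect_once(endpoint).await.map_err(backoff::Error::transient)
    })
    .await
}

#[tokio::main]
async fn main() {
    let conn = connect_with_retry("wss://example.invalid").await.unwrap();
    println!("{conn}");
}
```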
64 changes: 64 additions & 0 deletions crates/core-distributor/src/distributor.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
use cpu_utils::CPUTopology;
use fxhash::FxBuildHasher;
+ use fxhash::FxHashSet;
use parking_lot::RwLock;
use range_set_blaze::RangeSetBlaze;

@@ -44,6 +45,8 @@ pub trait CoreDistributor: Send + Sync {
fn release_worker_cores(&self, unit_ids: &[CUID]);

fn get_system_cpu_assignment(&self) -> SystemAssignment;

+ fn cleanup_cache(&self, allowed_cuids: &[CUID]);
}

/// `PersistentCoreDistributor` is a CPU core distributor responsible for allocating and releasing CPU cores
@@ -67,6 +70,7 @@ impl From<PersistentCoreDistributorState> for CoreDistributorState {
available_cores: value.available_cores.into_iter().collect(),
unit_id_mapping: value.unit_id_mapping.into_iter().collect(),
work_type_mapping: value.work_type_mapping.into_iter().collect(),
+ cuid_cache: value.cuid_cache.into_iter().collect(),
}
}
}
@@ -203,12 +207,16 @@ impl PersistentCoreDistributor {
let type_mapping =
Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());

+ let cpu_cache =
+ Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());

let inner_state = CoreDistributorState {
cores_mapping,
system_cores,
available_cores,
unit_id_mapping,
work_type_mapping: type_mapping,
+ cuid_cache: cpu_cache,
};

let result = Self::make_instance_with_task(file_name, inner_state, acquire_strategy);
@@ -288,6 +296,13 @@ impl CoreDistributor for PersistentCoreDistributor {
}
SystemAssignment::new(lock.system_cores.clone(), logical_core_ids)
}

+ fn cleanup_cache(&self, allowed_cuids: &[CUID]) {
+ let mut lock = self.state.write();
+ let allowed_unit_ids: FxHashSet<CUID> = allowed_cuids.iter().cloned().collect();
+ lock.cuid_cache
+ .retain(|cuid, _| allowed_unit_ids.contains(cuid))
+ }
}

pub(crate) struct CoreDistributorState {
@@ -302,6 +317,8 @@ pub(crate) struct CoreDistributorState {
pub unit_id_mapping: BiMap<PhysicalCoreId, CUID>,
// mapping between unit id and workload type
pub work_type_mapping: Map<CUID, WorkType>,
+ // cache
+ pub cuid_cache: Map<CUID, PhysicalCoreId>,
}

#[cfg(test)]
@@ -431,6 +448,52 @@ mod tests {
assert_eq!(assignment_1, assignment_3);
}

+ #[test]
+ fn test_acquire_same_cuid_strict() {
+ let cpu_topology = mocked_topology();
+ let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");

+ let (distributor, _task) = PersistentCoreDistributor::from_path(
+ temp_dir.path().join("test.toml"),
+ 2,
+ CoreRange::from_str("0-7").unwrap(),
+ AcquireStrategy::Strict,
+ &cpu_topology,
+ )
+ .unwrap();
+ let unit_id_1 =
+ <CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
+ .unwrap();
+ let unit_id_2 =
+ <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3a0")
+ .unwrap();
+ let unit_id_3 =
+ <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3d0")
+ .unwrap();
+ let all_unit_ids = vec![unit_id_1, unit_id_2, unit_id_3];

+ let unit_ids = vec![unit_id_1, unit_id_2];
+ let assignment_1 = distributor
+ .acquire_worker_cores(AcquireRequest {
+ unit_ids: unit_ids.clone(),
+ work_type: WorkType::CapacityCommitment,
+ })
+ .unwrap();
+ distributor.release_worker_cores(all_unit_ids.as_slice());
+ let unit_ids = vec![unit_id_2, unit_id_3];
+ let assignment_2 = distributor
+ .acquire_worker_cores(AcquireRequest {
+ unit_ids: unit_ids.clone(),
+ work_type: WorkType::CapacityCommitment,
+ })
+ .unwrap();

+ assert_eq!(
+ assignment_1.cuid_cores.get(&unit_id_2),
+ assignment_2.cuid_cores.get(&unit_id_2)
+ )
+ }

#[test]
fn test_acquire_and_release_strict() {
let cpu_topology = mocked_topology();
@@ -535,6 +598,7 @@
available_cores: vec![PhysicalCoreId::new(2)],
unit_id_mapping: vec![(PhysicalCoreId::new(3), init_id_1)],
work_type_mapping: vec![(init_id_1, WorkType::Deal)],
+ cuid_cache: Default::default(),
};
let (distributor, _task) = PersistentCoreDistributor::make_instance_with_task(
temp_dir.into_path(),
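For illustration (not from this PR): `cleanup_cache` prunes cache entries for compute units that no longer exist on chain, so the CUID-to-core mapping cannot grow without bound across epochs. A standalone sketch of the same retain-based pruning, using std `HashMap`/`HashSet` in place of the crate's fxhash-backed `Map`/`FxHashSet`, with simplified stand-in ID types:

```rust
use std::collections::{HashMap, HashSet};

type Cuid = [u8; 32]; // stand-in for ccp_shared::types::CUID
type PhysicalCoreId = u32; // stand-in for the crate's PhysicalCoreId

// Keep only cache entries whose CUID is still in the allowed set,
// mirroring the `retain` call in `cleanup_cache` above.
fn cleanup_cache(cache: &mut HashMap<Cuid, PhysicalCoreId>, allowed_cuids: &[Cuid]) {
    let allowed: HashSet<Cuid> = allowed_cuids.iter().copied().collect();
    cache.retain(|cuid, _| allowed.contains(cuid));
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert([1u8; 32], 4);
    cache.insert([2u8; 32], 5);
    cleanup_cache(&mut cache, &[[1u8; 32]]); // only unit 1 is still on chain
    assert_eq!(cache.get(&[1u8; 32]), Some(&4));
    assert_eq!(cache.get(&[2u8; 32]), None);
}
```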
2 changes: 2 additions & 0 deletions crates/core-distributor/src/dummy.rs
@@ -62,4 +62,6 @@ impl CoreDistributor for DummyCoreDistibutor {
fn get_system_cpu_assignment(&self) -> SystemAssignment {
SystemAssignment::new(vec![PhysicalCoreId::new(0)], vec![LogicalCoreId::new(0)])
}

+ fn cleanup_cache(&self, _allowed_cuids: &[CUID]) {}
}
13 changes: 9 additions & 4 deletions crates/core-distributor/src/persistence.rs
@@ -95,6 +95,8 @@ pub struct PersistentCoreDistributorState {
pub unit_id_mapping: Vec<(PhysicalCoreId, CUID)>,
#[serde_as(as = "Vec<(Hex, _)>")]
pub work_type_mapping: Vec<(CUID, WorkType)>,
+ #[serde_as(as = "Vec<(Hex, _)>")]
+ pub cuid_cache: Vec<(CUID, PhysicalCoreId)>,
}

impl PersistentCoreDistributorState {
@@ -124,6 +126,7 @@ impl From<&CoreDistributorState> for PersistentCoreDistributorState {
.iter()
.map(|(k, v)| ((*k), *v))
.collect(),
+ cuid_cache: value.cuid_cache.iter().map(|(k, v)| ((*k), *v)).collect(),
}
}
}
@@ -137,7 +140,7 @@ mod tests {

#[test]
fn test_serde() {
- let init_id_1 =
+ let unit_id_1 =
<CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
.unwrap();
let persistent_state = PersistentCoreDistributorState {
@@ -153,15 +156,17 @@
],
system_cores: vec![PhysicalCoreId::new(1)],
available_cores: vec![PhysicalCoreId::new(2), PhysicalCoreId::new(3)],
- unit_id_mapping: vec![(PhysicalCoreId::new(4), init_id_1)],
- work_type_mapping: vec![(init_id_1, WorkType::Deal)],
+ unit_id_mapping: vec![(PhysicalCoreId::new(4), unit_id_1)],
+ work_type_mapping: vec![(unit_id_1, WorkType::Deal)],
+ cuid_cache: vec![(unit_id_1, PhysicalCoreId::new(1))],
};
let actual = toml::to_string(&persistent_state).unwrap();
let expected = "cores_mapping = [[1, 1], [1, 2], [2, 3], [2, 4], [3, 5], [3, 6], [4, 7], [4, 8]]\n\
system_cores = [1]\n\
available_cores = [2, 3]\n\
unit_id_mapping = [[4, \"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\"]]\n\
- work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n";
+ work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n\
+ cuid_cache = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", 1]]\n";
assert_eq!(expected, actual)
}
}
75 changes: 57 additions & 18 deletions crates/core-distributor/src/strategy.rs
@@ -109,39 +109,53 @@ impl AcquireStrategyOperations for StrictAcquireStrategy {
});
}

+ let mut new_core_allocation = Vec::with_capacity(core_allocation.len());
for (unit_id, physical_core_id) in core_allocation {
+ match state.cuid_cache.get(&unit_id) {
+ None => new_core_allocation.push((unit_id, physical_core_id)),
+ Some(cached_physical_core_id) => {
+ let position = state
+ .available_cores
+ .iter()
+ .position(|core_id| core_id == cached_physical_core_id);
+ match position {
+ None => new_core_allocation.push((unit_id, physical_core_id)),
+ Some(index) => {
+ // SAFETY: this should never happen because we already found position in the previous step
+ let physical_core_id = state
+ .available_cores
+ .remove(index)
+ .expect("Unexpected state. Should not be empty never");

+ state.unit_id_mapping.insert(physical_core_id, unit_id);
+ state.work_type_mapping.insert(unit_id, *worker_unit_type);
+ Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores);
+ }
+ }
+ }
+ }
+ }

+ for (unit_id, physical_core_id) in new_core_allocation {
let physical_core_id = match physical_core_id {
None => {
// SAFETY: this should never happen because we already checked the availability of cores
- let core_id = state
+ let physical_core_id = state
.available_cores
.pop_back()
.expect("Unexpected state. Should not be empty never");
- state.unit_id_mapping.insert(core_id, unit_id);
+ state.unit_id_mapping.insert(physical_core_id, unit_id);
state.work_type_mapping.insert(unit_id, *worker_unit_type);
- core_id
+ state.cuid_cache.insert(unit_id, physical_core_id);
+ physical_core_id
}
Some(core_id) => {
state.work_type_mapping.insert(unit_id, *worker_unit_type);
core_id
}
};

- // SAFETY: The physical core always has corresponding logical ids,
- // unit_id_mapping can't have a wrong physical_core_id
- let logical_core_ids = state
- .cores_mapping
- .get_vec(&physical_core_id)
- .cloned()
- .expect("Unexpected state. Should not be empty never");

- cuid_cores.insert(
- unit_id,
- Cores {
- physical_core_id,
- logical_core_ids,
- },
- );
+ Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores);
}

Ok(Assignment::new(cuid_cores))
@@ -157,6 +171,31 @@
}
}

+ impl StrictAcquireStrategy {
+ fn add_cuid_cores(
+ state: &CoreDistributorState,
+ unit_id: CUID,
+ physical_core_id: PhysicalCoreId,
+ cuid_cores: &mut Map<CUID, Cores>,
+ ) {
+ // SAFETY: The physical core always has corresponding logical ids,
+ // unit_id_mapping can't have a wrong physical_core_id
+ let logical_core_ids = state
+ .cores_mapping
+ .get_vec(&physical_core_id)
+ .cloned()
+ .expect("Unexpected state. Should not be empty never");

+ cuid_cores.insert(
+ unit_id,
+ Cores {
+ physical_core_id,
+ logical_core_ids,
+ },
+ );
+ }
+ }

pub(crate) struct RoundRobinAcquireStrategy;

impl AcquireStrategyOperations for RoundRobinAcquireStrategy {
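For illustration (not from this PR): the strict strategy now allocates in two passes. First, every requested unit with a cache entry is re-seated on its cached core if that core is still free; everything else falls through to the old pop-from-the-free-list path, which now also records its choice in the cache. This is the behavior `test_acquire_same_cuid_strict` checks: releasing and re-acquiring a unit yields the same physical core. A condensed sketch with std types (`Vec`/`HashMap` instead of the crate's deque, `BiMap`, and `Map`), ignoring work types and logical-core fanout:

```rust
use std::collections::HashMap;

type Cuid = u64; // stand-in for CUID
type CoreId = u32; // stand-in for PhysicalCoreId

// Pass 1: reuse cached cores when still available.
// Pass 2: assign remaining units from the free list and cache the choice.
fn acquire(
    available: &mut Vec<CoreId>,
    cache: &mut HashMap<Cuid, CoreId>,
    unit_ids: &[Cuid],
) -> HashMap<Cuid, CoreId> {
    let mut assigned = HashMap::new();
    let mut leftover = Vec::new();

    for &unit in unit_ids {
        match cache.get(&unit) {
            Some(&core) if available.contains(&core) => {
                available.retain(|&c| c != core); // take exactly that core again
                assigned.insert(unit, core);
            }
            _ => leftover.push(unit), // cache miss, or cached core is busy
        }
    }

    for unit in leftover {
        let core = available.pop().expect("availability checked upfront");
        cache.insert(unit, core); // remember the choice for next time
        assigned.insert(unit, core);
    }

    assigned
}

fn main() {
    let mut available = vec![0, 1, 2, 3];
    let mut cache = HashMap::new();

    let first = acquire(&mut available, &mut cache, &[10, 20]);

    // Release everything, then re-acquire unit 20: same core as before.
    available.extend(first.values().copied());
    let second = acquire(&mut available, &mut cache, &[20, 30]);
    assert_eq!(first.get(&20), second.get(&20));
}
```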
5 changes: 3 additions & 2 deletions crates/nox-tests/tests/chain_listener/tests.rs
@@ -85,6 +85,7 @@ impl ChainListenerTestEntities {
max_batch_count: 0,
max_proof_batch_size: 0,
epoch_end_window: Default::default(),
+ ws_ping_period: Default::default(),
});

cfg.cc_events_dir = Some(cc_events_dir.clone());
@@ -393,8 +394,8 @@ async fn test_deal_insufficient_funds_flow() {
assert_allocation(
result_2.cu_allocation,
vec![
- (PhysicalCoreId::new(125), cu_id_2),
- (PhysicalCoreId::new(126), cu_id_3),
+ (PhysicalCoreId::new(125), cu_id_3),
+ (PhysicalCoreId::new(126), cu_id_2),
],
);
}
4 changes: 4 additions & 0 deletions crates/server-config/src/defaults.rs
@@ -294,3 +294,7 @@ pub fn default_max_proof_batch_size() -> usize {
pub fn default_epoch_end_window() -> Duration {
Duration::from_secs(300)
}

+ pub fn default_ws_ping_period() -> Duration {
+ Duration::from_secs(10)
+ }
3 changes: 3 additions & 0 deletions crates/server-config/src/node_config.rs
@@ -570,6 +570,9 @@ pub struct ChainListenerConfig {
#[serde(default = "default_epoch_end_window")]
#[serde(with = "humantime_serde")]
pub epoch_end_window: Duration,
+ #[serde(default = "default_ws_ping_period")]
+ #[serde(with = "humantime_serde")]
+ pub ws_ping_period: Duration, // TODO: must be >0
}

/// Name of the effector module
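For illustration (not from this PR): because `ws_ping_period` is deserialized with `humantime_serde`, config files give it as a human-readable duration string (e.g. `ws_ping_period = "10s"`), and omitting it falls back to `default_ws_ping_period()`. A minimal sketch of just that field, assuming `serde`, `toml`, and `humantime_serde` as dependencies:

```rust
use std::time::Duration;

use serde::Deserialize;

// Mirrors only the new field of ChainListenerConfig.
#[derive(Deserialize)]
struct ListenerCfg {
    #[serde(default = "default_ws_ping_period")]
    #[serde(with = "humantime_serde")]
    ws_ping_period: Duration,
}

fn default_ws_ping_period() -> Duration {
    Duration::from_secs(10)
}

fn main() {
    // An explicit value overrides the default.
    let cfg: ListenerCfg = toml::from_str(r#"ws_ping_period = "3s""#).unwrap();
    assert_eq!(cfg.ws_ping_period, Duration::from_secs(3));

    // A missing field falls back to the 10s default.
    let cfg: ListenerCfg = toml::from_str("").unwrap();
    assert_eq!(cfg.ws_ping_period, Duration::from_secs(10));
}
```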