Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broker properties not getting updated on Configuration change #711

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 84 additions & 57 deletions agent/src/discovery_handler_manager/discovery_handler_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ pub trait DiscoveryHandlerEndpoint: Send + Sync {
/// results across the different registered handlers of that type, and generate the Instance objects for discovered
/// devices.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DiscoveryHandlerRequest: Sync + Send {
fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError>;
async fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError>;
async fn set_extra_device_properties(&self, extra_device_properties: HashMap<String, String>);
}

/// This trait is here to help with testing for code that interract with the discovery handler registry
Expand Down Expand Up @@ -169,35 +171,60 @@ pub trait DiscoveryHandlerRegistry: Sync + Send {
/// Real world implementation of the Discovery Handler Request
struct DHRequestImpl {
endpoints: RwLock<Vec<watch::Receiver<Vec<Arc<DiscoveredDevice>>>>>,
notifier: watch::Sender<Vec<Arc<DiscoveredDevice>>>,
notifier: watch::Sender<crate::device_manager::cdi::Kind>,
key: String,
handler_name: String,
details: String,
properties: Vec<DiscoveryProperty>,
extra_device_properties: HashMap<String, String>,
extra_device_properties: RwLock<HashMap<String, String>>,
kube_client: Arc<dyn DiscoveryManagerKubeInterface>,
termination_notifier: Arc<Notify>,
}

#[async_trait]
impl DiscoveryHandlerRequest for DHRequestImpl {
fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError> {
async fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError> {
let properties = self.extra_device_properties.read().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same point here: could this skipped and instead let the called function (device_to_instance) access the properties? Or is the concern wanting to avoid too many reads? It is RWLock, so reads should be cheaper. I could also see how your strategy is trying to avoid a racecase where some containers have properties that were written after the initial call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

acquiring the lock is an async operation, you cannot do this from within the map call (same reason for not doing it in device_to_instance).
But in the end I find it better anyway, as it means we have a consistent set of properties for the entire run

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

device_to_instance could be made async but i am aligned on wanting the consistent set of properties

Ok(self
.notifier
.borrow()
.endpoints
.read()
.await
.iter()
.map(|i| self.device_to_instance(i))
.flat_map(|r| r.borrow().clone().into_iter())
.map(|i| self.device_to_instance(i.as_ref(), &properties))
.collect())
}

async fn set_extra_device_properties(&self, extra_device_properties: HashMap<String, String>) {
let mut current = self.extra_device_properties.write().await;
if extra_device_properties != *current {
let edit = extra_device_properties
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect();
*current = extra_device_properties;
self.notifier
.send_modify(|k| k.container_edits.first_mut().unwrap().env = edit);
}
}
}

impl DHRequestImpl {
fn device_to_instance(&self, dev: &DiscoveredDevice) -> Instance {
fn device_to_instance(
&self,
dev: &DiscoveredDevice,
extra_device_properties: &HashMap<String, String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you passing in properties instead of reading them from self.extra_device_properties?

) -> Instance {
let (rdev, shared) = match dev {
DiscoveredDevice::LocalDevice(d, _) => (d, false),
DiscoveredDevice::SharedDevice(d) => (d, true),
};
let mut properties = rdev.properties.clone();
properties.extend(self.extra_device_properties.clone());
properties.extend(
extra_device_properties
.iter()
.map(|(k, v)| (k.clone(), v.clone())),
);
Instance {
spec: InstanceSpec {
cdi_name: self.get_device_cdi_fqdn(dev),
Expand Down Expand Up @@ -255,13 +282,27 @@ impl DHRequestImpl {
.await
.iter_mut()
.flat_map(|r| r.borrow_and_update().clone().into_iter())
.unique_by(|d| self.get_device_cdi_fqdn(d))
.collect();
self.notifier.send_replace(
devices
.into_iter()
.unique_by(|d| self.get_device_cdi_fqdn(d))
.collect(),
);
self.notifier
.send_replace(crate::device_manager::cdi::Kind {
kind: format!("{}/{}", AKRI_PREFIX, self.key),
annotations: Default::default(),
devices: devices
.into_iter()
.map(|d| d.as_ref().clone().into())
.collect(),
container_edits: vec![ContainerEdit {
env: self
.extra_device_properties
.read()
.await
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect(),
..Default::default()
}],
});
}
}

Expand Down Expand Up @@ -324,41 +365,23 @@ impl DHRegistryImpl {
}

async fn handle_request(
mut req_notifier: watch::Receiver<Vec<Arc<DiscoveredDevice>>>,
key: String,
mut req_notifier: watch::Receiver<crate::device_manager::cdi::Kind>,
key: &String,
namespace: &String,
cdi_sender: Arc<Mutex<watch::Sender<HashMap<String, crate::device_manager::cdi::Kind>>>>,
local_config_sender: mpsc::Sender<ObjectRef<Configuration>>,
extra_device_properties: HashMap<String, String>,
) {
let cdi_kind = format!("{}/{}", AKRI_PREFIX, key);
loop {
match req_notifier.changed().await {
Ok(_) => {
cdi_sender.lock().await.send_modify(|kind| {
kind.insert(
cdi_kind.clone(),
crate::device_manager::cdi::Kind {
kind: cdi_kind.clone(),
annotations: Default::default(),
devices: req_notifier
.borrow_and_update()
.iter()
.map(|d| d.as_ref().clone().into())
.collect(),
container_edits: vec![ContainerEdit {
env: extra_device_properties
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect(),
..Default::default()
}],
},
);
let kind = req_notifier.borrow_and_update().clone();
cdi_sender.lock().await.send_modify(|kinds| {
kinds.insert(cdi_kind.clone(), kind);
});
trace!("Ask for reconciliation of {}::{}", namespace, key);
let res = local_config_sender
.send(ObjectRef::<Configuration>::new(&key).within(namespace))
.send(ObjectRef::<Configuration>::new(key).within(namespace))
.await;
if res.is_err() {
cdi_sender.lock().await.send_modify(|kind| {
Expand All @@ -370,7 +393,7 @@ async fn handle_request(
Err(_) => {
trace!("Ask for reconciliation of {}::{}", namespace, key);
let _ = local_config_sender
.send(ObjectRef::<Configuration>::new(&key).within(namespace))
.send(ObjectRef::<Configuration>::new(key).within(namespace))
.await;
cdi_sender.lock().await.send_modify(|kind| {
kind.remove(&cdi_kind);
Expand All @@ -394,7 +417,7 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
) -> Result<(), DiscoveryError> {
match self.handlers.read().await.get(dh_name) {
Some(handlers) => {
let (notifier, _) = watch::channel(vec![]);
let (notifier, _) = watch::channel(Default::default());
let terminated = Arc::new(Notify::new());
let mut dh_req = DHRequestImpl {
endpoints: Default::default(),
Expand All @@ -403,7 +426,7 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
handler_name: dh_name.to_string(),
details: dh_details.to_string(),
properties: dh_properties.to_vec(),
extra_device_properties: extra_device_properties.clone(),
extra_device_properties: RwLock::new(extra_device_properties),
kube_client: self.kube_client.clone(),
termination_notifier: terminated.clone(),
};
Expand Down Expand Up @@ -433,11 +456,10 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
tokio::spawn(async move {
handle_request(
local_req_notifier,
local_key,
&local_key,
&namespace,
local_cdi_sender,
local_config_sender,
extra_device_properties,
)
.await
});
Expand Down Expand Up @@ -608,9 +630,9 @@ mod tests {
);
}

#[test]
fn test_dh_request_impl_get_instances() {
let (notifier, _) = watch::channel(vec![Arc::new(DiscoveredDevice::LocalDevice(
#[tokio::test]
async fn test_dh_request_impl_get_instances() {
let (_, notifier) = watch::channel(vec![Arc::new(DiscoveredDevice::LocalDevice(
Device {
id: "my_local_device".to_owned(),
properties: HashMap::from([(
Expand All @@ -622,23 +644,25 @@ mod tests {
},
"my_node".to_owned(),
))]);
let endpoints = RwLock::new(vec![notifier]);
let (cdi_notifier, _) = watch::channel(Default::default());
let req = DHRequestImpl {
endpoints: Default::default(),
notifier,
endpoints,
notifier: cdi_notifier,
key: "my_config".to_owned(),
handler_name: "mock_handler".to_string(),
details: Default::default(),
properties: Default::default(),
extra_device_properties: HashMap::from([(
extra_device_properties: RwLock::new(HashMap::from([(
"MY_EXTRA_KEY".to_owned(),
"value".to_owned(),
)]),
)])),
kube_client: Arc::new(MockDiscoveryManagerKubeInterface::new()),
termination_notifier: Arc::new(Notify::new()),
};

assert_eq!(
req.get_instances().unwrap(),
req.get_instances().await.unwrap(),
vec![Instance {
metadata: ObjectMeta {
name: Some("my_config-e77db4".to_owned()),
Expand All @@ -662,7 +686,7 @@ mod tests {

#[tokio::test]
async fn test_dh_request_impl_watch_devices() {
let (notifier, mut n_rec) = watch::channel(vec![]);
let (notifier, mut n_rec) = watch::channel(Default::default());
let (dh_send, dh_rec) = watch::channel(Default::default());
let req = Arc::new(DHRequestImpl {
endpoints: RwLock::new(vec![dh_rec]),
Expand All @@ -675,10 +699,10 @@ mod tests {
value: Some("value_1".to_string()),
value_from: None,
}],
extra_device_properties: HashMap::from([(
extra_device_properties: RwLock::new(HashMap::from([(
"MY_EXTRA_KEY".to_owned(),
"value".to_owned(),
)]),
)])),
kube_client: Arc::new(MockDiscoveryManagerKubeInterface::new()),
termination_notifier: Arc::new(Notify::new()),
});
Expand All @@ -687,7 +711,7 @@ mod tests {
let (new_dh_sen, rec) = broadcast::channel(1);

let task = tokio::spawn(async move { req_ref.watch_devices(rec).await });
assert!(n_rec.borrow_and_update().is_empty());
assert!(n_rec.borrow_and_update().devices.is_empty());

let new_device = Arc::new(DiscoveredDevice::SharedDevice(Device {
id: "my_shared_device".to_owned(),
Expand All @@ -698,7 +722,10 @@ mod tests {
dh_send.send(vec![new_device.clone()]).unwrap();

tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(n_rec.borrow_and_update().clone(), vec![new_device]);
assert_eq!(
n_rec.borrow_and_update().devices.clone(),
vec![new_device.as_ref().clone().into()]
);

let mut new_dh = MockDiscoveryHandlerEndpoint::new();
let new_dh_senders = Arc::new(std::sync::Mutex::new(vec![]));
Expand Down
29 changes: 18 additions & 11 deletions agent/src/util/discovery_configuration_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,21 @@ pub async fn reconcile(

let discovered_instances: Vec<Instance> =
match ctx.dh_registry.get_request(&dc.name_any()).await {
Some(req) => req
.get_instances()?
.into_iter()
.map(|mut instance| {
// Add
instance.spec.nodes = vec![ctx.agent_identifier.to_owned()];
instance.owner_references_mut().push(owner_ref.clone());
instance.spec.capacity = dc.spec.capacity;
instance
})
.collect(),
Some(req) => {
req.set_extra_device_properties(dc.spec.broker_properties.clone())
.await;
req.get_instances()
.await?
.into_iter()
.map(|mut instance| {
// Add
instance.spec.nodes = vec![ctx.agent_identifier.to_owned()];
instance.owner_references_mut().push(owner_ref.clone());
instance.spec.capacity = dc.spec.capacity;
instance
})
.collect()
}
None => {
ctx.dh_registry
.new_request(
Expand Down Expand Up @@ -474,6 +478,9 @@ mod tests {

let mut registry = MockDiscoveryHandlerRegistry::new();
let mut request = MockDiscoveryHandlerRequest::new();
request
.expect_set_extra_device_properties()
.returning(|_| {});
request.expect_get_instances().returning(|| Ok(vec![]));
registry
.expect_get_request()
Expand Down
Loading