Skip to content

Commit

Permalink
Added a retry for node communication, because the coordinator is faster than the network creation in some cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Saetch committed Jun 30, 2024
1 parent 1f3b220 commit bdc1eeb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cluster/coordinator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cluster/coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ strip = "symbols"
[dependencies]
actix-web = "4.6.0"
awc = "3.5.0"
bytes = "1.6.0"
env_logger = "0.11.3"
futures = "0.3.30"
rand = "0.8.5"
Expand Down
35 changes: 29 additions & 6 deletions Cluster/coordinator/src/deal_with_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{ sync::Arc};

use actix_web::web;

Expand All @@ -24,12 +24,35 @@ pub async fn redistribute_values(data: Arc<InteriorMutableState>, hash_value: u1
});
let to_distribute_from = if index != all_known_nodes.len()-1 {index + 1} else {0} ; //the values are distributed to the next hash value, so getting the next one after the current one yields the one that might have a share of values to redistribute
let node_to_distribute_from = &all_known_nodes[to_distribute_from];
let url = format!("http://{}:5552/deleteSavedValuesBelow/{}", node_to_distribute_from.name, hash_value.to_string());
println!("URL: {}", url);
let mut worked_communication = false;
let mut response = Err(awc::error::SendRequestError::Timeout);
for _ in 0..=5{
let url = format!("http://{}:5552/deleteSavedValuesBelow/{}", node_to_distribute_from.name, hash_value.to_string());
println!("URL: {}", url);
//If trying to deal with nonexistent or non-responding node, edit here!
response = awc::Client::default().post(url).send().await;
if response.is_ok(){
break;
}else{
println!("Failed to communicate with node: {} ... retrying ...", node_to_distribute_from.name);
std::thread::sleep(std::time::Duration::from_secs(1));
}


}
let mut resp: bytes::Bytes;
if let Ok(mut response) = response{
resp = response.body().await.unwrap();
println!("Response: {:?}", response);
}else{
println!("Expected aborting ... ");
panic!("Node with hash value {} not found in known_nodes with data: {:?}", hash_value, data);
}
let values_to_distribute : Vec<Position> = serde_json::from_slice(&resp).unwrap();
println!("Response: {:?}", resp);
println!("Values to distribute: {:?}", values_to_distribute);
//If trying to deal with nonexistent or non-responding node, edit here!
let response = awc::Client::default().post(url).send().await.unwrap().body().await.unwrap();
println!("Response: {:?}", response);
let values_to_distribute : Vec<Position> = serde_json::from_slice(&response).unwrap();
let values_to_distribute : Vec<Position> = serde_json::from_slice(&resp).unwrap();
println!("Values to distribute: {:?}", values_to_distribute);
positions_vec = values_to_distribute;
}else{
Expand Down

0 comments on commit bdc1eeb

Please sign in to comment.