Skip to content

Commit

Permalink
[#2] Added communication from node to coordinator in order to request…
Browse files Browse the repository at this point in the history
… information about relevant node
  • Loading branch information
Saetch committed May 18, 2024
1 parent f722a5e commit e0960dc
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 79 deletions.
55 changes: 45 additions & 10 deletions Cluster/Node_cs/ApiConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,26 @@ public class ApiConfig
{

public Dictionary<Tuple<int, int>, double> savedValues = new Dictionary<Tuple<int, int>, double>();

public const String BICUBIC_INTERPOLATION_SERVICE_URL = "http://bicubic_interpolation_service:8080/calculate";
public Dictionary<Tuple<int, int>, String> exteriorValuesInNodes = new Dictionary<Tuple<int, int>, String>();
public String BICUBIC_INTERPOLATION_SERVICE_URL = "http://bicubic_interpolation_service:8080/calculate";
public String hostname = Environment.GetEnvironmentVariable("HOSTNAME");
public String COORDINATOR_SERVICE_URL = "coordinator-1";

public String COORDINATOR_SERVICE_URL = "coordinator";
public int PORT = 8080;
public int initializeConfigValues()
{
Console.WriteLine("hostname is: "+ this.hostname);

String env = Environment.GetEnvironmentVariable("CLUSTER_ENVIRONMENT");
if (env == null){
Console.WriteLine("CLUSTER_ENVIRONMENT is: "+ env);
this.COORDINATOR_SERVICE_URL = "localhost";
this.PORT = 5003;
this.BICUBIC_INTERPOLATION_SERVICE_URL = "http://localhost:5555/calculate";
this.hostname = "node1";
}
TcpClient tcpClient = new TcpClient();
var result = tcpClient.BeginConnect(this.COORDINATOR_SERVICE_URL, 8080, null, null);
var result = tcpClient.BeginConnect(this.COORDINATOR_SERVICE_URL, this.PORT, null, null);

var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(15));
var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(150));
if (success)
{
tcpClient.EndConnect(result);
Expand All @@ -37,9 +44,9 @@ public int initializeConfigValues()


HttpClient client = new HttpClient();
var response = client.GetAsync("http://" + this.COORDINATOR_SERVICE_URL + ":8080/register/").Result;
Console.WriteLine("Received response from coordinator service: " + response.Content.ReadAsStringAsync().Result);
return 1; //TODO! Update this if the Node is configured correctly according to the response from the coordinator service
var response = client.PostAsync("http://" + this.COORDINATOR_SERVICE_URL + ":"+ this.PORT+"/organize/register/"+this.hostname, null).Result;
DealWithResponse(response);
return 0; //TODO! Update this if the Node is configured correctly according to the response from the coordinator service
}

public WebApplication setupServer(){
Expand Down Expand Up @@ -108,6 +115,34 @@ private void registerRequests(WebApplication app)
}
});
}

private void DealWithResponse(HttpResponseMessage response){
String responseString = response.Content.ReadAsStringAsync().Result;
Console.WriteLine("Received response from coordinator service: " + responseString);
if (responseString.Contains("WAIT")){
Console.WriteLine("Received WAIT command ... ");
return;
}
if (!responseString.Contains("HANDLE")){
Console.WriteLine("Received invalid response from coordinator service: " + responseString);
throw new Exception("Received invalid response from coordinator service: " + responseString);
}
//This is an example response: {"HANDLE":{"positions":[{"x":0,"y":0,"value":0.6816054984788531},{"x":1,"y":0,"value":0.6952797360508614},{"x":0,"y":1,"value":3.0950656335878035},{"x":1,"y":1,"value":2.0697533239357435}]}}
String valuesPart = responseString.Split("[{")[1];
String [] valuesStrings = valuesPart.Split("{");
foreach (String valueString in valuesStrings){
if (valueString.Length < 3){
continue;
}
String [] valueParts = valueString.Split(",");
int x = Int32.Parse(valueParts[0].Split(":")[1]);
int y = Int32.Parse(valueParts[1].Split(":")[1]);
double value = Double.Parse(valueParts[2].Replace("}","").Replace("]","").Split(":")[1]);
savedValues.Add(new Tuple<int, int>(x,y), value);
}


}

}

Expand Down
18 changes: 16 additions & 2 deletions Cluster/Node_cs/GetRequests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ static async Task<double> CalculateInterpolatedValue(double x, double y, ApiConf
int zeroed_actual_x = (int)Math.Floor(x);
int zeroed_actual_y = (int)Math.Floor(y);
double [][] valueArray = await NodeBehavior.GetValuesForInterpolation(zeroed_actual_x, zeroed_actual_y, config);

Console.WriteLine("Successfully got values for interpolation");
//request solving the interpolation from the bicubic interpolation service
var client = new HttpClient();
String arr = "";
Expand All @@ -38,7 +40,7 @@ static async Task<double> CalculateInterpolatedValue(double x, double y, ApiConf
}
arr = arr.TrimEnd(';');
Console.WriteLine("Sending request to bicubic interpolation service with values: " + arr);
var response = await client.GetAsync(ApiConfig.BICUBIC_INTERPOLATION_SERVICE_URL + "?x="+(x -zeroed_actual_x)+"&y="+(y -zeroed_actual_y)+"&arr=" + arr);
var response = await client.GetAsync(config.BICUBIC_INTERPOLATION_SERVICE_URL + "?x="+(x -zeroed_actual_x)+"&y="+(y -zeroed_actual_y)+"&arr=" + arr);
Console.WriteLine("Received response from bicubic interpolation service: " + response.Content.ReadAsStringAsync().Result);
double actual_value = Double.Parse(response.Content.ReadAsStringAsync().Result, CultureInfo.InvariantCulture);
return actual_value;
Expand All @@ -49,17 +51,29 @@ static async Task<double> CalculateInterpolatedValue(double x, double y, ApiConf



public static double? GetSavedNodeValue(int x, int y, ApiConfig config){
public static double? GetSavedNodeValue(int x, int y, ApiConfig config){
Console.WriteLine("Received SavedValue-call with params: " + x + "/" + y);
Tuple<int, int> key = new Tuple<int, int>(x, y);
DisplayState(config);
if(config.savedValues.ContainsKey(key)){
return config.savedValues[key];
}
return null;
}

private static void DisplayState(ApiConfig config){
Console.WriteLine("State: " + config.savedValues);
Tuple<int, int>[] keys = config.savedValues.Keys.ToArray();
for (int i = 0; i < keys.Length; i++)
{
Console.WriteLine("Key: " + keys[i] + " Value: " + config.savedValues[keys[i]]);
}
}

public static XYValues GetValue(string values, ApiConfig config)
{
Console.WriteLine("Received GetValue-call with params: " + values);
DisplayState(config);
string[] splitValues = values.Split("_");
double x = Double.Parse(splitValues[0].Replace(',', '.'), CultureInfo.InvariantCulture);
double y = Double.Parse(splitValues[1].Replace(',', '.'), CultureInfo.InvariantCulture);
Expand Down
116 changes: 64 additions & 52 deletions Cluster/Node_cs/NodeBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ public static double[][] FillInNullValues(double? [][] values){
}
}
}
for(int j = 1; j <3; j++){
for(int j = 0; j < 4; j++){
double? ToFillValue = null;
for(int i = 0; i < 4; i++){
for(int i = j == 0 ? 1 : j == 3 ? 1 : 0 ; i < (j==3 ? 3 : j == 0 ? 3 : 4); i++){
if (values[i][j] == null){
values[i][j] = ToFillValue;
}else{
Expand Down Expand Up @@ -137,28 +137,62 @@ public static double[][] FillInNullValues(double? [][] values){
}


//TODO! This is hardcoded and only works with the default configuration now. Update this to work with any configuration
public static int[] GetNodePoints(int x, int y, ApiConfig config){
var ret = new int[2];
if (x < 0) {
ret [0] = (x-1) / 2;
}else{
ret [0] = x / 2;
public static String GetHoldingNode(int x, int y, ApiConfig config){
Tuple<int, int> key = new Tuple<int, int>(x, y);
if (config.exteriorValuesInNodes.TryGetValue(key, out string? value)){
TcpClient tcpClient = new TcpClient();
var result = tcpClient.BeginConnect(value, 5552, null, null);
var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(22));
if (success)
{
tcpClient.EndConnect(result);
tcpClient.Close();
return value;
}
else{
Console.WriteLine("Failed to connect to known node, requesting new information ... ");
tcpClient.Close();
}
} else {
Console.WriteLine("No information about node found, requesting new information ... ");
}
if (y < 0) {
ret [1] = (y-1) / 2;
}else{
ret [1] = y / 2;

TcpClient tcpClient2 = new TcpClient();
var result2 = tcpClient2.BeginConnect(config.COORDINATOR_SERVICE_URL, config.PORT, null, null);
var success2 = result2.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(150));
if (success2)
{
tcpClient2.EndConnect(result2);
}
return ret;
else{
while (!success2){
Console.WriteLine("Failed to connect to coordinator service ... ");
//sleep for 1 second
System.Threading.Thread.Sleep(1000);
result2 = tcpClient2.BeginConnect(config.COORDINATOR_SERVICE_URL, config.PORT, null, null);
success2 = result2.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(150));
}
tcpClient2.EndConnect(result2);
}
tcpClient2.Close();
HttpClient client = new HttpClient();
var response = client.GetAsync("http://" + config.COORDINATOR_SERVICE_URL + ":"+ config.PORT+"/organize/get_node/"+x+"/"+y).Result;
var node = response.Content.ReadAsStringAsync().Result;
config.exteriorValuesInNodes[key] = node;
return node;

}

//as of yet it is hardcoded since the nodes are hardcoded, too. Assuming that all nodes have the same height and width
static String GetNodeURL(int x, int y, ApiConfig config){
static String? GetNodeURL(int x, int y, ApiConfig config){

var nodePoints = GetNodePoints(x, y, config);
var nodePoints = GetHoldingNode(x, y, config);

return "http://node_x_" + nodePoints[0] + "_y_" + nodePoints[1] +":5552/getSavedValue/" + x + "/" + y + "/";
if (nodePoints.Equals("Unknown")){
Console.WriteLine("Node not found!");
return null;
}
return nodePoints + "/getSavedValue/"+x+"/"+y;
}


Expand All @@ -171,53 +205,31 @@ static String GetNodeURL(int x, int y, ApiConfig config){
values[i] = new double?[4];
}
String apiUrl;
String? node;
HttpResponseMessage response;
var current_x = zeroed_actual_x;
var current_y = zeroed_actual_y;
bool found = false;

for (int i = -1; i < 3; i++)
{
Console.WriteLine("Filling in row " + i);
for (int j = -1; j < 3; j++){
Tuple<int, int> current_key = new Tuple<int, int>(zeroed_actual_x + i, zeroed_actual_y + j);
if (config.savedValues.ContainsKey(current_key)){
values[i][j] = config.savedValues[current_key];
Console.WriteLine("Value found in saved values, using it ... ");
Console.WriteLine("Key: "+current_key+" -> " + config.savedValues[current_key]);
values[i+1][j+1] = config.savedValues.GetValueOrDefault(current_key);
continue;
}
TcpClient tcpClient = new TcpClient();
found = false;
current_x = zeroed_actual_x + i;
current_y = zeroed_actual_y + j;
//in order to correctly identify negative values, it is needed to offset them by one, otherwise the division will not work correctly
var nodePoints = GetNodePoints(current_x, current_y, config);
int node_x = nodePoints[0];
int node_y = nodePoints[1];
Console.WriteLine("Filling in value for " + current_x + "/" + current_y);
//check wether or not there is a connection

Console.WriteLine("Trying to connect to node_x_" + node_x + "_y_" + node_y);
var result = tcpClient.BeginConnect("node_x_" + node_x + "_y_" + node_y, 5552, null, null);

var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(22));
if (success)
{
found = true;
tcpClient.EndConnect(result);
}
tcpClient.Close();

if (found){
apiUrl = GetNodeURL(zeroed_actual_x + i, zeroed_actual_y + j, config);
Console.WriteLine("Making request to: " + apiUrl+" while trying to catch HTTPRequestExceptions");

// Make a GET request to the external API
response = httpClient.Send(new HttpRequestMessage(HttpMethod.Get, apiUrl));
}else{
Console.WriteLine("No server running, setting value to null!");
node = GetNodeURL(zeroed_actual_x + i, zeroed_actual_y + j, config);
if (node == null){
Console.WriteLine("Node not found, setting value to null!");
values[i + 1][j + 1] = null;
continue;
}
apiUrl = node;
Console.WriteLine("Making request to: " + apiUrl+" while trying to catch HTTPRequestExceptions");

// Make a GET request to the external API
response = httpClient.Send(new HttpRequestMessage(HttpMethod.Get, apiUrl));


// Read the response content as a string
string responseBody = await response.Content.ReadAsStringAsync();
Expand Down
1 change: 1 addition & 0 deletions Cluster/Node_cs/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}

}
}
}
15 changes: 15 additions & 0 deletions Cluster/coordinator/src/get_requests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{rc::Weak, sync::Arc};

use actix_web::{ web, HttpResponse, Responder};

use crate::state::{ImmutableState, InteriorMutableState};
Expand All @@ -9,4 +11,17 @@ pub async fn get_complete_state(data: web::Data<InteriorMutableState>) -> impl R
println!("{:?}", state);
let immutable_state = ImmutableState::from_interior(&state).await;
HttpResponse::Ok().json(web::Json(immutable_state))
}

pub async fn get_node_for_point(path: web::Path<(i32, i32)>, data: web::Data<InteriorMutableState>) -> impl Responder {

let state = data.into_inner();
let (x, y) = path.into_inner();
let point_map = state.map_data.read().await;
let node = point_map.get(&(x, y));
if let Some(node) = node {
let string = node.upgrade().unwrap().as_ref().clone();
return HttpResponse::Ok().body(string);
}
HttpResponse::Ok().body("Unknown")
}
4 changes: 1 addition & 3 deletions Cluster/coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ async fn main() -> std::io::Result<()> {
.route("/initialize", web::post().to(initialize))
.route("/initialize", web::get().to(initialize))
.route("/get_complete_state", web::get().to(get_requests::get_complete_state))
.route("/delete", web::delete().to(|| HttpResponse::Ok()))
.route("/update", web::put().to(|| HttpResponse::Ok()))
.route("/read", web::get().to(|| HttpResponse::Ok()))
.route("/get_node/{x}/{y}", web::get().to(get_requests::get_node_for_point))
.route("/hey", web::get().to(manual_hello)),
)
.route("/ping", web::get().to(ping))
Expand Down
16 changes: 7 additions & 9 deletions Cluster/coordinator/src/post_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,14 @@ pub async fn register(path: web::Path<String>, data: web::Data<InteriorMutableSt
//if there are values to distribute, distribute them
let mut vec_positions = Vec::new();
let number_to_distribute = *data.expected_values_per_node.read().await;
let to_distribute = data.to_distribute.read().await;
let mut iter = to_distribute.iter();
for _i in 0..number_to_distribute{
let node_val_to_distribute = iter.next();
if let Some(node_val_to_distribute) = node_val_to_distribute {
vec_positions.push(node_val_to_distribute.clone());
}else {
break;
}
let mut to_distribute = data.to_distribute.write().await;
let to_drain = u32::min(number_to_distribute,to_distribute.len() as u32);

let mut iter = to_distribute.drain(0..to_drain as usize);
while let Some(node_val_to_distribute) = iter.next() {
vec_positions.push(node_val_to_distribute.clone());
}
drop(iter);
drop(to_distribute);
let mut nodes_map = data.known_nodes.write().await;
let node_state = nodes_map.get_mut(&cloned).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions Cluster/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ services:
context: ./coordinator
dockerfile: Dockerfile
ports:
- "8080" ## this is for debug purposes, the service does not need to be exposed to the host at all
- "5003:8080" ## this is for debug purposes, the service does not need to be exposed to the host at all
networks:
- distr-network
environment:
- "ENVIRONMENT=Development"
deploy:
mode: replicated
replicas: 1
restart: always
replicas: 1 ## this is needed as of now. Do not change this or the values will be distributed unevenly
restart: always
networks:
distr-network:
driver: bridge

0 comments on commit e0960dc

Please sign in to comment.