From e0960dc53a54b37f223ff957f5591018ebd10792 Mon Sep 17 00:00:00 2001 From: Saetch Date: Sat, 18 May 2024 16:23:34 +0200 Subject: [PATCH] [#2] Added communication from node to coordinator in order to request information about relevant node --- Cluster/Node_cs/ApiConfig.cs | 55 +++++++-- Cluster/Node_cs/GetRequests.cs | 18 ++- Cluster/Node_cs/NodeBehavior.cs | 116 ++++++++++-------- .../Node_cs/Properties/launchSettings.json | 1 + Cluster/coordinator/src/get_requests.rs | 15 +++ Cluster/coordinator/src/main.rs | 4 +- Cluster/coordinator/src/post_requests.rs | 16 ++- Cluster/docker-compose.yml | 6 +- 8 files changed, 152 insertions(+), 79 deletions(-) diff --git a/Cluster/Node_cs/ApiConfig.cs b/Cluster/Node_cs/ApiConfig.cs index b0aa89c..e224543 100644 --- a/Cluster/Node_cs/ApiConfig.cs +++ b/Cluster/Node_cs/ApiConfig.cs @@ -12,19 +12,26 @@ public class ApiConfig { public Dictionary, double> savedValues = new Dictionary, double>(); - - public const String BICUBIC_INTERPOLATION_SERVICE_URL = "http://bicubic_interpolation_service:8080/calculate"; + public Dictionary, String> exteriorValuesInNodes = new Dictionary, String>(); + public String BICUBIC_INTERPOLATION_SERVICE_URL = "http://bicubic_interpolation_service:8080/calculate"; public String hostname = Environment.GetEnvironmentVariable("HOSTNAME"); - public String COORDINATOR_SERVICE_URL = "coordinator-1"; - + public String COORDINATOR_SERVICE_URL = "coordinator"; + public int PORT = 8080; public int initializeConfigValues() { Console.WriteLine("hostname is: "+ this.hostname); - + String env = Environment.GetEnvironmentVariable("CLUSTER_ENVIRONMENT"); + if (env == null){ + Console.WriteLine("CLUSTER_ENVIRONMENT is: "+ env); + this.COORDINATOR_SERVICE_URL = "localhost"; + this.PORT = 5003; + this.BICUBIC_INTERPOLATION_SERVICE_URL = "http://localhost:5555/calculate"; + this.hostname = "node1"; + } TcpClient tcpClient = new TcpClient(); - var result = tcpClient.BeginConnect(this.COORDINATOR_SERVICE_URL, 8080, null, null); + var result = tcpClient.BeginConnect(this.COORDINATOR_SERVICE_URL, this.PORT, null, null); - var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(15)); + var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(150)); if (success) { tcpClient.EndConnect(result); @@ -37,9 +44,9 @@ public int initializeConfigValues() HttpClient client = new HttpClient(); - var response = client.GetAsync("http://" + this.COORDINATOR_SERVICE_URL + ":8080/register/").Result; - Console.WriteLine("Received response from coordinator service: " + response.Content.ReadAsStringAsync().Result); - return 1; //TODO! Update this if the Node is configured correctly according to the response from the coordinator service + var response = client.PostAsync("http://" + this.COORDINATOR_SERVICE_URL + ":"+ this.PORT+"/organize/register/"+this.hostname, null).Result; + DealWithResponse(response); + return 0; //TODO! Update this if the Node is configured correctly according to the response from the coordinator service } public WebApplication setupServer(){ @@ -108,6 +115,34 @@ private void registerRequests(WebApplication app) } }); } + + private void DealWithResponse(HttpResponseMessage response){ + String responseString = response.Content.ReadAsStringAsync().Result; + Console.WriteLine("Received response from coordinator service: " + responseString); + if (responseString.Contains("WAIT")){ + Console.WriteLine("Received WAIT command ... "); + return; + } + if (!responseString.Contains("HANDLE")){ + Console.WriteLine("Received invalid response from coordinator service: " + responseString); + throw new Exception("Received invalid response from coordinator service: " + responseString); + } + //This is an example response: {"HANDLE":{"positions":[{"x":0,"y":0,"value":0.6816054984788531},{"x":1,"y":0,"value":0.6952797360508614},{"x":0,"y":1,"value":3.0950656335878035},{"x":1,"y":1,"value":2.0697533239357435}]}} + String valuesPart = responseString.Split("[{")[1]; + String [] valuesStrings = valuesPart.Split("{"); + foreach (String valueString in valuesStrings){ + if (valueString.Length < 3){ + continue; + } + String [] valueParts = valueString.Split(","); + int x = Int32.Parse(valueParts[0].Split(":")[1]); + int y = Int32.Parse(valueParts[1].Split(":")[1]); + double value = Double.Parse(valueParts[2].Replace("}","").Replace("]","").Split(":")[1]); + savedValues.Add(new Tuple(x,y), value); + } + + + } } diff --git a/Cluster/Node_cs/GetRequests.cs b/Cluster/Node_cs/GetRequests.cs index b0e3b75..82fce06 100644 --- a/Cluster/Node_cs/GetRequests.cs +++ b/Cluster/Node_cs/GetRequests.cs @@ -26,6 +26,8 @@ static async Task CalculateInterpolatedValue(double x, double y, ApiConf int zeroed_actual_x = (int)Math.Floor(x); int zeroed_actual_y = (int)Math.Floor(y); double [][] valueArray = await NodeBehavior.GetValuesForInterpolation(zeroed_actual_x, zeroed_actual_y, config); + + Console.WriteLine("Successfully got values for interpolation"); //request solving the interpolation from the bicubic interpolation service var client = new HttpClient(); String arr = ""; @@ -38,7 +40,7 @@ static async Task CalculateInterpolatedValue(double x, double y, ApiConf } arr = arr.TrimEnd(';'); Console.WriteLine("Sending request to bicubic interpolation service with values: " + arr); - var response = await client.GetAsync(ApiConfig.BICUBIC_INTERPOLATION_SERVICE_URL + "?x="+(x -zeroed_actual_x)+"&y="+(y -zeroed_actual_y)+"&arr=" + arr); + var response = await client.GetAsync(config.BICUBIC_INTERPOLATION_SERVICE_URL + "?x="+(x -zeroed_actual_x)+"&y="+(y -zeroed_actual_y)+"&arr=" + arr); Console.WriteLine("Received response from bicubic interpolation service: " + response.Content.ReadAsStringAsync().Result); double actual_value = Double.Parse(response.Content.ReadAsStringAsync().Result, CultureInfo.InvariantCulture); return actual_value; @@ -49,17 +51,29 @@ static async Task CalculateInterpolatedValue(double x, double y, ApiConf - public static double? GetSavedNodeValue(int x, int y, ApiConfig config){ + public static double? GetSavedNodeValue(int x, int y, ApiConfig config){ + Console.WriteLine("Received SavedValue-call with params: " + x + "/" + y); Tuple key = new Tuple(x, y); + DisplayState(config); if(config.savedValues.ContainsKey(key)){ return config.savedValues[key]; } return null; } + private static void DisplayState(ApiConfig config){ + Console.WriteLine("State: " + config.savedValues); + Tuple[] keys = config.savedValues.Keys.ToArray(); + for (int i = 0; i < keys.Length; i++) + { + Console.WriteLine("Key: " + keys[i] + " Value: " + config.savedValues[keys[i]]); + } + } + public static XYValues GetValue(string values, ApiConfig config) { Console.WriteLine("Received GetValue-call with params: " + values); + DisplayState(config); string[] splitValues = values.Split("_"); double x = Double.Parse(splitValues[0].Replace(',', '.'), CultureInfo.InvariantCulture); double y = Double.Parse(splitValues[1].Replace(',', '.'), CultureInfo.InvariantCulture); diff --git a/Cluster/Node_cs/NodeBehavior.cs b/Cluster/Node_cs/NodeBehavior.cs index 9ac8815..3a16961 100644 --- a/Cluster/Node_cs/NodeBehavior.cs +++ b/Cluster/Node_cs/NodeBehavior.cs @@ -82,9 +82,9 @@ public static double[][] FillInNullValues(double? [][] values){ } } } - for(int j = 1; j <3; j++){ + for(int j = 0; j < 4; j++){ double? ToFillValue = null; - for(int i = 0; i < 4; i++){ + for(int i = j == 0 ? 1 : j == 3 ? 1 : 0 ; i < (j==3 ? 3 : j == 0 ? 3 : 4); i++){ if (values[i][j] == null){ values[i][j] = ToFillValue; }else{ @@ -137,28 +137,62 @@ public static double[][] FillInNullValues(double? [][] values){ } - //TODO! This is hardcoded and only works with the default configuration now. Update this to work with any configuration - public static int[] GetNodePoints(int x, int y, ApiConfig config){ - var ret = new int[2]; - if (x < 0) { - ret [0] = (x-1) / 2; - }else{ - ret [0] = x / 2; + public static String GetHoldingNode(int x, int y, ApiConfig config){ + Tuple key = new Tuple(x, y); + if (config.exteriorValuesInNodes.TryGetValue(key, out string? value)){ + TcpClient tcpClient = new TcpClient(); + var result = tcpClient.BeginConnect(value, 5552, null, null); + var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(22)); + if (success) + { + tcpClient.EndConnect(result); + tcpClient.Close(); + return value; + } + else{ + Console.WriteLine("Failed to connect to known node, requesting new information ... "); + tcpClient.Close(); + } + } else { + Console.WriteLine("No information about node found, requesting new information ... "); } - if (y < 0) { - ret [1] = (y-1) / 2; - }else{ - ret [1] = y / 2; + + TcpClient tcpClient2 = new TcpClient(); + var result2 = tcpClient2.BeginConnect(config.COORDINATOR_SERVICE_URL, config.PORT, null, null); + var success2 = result2.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(150)); + if (success2) + { + tcpClient2.EndConnect(result2); } - return ret; + else{ + while (!success2){ + Console.WriteLine("Failed to connect to coordinator service ... "); + //sleep for 1 second + System.Threading.Thread.Sleep(1000); + result2 = tcpClient2.BeginConnect(config.COORDINATOR_SERVICE_URL, config.PORT, null, null); + success2 = result2.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(150)); + } + tcpClient2.EndConnect(result2); + } + tcpClient2.Close(); + HttpClient client = new HttpClient(); + var response = client.GetAsync("http://" + config.COORDINATOR_SERVICE_URL + ":"+ config.PORT+"/organize/get_node/"+x+"/"+y).Result; + var node = response.Content.ReadAsStringAsync().Result; + config.exteriorValuesInNodes[key] = node; + return node; + } //as of yet it is hardcoded since the nodes are hardcoded, too. Assuming that all nodes have the same height and width - static String GetNodeURL(int x, int y, ApiConfig config){ + static String? GetNodeURL(int x, int y, ApiConfig config){ - var nodePoints = GetNodePoints(x, y, config); + var nodePoints = GetHoldingNode(x, y, config); - return "http://node_x_" + nodePoints[0] + "_y_" + nodePoints[1] +":5552/getSavedValue/" + x + "/" + y + "/"; + if (nodePoints.Equals("Unknown")){ + Console.WriteLine("Node not found!"); + return null; + } + return nodePoints + "/getSavedValue/"+x+"/"+y; } @@ -171,53 +205,31 @@ static String GetNodeURL(int x, int y, ApiConfig config){ values[i] = new double?[4]; } String apiUrl; + String? node; HttpResponseMessage response; - var current_x = zeroed_actual_x; - var current_y = zeroed_actual_y; - bool found = false; - for (int i = -1; i < 3; i++) { Console.WriteLine("Filling in row " + i); for (int j = -1; j < 3; j++){ Tuple current_key = new Tuple(zeroed_actual_x + i, zeroed_actual_y + j); if (config.savedValues.ContainsKey(current_key)){ - values[i][j] = config.savedValues[current_key]; + Console.WriteLine("Value found in saved values, using it ... "); + Console.WriteLine("Key: "+current_key+" -> " + config.savedValues[current_key]); + values[i+1][j+1] = config.savedValues.GetValueOrDefault(current_key); continue; } - TcpClient tcpClient = new TcpClient(); - found = false; - current_x = zeroed_actual_x + i; - current_y = zeroed_actual_y + j; - //in order to correctly identify negative values, it is needed to offset them by one, otherwise the division will not work correctly - var nodePoints = GetNodePoints(current_x, current_y, config); - int node_x = nodePoints[0]; - int node_y = nodePoints[1]; - Console.WriteLine("Filling in value for " + current_x + "/" + current_y); - //check wether or not there is a connection - - Console.WriteLine("Trying to connect to node_x_" + node_x + "_y_" + node_y); - var result = tcpClient.BeginConnect("node_x_" + node_x + "_y_" + node_y, 5552, null, null); - - var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(22)); - if (success) - { - found = true; - tcpClient.EndConnect(result); - } - tcpClient.Close(); - - if (found){ - apiUrl = GetNodeURL(zeroed_actual_x + i, zeroed_actual_y + j, config); - Console.WriteLine("Making request to: " + apiUrl+" while trying to catch HTTPRequestExceptions"); - - // Make a GET request to the external API - response = httpClient.Send(new HttpRequestMessage(HttpMethod.Get, apiUrl)); - }else{ - Console.WriteLine("No server running, setting value to null!"); + node = GetNodeURL(zeroed_actual_x + i, zeroed_actual_y + j, config); + if (node == null){ + Console.WriteLine("Node not found, setting value to null!"); values[i + 1][j + 1] = null; continue; } + apiUrl = node; + Console.WriteLine("Making request to: " + apiUrl+" while trying to catch HTTPRequestExceptions"); + + // Make a GET request to the external API + response = httpClient.Send(new HttpRequestMessage(HttpMethod.Get, apiUrl)); + // Read the response content as a string string responseBody = await response.Content.ReadAsStringAsync(); diff --git a/Cluster/Node_cs/Properties/launchSettings.json b/Cluster/Node_cs/Properties/launchSettings.json index 4e6df2e..375fcac 100644 --- a/Cluster/Node_cs/Properties/launchSettings.json +++ b/Cluster/Node_cs/Properties/launchSettings.json @@ -10,6 +10,7 @@ "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" } + } } } diff --git a/Cluster/coordinator/src/get_requests.rs b/Cluster/coordinator/src/get_requests.rs index 86a76d9..53a2681 100644 --- a/Cluster/coordinator/src/get_requests.rs +++ b/Cluster/coordinator/src/get_requests.rs @@ -1,3 +1,5 @@ +use std::{rc::Weak, sync::Arc}; + use actix_web::{ web, HttpResponse, Responder}; use crate::state::{ImmutableState, InteriorMutableState}; @@ -9,4 +11,17 @@ pub async fn get_complete_state(data: web::Data) -> impl R println!("{:?}", state); let immutable_state = ImmutableState::from_interior(&state).await; HttpResponse::Ok().json(web::Json(immutable_state)) +} + +pub async fn get_node_for_point(path: web::Path<(i32, i32)>, data: web::Data) -> impl Responder { + + let state = data.into_inner(); + let (x, y) = path.into_inner(); + let point_map = state.map_data.read().await; + let node = point_map.get(&(x, y)); + if let Some(node) = node { + let string = node.upgrade().unwrap().as_ref().clone(); + return HttpResponse::Ok().body(string); + } + HttpResponse::Ok().body("Unknown") } \ No newline at end of file diff --git a/Cluster/coordinator/src/main.rs b/Cluster/coordinator/src/main.rs index eeb700d..a89add9 100644 --- a/Cluster/coordinator/src/main.rs +++ b/Cluster/coordinator/src/main.rs @@ -34,9 +34,7 @@ async fn main() -> std::io::Result<()> { .route("/initialize", web::post().to(initialize)) .route("/initialize", web::get().to(initialize)) .route("/get_complete_state", web::get().to(get_requests::get_complete_state)) - .route("/delete", web::delete().to(|| HttpResponse::Ok())) - .route("/update", web::put().to(|| HttpResponse::Ok())) - .route("/read", web::get().to(|| HttpResponse::Ok())) + .route("/get_node/{x}/{y}", web::get().to(get_requests::get_node_for_point)) .route("/hey", web::get().to(manual_hello)), ) .route("/ping", web::get().to(ping)) diff --git a/Cluster/coordinator/src/post_requests.rs b/Cluster/coordinator/src/post_requests.rs index 0b471fd..ece3661 100644 --- a/Cluster/coordinator/src/post_requests.rs +++ b/Cluster/coordinator/src/post_requests.rs @@ -45,16 +45,14 @@ pub async fn register(path: web::Path, data: web::Data