diff --git a/Cluster/Node_cs/ApiConfig.cs b/Cluster/Node_cs/ApiConfig.cs
index 267c5c2..7da5d48 100644
--- a/Cluster/Node_cs/ApiConfig.cs
+++ b/Cluster/Node_cs/ApiConfig.cs
@@ -133,7 +133,15 @@ private void registerRequests(WebApplication app)
                 return Results.BadRequest(e.Message);
             }
         });
-
+        app.MapGet("/getMultipleSavedValues", async (String parameters) => {
+            try {
+                Console.WriteLine("Received getMultipleSavedValues-call with params: " + parameters);
+                var retVal = await NodeBehavior.GetMultipleSavedValues(parameters, this);
+                return Results.Ok(retVal);
+            } catch (Exception e) {
+                return Results.BadRequest(e.Message);
+            }
+        });
@@ -179,6 +187,12 @@ private void DealWithResponse(HttpResponseMessage response){
 }
 
+
+public class SavedValue{
+    public required Position Position { get; set; }
+    public double value { get; set; }
+}
+
 public class XYValues
 {
     public double x { get; set; }
@@ -197,6 +211,12 @@ public class Position{
     public int y { get; set; }
 }
 
+public class NodeResponse{
+    public required String name {get; set;}
+
+    public ushort hash {get; set;}
+}
+
 [JsonSerializable(typeof(String))]
 [JsonSerializable(typeof(List))]
 [JsonSerializable(typeof(List))]
@@ -206,6 +226,10 @@ public class Position{
 [JsonSerializable(typeof(XYValues[]))]
 [JsonSerializable(typeof(HashedPosition))]
 [JsonSerializable(typeof(List))]
+[JsonSerializable(typeof(Tuple<int, int>))]
+[JsonSerializable(typeof(List<Tuple<int, int>>))]
+[JsonSerializable(typeof(NodeResponse))]
+[JsonSerializable(typeof(List<NodeResponse>))]
 internal partial class AppJsonSerializerContext : JsonSerializerContext
 {
diff --git a/Cluster/Node_cs/Dockerfile b/Cluster/Node_cs/Dockerfile
index 5eff80e..b7690d2 100644
--- a/Cluster/Node_cs/Dockerfile
+++ b/Cluster/Node_cs/Dockerfile
@@ -18,4 +18,5 @@ RUN dotnet publish "Node_cs.csproj" -c Release -o /app/publish /p:UseAppHost=false
 FROM base AS final
 WORKDIR /app
 COPY --from=publish /app/publish .
+RUN apk add curl
 ENTRYPOINT ["dotnet", "Node_cs.dll"]
diff --git a/Cluster/Node_cs/Dockerfiles/aot/Dockerfile b/Cluster/Node_cs/Dockerfiles/aot/Dockerfile
index 8963d84..5735b35 100644
--- a/Cluster/Node_cs/Dockerfiles/aot/Dockerfile
+++ b/Cluster/Node_cs/Dockerfiles/aot/Dockerfile
@@ -15,7 +15,7 @@ RUN dotnet build -c Release -o /app/build
 RUN apt-get update && \
     apt-get install -y clang zlib1g-dev
 # Publish the application
-RUN dotnet publish -p:PublishAot=true -p:StripSymbols=true -c Debug -o /app/publish -r linux-musl-x64
+RUN dotnet publish -p:PublishAot=true -p:StripSymbols=true -c Release -o /app/publish -r linux-musl-x64
 
 # Switch to the runtime image
 FROM alpine:3.19 AS runtime
@@ -23,7 +23,7 @@ RUN apk add libc6-compat
 
 # Set the working directory inside the Docker image
 WORKDIR /app
-
+RUN apk add curl
 # Copy the published application
 COPY --from=build /app/publish/* .
 # Set the entry point for the container
diff --git a/Cluster/Node_cs/GetRequests.cs b/Cluster/Node_cs/GetRequests.cs
index 82fce06..f963656 100644
--- a/Cluster/Node_cs/GetRequests.cs
+++ b/Cluster/Node_cs/GetRequests.cs
@@ -54,7 +54,6 @@ static async Task CalculateInterpolatedValue(double x, double y, ApiConfig config)
     public static double? GetSavedNodeValue(int x, int y, ApiConfig config){
         Console.WriteLine("Received SavedValue-call with params: " + x + "/" + y);
         Tuple<int, int> key = new Tuple<int, int>(x, y);
-        DisplayState(config);
         if(config.savedValues.ContainsKey(key)){
             return config.savedValues[key];
         }
@@ -62,7 +61,6 @@ static async Task CalculateInterpolatedValue(double x, double y, ApiConfig config)
     }
 
     private static void DisplayState(ApiConfig config){
-        Console.WriteLine("State: " + config.savedValues);
         Tuple<int, int>[] keys = config.savedValues.Keys.ToArray();
         for (int i = 0; i < keys.Length; i++)
         {
diff --git a/Cluster/Node_cs/NodeBehavior.cs b/Cluster/Node_cs/NodeBehavior.cs
index 1606f50..090214b 100644
--- a/Cluster/Node_cs/NodeBehavior.cs
+++ b/Cluster/Node_cs/NodeBehavior.cs
@@ -10,16 +10,174 @@ namespace Node_cs
 class NodeBehavior{
 
     public static async Task<double[][]> GetValuesForInterpolation(int zeroed_actual_x, int zeroed_actual_y, ApiConfig config){
+        var task1 = QueryAllNodesWithHashes(config);
+        double? [][] nullableMatrix;
+        var task2 = CheckMatrixValues(zeroed_actual_x, zeroed_actual_y, config);
+        await Task.WhenAll(task1, task2);
+
+        List<HashedPosition> toFindList = task2.Result.Item1;
+        nullableMatrix = task2.Result.Item2;
+        List<NodeResponse> nodeHashes = task1.Result;
-        Console.WriteLine("Filling in matrix values for "+zeroed_actual_x+"/"+ zeroed_actual_y+" ... ");
-        double? [][] nullableMatrix = await GetActualValuesFromNodes(zeroed_actual_x, zeroed_actual_y, config);
+
+        bool foundAllValues = await FillInQueriedValues(toFindList, nodeHashes, nullableMatrix, zeroed_actual_x - 1, zeroed_actual_y - 1);
         return FillInNullValues(nullableMatrix);
     }
 
+    private static async Task<List<NodeResponse>> QueryAllNodesWithHashes(ApiConfig config){
+        Console.WriteLine("Querying hashes for all nodes ... ");
+        using (HttpClient httpClient = new HttpClient()){
+            string baseUrl = "http://"+config.COORDINATOR_SERVICE_URL+":"+config.PORT+"/organize/get_all_nodes";
+            Console.WriteLine("Making request to: " + baseUrl);
+            HttpResponseMessage response = await httpClient.SendAsync(new HttpRequestMessage(HttpMethod.Get, baseUrl));
+            string responseBody = await response.Content.ReadAsStringAsync();
+            Console.WriteLine("Received response for values from nodes: " + responseBody);
+            //deserialize responseBody to List<NodeResponse>
+            var ret = ParseNodeResponsesFromResponse(responseBody);
+
+            return ret;
+        }
+    }
+
+    //parse the requested positions from the ASP.NET query parameter and return the values this node has saved for them
+    public static async Task<List<XYValues>> GetMultipleSavedValues(String query, ApiConfig config){
+        var retList = new List<XYValues>();
+        List<Position> positions = [];
+
+        string[] positionStrings = query.Split(";");
+        foreach (string pos in positionStrings){
+            if( pos == "" ){
+                continue;
+            }
+            string[] parts = pos.Split(",");
+            if (parts.Length != 2){
+                throw new Exception("Invalid query parameters: " + query);
+            }
+            int x = Int32.Parse(parts[0]);
+            int y = Int32.Parse(parts[1]);
+            positions.Add(new Position { x = x, y = y });
+        }
+
+        foreach (Position pos in positions){
+            Console.WriteLine("Try getting saved value for: " + pos.x + "/" + pos.y);
+
+            if (config.savedValues.ContainsKey(new Tuple<int, int>(pos.x, pos.y))){
+                retList.Add(new XYValues { x = pos.x, y = pos.y, value = config.savedValues[new Tuple<int, int>(pos.x, pos.y)] });
+            }
+        }
+
+        return retList;
+    }
+
+    private static async Task<bool> FillInQueriedValues(List<HashedPosition> toFindList, List<NodeResponse> hashValues, double? [][] values, int starting_x, int starting_y){
+
+        List<List<int>> parallelListForNodes = [];
+        for (int i = 0; i < hashValues.Count; i++)
+        {
+            parallelListForNodes.Add([]);
+        }
+        int index = 0;
+        foreach (HashedPosition point in toFindList){
+            parallelListForNodes[BinarySearchFittingNode(hashValues, point.hash)].Add(index);
+            index++;
+        }
+        List<Task> tasks = [];
+        for(int i = 0; i < hashValues.Count; i++){
+            if (parallelListForNodes[i].Count == 0){
+                continue;
+            }
+            tasks.Add(FillInValuesForNode(parallelListForNodes[i], toFindList, hashValues[i].name, values, starting_x, starting_y));
+        }
+
+        await Task.WhenAll(tasks);
+        bool successfullyConnected = true;
+
+        return successfullyConnected;
+    }
+
+    private static async Task FillInValuesForNode(List<int> positionIndices, List<HashedPosition> positions, String node, double? [][] values, int starting_x, int starting_y){
+        Console.WriteLine("Filling in values for node: " + node);
+        using (HttpClient httpClient = new HttpClient()){
+            string apiUrl = "http://"+node+":5552/getMultipleSavedValues?parameters=";
+            foreach (int index in positionIndices){
+                apiUrl += positions[index].x + "," + positions[index].y + ";";
+            }
+            Console.WriteLine("Making request to: " + apiUrl);
+            HttpResponseMessage response = await httpClient.SendAsync(new HttpRequestMessage(HttpMethod.Get, apiUrl));
+            string responseBody = await response.Content.ReadAsStringAsync();
+            Console.WriteLine("Received response: " + responseBody);
+            //deserialize responseBody to List<SavedValue>
+            List<SavedValue> responseValues = ParseSavedValuesFromResponse(responseBody);
+            foreach (SavedValue value in responseValues){
+                Console.WriteLine("Filling in value: " + value.value + " at position: " + value.Position.x + "/" + value.Position.y);
+                Position pos = value.Position;
+                if (values[pos.x-starting_x][pos.y-starting_y] != null){
+                    Console.WriteLine("Value already filled in, skipping ... Something must have gone wrong!!");
+                }
+                values[pos.x-starting_x][pos.y-starting_y] = value.value;
+            }
+        }
+    }
+
+    //node at index i owns the hash range (hash[i-1], hash[i]]; values at or below the first hash or above the last wrap around to node 0
+    private static int BinarySearchFittingNode(List<NodeResponse> hashValues, int aimedValue){
+        int start = hashValues[0].hash;
+        int end = hashValues[^1].hash;
+        if(aimedValue <= start || aimedValue > end){
+            return 0;
+        }
+        int len = hashValues.Count;
+        int lowerBound = 0;
+        int upperBound = len - 2;
+        int index = (lowerBound + upperBound)/2;
+        Console.WriteLine("Binary searching for NodeResponses: ");
+        foreach (NodeResponse node in hashValues){
+            Console.WriteLine(""+node.hash);
+        }
+        Console.WriteLine("Trying to find node for hashValue: "+aimedValue);
+        while (true){
+            int currentValue = hashValues[index].hash;
+            int nextValue = hashValues[index + 1].hash;
+            if (aimedValue > currentValue && aimedValue <= nextValue){
+                Console.WriteLine("Returning value from binary search: "+(index+1));
+                return index + 1;
+            }
+            if(aimedValue > currentValue){
+                lowerBound = index + 1;
+            }else{
+                upperBound = index - 1;
+            }
+            index = (lowerBound + upperBound)/2;
+        }
+    }
 
     public static double[][] FillInNullValues(double? [][] values){
+        bool allValues = true;
+        //having null values is relatively unlikely, so we should check whether this is needed at all
+        for (int i = 0; i < 4; i++)
+        {
+            for (int j = 0; j < 4; j++)
+            {
+                if (values[i][j] == null){
+                    Console.WriteLine("Null value found at: " + i + "/" + j);
+                    allValues = false;
+                }
+            }
+        }
+
+        if(!allValues){
         //check wether or not the direct neighbors to the corners are null
         bool[] corners = new bool[4];
         for (int i = 0; i < corners.Length; i++)
@@ -110,18 +268,23 @@ public static double[][] FillInNullValues(double? [][] values){
         if (values[3][3] == null){
             values[3][3] = (values[3][2] + values[2][3]) / 2;
         }
+
+        Console.WriteLine("Filled in values: ");
+
+        }
+
         double[][] ret = new double[4][];
         for (int i = 0; i < 4; i++)
         {
             ret[i] = new double[4];
             for (int j = 0; j < 4; j++)
             {
-#pragma warning disable CS8629 // Nullable value type may be null.
-                ret[i][j] = values[i][j].Value;
-#pragma warning restore CS8629 // Nullable value type may be null.
-            }
+                #pragma warning disable CS8629 // Nullable value type may be null.
+                ret[i][j] = values[i][j].Value;
+                #pragma warning restore CS8629 // Nullable value type may be null.
+            }
         }
-        Console.WriteLine("Filled in values: ");
         //print the values
         for (int i = 0; i < 4; i++)
         {
@@ -135,6 +298,58 @@ public static double[][] FillInNullValues(double? [][] values){
     }
 
+    private static List<NodeResponse> ParseNodeResponsesFromResponse(String responseBody){
+        if (responseBody == null){
+            throw new Exception("Response body is null");
+        }
+        if (responseBody == "" || responseBody == "[]"){
+            return [];
+        }
+        List<NodeResponse> ret = [];
+        string[] objects = responseBody.Replace("[{","").Replace("}]","").Split("},{");
+        foreach (string obj in objects){
+            string[] parts = obj.Split(",");
+            var nameInputs = parts[0].Split(":");
+            var hashInputs = parts[1].Split(":");
+            if (!nameInputs[0].Equals("\"name\"") || !hashInputs[0].Equals("\"hash\"")){
+                throw new Exception("Invalid response from coordinator service: " + responseBody);
+            }
+            String name = nameInputs[1].Replace("\"","");
+            ushort hash = UInt16.Parse(hashInputs[1]);
+            ret.Add(new NodeResponse { name = name, hash = hash });
+        }
+        return ret;
+    }
+
+    private static async Task<Tuple<List<HashedPosition>, double? [][]>> CheckMatrixValues(int zeroed_actual_x, int zeroed_actual_y, ApiConfig config){
+        var values = new double?[4][];
+        for (int i = 0; i < 4; i++)
+        {
+            values[i] = new double?[4];
+        }
+        Console.WriteLine("Checking matrix values ... ");
"); + List toFindList = []; + for (int i = -1; i < 3; i++) + { + for (int j = -1; j < 3; j++) + { + var tuple = new Tuple(zeroed_actual_x + i, zeroed_actual_y + j); + if (config.savedValues.ContainsKey(tuple)){ + values[i + 1][j + 1] = config.savedValues[tuple]; + }else{ + toFindList.Add(new Position { x = zeroed_actual_x + i, y = zeroed_actual_y + j }); + } + } + } + + var ret = await QueryHasherForPoints(toFindList); + + + return Tuple.Create(ret, values); + } + + + public static String GetHoldingNode(int x, int y, ApiConfig config){ Tuple key = new Tuple(x, y); @@ -287,7 +502,6 @@ internal static async Task> DeleteSavedValuesBelow(string hash, A continue; } } - Console.WriteLine("hashval: " + hashVal + " ownerHash: " + api.ownerHash + " pointHash: " + pointHash.hash); var tuple = new Tuple(pointHash.x, pointHash.y); var ret = api.savedValues.ContainsKey(tuple); if(!ret) throw new Exception("Tried removing nonexistent value"); @@ -321,12 +535,6 @@ private static async Task> QueryHasherForPoints(List(); - for (int i = 0; i < 12; i++) - { - hashPosList.Add(new HashedPosition { x = i, y = i, hash = 1234 }); - } //deserialize into list of HashedPosition List result = ParseHashedPositionsFromResponse(responseBody ?? ""); // Use the null-coalescing operator to provide a default value in case the response body is null Console.WriteLine("Returning result: " + result); @@ -334,7 +542,30 @@ private static async Task> QueryHasherForPoints(List ParseSavedValuesFromResponse(String responseBody){ + if (responseBody == null){ + throw new Exception("Response body is null"); + } + if (responseBody == "" || responseBody == "[]"){ + return []; + } + List ret = []; + string[] objects = responseBody.Replace("[{","").Replace("}]","").Split("},{"); + foreach (string obj in objects){ + string[] parts = obj.Split(","); + var xInputs = parts[0].Split(":"); + var yInputs = parts[1].Split(":"); + var valueInputs = parts[2].Split(":"); + if (!xInputs[0].Equals("\"x\"") || !yInputs[0].Equals("\"y\"") || !valueInputs[0].Equals("\"value\"")){ + throw new Exception("Invalid response from hasher service: " + responseBody); + } + int x = Int32.Parse(xInputs[1]); + int y = Int32.Parse(yInputs[1]); + double value = Double.Parse(valueInputs[1]); + ret.Add(new SavedValue { Position = new Position { x = x, y = y }, value = value }); + } + return ret; + } internal static List ParseHashedPositionsFromResponse(String responseBody){ diff --git a/Cluster/coordinator/src/get_requests.rs b/Cluster/coordinator/src/get_requests.rs index bd4d43d..97de0cb 100644 --- a/Cluster/coordinator/src/get_requests.rs +++ b/Cluster/coordinator/src/get_requests.rs @@ -1,7 +1,9 @@ +use std::borrow::Borrow; + use actix_web::{ web, HttpResponse, Responder}; use futures::{future::join_all, join}; -use crate::{post_requests::HASHER_SERVICE_URL, state::{ImmutableState, InteriorMutableState}}; +use crate::{post_requests::HASHER_SERVICE_URL, state::{ImmutableState, InteriorMutableState, NodeResponse}}; @@ -12,6 +14,12 @@ pub async fn get_complete_state(data: web::Data) -> impl R HttpResponse::Ok().json(web::Json(immutable_state)) } +pub async fn get_all_nodes(data: web::Data) -> impl Responder { + let state = data.into_inner(); + let known_nodes = state.known_nodes.read().await; + HttpResponse::Ok().json(web::Json(known_nodes.iter().map(|x| NodeResponse::from_node_state(x)).collect::>())) +} + pub async fn debug_distribution(data: web::Data) -> impl Responder { let mut fut_vec = Vec::new(); diff --git a/Cluster/coordinator/src/main.rs 
index 5b11803..50a69b4 100644
--- a/Cluster/coordinator/src/main.rs
+++ b/Cluster/coordinator/src/main.rs
@@ -43,6 +43,7 @@ async fn main() -> std::io::Result<()> {
                 .route("/upload_value/{x}/{y}/{value}", web::post().to(post_requests::upload_value))
                 .route("/hey", web::get().to(manual_hello))
                 .route("/debug_distribution", actix_web::web::get().to(get_requests::debug_distribution))
+                .route("/get_all_nodes", actix_web::web::get().to(get_requests::get_all_nodes))
             )
             .route("/ping", web::get().to(ping))
diff --git a/Cluster/coordinator/src/state.rs b/Cluster/coordinator/src/state.rs
index 95ddb5b..7767e3a 100644
--- a/Cluster/coordinator/src/state.rs
+++ b/Cluster/coordinator/src/state.rs
@@ -34,6 +34,12 @@ pub struct ImmutableState {
     pub expected_values_per_node: u32,
 }
 
+#[derive(Serialize)]
+pub struct NodeResponse {
+    pub name: NodeName,
+    pub hash: HashValue,
+}
+
 #[derive(Debug, Clone, Serialize)]
@@ -68,6 +74,15 @@ pub enum NodeOccupation {
     }
 }
 
+impl NodeResponse {
+    pub fn from_node_state(node_state: &NodeState) -> Self {
+        Self {
+            name: node_state.name.clone(),
+            hash: node_state.hash_value,
+        }
+    }
+}
+
 impl InteriorMutableState{
     pub fn new() -> Self {
         Self {
@@ -98,31 +113,13 @@ impl InteriorMutableState{
         let mut to_distribute = self.to_distribute.write().await;
         assert!(to_distribute.is_empty());
         let mut rand = rand::thread_rng();
-        const SIZE: usize = 16;
-        const WIDTH: usize = 4;
-        for i in 0 .. SIZE/4 {
-
-            //this is just a dummy and these values are just pushed in this fashion in order to have them in order in the vector to distribute in squares to the nodes
-            to_distribute.push(DistributeableValue {
-                x: (i*2) as i32 % WIDTH as i32,
-                y: (i*2) as i32 / WIDTH as i32 * 2,
-                value: rand.gen_range(0.0 .. SIZE as f64 / WIDTH as f64),
-            });
-            to_distribute.push(DistributeableValue {
-                x: (i*2) as i32 % WIDTH as i32 + 1,
-                y: (i*2) as i32 / WIDTH as i32 * 2,
-                value: rand.gen_range(0.0 .. SIZE as f64 / WIDTH as f64),
-            });
-            to_distribute.push(DistributeableValue {
-                x: (i*2) as i32 % WIDTH as i32,
-                y: (i*2) as i32 / WIDTH as i32 * 2 + 1,
-                value: rand.gen_range(0.0 .. SIZE as f64 / WIDTH as f64),
-            });
-            to_distribute.push(DistributeableValue {
-                x: (i*2) as i32 % WIDTH as i32 +1,
-                y: (i*2) as i32 / WIDTH as i32 * 2 +1,
-                value: rand.gen_range(0.0 .. SIZE as f64 / WIDTH as f64),
-            });
+        const HEIGHT: usize = 10;
+        const WIDTH: usize = 10;
+
+        for x in 0..WIDTH {
+            for y in 0..HEIGHT {
+                to_distribute.push(Position{ x: x as i32, y: y as i32, value: rand.gen_range(0.0..(y + 1) as f64) });
+            }
         }
         true
     }