From 31f0bb1acf7fb69509e443b0cdfe7a938d5a8db9 Mon Sep 17 00:00:00 2001 From: Arthur Bit-Monnot Date: Mon, 3 Oct 2022 15:50:43 +0200 Subject: [PATCH] [grpc] Send intermediate solution to gRPC client --- ext/up/unified_planning | 2 +- ext/up/up_planner.py | 22 +++++++--- grpc/server/src/bin/client.rs | 8 +++- grpc/server/src/bin/server.rs | 83 ++++++++++++++++++++++++----------- planners/src/bin/lcp.rs | 10 ++++- planners/src/solver.rs | 17 +++++-- 6 files changed, 103 insertions(+), 39 deletions(-) diff --git a/ext/up/unified_planning b/ext/up/unified_planning index 875724ef..e21d3df9 160000 --- a/ext/up/unified_planning +++ b/ext/up/unified_planning @@ -1 +1 @@ -Subproject commit 875724effd7e8297de31abca0970fe259e1eb180 +Subproject commit e21d3df9cb731cb1d6cf2459c505a73c53d8530e diff --git a/ext/up/up_planner.py b/ext/up/up_planner.py index 91d4879a..f2574512 100755 --- a/ext/up/up_planner.py +++ b/ext/up/up_planner.py @@ -54,10 +54,12 @@ def _solve(self, problem: 'up.model.AbstractProblem', for response in response_stream: response = self._reader.convert(response, problem) assert isinstance(response, up.engines.results.PlanGenerationResult) - if response.status == PlanGenerationResultStatus.INTERMEDIATE and callback is not None: + if response.status != PlanGenerationResultStatus.INTERMEDIATE: + return response + elif callback: callback(response) else: - return response + pass # Intermediate plan but no callback aries_path = '.' # Assumes that the script is launched from whithin Aries's repository @@ -73,9 +75,9 @@ def __init__(self, port: int): build = subprocess.Popen(aries_build_cmd, shell=True, cwd=aries_path) build.wait() print(f"Launching Aries gRPC server (logs at {log_file})...") - # logs = open(log_file, "w") - # subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=logs, stderr=logs) - subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=sys.stdout, stderr=sys.stderr) + logs = open(log_file, "w") + subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=logs, stderr=logs) + # subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=sys.stdout, stderr=sys.stderr) time.sleep(.1) # Let a few milliseconds pass to make sure the server is up and running GRPCPlanner.__init__(self, host="localhost", port=port) @@ -115,8 +117,16 @@ def cost(problem, plan): return None assert len(problem.quality_metrics) == 1 metric = problem.quality_metrics[0] + if metric == None: + return None if isinstance(metric, up.model.metrics.MinimizeActionCosts): return sum([metric.get_action_cost(action_instance.action).int_constant_value() for _, action_instance, _ in plan.timed_actions]) + elif isinstance(metric, up.model.metrics.MinimizeMakespan): + if isinstance(plan, up.plans.TimeTriggeredPlan): + return max([start + (dur if dur else 0) for start, _, dur in plan.timed_actions] + [0]) + else: + raise ValueError("The makespan metric is only defined for time-triggerered plan") + else: raise ValueError("Unsupported metric: ", metric) @@ -127,7 +137,7 @@ def cost(problem, plan): def plan(problem, expected_cost=None): print(problem) print(f"\n==== {problem.name} ====") - result = planner.solve(problem) + result = planner.solve(problem, callback=lambda p: print("New plan with cost: ", cost(problem, p), flush=True)) print("Answer: ", result.status) if result.plan: diff --git a/grpc/server/src/bin/client.rs b/grpc/server/src/bin/client.rs index b91dbf91..0e80774a 100644 --- a/grpc/server/src/bin/client.rs +++ b/grpc/server/src/bin/client.rs @@ -21,7 +21,13 @@ async fn main() -> Result<(), Box> { let response = client.plan_one_shot(request).await?; - println!("RESPONSE={:?}", response.into_inner()); + let mut response = response.into_inner(); + while let Some(msg) = response.message().await? { + println!("GOT: {:?}", &msg); + for log in msg.log_messages { + println!(" [{}] {}", log.level, log.message); + } + } Ok(()) } diff --git a/grpc/server/src/bin/server.rs b/grpc/server/src/bin/server.rs index 5b7b2443..1f5fd167 100644 --- a/grpc/server/src/bin/server.rs +++ b/grpc/server/src/bin/server.rs @@ -1,32 +1,30 @@ use anyhow::{bail, ensure, Error}; use aries_grpc_server::chronicles::problem_to_chronicles; - use aries_grpc_server::serialize::{engine, serialize_plan}; +use aries_model::extensions::SavedAssignment; use aries_planners::solver; -use unified_planning as up; -use up::Problem; - -use unified_planning::unified_planning_server::{UnifiedPlanning, UnifiedPlanningServer}; -use unified_planning::{PlanGenerationResult, PlanRequest}; - use aries_planners::solver::Metric; use aries_planning::chronicles::analysis::hierarchical_is_non_recursive; +use aries_planning::chronicles::FiniteProblem; use async_trait::async_trait; use futures_util::StreamExt; use prost::Message; +use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Server, Request, Response, Status}; +use unified_planning as up; use unified_planning::metric::MetricKind; +use unified_planning::unified_planning_server::{UnifiedPlanning, UnifiedPlanningServer}; +use unified_planning::{PlanGenerationResult, PlanRequest}; +use up::Problem; -pub fn solve(problem: &up::Problem) -> Result, Error> { - let mut answers = Vec::new(); - //TODO: Get the options from the problem - +/// Solves the given problem, giving any intermediate solution to the callback. +pub fn solve(problem: &up::Problem, on_new_sol: impl Fn(up::Plan) + Clone) -> Result { let strategies = vec![]; let htn_mode = problem.hierarchy.is_some(); - ensure!(problem.metrics.len() <= 1, "Unsupported: more than on metric provided."); + ensure!(problem.metrics.len() <= 1, "Unsupported: multiple metrics provided."); let metric = if let Some(metric) = problem.metrics.get(0) { match up::metric::MetricKind::from_i32(metric.kind) { Some(MetricKind::MinimizeActionCosts) => Some(Metric::ActionCosts), @@ -47,7 +45,24 @@ pub fn solve(problem: &up::Problem) -> Result, Err 0 }; - let result = solver::solve(base_problem, min_depth, max_depth, &strategies, metric, htn_mode)?; + // callback that will be invoked each time an intermediate solution is found + let on_new_solution = |pb: &FiniteProblem, ass: Arc| { + let plan = serialize_plan(problem, pb, &ass); + match plan { + Ok(plan) => on_new_sol(plan), + Err(err) => eprintln!("Error when serializing intermediate plan: {}", err), + } + }; + // run solver + let result = solver::solve( + base_problem, + min_depth, + max_depth, + &strategies, + metric, + htn_mode, + on_new_solution, + )?; if let Some((finite_problem, plan)) = result { println!( "************* PLAN FOUND **************\n\n{}", @@ -59,26 +74,23 @@ pub fn solve(problem: &up::Problem) -> Result, Err up::plan_generation_result::Status::SolvedSatisficing }; let plan = serialize_plan(problem, &finite_problem, &plan)?; - let answer = up::PlanGenerationResult { + Ok(up::PlanGenerationResult { status: status as i32, plan: Some(plan), metrics: Default::default(), log_messages: vec![], engine: Some(aries_grpc_server::serialize::engine()), - }; - answers.push(answer); + }) } else { println!("************* NO PLAN FOUND **************"); - answers.push(up::PlanGenerationResult { + Ok(up::PlanGenerationResult { status: up::plan_generation_result::Status::UnsolvableIncompletely as i32, plan: None, metrics: Default::default(), log_messages: vec![], engine: Some(engine()), - }); + }) } - // TODO: allow sending a stream of answers rather that sending the vector - Ok(answers) } #[derive(Default)] pub struct UnifiedPlanningService {} @@ -88,26 +100,45 @@ impl UnifiedPlanning for UnifiedPlanningService { type planOneShotStream = ReceiverStream>; async fn plan_one_shot(&self, request: Request) -> Result, Status> { - let (tx, rx) = mpsc::channel(4); + let (tx, rx) = mpsc::channel(32); let plan_request = request.into_inner(); let problem = plan_request .problem .ok_or_else(|| Status::aborted("The `problem` field is empty"))?; + let tx2 = tx.clone(); + let on_new_sol = move |plan: up::Plan| { + let answer = up::PlanGenerationResult { + status: up::plan_generation_result::Status::Intermediate as i32, + plan: Some(plan), + metrics: Default::default(), + log_messages: vec![], + engine: Some(aries_grpc_server::serialize::engine()), + }; + + // start a new green thread in charge for sending the result + let tx2 = tx2.clone(); + tokio::spawn(async move { + if tx2.send(Ok(answer)).await.is_err() { + eprintln!("Could not send intermediate solution through the gRPC channel."); + } + }); + }; + + // run a new green thread in which the solver will run tokio::spawn(async move { - let result = solve(&problem); + let result = solve(&problem, on_new_sol); match result { - Ok(answers) => { - for answer in answers { - tx.send(Ok(answer.clone())).await.unwrap(); - } + Ok(answer) => { + tx.send(Ok(answer)).await.unwrap(); } Err(e) => { tx.send(Err(Status::internal(e.to_string()))).await.unwrap(); } } }); + // return the output channel Ok(Response::new(ReceiverStream::new(rx))) } diff --git a/planners/src/bin/lcp.rs b/planners/src/bin/lcp.rs index 806537d4..cac9585b 100644 --- a/planners/src/bin/lcp.rs +++ b/planners/src/bin/lcp.rs @@ -74,7 +74,15 @@ fn main() -> Result<()> { 0 }; - let result = solve(spec, min_depth, max_depth, &opt.strategies, opt.optimize, htn_mode)?; + let result = solve( + spec, + min_depth, + max_depth, + &opt.strategies, + opt.optimize, + htn_mode, + |_, _| {}, + )?; if let Some((finite_problem, assignment)) = result { let plan_out = format_plan(&finite_problem, &assignment, htn_mode)?; println!("{}", plan_out); diff --git a/planners/src/solver.rs b/planners/src/solver.rs index 9a53b7f8..a9053c60 100644 --- a/planners/src/solver.rs +++ b/planners/src/solver.rs @@ -38,7 +38,7 @@ impl FromStr for Metric { "plan-length" | "length" => Ok(Metric::PlanLength), "action-costs" | "costs" => Ok(Metric::ActionCosts), _ => Err(format!( - "Unknown metric: '{}'. Valid options are: 'makespan', 'plan-length'", + "Unknown metric: '{}'. Valid options are: 'makespan', 'plan-length', 'action-costs", s )), } @@ -62,7 +62,8 @@ pub fn solve( strategies: &[Strat], metric: Option, htn_mode: bool, -) -> Result)>> { + on_new_sol: impl Fn(&FiniteProblem, Arc) + Clone, +) -> Result, Arc)>> { println!("===== Preprocessing ======"); aries_planning::chronicles::preprocessing::preprocess(&mut base_problem); println!("=========================="); @@ -86,8 +87,15 @@ pub fn solve( } else { populate_with_template_instances(&mut pb, &base_problem, |_| Some(depth))?; } + let pb = Arc::new(pb); + + let on_new_valid_assignment = { + let pb = pb.clone(); + let on_new_sol = on_new_sol.clone(); + move |ass: Arc| on_new_sol(&pb, ass) + }; println!(" [{:.3}s] Populated", start.elapsed().as_secs_f32()); - let result = solve_finite_problem(&pb, strategies, metric, htn_mode); + let result = solve_finite_problem(&pb, strategies, metric, htn_mode, on_new_valid_assignment); println!(" [{:.3}s] Solved", start.elapsed().as_secs_f32()); if let Some(result) = result { @@ -224,6 +232,7 @@ fn solve_finite_problem( strategies: &[Strat], metric: Option, htn_mode: bool, + on_new_solution: impl Fn(Arc), ) -> Option> { if PRINT_INITIAL_PROPAGATION.get() { propagate_and_print(pb); @@ -242,7 +251,7 @@ fn solve_finite_problem( aries_solver::parallel_solver::ParSolver::new(solver, strats.len(), |id, s| strats[id].adapt_solver(s, pb)); let found_plan = if let Some(metric) = metric { - let res = solver.minimize(metric).unwrap(); + let res = solver.minimize_with(metric, on_new_solution).unwrap(); res.map(|tup| tup.1) } else { solver.solve().unwrap()