From 8c8fde2ff4cbc4320f75ae41b567ad635d78b8c4 Mon Sep 17 00:00:00 2001 From: Yirmandias Date: Fri, 13 Oct 2023 16:28:59 +0200 Subject: [PATCH] add input stream for parsolver --- examples/sat/main.rs | 7 +++++ examples/scheduling/src/main.rs | 9 ++++++ planning/grpc/server/src/bin/server.rs | 25 ++++++++++++++++ planning/planners/src/bin/lcp.rs | 1 + planning/planners/src/solver.rs | 11 +++++-- planning/unified/deps/unified-planning | 2 +- solver/src/solver/parallel/parallel_solver.rs | 30 ++++++++++++++++++- 7 files changed, 80 insertions(+), 5 deletions(-) diff --git a/examples/sat/main.rs b/examples/sat/main.rs index 32fec4d6..bce47e8c 100644 --- a/examples/sat/main.rs +++ b/examples/sat/main.rs @@ -129,6 +129,13 @@ fn solve_multi_threads(model: Model, opt: &Opt, num_threads: usize) -> Result<() std::process::exit(1); } } + SolverResult::Interrupt(_) => { + println!("INTERRUPT"); + if opt.expected_satisfiability.is_some() { + eprintln!("Error: could not conclude on SAT or UNSAT within the allocated time"); + std::process::exit(1); + } + } } par_solver.print_stats(); diff --git a/examples/scheduling/src/main.rs b/examples/scheduling/src/main.rs index def93b8b..610d72d2 100644 --- a/examples/scheduling/src/main.rs +++ b/examples/scheduling/src/main.rs @@ -136,6 +136,15 @@ fn solve(kind: ProblemKind, instance: &str, opt: &Opt) { solver.print_stats(); println!("TIMEOUT (best solution cost {best_cost}"); } + SolverResult::Interrupt(None) => { + solver.print_stats(); + println!("INTERRUPT (not solution found)"); + } + SolverResult::Interrupt(Some(solution)) => { + let best_cost = solution.var_domain(makespan).lb; + solver.print_stats(); + println!("INTERRUPT (best solution cost {best_cost}"); + } } println!("TOTAL RUNTIME: {:.6}", start_time.elapsed().as_secs_f64()); } diff --git a/planning/grpc/server/src/bin/server.rs b/planning/grpc/server/src/bin/server.rs index 443c930c..cba129a3 100644 --- a/planning/grpc/server/src/bin/server.rs +++ b/planning/grpc/server/src/bin/server.rs @@ -130,6 +130,7 @@ pub fn solve( engine: Some(engine()), }) } + SolverResult::Timeout(opt_plan) => { println!("************* TIMEOUT **************"); let opt_plan = if let Some((finite_problem, plan)) = opt_plan { @@ -153,6 +154,30 @@ pub fn solve( engine: Some(engine()), }) } + + SolverResult::Interrupt(opt_plan) => { + println!("************* INTERRUPT **************"); + let opt_plan = if let Some((finite_problem, plan)) = opt_plan { + println!("\n{}", solver::format_plan(&finite_problem, &plan, htn_mode)?); + Some(serialize_plan(problem, &finite_problem, &plan)?) + } else { + None + }; + + let status = if opt_plan.is_none() { + up::plan_generation_result::Status::Timeout + } else { + up::plan_generation_result::Status::SolvedSatisficing + }; + + Ok(up::PlanGenerationResult { + status: status as i32, + plan: opt_plan, + metrics: Default::default(), + log_messages: vec![], + engine: Some(engine()), + }) + } } } #[derive(Default)] diff --git a/planning/planners/src/bin/lcp.rs b/planning/planners/src/bin/lcp.rs index bb4d52ad..7288b301 100644 --- a/planning/planners/src/bin/lcp.rs +++ b/planning/planners/src/bin/lcp.rs @@ -135,6 +135,7 @@ fn main() -> Result<()> { println!("\nNo plan found"); } SolverResult::Timeout(_) => println!("\nTimeout"), + SolverResult::Interrupt(_) => println!("\nInterrupt"), } Ok(()) diff --git a/planning/planners/src/solver.rs b/planning/planners/src/solver.rs index 449d725a..7c62dcbc 100644 --- a/planning/planners/src/solver.rs +++ b/planning/planners/src/solver.rs @@ -151,7 +151,7 @@ pub fn solve( fn propagate_and_print(pb: &FiniteProblem) -> bool { let Ok(EncodedProblem { model, .. }) = encode(pb, None) else { println!("==> Invalid model"); - return false + return false; }; let mut solver = init_solver(model); @@ -292,8 +292,13 @@ fn solve_finite_problem( if PRINT_INITIAL_PROPAGATION.get() { propagate_and_print(&pb); } - let Ok( EncodedProblem { mut model, objective: metric, encoding }) = encode(&pb, metric) else { - return SolverResult::Unsat + let Ok(EncodedProblem { + mut model, + objective: metric, + encoding, + }) = encode(&pb, metric) + else { + return SolverResult::Unsat; }; if let Some(metric) = metric { model.enforce(metric.le_lit(cost_upper_bound), []); diff --git a/planning/unified/deps/unified-planning b/planning/unified/deps/unified-planning index d0df86de..3376b6ee 160000 --- a/planning/unified/deps/unified-planning +++ b/planning/unified/deps/unified-planning @@ -1 +1 @@ -Subproject commit d0df86de2ee570cfd52c1a1e35d394cd4ba8bd53 +Subproject commit 3376b6eecf766eeb69b7858562ac5f71ac4512ec diff --git a/solver/src/solver/parallel/parallel_solver.rs b/solver/src/solver/parallel/parallel_solver.rs index be5ce73a..aa6bbb8e 100644 --- a/solver/src/solver/parallel/parallel_solver.rs +++ b/solver/src/solver/parallel/parallel_solver.rs @@ -1,7 +1,7 @@ use crate::model::extensions::{AssignmentExt, SavedAssignment, Shaped}; use crate::model::lang::IAtom; use crate::model::{Label, ModelShape}; -use crate::solver::parallel::signals::{InputSignal, InputStream, OutputSignal, SolverOutput, ThreadID}; +use crate::solver::parallel::signals::{InputSignal, InputStream, OutputSignal, SolverOutput, Synchro, ThreadID}; use crate::solver::{Exit, Solver}; use crossbeam_channel::{select, Receiver, Sender}; use std::sync::Arc; @@ -11,6 +11,7 @@ use std::time::{Duration, Instant}; pub struct ParSolver { base_model: ModelShape, solvers: Vec>, + synchro: Synchro, } pub type Solution = Arc; @@ -20,6 +21,7 @@ pub enum SolverResult { Sol(Solution), /// The solver terminated, without a finding a solution Unsat, + Interrupt(Option), /// Teh solver was interrupted due to a timeout. /// It may have found a suboptimal solution. Timeout(Option), @@ -31,6 +33,7 @@ impl SolverResult { SolverResult::Sol(s) => SolverResult::Sol(f(s)), SolverResult::Unsat => SolverResult::Unsat, SolverResult::Timeout(opt_sol) => SolverResult::Timeout(opt_sol.map(f)), + SolverResult::Interrupt(opt_sol) => SolverResult::Interrupt(opt_sol.map(f)), } } } @@ -87,6 +90,7 @@ impl ParSolver { let mut solver = ParSolver { base_model: base_solver.model.shape.clone(), solvers: Vec::with_capacity(num_workers), + synchro: Default::default(), }; for i in 0..(num_workers - 1) { let mut s = base_solver.clone(); @@ -114,6 +118,10 @@ impl ParSolver { rcv } + pub fn input_stream(&mut self) -> InputStream { + self.synchro.input_stream() + } + /// Solve the problem that was given on initialization using all available solvers. pub fn solve(&mut self, deadline: Option) -> SolverResult { self.race_solvers(|s| s.solve(), |_| {}, deadline) @@ -216,6 +224,26 @@ impl ParSolver { Duration::MAX }; select! { + recv(self.synchro.signals) -> res => { + if let Ok(signal) = res { + match signal { + InputSignal::Interrupt => { + for s in &mut self.solvers { + // notify all threads that they should stop ASAP + s.interrupt() + } + let result = match status { + SolverStatus::Pending => SolverResult::Interrupt(None), + SolverStatus::Intermediate(sol) => SolverResult::Interrupt(Some(sol)), + SolverStatus::Final(result) => result, + }; + status = SolverStatus::Final(result); + } + _ => unreachable!() + + } + } + } recv(result_rcv) -> res => { // solver termination let WorkerResult { id: worker_id,