Skip to content

Commit

Permalink
[grpc] Send intermediate solution to gRPC client
Browse files Browse the repository at this point in the history
  • Loading branch information
arbimo committed Oct 3, 2022
1 parent cccb43c commit 31f0bb1
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 39 deletions.
2 changes: 1 addition & 1 deletion ext/up/unified_planning
Submodule unified_planning updated 169 files
22 changes: 16 additions & 6 deletions ext/up/up_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ def _solve(self, problem: 'up.model.AbstractProblem',
for response in response_stream:
response = self._reader.convert(response, problem)
assert isinstance(response, up.engines.results.PlanGenerationResult)
if response.status == PlanGenerationResultStatus.INTERMEDIATE and callback is not None:
if response.status != PlanGenerationResultStatus.INTERMEDIATE:
return response
elif callback:
callback(response)
else:
return response
pass # Intermediate plan but no callback


aries_path = '.' # Assumes that the script is launched from whithin Aries's repository
Expand All @@ -73,9 +75,9 @@ def __init__(self, port: int):
build = subprocess.Popen(aries_build_cmd, shell=True, cwd=aries_path)
build.wait()
print(f"Launching Aries gRPC server (logs at {log_file})...")
# logs = open(log_file, "w")
# subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=logs, stderr=logs)
subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=sys.stdout, stderr=sys.stderr)
logs = open(log_file, "w")
subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=logs, stderr=logs)
# subprocess.Popen([f"{aries_exe}"], cwd=aries_path, shell=True, stdout=sys.stdout, stderr=sys.stderr)
time.sleep(.1) # Let a few milliseconds pass to make sure the server is up and running
GRPCPlanner.__init__(self, host="localhost", port=port)

Expand Down Expand Up @@ -115,8 +117,16 @@ def cost(problem, plan):
return None
assert len(problem.quality_metrics) == 1
metric = problem.quality_metrics[0]
if metric == None:
return None
if isinstance(metric, up.model.metrics.MinimizeActionCosts):
return sum([metric.get_action_cost(action_instance.action).int_constant_value() for _, action_instance, _ in plan.timed_actions])
elif isinstance(metric, up.model.metrics.MinimizeMakespan):
if isinstance(plan, up.plans.TimeTriggeredPlan):
return max([start + (dur if dur else 0) for start, _, dur in plan.timed_actions] + [0])
else:
raise ValueError("The makespan metric is only defined for time-triggerered plan")

else:
raise ValueError("Unsupported metric: ", metric)

Expand All @@ -127,7 +137,7 @@ def cost(problem, plan):
def plan(problem, expected_cost=None):
print(problem)
print(f"\n==== {problem.name} ====")
result = planner.solve(problem)
result = planner.solve(problem, callback=lambda p: print("New plan with cost: ", cost(problem, p), flush=True))

print("Answer: ", result.status)
if result.plan:
Expand Down
8 changes: 7 additions & 1 deletion grpc/server/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let response = client.plan_one_shot(request).await?;

println!("RESPONSE={:?}", response.into_inner());
let mut response = response.into_inner();
while let Some(msg) = response.message().await? {
println!("GOT: {:?}", &msg);
for log in msg.log_messages {
println!(" [{}] {}", log.level, log.message);
}
}

Ok(())
}
83 changes: 57 additions & 26 deletions grpc/server/src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
use anyhow::{bail, ensure, Error};
use aries_grpc_server::chronicles::problem_to_chronicles;

use aries_grpc_server::serialize::{engine, serialize_plan};
use aries_model::extensions::SavedAssignment;
use aries_planners::solver;
use unified_planning as up;
use up::Problem;

use unified_planning::unified_planning_server::{UnifiedPlanning, UnifiedPlanningServer};
use unified_planning::{PlanGenerationResult, PlanRequest};

use aries_planners::solver::Metric;
use aries_planning::chronicles::analysis::hierarchical_is_non_recursive;
use aries_planning::chronicles::FiniteProblem;
use async_trait::async_trait;
use futures_util::StreamExt;
use prost::Message;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
use unified_planning as up;
use unified_planning::metric::MetricKind;
use unified_planning::unified_planning_server::{UnifiedPlanning, UnifiedPlanningServer};
use unified_planning::{PlanGenerationResult, PlanRequest};
use up::Problem;

pub fn solve(problem: &up::Problem) -> Result<Vec<up::PlanGenerationResult>, Error> {
let mut answers = Vec::new();
//TODO: Get the options from the problem

/// Solves the given problem, giving any intermediate solution to the callback.
pub fn solve(problem: &up::Problem, on_new_sol: impl Fn(up::Plan) + Clone) -> Result<up::PlanGenerationResult, Error> {
let strategies = vec![];
let htn_mode = problem.hierarchy.is_some();

ensure!(problem.metrics.len() <= 1, "Unsupported: more than on metric provided.");
ensure!(problem.metrics.len() <= 1, "Unsupported: multiple metrics provided.");
let metric = if let Some(metric) = problem.metrics.get(0) {
match up::metric::MetricKind::from_i32(metric.kind) {
Some(MetricKind::MinimizeActionCosts) => Some(Metric::ActionCosts),
Expand All @@ -47,7 +45,24 @@ pub fn solve(problem: &up::Problem) -> Result<Vec<up::PlanGenerationResult>, Err
0
};

let result = solver::solve(base_problem, min_depth, max_depth, &strategies, metric, htn_mode)?;
// callback that will be invoked each time an intermediate solution is found
let on_new_solution = |pb: &FiniteProblem, ass: Arc<SavedAssignment>| {
let plan = serialize_plan(problem, pb, &ass);
match plan {
Ok(plan) => on_new_sol(plan),
Err(err) => eprintln!("Error when serializing intermediate plan: {}", err),
}
};
// run solver
let result = solver::solve(
base_problem,
min_depth,
max_depth,
&strategies,
metric,
htn_mode,
on_new_solution,
)?;
if let Some((finite_problem, plan)) = result {
println!(
"************* PLAN FOUND **************\n\n{}",
Expand All @@ -59,26 +74,23 @@ pub fn solve(problem: &up::Problem) -> Result<Vec<up::PlanGenerationResult>, Err
up::plan_generation_result::Status::SolvedSatisficing
};
let plan = serialize_plan(problem, &finite_problem, &plan)?;
let answer = up::PlanGenerationResult {
Ok(up::PlanGenerationResult {
status: status as i32,
plan: Some(plan),
metrics: Default::default(),
log_messages: vec![],
engine: Some(aries_grpc_server::serialize::engine()),
};
answers.push(answer);
})
} else {
println!("************* NO PLAN FOUND **************");
answers.push(up::PlanGenerationResult {
Ok(up::PlanGenerationResult {
status: up::plan_generation_result::Status::UnsolvableIncompletely as i32,
plan: None,
metrics: Default::default(),
log_messages: vec![],
engine: Some(engine()),
});
})
}
// TODO: allow sending a stream of answers rather that sending the vector
Ok(answers)
}
#[derive(Default)]
pub struct UnifiedPlanningService {}
Expand All @@ -88,26 +100,45 @@ impl UnifiedPlanning for UnifiedPlanningService {
type planOneShotStream = ReceiverStream<Result<PlanGenerationResult, Status>>;

async fn plan_one_shot(&self, request: Request<PlanRequest>) -> Result<Response<Self::planOneShotStream>, Status> {
let (tx, rx) = mpsc::channel(4);
let (tx, rx) = mpsc::channel(32);
let plan_request = request.into_inner();

let problem = plan_request
.problem
.ok_or_else(|| Status::aborted("The `problem` field is empty"))?;

let tx2 = tx.clone();
let on_new_sol = move |plan: up::Plan| {
let answer = up::PlanGenerationResult {
status: up::plan_generation_result::Status::Intermediate as i32,
plan: Some(plan),
metrics: Default::default(),
log_messages: vec![],
engine: Some(aries_grpc_server::serialize::engine()),
};

// start a new green thread in charge for sending the result
let tx2 = tx2.clone();
tokio::spawn(async move {
if tx2.send(Ok(answer)).await.is_err() {
eprintln!("Could not send intermediate solution through the gRPC channel.");
}
});
};

// run a new green thread in which the solver will run
tokio::spawn(async move {
let result = solve(&problem);
let result = solve(&problem, on_new_sol);
match result {
Ok(answers) => {
for answer in answers {
tx.send(Ok(answer.clone())).await.unwrap();
}
Ok(answer) => {
tx.send(Ok(answer)).await.unwrap();
}
Err(e) => {
tx.send(Err(Status::internal(e.to_string()))).await.unwrap();
}
}
});
// return the output channel
Ok(Response::new(ReceiverStream::new(rx)))
}

Expand Down
10 changes: 9 additions & 1 deletion planners/src/bin/lcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ fn main() -> Result<()> {
0
};

let result = solve(spec, min_depth, max_depth, &opt.strategies, opt.optimize, htn_mode)?;
let result = solve(
spec,
min_depth,
max_depth,
&opt.strategies,
opt.optimize,
htn_mode,
|_, _| {},
)?;
if let Some((finite_problem, assignment)) = result {
let plan_out = format_plan(&finite_problem, &assignment, htn_mode)?;
println!("{}", plan_out);
Expand Down
17 changes: 13 additions & 4 deletions planners/src/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl FromStr for Metric {
"plan-length" | "length" => Ok(Metric::PlanLength),
"action-costs" | "costs" => Ok(Metric::ActionCosts),
_ => Err(format!(
"Unknown metric: '{}'. Valid options are: 'makespan', 'plan-length'",
"Unknown metric: '{}'. Valid options are: 'makespan', 'plan-length', 'action-costs",
s
)),
}
Expand All @@ -62,7 +62,8 @@ pub fn solve(
strategies: &[Strat],
metric: Option<Metric>,
htn_mode: bool,
) -> Result<Option<(FiniteProblem, Arc<Domains>)>> {
on_new_sol: impl Fn(&FiniteProblem, Arc<SavedAssignment>) + Clone,
) -> Result<Option<(Arc<FiniteProblem>, Arc<Domains>)>> {
println!("===== Preprocessing ======");
aries_planning::chronicles::preprocessing::preprocess(&mut base_problem);
println!("==========================");
Expand All @@ -86,8 +87,15 @@ pub fn solve(
} else {
populate_with_template_instances(&mut pb, &base_problem, |_| Some(depth))?;
}
let pb = Arc::new(pb);

let on_new_valid_assignment = {
let pb = pb.clone();
let on_new_sol = on_new_sol.clone();
move |ass: Arc<SavedAssignment>| on_new_sol(&pb, ass)
};
println!(" [{:.3}s] Populated", start.elapsed().as_secs_f32());
let result = solve_finite_problem(&pb, strategies, metric, htn_mode);
let result = solve_finite_problem(&pb, strategies, metric, htn_mode, on_new_valid_assignment);
println!(" [{:.3}s] Solved", start.elapsed().as_secs_f32());

if let Some(result) = result {
Expand Down Expand Up @@ -224,6 +232,7 @@ fn solve_finite_problem(
strategies: &[Strat],
metric: Option<Metric>,
htn_mode: bool,
on_new_solution: impl Fn(Arc<SavedAssignment>),
) -> Option<std::sync::Arc<SavedAssignment>> {
if PRINT_INITIAL_PROPAGATION.get() {
propagate_and_print(pb);
Expand All @@ -242,7 +251,7 @@ fn solve_finite_problem(
aries_solver::parallel_solver::ParSolver::new(solver, strats.len(), |id, s| strats[id].adapt_solver(s, pb));

let found_plan = if let Some(metric) = metric {
let res = solver.minimize(metric).unwrap();
let res = solver.minimize_with(metric, on_new_solution).unwrap();
res.map(|tup| tup.1)
} else {
solver.solve().unwrap()
Expand Down

0 comments on commit 31f0bb1

Please sign in to comment.