Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instances load balancing #200

Open
wants to merge 1 commit into
base: new-arch-two-buses
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 48 additions & 22 deletions executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
//! planning instances, and computing witnesses for both main and secondary state machines,
//! leveraging parallel processing for efficiency.

use itertools::Itertools;
use p3_field::PrimeField;
use proofman_common::ProofCtx;
use witness::WitnessComponent;
use itertools::Itertools;

use rayon::prelude::*;

Expand Down Expand Up @@ -97,8 +97,11 @@
main_planning
.into_iter()
.filter_map(|plan| {
if let (true, global_idx) = pctx.dctx_add_instance(plan.airgroup_id, plan.air_id, 1)
{
if let (true, global_idx) = pctx.dctx_add_instance(
plan.airgroup_id,
plan.air_id,
pctx.get_weight(plan.airgroup_id, plan.air_id),

Check failure on line 103 in executor/src/executor.rs

View workflow job for this annotation

GitHub Actions / Test on x86_64

mismatched types
) {
Some(MainInstance::new(InstanceCtx::new(global_idx, plan)))
} else {
None
Expand Down Expand Up @@ -196,24 +199,41 @@
Vec<(usize, Box<dyn BusDeviceInstance<F>>)>, // Table instances
Vec<(usize, Box<dyn BusDeviceInstance<F>>)>, // Non-table instances
) {
let gids: Vec<_> = plans
.into_iter()
.enumerate()
.flat_map(|(i, plans_by_sm)| {
plans_by_sm.into_iter().filter_map(move |plan| {
Some((
pctx.dctx_add_instance_no_assign(

Check failure on line 208 in executor/src/executor.rs

View workflow job for this annotation

GitHub Actions / Test on x86_64

no method named `dctx_add_instance_no_assign` found for reference `&ProofCtx<F>` in the current scope
plan.airgroup_id,
plan.air_id,
pctx.get_weight(plan.airgroup_id, plan.air_id),
),
plan.instance_type == InstanceType::Table,
plan,
i,
))
})
})
.collect();

pctx.dctx_assign_instances();

Check failure on line 221 in executor/src/executor.rs

View workflow job for this annotation

GitHub Actions / Test on x86_64

no method named `dctx_assign_instances` found for reference `&ProofCtx<F>` in the current scope

let mut table_instances = Vec::new();
let mut other_instances = Vec::new();

plans.into_iter().enumerate().for_each(|(i, plans_by_sm)| {
plans_by_sm.into_iter().for_each(|plan| {
let (is_mine, global_idx) =
pctx.dctx_add_instance(plan.airgroup_id, plan.air_id, 1);

if is_mine || plan.instance_type == InstanceType::Table {
let ictx = InstanceCtx::new(global_idx, plan);
let instance = (global_idx, self.secondary_sm[i].build_inputs_collector(ictx));
if instance.1.instance_type() == InstanceType::Table {
table_instances.push(instance);
} else {
other_instances.push(instance);
}
gids.into_iter().for_each(|(global_idx, is_table, plan, i)| {
let is_mine = pctx.dctx_is_my_instance(global_idx);
if is_mine || is_table {
let ictx = InstanceCtx::new(global_idx, plan);
let instance = (global_idx, self.secondary_sm[i].build_inputs_collector(ictx));
if is_table {
table_instances.push(instance);
} else {
other_instances.push(instance);
}
});
}
});

(table_instances, other_instances)
Expand Down Expand Up @@ -277,12 +297,18 @@
pctx: &ProofCtx<F>,
table_instances: Vec<(usize, Box<dyn BusDeviceInstance<F>>)>,
) {
let mut instances = table_instances.into_iter().filter(|(_, sec_instance)| sec_instance.instance_type() == InstanceType::Table).collect::<Vec<_>>().into_iter().sorted_by(|(a, _), (b, _)| {
let (airgroup_id_a, air_id_a) = pctx.dctx_get_instance_info(*a);
let (airgroup_id_b, air_id_b) = pctx.dctx_get_instance_info(*b);
let mut instances = table_instances
.into_iter()
.filter(|(_, sec_instance)| sec_instance.instance_type() == InstanceType::Table)
.collect::<Vec<_>>()
.into_iter()
.sorted_by(|(a, _), (b, _)| {
let (airgroup_id_a, air_id_a) = pctx.dctx_get_instance_info(*a);
let (airgroup_id_b, air_id_b) = pctx.dctx_get_instance_info(*b);

airgroup_id_a.cmp(&airgroup_id_b).then(air_id_a.cmp(&air_id_b))
}).collect::<Vec<_>>();
airgroup_id_a.cmp(&airgroup_id_b).then(air_id_a.cmp(&air_id_b))
})
.collect::<Vec<_>>();

instances.iter_mut().for_each(|(global_idx, sec_instance)| {
if sec_instance.instance_type() == InstanceType::Table {
Expand Down
Loading