Skip to content

Commit

Permalink
merge: #2987
Browse files Browse the repository at this point in the history
2987: Enable parallelism in dependent values update r=paulocsanz a=paulocsanz



Co-authored-by: Paulo Cabral <[email protected]>
  • Loading branch information
si-bors-ng[bot] and paulocsanz authored Nov 29, 2023
2 parents eea2e8d + 7dee112 commit cc0a3aa
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
13 changes: 5 additions & 8 deletions lib/council-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,12 @@ impl Server {

let mut complete_graph = ChangeSetGraph::default();
loop {
for (reply_channel, node_id) in complete_graph.fetch_all_available() {
info!(%reply_channel, %node_id, "Ok to process AttributeValue");
for (reply_channel, node_ids) in complete_graph.fetch_all_available() {
info!(%reply_channel, ?node_ids, "Ok to process AttributeValue");
self.nats
.publish(
reply_channel,
serde_json::to_vec(&Response::OkToProcess {
node_ids: vec![node_id],
})
.unwrap(),
serde_json::to_vec(&Response::OkToProcess { node_ids }).unwrap(),
)
.await
.unwrap();
Expand Down Expand Up @@ -230,9 +227,9 @@ pub async fn job_processed_a_value(
change_set_id: Id,
node_id: Id,
) -> Result<(), Error> {
debug!(%reply_channel, %change_set_id, %node_id, "Job finished processing graph node");
info!(%reply_channel, %change_set_id, %node_id, "Job finished processing graph node");
for reply_channel in
complete_graph.mark_node_as_processed(reply_channel, change_set_id, node_id)?
complete_graph.mark_node_as_processed(&reply_channel, change_set_id, node_id)?
{
info!(%reply_channel, ?node_id, "AttributeValue has been processed by a job");
nats.publish(
Expand Down
10 changes: 5 additions & 5 deletions lib/council-server/src/server/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ impl ChangeSetGraph {
self.dependency_data.is_empty()
}

pub fn fetch_all_available(&mut self) -> Vec<(String, Id)> {
let mut result = Vec::new();
pub fn fetch_all_available(&mut self) -> HashMap<String, Vec<Id>> {
let mut result: HashMap<String, Vec<Id>> = HashMap::new();
for graph in self.dependency_data.values_mut() {
for (id, metadata) in graph.iter_mut() {
if let Some(reply_channel) = metadata.next_to_process() {
result.push((reply_channel, *id));
result.entry(reply_channel.clone()).or_default().push(*id);
}
}
}
Expand Down Expand Up @@ -68,15 +68,15 @@ impl ChangeSetGraph {

pub fn mark_node_as_processed(
&mut self,
reply_channel: String,
reply_channel: &str,
change_set_id: Id,
node_id: Id,
) -> Result<HashSet<String>, Error> {
let change_set_graph_data = self.dependency_data.get_mut(&change_set_id).unwrap();

let (ok_to_remove_node, wanted_by_reply_channels) =
if let Some(node_metadata) = change_set_graph_data.get_mut(&node_id) {
node_metadata.mark_as_processed(&reply_channel)?
node_metadata.mark_as_processed(reply_channel)?
} else {
return Err(Error::UnknownNodeId);
};
Expand Down

0 comments on commit cc0a3aa

Please sign in to comment.