-
Notifications
You must be signed in to change notification settings - Fork 242
/
Copy pathegress.rs
105 lines (91 loc) · 2.79 KB
/
egress.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use crate::prelude::*;
use std::collections::HashMap;
#[derive(Serialize, Deserialize)]
struct EgressTx {
node: NodeIndex,
local: LocalNodeIndex,
dest: ReplicaAddr,
}
#[derive(Serialize, Deserialize)]
pub struct Egress {
txs: Vec<EgressTx>,
tags: HashMap<Tag, NodeIndex>,
}
impl Clone for Egress {
fn clone(&self) -> Self {
assert!(self.txs.is_empty());
Self {
txs: Vec::new(),
tags: self.tags.clone(),
}
}
}
impl Default for Egress {
fn default() -> Self {
Self {
tags: Default::default(),
txs: Default::default(),
}
}
}
impl Egress {
pub fn add_tx(&mut self, dst_g: NodeIndex, dst_l: LocalNodeIndex, addr: ReplicaAddr) {
self.txs.push(EgressTx {
node: dst_g,
local: dst_l,
dest: addr,
});
}
pub fn add_tag(&mut self, tag: Tag, dst: NodeIndex) {
self.tags.insert(tag, dst);
}
pub fn process(
&mut self,
m: &mut Option<Box<Packet>>,
shard: usize,
output: &mut dyn Executor,
) {
let &mut Self {
ref mut txs,
ref tags,
} = self;
// send any queued updates to all external children
assert!(!txs.is_empty());
let txn = txs.len() - 1;
// we need to find the ingress node following this egress according to the path
// with replay.tag, and then forward this message only on the channel corresponding
// to that ingress node.
let replay_to = m.as_ref().unwrap().tag().map(|tag| {
tags.get(&tag)
.cloned()
.expect("egress node told about replay message, but not on replay path")
});
for (txi, ref mut tx) in txs.iter_mut().enumerate() {
let mut take = txi == txn;
if let Some(replay_to) = replay_to.as_ref() {
if *replay_to == tx.node {
take = true;
} else {
continue;
}
}
// Avoid cloning if this is last send
let mut m = if take {
m.take().unwrap()
} else {
// we know this is a data (not a replay)
// because, a replay will force a take
m.as_ref().map(|m| Box::new(m.clone_data())).unwrap()
};
// src is usually ignored and overwritten by ingress
// *except* if the ingress is marked as a shard merger
// in which case it wants to know about the shard
m.link_mut().src = unsafe { LocalNodeIndex::make(shard as u32) };
m.link_mut().dst = tx.local;
output.send(tx.dest, m);
if take {
break;
}
}
}
}