feat: add MessageChannel for async protocol message receiving and sending #93
Conversation
Nice design for message management, and good refactoring.
```rust
mesh_state: MeshState,
_ctx: C,
_cfg: Config,
_mesh_state: MeshState,
```
Should we drop this and other unused parameters?
Some state actually needs them, so we can't just drop them right now. We should instead simplify the interface so that we can go back to how we did `ctx.mesh_state()` instead of passing them around. This will be left for another PR if anyone wants to take it up.
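For illustration, a minimal sketch of that interface direction, assuming a hypothetical `NodeContext` trait and placeholder `Config`/`MeshState` types (none of this is the repo's actual code):

```rust
// Hypothetical sketch: a context trait that exposes shared state so handlers
// don't need `_cfg` / `_mesh_state` threaded through as extra parameters.
struct Config;
struct MeshState;

trait NodeContext {
    fn cfg(&self) -> &Config;
    fn mesh_state(&self) -> &MeshState;
}

// State transitions could then take just the context:
fn progress<C: NodeContext>(ctx: &C) {
    let _mesh = ctx.mesh_state(); // back to the old `ctx.mesh_state()` style
}
```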
```rust
let resp = self
    .http
    .get(url)
    .timeout(Duration::from_millis(self.options.state_timeout))
```
I tried this approach before, but calling `.timeout()` directly used to cause inconsistent timeouts: I would set the timeout to 100ms, but it actually took much longer to time out. That's why I switched to `tokio::timeout()`, which was much more consistent.
This seems to be a common observation, but I forgot the reason why. I'm not particularly good at multi-task or multi-threading stuff, so it's worth checking this issue online and seeing whether it would still be a problem for us now.
Interesting, let me check it myself then.
teach me why after you find out 🙏
So I made a test for this, and there were barely any inconsistent timeouts that I could notice. Inside one of reqwest's `request_timeout` tests, adding an elapsed-time check consistently yields 101ms with a 100ms timeout, so this shouldn't be an issue:
```rust
#[tokio::test]
async fn request_timeout() {
    let _ = env_logger::try_init();
    let server = server::http(move |_req| {
        async {
            // delay returning the response
            tokio::time::sleep(Duration::from_millis(300)).await;
            http::Response::default()
        }
    });

    let client = reqwest::Client::builder().no_proxy().build().unwrap();
    let url = format!("http://{}/slow", server.addr());

    let time = std::time::Instant::now();
    let res = client
        .get(&url)
        .timeout(Duration::from_millis(100))
        .send()
        .await;
    println!("elapsed: {:?}", time.elapsed());

    let err = res.unwrap_err();
    if cfg!(not(target_arch = "wasm32")) {
        assert!(err.is_timeout() && !err.is_connect());
    } else {
        assert!(err.is_timeout());
    }
    assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str()));
}
```
This was the PR where I changed to use `tokio::timeout`: near/mpc#889. The description there:

> The previous implementation caused trouble: 1) a 1s timeout was already getting the protocol stuck; 2) a 500ms timeout still saw many timeouts on the /state endpoint. I am guessing the `.timeout()` that comes with the reqwest package cannot reliably time out an async call; the time accounting may not consider task/thread switching. So I switched to using `tokio::time::timeout` and the problem was solved.

It might be useful to verify on dev by taking one node offline and seeing whether the 1s timeout here gets the protocol stuck. If not, then I guess our recent code optimizations have made this earlier issue disappear.
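For reference, a minimal sketch of the `tokio::time::timeout` approach described above, wrapping a reqwest call; the function name, URL argument, 500ms budget, and use of `anyhow` are placeholders, not this repo's actual code:

```rust
use std::time::Duration;

// Sketch only: the outer deadline is owned by tokio, independent of reqwest's
// own per-request timeout mechanism.
async fn fetch_state(
    client: &reqwest::Client,
    url: &str,
) -> anyhow::Result<reqwest::Response> {
    let resp = tokio::time::timeout(Duration::from_millis(500), client.get(url).send())
        .await? // outer Err: tokio's Elapsed (deadline hit)
        ?; // inner Err: reqwest::Error from the request itself
    Ok(resp)
}
```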
```rust
    .send()
    .await?;

let status = resp.status();
```
This function would need a timeout too, otherwise it might get stuck in an unlucky situation like the Aurora mainnet incident, where their /msg endpoint was unavailable.
this one already has the default timeout when the Client gets built
Do you know what the default timeout is? If I remember correctly, that was also an inconsistent timeout :( likely similar to the other one I mentioned.
It's in the `new` function, where the builder is created.
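As an illustration, a client-wide default timeout can be set where the builder is created; the 2-second value and function name below are placeholders, not necessarily what this PR uses:

```rust
use std::time::Duration;

// Sketch only: `ClientBuilder::timeout` sets a total timeout that applies to
// every request made by this client unless a per-request timeout overrides it.
fn new_http_client() -> reqwest::Result<reqwest::Client> {
    reqwest::Client::builder()
        .timeout(Duration::from_secs(2))
        .build()
}
```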
```rust
impl MessageExecutor {
    pub async fn execute(mut self) {
        let mut interval = tokio::time::interval(Duration::from_millis(100));
```
Does this mean execute() will not clear the inbox if the time taken exceeds 100ms?
I'm not sure what you mean by clear here. The inbox never gets cleared unless the protocols themselves take the specific messages associated with that protocol on `state.recv`.
As for the interval, when the amount of work exceeds 100ms, the next `interval.tick()` runs immediately without sleeping, so we don't sleep at all.
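A small standalone demo of that tick behavior (not the PR's loop): tokio's default missed-tick behavior is `MissedTickBehavior::Burst`, so when the work overruns the period, the next `tick()` resolves immediately instead of sleeping.

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(Duration::from_millis(100));
    for i in 0..3 {
        let start = std::time::Instant::now();
        interval.tick().await;
        // After an iteration with 250ms of "work", this prints roughly 0ms waited.
        println!("tick {i} waited {:?}", start.elapsed());
        // Simulate work that overruns the 100ms period.
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
}
```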
I meant exhausting all messages, which is what used to happen. But I get it now: this channel runs its `execute()` in parallel and there's no definite time that it runs, so having a cap on how much time it consumes is important; otherwise it keeps receiving messages and will always be running.
Hmm, good idea. I'll add it in a subsequent PR, or maybe this PR if the timeout needs to be adjusted, but this loop is what was in the protocol loop originally, so it's very unlikely to block us.
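One possible shape for that cap, as a hypothetical sketch (the `Message` type, `inbox` receiver, and budget value are placeholders, not this PR's code): drain whatever is already queued, but stop once a time budget is spent.

```rust
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

struct Message; // placeholder

// Drain messages that are already queued, but give up once the budget is spent
// so this task can yield back to the rest of the loop.
fn drain_with_budget(inbox: &mut mpsc::Receiver<Message>, budget: Duration) -> Vec<Message> {
    let deadline = Instant::now() + budget;
    let mut drained = Vec::new();
    while Instant::now() < deadline {
        match inbox.try_recv() {
            Ok(msg) => drained.push(msg),
            Err(_) => break, // empty (or closed): nothing more to take right now
        }
    }
    drained
}
```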
Also, I appreciate your answer in #71, as I am adding metrics to measure the delay of each step per protocol, and this change will affect how my definition and implementation should look. Greatly appreciated! 🙏
I'll merge this in and see what dev does overnight with our metrics.
Lots of code here. The crux of what this PR does (a rough sketch of the shape follows the list):
- Adds a `MessageChannel` that processes inbound and outbound messages in a background task.
- Messages arriving on `/msg` go into the `MessageChannel` to be sorted and finally be received by protocols through the `MessageReceiver` interface.
- Protocols use `MessageChannel::send` to send messages outbound, so they no longer block when doing this operation.
- Adds a `NodeClient` to call the `msg` and `state` endpoints.
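A rough sketch of the shape described above; every type and signature here is an assumption based only on the names in this summary, not the PR's actual code:

```rust
use tokio::sync::mpsc;

struct ProtocolMessage; // placeholder for whatever the protocols exchange
struct Participant(u32); // placeholder peer id

// Outbound half: protocols enqueue messages without blocking on network I/O;
// a background task drains the queue and performs the HTTP call to /msg.
#[derive(Clone)]
struct MessageChannel {
    outbox: mpsc::Sender<(Participant, ProtocolMessage)>,
}

impl MessageChannel {
    async fn send(&self, to: Participant, msg: ProtocolMessage) {
        // Only queues the message; delivery happens in the background task.
        let _ = self.outbox.send((to, msg)).await;
    }
}

// Inbound half: the background task sorts incoming messages and protocols
// receive the ones addressed to them through this interface.
trait MessageReceiver {
    fn recv(&mut self) -> Option<ProtocolMessage>;
}
```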