diff --git a/src/rust/catnap/mod.rs b/src/rust/catnap/mod.rs index 7b807096a..240e12e7a 100644 --- a/src/rust/catnap/mod.rs +++ b/src/rust/catnap/mod.rs @@ -128,8 +128,7 @@ impl SharedCatnapLibOS { } // Create underlying queue. - let queue: SharedCatnapQueue = - SharedCatnapQueue::new(domain, typ, self.transport.clone(), self.runtime.clone())?; + let queue: SharedCatnapQueue = SharedCatnapQueue::new(domain, typ, self.transport.clone())?; let qd: QDesc = self.runtime.alloc_queue(queue); Ok(qd) } @@ -515,7 +514,7 @@ impl SharedCatnapLibOS { demi_opcode_t::DEMI_OPC_CLOSE => {}, _ => { match self.get_shared_queue(&QDesc::from(result.qr_qd)) { - Ok(_) => self.runtime.remove_pending_op(&handle), + Ok(mut queue) => queue.remove_pending_op(&handle), Err(_) => warn!("catnap: qd={:?}, lingering pending op found", result.qr_qd), }; }, diff --git a/src/rust/catnap/queue.rs b/src/rust/catnap/queue.rs index 56c32c025..9a3b09523 100644 --- a/src/rust/catnap/queue.rs +++ b/src/rust/catnap/queue.rs @@ -27,7 +27,6 @@ use crate::{ }, DemiRuntime, QToken, - SharedDemiRuntime, SharedObject, }, }; @@ -38,6 +37,7 @@ use ::socket2::{ }; use ::std::{ any::Any, + collections::HashMap, net::SocketAddrV4, ops::{ Deref, @@ -63,8 +63,8 @@ pub struct CatnapQueue { remote: Option, /// Underlying network transport. transport: SharedCatnapTransport, - /// Underlying runtime. - runtime: SharedDemiRuntime, + /// Currently running coroutines. + pending_ops: HashMap, } #[derive(Clone)] @@ -75,12 +75,7 @@ pub struct SharedCatnapQueue(SharedObject); //====================================================================================================================== impl CatnapQueue { - pub fn new( - domain: Domain, - typ: Type, - transport: SharedCatnapTransport, - runtime: SharedDemiRuntime, - ) -> Result { + pub fn new(domain: Domain, typ: Type, transport: SharedCatnapTransport) -> Result { // This was previously checked in the LibOS layer. debug_assert!(typ == Type::STREAM || typ == Type::DGRAM); @@ -99,22 +94,15 @@ impl CatnapQueue { local: None, remote: None, transport, - runtime, + pending_ops: HashMap::::new(), }) } } /// Associate Functions for Catnap LibOS impl SharedCatnapQueue { - pub fn new( - domain: Domain, - typ: Type, - transport: SharedCatnapTransport, - runtime: SharedDemiRuntime, - ) -> Result { - Ok(Self(SharedObject::new(CatnapQueue::new( - domain, typ, transport, runtime, - )?))) + pub fn new(domain: Domain, typ: Type, transport: SharedCatnapTransport) -> Result { + Ok(Self(SharedObject::new(CatnapQueue::new(domain, typ, transport)?))) } /// Binds the target queue to `local` address. @@ -164,7 +152,7 @@ impl SharedCatnapQueue { let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?; - self.runtime.add_pending_op(&task_handle, &yielder_handle); + self.add_pending_op(&task_handle, &yielder_handle); Ok(task_handle.get_task_id().into()) } @@ -187,7 +175,7 @@ impl SharedCatnapQueue { local: None, remote: Some(saddr), transport: self.transport.clone(), - runtime: self.runtime.clone(), + pending_ops: HashMap::::new(), }))); }, Err(Fail { errno, cause: _ }) if DemiRuntime::should_retry(errno) => { @@ -223,7 +211,7 @@ impl SharedCatnapQueue { let yielder: Yielder = Yielder::new(); let yielder_handle: YielderHandle = yielder.get_handle(); let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?; - self.runtime.add_pending_op(&task_handle, &yielder_handle); + self.add_pending_op(&task_handle, &yielder_handle); Ok(task_handle.get_task_id().into()) } @@ -277,8 +265,7 @@ impl SharedCatnapQueue { let transport: SharedCatnapTransport = self.transport.clone(); match transport.close(&mut self.socket) { Ok(()) => { - self.runtime - .cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed")); + self.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed")); Ok(()) }, Err(e) => Err(e), @@ -293,8 +280,7 @@ impl SharedCatnapQueue { loop { match transport.close(&mut self.socket) { Ok(()) => { - self.runtime - .cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed")); + self.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed")); return Ok(()); }, Err(Fail { errno, cause: _ }) if DemiRuntime::should_retry(errno) => { @@ -417,9 +403,33 @@ impl SharedCatnapQueue { let yielder: Yielder = Yielder::new(); let yielder_handle: YielderHandle = yielder.get_handle(); let task_handle: TaskHandle = coroutine_constructor(yielder)?; - self.runtime.add_pending_op(&task_handle, &yielder_handle); + self.add_pending_op(&task_handle, &yielder_handle); Ok(task_handle.get_task_id().into()) } + + /// Adds a new operation to the list of pending operations on this queue. + fn add_pending_op(&mut self, handle: &TaskHandle, yielder_handle: &YielderHandle) { + self.pending_ops.insert(handle.clone(), yielder_handle.clone()); + } + + /// Removes an operation from the list of pending operations on this queue. This function should only be called if + /// add_pending_op() was previously called. + /// TODO: Remove this when we clean up take_result(). + /// This function is deprecated, do not use. + /// FIXME: https://github.com/microsoft/demikernel/issues/888 + pub fn remove_pending_op(&mut self, handle: &TaskHandle) { + self.pending_ops.remove(handle); + } + + /// Cancel all currently pending operations on this queue. If the operation is not complete and the coroutine has + /// yielded, wake the coroutine with an error. + fn cancel_pending_ops(&mut self, cause: Fail) { + for (handle, mut yielder_handle) in self.pending_ops.drain() { + if !handle.has_completed() { + yielder_handle.wake_with(Err(cause.clone())); + } + } + } } //====================================================================================================================== diff --git a/src/rust/runtime/mod.rs b/src/rust/runtime/mod.rs index ba36a2f52..5baa65794 100644 --- a/src/rust/runtime/mod.rs +++ b/src/rust/runtime/mod.rs @@ -60,7 +60,6 @@ use crate::{ }; use ::std::{ boxed::Box, - collections::HashMap, future::Future, mem, net::SocketAddrV4, @@ -86,13 +85,10 @@ use crate::pal::functions::socketaddrv4_to_sockaddr; #[cfg(target_os = "linux")] use crate::pal::linux::socketaddrv4_to_sockaddr; -use self::{ - scheduler::YielderHandle, - types::{ - demi_accept_result_t, - demi_qr_value_t, - demi_qresult_t, - }, +use self::types::{ + demi_accept_result_t, + demi_qr_value_t, + demi_qresult_t, }; //====================================================================================================================== @@ -112,8 +108,6 @@ pub struct DemiRuntime { timer: SharedTimer, /// Shared table for mapping from underlying transport identifiers to queue descriptors. network_table: NetworkQueueTable, - /// Currently running coroutines. - pending_ops: HashMap, } #[derive(Clone)] @@ -154,7 +148,6 @@ impl SharedDemiRuntime { ephemeral_ports: EphemeralPorts::default(), timer: SharedTimer::new(now), network_table: NetworkQueueTable::default(), - pending_ops: HashMap::::new(), })) } @@ -470,30 +463,6 @@ impl SharedDemiRuntime { }, } } - - /// Adds a new operation to the list of pending operations on this queue. - pub fn add_pending_op(&mut self, handle: &TaskHandle, yielder_handle: &YielderHandle) { - self.pending_ops.insert(handle.clone(), yielder_handle.clone()); - } - - /// Removes an operation from the list of pending operations on this queue. This function should only be called if - /// add_pending_op() was previously called. - /// TODO: Remove this when we clean up take_result(). - /// This function is deprecated, do not use. - /// FIXME: https://github.com/microsoft/demikernel/issues/888 - pub fn remove_pending_op(&mut self, handle: &TaskHandle) { - self.pending_ops.remove(handle); - } - - /// Cancel all currently pending operations on this queue. If the operation is not complete and the coroutine has - /// yielded, wake the coroutine with an error. - pub fn cancel_pending_ops(&mut self, cause: Fail) { - for (handle, mut yielder_handle) in self.pending_ops.drain() { - if !handle.has_completed() { - yielder_handle.wake_with(Err(cause.clone())); - } - } - } } impl SharedObject {