diff --git a/src/rust/catnap/mod.rs b/src/rust/catnap/mod.rs index 240e12e7ad..e00e059dd9 100644 --- a/src/rust/catnap/mod.rs +++ b/src/rust/catnap/mod.rs @@ -196,7 +196,7 @@ impl SharedCatnapLibOS { timer!("catnap::accept"); trace!("accept(): qd={:?}", qd); let self_: Self = self.clone(); - let coroutine = |yielder: Yielder| -> Result { + let coroutine_constructor = |yielder: Yielder| -> Result { // Asynchronous accept code. Clone the self reference and move into the coroutine. let coroutine: Pin> = Box::pin(self.clone().accept_coroutine(qd, yielder)); // Insert async coroutine into the scheduler. @@ -204,7 +204,7 @@ impl SharedCatnapLibOS { self.runtime.insert_coroutine(&task_name, coroutine) }; - Ok(self_.get_shared_queue(&qd)?.accept(coroutine)?) + Ok(self_.get_shared_queue(&qd)?.accept(coroutine_constructor)?) } /// Asynchronous cross-queue code for accepting a connection. This function returns a coroutine that runs @@ -249,14 +249,14 @@ impl SharedCatnapLibOS { // FIXME: add IPv6 support; https://github.com/microsoft/demikernel/issues/935 let remote: SocketAddrV4 = unwrap_socketaddr(remote)?; let me: Self = self.clone(); - let coroutine = |yielder: Yielder| -> Result { + let coroutine_constructor = |yielder: Yielder| -> Result { // Clone the self reference and move into the coroutine. let coroutine: Pin> = Box::pin(self.clone().connect_coroutine(qd, remote, yielder)); let task_name: String = format!("Catnap::connect for qd={:?}", qd); self.runtime.insert_coroutine(&task_name, coroutine) }; - Ok(me.get_shared_queue(&qd)?.connect(coroutine)?) + Ok(me.get_shared_queue(&qd)?.connect(coroutine_constructor)?) } /// Asynchronous code to establish a connection to a remote endpoint. This function returns a coroutine that runs @@ -311,14 +311,14 @@ impl SharedCatnapLibOS { trace!("async_close() qd={:?}", qd); let self_: Self = self.clone(); - let coroutine = |yielder: Yielder| -> Result { + let coroutine_constructor = |yielder: Yielder| -> Result { // Async code to close this queue. let coroutine: Pin> = Box::pin(self.clone().close_coroutine(qd, yielder)); let task_name: String = format!("Catnap::close for qd={:?}", qd); self.runtime.insert_coroutine(&task_name, coroutine) }; - Ok(self_.get_shared_queue(&qd)?.async_close(coroutine)?) + Ok(self_.get_shared_queue(&qd)?.async_close(coroutine_constructor)?) } /// Asynchronous code to close a queue. This function returns a coroutine that runs asynchronously to close a queue @@ -367,12 +367,12 @@ impl SharedCatnapLibOS { return Err(Fail::new(libc::EINVAL, "zero-length buffer")); }; let self_: Self = self.clone(); - let coroutine = |yielder: Yielder| -> Result { + let coroutine_constructor = |yielder: Yielder| -> Result { let coroutine: Pin> = Box::pin(self.clone().push_coroutine(qd, buf, yielder)); let task_name: String = format!("Catnap::push for qd={:?}", qd); self.runtime.insert_coroutine(&task_name, coroutine) }; - Ok(self_.get_shared_queue(&qd)?.push(coroutine)?) + Ok(self_.get_shared_queue(&qd)?.push(coroutine_constructor)?) } /// Asynchronous code to push [buf] to a SharedCatnapQueue and its underlying POSIX socket. This function returns a @@ -412,12 +412,12 @@ impl SharedCatnapLibOS { return Err(Fail::new(libc::EINVAL, "zero-length buffer")); } let self_: Self = self.clone(); - let coroutine = |yielder: Yielder| -> Result { + let coroutine_constructor = |yielder: Yielder| -> Result { let coroutine: Pin> = Box::pin(self.clone().pushto_coroutine(qd, buf, remote, yielder)); let task_name: String = format!("Catnap::pushto for qd={:?}", qd); self.runtime.insert_coroutine(&task_name, coroutine) }; - Ok(self_.get_shared_queue(&qd)?.push(coroutine)?) + Ok(self_.get_shared_queue(&qd)?.push(coroutine_constructor)?) } /// Asynchronous code to pushto [buf] to [remote] on a SharedCatnapQueue and its underlying POSIX socket. This function @@ -458,12 +458,12 @@ impl SharedCatnapLibOS { // We just assert 'size' here, because it was previously checked at PDPIX layer. debug_assert!(size.is_none() || ((size.unwrap() > 0) && (size.unwrap() <= limits::POP_SIZE_MAX))); let self_: Self = self.clone(); - let coroutine = |yielder: Yielder| -> Result { + let coroutine_constructor = |yielder: Yielder| -> Result { let coroutine: Pin> = Box::pin(self.clone().pop_coroutine(qd, size, yielder)); let task_name: String = format!("Catnap::pop for qd={:?}", qd); self.runtime.insert_coroutine(&task_name, coroutine) }; - Ok(self_.get_shared_queue(&qd)?.pop(coroutine)?) + Ok(self_.get_shared_queue(&qd)?.pop(coroutine_constructor)?) } /// Asynchronous code to pop data from a SharedCatnapQueue and its underlying POSIX socket of optional [size]. This diff --git a/src/rust/catnap/queue.rs b/src/rust/catnap/queue.rs index 9a3b09523a..0ef8b11704 100644 --- a/src/rust/catnap/queue.rs +++ b/src/rust/catnap/queue.rs @@ -146,13 +146,7 @@ impl SharedCatnapQueue { F: FnOnce(Yielder) -> Result, { self.state_machine.prepare(SocketOp::Accept)?; - - let yielder: Yielder = Yielder::new(); - let yielder_handle: YielderHandle = yielder.get_handle(); - - let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?; - - self.add_pending_op(&task_handle, &yielder_handle); + let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor)?; Ok(task_handle.get_task_id().into()) } @@ -207,11 +201,7 @@ impl SharedCatnapQueue { F: FnOnce(Yielder) -> Result, { self.state_machine.prepare(SocketOp::Connect)?; - - let yielder: Yielder = Yielder::new(); - let yielder_handle: YielderHandle = yielder.get_handle(); - let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?; - self.add_pending_op(&task_handle, &yielder_handle); + let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor)?; Ok(task_handle.get_task_id().into()) } @@ -255,8 +245,7 @@ impl SharedCatnapQueue { F: FnOnce(Yielder) -> Result, { self.state_machine.prepare(SocketOp::Close)?; - let yielder: Yielder = Yielder::new(); - let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?; + let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor)?; Ok(task_handle.get_task_id().into()) } @@ -370,20 +359,24 @@ impl SharedCatnapQueue { } /// Generic function for spawning a control-path coroutine on [self]. - fn do_generic_sync_control_path_call( - &mut self, - coroutine_constructor: F, - yielder: Yielder, - ) -> Result + /// This variant adds the operation to the list of pending operations. + fn do_generic_sync_control_path_call(&mut self, coroutine_constructor: F) -> Result where F: FnOnce(Yielder) -> Result, { + let yielder: Yielder = Yielder::new(); + let yielder_handle: YielderHandle = yielder.get_handle(); + // Spawn coroutine. match coroutine_constructor(yielder) { // We successfully spawned the coroutine. Ok(handle) => { // Commit the operation on the socket. self.state_machine.commit(); + + // Add to the list of pending operations only if the socket is not closing. + self.add_pending_op(&handle, &yielder_handle); + Ok(handle) }, // We failed to spawn the coroutine. diff --git a/src/rust/runtime/mod.rs b/src/rust/runtime/mod.rs index 6e44b4ad4a..bf94a0ea3b 100644 --- a/src/rust/runtime/mod.rs +++ b/src/rust/runtime/mod.rs @@ -186,7 +186,7 @@ impl SharedDemiRuntime { /// and gets the result immediately. pub fn remove_coroutine_and_get_result(&mut self, handle: &TaskHandle, qt: u64) -> demi_qresult_t { let operation_task: OperationTask = self.remove_coroutine(handle); - let (qd, result) = operation_task.get_result().expect("Coroutine not finished"); + let (qd, result) = operation_task.get_result().expect("coroutine not finished"); self.pack_result(result, qd, qt) }