Skip to content

Commit

Permalink
Revert "[runtime] Enhancement: Move pending ops from Catnap to runtime"
Browse files Browse the repository at this point in the history
This reverts commit c7de2cf.
  • Loading branch information
anandbonde committed Nov 21, 2023
1 parent c7de2cf commit aff1d3e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 65 deletions.
5 changes: 2 additions & 3 deletions src/rust/catnap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ impl SharedCatnapLibOS {
}

// Create underlying queue.
let queue: SharedCatnapQueue =
SharedCatnapQueue::new(domain, typ, self.transport.clone(), self.runtime.clone())?;
let queue: SharedCatnapQueue = SharedCatnapQueue::new(domain, typ, self.transport.clone())?;
let qd: QDesc = self.runtime.alloc_queue(queue);
Ok(qd)
}
Expand Down Expand Up @@ -515,7 +514,7 @@ impl SharedCatnapLibOS {
demi_opcode_t::DEMI_OPC_CLOSE => {},
_ => {
match self.get_shared_queue(&QDesc::from(result.qr_qd)) {
Ok(_) => self.runtime.remove_pending_op(&handle),
Ok(mut queue) => queue.remove_pending_op(&handle),
Err(_) => warn!("catnap: qd={:?}, lingering pending op found", result.qr_qd),
};
},
Expand Down
64 changes: 37 additions & 27 deletions src/rust/catnap/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::{
},
DemiRuntime,
QToken,
SharedDemiRuntime,
SharedObject,
},
};
Expand All @@ -38,6 +37,7 @@ use ::socket2::{
};
use ::std::{
any::Any,
collections::HashMap,
net::SocketAddrV4,
ops::{
Deref,
Expand All @@ -63,8 +63,8 @@ pub struct CatnapQueue {
remote: Option<SocketAddrV4>,
/// Underlying network transport.
transport: SharedCatnapTransport,
/// Underlying runtime.
runtime: SharedDemiRuntime,
/// Currently running coroutines.
pending_ops: HashMap<TaskHandle, YielderHandle>,
}

#[derive(Clone)]
Expand All @@ -75,12 +75,7 @@ pub struct SharedCatnapQueue(SharedObject<CatnapQueue>);
//======================================================================================================================

impl CatnapQueue {
pub fn new(
domain: Domain,
typ: Type,
transport: SharedCatnapTransport,
runtime: SharedDemiRuntime,
) -> Result<Self, Fail> {
pub fn new(domain: Domain, typ: Type, transport: SharedCatnapTransport) -> Result<Self, Fail> {
// This was previously checked in the LibOS layer.
debug_assert!(typ == Type::STREAM || typ == Type::DGRAM);

Expand All @@ -99,22 +94,15 @@ impl CatnapQueue {
local: None,
remote: None,
transport,
runtime,
pending_ops: HashMap::<TaskHandle, YielderHandle>::new(),
})
}
}

/// Associate Functions for Catnap LibOS
impl SharedCatnapQueue {
pub fn new(
domain: Domain,
typ: Type,
transport: SharedCatnapTransport,
runtime: SharedDemiRuntime,
) -> Result<Self, Fail> {
Ok(Self(SharedObject::new(CatnapQueue::new(
domain, typ, transport, runtime,
)?)))
pub fn new(domain: Domain, typ: Type, transport: SharedCatnapTransport) -> Result<Self, Fail> {
Ok(Self(SharedObject::new(CatnapQueue::new(domain, typ, transport)?)))
}

/// Binds the target queue to `local` address.
Expand Down Expand Up @@ -164,7 +152,7 @@ impl SharedCatnapQueue {

let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;

self.runtime.add_pending_op(&task_handle, &yielder_handle);
self.add_pending_op(&task_handle, &yielder_handle);
Ok(task_handle.get_task_id().into())
}

Expand All @@ -187,7 +175,7 @@ impl SharedCatnapQueue {
local: None,
remote: Some(saddr),
transport: self.transport.clone(),
runtime: self.runtime.clone(),
pending_ops: HashMap::<TaskHandle, YielderHandle>::new(),
})));
},
Err(Fail { errno, cause: _ }) if DemiRuntime::should_retry(errno) => {
Expand Down Expand Up @@ -223,7 +211,7 @@ impl SharedCatnapQueue {
let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;
self.runtime.add_pending_op(&task_handle, &yielder_handle);
self.add_pending_op(&task_handle, &yielder_handle);
Ok(task_handle.get_task_id().into())
}

Expand Down Expand Up @@ -277,8 +265,7 @@ impl SharedCatnapQueue {
let transport: SharedCatnapTransport = self.transport.clone();
match transport.close(&mut self.socket) {
Ok(()) => {
self.runtime
.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
self.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
Ok(())
},
Err(e) => Err(e),
Expand All @@ -293,8 +280,7 @@ impl SharedCatnapQueue {
loop {
match transport.close(&mut self.socket) {
Ok(()) => {
self.runtime
.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
self.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
return Ok(());
},
Err(Fail { errno, cause: _ }) if DemiRuntime::should_retry(errno) => {
Expand Down Expand Up @@ -417,9 +403,33 @@ impl SharedCatnapQueue {
let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();
let task_handle: TaskHandle = coroutine_constructor(yielder)?;
self.runtime.add_pending_op(&task_handle, &yielder_handle);
self.add_pending_op(&task_handle, &yielder_handle);
Ok(task_handle.get_task_id().into())
}

/// Adds a new operation to the list of pending operations on this queue.
fn add_pending_op(&mut self, handle: &TaskHandle, yielder_handle: &YielderHandle) {
self.pending_ops.insert(handle.clone(), yielder_handle.clone());
}

/// Removes an operation from the list of pending operations on this queue. This function should only be called if
/// add_pending_op() was previously called.
/// TODO: Remove this when we clean up take_result().
/// This function is deprecated, do not use.
/// FIXME: https://github.com/microsoft/demikernel/issues/888
pub fn remove_pending_op(&mut self, handle: &TaskHandle) {
self.pending_ops.remove(handle);
}

/// Cancel all currently pending operations on this queue. If the operation is not complete and the coroutine has
/// yielded, wake the coroutine with an error.
fn cancel_pending_ops(&mut self, cause: Fail) {
for (handle, mut yielder_handle) in self.pending_ops.drain() {
if !handle.has_completed() {
yielder_handle.wake_with(Err(cause.clone()));
}
}
}
}

//======================================================================================================================
Expand Down
39 changes: 4 additions & 35 deletions src/rust/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use crate::{
};
use ::std::{
boxed::Box,
collections::HashMap,
future::Future,
mem,
net::SocketAddrV4,
Expand All @@ -86,13 +85,10 @@ use crate::pal::functions::socketaddrv4_to_sockaddr;
#[cfg(target_os = "linux")]
use crate::pal::linux::socketaddrv4_to_sockaddr;

use self::{
scheduler::YielderHandle,
types::{
demi_accept_result_t,
demi_qr_value_t,
demi_qresult_t,
},
use self::types::{
demi_accept_result_t,
demi_qr_value_t,
demi_qresult_t,
};

//======================================================================================================================
Expand All @@ -112,8 +108,6 @@ pub struct DemiRuntime {
timer: SharedTimer,
/// Shared table for mapping from underlying transport identifiers to queue descriptors.
network_table: NetworkQueueTable,
/// Currently running coroutines.
pending_ops: HashMap<TaskHandle, YielderHandle>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -154,7 +148,6 @@ impl SharedDemiRuntime {
ephemeral_ports: EphemeralPorts::default(),
timer: SharedTimer::new(now),
network_table: NetworkQueueTable::default(),
pending_ops: HashMap::<TaskHandle, YielderHandle>::new(),
}))
}

Expand Down Expand Up @@ -470,30 +463,6 @@ impl SharedDemiRuntime {
},
}
}

/// Adds a new operation to the list of pending operations on this queue.
pub fn add_pending_op(&mut self, handle: &TaskHandle, yielder_handle: &YielderHandle) {
self.pending_ops.insert(handle.clone(), yielder_handle.clone());
}

/// Removes an operation from the list of pending operations on this queue. This function should only be called if
/// add_pending_op() was previously called.
/// TODO: Remove this when we clean up take_result().
/// This function is deprecated, do not use.
/// FIXME: https://github.com/microsoft/demikernel/issues/888
pub fn remove_pending_op(&mut self, handle: &TaskHandle) {
self.pending_ops.remove(handle);
}

/// Cancel all currently pending operations on this queue. If the operation is not complete and the coroutine has
/// yielded, wake the coroutine with an error.
pub fn cancel_pending_ops(&mut self, cause: Fail) {
for (handle, mut yielder_handle) in self.pending_ops.drain() {
if !handle.has_completed() {
yielder_handle.wake_with(Err(cause.clone()));
}
}
}
}

impl<T> SharedObject<T> {
Expand Down

0 comments on commit aff1d3e

Please sign in to comment.