Skip to content

Commit

Permalink
[runtime] Enhancement: Move pending ops from Catnap to runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
anandbonde committed Nov 21, 2023
1 parent ec766b8 commit c7de2cf
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 43 deletions.
5 changes: 3 additions & 2 deletions src/rust/catnap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ impl SharedCatnapLibOS {
}

// Create underlying queue.
let queue: SharedCatnapQueue = SharedCatnapQueue::new(domain, typ, self.transport.clone())?;
let queue: SharedCatnapQueue =
SharedCatnapQueue::new(domain, typ, self.transport.clone(), self.runtime.clone())?;
let qd: QDesc = self.runtime.alloc_queue(queue);
Ok(qd)
}
Expand Down Expand Up @@ -514,7 +515,7 @@ impl SharedCatnapLibOS {
demi_opcode_t::DEMI_OPC_CLOSE => {},
_ => {
match self.get_shared_queue(&QDesc::from(result.qr_qd)) {
Ok(mut queue) => queue.remove_pending_op(&handle),
Ok(_) => self.runtime.remove_pending_op(&handle),
Err(_) => warn!("catnap: qd={:?}, lingering pending op found", result.qr_qd),
};
},
Expand Down
64 changes: 27 additions & 37 deletions src/rust/catnap/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
},
DemiRuntime,
QToken,
SharedDemiRuntime,
SharedObject,
},
};
Expand All @@ -37,7 +38,6 @@ use ::socket2::{
};
use ::std::{
any::Any,
collections::HashMap,
net::SocketAddrV4,
ops::{
Deref,
Expand All @@ -63,8 +63,8 @@ pub struct CatnapQueue {
remote: Option<SocketAddrV4>,
/// Underlying network transport.
transport: SharedCatnapTransport,
/// Currently running coroutines.
pending_ops: HashMap<TaskHandle, YielderHandle>,
/// Underlying runtime.
runtime: SharedDemiRuntime,
}

#[derive(Clone)]
Expand All @@ -75,7 +75,12 @@ pub struct SharedCatnapQueue(SharedObject<CatnapQueue>);
//======================================================================================================================

impl CatnapQueue {
pub fn new(domain: Domain, typ: Type, transport: SharedCatnapTransport) -> Result<Self, Fail> {
pub fn new(
domain: Domain,
typ: Type,
transport: SharedCatnapTransport,
runtime: SharedDemiRuntime,
) -> Result<Self, Fail> {
// This was previously checked in the LibOS layer.
debug_assert!(typ == Type::STREAM || typ == Type::DGRAM);

Expand All @@ -94,15 +99,22 @@ impl CatnapQueue {
local: None,
remote: None,
transport,
pending_ops: HashMap::<TaskHandle, YielderHandle>::new(),
runtime,
})
}
}

/// Associated Functions for Catnap LibOS
impl SharedCatnapQueue {
pub fn new(domain: Domain, typ: Type, transport: SharedCatnapTransport) -> Result<Self, Fail> {
Ok(Self(SharedObject::new(CatnapQueue::new(domain, typ, transport)?)))
pub fn new(
domain: Domain,
typ: Type,
transport: SharedCatnapTransport,
runtime: SharedDemiRuntime,
) -> Result<Self, Fail> {
Ok(Self(SharedObject::new(CatnapQueue::new(
domain, typ, transport, runtime,
)?)))
}

/// Binds the target queue to `local` address.
Expand Down Expand Up @@ -152,7 +164,7 @@ impl SharedCatnapQueue {

let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;

self.add_pending_op(&task_handle, &yielder_handle);
self.runtime.add_pending_op(&task_handle, &yielder_handle);
Ok(task_handle.get_task_id().into())
}

Expand All @@ -175,7 +187,7 @@ impl SharedCatnapQueue {
local: None,
remote: Some(saddr),
transport: self.transport.clone(),
pending_ops: HashMap::<TaskHandle, YielderHandle>::new(),
runtime: self.runtime.clone(),
})));
},
Err(Fail { errno, cause: _ }) if DemiRuntime::should_retry(errno) => {
Expand Down Expand Up @@ -211,7 +223,7 @@ impl SharedCatnapQueue {
let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;
self.add_pending_op(&task_handle, &yielder_handle);
self.runtime.add_pending_op(&task_handle, &yielder_handle);
Ok(task_handle.get_task_id().into())
}

Expand Down Expand Up @@ -265,7 +277,8 @@ impl SharedCatnapQueue {
let transport: SharedCatnapTransport = self.transport.clone();
match transport.close(&mut self.socket) {
Ok(()) => {
self.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
self.runtime
.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
Ok(())
},
Err(e) => Err(e),
Expand All @@ -280,7 +293,8 @@ impl SharedCatnapQueue {
loop {
match transport.close(&mut self.socket) {
Ok(()) => {
self.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
self.runtime
.cancel_pending_ops(Fail::new(libc::ECANCELED, "This queue was closed"));
return Ok(());
},
Err(Fail { errno, cause: _ }) if DemiRuntime::should_retry(errno) => {
Expand Down Expand Up @@ -403,33 +417,9 @@ impl SharedCatnapQueue {
let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();
let task_handle: TaskHandle = coroutine_constructor(yielder)?;
self.add_pending_op(&task_handle, &yielder_handle);
self.runtime.add_pending_op(&task_handle, &yielder_handle);
Ok(task_handle.get_task_id().into())
}

/// Records an operation as pending on this queue.
///
/// Clones of `handle` and `yielder_handle` are stored in the queue's pending-operations table, with the task handle
/// acting as the key.
fn add_pending_op(&mut self, handle: &TaskHandle, yielder_handle: &YielderHandle) {
    let task: TaskHandle = handle.clone();
    let yielder: YielderHandle = yielder_handle.clone();
    self.pending_ops.insert(task, yielder);
}

/// Drops the entry for `handle` from this queue's table of pending operations. Only call this for a handle that was
/// previously registered through add_pending_op().
/// TODO: Remove this when we clean up take_result().
/// This function is deprecated, do not use.
/// FIXME: https://github.com/microsoft/demikernel/issues/888
pub fn remove_pending_op(&mut self, handle: &TaskHandle) {
    // The yielder handle stored for this task (if any) is not needed by the caller; release it right away.
    drop(self.pending_ops.remove(handle));
}

/// Empties this queue's table of pending operations. Every operation whose task has not completed yet is woken
/// with `cause` as its error; operations that already completed are only removed from the table.
fn cancel_pending_ops(&mut self, cause: Fail) {
    for (task, mut yielder) in self.pending_ops.drain() {
        // Nothing to wake for a task that already ran to completion.
        if task.has_completed() {
            continue;
        }
        yielder.wake_with(Err(cause.clone()));
    }
}
}

//======================================================================================================================
Expand Down
39 changes: 35 additions & 4 deletions src/rust/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::{
};
use ::std::{
boxed::Box,
collections::HashMap,
future::Future,
mem,
net::SocketAddrV4,
Expand All @@ -85,10 +86,13 @@ use crate::pal::functions::socketaddrv4_to_sockaddr;
#[cfg(target_os = "linux")]
use crate::pal::linux::socketaddrv4_to_sockaddr;

use self::types::{
demi_accept_result_t,
demi_qr_value_t,
demi_qresult_t,
use self::{
scheduler::YielderHandle,
types::{
demi_accept_result_t,
demi_qr_value_t,
demi_qresult_t,
},
};

//======================================================================================================================
Expand All @@ -108,6 +112,8 @@ pub struct DemiRuntime {
timer: SharedTimer,
/// Shared table for mapping from underlying transport identifiers to queue descriptors.
network_table: NetworkQueueTable,
/// Currently running coroutines.
pending_ops: HashMap<TaskHandle, YielderHandle>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -148,6 +154,7 @@ impl SharedDemiRuntime {
ephemeral_ports: EphemeralPorts::default(),
timer: SharedTimer::new(now),
network_table: NetworkQueueTable::default(),
pending_ops: HashMap::<TaskHandle, YielderHandle>::new(),
}))
}

Expand Down Expand Up @@ -463,6 +470,30 @@ impl SharedDemiRuntime {
},
}
}

/// Registers an operation as pending in this runtime.
///
/// Clones of `handle` and `yielder_handle` are stored in the runtime's pending-operations table, with the task
/// handle acting as the key.
pub fn add_pending_op(&mut self, handle: &TaskHandle, yielder_handle: &YielderHandle) {
    let task: TaskHandle = handle.clone();
    let yielder: YielderHandle = yielder_handle.clone();
    self.pending_ops.insert(task, yielder);
}

/// Drops the entry for `handle` from this runtime's table of pending operations. Only call this for a handle that
/// was previously registered through add_pending_op().
/// TODO: Remove this when we clean up take_result().
/// This function is deprecated, do not use.
/// FIXME: https://github.com/microsoft/demikernel/issues/888
pub fn remove_pending_op(&mut self, handle: &TaskHandle) {
    // The yielder handle stored for this task (if any) is not needed by the caller; discard it right away.
    let _ = self.pending_ops.remove(handle);
}

/// Empties this runtime's table of pending operations. Every operation whose task has not completed yet is woken
/// with `cause` as its error; operations that already completed are only removed from the table.
/// NOTE(review): the table carries no per-queue information, so a call made while closing one queue also cancels
/// the pending operations of every other queue sharing this runtime -- confirm this is intended.
pub fn cancel_pending_ops(&mut self, cause: Fail) {
    self.pending_ops.drain().for_each(|(task, mut yielder)| {
        // Nothing to wake for a task that already ran to completion.
        if task.has_completed() {
            return;
        }
        yielder.wake_with(Err(cause.clone()));
    });
}
}

impl<T> SharedObject<T> {
Expand Down

0 comments on commit c7de2cf

Please sign in to comment.