Skip to content

Commit

Permalink
[catnap] Enhancement: Centralize yielder and yielder_handle creation
Browse files Browse the repository at this point in the history
  • Loading branch information
anandbonde committed Nov 25, 2023
1 parent 4c6eea7 commit 8581aaa
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 32 deletions.
24 changes: 12 additions & 12 deletions src/rust/catnap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ impl SharedCatnapLibOS {
timer!("catnap::accept");
trace!("accept(): qd={:?}", qd);
let self_: Self = self.clone();
let coroutine = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine_constructor = |yielder: Yielder| -> Result<TaskHandle, Fail> {
// Asynchronous accept code. Clone the self reference and move into the coroutine.
let coroutine: Pin<Box<Operation>> = Box::pin(self.clone().accept_coroutine(qd, yielder));
// Insert async coroutine into the scheduler.
let task_name: String = format!("Catnap::accept for qd={:?}", qd);
self.runtime.insert_coroutine(&task_name, coroutine)
};

Ok(self_.get_shared_queue(&qd)?.accept(coroutine)?)
Ok(self_.get_shared_queue(&qd)?.accept(coroutine_constructor)?)
}

/// Asynchronous cross-queue code for accepting a connection. This function returns a coroutine that runs
Expand Down Expand Up @@ -249,14 +249,14 @@ impl SharedCatnapLibOS {
// FIXME: add IPv6 support; https://github.com/microsoft/demikernel/issues/935
let remote: SocketAddrV4 = unwrap_socketaddr(remote)?;
let me: Self = self.clone();
let coroutine = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine_constructor = |yielder: Yielder| -> Result<TaskHandle, Fail> {
// Clone the self reference and move into the coroutine.
let coroutine: Pin<Box<Operation>> = Box::pin(self.clone().connect_coroutine(qd, remote, yielder));
let task_name: String = format!("Catnap::connect for qd={:?}", qd);
self.runtime.insert_coroutine(&task_name, coroutine)
};

Ok(me.get_shared_queue(&qd)?.connect(coroutine)?)
Ok(me.get_shared_queue(&qd)?.connect(coroutine_constructor)?)
}

/// Asynchronous code to establish a connection to a remote endpoint. This function returns a coroutine that runs
Expand Down Expand Up @@ -311,14 +311,14 @@ impl SharedCatnapLibOS {
trace!("async_close() qd={:?}", qd);

let self_: Self = self.clone();
let coroutine = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine_constructor = |yielder: Yielder| -> Result<TaskHandle, Fail> {
// Async code to close this queue.
let coroutine: Pin<Box<Operation>> = Box::pin(self.clone().close_coroutine(qd, yielder));
let task_name: String = format!("Catnap::close for qd={:?}", qd);
self.runtime.insert_coroutine(&task_name, coroutine)
};

Ok(self_.get_shared_queue(&qd)?.async_close(coroutine)?)
Ok(self_.get_shared_queue(&qd)?.async_close(coroutine_constructor)?)
}

/// Asynchronous code to close a queue. This function returns a coroutine that runs asynchronously to close a queue
Expand Down Expand Up @@ -367,12 +367,12 @@ impl SharedCatnapLibOS {
return Err(Fail::new(libc::EINVAL, "zero-length buffer"));
};
let self_: Self = self.clone();
let coroutine = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine_constructor = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine: Pin<Box<Operation>> = Box::pin(self.clone().push_coroutine(qd, buf, yielder));
let task_name: String = format!("Catnap::push for qd={:?}", qd);
self.runtime.insert_coroutine(&task_name, coroutine)
};
Ok(self_.get_shared_queue(&qd)?.push(coroutine)?)
Ok(self_.get_shared_queue(&qd)?.push(coroutine_constructor)?)
}

/// Asynchronous code to push [buf] to a SharedCatnapQueue and its underlying POSIX socket. This function returns a
Expand Down Expand Up @@ -412,12 +412,12 @@ impl SharedCatnapLibOS {
return Err(Fail::new(libc::EINVAL, "zero-length buffer"));
}
let self_: Self = self.clone();
let coroutine = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine_constructor = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine: Pin<Box<Operation>> = Box::pin(self.clone().pushto_coroutine(qd, buf, remote, yielder));
let task_name: String = format!("Catnap::pushto for qd={:?}", qd);
self.runtime.insert_coroutine(&task_name, coroutine)
};
Ok(self_.get_shared_queue(&qd)?.push(coroutine)?)
Ok(self_.get_shared_queue(&qd)?.push(coroutine_constructor)?)
}

/// Asynchronous code to pushto [buf] to [remote] on a SharedCatnapQueue and its underlying POSIX socket. This function
Expand Down Expand Up @@ -458,12 +458,12 @@ impl SharedCatnapLibOS {
// We just assert 'size' here, because it was previously checked at PDPIX layer.
debug_assert!(size.is_none() || ((size.unwrap() > 0) && (size.unwrap() <= limits::POP_SIZE_MAX)));
let self_: Self = self.clone();
let coroutine = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine_constructor = |yielder: Yielder| -> Result<TaskHandle, Fail> {
let coroutine: Pin<Box<Operation>> = Box::pin(self.clone().pop_coroutine(qd, size, yielder));
let task_name: String = format!("Catnap::pop for qd={:?}", qd);
self.runtime.insert_coroutine(&task_name, coroutine)
};
Ok(self_.get_shared_queue(&qd)?.pop(coroutine)?)
Ok(self_.get_shared_queue(&qd)?.pop(coroutine_constructor)?)
}

/// Asynchronous code to pop data from a SharedCatnapQueue and its underlying POSIX socket of optional [size]. This
Expand Down
31 changes: 12 additions & 19 deletions src/rust/catnap/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,7 @@ impl SharedCatnapQueue {
F: FnOnce(Yielder) -> Result<TaskHandle, Fail>,
{
self.state_machine.prepare(SocketOp::Accept)?;

let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();

let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;

self.add_pending_op(&task_handle, &yielder_handle);
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor)?;
Ok(task_handle.get_task_id().into())
}

Expand Down Expand Up @@ -207,11 +201,7 @@ impl SharedCatnapQueue {
F: FnOnce(Yielder) -> Result<TaskHandle, Fail>,
{
self.state_machine.prepare(SocketOp::Connect)?;

let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;
self.add_pending_op(&task_handle, &yielder_handle);
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor)?;
Ok(task_handle.get_task_id().into())
}

Expand Down Expand Up @@ -255,8 +245,7 @@ impl SharedCatnapQueue {
F: FnOnce(Yielder) -> Result<TaskHandle, Fail>,
{
self.state_machine.prepare(SocketOp::Close)?;
let yielder: Yielder = Yielder::new();
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor, yielder)?;
let task_handle: TaskHandle = self.do_generic_sync_control_path_call(coroutine_constructor)?;
Ok(task_handle.get_task_id().into())
}

Expand Down Expand Up @@ -370,20 +359,24 @@ impl SharedCatnapQueue {
}

/// Generic function for spawning a control-path coroutine on [self].
fn do_generic_sync_control_path_call<F>(
&mut self,
coroutine_constructor: F,
yielder: Yielder,
) -> Result<TaskHandle, Fail>
/// This variant adds the operation to the list of pending operations.
fn do_generic_sync_control_path_call<F>(&mut self, coroutine_constructor: F) -> Result<TaskHandle, Fail>
where
F: FnOnce(Yielder) -> Result<TaskHandle, Fail>,
{
let yielder: Yielder = Yielder::new();
let yielder_handle: YielderHandle = yielder.get_handle();

// Spawn coroutine.
match coroutine_constructor(yielder) {
// We successfully spawned the coroutine.
Ok(handle) => {
// Commit the operation on the socket.
self.state_machine.commit();

// Add to the list of pending operations only if the socket is not closing.
self.add_pending_op(&handle, &yielder_handle);

Ok(handle)
},
// We failed to spawn the coroutine.
Expand Down
2 changes: 1 addition & 1 deletion src/rust/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl SharedDemiRuntime {
/// and gets the result immediately.
pub fn remove_coroutine_and_get_result(&mut self, handle: &TaskHandle, qt: u64) -> demi_qresult_t {
let operation_task: OperationTask = self.remove_coroutine(handle);
let (qd, result) = operation_task.get_result().expect("Coroutine not finished");
let (qd, result) = operation_task.get_result().expect("coroutine not finished");
self.pack_result(result, qd, qt)
}

Expand Down

0 comments on commit 8581aaa

Please sign in to comment.