Skip to content

Commit

Permalink
Task aware Datums (#94)
Browse files Browse the repository at this point in the history
* rewrite Datum to contain task cancellation logic
  • Loading branch information
srid authored Oct 26, 2023
1 parent 06857c2 commit 0c45504
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 109 deletions.
61 changes: 17 additions & 44 deletions src/app/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,44 +106,25 @@ impl AppState {
let idx = *refresh_action.read();
use_future(cx, (&flake_url, &idx), |(flake_url, idx)| async move {
tracing::info!("Updating flake [{}] {} ...", flake_url, idx);
// Abort previously running task, otherwise Datum refresh will panic
// TODO: Refactor this by changing the [Datum] type to be a
// struct (not enum) containing the
// `CopyValue<Option<JoinHandle<T>>>`
self.flake_task_abort.with_mut(|abort_handle| {
if let Some(abort_handle) = abort_handle.take() {
abort_handle.abort();
}
});
Datum::refresh_with(self.flake, async move {
let join_handle = tokio::spawn(async move {
Flake::from_nix(&nix_rs::command::NixCmd::default(), flake_url.clone())
.await
});
*self.flake_task_abort.write() = Some(join_handle.abort_handle());
let v = join_handle.await.unwrap();
*self.flake_task_abort.write() = None;
v
Flake::from_nix(&nix_rs::command::NixCmd::default(), flake_url.clone()).await
})
.await;
.await
});
}

// Build `state.health_checks` when nix_info or nix_env changes
// Build `state.health_checks` when nix_info changes
{
let nix_info = self.nix_info.read().clone();
use_future(cx, (&nix_info,), |(nix_info,)| async move {
if let Some(nix_info) = nix_info.current_value() {
Datum::refresh_with(self.health_checks, async {
let get_nix_health =
move || -> Result<Vec<nix_health::traits::Check>, SystemError> {
let nix_info = nix_info
.as_ref()
.map_err(|e| Into::<SystemError>::into(e.to_string()))?;
let health_checks = NixHealth::default().run_checks(nix_info, None);
Ok(health_checks)
};
get_nix_health()
use_future(cx, (&nix_info,), |(nix_info1,)| async move {
if let Some(nix_info) = nix_info1.current_value().map(|x| {
x.as_ref()
.map_err(|e| Into::<SystemError>::into(e.to_string()))
.map(|v| v.clone())
}) {
Datum::refresh_with(self.health_checks, async move {
let health_checks = NixHealth::default().run_checks(&nix_info?, None);
Ok(health_checks)
})
.await;
}
Expand All @@ -158,19 +139,11 @@ impl AppState {
use_future(cx, (&idx,), |(idx,)| async move {
tracing::info!("Updating nix info [{}] ...", idx);
Datum::refresh_with(self.nix_info, async {
// NOTE: Without tokio::spawn, this will run in main desktop thread,
// and will hang at some point.
let nix_info = tokio::spawn(async move {
nix_rs::info::NixInfo::from_nix(&nix_rs::command::NixCmd::default())
.await
.map_err(|e| SystemError {
message: format!("Error getting nix info: {:?}", e),
})
})
.await
.unwrap();
tracing::debug!("Got nix info, about to mut");
nix_info
nix_rs::info::NixInfo::from_nix(&nix_rs::command::NixCmd::default())
.await
.map_err(|e| SystemError {
message: format!("Error getting nix info: {:?}", e),
})
})
.await;
});
Expand Down
121 changes: 56 additions & 65 deletions src/app/state/datum.rs
Original file line number Diff line number Diff line change
@@ -1,92 +1,83 @@
use std::{fmt::Display, future::Future};

use dioxus::prelude::*;
use dioxus_signals::Signal;
use dioxus_signals::{CopyValue, Signal};
use tokio::task::AbortHandle;

/// Represent loading/refreshing state of UI data
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub enum Datum<T> {
#[default]
Loading,
Available {
value: T,
refreshing: bool,
},
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Datum<T> {
/// The current value of the datum
value: Option<T>,
/// If the datum is currently being loaded or refresh, this contains the
/// [AbortHandle] to abort that loading/refreshing process.
task: CopyValue<Option<AbortHandle>>,
}

impl<T> Default for Datum<T> {
fn default() -> Self {
Self {
value: None,
task: CopyValue::default(),
}
}
}

impl<T> Datum<T> {
pub fn is_loading_or_refreshing(&self) -> bool {
matches!(
self,
Datum::Loading
| Datum::Available {
value: _,
refreshing: true
}
)
self.task.read().is_some()
}

/// Get the inner value if available
pub fn current_value(&self) -> Option<&T> {
match self {
Datum::Loading => None,
Datum::Available {
value: x,
refreshing: _,
} => Some(x),
}
}

/// Set the datum value
///
/// Use [refresh_with] if the value is produced by a long-running task.
fn set_value(&mut self, value: T) {
tracing::debug!("🍒 Setting {} datum value", std::any::type_name::<T>());
*self = Datum::Available {
value,
refreshing: false,
}
}

/// Mark the datum is being-refreshed
///
/// Do this just prior to doing a long-running task that will provide a
/// value to be set using [set_value]
fn mark_refreshing(&mut self) {
if let Datum::Available {
value: _,
refreshing,
} = self
{
if *refreshing {
tracing::error!(
"Cannot refresh already refreshing data: {}",
std::any::type_name::<T>()
);
panic!("Cannot refresh already refreshing data");
}
tracing::debug!(
"🍒 Marking {} datum as refreshing",
std::any::type_name::<T>()
);
*refreshing = true;
}
self.value.as_ref()
}

/// Refresh the datum [Signal] using the given function
///
/// Refresh state is automatically set.
/// If a previous refresh is still running, it will be cancelled.
pub async fn refresh_with<F>(signal: Signal<Self>, f: F)
where
F: Future<Output = T>,
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
// Cancel existing fetcher if any.
signal.with_mut(move |x| {
x.mark_refreshing();
if let Some(abort_handle) = x.task.take() {
abort_handle.abort();
}
});
let val = f.await;

// NOTE: We must spawn a tasks (using tokio::spawn), otherwise this
// will run in main desktop thread, and will hang at some point.
let join_handle = tokio::spawn(f);

// Store the [AbortHandle] for cancelling latter.
let abort_handle = join_handle.abort_handle();
signal.with_mut(move |x| {
x.set_value(val);
*x.task.write() = Some(abort_handle);
});

// Wait for result and update the signal state.
match join_handle.await {
Ok(val) => {
signal.with_mut(move |x| {
tracing::debug!("🍒 Setting {} datum value", std::any::type_name::<T>());
x.value = Some(val);
*x.task.write() = None;
});
}
Err(err) => {
if !err.is_cancelled() {
tracing::error!("🍒 Datum refresh failed: {err}");
signal.with_mut(move |x| {
*x.task.write() = None;
});
}
// x.task will be set to None by the caller who cancelled us, so
// we need not do anything here.
}
}
}
}

Expand Down

0 comments on commit 0c45504

Please sign in to comment.