From 0c45504010cff3d642c60750cd6292a536c26635 Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar <3998+srid@users.noreply.github.com> Date: Thu, 26 Oct 2023 16:41:31 -0400 Subject: [PATCH] Task aware `Datum`s (#94) * rewrite Datum to contain task cancellation logic --- src/app/state.rs | 61 ++++++--------------- src/app/state/datum.rs | 121 +++++++++++++++++++---------------------- 2 files changed, 73 insertions(+), 109 deletions(-) diff --git a/src/app/state.rs b/src/app/state.rs index 7ce14666..026ba340 100644 --- a/src/app/state.rs +++ b/src/app/state.rs @@ -106,44 +106,25 @@ impl AppState { let idx = *refresh_action.read(); use_future(cx, (&flake_url, &idx), |(flake_url, idx)| async move { tracing::info!("Updating flake [{}] {} ...", flake_url, idx); - // Abort previously running task, otherwise Datum refresh will panic - // TODO: Refactor this by changing the [Datum] type to be a - // struct (not enum) containing the - // `CopyValue>>` - self.flake_task_abort.with_mut(|abort_handle| { - if let Some(abort_handle) = abort_handle.take() { - abort_handle.abort(); - } - }); Datum::refresh_with(self.flake, async move { - let join_handle = tokio::spawn(async move { - Flake::from_nix(&nix_rs::command::NixCmd::default(), flake_url.clone()) - .await - }); - *self.flake_task_abort.write() = Some(join_handle.abort_handle()); - let v = join_handle.await.unwrap(); - *self.flake_task_abort.write() = None; - v + Flake::from_nix(&nix_rs::command::NixCmd::default(), flake_url.clone()).await }) - .await; + .await }); } - // Build `state.health_checks` when nix_info or nix_env changes + // Build `state.health_checks` when nix_info changes { let nix_info = self.nix_info.read().clone(); - use_future(cx, (&nix_info,), |(nix_info,)| async move { - if let Some(nix_info) = nix_info.current_value() { - Datum::refresh_with(self.health_checks, async { - let get_nix_health = - move || -> Result, SystemError> { - let nix_info = nix_info - .as_ref() - .map_err(|e| Into::::into(e.to_string()))?; - let health_checks = NixHealth::default().run_checks(nix_info, None); - Ok(health_checks) - }; - get_nix_health() + use_future(cx, (&nix_info,), |(nix_info1,)| async move { + if let Some(nix_info) = nix_info1.current_value().map(|x| { + x.as_ref() + .map_err(|e| Into::::into(e.to_string())) + .map(|v| v.clone()) + }) { + Datum::refresh_with(self.health_checks, async move { + let health_checks = NixHealth::default().run_checks(&nix_info?, None); + Ok(health_checks) }) .await; } @@ -158,19 +139,11 @@ impl AppState { use_future(cx, (&idx,), |(idx,)| async move { tracing::info!("Updating nix info [{}] ...", idx); Datum::refresh_with(self.nix_info, async { - // NOTE: Without tokio::spawn, this will run in main desktop thread, - // and will hang at some point. - let nix_info = tokio::spawn(async move { - nix_rs::info::NixInfo::from_nix(&nix_rs::command::NixCmd::default()) - .await - .map_err(|e| SystemError { - message: format!("Error getting nix info: {:?}", e), - }) - }) - .await - .unwrap(); - tracing::debug!("Got nix info, about to mut"); - nix_info + nix_rs::info::NixInfo::from_nix(&nix_rs::command::NixCmd::default()) + .await + .map_err(|e| SystemError { + message: format!("Error getting nix info: {:?}", e), + }) }) .await; }); diff --git a/src/app/state/datum.rs b/src/app/state/datum.rs index 016f0392..7f130a09 100644 --- a/src/app/state/datum.rs +++ b/src/app/state/datum.rs @@ -1,92 +1,83 @@ use std::{fmt::Display, future::Future}; use dioxus::prelude::*; -use dioxus_signals::Signal; +use dioxus_signals::{CopyValue, Signal}; +use tokio::task::AbortHandle; /// Represent loading/refreshing state of UI data -#[derive(Debug, Default, Clone, Copy, PartialEq)] -pub enum Datum { - #[default] - Loading, - Available { - value: T, - refreshing: bool, - }, +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct Datum { + /// The current value of the datum + value: Option, + /// If the datum is currently being loaded or refresh, this contains the + /// [AbortHandle] to abort that loading/refreshing process. + task: CopyValue>, +} + +impl Default for Datum { + fn default() -> Self { + Self { + value: None, + task: CopyValue::default(), + } + } } impl Datum { pub fn is_loading_or_refreshing(&self) -> bool { - matches!( - self, - Datum::Loading - | Datum::Available { - value: _, - refreshing: true - } - ) + self.task.read().is_some() } /// Get the inner value if available pub fn current_value(&self) -> Option<&T> { - match self { - Datum::Loading => None, - Datum::Available { - value: x, - refreshing: _, - } => Some(x), - } - } - - /// Set the datum value - /// - /// Use [refresh_with] if the value is produced by a long-running task. - fn set_value(&mut self, value: T) { - tracing::debug!("🍒 Setting {} datum value", std::any::type_name::()); - *self = Datum::Available { - value, - refreshing: false, - } - } - - /// Mark the datum is being-refreshed - /// - /// Do this just prior to doing a long-running task that will provide a - /// value to be set using [set_value] - fn mark_refreshing(&mut self) { - if let Datum::Available { - value: _, - refreshing, - } = self - { - if *refreshing { - tracing::error!( - "Cannot refresh already refreshing data: {}", - std::any::type_name::() - ); - panic!("Cannot refresh already refreshing data"); - } - tracing::debug!( - "🍒 Marking {} datum as refreshing", - std::any::type_name::() - ); - *refreshing = true; - } + self.value.as_ref() } /// Refresh the datum [Signal] using the given function /// - /// Refresh state is automatically set. + /// If a previous refresh is still running, it will be cancelled. pub async fn refresh_with(signal: Signal, f: F) where - F: Future, + F: Future + Send + 'static, + T: Send + 'static, { + // Cancel existing fetcher if any. signal.with_mut(move |x| { - x.mark_refreshing(); + if let Some(abort_handle) = x.task.take() { + abort_handle.abort(); + } }); - let val = f.await; + + // NOTE: We must spawn a tasks (using tokio::spawn), otherwise this + // will run in main desktop thread, and will hang at some point. + let join_handle = tokio::spawn(f); + + // Store the [AbortHandle] for cancelling latter. + let abort_handle = join_handle.abort_handle(); signal.with_mut(move |x| { - x.set_value(val); + *x.task.write() = Some(abort_handle); }); + + // Wait for result and update the signal state. + match join_handle.await { + Ok(val) => { + signal.with_mut(move |x| { + tracing::debug!("🍒 Setting {} datum value", std::any::type_name::()); + x.value = Some(val); + *x.task.write() = None; + }); + } + Err(err) => { + if !err.is_cancelled() { + tracing::error!("🍒 Datum refresh failed: {err}"); + signal.with_mut(move |x| { + *x.task.write() = None; + }); + } + // x.task will be set to None by the caller who cancelled us, so + // we need not do anything here. + } + } } }