diff --git a/eden/mononoke/manifest/src/ordered_ops.rs b/eden/mononoke/manifest/src/ordered_ops.rs index 71745dfc335a1..ab1e94903257d 100644 --- a/eden/mononoke/manifest/src/ordered_ops.rs +++ b/eden/mononoke/manifest/src/ordered_ops.rs @@ -11,6 +11,7 @@ use std::iter::Peekable; use anyhow::Error; use borrowed::borrowed; use bounded_traversal::OrderedTraversal; +use cloned::cloned; use context::CoreContext; use futures::future; use futures::future::FutureExt; @@ -19,6 +20,7 @@ use futures::stream; use futures::stream::BoxStream; use futures::stream::StreamExt; use futures::stream::TryStreamExt; +use futures_watchdog::WatchdogExt; use mononoke_types::path::MPath; use mononoke_types::MPathElement; use nonzero_ext::nonzero; @@ -354,11 +356,16 @@ where match input { Diff::Changed(path, left, right) => { - let (left_mf, right_mf) = future::try_join( - left.load(ctx, store), - right.load(ctx, other_store), - ) - .await?; + let l = tokio::spawn({ + cloned!(ctx, left, store); + async move { left.load(&ctx, &store).watched(ctx.logger()).await } + }); + let r = tokio::spawn({ + cloned!(ctx, right, other_store); + async move { right.load(&ctx, &other_store).watched(ctx.logger()).await } + }); + let (left_mf, right_mf) = future::try_join(l, r).watched(ctx.logger()).await?; + let (left_mf, right_mf) = (left_mf?, right_mf?); if after.include_self() { push_output( @@ -372,8 +379,8 @@ where } let iter = EntryDiffIterator::new( - left_mf.list_weighted(ctx, store).await?.try_collect::>().await?.into_iter(), - right_mf.list_weighted(ctx, other_store).await?.try_collect::>().await?.into_iter(), + left_mf.list_weighted(ctx, store).watched(ctx.logger()).await?.try_collect::>().watched(ctx.logger()).await?.into_iter(), + right_mf.list_weighted(ctx, other_store).watched(ctx.logger()).await?.try_collect::>().watched(ctx.logger()).await?.into_iter(), ); for (name, left, right) in iter { if after.skip(&name) || left == right { @@ -492,9 +499,9 @@ where Diff::Added(path.clone(), Entry::Tree(tree.clone())), ); } - let manifest = tree.load(ctx, other_store).await?; - let mut stream = manifest.list_weighted(ctx, store).await?; - while let Some((name, entry)) = stream.try_next().await? { + let manifest = tree.load(ctx, other_store).watched(ctx.logger()).await?; + let mut stream = manifest.list_weighted(ctx, store).watched(ctx.logger()).await?; + while let Some((name, entry)) = stream.try_next().watched(ctx.logger()).await? { if after.skip(&name) { continue; } @@ -526,9 +533,9 @@ where Diff::Removed(path.clone(), Entry::Tree(tree.clone())), ); } - let manifest = tree.load(ctx, store).await?; - let mut stream = manifest.list_weighted(ctx, store).await?; - while let Some((name, entry)) = stream.try_next().await? { + let manifest = tree.load(ctx, store).watched(ctx.logger()).await?; + let mut stream = manifest.list_weighted(ctx, store).watched(ctx.logger()).await?; + while let Some((name, entry)) = stream.try_next().watched(ctx.logger()).await? { if after.skip(&name) { continue; } @@ -561,7 +568,7 @@ where ); pin_mut!(s); - while let Some(value) = s.next().await { + while let Some(value) = s.next().watched(ctx.logger()).await { yield value; } }) diff --git a/eden/mononoke/mononoke_api/src/changeset.rs b/eden/mononoke/mononoke_api/src/changeset.rs index 288f7bfc6bbf5..164ed8837a561 100644 --- a/eden/mononoke/mononoke_api/src/changeset.rs +++ b/eden/mononoke/mononoke_api/src/changeset.rs @@ -41,6 +41,7 @@ use futures::stream::BoxStream; use futures::stream::Stream; use futures::stream::StreamExt; use futures::stream::TryStreamExt; +use futures_ext::FbStreamExt; use futures_lazy_shared::LazyShared; use futures_watchdog::WatchdogExt; use git_types::MappedGitCommitId; @@ -1350,7 +1351,10 @@ impl ChangesetContext { let diff_trees = diff_items.contains(&ChangesetDiffItem::TREES); self.find_entries(to_vec1(path_restrictions), ordering) + .watched(self.ctx().logger()) .await? + .yield_periodically() + .with_logger(self.ctx().logger()) .try_filter_map(|(path, entry)| async move { match (path.into_optional_non_root_path(), entry) { (Some(mpath), ManifestEntry::Leaf(_)) if diff_files => Ok(Some(mpath)), @@ -1367,6 +1371,7 @@ impl ChangesetContext { )) }) .try_collect::>() + .watched(self.ctx().logger()) .await }