Skip to content

Commit

Permalink
modern sync: refactor to send on channels
Browse files Browse the repository at this point in the history
Reviewed By: markbt

Differential Revision: D68813683

fbshipit-source-id: e7bfdf9b7e8a32cfb9a0a2c202dd530402f31c10
  • Loading branch information
lmvasquezg authored and facebook-github-bot committed Jan 31, 2025
1 parent 4ea204d commit 9b88334
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 111 deletions.
1 change: 1 addition & 0 deletions eden/mononoke/modern_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ sharding_ext = { version = "0.1.0", path = "../cmdlib/sharding_ext" }
slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
sorted_vector_map = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] }
url = "2.5.2"

[dev-dependencies]
Expand Down
27 changes: 26 additions & 1 deletion eden/mononoke/modern_sync/src/commands/sync_one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use std::sync::Arc;

use anyhow::bail;
use anyhow::format_err;
use anyhow::Result;
use clap::Parser;
Expand All @@ -18,10 +19,12 @@ use mononoke_app::MononokeApp;
use mononoke_types::ChangesetId;
use repo_blobstore::RepoBlobstoreRef;
use repo_identity::RepoIdentityRef;
use tokio::sync::mpsc;
use url::Url;

use crate::sender::dummy::DummySender;
use crate::sender::edenapi::EdenapiSender;
use crate::sender::manager::SendManager;
use crate::sender::ModernSyncSender;
use crate::ModernSyncArgs;
use crate::Repo;
Expand Down Expand Up @@ -97,7 +100,29 @@ pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
)
};

crate::sync::process_one_changeset(&args.cs_id, &ctx, repo, &logger, sender, false, "").await?;
let mut send_manager = SendManager::new(sender.clone(), logger.clone());
let (cr_s, mut cr_r) = mpsc::channel::<Result<()>>(1);

crate::sync::process_one_changeset(
&args.cs_id,
&ctx,
repo,
&logger,
&mut send_manager,
false,
"",
Some(cr_s),
)
.await?;

let res = cr_r.recv().await;
match res {
Some(Err(e)) => {
bail!("Error while waiting for commit to be synced {:?}", e);
}
None => bail!("No commit synced"),
_ => (),
}

Ok(())
}
15 changes: 1 addition & 14 deletions eden/mononoke/modern_sync/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,10 @@ use mononoke_types::FileContents;

pub mod dummy;
pub mod edenapi;

pub enum Entry {
#[allow(unused)]
Content(AnyFileContentId, FileContents),
#[allow(unused)]
Tree(HgManifestId),
#[allow(unused)]
FileNode(HgFileNodeId),
#[allow(unused)]
HgChangeset(HgBlobChangeset, BonsaiChangeset),
}
pub mod manager;

#[async_trait]
pub trait ModernSyncSender {
#[allow(unused)]
async fn enqueue_entry(&self, entry: Entry) -> Result<()>;

async fn upload_contents(&self, contents: Vec<(AnyFileContentId, FileContents)>) -> Result<()>;

async fn upload_trees(&self, trees: Vec<HgManifestId>) -> Result<()>;
Expand Down
14 changes: 0 additions & 14 deletions eden/mononoke/modern_sync/src/sender/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use mononoke_types::FileContents;
use slog::info;
use slog::Logger;

use crate::sender::Entry;
use crate::sender::ModernSyncSender;

#[derive(Clone)]
Expand All @@ -34,19 +33,6 @@ impl DummySender {

#[async_trait]
impl ModernSyncSender for DummySender {
async fn enqueue_entry(&self, entry: Entry) -> Result<()> {
match entry {
Entry::Content(content_id, blob) => {
self.upload_contents(vec![(content_id, blob)]).await
}
Entry::Tree(tree_id) => self.upload_trees(vec![tree_id]).await,
Entry::FileNode(filenode_id) => self.upload_filenodes(vec![filenode_id]).await,
Entry::HgChangeset(hg_cs, bcs) => {
self.upload_identical_changeset(vec![(hg_cs, bcs)]).await
}
}
}

async fn upload_contents(&self, contents: Vec<(AnyFileContentId, FileContents)>) -> Result<()> {
for (content_id, _blob) in contents {
info!(&self.logger, "Uploading content with id: {:?}", content_id);
Expand Down
34 changes: 6 additions & 28 deletions eden/mononoke/modern_sync/src/sender/edenapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use url::Url;

mod util;

use crate::sender::Entry;
use crate::sender::ModernSyncSender;

const MAX_BLOB_BYTES: u64 = 100 * 1024 * 1024; // 100 MB
Expand Down Expand Up @@ -105,11 +104,6 @@ impl EdenapiSender {

#[async_trait]
impl ModernSyncSender for EdenapiSender {
async fn enqueue_entry(&self, _entry: Entry) -> Result<()> {
// TODO: implement using mpsc channels
Ok(())
}

async fn upload_contents(&self, contents: Vec<(AnyFileContentId, FileContents)>) -> Result<()> {
// Batch contents by size
let mut batches = Vec::new();
Expand Down Expand Up @@ -155,10 +149,6 @@ impl ModernSyncSender for EdenapiSender {
expected_responses,
actual_responses
);
info!(
&self.logger,
"Uploaded {:?} contents successfully", actual_responses
)
}

Ok(())
Expand All @@ -184,10 +174,6 @@ impl ModernSyncSender for EdenapiSender {
expected_responses,
actual_responses,
);
info!(
&self.logger,
"Uploaded {:?} trees successfully", actual_responses
);
Ok(())
}

Expand All @@ -211,10 +197,6 @@ impl ModernSyncSender for EdenapiSender {
expected_responses,
actual_responses
);
info!(
&self.logger,
"Uploaded {:?} filenodes successfully", actual_responses
);
Ok(())
}

Expand All @@ -236,7 +218,7 @@ impl ModernSyncSender for EdenapiSender {
]),
)
.await?;
info!(&self.logger, "Move bookmark response {:?}", res);
info!(&self.logger, "Moved bookmark with result {:?}", res);
Ok(())
}

Expand All @@ -252,10 +234,11 @@ impl ModernSyncSender for EdenapiSender {
let res = self.client.upload_identical_changesets(entries).await?;
let responses = res.entries.try_collect::<Vec<_>>().await?;
ensure!(!responses.is_empty(), "Not all changesets were uploaded");
info!(
&self.logger,
"Upload hg changeset response: {:?}", responses
);
let ids = responses
.iter()
.map(|r| r.token.data.id)
.collect::<Vec<_>>();
info!(&self.logger, "Uploaded changesets: {:?}", ids);

Ok(())
}
Expand All @@ -265,13 +248,8 @@ impl ModernSyncSender for EdenapiSender {
.iter()
.map(|csid| AnyId::BonsaiChangesetId(csid.clone().into()))
.collect::<Vec<_>>();
let all = ids.len();
let res = self.client.lookup_batch(ids, None, None).await?;
let missing = get_missing_in_order(res, csids);
let present = all - missing.len();
if present > 0 {
info!(&self.logger, "Skipping {} commits", present);
}
Ok(missing)
}
}
Expand Down
Loading

0 comments on commit 9b88334

Please sign in to comment.