From 735e61fb36d20ac548a17e471e1fe6ee3cf74bfc Mon Sep 17 00:00:00 2001 From: Jens Brimfors Date: Fri, 8 May 2020 12:09:04 +0200 Subject: [PATCH] Added configurable timeouts towards bitbucket --- Cargo.toml | 4 +- src/bitbucket/types.rs | 7 ++++ src/bitbucket/worker.rs | 88 ++++++++++++++++++++++++++++++++------- src/cloner.rs | 4 ++ src/types.rs | 49 ++++++++++++++++++---- tests/integration_test.rs | 4 ++ 6 files changed, 131 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4bddff8..25bc5fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bitbucket_server_cli" -version = "0.3.11" +version = "0.3.12" authors = ["Jens Brimfors "] edition = "2018" license = "MIT" @@ -27,6 +27,8 @@ clap = "2.33.0" indicatif = "0.14.0" pickledb = "0.4.1" dialoguer = "0.6.2" +atomic-counter = "1.0.1" +parse_duration = "2.1.0" [dev-dependencies] tokio-test = "0.2.1" diff --git a/src/bitbucket/types.rs b/src/bitbucket/types.rs index 784972a..a85e2af 100644 --- a/src/bitbucket/types.rs +++ b/src/bitbucket/types.rs @@ -145,6 +145,7 @@ impl RepoUrlBuilder for UserResult { #[cfg(test)] mod tests { use super::*; + use std::time::Duration; #[test] fn test_add_user_to_url_with_user() { @@ -191,6 +192,9 @@ mod tests { clone_type: CloneType::HttpSavedLogin, project_keys: vec!["key".to_owned()], all: false, + timeout: Duration::from_secs(5), + retries: 1, + backoff: None, }; let vec1 = get_clone_links(&prjs, &opts); assert_eq!(vec1.len(), 1, "Wrong number of output Repo objects"); @@ -214,6 +218,9 @@ mod tests { clone_type: CloneType::HTTP, project_keys: vec!["key".to_owned()], all: false, + timeout: Duration::from_secs(5), + retries: 1, + backoff: None, }; let vec1 = get_clone_links(&prjs, &opts); assert_eq!(vec1.len(), 1); diff --git a/src/bitbucket/worker.rs b/src/bitbucket/worker.rs index 4119bd6..b0abead 100644 --- a/src/bitbucket/worker.rs +++ b/src/bitbucket/worker.rs @@ -1,34 +1,40 @@ use std::borrow::BorrowMut; +use atomic_counter::{AtomicCounter, RelaxedCounter}; use futures::stream::{self, StreamExt}; +#[allow(unused_imports)] +use futures::SinkExt as _; use generic_error::{GenericError, Result}; +use indicatif::ProgressStyle; use reqwest::{header::ACCEPT, Client as ReqwestClient, RequestBuilder}; use serde::de::DeserializeOwned; +use tokio::time::delay_for; use crate::bitbucket::types::{ get_clone_links, PageResponse, ProjDesc, Project, Repo, RepoUrlBuilder, UserResult, }; use crate::types::BitBucketOpts; use crate::util::bail; -#[allow(unused_imports)] -use futures::SinkExt as _; -use indicatif::ProgressStyle; pub type BitbucketResult = std::result::Result; pub struct BitbucketError { + is_timeout: bool, msg: String, cause: String, } -#[derive(Clone)] pub struct BitbucketWorker<'a> { opts: &'a BitBucketOpts, + timeout_counter: RelaxedCounter, } impl BitbucketWorker<'_> { pub fn new(opts: &BitBucketOpts) -> BitbucketWorker { - BitbucketWorker { opts } + BitbucketWorker { + opts, + timeout_counter: RelaxedCounter::new(0), + } } pub async fn fetch_all_repos(&self) -> Result> { @@ -125,6 +131,10 @@ impl BitbucketWorker<'_> { } }; } + let timeouts = self.timeout_counter.get(); + if timeouts > 0 { + eprintln!("There were {} timeouts towards bitbucket.", timeouts); + } Ok(all) } @@ -135,20 +145,51 @@ impl BitbucketWorker<'_> { let host = self.opts.server.clone().unwrap(); let mut start: u32 = 0; let mut sum: Vec = vec![]; - loop { + 'outer: loop { let url = format!( "{host}{path}?limit=500&start={start}", host = host, path = path, start = start ); - let response: reqwest::Result = self.bake_client(url).send().await; - let mut resp = extract_body::>(response, naming).await?; - sum.append(resp.values.borrow_mut()); - start += resp.size; - if resp.is_last_page { - break; + for attempt in 1..self.opts.retries + 2 { + let response: reqwest::Result = + self.bake_client(&url).send().await; + match extract_body::>(response, naming).await { + Ok(mut resp) => { + sum.append(resp.values.borrow_mut()); + if resp.is_last_page { + break 'outer; + } else { + start += resp.size; + continue 'outer; + } + } + Err(e) if e.is_timeout => { + let count = self.timeout_counter.inc(); + if attempt > self.opts.retries { + // Last chance blown! + return Err(e); + } else if let Some(Some(backoff)) = + self.opts.backoff.map(|b| b.checked_mul((count + 1) as u32)) + { + delay_for(backoff).await; + } + } + Err(e) => { + return Err(e); + } + } } + // To be sure we dont escape some case into an endless retry-loop + return Err(BitbucketError { + is_timeout: true, + msg: format!( + "Failed to read from bitbucket with {} retries.", + self.opts.retries + ), + cause: "Timeouts against bitbucket.".to_owned(), + }); } Ok(sum) } @@ -163,9 +204,10 @@ impl BitbucketWorker<'_> { Ok(repos) } - fn bake_client(&self, url: String) -> RequestBuilder { + fn bake_client(&self, url: &str) -> RequestBuilder { let builder: RequestBuilder = ReqwestClient::new() - .get(url.trim()) + .get(url) + .timeout(self.opts.timeout) .header(ACCEPT, "application/json"); match (&self.opts.username, &self.opts.password) { (Some(u), Some(p)) => builder.basic_auth(u, Some(p)), @@ -184,7 +226,13 @@ where match response { Ok(response) if response.status().is_success() => match response.json::().await { Ok(all_projects) => Ok(all_projects), + Err(e) if e.is_timeout() => Err(BitbucketError { + is_timeout: true, + msg: "Timeout reading from bitbucket.".to_owned(), + cause: format!("{:?}", e), + }), Err(e) => Err(BitbucketError { + is_timeout: e.is_timeout(), msg: format!( "Failed fetching {} from bitbucket, bad json format.", naming @@ -193,6 +241,7 @@ where }), }, Ok(response) => Err(BitbucketError { + is_timeout: false, msg: format!( "Failed fetching {} from bitbucket, status code: {}.", naming, @@ -204,6 +253,7 @@ where }, }), Err(e) => Err(BitbucketError { + is_timeout: e.is_timeout(), msg: format!("Failed fetching {} from bitbucket.", naming), cause: format!("{:?}", e), }), @@ -212,6 +262,8 @@ where #[cfg(test)] mod tests { + use std::time::Duration; + use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; @@ -239,6 +291,9 @@ mod tests { clone_type: CloneType::HTTP, project_keys: vec!["key".to_owned()], all: false, + timeout: Duration::from_secs(10), + retries: 0, + backoff: None, } } @@ -288,8 +343,9 @@ mod tests { match result { Ok(_) => assert!(false, "This request was expected to fail."), Err(e) => assert!( - e.msg.contains(format!("status code: {}", 404).as_str()), - "Response code should be 404." + e.msg.contains("status code: 404"), + "Response code should be 404, but was {:?}", + e.cause ), } } diff --git a/src/cloner.rs b/src/cloner.rs index b19b61a..48bee07 100644 --- a/src/cloner.rs +++ b/src/cloner.rs @@ -73,6 +73,7 @@ mod tests { use crate::types::{BitBucketOpts, CloneType, GitOpts}; use super::*; + use std::time::Duration; #[tokio::test] async fn cloner_integration_test() { @@ -88,6 +89,9 @@ mod tests { clone_type: CloneType::HTTP, project_keys: vec![], all: true, + timeout: Duration::from_secs(5), + retries: 2, + backoff: None, }, git_opts: GitOpts { reset_state: false, diff --git a/src/types.rs b/src/types.rs index 46d4700..793d30b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -8,6 +8,7 @@ use crate::input::prompts::{PROMPT_BB_PROJECT_ALL, PROMPT_BB_SERVER, PROMPT_BB_U use crate::input::{get_bool, get_password, get_with_default, password_from_env}; use crate::util::bail; use dialoguer::Confirm; +use std::time::Duration; #[derive(StructOpt, Debug, Clone)] #[structopt( @@ -65,7 +66,7 @@ pub struct BitBucketOpts { pub password: Option, #[structopt( short = "b", - long = "concurrent_http", + long = "concurrent-http", name = "bitbucket_concurrency", help = "Number of concurrent http requests towards bitbucket. Keep it sane, keep bitbucket alive for all. Max=100", default_value = "20" @@ -73,19 +74,20 @@ pub struct BitBucketOpts { pub concurrency: usize, #[structopt( short = "H", - long = "http_verbose", + long = "http-verbose", name = "bitbucket_verbose", help = "Output full http response on failed bitbucket requests." )] pub verbose: bool, #[structopt( short = "W", - long = "env_password", + long = "env-password", name = "bitbucket_password_from_env", help = "Try get password from env variable BITBUCKET_PASSWORD.\nTry it out without showing your password:\nIFS= read -rs BITBUCKET_PASSWORD < /dev/tty && export BITBUCKET_PASSWORD\n" )] pub password_from_env: bool, - #[structopt(long = "clone_type", + #[structopt( + long = "clone-type", name = "clone_type", possible_values = & CloneType::variants(), case_insensitive = true, @@ -96,7 +98,7 @@ pub struct BitBucketOpts { short = "k", long = "key", name = "git_project_keys", - help = "BitBucket Project keys" + help = "BitBucket Project keys (applicable multiple times)" )] pub project_keys: Vec, #[structopt( @@ -106,6 +108,25 @@ pub struct BitBucketOpts { help = "Clone all projects" )] pub all: bool, + #[structopt( + long = "http-timeout", + help = "HTTP timout, 2min4sec6milli8micro3nano combine freely with or without abbreviations or spaces.", + default_value = "2.5 sec", + parse(try_from_str = parse_duration::parse), + )] + pub timeout: Duration, + #[structopt( + long, + help = "Retries to attempt requesting on timeout from bitbucket.", + default_value = "2" + )] + pub retries: u32, + #[structopt( + long = "http-backoff", + help = "Linear backoff time per failed request.\nie. 10 timed out requests and backoff=10ms -> 100ms backoff on next timed out request\nor {prior_timeouts}*{backoff}={delay_on_next_timeout}", + parse(try_from_str = parse_duration::parse), + )] + pub backoff: Option, } #[derive(StructOpt, Clone, Debug)] @@ -120,7 +141,7 @@ pub struct GitOpts { pub reset_state: bool, #[structopt( short = "g", - long = "concurrent_git", + long = "concurrent-git", name = "git_concurrency", help = "Number of concurrent git actions. Bitbucket might have a limited number of threads reserved for serving git requests - if you drive this value to high you might block your CI, colleagues or even crash bitbucket. Max=100", default_value = "5" @@ -128,13 +149,13 @@ pub struct GitOpts { pub concurrency: usize, #[structopt( short = "Q", - long = "git_quiet", + long = "git-quiet", name = "git_quiet", help = "Suppress warnings from failed git actions." )] pub quiet: bool, #[structopt( - long = "output_directory", + long = "output-directory", help = "Suppress warnings from failed git actions.", default_value = "." )] @@ -232,3 +253,15 @@ impl BitBucketOpts { .collect() } } +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_multi_time_parse() { + assert_eq!( + parse_duration::parse("2.5s500ms").unwrap(), + Duration::from_secs(3) + ) + } +} diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 55282fd..94faf61 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -10,6 +10,7 @@ use bitbucket_server_cli::{ cloner::Cloner, types::{BitBucketOpts, CloneType, GitOpts}, }; +use std::time::Duration; fn env(key: &str) -> Result { match std::env::var(key) { @@ -43,7 +44,10 @@ fn opts() -> Result { password_from_env: false, clone_type: CloneType::SSH, all: false, + timeout: Duration::from_secs(1), + retries: 0, project_keys: vec![env("BITBUCKET_PROJECT")?], + backoff: None, }, git_opts: GitOpts { reset_state: false,