Skip to content

Commit

Permalink
Added configurable timeouts towards bitbucket
Browse files Browse the repository at this point in the history
  • Loading branch information
Jens Brimfors committed May 8, 2020
1 parent aebe2a5 commit 735e61f
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 25 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bitbucket_server_cli"
version = "0.3.11"
version = "0.3.12"
authors = ["Jens Brimfors <[email protected]>"]
edition = "2018"
license = "MIT"
Expand All @@ -27,6 +27,8 @@ clap = "2.33.0"
indicatif = "0.14.0"
pickledb = "0.4.1"
dialoguer = "0.6.2"
atomic-counter = "1.0.1"
parse_duration = "2.1.0"

[dev-dependencies]
tokio-test = "0.2.1"
Expand Down
7 changes: 7 additions & 0 deletions src/bitbucket/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl RepoUrlBuilder for UserResult {
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

#[test]
fn test_add_user_to_url_with_user() {
Expand Down Expand Up @@ -191,6 +192,9 @@ mod tests {
clone_type: CloneType::HttpSavedLogin,
project_keys: vec!["key".to_owned()],
all: false,
timeout: Duration::from_secs(5),
retries: 1,
backoff: None,
};
let vec1 = get_clone_links(&prjs, &opts);
assert_eq!(vec1.len(), 1, "Wrong number of output Repo objects");
Expand All @@ -214,6 +218,9 @@ mod tests {
clone_type: CloneType::HTTP,
project_keys: vec!["key".to_owned()],
all: false,
timeout: Duration::from_secs(5),
retries: 1,
backoff: None,
};
let vec1 = get_clone_links(&prjs, &opts);
assert_eq!(vec1.len(), 1);
Expand Down
88 changes: 72 additions & 16 deletions src/bitbucket/worker.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,40 @@
use std::borrow::BorrowMut;

use atomic_counter::{AtomicCounter, RelaxedCounter};
use futures::stream::{self, StreamExt};
#[allow(unused_imports)]
use futures::SinkExt as _;
use generic_error::{GenericError, Result};
use indicatif::ProgressStyle;
use reqwest::{header::ACCEPT, Client as ReqwestClient, RequestBuilder};
use serde::de::DeserializeOwned;
use tokio::time::delay_for;

use crate::bitbucket::types::{
get_clone_links, PageResponse, ProjDesc, Project, Repo, RepoUrlBuilder, UserResult,
};
use crate::types::BitBucketOpts;
use crate::util::bail;
#[allow(unused_imports)]
use futures::SinkExt as _;
use indicatif::ProgressStyle;

pub type BitbucketResult<T> = std::result::Result<T, BitbucketError>;

pub struct BitbucketError {
is_timeout: bool,
msg: String,
cause: String,
}

#[derive(Clone)]
pub struct BitbucketWorker<'a> {
opts: &'a BitBucketOpts,
timeout_counter: RelaxedCounter,
}

impl BitbucketWorker<'_> {
pub fn new(opts: &BitBucketOpts) -> BitbucketWorker {
BitbucketWorker { opts }
BitbucketWorker {
opts,
timeout_counter: RelaxedCounter::new(0),
}
}

pub async fn fetch_all_repos(&self) -> Result<Vec<Repo>> {
Expand Down Expand Up @@ -125,6 +131,10 @@ impl BitbucketWorker<'_> {
}
};
}
let timeouts = self.timeout_counter.get();
if timeouts > 0 {
eprintln!("There were {} timeouts towards bitbucket.", timeouts);
}
Ok(all)
}

Expand All @@ -135,20 +145,51 @@ impl BitbucketWorker<'_> {
let host = self.opts.server.clone().unwrap();
let mut start: u32 = 0;
let mut sum: Vec<T> = vec![];
loop {
'outer: loop {
let url = format!(
"{host}{path}?limit=500&start={start}",
host = host,
path = path,
start = start
);
let response: reqwest::Result<reqwest::Response> = self.bake_client(url).send().await;
let mut resp = extract_body::<PageResponse<T>>(response, naming).await?;
sum.append(resp.values.borrow_mut());
start += resp.size;
if resp.is_last_page {
break;
for attempt in 1..self.opts.retries + 2 {
let response: reqwest::Result<reqwest::Response> =
self.bake_client(&url).send().await;
match extract_body::<PageResponse<T>>(response, naming).await {
Ok(mut resp) => {
sum.append(resp.values.borrow_mut());
if resp.is_last_page {
break 'outer;
} else {
start += resp.size;
continue 'outer;
}
}
Err(e) if e.is_timeout => {
let count = self.timeout_counter.inc();
if attempt > self.opts.retries {
// Last chance blown!
return Err(e);
} else if let Some(Some(backoff)) =
self.opts.backoff.map(|b| b.checked_mul((count + 1) as u32))
{
delay_for(backoff).await;
}
}
Err(e) => {
return Err(e);
}
}
}
// To be sure we dont escape some case into an endless retry-loop
return Err(BitbucketError {
is_timeout: true,
msg: format!(
"Failed to read from bitbucket with {} retries.",
self.opts.retries
),
cause: "Timeouts against bitbucket.".to_owned(),
});
}
Ok(sum)
}
Expand All @@ -163,9 +204,10 @@ impl BitbucketWorker<'_> {
Ok(repos)
}

fn bake_client(&self, url: String) -> RequestBuilder {
fn bake_client(&self, url: &str) -> RequestBuilder {
let builder: RequestBuilder = ReqwestClient::new()
.get(url.trim())
.get(url)
.timeout(self.opts.timeout)
.header(ACCEPT, "application/json");
match (&self.opts.username, &self.opts.password) {
(Some(u), Some(p)) => builder.basic_auth(u, Some(p)),
Expand All @@ -184,7 +226,13 @@ where
match response {
Ok(response) if response.status().is_success() => match response.json::<T>().await {
Ok(all_projects) => Ok(all_projects),
Err(e) if e.is_timeout() => Err(BitbucketError {
is_timeout: true,
msg: "Timeout reading from bitbucket.".to_owned(),
cause: format!("{:?}", e),
}),
Err(e) => Err(BitbucketError {
is_timeout: e.is_timeout(),
msg: format!(
"Failed fetching {} from bitbucket, bad json format.",
naming
Expand All @@ -193,6 +241,7 @@ where
}),
},
Ok(response) => Err(BitbucketError {
is_timeout: false,
msg: format!(
"Failed fetching {} from bitbucket, status code: {}.",
naming,
Expand All @@ -204,6 +253,7 @@ where
},
}),
Err(e) => Err(BitbucketError {
is_timeout: e.is_timeout(),
msg: format!("Failed fetching {} from bitbucket.", naming),
cause: format!("{:?}", e),
}),
Expand All @@ -212,6 +262,8 @@ where

#[cfg(test)]
mod tests {
use std::time::Duration;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

Expand Down Expand Up @@ -239,6 +291,9 @@ mod tests {
clone_type: CloneType::HTTP,
project_keys: vec!["key".to_owned()],
all: false,
timeout: Duration::from_secs(10),
retries: 0,
backoff: None,
}
}

Expand Down Expand Up @@ -288,8 +343,9 @@ mod tests {
match result {
Ok(_) => assert!(false, "This request was expected to fail."),
Err(e) => assert!(
e.msg.contains(format!("status code: {}", 404).as_str()),
"Response code should be 404."
e.msg.contains("status code: 404"),
"Response code should be 404, but was {:?}",
e.cause
),
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod tests {
use crate::types::{BitBucketOpts, CloneType, GitOpts};

use super::*;
use std::time::Duration;

#[tokio::test]
async fn cloner_integration_test() {
Expand All @@ -88,6 +89,9 @@ mod tests {
clone_type: CloneType::HTTP,
project_keys: vec![],
all: true,
timeout: Duration::from_secs(5),
retries: 2,
backoff: None,
},
git_opts: GitOpts {
reset_state: false,
Expand Down
49 changes: 41 additions & 8 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::input::prompts::{PROMPT_BB_PROJECT_ALL, PROMPT_BB_SERVER, PROMPT_BB_U
use crate::input::{get_bool, get_password, get_with_default, password_from_env};
use crate::util::bail;
use dialoguer::Confirm;
use std::time::Duration;

#[derive(StructOpt, Debug, Clone)]
#[structopt(
Expand Down Expand Up @@ -65,27 +66,28 @@ pub struct BitBucketOpts {
pub password: Option<String>,
#[structopt(
short = "b",
long = "concurrent_http",
long = "concurrent-http",
name = "bitbucket_concurrency",
help = "Number of concurrent http requests towards bitbucket. Keep it sane, keep bitbucket alive for all. Max=100",
default_value = "20"
)]
pub concurrency: usize,
#[structopt(
short = "H",
long = "http_verbose",
long = "http-verbose",
name = "bitbucket_verbose",
help = "Output full http response on failed bitbucket requests."
)]
pub verbose: bool,
#[structopt(
short = "W",
long = "env_password",
long = "env-password",
name = "bitbucket_password_from_env",
help = "Try get password from env variable BITBUCKET_PASSWORD.\nTry it out without showing your password:\nIFS= read -rs BITBUCKET_PASSWORD < /dev/tty && export BITBUCKET_PASSWORD\n"
)]
pub password_from_env: bool,
#[structopt(long = "clone_type",
#[structopt(
long = "clone-type",
name = "clone_type",
possible_values = & CloneType::variants(),
case_insensitive = true,
Expand All @@ -96,7 +98,7 @@ pub struct BitBucketOpts {
short = "k",
long = "key",
name = "git_project_keys",
help = "BitBucket Project keys"
help = "BitBucket Project keys (applicable multiple times)"
)]
pub project_keys: Vec<String>,
#[structopt(
Expand All @@ -106,6 +108,25 @@ pub struct BitBucketOpts {
help = "Clone all projects"
)]
pub all: bool,
#[structopt(
long = "http-timeout",
help = "HTTP timout, 2min4sec6milli8micro3nano combine freely with or without abbreviations or spaces.",
default_value = "2.5 sec",
parse(try_from_str = parse_duration::parse),
)]
pub timeout: Duration,
#[structopt(
long,
help = "Retries to attempt requesting on timeout from bitbucket.",
default_value = "2"
)]
pub retries: u32,
#[structopt(
long = "http-backoff",
help = "Linear backoff time per failed request.\nie. 10 timed out requests and backoff=10ms -> 100ms backoff on next timed out request\nor {prior_timeouts}*{backoff}={delay_on_next_timeout}",
parse(try_from_str = parse_duration::parse),
)]
pub backoff: Option<Duration>,
}

#[derive(StructOpt, Clone, Debug)]
Expand All @@ -120,21 +141,21 @@ pub struct GitOpts {
pub reset_state: bool,
#[structopt(
short = "g",
long = "concurrent_git",
long = "concurrent-git",
name = "git_concurrency",
help = "Number of concurrent git actions. Bitbucket might have a limited number of threads reserved for serving git requests - if you drive this value to high you might block your CI, colleagues or even crash bitbucket. Max=100",
default_value = "5"
)]
pub concurrency: usize,
#[structopt(
short = "Q",
long = "git_quiet",
long = "git-quiet",
name = "git_quiet",
help = "Suppress warnings from failed git actions."
)]
pub quiet: bool,
#[structopt(
long = "output_directory",
long = "output-directory",
help = "Suppress warnings from failed git actions.",
default_value = "."
)]
Expand Down Expand Up @@ -232,3 +253,15 @@ impl BitBucketOpts {
.collect()
}
}
#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_multi_time_parse() {
assert_eq!(
parse_duration::parse("2.5s500ms").unwrap(),
Duration::from_secs(3)
)
}
}
4 changes: 4 additions & 0 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bitbucket_server_cli::{
cloner::Cloner,
types::{BitBucketOpts, CloneType, GitOpts},
};
use std::time::Duration;

fn env(key: &str) -> Result<String> {
match std::env::var(key) {
Expand Down Expand Up @@ -43,7 +44,10 @@ fn opts() -> Result<CloneOpts> {
password_from_env: false,
clone_type: CloneType::SSH,
all: false,
timeout: Duration::from_secs(1),
retries: 0,
project_keys: vec![env("BITBUCKET_PROJECT")?],
backoff: None,
},
git_opts: GitOpts {
reset_state: false,
Expand Down

0 comments on commit 735e61f

Please sign in to comment.