Skip to content

Commit

Permalink
new: workers quota on api sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
evilsocket committed Jul 19, 2024
1 parent 77c5c3f commit 27751aa
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) async fn start(opts: Options) -> Result<(), Error> {

log::info!("starting api on http://{} ...", &address);

let state = Arc::new(RwLock::new(State::new()));
let state = Arc::new(RwLock::new(State::new(opts.concurrency)));

HttpServer::new(move || {
App::new()
Expand Down
41 changes: 36 additions & 5 deletions src/api/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
process::Stdio,
sync::{Arc, Mutex},
sync::{atomic::AtomicU64, Arc, Mutex},
time::SystemTime,
};

Expand Down Expand Up @@ -151,6 +151,8 @@ impl Wrapper {
client: String,
session_id: uuid::Uuid,
argv: Vec<String>,
taken_workers: usize,
avail_workers: Arc<AtomicU64>,
) -> Result<Self, Error> {
let app = get_current_exe()?;

Expand Down Expand Up @@ -212,6 +214,9 @@ impl Wrapper {
Some(Completion::with_error(error.to_string()));
}
}

// free the workers
avail_workers.fetch_add(taken_workers as u64, std::sync::atomic::Ordering::Relaxed);
});

Ok(Self {
Expand Down Expand Up @@ -239,12 +244,17 @@ impl Wrapper {
#[derive(Serialize)]
pub(crate) struct State {
sessions: HashMap<uuid::Uuid, Wrapper>,
available_workers: Arc<AtomicU64>,
}

impl State {
pub fn new() -> Self {
pub fn new(concurrency: usize) -> Self {
let sessions = HashMap::new();
Self { sessions }
let available_workers = Arc::new(AtomicU64::new(concurrency as u64));
Self {
sessions,
available_workers,
}
}

pub async fn start_new_session(
Expand All @@ -255,13 +265,34 @@ impl State {
// TODO: change all errors and results to anyhow

// validate argv
let _ = Options::try_parse_from(&argv).map_err(|e| e.to_string())?;
let opts = Options::try_parse_from(&argv).map_err(|e| e.to_string())?;
let avail_workers = self
.available_workers
.load(std::sync::atomic::Ordering::Relaxed) as usize;
if opts.concurrency > avail_workers {
return Err(format!(
"can't start new session, {avail_workers} available workers"
));
}

self.available_workers.fetch_sub(
opts.concurrency as u64,
std::sync::atomic::Ordering::Relaxed,
);

let session_id = uuid::Uuid::new_v4();

// add to active sessions
self.sessions.insert(
session_id.clone(),
Wrapper::start(client, session_id, argv).await?,
Wrapper::start(
client,
session_id,
argv,
opts.concurrency,
self.available_workers.clone(),
)
.await?,
);

Ok(session_id)
Expand Down

0 comments on commit 27751aa

Please sign in to comment.