diff --git a/app.R b/app.R index bb8e4a6..5a8f478 100644 --- a/app.R +++ b/app.R @@ -280,13 +280,8 @@ server <- function(input, output, session) { job_records_cache() }) - interactive_jobs <- reactive({ - if(input$adjust_interactive) - get_interactive_jobs(elastic_con, jobs = job_records()) - }) - job_breakdown <- reactive({ - generate_job_statistics(df = job_records(), adjust_cpu = input$adjust_cpu, adjust_interactive = interactive_jobs()) + generate_job_statistics(df = job_records(), adjust_cpu = input$adjust_cpu, adjust_interactive = input$adjust_interactive) }) output$job_breakdown <- DT::renderDT({ @@ -303,14 +298,9 @@ server <- function(input, output, session) { if (info$col != job_type_index) return() # do nothing if the clicked cell is not in the job_type column selected_job_type <- df[info$row, 'job_type', drop = TRUE] - dt <- filter(job_records(), job_type == selected_job_type) %>% slice_sample(n = 10) - - records <- get_docs_by_ids( - con = elastic_con, - ids = dt$`_id`, - timestamps = dt$timestamp, - fields = c('Job', 'Command', 'Job_Efficiency_Raw_Percent', 'RAW_MAX_MEM_EFFICIENCY_PERCENT', 'MEM_REQUESTED_MB', 'RUN_TIME_SEC') - ) %>% + records <- job_records() %>% + filter(job_type == selected_job_type) %>% + slice_sample(n = 10) %>% prepare_commands_table() showModal( @@ -330,7 +320,7 @@ server <- function(input, output, session) { generate_job_statistics( df = job_records(), adjust_cpu = input$adjust_cpu, - adjust_interactive = interactive_jobs(), + adjust_interactive = input$adjust_interactive, time_bucket = input$time_bucket ) }) diff --git a/src/constants.R b/src/constants.R index 5920f5f..8321950 100644 --- a/src/constants.R +++ b/src/constants.R @@ -29,6 +29,9 @@ Assume that successful processes requiring 1 cpu do not waste cpu. 
time_buckets <- c("day", "week", "month") elastic_bucket_aggregations <- c("terms", "multi_terms", "date_histogram") +success <- 'Success' +failed <- 'Failed' + raw_stats_elastic_columns <- c( 'Job', 'NUM_EXEC_PROCS', 'AVAIL_CPU_TIME_SEC', 'WASTED_CPU_SECONDS', @@ -40,6 +43,8 @@ elastic_column_map <- c( 'mem_avail_mb_sec' = 'MEM_REQUESTED_MB_SEC', 'mem_wasted_mb_sec' = 'WASTED_MB_SECONDS', 'cpu_wasted_sec' = 'WASTED_CPU_SECONDS', + 'raw_mem_wasted_mb_sec' = 'RAW_WASTED_MB_SECONDS', + 'raw_cpu_wasted_sec' = 'RAW_WASTED_CPU_SECONDS', 'procs' = 'NUM_EXEC_PROCS', 'job_status' = 'Job', 'job_name' = 'JOB_NAME', diff --git a/src/elastic_helpers.R b/src/elastic_helpers.R index 9ed29cd..831746c 100644 --- a/src/elastic_helpers.R +++ b/src/elastic_helpers.R @@ -396,19 +396,3 @@ scroll_elastic <- function(con, body, fields) { return(df) } - -get_docs_by_ids <- function (con, ids, timestamps, fields = NULL) { - index_prefix <- stringr::str_remove(attr(con, 'index'), "\\*$") - indexes <- paste0(index_prefix, 'farm-', format(timestamps, "%Y.%m.%d")) - - res <- docs_mget( - conn = con, - index_type_id = purrr::map2(ids, indexes, ~ c(.y, '_doc', .x)), - source = fields - ) - - ids <- purrr::map_chr(res$docs, magrittr::extract2, i = '_id') - lapply(res$docs, magrittr::extract2, i = '_source') %>% - data.table::rbindlist() %>% - mutate(`_id` = ids) -} diff --git a/src/stat_helpers.R b/src/stat_helpers.R index edb0b0f..ecf5767 100644 --- a/src/stat_helpers.R +++ b/src/stat_helpers.R @@ -150,13 +150,12 @@ get_job_records <- function (con, query) { df <- scroll_elastic( con = con, body = list(query = query), - fields = c('timestamp', 'JOB_NAME', 'Job', - 'NUM_EXEC_PROCS', 'AVAIL_CPU_TIME_SEC', 'WASTED_CPU_SECONDS', - 'MEM_REQUESTED_MB', 'MEM_REQUESTED_MB_SEC', 'WASTED_MB_SECONDS') + fields = c('timestamp', 'JOB_NAME', 'Job', 'RUN_TIME_SEC', 'Command', + 'NUM_EXEC_PROCS', 'RAW_WASTED_CPU_SECONDS', 'MEM_REQUESTED_MB', 'RAW_WASTED_MB_SECONDS') ) df %>% - annotate_jupyter_jobs(con, query) 
%>% + annotate_jupyter_jobs() %>% prepare_job_records() } @@ -164,24 +163,27 @@ prepare_job_records <- function (df) { df %>% mutate(timestamp = lubridate::as_datetime(timestamp)) %>% rename_raw_elastic_fields() %>% - mutate(job_type = parse_job_type(job_name), .keep = 'unused') + mutate( + mem_avail_mb_sec = as.numeric(RUN_TIME_SEC) * MEM_REQUESTED_MB, + cpu_avail_sec = as.numeric(RUN_TIME_SEC) * procs, + cpu_wasted_sec = ifelse(job_status == success, raw_cpu_wasted_sec, cpu_avail_sec), + mem_wasted_mb_sec = ifelse(job_status == success, raw_mem_wasted_mb_sec, mem_avail_mb_sec), + job_type = parse_job_type(job_name) + ) %>% + select(-job_name) } -annotate_jupyter_jobs <- function (df, con, query) { - ids <- get_jupyter_jobs(con, query) +annotate_jupyter_jobs <- function (df) { + ids <- get_jupyter_jobs(df) df <- assign_jupyter_job_names(df, ids) return(df) } -get_jupyter_jobs <- function (con, query) { - jupyter_filter <- build_match_phrase_filter("Command", "jupyterhub-singleuser") - query$bool$filter <- append(query$bool$filter, jupyter_filter) - b <- list(query = query) - - res <- elastic_search(con, index = attr(con, 'index'), body = b, asdf = T, size = 1e4, source = FALSE) - - df <- extract_hits_from_elastic_response(res) - df[['_id']] +get_jupyter_jobs <- function (df) { + df %>% + filter(stringr::str_detect(Command, stringr::fixed('jupyter'))) %>% + filter(stringr::str_detect(Command, stringr::fixed('spawner'))) %>% + pull(`_id`) } assign_jupyter_job_names <- function (df, ids) { @@ -189,20 +191,9 @@ assign_jupyter_job_names <- function (df, ids) { mutate(JOB_NAME = ifelse(`_id` %in% ids, "jupyter", JOB_NAME)) } -get_interactive_jobs <- function(con, jobs){ - interactive_jobs <- filter(jobs, job_type == 'interactive') - if(nrow(interactive_jobs) == 0) return(NULL) - df <- get_docs_by_ids( - con = con, - ids = interactive_jobs$`_id`, - timestamps = interactive_jobs$timestamp, - fields = c('RAW_WASTED_CPU_SECONDS', 'RAW_WASTED_MB_SECONDS') - ) -} - 
-generate_job_statistics <- function (df, adjust_cpu = TRUE, adjust_interactive = NULL, time_bucket = 'none') { - if(!is.null(adjust_interactive)){ - df <- adjust_interactive_statistics(df, interactive_jobs = adjust_interactive) +generate_job_statistics <- function (df, adjust_cpu = TRUE, adjust_interactive = FALSE, time_bucket = 'none') { + if(adjust_interactive){ + df <- adjust_interactive_statistics(df) } if(adjust_cpu){ diff --git a/src/table_helpers.R b/src/table_helpers.R index 08c7148..41e2855 100644 --- a/src/table_helpers.R +++ b/src/table_helpers.R @@ -59,25 +59,23 @@ adjust_statistics <- function (df) { df <- df %>% mutate( mem_wasted_cost = mem_wasted_mb_sec * ram_mb_second, - cpu_wasted_sec = ifelse(job_status == 'Success' & procs == 1, 0, cpu_wasted_sec) + cpu_wasted_sec = ifelse(job_status == success & procs == 1, 0, cpu_wasted_sec) ) if('wasted_cost' %in% names(df)) - df <- mutate(df, wasted_cost = ifelse(job_status == 'Success' & procs == 1, mem_wasted_cost, wasted_cost)) + df <- mutate(df, wasted_cost = ifelse(job_status == success & procs == 1, mem_wasted_cost, wasted_cost)) return(df) } -adjust_interactive_statistics <- function (df, interactive_jobs) { +adjust_interactive_statistics <- function (df) { is_interactive <- df$job_type == 'interactive' df %>% - left_join(interactive_jobs, by = '_id') %>% mutate( - cpu_wasted_sec = ifelse(is_interactive, RAW_WASTED_CPU_SECONDS, cpu_wasted_sec), - mem_wasted_mb_sec = ifelse(is_interactive, RAW_WASTED_MB_SECONDS, mem_wasted_mb_sec), - job_status = ifelse(is_interactive, 'Success', job_status) - ) %>% - select(-c('RAW_WASTED_CPU_SECONDS', 'RAW_WASTED_MB_SECONDS')) + cpu_wasted_sec = ifelse(is_interactive, raw_cpu_wasted_sec, cpu_wasted_sec), + mem_wasted_mb_sec = ifelse(is_interactive, raw_mem_wasted_mb_sec, mem_wasted_mb_sec), + job_status = ifelse(is_interactive, success, job_status) + ) } generate_wasted_cost <- function (df) { @@ -214,22 +212,26 @@ generate_efficiency_stats <- function(df, 
extra_stats = list()) { prepare_commands_table <- function (df) { df %>% - select(-`_id`) %>% - mutate(MEM_REQUESTED = convert_bytes(MEM_REQUESTED_MB, from = 'mb', to = 'b'), .keep = 'unused') %>% - rename(RUN_TIME = RUN_TIME_SEC) %>% + mutate( + MEM_REQUESTED = convert_bytes(MEM_REQUESTED_MB, from = 'mb', to = 'b'), + raw_cpu_efficiency = 100 * (1 - raw_cpu_wasted_sec / cpu_avail_sec), + raw_mem_efficiency = 100 * (1 - raw_mem_wasted_mb_sec / mem_avail_mb_sec), + .keep = 'unused') %>% + select(job_status, RUN_TIME = RUN_TIME_SEC, raw_cpu_efficiency, raw_mem_efficiency, MEM_REQUESTED, Command) %>% gt::gt() %>% gt::cols_align(align = "left", columns = 'Command') %>% - gt::fmt_percent(columns = c('Job_Efficiency_Raw_Percent', 'RAW_MAX_MEM_EFFICIENCY_PERCENT'), scale_values = FALSE) %>% + gt::fmt_percent(columns = c('raw_cpu_efficiency', 'raw_mem_efficiency'), scale_values = FALSE) %>% gt::fmt_bytes(columns = MEM_REQUESTED, standard = 'binary') %>% gt::fmt_duration(RUN_TIME, input_units = 'seconds', max_output_units = 1) %>% + gt::cols_move_to_end(Command) %>% + gt::cols_move_to_start(job_status) %>% gt::cols_label( - Job_Efficiency_Raw_Percent = 'Raw CPU efficiency', - RAW_MAX_MEM_EFFICIENCY_PERCENT = 'Raw memory efficiency', + raw_cpu_efficiency = 'Raw CPU efficiency', + raw_mem_efficiency = 'Raw memory efficiency', MEM_REQUESTED = 'Memory requested', - RUN_TIME = 'Run time' - ) %>% - gt::cols_move_to_end(Command) %>% - gt::cols_move_to_start(Job) + RUN_TIME = 'Run time', + job_status = 'Job status' + ) } prepare_raw_stats_records <- function (df) { diff --git a/terraform/main.tf b/terraform/main.tf index f3ff00d..e7a7214 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -133,8 +133,8 @@ resource "openstack_networking_secgroup_rule_v2" "shinyproxy_web_port" { resource "openstack_compute_instance_v2" "server" { name = "shinyproxy-server" - image_name = "jammy-WTSI-docker_247771_4ea57c30" - flavor_name = "m1.xlarge" + image_name = 
"jammy-WTSI-docker_324910_82eec972" + flavor_name = "m2.xlarge" key_pair = openstack_compute_keypair_v2.kp.name security_groups = [ "default", diff --git a/tests/testthat/test_elastic_helpers.R b/tests/testthat/test_elastic_helpers.R index dbc943f..7c15845 100644 --- a/tests/testthat/test_elastic_helpers.R +++ b/tests/testthat/test_elastic_helpers.R @@ -427,6 +427,17 @@ test_that("build_match_phrase_filter works", { expect_equal(object, expected_object) }) +test_that("build_prefix_filter works", { + object <- build_prefix_filter("JOB_NAME", "nf-") + expected_object <- list( + list( + "prefix" = list("JOB_NAME" = "nf-") + ) + ) + + expect_equal(object, expected_object) +}) + test_that("get_numerical_colnames", { # with mixed columns test_df <- data.frame( diff --git a/tests/testthat/test_stat_helpers.R b/tests/testthat/test_stat_helpers.R index b3f469c..7821100 100644 --- a/tests/testthat/test_stat_helpers.R +++ b/tests/testthat/test_stat_helpers.R @@ -157,20 +157,20 @@ test_that("build_bom_aggregation works", { test_that("prepare_job_records works", { fake_data_frame <- data.frame( - AVAIL_CPU_TIME_SEC = c(800, 1000, 1200), JOB_NAME = c('nf-hello', 'wrp_job', 'another_job'), Job = c('Success', 'Success', 'Failed'), MEM_REQUESTED_MB = c(1200, 2400, 3600), - MEM_REQUESTED_MB_SEC = c(12000, 42000, 60000), NUM_EXEC_PROCS = c(1, 1, 3), timestamp = c(1732146484, 1732146665, 1732146702), - WASTED_CPU_SECONDS = c(600, 500, 700), - WASTED_MB_SECONDS = c(5000, 20000, 10000) + raw_cpu_wasted_sec = c(600, 500, 700), + raw_mem_wasted_mb_sec = c(5000, 20000, 10000), + RUN_TIME_SEC = c(800, 1000, 400) ) expected_columns <- c( - 'cpu_avail_sec', 'job_status', 'MEM_REQUESTED_MB', 'mem_avail_mb_sec', - 'procs', 'timestamp', 'cpu_wasted_sec', 'mem_wasted_mb_sec', 'job_type' + 'job_status', 'MEM_REQUESTED_MB', 'procs', 'timestamp', + 'raw_cpu_wasted_sec', 'raw_mem_wasted_mb_sec', 'RUN_TIME_SEC', 'mem_avail_mb_sec', + 'cpu_avail_sec', 'cpu_wasted_sec', 'mem_wasted_mb_sec', 'job_type' ) 
dt <- prepare_job_records(fake_data_frame) @@ -346,6 +346,30 @@ test_that("assign_jupyter_job_names works", { expect_equal(dt$JOB_NAME, expected_job_names) }) +test_that("get_jupyter_jobs works", { + df <- data.frame( + '_id' = c(1, 3, 2, 4), + 'Command' = c('ls', 'jupyterhub-singleuser spawner', 'bash', 'call jupyterhub-singleuser\nspawner'), + check.names = FALSE + ) + expected <- c(3, 4) + result <- get_jupyter_jobs(df) + expect_equal(result, expected) +}) + +test_that("annotate_jupyter_jobs works", { + df <- data.frame( + '_id' = c(1, 3, 2, 4), + 'Command' = c('ls', 'jupyterhub-singleuser spawner', 'bash', 'call jupyterhub-singleuser\nspawner'), + 'JOB_NAME' = c('job1', NA, 'job2', NA), + check.names = FALSE + ) + expected <- c('job1', 'jupyter', 'job2', 'jupyter') + result <- annotate_jupyter_jobs(df) + expect_s3_class(result, 'data.frame') + expect_equal(result$JOB_NAME, expected) +}) + test_that("decide_statistics_function works", { expect_identical(decide_statistics_function('all', 'all'), get_bom_statistics) expect_identical(decide_statistics_function('user1', 'all'), get_user_statistics) diff --git a/tests/testthat/test_table_helpers.R b/tests/testthat/test_table_helpers.R index 8ed3efb..445c2b1 100644 --- a/tests/testthat/test_table_helpers.R +++ b/tests/testthat/test_table_helpers.R @@ -232,9 +232,11 @@ test_that("prepare_commands_table works", { MEM_REQUESTED_MB = c(1000, 2000), RUN_TIME_SEC = c(10, 100), Command = c('rstudio', 'bash'), - Job_Efficiency_Raw_Percent = c(20, 30), - RAW_MAX_MEM_EFFICIENCY_PERCENT = c(10, 20), - Job = c('Success', 'Failed'), + cpu_avail_sec = c(500, 1000), + raw_cpu_wasted_sec = c(400, 700), + mem_avail_mb_sec = c(100, 1000), + raw_mem_wasted_mb_sec = c(90, 800), + job_status = c('Success', 'Failed'), check.names = FALSE ) @@ -267,13 +269,8 @@ test_that("adjust_interactive_statistics works", { cpu_wasted_sec = c(100, 200, 300), mem_wasted_mb_sec = c(1000, 2000, 3000), job_status = c('Success', 'Failed', 'Failed'), - 
check.names = FALSE - ) - - jobs <- data.frame( - `_id` = c('id1', 'id3'), - RAW_WASTED_CPU_SECONDS = c(150, 350), - RAW_WASTED_MB_SECONDS = c(1500, 3500), + raw_cpu_wasted_sec = c(150, 200, 350), + raw_mem_wasted_mb_sec = c(1500, 2000, 3500), check.names = FALSE ) @@ -282,7 +279,7 @@ test_that("adjust_interactive_statistics works", { expected_df$mem_wasted_mb_sec <- c(1500, 2000, 3500) expected_df$job_status <- c('Success', 'Failed', 'Success') - dt <- adjust_interactive_statistics(df, jobs) + dt <- adjust_interactive_statistics(df) expect_s3_class(dt, 'data.frame') expect_equal(dt, expected_df)