Skip to content

Commit

Permalink
setup parallelisation and multi-survey bootstrap; fix #41; fix #44
Browse files Browse the repository at this point in the history
  • Loading branch information
ernestguevarra committed Jan 7, 2025
1 parent 6e49636 commit f7a154a
Show file tree
Hide file tree
Showing 24 changed files with 7,796 additions and 7,670 deletions.
4 changes: 4 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ export(bootBW)
export(bootClassic)
export(bootPROBIT)
export(boot_bw)
export(boot_bw_parallel)
export(boot_bw_sample_clusters)
export(boot_bw_sample_within_clusters)
export(boot_bw_sequential)
export(boot_bw_weight)
export(recode)
importFrom(car,bcPower)
Expand All @@ -14,6 +16,8 @@ importFrom(cli,cli_abort)
importFrom(cli,cli_alert_success)
importFrom(cli,cli_bullets)
importFrom(doParallel,registerDoParallel)
importFrom(foreach,"%:%")
importFrom(foreach,"%do%")
importFrom(foreach,"%dopar%")
importFrom(foreach,foreach)
importFrom(parallel,makeCluster)
Expand Down
4 changes: 2 additions & 2 deletions R/bbw.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@
#' @importFrom withr with_options
#' @importFrom parallelly availableCores
#' @importFrom parallel makeCluster
#' @importFrom foreach foreach %dopar%
#' @importFrom foreach foreach %dopar% %do% %:%
#' @importFrom doParallel registerDoParallel
#' @importFrom cli cli_abort cli_bullets cli_alert_success
#'
"_PACKAGE"

## quiets concerns of R CMD check re: the psus and THRESHOLD that appear in bbw
if(getRversion() >= "2.15.1") utils::globalVariables(c("psu", "THRESHOLD"))
if(getRversion() >= "2.15.1") utils::globalVariables(c("psu", "THRESHOLD", "i"))
2 changes: 1 addition & 1 deletion R/bootBW.r
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bootBW <- function(x, w, statistic,
check_data(x)

## Check weights ----
check_weights(w = w)
w <- boot_bw_weight(w)

## Check params ----
params <- check_params(x = x, params = params)
Expand Down
2 changes: 1 addition & 1 deletion R/bootClassic.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#' @examples
#' # Example call to bootClassic function
#' sampled_clusters <- boot_bw_sample_clusters(
#' x = indicatorsHH, df_weighted = boot_bw_weight(villageData)
#' x = indicatorsHH, w = boot_bw_weight(villageData)
#' )
#'
#' boot <- boot_bw_sample_within_clusters(sampled_clusters)
Expand Down
2 changes: 1 addition & 1 deletion R/bootProbit.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#' @examples
#' # Example call to bootBW function:
#' sampled_clusters <- boot_bw_sample_clusters(
#' x = indicatorsCH1, df_weighted = boot_bw_weight(villageData)
#' x = indicatorsCH1, w = boot_bw_weight(villageData)
#' )
#'
#' boot <- boot_bw_sample_within_clusters(sampled_clusters)
Expand Down
227 changes: 196 additions & 31 deletions R/boot_bw.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
#' function is discussed here.
#'
#' @inheritParams bootBW
#' @param df_weighted A [data.frame()] based on `w` with additional variables
#' for `weight` and `cumWeight`. This is usually produced using the function
#' [boot_bw_weight()].
#' @param strata A character value giving the name of the variable in `x`
#' that identifies how `x` is grouped, such that resampling is performed
#' separately for each group. Defaults to NULL, in which case no grouping is
#' applied and resampling is performed on the full data.
#' @param parallel Logical. Should resampling be done in parallel? Defaults to
#' TRUE.
#' @param cores The number of computer cores to use or number of child processes
#' to be run simultaneously. Defaults to one less than the available number of
#' cores on the current machine.
Expand Down Expand Up @@ -44,58 +47,218 @@
boot_bw <- function(x, w, statistic,
params, outputColumns = params,
replicates = 400,
strata = NULL,
parallel = TRUE,
cores = parallelly::availableCores(omit = 1)) {
## Get cumulative weights for clusters ----
w <- boot_bw_weight(w)
# Get cumulative weights for clusters ----
w <- suppressMessages(boot_bw_weight(w), classes = "cliMessage")

## Check data ----
check_data(x)
# Check data ----
suppressMessages(check_data(x), classes = "cliMessage")

## Check params ----
params <- check_params(x = x, params = params)
# Check params ----
params <- suppressMessages(
check_params(x = x, params = params), classes = "cliMessage"
)

## Setup parallelism ----
if (parallel) {
boot <- boot_bw_parallel(
x = x, w = w, statistic = statistic, params = params,
outputColumns = outputColumns, replicates = replicates, strata = strata,
cores = cores
)
} else {
boot <- boot_bw_sequential(
x = x, w = w, statistic = statistic, params = params,
outputColumns = outputColumns, replicates = replicates, strata = strata
)
}

## Return boot ----
boot
}


#'
#' @export
#' @rdname boot_bw
#'

boot_bw_parallel <- function(x, w, statistic,
params, outputColumns = params,
replicates = 400,
strata = NULL,
cores = parallelly::availableCores(omit = 1)) {
## Setup parallelism ----
cli::cli_progress_step("Setting up {.strong {cores}} parallel operations")
cl <- parallel::makeCluster(cores)
doParallel::registerDoParallel(cl)

## Resample ----
boot <- foreach::foreach(seq_len(replicates), .combine = rbind) %dopar% {
## Sample clusters ----
sampled_clusters <- boot_bw_sample_clusters(x = x, df_weighted = w)
if (is.null(strata)) {
cli::cli_progress_step(
"Resampling with {.strong {replicates}} replicates in parallel"
)

## Sample within selected clusters ----
xBW <- boot_bw_sample_within_clusters(sampled_clusters)
boot <- foreach::foreach(seq_len(replicates), .combine = rbind) %dopar% {
## Sample clusters ----
sampled_clusters <- boot_bw_sample_clusters(x = x, w = w)

## Sample within selected clusters ----
xBW <- boot_bw_sample_within_clusters(sampled_clusters)

## Apply statistic ----
statistic(xBW, params)
}

## Apply statistic ----
statistic(xBW, params)
}
## Tidy output data.frame ----
cli::cli_progress_step("Tidying up resampling outputs")
boot <- tidy_boot(boot, outputColumns = outputColumns)
} else {
cli::cli_progress_step(
"Resampling by {.strong {strata}} - {.strong {replicates}} replicates in parallel"
)

## Rename output data.frame ----
boot <- as.data.frame(boot)
row.names(boot) <- NULL
names(boot) <- outputColumns
boot <- foreach::foreach(i = unique(w[[strata]])) %:%
foreach::foreach(seq_len(replicates), .combine = rbind) %dopar% {
## Subset x to strata ----
y <- x[which(x[[strata]] == i), ]

## Subset w to strata ----
z <- w[which(w[[strata]] == i), ]

## Sample clusters ----
sampled_clusters <- boot_bw_sample_clusters(x = y, w = z)

## Sample within selected clusters ----
xBW <- boot_bw_sample_within_clusters(sampled_clusters)

## Apply statistic ----
statistic(xBW, params)
}

## Re-structure boot to identify outputs list and rename data.frames ----
cli::cli_progress_step("Tidying up resampling outputs")
boot <- tidy_boot(
boot, w = w, strata = strata, outputColumns = outputColumns
)
}

## Stop parallelism ----
cli::cli_progress_step("Closing {.strong {cores}} parallel operations")
parallel::stopCluster(cl)

## Return boot ----
boot
}


#'
#' @export
#' @rdname boot_bw
#'

boot_bw_sequential <- function(x, w, statistic,
                               params, outputColumns = params,
                               replicates = 400,
                               strata = NULL) {
  ## Sequential (single-process) resampling using foreach's %do% operator.
  ##
  ## x             survey data to resample
  ## w             cluster weights table; passed as `w` to
  ##               boot_bw_sample_clusters()
  ## statistic     function called as statistic(xBW, params) on each replicate
  ## params        passed unchanged to `statistic`
  ## outputColumns passed to tidy_boot() when tidying the result
  ## replicates    number of bootstrap replicates (per stratum when `strata`
  ##               is supplied)
  ## strata        NULL for no grouping, otherwise the name of a column that
  ##               must be present in both `x` and `w` (both are subset by it)
  ##
  ## Returns whatever tidy_boot() produces from the resampling results.

  cli::cli_h2("Resampling sequentially")

  ## Resample ----
  if (is.null(strata)) {
    cli::cli_h3("Resampling with {.strong {replicates}} replicates")
    cli::cli_progress_bar("Resampling", total = replicates, clear = FALSE)

    ## One row-bound result per replicate ----
    boot <- foreach::foreach(
      replicate = seq_len(replicates), .combine = rbind
    ) %do% {
      ## Sample clusters ----
      sampled_clusters <- boot_bw_sample_clusters(x = x, w = w)

      ## Sample within selected clusters ----
      xBW <- boot_bw_sample_within_clusters(sampled_clusters)

      ## Apply statistic ----
      cli::cli_progress_update()
      statistic(xBW, params)
    }

    ## Complete progress bar ----
    cli::cli_progress_done()

    ## Tidy output data.frame ----
    cli::cli_progress_step("Tidying up resampling outputs")
    boot <- tidy_boot(boot, outputColumns = outputColumns)
  } else {
    cli::cli_h3(
      "Resampling by {.strong {strata}} with {.strong {replicates}} replicates"
    )

    ## Outer loop over strata values found in w (no .combine, so results are
    ## collected per stratum); inner loop row-binds the replicates ----
    boot <- foreach::foreach(i = unique(w[[strata]])) %:%
      foreach::foreach(
        replicate = seq_len(replicates), .combine = rbind
      ) %do% {
        cli::cli_progress_message(
          "Resampling {.strong {strata} -} {.strong {i}} sequentially: replicate {.strong {replicate}}"
        )

        ## Subset x to strata ----
        y <- x[which(x[[strata]] == i), ]

        ## Subset w to strata ----
        z <- w[which(w[[strata]] == i), ]

        ## Sample clusters ----
        sampled_clusters <- boot_bw_sample_clusters(x = y, w = z)

        ## Sample within selected clusters ----
        xBW <- boot_bw_sample_within_clusters(sampled_clusters)

        ## Apply statistic ----
        cli::cli_progress_update()
        statistic(xBW, params)
      }

    ## Re-structure boot to identify outputs list and rename data.frames ----
    cli::cli_progress_step("Tidying up resampling outputs")
    boot <- tidy_boot(
      boot, w = w, strata = strata, outputColumns = outputColumns
    )
  }

  ## Return boot ----
  boot
}


#'
#' @export
#' @rdname boot_bw
#'

boot_bw_weight <- function(w) {
## Check weights ----
check_weights(w = w)
req_names <- c("psu", "pop", "weight", "cumWeight")
names_check <- req_names %in% names(w)
names_in <- req_names[names_check]
names_out <- req_names[!names_check]

## Scale weights and accumulate weights
w$weight <- w$pop / sum(w$pop)
w$cumWeight <- cumsum(w$weight)
if (all(names_check)) {
cli::cli_alert_success(
"{.arg w} has the needed variables with the appropriate names"
)
} else {
if (all(c("psu", "pop") %in% names_in)) {
## Scale weights and accumulate weights
w$weight <- w$pop / sum(w$pop)
w$cumWeight <- cumsum(w$weight)
} else {
cli::cli_abort(
"{.arg w} doesn't have the needed variables or they are not named appropriately"
)
}
}

## Return w
## Return w ----
w
}

Expand All @@ -116,9 +279,9 @@ boot_bw_sample_cluster <- function(p, w) {
#' @rdname boot_bw
#'

boot_bw_sample_clusters <- function(x, df_weighted, index = FALSE) {
boot_bw_sample_clusters <- function(x, w, index = FALSE) {
## Get number of clusters ----
nClusters <- nrow(df_weighted)
nClusters <- nrow(w)

## Get vector of random probabilities ----
p <- runif(n = nClusters)
Expand All @@ -127,7 +290,7 @@ boot_bw_sample_clusters <- function(x, df_weighted, index = FALSE) {
selected_clusters <- lapply(
X = p,
FUN = boot_bw_sample_cluster,
w = df_weighted
w = w
) |>
unlist()

Expand Down Expand Up @@ -169,3 +332,5 @@ boot_bw_sample_within_clusters <- function(cluster_df) {
) |>
do.call(rbind, args = _)
}


Loading

0 comments on commit f7a154a

Please sign in to comment.