From 023ad23854be0cfeb7b1f39da3cc9c116995b2da Mon Sep 17 00:00:00 2001 From: "mats.gsl" Date: Thu, 8 Feb 2024 17:01:54 +0000 Subject: [PATCH] deal with concurrency issue in import_docs - merge cbimport into run_imports --- scripts/VXingest_utilities/import_docs.sh | 123 ---------------------- scripts/VXingest_utilities/run-import.sh | 14 +-- 2 files changed, 8 insertions(+), 129 deletions(-) delete mode 100755 scripts/VXingest_utilities/import_docs.sh diff --git a/scripts/VXingest_utilities/import_docs.sh b/scripts/VXingest_utilities/import_docs.sh deleted file mode 100755 index 06d99e4..0000000 --- a/scripts/VXingest_utilities/import_docs.sh +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env bash - -function usage() { - echo "Usage $0 -c credentials-file -p full_path_to_json_files_directory, -l log_dir [-n number_of_processes (default 1)]" - echo "(The number_of_processes must be less than or equal to nproc)." - echo "The credentials-file specifies cb_host, cb_user, and cb_password." - echo "This script assumes that you have cloned VXingest into ${HOME}/VXingest" - echo "If you cloned it elsewhere, make a link." - echo "This script uses cbimport with 'number_of_processes' cbimport processes running simultaneously." - echo "The jason files in 'full_path_to_json_files_directory' will be seperated into (number_of_files / num_processes)" - echo "groups and imported simultaneously. Output is written to 'logdir/cbimport_n.log' where n is the instance number." - echo "sample invocation...." - echo "${HOME}VXingest/scripts/VXingest_utilities/import_docs.sh -c credentials_file -p /data/grib2_to_cb/output -n 6 -l ${HOME}/VXingest/logs" - exit 1 -} -number_of_processes=1 -number_of_cpus=$(nproc) -while getopts 'c:p:n:l:' param; do - case "${param}" in - c) - credentials_file=${OPTARG} - if [ ! -f "${credentials_file}" ]; then - echo "${credentials_file} does not exist" - usage - fi - ;; - p) - input_file_path=${OPTARG} - if [ ! 
-d "${input_file_path}" ]; then - echo "${input_file_path} does not exist" - usage - fi - ;; - n) - number_of_processes=${OPTARG} - if [ ! "${number_of_processes}" -le "${number_of_cpus}" ]; then - echo "${number_of_processes} exceeds ${number_of_cpus}" - usage - fi - ;; - l) - log_dir=${OPTARG} - if [ ! -d "${log_dir}" ]; then - echo "log directory \"${log_dir}\" does not exist" - usage - fi - ;; - *) - echo "wrong parameter, I don't do ${param}" - usage - ;; - esac -done - -if [ ! -f "${credentials_file}" ]; then - echo "no credentials_file specified" - usage -fi -if [ ! -d "${input_file_path}" ]; then - echo "no input_file_path specified" - usage -fi -if [ ! -d "${log_dir}" ]; then - echo "no log_dir specified - using stdout" -fi - -if [ "${number_of_processes}" -gt "$number_of_cpus" ]; then - echo "${number_of_processes} exceeds ${number_of_cpus}" - usage -fi - -host=$(grep cb_host ${credentials_file} | awk '{print $2}') -user=$(grep cb_user ${credentials_file} | awk '{print $2}') -pwd=$(grep cb_password ${credentials_file} | awk '{print $2}') -bucket=$(grep cb_bucket ${credentials_file} | awk '{print $2}') -collection=$(grep cb_collection ${credentials_file} | awk '{print $2}') -scope=$(grep cb_scope ${credentials_file} | awk '{print $2}') -if [ -z "${host}" ]; then - echo "credentials do not specify cb_host" - usage -fi -# if it is a multinode host split on ',' and take the first one -IFS=',' -read -ra hostarr <<< "$host" -host=${hostarr[0]} - -if [ -z "${user}" ]; then - echo "credentials do not specify cb_user" - usage -fi -if [ -z "${pwd}" ]; then - echo "credentials do not specify cb_password" - usage -fi - -do_import() { - file_list=$1 - sleep 10 - cat ${file_list} | while read f; do - echo "cbimport json --cluster couchbase://${host} --bucket ${bucket} --scope-collection-exp ${scope}.${collection} --username ${user} --password ${pwd} --format list --generate-key %id% --dataset file:///${f}" - cbimport json --cluster couchbase://${host} --bucket 
${bucket} --scope-collection-exp ${scope}.${collection} --username ${user} --password ${pwd} --format list --generate-key %id% --dataset file:///${f} - done -} - -curdir=$(pwd) -tmp_dir=$(mktemp -d -t cbimport_files-XXXXXXXXXX) -cd ${tmp_dir} -# create a tmp log dir so that multiple instances will not step on each other -tmp_log_dir="${tmp_dir}/logs" -mkdir ${tmp_log_dir} -find ${input_file_path} -name "*.json" | split -d -l $(($(find ${input_file_path} -name "*.json" | wc -l) / ${number_of_processes} + 1)) -# each file is a list of files -echo "Start $(date +%s)" -for f in ${tmp_dir}/*; do - fname=$(basename ${f}) - do_import ${f} > ${tmp_log_dir}/${fname} 2>&1 & -done -echo "cbimport commands submitted, now waiting" -wait -echo "cbimport commands submitted, done waiting" -echo "Stop $(date +%s)" -cd ${curdir} -cp -a ${tmp_log_dir} ${log_dir} diff --git a/scripts/VXingest_utilities/run-import.sh b/scripts/VXingest_utilities/run-import.sh index dcef4c5..d370dad 100755 --- a/scripts/VXingest_utilities/run-import.sh +++ b/scripts/VXingest_utilities/run-import.sh @@ -160,8 +160,9 @@ ls -1 ${load_dir}/*.gz | while read f; do log_file=`ls -1 ${data_dir}/*.log` echo "processing log_file ${log_file}" log_dir=$(dirname ${log_file}) + mkdir -p ${log_dir} log_file_name=$(basename $log_file) - import_log_file="${log_dir}/import-${log_file_name}" + import_log_file="${log_dir}/import-${log_file_name}" echo "import log file will be: ${import_log_file}" # run the import job metric_name=$(grep metric_name ${log_file} | awk '{print $6}') # Grab the desired column from the python log format @@ -169,11 +170,12 @@ ls -1 ${load_dir}/*.gz | while read f; do import_metric_name="import_${metric_name}" echo "import metric name will be ${import_metric_name}" echo "metric_name ${import_metric_name}" > ${import_log_file} - echo "RUNNING - scripts/VXingest_utilities/import_docs.sh -c ${credentials_file} -p ${data_dir} -n 6 -l logs >> ${import_log_file}" - 
scripts/VXingest_utilities/import_docs.sh -c ${credentials_file} -p ${data_dir} -n $(nproc) -l logs 2>&1 >> ${import_log_file} - exit_code=$? - wait - echo "exit_code:${exit_code}" >> ${import_log_file} + number_of_cpus=$(nproc) + exit_code=0 + for json_f in ${data_dir}/*.json; do + fname=$(basename ${json_f}) + ${HOME}/cbtools/bin/cbimport json --threads ${number_of_cpus} --cluster couchbase://${cb_host} --bucket ${bucket} --scope-collection-exp ${scope}.${collection} --username ${cb_user} --password ${cb_pwd} --format list --generate-key %id% --dataset file:///${json_f} || exit_code=$? + done if [[ "${exit_code}" -ne "0" ]]; then echo "import failed for $f exit_code:${exit_code}" failed_import_count=$((failed_import_count+1))