From 050d3c9a0903e8e1c2813678043f69d1dc520fdd Mon Sep 17 00:00:00 2001
From: "randy.pierce"
Date: Fri, 9 Feb 2024 13:15:26 -0700
Subject: [PATCH] clean this up and implement rude file locking on archives

---
 scripts/VXingest_utilities/run-import.sh     | 337 +++++++++----------
 scripts/VXingest_utilities/scrape_metrics.sh |  44 +--
 2 files changed, 186 insertions(+), 195 deletions(-)

diff --git a/scripts/VXingest_utilities/run-import.sh b/scripts/VXingest_utilities/run-import.sh
index d370dad8..5915c5ff 100755
--- a/scripts/VXingest_utilities/run-import.sh
+++ b/scripts/VXingest_utilities/run-import.sh
@@ -11,6 +11,16 @@
 # This script expects to have a python virtual environment in the amb-verif home directory in the subdirectory vxingest-env.
 # This script will generate metrics to track its success or failure, and to track how frequently it runs.
 
+# NOTE! nproc is not available on MACOS
+# To run this script you must make an alias for nproc
+# alias nproc="sysctl -n hw.ncpu"
+# flock is not available on MACOS
+# To run this script on MACOS you must brew install flock
+
+# set the timezone to UTC0 so that the date command will return the correct epoch time
+export TZ="UTC0"
+
+
 function usage {
   echo "Usage $0 -c credentials-file -l load directory -t temp_dir -m metrics_directory"
   echo "The credentials-file specifies cb_host, cb_user, and cb_password."
@@ -19,16 +29,116 @@ function usage {
   echo "The metrics directory is where the scraper will place the metrics"
   echo "This script expects to execute inside the VxIngest directory"
   echo "This script expects to be run as user amb-verif"
-  failed_import_count=$((failed_import_count+1))
+  failed_import_count=$((failed_import_count + 1))
   exit 1
 }
 
-
 success_import_count=0
 failed_import_count=0
 success_scrape_count=0
 failed_scrape_count=0
-start=$(date +%s)
 
+function import_archive {
+  # import the data from one archive tarball
+  # the data and its log file are extracted into the temp directory
+  # the log file is named after the tarball
+  f=$1
+  archive_dir=$2
+  temp_dir=$3
+
+  start_epoch=$(date +%s)
+  t_dir=$(mktemp -d --tmpdir=${temp_dir})
+  if [[ ! -d "${t_dir}" ]]; then
+    echo "ERROR: Failed to create VxIngest temp directory ${t_dir}"
+    usage
+  fi
+  # get the subdir from the tarball - all the archives are in a subdir in the tarball
+  subdir=$(tar -tzf ${f} | awk -F'/' '{print $1}' | uniq)
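+  # NOTE: this assumes the tarball has exactly one top-level directory;
+  # if it contains more than one, ${subdir} will hold multiple names and data_dir will be wrong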
+  echo "processing the tar file ${f}"
+  data_dir="${t_dir}/${subdir}"
+  echo "extracting tarball ${f} to temp_dir ${t_dir}"
+  echo "tar -xzf ${f} -C ${t_dir}"
+  # NOTE: the archives are tar'd into a subdirectory so data_dir is t_dir/sub_dir
+  tar -xzf "${f}" -C "${t_dir}"
+  if [[ $? != 0 ]]; then
+    echo "ERROR: tarball ${f} failed to extract"
+    base_f=$(basename $f)
+    echo "moving tar file ${f} to ${archive_dir}/failed-extract-${base_f}"
+    echo mv $f "${archive_dir}/failed-extract-${base_f}"
+    mv $f "${archive_dir}/failed-extract-${base_f}"
+    rm -rf ${t_dir}
+    failed_import_count=$((failed_import_count + 1))
+    return # go to the next tar file
+  fi
+  echo "finished extracting tarball ${f} to ${t_dir}"
+  log_file_count=$(ls -1 ${data_dir}/*.log | wc -l)
+  if [[ ${log_file_count} -ne 1 ]]; then
+    echo "There is not just one log_file in ${data_dir} - extracted from ${f} - there are ${log_file_count}"
+    base_f=$(basename $f)
+    echo "moving tar file ${f} to ${archive_dir}/failed-too-many-log-files-${base_f}"
+    echo " - skipping"
+    echo mv $f "${archive_dir}/failed-too-many-log-files-${base_f}"
+    mv $f "${archive_dir}/failed-too-many-log-files-${base_f}"
+    rm -rf ${t_dir}
+    failed_import_count=$((failed_import_count + 1))
+    return # go to the next tar file
+  fi
+  # ok - have one log file
+  log_file=$(ls -1 ${data_dir}/*.log)
+  echo "processing log_file ${log_file}"
+  log_dir=$(dirname ${log_file})
+  mkdir -p ${log_dir}
+  # run the import job
+  metric_name=$(grep metric_name ${log_file} | awk '{print $6}') # Grab the desired column from the python log format
+  echo "metric name will be ${metric_name}"
+  # NOTE! nproc is not available on MACOS
+  # To run this script you must make an alias for nproc
+  # alias nproc="sysctl -n hw.ncpu"
+  number_of_cpus=$(nproc)
+
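+  # cbimport flags: --format list expects each json file to contain a JSON array of documents,
+  # --generate-key %id% builds each document key from the document's id field,
+  # and --threads parallelizes the load across ${number_of_cpus} workers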
+  exit_code=0
+  for json_f in ${data_dir}/*.json; do
+    fname=$(basename ${json_f})
+    ${HOME}/cbtools/bin/cbimport json --threads ${number_of_cpus} --cluster couchbase://${cb_host} --bucket ${bucket} --scope-collection-exp ${scope}.${collection} --username ${cb_user} --password ${cb_pwd} --format list --generate-key %id% --dataset file:///${json_f}
+    exit_code=$?
+    if [[ "${exit_code}" -ne "0" ]]; then
+      break
+    fi
+  done
+  if [[ "${exit_code}" -ne "0" ]]; then
+    echo "import failed for $f exit_code:${exit_code}"
+    failed_import_count=$((failed_import_count + 1))
+    base_f=$(basename $f)
+    echo "moving tar file ${f} to ${archive_dir}/failed-import-${base_f}"
+    echo mv $f "${archive_dir}/failed-import-${base_f}"
+    mv $f "${archive_dir}/failed-import-${base_f}"
+    # don't return or remove the t_dir yet - let the scraper record the error
+  else
+    success_import_count=$((success_import_count + 1))
+    echo "import succeeded for $f success_import_count: ${success_import_count}"
+    # save the tar file
+    base_f=$(basename $f)
+    echo "moving tar file ${f} to ${archive_dir}/success-${base_f}"
+    echo mv $f "${archive_dir}/success-${base_f}"
+    mv $f "${archive_dir}/success-${base_f}"
+    # don't return or remove the t_dir yet - let the scraper record the metrics
+  fi
+  stop_epoch=$(date +%s)
+  # run the scraper
+  sleep 2 # eventually consistent data - give it a little time
+  echo "RUNNING - scripts/VXingest_utilities/scrape_metrics.sh -c ${credentials_file} -b ${start_epoch} -e ${stop_epoch} -l ${log_file} -d ${metrics_dir}"
+  scripts/VXingest_utilities/scrape_metrics.sh -c ${credentials_file} -b ${start_epoch} -e ${stop_epoch} -l ${log_file} -d ${metrics_dir}
+  exit_code=$?
+  if [[ "${exit_code}" -ne "0" ]]; then
+    failed_scrape_count=$((failed_scrape_count + 1))
+  else
+    success_scrape_count=$((success_scrape_count + 1))
+  fi
+  echo "removing temp dir ${t_dir}"
+  rm -rf ${t_dir}
+  echo "--------"
+} # end import_archive
+
+# main
+start=$(date +%s)
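+# example invocation (paths are illustrative, not the deployed locations):
+#   ./scripts/VXingest_utilities/run-import.sh -c /path/to/credentials -l /path/to/load_dir -t /path/to/temp_dir -m /path/to/metrics_dir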
 while getopts 'c:l:t:m:' param; do
   case "${param}" in
   c)
@@ -40,7 +150,7 @@ while getopts 'c:l:t:m:' param; do
     cb_host=$(grep cb_host ${credentials_file} | awk '{print $2}')
     # if it is a multinode host split on ',' and take the first one
     IFS=','
-    read -ra hostarr <<< "$cb_host"
+    read -ra hostarr <<<"$cb_host"
     cb_host=${hostarr[0]}
     cb_user=$(grep cb_user ${credentials_file} | awk '{print $2}')
     cb_pwd=$(grep cb_password ${credentials_file} | awk '{print $2}')
@@ -88,14 +198,13 @@ if [[ -z ${credentials_file} ]] || [[ -z ${load_dir} ]] || [[ -z ${metrics_dir}
   usage
 fi
 
-pid=$$
 if [ "$(whoami)" != "amb-verif" ]; then
-    echo "Script must be run as user: amb-verif"
-    usage
+  echo "Script must be run as user: amb-verif"
+  usage
 fi
 
 # Check the load directory for new tar balls.
-# This script is expected to run in two minute intervals
+# This script is expected to run at intervals
 # create a temporary log_dir
 # create an archive dir (might already exist)
 # The load_dir is where the program will look for the tar files
@@ -107,192 +216,70 @@ if [[ ! -d "${archive_dir}" ]]; then
   usage
 fi
 if [ ! -w "${archive_dir}" ]; then
-  echo "archive directory ${archive_dir} IS NOT WRITABLE";
-  usage
-fi
-runtime=`date +\%Y-\%m-\%d:\%H:\%M:\%S`
-t_dir="${temp_dir}/${pid}"
-mkdir -p "${t_dir}"
-if [[ ! -d "${t_dir}" ]]; then
-  echo "ERROR: Failed to create VxIngest temp directory ${t_dir}"
+  echo "archive directory ${archive_dir} IS NOT WRITABLE"
   usage
 fi
+
 ls -1 ${load_dir}/*.gz | while read f; do
-  subdir=$(tar --exclude=*/* -tzf ${f})
-  echo "processing the tar file ${f}"
-  data_dir="${t_dir}/${subdir}"
-  echo "extracting tarball ${f} to temp_dir ${t_dir}"
-  echo "tar -xzf ${f} -C ${t_dir}"
-  # NOTE: the archives are tar'd into a subdirectory so data_dir is t_dir/sub_dir
-  tar -xzf "${f}" -C "${t_dir}"
-  if [[ $? != 0 ]]; then
-    echo "ERROR: tarball ${f} failed to extract"
-    base_f=$(basename $f)
-    echo "moving tar file ${f} to ${archive_dir}/failed-extract-${base_f}"
-    failed_import_count=$((failed_import_count+1))
-    # doing cp then rm because of an issue with docker mounts on MAC
-    echo cp $f "${archive_dir}/failed-extract-${base_f}"
-    cp $f "${archive_dir}/failed-extract-${base_f}"
-    echo rm -rf $f
-    rm -rf $f
-    echo "removing temp_dir files ${data_dir}/*"
-    rm -rf ${data_dir}
-    continue # go to the next tar file
-  fi
-  echo "finished extracting tarball ${f} to ${t_dir}"
-  log_file_count=`ls -1 ${data_dir}/*.log | wc -l`
-  if [[ ${log_file_count} -ne 1 ]]; then
-    echo "There is not just one log_file in ${data_dir} - extracted from ${f} - there are ${log_file_count}"
-    base_f=$(basename $f)
-    echo "moving tar file ${f} to ${archive_dir}/failred-too-many-log-files-${base_f}"
-    echo " - exiting"
-    failed_import_count=$((failed_import_count+1))
-    # doing cp then rm because of an issue with docker mounts on MAC
-    echo cp $f "${archive_dir}/failed-too-many-log-files-${base_f}"
-    cp $f "${archive_dir}/failed-too-many-log-files-${base_f}"
-    echo rm -rf $f
-    rm -rf $f
-    echo "removing temp_dir ${data_dir}"
-    rm -rf ${data_dir}
-    usage
-  fi
-  # ok - have one log file
-  log_file=`ls -1 ${data_dir}/*.log`
-  echo "processing log_file ${log_file}"
-  log_dir=$(dirname ${log_file})
-  mkdir -p ${log_dir}
-  log_file_name=$(basename $log_file)
-  import_log_file="${log_dir}import-${log_file_name}"
-  echo "import log file will be: ${import_log_file}"
-  # run the import job
-  metric_name=$(grep metric_name ${log_file} | awk '{print $6}') # Grab the desired column from the python log format
-  echo "metric name will be ${metric_name}"
-  import_metric_name="import_${metric_name}"
-  echo "import metric name will be ${import_metric_name}"
-  echo "metric_name ${import_metric_name}" > ${import_log_file}
-  number_of_cpus=$(nproc)
-
-  for json_f in ${data_dir}/*.json; do
-    fname=$(basename ${json_f})
-    ${HOME}/cbtools/bin/cbimport json --threads ${number_of_cpus} --cluster couchbase://${cb_host} --bucket ${bucket} --scope-collection-exp ${scope}.${collection} --username ${cb_user} --password ${cb_pwd} --format list --generate-key %id% --dataset file:///${json_f}
-  done
-  if [[ "${exit_code}" -ne "0" ]]; then
-    echo "import failed for $f exit_code:${exit_code}"
-    failed_import_count=$((failed_import_count+1))
-    echo "import failed for $f"
-    base_f=$(basename $f)
-    # doing cp then rm because of an issue with docker mounts on MAC
-    echo "moving tar file ${f} to ${archive_dir}/failed-import-${base_f}"
-    echo cp $f "${archive_dir}/failed-import-${base_f}"
-    cp $f "${archive_dir}/failed-import-${base_f}"
-    echo rm -rf $f
-    rm -rf $f
-    # don't exit - let the scraper record the error
+  # lock the archive
+  if [[ -f "${f}.lock" ]]; then
+    echo "skipping ${f} - it is being processed"
+    continue
   else
-    success_import_count=$((success_import_count+1))
-    echo "import succeeded for $f success_import_count: ${success_import_count}"
-    # save the tar file
-    base_f=$(basename $f)
-    echo "moving tar file ${f} to ${archive_dir}/success-${base_f}"
-    # doing cp then rm because of an issue with docker mounts on MAC
-    echo cp $f "${archive_dir}/success-${base_f}"
-    cp $f "${archive_dir}/success-${base_f}"
-    echo rm -rf $f
-    rm -rf $f
-    if [[ $? != 0 ]]; then
-      echo "ERROR: failed to move tar file ${f} to ${archive_dir}/success-${base_f}"
-      failed_import_count=$((failed_import_count+1))
-      # not going to exit - let the scraper record the error
-    fi
+    touch "${f}.lock"
+    import_archive $f $archive_dir $temp_dir
+    # unlock the archive (${f} should have already been archived)
+    rm "${f}.lock"
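+    # NOTE: the touch/-f lock is not atomic - two concurrent runs could both pass
+    # the -f test before either touch completes. A sketch of a stronger alternative
+    # (assuming GNU flock, see the note at the top of this script):
+    #   exec 9>"${f}.lock"
+    #   if flock -n 9; then import_archive $f $archive_dir $temp_dir; fi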
   fi
-  # run the scraper
-  sleep 2 # eventually consistent data - give it a little time
-  echo "RUNNING - scripts/VXingest_utilities/scrape_metrics.sh -c ${credentials_file} -i ${import_log_file} -l ${log_file} -d ${metrics_dir}"
-  scripts/VXingest_utilities/scrape_metrics.sh -c ${credentials_file} -i ${import_log_file} -l ${log_file} -d ${metrics_dir}
-  exit_code=$?
-  if [[ "${exit_code}" -ne "0" ]]; then
-    failed_scrape_count=$((failed_scrape_count+1))
-  else
-    success_scrape_count=$((success_scrape_count+1))
-  fi
-  # archive the log_file
-  # note that the log_file is in a subdirectory of the temp_dir
-  dirname_log_file=$(dirname $(dirname ${log_file}))
-  ls -l ${log_file}
-  if [[ ! -d ${dirname_log_file}/archive ]]; then
-    mkdir -p ${dirname_log_file}/archive
-    chmod 777 ${dirname_log_file}/archive
-  fi
-  echo mv ${log_file} ${dirname_log_file}/archive
-  mv ${log_file} ${dirname_log_file}/archive
-  ret=$?
-  if [[ ${ret} != 0 ]]; then
-    echo "ERROR: failed to move log file ${log_file} to ${dirname_log_file}/archive ret: ${ret}"
-  fi
-  # archive the import log_file
-  ls -l ${import_log_file}
-  echo mv ${import_log_file} ${dirname_log_file}/archive
-  mv ${import_log_file} ${dirname_log_file}/archive
-  ret=$?
-  if [[ ${ret} != 0 ]]; then
-    echo "ERROR: failed to move import log file ${import_log_file} to ${dirname_log_file}/archive ret: ${ret}"
-  fi
-  echo "removing temp_dir files ${data_dir}/*"
-  rm -rf ${data_dir}/*
-  echo "--------"
 done
-# remove the data dir ($t_dir)
-echo "removing data directory - ${t_dir}"
-rm -rf ${t_dir}
+# update metadata
 echo "*************************************"
 if [[ "${success_import_count}" -ne "0" ]]; then
   echo "update metadata import success count: ${success_import_count}"
-    LOCKDIR="/data/import_lock"
+  LOCKDIR="/data/import_lock"
   #if LOCKDIR is > 48 * 3600 seconds old, remove it
-  if (( $(date "+%s") - $(date -r ${LOCKDIR} "+%s") > $(( 48 * 3600 )) )); then
+  if [[ -d "${LOCKDIR}" ]] && (($(date "+%s") - $(date -r ${LOCKDIR} "+%s") > $((48 * 3600)))); then
     echo "removing old lock file"
     rm -rf ${LOCKDIR}
   fi
-  if mkdir -- "$LOCKDIR"; then
-    echo "update ceiling metadata"
-    mats_metadata_and_indexes/metadata_files/update_ctc_ceiling_mats_metadata.sh ${credentials_file}
-    ret=$?
-    if [[ "${ret}" -ne "0" ]]; then
-      echo "ceiling metadata update failed with exit code ${ret}"
-    fi
-    echo "update ceiling metadata"
-    mats_metadata_and_indexes/metadata_files/update_ctc_visibility_mats_metadata.sh ${credentials_file}
-    ret=$?
-    if [[ "${ret}" -ne "0" ]]; then
-      echo "visibility import failed with exit code ${ret}"
-    fi
-    echo "update visibility metadata"
-    mats_metadata_and_indexes/metadata_files/update_sums_surface_mats_metadata.sh ${credentials_file}
-    ret=$?
-    if [[ "${ret}" -ne "0" ]]; then
-      echo "surface import failed with exit code ${ret}"
-    fi
-    echo "update surface metadata"
-    if rmdir -- "$LOCKDIR"
-    then
-      echo "import finished"
-    else
-      echo "IMPORT ERROR: Could not remove import lock dir" >&2
-    fi
-  fi
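+  # mkdir is atomic on POSIX filesystems - it either creates the lock directory
+  # or fails because another run already holds it, so it serves as the mutex here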
+  if mkdir -- "$LOCKDIR"; then
+    echo "update ceiling metadata"
+    mats_metadata_and_indexes/metadata_files/update_ctc_ceiling_mats_metadata.sh ${credentials_file}
+    ret=$?
+    if [[ "${ret}" -ne "0" ]]; then
+      echo "ceiling metadata update failed with exit code ${ret}"
+    fi
+    echo "update visibility metadata"
+    mats_metadata_and_indexes/metadata_files/update_ctc_visibility_mats_metadata.sh ${credentials_file}
+    ret=$?
+    if [[ "${ret}" -ne "0" ]]; then
+      echo "visibility metadata update failed with exit code ${ret}"
+    fi
+    echo "update surface metadata"
+    mats_metadata_and_indexes/metadata_files/update_sums_surface_mats_metadata.sh ${credentials_file}
+    ret=$?
+    if [[ "${ret}" -ne "0" ]]; then
+      echo "surface metadata update failed with exit code ${ret}"
+    fi
+    if rmdir -- "$LOCKDIR"; then
+      echo "import finished"
+    else
+      echo "IMPORT ERROR: Could not remove import lock dir" >&2
+    fi
+  fi
 else
   echo "no new data to import - import success count: ${success_import_count}"
 fi
 echo "capture metrics"
 end=$(date +%s)
 m_file=$(mktemp)
-echo "run_import_duration $((end-start))" > ${m_file}
-echo "run_import_success_count ${success_import_count}" >> ${m_file}
-echo "run_import_failure_count ${failed_import_count}" >> ${m_file}
-echo "run_scrape_success_count ${success_scrape_count}" >> ${m_file}
-echo "run_scrape_failure_count ${failed_scrape_count}" >> ${m_file}
-cp ${m_file} "${metrics_dir}/run_import_metrics.prom"
-rm ${m_file}
+echo "run_import_duration $((end - start))" >${m_file}
+echo "run_import_success_count ${success_import_count}" >>${m_file}
+echo "run_import_failure_count ${failed_import_count}" >>${m_file}
+echo "run_scrape_success_count ${success_scrape_count}" >>${m_file}
+echo "run_scrape_failure_count ${failed_scrape_count}" >>${m_file}
+mv ${m_file} "${metrics_dir}/run_import_metrics.prom"
 echo "FINISHED"
 exit 0
diff --git a/scripts/VXingest_utilities/scrape_metrics.sh b/scripts/VXingest_utilities/scrape_metrics.sh
index 9051b84c..8d4a868a 100755
--- a/scripts/VXingest_utilities/scrape_metrics.sh
+++ b/scripts/VXingest_utilities/scrape_metrics.sh
@@ -1,22 +1,30 @@
 #!/usr/bin/env bash
 
+# NOTE: This script uses date -d which is a GNU date option that is unavailable on Mac OS X.
+# This script will not work on Mac OS X unless you install GNU date (gdate)
+# and alias the date command to gdate in your profile
+# brew install coreutils
+# alias date="gdate"
+
+# set the timezone to UTC0 so that the date command will return the correct epoch time
+export TZ="UTC0"
 
 function is_epoch_rational {
   if [ -z "$1" ]; then
     echo "is_epoch_rational: ERROR: no epoch specified"
     usage
   fi
-
-  if [ $1 -lt $(($(date +%s)-3600*24)) ]; then
-    echo "is_epoch_rational: ERROR: irrational epoch - prior to yesterday at this time"
+  current=$(date +"%s")
+  if [ $1 -lt $((${current}-3600*24)) ]; then
+    echo "is_epoch_rational: ERROR: irrational epoch - $1 is prior to yesterday at this time ($((${current}-3600*24)))"
     usage
   fi
-  if [ $1 -gt $(date +%s) ]; then
-    echo "is_epoch_rational: ERROR: irrational epoch - beyond current time"
+  if [ $1 -gt ${current} ]; then
+    echo "is_epoch_rational: ERROR: irrational epoch - $1 is beyond current time ${current}"
     usage
   fi
-return 0
+  return 0
 }
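+# e.g. is_epoch_rational "$(date +%s)" passes; is_epoch_rational 0 prints the error and calls usage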
 
 function derive_pattern_from_ids {
@@ -55,7 +63,7 @@ function get_record_count_from_log(){
 }
 
 function usage {
-  echo "Usage $0 -c credentials-file -i import_log_file -l log_file -d textfile directory"
+  echo "Usage $0 -c credentials-file -b begin_epoch -e end_epoch -l log_file -d textfile directory"
   echo "The credentials-file specifies cb_host, cb_user, and cb_password."
   echo "Metrics will be written into the textfile directory (-d)"
   echo "The scrape_metrics.sh script scans the log_file for the intended_record_count, and any errors,"
@@ -63,7 +71,7 @@ function usage {
   exit 1
 }
 
-while getopts 'c:l:i:d:' param; do
+while getopts 'c:l:b:e:d:' param; do
   case "${param}" in
   c)
     credentials_file=${OPTARG}
@@ -90,12 +98,14 @@ while getopts 'c:l:i:d:' param; do
       usage
     fi
     ;;
-  i)
-    import_log_file=${OPTARG}
-    if [[ ! -f "${import_log_file}" ]]; then
-      echo "ERROR: import log file ${import_log_file} does not exist"
-      usage
-    fi
+  b)
+    start_import_epoch=${OPTARG}
+    ;;
+  e)
+    finish_import_epoch=${OPTARG}
+    # the cas meta field in couchbase is going to reflect the time that a document was imported, not when it was created.
+    # so add 60 seconds for latency
+    finish_import_epoch=$((finish_import_epoch + 60))
     ;;
   d)
     textfile_dir=${OPTARG}
@@ -139,12 +149,6 @@
 expected_duration_seconds=0
 actual_duration_seconds=$((finish_epoch - start_epoch))
 error_count=${error_count}
 
-# the cas meta field in couchbase is going to reflect the time that a document was imported, not when it was created.
-# We need to get the start and stop epochs from the corresponding import log
-start_import_epoch=$(grep Start ${import_log_file} | awk '{print $2}')
-finish_import_epoch=$(grep Stop ${import_log_file} | awk '{print $2}')
-# add 60 seconds for latency?
-finish_import_epoch=$((finish_import_epoch + 60))
 intended_record_count=$(get_record_count_from_log "${log_file}")
 # NOTE: curl URL's don't like '%' or ';' characters. replace them with '%25' and '%3B' respectively (you can leave the ';' at the end of the statement off, actually)
 echo "start_import_epoch is ${start_import_epoch}"