Skip to content

Commit

Permalink
FairRootGroupGH-429: Cancel SLURM jobs on DDS shutdown.
Browse files Browse the repository at this point in the history
- Automatically cancel pending and running slurm jobs on dds session shutdown. (FairRootGroupGH-429)
  • Loading branch information
AnarManafov committed Jun 25, 2022
1 parent f1037ef commit 863c8ea
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 5 deletions.
10 changes: 8 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ set(DDS_PROTOCOL_VERSION "2")
#
# Check if cmake has the required version
#
cmake_minimum_required( VERSION 3.20 FATAL_ERROR )
cmake_minimum_required( VERSION 3.23.1 FATAL_ERROR )

string(TOLOWER ${PROJECT_NAME} PROJECT_NAME_LOWER)

Expand Down Expand Up @@ -233,11 +233,17 @@ if(BUILD_TESTS)
set(Boost_Components ${Boost_Components} unit_test_framework)
endif(BUILD_TESTS)

find_package(Boost 1.72 REQUIRED COMPONENTS ${Boost_Components})
find_package(Boost 1.75 REQUIRED COMPONENTS ${Boost_Components})
if(Boost_FOUND)
set(local_boost_version "${Boost_MAJOR_VERSION}.${Boost_MINOR_VERSION}.${Boost_SUBMINOR_VERSION}")
endif(Boost_FOUND)

#
# Search for protobuf
#
find_package(Protobuf 3.15 REQUIRED)
add_subdirectory(proto)

# DDS Misc Common
message(STATUS "Build dds_misc_lib - YES")
add_subdirectory ( dds-misc-lib )
Expand Down
3 changes: 2 additions & 1 deletion ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ Fixed: On task done remove agents from the agent to tasks mapping.
Fixed: Replace std::iterator as it's deprecated (C++17).
Fixed: Tasks working directory is set to their slot directory instead of $DDS_LOCATION.
Fixed: Multiple stability issues.
Modified: Bump minimum version requirements for cmake (from 3.11.0 to 3.20) and boost (from 1.67 to 1.72). (GH-428)
Added: every DDS module logs now its pid, group id and parent pid. (GH-403)
Added: Support for Task Assets. (GH-406)
Modified: Bump minimum version requirements for cmake (from 3.11.0 to 3.20) and boost (from 1.67 to 1.72). (GH-428)
Added: Cancel running and panding SLURM jobs on DDS shutdown. (GH-429)

### dds-agent
Fixed: Address potential crash in the external process termination routines.
Expand Down
2 changes: 2 additions & 0 deletions dds-commander/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ add_executable(${PROJECT_NAME} ${SOURCE_FILES} ${HEADER_FILES})

target_link_libraries(${PROJECT_NAME}
dds_misc_lib
dds_proto_lib
dds_user_defaults_lib
dds_topology_lib
dds_protocol_lib
Expand All @@ -43,6 +44,7 @@ target_link_libraries(${PROJECT_NAME}
target_include_directories(${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}>
${Protobuf_INCLUDE_DIRS}
)

install(TARGETS ${PROJECT_NAME}
Expand Down
66 changes: 66 additions & 0 deletions dds-commander/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// BOOST
#include <boost/property_tree/ini_parser.hpp>
#include <boost/property_tree/ptree.hpp>
// protobuf
#include "submit_info.pb.h"
#include "submit_info_slurm.pb.h"

using namespace std;
using namespace dds::misc;
Expand All @@ -20,6 +23,11 @@ using namespace dds::user_defaults_api;
using boost::asio::ip::tcp;
namespace fs = boost::filesystem;

// TODO: Move this to DDS commander once ToolsAPI supports protobuf.
// Ideally the commander should create and read this file. Plug-ins will only receive info data blocks via DDS
// transport.
const LPCSTR g_submitInfoFile = "submit.inf";

//=============================================================================
int createDirectories(const boost::uuids::uuid& _sid)
{
Expand Down Expand Up @@ -144,6 +152,64 @@ int main(int argc, char* argv[])
}
}

// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
// TODO: A temporary solution to cancel slurm jobs.
// ToolsAPI and DDS Plug-in API is being upgraded to use protobuf. In meantime, we cancel slurm jobs diurectloy
// from commander.

// - Loop over all submissions of this session
// - Collect Job IDs from submission metadata
// - Call scancel for the list of jobs
vector<string> jobs;
fs::path pathWorkDirLocalFiles(smart_path(CUserDefaults::instance().getValueForKey("server.work_dir")));
for (auto& f : fs::recursive_directory_iterator(pathWorkDirLocalFiles))
{
if (f.path().filename() == g_submitInfoFile)
{
dds::protocol::SubmitInfo protoSubmitInfo;
fstream input(f.path().native(), ios::in | ios::binary);
if (!protoSubmitInfo.ParseFromIstream(&input))
{
LOG(log_stderr) << "SLURM JOB CANCEL: Failed to parse job metadata." << f.path().native();
continue;
}

if (!protoSubmitInfo.mutable_rms_plugin_data()->Is<dds::protocol::SlurmSubmitInfo>())
{
LOG(log_stderr) << "SLURM JOB CANCEL: Submission metadata doesn't contain slurm job info "
<< f.path().native();
return 1;
}
dds::protocol::SlurmSubmitInfo protoSlurmSubmitInfo;
protoSubmitInfo.mutable_rms_plugin_data()->UnpackTo(&protoSlurmSubmitInfo);

if (protoSlurmSubmitInfo.slurm_job_id_size() == 0)
{
LOG(log_stderr) << "SLURM JOB CANCEL: No slurm jobs found in submission metadata"
<< f.path().native();
return 1;
}
jobs.push_back(protoSlurmSubmitInfo.slurm_job_id(0));
}
}
const fs::path scancelPath{ bp::search_path("scancel") };

stringstream ssCmd;
ssCmd << scancelPath.string();
for (const auto& id : jobs)
{
ssCmd << " " << id;
}

LOG(log_stdout) << "SLURM JOB CANCEL: " << ssCmd.str();
string sout;
string serr;
execute(ssCmd.str(), chrono::seconds(30), &sout, &serr);
if (!serr.empty())
LOG(log_stderr) << "SLURM JOB CANCEL: " << serr;

// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

return EXIT_SUCCESS;
}

Expand Down
2 changes: 1 addition & 1 deletion dds-topology-lib/src/TopoCreatorCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void CTopoCreatorCore::addDeclElements(CTopoTask::Ptr_t _task, objectMap_t& _dec
_declElements[CTopoBase::EType::TOPO_PROPERTY][property.first] =
static_pointer_cast<CTopoBase>(property.second);
}

const auto& assets = _task->getAssets();
for (const auto& asset : assets)
{
Expand Down
2 changes: 2 additions & 0 deletions plugins/dds-submit-slurm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_executable(${PROJECT_NAME} src/main.cpp)

target_link_libraries(${PROJECT_NAME}
PUBLIC
dds_proto_lib
dds_misc_lib
dds_user_defaults_lib
dds_tools_lib
Expand All @@ -22,6 +23,7 @@ target_link_libraries(${PROJECT_NAME}
target_include_directories(${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${PROJECT_BINARY_DIR/src}>
${Protobuf_INCLUDE_DIRS}
)

install(TARGETS ${PROJECT_NAME} DESTINATION "${PROJECT_INSTALL_PLUGINSDIR}/${PROJECT_NAME}")
Expand Down
2 changes: 1 addition & 1 deletion plugins/dds-submit-slurm/src/dds-submit-slurm-worker
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ logMsg()
logMsg "Submitting DDS Job on the SLURM cluster..."
# Set execute access for job.slurm
chmod +x $RMS_SANDBOX/job.slurm
JOB_ID=$(sbatch $RMS_SANDBOX/job.slurm)
JOB_ID=$(sbatch --parsable $RMS_SANDBOX/job.slurm)
if (( $? != 0 )) ; then
logMsg "Error: Failed to submit SLURM job ($?)"
exit $?
Expand Down
43 changes: 43 additions & 0 deletions plugins/dds-submit-slurm/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
#include "SysHelper.h"
#include "ToolsProtocol.h"
#include "UserDefaults.h"
// protobuf
#include "submit_info.pb.h"
#include "submit_info_slurm.pb.h"
#include <google/protobuf/util/time_util.h>
using google::protobuf::util::TimeUtil;

using namespace std;
using namespace dds;
Expand All @@ -38,6 +43,10 @@ namespace bp = boost::process;
// file is located in the DDS server working dir
const LPCSTR g_pipeName = ".dds_slurm_pipe";
const LPCSTR g_jobIDFile = ".dds_slurm_jobid";
// TODO: Move this to DDS commander once ToolsAPI supports protobuf.
// Ideally the commander should create and read this file. Plug-ins will only receive info data blocks via DDS
// transport.
const LPCSTR g_submitInfoFile = "submit.inf";
//=============================================================================

// Command line parser
Expand Down Expand Up @@ -73,6 +82,10 @@ bool parseCmdLine(int _argc, char* _argv[], bpo::variables_map* _vm)
//=============================================================================
int main(int argc, char* argv[])
{
// Verify that the version of the library that we linked against is
// compatible with the version of the headers we compiled against.
GOOGLE_PROTOBUF_VERIFY_VERSION;

bpo::variables_map vm;
if (defaultExecSetup<bpo::variables_map>(argc, argv, &vm, &parseCmdLine) == EXIT_FAILURE)
return EXIT_FAILURE;
Expand Down Expand Up @@ -238,6 +251,36 @@ int main(int argc, char* argv[])
{
if (fs::exists(pathJobIDFile))
{
string rmsJobID;
ifstream f;
f.open(pathJobIDFile.native());
if (f.is_open())
{
f >> rmsJobID;
}
// Create submit info block
dds::protocol::SlurmSubmitInfo protoSlurmSubmitInfo;
protoSlurmSubmitInfo.add_slurm_job_id(rmsJobID);

dds::protocol::SubmitInfo protoSubmitInfo;
protoSubmitInfo.set_session_id(CUserDefaults::instance().getCurrentSID());
protoSubmitInfo.set_submission_id(submissionId);
protoSubmitInfo.set_dds_sandbox_dir(sSandboxDir);
*protoSubmitInfo.mutable_submission_timestamp() =
TimeUtil::SecondsToTimestamp(time(NULL));
protoSubmitInfo.set_rms_plugin("dds-submit-slurm");
protoSubmitInfo.mutable_rms_plugin_data()->PackFrom(protoSlurmSubmitInfo);

// Write submit info
fs::path pathSubmitInfoFile(pathWorkDirLocalFiles);
pathSubmitInfoFile /= g_submitInfoFile;
fstream output(pathSubmitInfoFile.native(), ios::out | ios::trunc | ios::binary);
if (!protoSubmitInfo.SerializeToOstream(&output))
{
proto.sendMessage(EMsgSeverity::error, "Failed to save submission metadata");
}
// < < < < < < < < < < < < <

started = true;
proto.sendMessage(EMsgSeverity::info, "DDS agents have been submitted");
break;
Expand Down
31 changes: 31 additions & 0 deletions proto/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2022 GSI, Inc. All rights reserved.
#
#
project(dds_proto_lib)

#
# All protobuf IDL files
#

file(GLOB PROTOBUF_FILELIST "${CMAKE_CURRENT_SOURCE_DIR}/*.proto")
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTOBUF_FILELIST})

SET_SOURCE_FILES_PROPERTIES(${PROTO_SRC} ${PROTO_INCL} PROPERTIES GENERATED TRUE)

add_library(dds_proto_lib ${PROTO_SRCS} ${PROTO_INCL})

target_link_libraries(${PROJECT_NAME}
PUBLIC
${Protobuf_LIBRARIES}
)

target_include_directories(${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}>
${Protobuf_INCLUDE_DIRS}
)

install(TARGETS ${PROJECT_NAME}
EXPORT DDSTargets
LIBRARY DESTINATION "${PROJECT_INSTALL_LIBDIR}"
)
22 changes: 22 additions & 0 deletions proto/submit_info.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 GSI, Inc. All rights reserved.
//
//
//
syntax = "proto3";

package dds.protocol;

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

message SubmitInfo {
string session_id = 1;
string submission_id = 2;
string dds_sandbox_dir = 3;
// Creation date
google.protobuf.Timestamp submission_timestamp = 4;
// RMS plug-in
string rms_plugin = 5;
// actual RMS plug-in data block
google.protobuf.Any rms_plugin_data = 6;
}
12 changes: 12 additions & 0 deletions proto/submit_info_slurm.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2022 GSI, Inc. All rights reserved.
//
//
//
syntax = "proto3";

package dds.protocol;

message SlurmSubmitInfo {
// RMS job ID
repeated string slurm_job_id = 1;
}

0 comments on commit 863c8ea

Please sign in to comment.