Merge branch 'main' into feature_joss_submission
freifrauvonbleifrei committed Nov 16, 2023
2 parents 41bd491 + 6c50c52 commit d18afbe
Showing 28 changed files with 1,775 additions and 502 deletions.
6 changes: 5 additions & 1 deletion .jenkins/Jenkinsfile
@@ -82,6 +82,7 @@ pipeline {
spack compiler remove -a [email protected]: || true # ignore if this compiler was not found
spack spec -y discotec@main -lto %${compiler} ^${mpiimpl} #+selalib # --only dependencies # actually build discotec, such that load command will work
spack install -y discotec@main -lto %${compiler} ^${mpiimpl} #+selalib # --only dependencies # actually build discotec, such that load command will work
+ spack install -y lz4 %${compiler}
'''
}
}
@@ -148,9 +149,11 @@ pipeline {
sh '''
. ../discotec-spack/spack/share/spack/setup-env.sh
spack load --only dependencies --first discotec@main -lto %${compiler} ^${mpiimpl} #+selalib
+ spack load --first cmake %${compiler}
+ spack load --first lz4 %${compiler}
mkdir -p build/${compiler}-${mpiimpl}-${build_type}
cd build/${compiler}-${mpiimpl}-${build_type}
- cmake -DCMAKE_BUILD_TYPE=${build_type} -DDISCOTEC_TEST=1 -DDISCOTEC_TIMING=1 -DDISCOTEC_ENABLEFT=0 -DDISCOTEC_GENE=0 -DDISCOTEC_USE_HIGHFIVE=1 -DDISCOTEC_OPENMP=1 -DDISCOTEC_USE_LTO=0 ../..
+ cmake -DCMAKE_BUILD_TYPE=${build_type} -DDISCOTEC_TEST=1 -DDISCOTEC_TIMING=1 -DDISCOTEC_ENABLEFT=0 -DDISCOTEC_GENE=0 -DDISCOTEC_USE_HIGHFIVE=1 -DDISCOTEC_OPENMP=1 -DDISCOTEC_USE_LTO=0 -DDISCOTEC_WITH_COMPRESSION=1 ../..
make -j8
'''
}
@@ -169,6 +172,7 @@
mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=mpisystem
mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=fullgrid
mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=hierarchization --log_level=message
+ mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=io
mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=loadmodel
mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=reduce
mpiexec.${mpiimpl} -n 9 ./test_distributedcombigrid_boost --run_test=rescheduling
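The new lz4 package and the -DDISCOTEC_WITH_COMPRESSION=1 flag in this pipeline suggest that CI now builds and exercises an optional LZ4-backed compression path for sparse-grid data (an assumption based on the added dependency, not spelled out in this diff). For orientation only, a round trip through the upstream LZ4 C API, independent of whatever wrapper DisCoTeC uses internally, looks like this:

// Illustrative only: a plain LZ4 round trip with the upstream C API (lz4.h),
// not DisCoTeC's own compression code.
#include <lz4.h>

#include <cassert>
#include <string>
#include <vector>

int main() {
  const std::string input(1 << 20, 'x');  // 1 MiB of highly compressible data
  const int srcSize = static_cast<int>(input.size());

  std::vector<char> compressed(LZ4_compressBound(srcSize));  // worst-case output size
  const int compressedSize = LZ4_compress_default(
      input.data(), compressed.data(), srcSize, static_cast<int>(compressed.size()));
  assert(compressedSize > 0);

  std::vector<char> restored(srcSize);
  const int restoredSize =
      LZ4_decompress_safe(compressed.data(), restored.data(), compressedSize, srcSize);
  assert(restoredSize == srcSize);
  assert(std::string(restored.begin(), restored.end()) == input);
  return 0;
}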
2 changes: 1 addition & 1 deletion examples/combi_example/CMakeLists.txt
@@ -16,7 +16,7 @@ find_package(Boost REQUIRED)

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
add_executable(combi_example combi_example.cpp)
- target_compile_features(combi_example PRIVATE cxx_std_17)
+ target_compile_features(combi_example PUBLIC cxx_std_17)
target_link_libraries(combi_example discotec Boost::boost)

install(TARGETS combi_example DESTINATION examples/combi_example)
2 changes: 1 addition & 1 deletion examples/distributed_advection/CMakeLists.txt
@@ -17,7 +17,7 @@ find_package(Boost REQUIRED)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

add_executable(distributed_combi_example combi_example.cpp)
- target_compile_features(distributed_combi_example PRIVATE cxx_std_17)
+ target_compile_features(distributed_combi_example PUBLIC cxx_std_17)
target_link_libraries(distributed_combi_example discotec Boost::boost)

install(TARGETS distributed_combi_example DESTINATION examples/distributed_advection_example)
28 changes: 17 additions & 11 deletions examples/distributed_third_level/combi_example_worker_only.cpp
@@ -81,7 +81,7 @@ int main(int argc, char** argv) {
if (hasThirdLevel) {
systemNumber = cfg.get<unsigned int>("thirdLevel.systemNumber");
numSystems = cfg.get<unsigned int>("thirdLevel.numSystems");
- assert(numSystems == 2);
+ assert(numSystems > 1);
assert(systemNumber < numSystems);
extraSparseGrid = cfg.get<bool>("thirdLevel.extraSparseGrid", true);
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << "running in file-based third level mode"
@@ -193,7 +193,7 @@ int main(int argc, char** argv) {
ctschemeFile.substr(
0, ctschemeFile.length() - std::string("_part0_00008groups.json").length()) +
"conjoint.sizes";
- worker.reduceExtraSubspaceSizes(conjointSubspaceFileName, true);
+ worker.reduceExtraSubspaceSizes({conjointSubspaceFileName}, true);
}

OUTPUT_GROUP_EXCLUSIVE_SECTION {
@@ -269,26 +269,32 @@ int main(int argc, char** argv) {
<< " took: " << durationCombineWrite << " seconds" << std::endl;
}
auto startCombineRead = std::chrono::high_resolution_clock::now();
- std::string readSparseGridFile;
+ std::vector<std::string> readSparseGridFiles;
+ std::vector<std::string> readSparseGridFileTokens;
if (hasThirdLevel) {
- readSparseGridFile =
- "dsg_" + std::to_string((systemNumber + 1) % 2) + "_step" + std::to_string(i);
- std::string readSparseGridFileToken = readSparseGridFile + "_token.txt";
- worker.combineReadDistributeSystemWide(readSparseGridFile, readSparseGridFileToken, false,
+ for (unsigned int otherSystemNumber = 0; otherSystemNumber < numSystems;
+ ++otherSystemNumber) {
+ if (otherSystemNumber != systemNumber) {
+ readSparseGridFiles.emplace_back("dsg_" + std::to_string(otherSystemNumber) + "_step" +
+ std::to_string(i));
+ readSparseGridFileTokens.emplace_back(readSparseGridFiles.back() + "_token.txt");
+ }
+ }
+ worker.combineReadDistributeSystemWide(readSparseGridFiles, readSparseGridFileTokens, false,
false);

} else {
- readSparseGridFile = writeSparseGridFile;
- worker.combineReadDistributeSystemWide(readSparseGridFile, writeSparseGridFileToken, true,
- false);
+ readSparseGridFiles.emplace_back(writeSparseGridFile);
+ worker.combineReadDistributeSystemWide(readSparseGridFiles, {writeSparseGridFileToken},
+ true, false);
}
MIDDLE_PROCESS_EXCLUSIVE_SECTION {
auto endCombineRead = std::chrono::high_resolution_clock::now();
auto durationCombineRead =
std::chrono::duration_cast<std::chrono::seconds>(endCombineRead - startCombineRead)
.count();
std::cout << getTimeStamp() << "combination-wait/read/reduce " << i
<< " took: " << durationCombineRead << " seconds ; read " << readSparseGridFile
<< " took: " << durationCombineRead << " seconds ; read " << readSparseGridFiles
<< std::endl;
}
Stats::stopEvent("combine");
277 changes: 277 additions & 0 deletions examples/distributed_third_level/tools/worker_only_io_test.cpp
@@ -0,0 +1,277 @@
// to resolve https://github.com/open-mpi/ompi/issues/5157
#define OMPI_SKIP_MPICXX 1
#include <mpi.h>

#include <boost/asio.hpp>
#include <boost/serialization/export.hpp>
#include <filesystem>
#include <string>
#include <vector>

#include "combischeme/CombiMinMaxScheme.hpp"
#include "combischeme/CombiThirdLevelScheme.hpp"
#include "fault_tolerance/FaultCriterion.hpp"
#include "fault_tolerance/StaticFaults.hpp"
#include "fault_tolerance/WeibullFaults.hpp"
#include "io/BroadcastParameters.hpp"
#include "io/H5InputOutput.hpp"
#include "loadmodel/LinearLoadModel.hpp"
#include "manager/CombiParameters.hpp"
#include "manager/ProcessGroupWorker.hpp"
#include "task/Task.hpp"
#include "utils/MonteCarlo.hpp"
#include "utils/Types.hpp"

// to allow using test tasks
#define BOOST_CHECK

#include "TaskAdvection.hpp"

using namespace combigrid;

// this is necessary for correct function of task serialization
#include "utils/BoostExports.hpp"
BOOST_CLASS_EXPORT(TaskAdvection)

int main(int argc, char** argv) {
[[maybe_unused]] auto mpiOnOff = MpiOnOff(&argc, &argv);
/* when using timers (TIMING is defined in Stats), the Stats class must be
* initialized at the beginning of the program. (and finalized in the end)
*/
Stats::initialize();
auto startInit = std::chrono::high_resolution_clock::now();

// only one rank reads inputs and broadcasts to others
std::string paramfile = "ctparam";
if (argc > 1) paramfile = argv[1];
boost::property_tree::ptree cfg =
broadcastParameters::getParametersFromRankZero(paramfile, MPI_COMM_WORLD);

// number of process groups and number of processes per group
size_t ngroup = cfg.get<size_t>("manager.ngroup");
size_t nprocs = cfg.get<size_t>("manager.nprocs");

theMPISystem()->initWorldReusable(MPI_COMM_WORLD, ngroup, nprocs, false, true);
{
/* read other parameters from ctparam */
DimType dim = cfg.get<DimType>("ct.dim");
LevelVector lmin(dim), lmax(dim);
std::vector<int> p(dim);
combigrid::real dt;
size_t nsteps, ncombi;
cfg.get<std::string>("ct.lmin") >> lmin;
cfg.get<std::string>("ct.lmax") >> lmax;
cfg.get<std::string>("ct.p") >> p;
ncombi = cfg.get<size_t>("ct.ncombi");
uint32_t chunkSizeInMebibyte = cfg.get<uint32_t>("ct.chunkSize", 64);
std::string ctschemeFile = cfg.get<std::string>("ct.ctscheme", "");
dt = cfg.get<combigrid::real>("application.dt");
nsteps = cfg.get<size_t>("application.nsteps");
bool evalMCError = cfg.get<bool>("application.mcerror", false);
uint16_t numberOfFileParts = cfg.get<uint16_t>("io.numberParts", 1);
theMPISystem()->initOuputGroupComm(numberOfFileParts);

// read in third level parameters if available
std::string thirdLevelHost, thirdLevelSSHCommand = "";
unsigned int systemNumber = 0, numSystems = 1;
unsigned short thirdLevelPort = 0;
bool hasThirdLevel = static_cast<bool>(cfg.get_child_optional("thirdLevel"));
bool extraSparseGrid = true;
if (hasThirdLevel) {
systemNumber = cfg.get<unsigned int>("thirdLevel.systemNumber");
numSystems = cfg.get<unsigned int>("thirdLevel.numSystems");
assert(numSystems == 2);
assert(systemNumber < numSystems);
extraSparseGrid = cfg.get<bool>("thirdLevel.extraSparseGrid", true);
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << "running in file-based third level mode"
<< std::endl;
} else {
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << "running in file-based local mode"
<< std::endl;
}

// periodic boundary conditions
std::vector<BoundaryType> boundary(dim, 1);
auto forwardDecomposition = false;

// check whether parallelization vector p agrees with nprocs
int checkProcs = 1;
for (auto k : p) checkProcs *= k;
if (checkProcs != IndexType(nprocs)) {
throw std::invalid_argument("process group size and parallelization do not match");
}

std::vector<LevelVector> levels;
std::vector<combigrid::real> coeffs;
std::vector<size_t> taskNumbers; // only used in case of static task assignment
bool useStaticTaskAssignment = false;
if (ctschemeFile == "") {
throw std::runtime_error("No CT scheme file specified");
} else {
// read in CT scheme, if applicable
std::unique_ptr<CombiMinMaxSchemeFromFile> scheme(
new CombiMinMaxSchemeFromFile(dim, lmin, lmax, ctschemeFile));
const auto& pgroupNumber = theMPISystem()->getProcessGroupNumber();
size_t totalNumTasks =
combigrid::getAssignedLevels(*scheme, pgroupNumber, levels, coeffs, taskNumbers);
useStaticTaskAssignment = true;
MASTER_EXCLUSIVE_SECTION {
std::cout << getTimeStamp() << " Process group " << pgroupNumber << " will run "
<< levels.size() << " of " << totalNumTasks << " tasks." << std::endl;
printCombiDegreesOfFreedom(levels, boundary);
}
}

if (!useStaticTaskAssignment) {
throw std::runtime_error("Dynamic task assignment not to be used here");
}
// create load model
std::unique_ptr<LoadModel> loadmodel = std::unique_ptr<LoadModel>(new LinearLoadModel());

// create combiparameters
auto reduceCombinationDimsLmax = LevelVector(dim, 1);
// lie about ncombi, because default is to not use reduced dims for last combi step,
// which we don't want here because it makes the sparse grid too large
CombiParameters params(dim, lmin, lmax, boundary, ncombi * 2, 1,
CombinationVariant::chunkedOutgroupSparseGridReduce, p,
LevelVector(dim, 0), reduceCombinationDimsLmax, chunkSizeInMebibyte,
forwardDecomposition, thirdLevelHost, thirdLevelPort, 0);
setCombiParametersHierarchicalBasesUniform(params, "hat_periodic");
IndexVector minNumPoints(dim), maxNumPoints(dim);
for (DimType d = 0; d < dim; ++d) {
minNumPoints[d] = combigrid::getNumDofNodal(lmin[d], boundary[d]);
maxNumPoints[d] = combigrid::getNumDofNodal(lmax[d], boundary[d]);
}
// first, test if decomposition possible for small resolution
auto decomposition = combigrid::getDefaultDecomposition(minNumPoints, p, forwardDecomposition);
// then assign the actual used one
decomposition = combigrid::getDefaultDecomposition(maxNumPoints, p, forwardDecomposition);
// default decomposition works only for powers of 2!
params.setDecomposition(decomposition);
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << getTimeStamp() << "generated parameters"
<< std::endl;

MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << getTimeStamp() << "read interpolation coordinates"
<< std::endl;

ProcessGroupWorker worker;
worker.setCombiParameters(std::move(params));

worker.initializeAllTasks<TaskAdvection>(levels, coeffs, taskNumbers, loadmodel.get(), dt,
nsteps, p);
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << getTimeStamp() << "worker: initialized tasks "
<< std::endl;

worker.initCombinedDSGVector();
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << getTimeStamp() << "worker: initialized SG"
<< std::endl;

// read (extra) sparse grid sizes, as generated with subspace_writer
// for target scenarios, consider `wget
// https://darus.uni-stuttgart.de/api/access/datafile/195543` or similar
if (extraSparseGrid) {
std::string conjointSubspaceFileName = // cf. subspace_writer.cpp
ctschemeFile.substr(
0, ctschemeFile.length() - std::string("_part0_00008groups.json").length()) +
"conjoint.sizes";
worker.reduceExtraSubspaceSizes(conjointSubspaceFileName, true);
}

OUTPUT_GROUP_EXCLUSIVE_SECTION {
std::cout << getTimeStamp() << "group " << theMPISystem()->getProcessGroupNumber()
<< " sparse grid, will allocate "
<< static_cast<real>(worker.getCombinedDSGVector()[0]->getAccumulatedDataSize() *
sizeof(CombiDataType)) /
1e6
<< " (but only "
<< static_cast<real>(combigrid::CombiCom::getGlobalReduceChunkSize<CombiDataType>(
chunkSizeInMebibyte) *
sizeof(CombiDataType)) /
1e6
<< " MB at once)"
<< " plus "
<< static_cast<real>(worker.getExtraDSGVector()[0]->getAccumulatedDataSize() *
sizeof(CombiDataType)) /
1e6
<< " MB" << std::endl;
}
MPI_Barrier(theMPISystem()->getWorldComm());
// allocate sparse grids now
worker.zeroDsgsData();
MIDDLE_PROCESS_EXCLUSIVE_SECTION {
auto endInit = std::chrono::high_resolution_clock::now();
auto durationInit =
std::chrono::duration_cast<std::chrono::seconds>(endInit - startInit).count();
std::cout << getTimeStamp() << "initialization took: " << durationInit << " seconds"
<< std::endl;
}
MIDDLE_PROCESS_EXCLUSIVE_SECTION std::cout << getTimeStamp() << "start simulation loop"
<< std::endl;
for (size_t i = 0; i < ncombi; ++i) {
MPI_Barrier(theMPISystem()->getWorldComm());
Stats::startEvent("combine");
auto startCombineWrite = std::chrono::high_resolution_clock::now();
std::string writeSparseGridFile =
"dsg_" + std::to_string(systemNumber) + "_step" + std::to_string(i);
std::string writeSparseGridFileToken = writeSparseGridFile + "_token.txt";

OUTPUT_GROUP_EXCLUSIVE_SECTION {
worker.combineThirdLevelFileBasedWrite(writeSparseGridFile, writeSparseGridFileToken);
}
// everyone writes partial stats
Stats::writePartial("stats_worker_" + std::to_string(systemNumber) + "_group" +
std::to_string(theMPISystem()->getProcessGroupNumber()) + ".json",
theMPISystem()->getLocalComm());
MPI_Barrier(theMPISystem()->getWorldComm());
MIDDLE_PROCESS_EXCLUSIVE_SECTION {
auto endCombineWrite = std::chrono::high_resolution_clock::now();
auto durationCombineWrite =
std::chrono::duration_cast<std::chrono::seconds>(endCombineWrite - startCombineWrite)
.count();
std::cout << getTimeStamp() << "combination-local/write " << i
<< " took: " << durationCombineWrite << " seconds" << std::endl;
}
MPI_Barrier(theMPISystem()->getWorldComm());

auto startCombineRead = std::chrono::high_resolution_clock::now();
std::string readSparseGridFile;
if (hasThirdLevel) {
readSparseGridFile =
"dsg_" + std::to_string((systemNumber + 1) % 2) + "_step" + std::to_string(i);
std::string readSparseGridFileToken = readSparseGridFile + "_token.txt";
OUTPUT_GROUP_EXCLUSIVE_SECTION {
}

} else {
readSparseGridFile = writeSparseGridFile;
OUTPUT_GROUP_EXCLUSIVE_SECTION {
worker.waitForTokenFile(writeSparseGridFileToken);
Stats::startEvent("read SG");
int numRead = worker.readReduce(readSparseGridFile, true);
Stats::stopEvent("read SG");
}
}
MPI_Barrier(theMPISystem()->getWorldComm());
MIDDLE_PROCESS_EXCLUSIVE_SECTION {
auto endCombineRead = std::chrono::high_resolution_clock::now();
auto durationCombineRead =
std::chrono::duration_cast<std::chrono::seconds>(endCombineRead - startCombineRead)
.count();
std::cout << getTimeStamp() << "combination-wait/read/reduce " << i
<< " took: " << durationCombineRead << " seconds ; read " << readSparseGridFile
<< std::endl;
}
Stats::stopEvent("combine");
}
worker.exit();

Stats::finalize();

/* write stats to json file for postprocessing */
Stats::write("timers_system" + std::to_string(systemNumber) + "_group" +
std::to_string(theMPISystem()->getProcessGroupNumber()) + ".json",
theMPISystem()->getLocalComm());
}

return 0;
}
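The test above coordinates the file-based exchange through token files: combineThirdLevelFileBasedWrite is given both the sparse-grid file name and a token-file name, and the reader calls waitForTokenFile before readReduce, which presumably means the token is only created once the data file is complete. A minimal, generic sketch of that handshake pattern in standard C++ (an illustration of the idea, not DisCoTeC's implementation) could look like this:

// Generic token-file handshake sketch (assumed pattern, standard library only):
// the writer publishes the data file first and creates the token file last, so a
// reader that waits for the token never observes a partially written data file.
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iterator>
#include <string>
#include <thread>

void writeWithToken(const std::string& dataFile, const std::string& tokenFile,
                    const std::string& payload) {
  std::ofstream(dataFile, std::ios::binary) << payload;  // 1) write the data
  std::ofstream(tokenFile) << "ready";                   // 2) then publish the token
}

std::string waitAndRead(const std::string& dataFile, const std::string& tokenFile) {
  while (!std::filesystem::exists(tokenFile)) {          // 3) poll until the token exists
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  std::ifstream in(dataFile, std::ios::binary);          // 4) now the data is complete
  return std::string(std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>());
}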