Implement Large-range MPI for scatter, more boundary conditions

davschneller committed Apr 25, 2024
1 parent a41c373 commit 5e96dda
Showing 6 changed files with 134 additions and 64 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -25,7 +25,7 @@ set(CMAKE_CXX_EXTENSIONS OFF)
set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS
"Debug" "Release" "RelWithDebInfo") # MinSizeRel is useless for us
if(NOT CMAKE_BUILD_TYPE)
-  set(CMAKE_BUILD_TYPE Release)
+  set(CMAKE_BUILD_TYPE Release CACHE STRING "" FORCE)
message(STATUS "Set build type to Release as none was supplied.")
endif()
set(CMAKE_CXX_FLAGS_RELEASE "-O2")
80 changes: 59 additions & 21 deletions src/aux/MPIConvenience.cpp
@@ -3,6 +3,41 @@
#include <mpi.h>
#include <vector>

+namespace {
+constexpr std::size_t DataIncrement = std::numeric_limits<int>::max();
+constexpr int Tag = 1000;
+
+std::vector<MPI_Request> largeIsend(const void* sendbuf, std::size_t sendsize, std::size_t senddisp,
+                                    MPI_Datatype sendtype, int dest, int tag, MPI_Comm comm) {
+  int sendtypesizePre;
+  std::vector<MPI_Request> requests;
+  MPI_Type_size(sendtype, &sendtypesizePre);
+  std::size_t sendtypesize = sendtypesizePre;
+  // sendsize, senddisp, and position count elements; scale by the type size
+  // when forming the byte offset, and cap each chunk at INT_MAX elements.
+  for (std::size_t position = 0; position < sendsize; position += DataIncrement) {
+    int localSize = static_cast<int>(std::min(sendsize - position, DataIncrement));
+    requests.push_back(MPI_REQUEST_NULL);
+    MPI_Isend(reinterpret_cast<const char*>(sendbuf) + (senddisp + position) * sendtypesize,
+              localSize, sendtype, dest, tag, comm, &requests.back());
+  }
+  return requests;
+}
+
+std::vector<MPI_Request> largeIrecv(void* recvbuf, std::size_t recvsize, std::size_t recvdisp,
+                                    MPI_Datatype recvtype, int dest, int tag, MPI_Comm comm) {
+  int recvtypesizePre;
+  std::vector<MPI_Request> requests;
+  MPI_Type_size(recvtype, &recvtypesizePre);
+  std::size_t recvtypesize = recvtypesizePre;
+  for (std::size_t position = 0; position < recvsize; position += DataIncrement) {
+    int localSize = static_cast<int>(std::min(recvsize - position, DataIncrement));
+    requests.push_back(MPI_REQUEST_NULL);
+    MPI_Irecv(reinterpret_cast<char*>(recvbuf) + (recvdisp + position) * recvtypesize, localSize,
+              recvtype, dest, tag, comm, &requests.back());
+  }
+  return requests;
+}
+} // namespace

void sparseAlltoallv(const void* sendbuf, const std::size_t* sendsize, const std::size_t* senddisp,
MPI_Datatype sendtype, void* recvbuf, const std::size_t* recvsize,
const std::size_t* recvdisp, MPI_Datatype recvtype, MPI_Comm comm) {
Expand All @@ -12,36 +47,19 @@ void sparseAlltoallv(const void* sendbuf, const std::size_t* sendsize, const std
int recvtypesizePre;
MPI_Comm_size(comm, &commsize);
MPI_Comm_rank(comm, &commrank);
MPI_Type_size(sendtype, &sendtypesizePre);
MPI_Type_size(sendtype, &recvtypesizePre);

constexpr int Tag = 1000;

std::size_t sendtypesize = sendtypesizePre;
std::size_t recvtypesize = recvtypesizePre;

std::vector<MPI_Request> requests;
requests.reserve(commsize * 2);

// (no special handling of self-to-self comm at the moment)

constexpr std::size_t DataIncrement = std::numeric_limits<int>::max();

for (int i = 0; i < commsize; ++i) {
for (std::size_t position = 0; position < sendsize[i]; position += DataIncrement) {
int localSize = static_cast<int>(std::min(sendsize[i] - position, DataIncrement));
requests.push_back(MPI_REQUEST_NULL);
MPI_Isend(reinterpret_cast<const char*>(sendbuf) + senddisp[i] * sendtypesize + position,
localSize, sendtype, i, Tag, comm, &requests.back());
}
auto localRequests = largeIsend(sendbuf, sendsize[i], senddisp[i], sendtype, i, Tag, comm);
requests.insert(requests.end(), localRequests.begin(), localRequests.end());
}
for (int i = 0; i < commsize; ++i) {
for (std::size_t position = 0; position < recvsize[i]; position += DataIncrement) {
int localSize = static_cast<int>(std::min(recvsize[i] - position, DataIncrement));
requests.push_back(MPI_REQUEST_NULL);
MPI_Irecv(reinterpret_cast<char*>(recvbuf) + recvdisp[i] * recvtypesize + position, localSize,
recvtype, i, Tag, comm, &requests.back());
}
auto localRequests = largeIrecv(recvbuf, recvsize[i], recvdisp[i], recvtype, i, Tag, comm);
requests.insert(requests.end(), localRequests.begin(), localRequests.end());
}

MPI_Waitall(requests.size(), requests.data(), MPI_STATUSES_IGNORE);
@@ -68,3 +86,23 @@ void sparseAlltoallv(const void* sendbuf, const int* sendsize, const int* senddi
sparseAlltoallv(sendbuf, sendsize, senddispSize.data(), sendtype, recvbuf, recvsize,
recvdispSize.data(), recvtype, comm);
}

+void largeScatterv(const void* sendbuf, const std::size_t* sendsize, const std::size_t* senddisp,
+                   MPI_Datatype sendtype, void* recvbuf, std::size_t recvsize,
+                   MPI_Datatype recvtype, int root, MPI_Comm comm) {
+  std::vector<MPI_Request> requests;
+  int commrank;
+  int commsize;
+  MPI_Comm_size(comm, &commsize);
+  MPI_Comm_rank(comm, &commrank);
+  if (commrank == root) {
+    requests.reserve(commsize + 1);
+    for (int i = 0; i < commsize; ++i) {
+      auto sendRequests = largeIsend(sendbuf, sendsize[i], senddisp[i], sendtype, i, Tag, comm);
+      requests.insert(requests.end(), sendRequests.begin(), sendRequests.end());
+    }
+  }
+  auto recvRequests = largeIrecv(recvbuf, recvsize, 0, recvtype, root, Tag, comm);
+  requests.insert(requests.end(), recvRequests.begin(), recvRequests.end());
+  MPI_Waitall(requests.size(), requests.data(), MPI_STATUSES_IGNORE);
+}
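MPI's standard collectives take `int` counts and displacements, so a scatter of more than 2^31-1 elements to any peer cannot be expressed with `MPI_Scatterv` directly; the helpers above split each peer message into chunks of at most `INT_MAX` elements and wait on all resulting requests. A minimal usage sketch of the new function (not part of the commit; buffer sizes and `MPI_DOUBLE` are illustrative):

```cpp
#include "MPIConvenience.h"

#include <cstddef>
#include <mpi.h>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0;
  int procs = 1;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &procs);

  // Counts and displacements are std::size_t, so a single rank's share may
  // exceed INT_MAX elements; largeIsend/largeIrecv chunk the transfer internally.
  const std::size_t perRank = 1 << 20; // could be > INT_MAX on large meshes
  std::vector<std::size_t> sendsize(procs, perRank);
  std::vector<std::size_t> senddisp(procs + 1, 0);
  for (int i = 0; i < procs; ++i) {
    senddisp[i + 1] = senddisp[i] + sendsize[i];
  }

  std::vector<double> sendbuf;
  if (rank == 0) {
    sendbuf.assign(perRank * procs, 1.0); // only the root holds the full array
  }
  std::vector<double> recvbuf(perRank);

  largeScatterv(sendbuf.data(), sendsize.data(), senddisp.data(), MPI_DOUBLE, recvbuf.data(),
                recvbuf.size(), MPI_DOUBLE, 0, MPI_COMM_WORLD);

  MPI_Finalize();
  return 0;
}
```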
4 changes: 4 additions & 0 deletions src/aux/MPIConvenience.h
@@ -16,4 +16,8 @@ void sparseAlltoallv(const void* sendbuf, const int* sendsize, const int* senddi
MPI_Datatype sendtype, void* recvbuf, const int* recvsize, const int* recvdisp,
MPI_Datatype recvtype, MPI_Comm comm);

+void largeScatterv(const void* sendbuf, const std::size_t* sendsize, const std::size_t* senddisp,
+                   MPI_Datatype sendtype, void* recvbuf, std::size_t recvsize,
+                   MPI_Datatype recvtype, int root, MPI_Comm comm);

#endif // PUMGEN_AUX_MPI_CONVENIENCE_H_
23 changes: 16 additions & 7 deletions src/input/MeshData.h
@@ -54,13 +54,18 @@ class FullStorageMeshData : public MeshData {
int bndShift = 8;

void setBoundary(std::size_t cell, int face, int value) {
-    constexpr auto i32limit = std::numeric_limits<unsigned char>::max();
-    constexpr auto i64limit = std::numeric_limits<unsigned short>::max();
-    if (value < 0 || (value > i32limit && bndShift == 8) || (value > i64limit && bndShift == 16)) {
-      logError() << "Cannot handle boundary condition" << value;
-    }
-
-    boundaryData[cell] |= static_cast<int64_t>(value) << (face * bndShift);
+    if (bndShift < 0) {
+      boundaryData[cell * 4 + face] = value;
+    } else {
+      constexpr auto i32limit = std::numeric_limits<unsigned char>::max();
+      constexpr auto i64limit = std::numeric_limits<unsigned short>::max();
+      if (value < 0 || (value > i32limit && bndShift == 8) ||
+          (value > i64limit && bndShift == 16)) {
+        logError() << "Cannot handle boundary condition" << value;
+      }
+
+      boundaryData[cell] |= static_cast<int64_t>(value) << (face * bndShift);
+    }
}

void setup(std::size_t cellCount, std::size_t vertexCount) {
@@ -70,7 +75,11 @@ class FullStorageMeshData : public MeshData {
connectivityData.resize(cellCount * 4);
geometryData.resize(vertexCount * 3);
groupData.resize(cellCount);
-    boundaryData.resize(cellCount);
+    if (bndShift < 0) {
+      boundaryData.resize(cellCount * 4);
+    } else {
+      boundaryData.resize(cellCount);
+    }

// TODO: reconsider the times 4 or 3
}
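For reference, `setBoundary` now supports two storage layouts: with `bndShift` of 8 or 16, the four face values of a cell are packed into a single 64-bit entry; with `bndShift < 0` (the new i32x4 format), each face gets its own entry and is no longer range-limited to 8 or 16 bits. A standalone sketch of the packing arithmetic (assumed to mirror the logic above, reduced to one cell; values are illustrative):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  const int bndShift = 8; // i32 format: 8 bits per face, 4 faces per cell

  // Packed layout (bndShift == 8 or 16): all faces of a cell share one entry.
  std::vector<int64_t> packed(1, 0);
  for (int face = 0; face < 4; ++face) {
    const int value = 100 + face; // must fit into bndShift bits
    packed[0] |= static_cast<int64_t>(value) << (face * bndShift);
  }

  // Unpacked layout (bndShift < 0, the new i32x4 format): one entry per face,
  // so values above 255 (or 65535 for i64) remain representable.
  std::vector<int64_t> perFace(4);
  for (int face = 0; face < 4; ++face) {
    perFace[face] = 1000000 + face;
  }

  // Decode the packed entry again to show the inverse operation.
  for (int face = 0; face < 4; ++face) {
    const int64_t mask = (int64_t(1) << bndShift) - 1;
    std::cout << "face " << face << ": packed " << ((packed[0] >> (face * bndShift)) & mask)
              << ", unpacked " << perFace[face] << "\n";
  }
  return 0;
}
```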
8 changes: 4 additions & 4 deletions src/meshreader/ParallelGMSHReader.h
@@ -142,17 +142,17 @@ template <typename P> class ParallelGMSHReader {
MPI_Comm_rank(comm_, &rank);
MPI_Comm_size(comm_, &procs);

-    auto sendcounts = std::vector<int>(procs);
-    auto displs = std::vector<int>(procs + 1);
+    auto sendcounts = std::vector<std::size_t>(procs);
+    auto displs = std::vector<std::size_t>(procs + 1);
displs[0] = 0;
for (int i = 0; i < procs; ++i) {
sendcounts[i] = numPerElement * getChunksize(numElements, i, procs);
displs[i + 1] = displs[i] + sendcounts[i];
}

auto recvcount = sendcounts[rank];
-    MPI_Scatterv(sendbuf, sendcounts.data(), displs.data(), tndm::mpi_type_t<T>(), recvbuf,
-                 recvcount, tndm::mpi_type_t<T>(), 0, comm_);
+    largeScatterv(sendbuf, sendcounts.data(), displs.data(), tndm::mpi_type_t<T>(), recvbuf,
+                  recvcount, tndm::mpi_type_t<T>(), 0, comm_);
}

MPI_Comm comm_;
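The switch to `std::size_t` counts matters because the displacements grow as `numPerElement * numElements`, which overflows `int` for large meshes even when each rank's own chunk still fits. A quick back-of-the-envelope check (not from the commit; the element count is illustrative):

```cpp
#include <climits>
#include <cstddef>
#include <iostream>

int main() {
  const std::size_t numElements = 700000000; // ~7e8 tetrahedra
  const std::size_t numPerElement = 4;       // four vertex ids per tetrahedron
  const std::size_t totalEntries = numElements * numPerElement; // 2.8e9

  // 2.8e9 > INT_MAX (2147483647): the trailing displacements no longer fit
  // into int, so sendcounts/displs must hold std::size_t and the scatter
  // must go through largeScatterv.
  std::cout << "total entries: " << totalEntries << "\n"
            << "INT_MAX:       " << INT_MAX << "\n"
            << "fits into int: " << (totalEntries <= static_cast<std::size_t>(INT_MAX)) << "\n";
  return 0;
}
```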
81 changes: 50 additions & 31 deletions src/pumgen.cpp
@@ -75,27 +75,27 @@ static int ilog(std::size_t value, int expbase = 1) {

constexpr std::size_t NoSecondDim = 0;

-template <typename T, std::size_t SecondDim, typename F>
+template <typename T, typename F>
static void writeH5Data(const F& handler, hid_t h5file, const std::string& name, void* mesh,
int meshdim, hid_t h5memtype, hid_t h5outtype, std::size_t chunk,
std::size_t localSize, std::size_t globalSize, bool reduceInts,
-                        int filterEnable, std::size_t filterChunksize) {
-  constexpr std::size_t SecondSize = std::max(SecondDim, static_cast<std::size_t>(1));
-  constexpr std::size_t Dimensions = SecondDim == 0 ? 1 : 2;
-  const std::size_t chunkSize = chunk / SecondSize / sizeof(T);
+                        int filterEnable, std::size_t filterChunksize, std::size_t secondDim) {
+  const std::size_t secondSize = std::max(secondDim, static_cast<std::size_t>(1));
+  const std::size_t dimensions = secondDim == 0 ? 1 : 2;
+  const std::size_t chunkSize = chunk / secondSize / sizeof(T);
const std::size_t bufferSize = std::min(localSize, chunkSize);
-  std::vector<T> data(SecondSize * bufferSize);
+  std::vector<T> data(secondSize * bufferSize);

std::size_t rounds = (localSize + chunkSize - 1) / chunkSize;

MPI_Allreduce(MPI_IN_PLACE, &rounds, 1, tndm::mpi_type_t<decltype(rounds)>(), MPI_MAX,
MPI_COMM_WORLD);

-  hsize_t globalSizes[2] = {globalSize, SecondDim};
-  hid_t h5space = H5Screate_simple(Dimensions, globalSizes, nullptr);
+  hsize_t globalSizes[2] = {globalSize, secondDim};
+  hid_t h5space = H5Screate_simple(dimensions, globalSizes, nullptr);

-  hsize_t sizes[2] = {bufferSize, SecondDim};
-  hid_t h5memspace = H5Screate_simple(Dimensions, sizes, nullptr);
+  hsize_t sizes[2] = {bufferSize, secondDim};
+  hid_t h5memspace = H5Screate_simple(dimensions, sizes, nullptr);

std::size_t offset = localSize;

@@ -104,7 +104,7 @@ static void writeH5Data(const F& handler, hid_t h5file, const std::string& name,
offset -= localSize;

hsize_t start[2] = {offset, 0};
-    hsize_t count[2] = {bufferSize, SecondDim};
+    hsize_t count[2] = {bufferSize, secondDim};

hid_t h5dxlist = H5Pcreate(H5P_DATASET_XFER);
checkH5Err(h5dxlist);
@@ -124,7 +124,7 @@ static void writeH5Data(const F& handler, hid_t h5file, const std::string& name,
hid_t h5filter = H5P_DEFAULT;
if (filterEnable > 0) {
h5filter = checkH5Err(H5Pcreate(H5P_DATASET_CREATE));
-    hsize_t chunk[2] = {std::min(filterChunksize, bufferSize), SecondDim};
+    hsize_t chunk[2] = {std::min(filterChunksize, bufferSize), secondDim};
checkH5Err(H5Pset_chunk(h5filter, 2, chunk));
if (filterEnable == 1 && std::is_integral_v<T>) {
checkH5Err(H5Pset_scaleoffset(h5filter, H5Z_SO_INT, H5Z_SO_INT_MINBITS_DEFAULT));
@@ -147,7 +147,7 @@ static void writeH5Data(const F& handler, hid_t h5file, const std::string& name,
start[0] = offset + written;
count[0] = std::min(localSize - written, bufferSize);

-    std::copy_n(handler.begin() + written * SecondSize, count[0] * SecondSize, data.begin());
+    std::copy_n(handler.begin() + written * secondSize, count[0] * secondSize, data.begin());

checkH5Err(H5Sselect_hyperslab(h5memspace, H5S_SELECT_SET, nullstart, nullptr, count, nullptr));

@@ -202,9 +202,9 @@ int main(int argc, char* argv[]) {
utils::Args::Required, false);
args.addOption("chunksize", 0, "Chunksize for writing (default=1 GiB).", utils::Args::Required,
false);
const char* boundarytypes[] = {"int32", "int64"};
const char* boundarytypes[] = {"int32", "int64", "i32", "i64", "i32x4"};
args.addEnumOption("boundarytype", boundarytypes, 0,
"Type for writing out boundary data (default: int32).", false);
"Type for writing out boundary data (default: i32).", false);

args.addOption("license", 'l', "License file (only used by SimModSuite)", utils::Args::Required,
false);
@@ -272,14 +272,26 @@ int main(int argc, char* argv[]) {
int boundaryType = args.getArgument("boundarytype", 0);
hid_t boundaryDatatype;
int faceOffset;
-  if (boundaryType == 0) {
+  int secondShape = NoSecondDim;
+  int boundaryFormatAttr = 0;
+  if (boundaryType == 0 || boundaryType == 2) {
    boundaryDatatype = H5T_STD_I32LE;
    faceOffset = 8;
-    logInfo(rank) << "Using 32-bit integer boundary type conditions.";
-  } else if (boundaryType == 1) {
+    secondShape = NoSecondDim;
+    boundaryFormatAttr = 1;
+    logInfo(rank) << "Using 32-bit integer boundary type conditions, or 8 bit per face (i32).";
+  } else if (boundaryType == 1 || boundaryType == 3) {
    boundaryDatatype = H5T_STD_I64LE;
    faceOffset = 16;
-    logInfo(rank) << "Using 64-bit integer boundary type conditions.";
+    secondShape = NoSecondDim;
+    boundaryFormatAttr = 2;
+    logInfo(rank) << "Using 64-bit integer boundary type conditions, or 16 bit per face (i64).";
+  } else if (boundaryType == 4) {
+    boundaryDatatype = H5T_STD_I32LE;
+    secondShape = 4;
+    faceOffset = -1;
+    boundaryFormatAttr = 0;
+    logInfo(rank) << "Using 32-bit integer per boundary face (i32x4).";
  }

std::string xdmfFile = outputFile;
@@ -406,31 +418,38 @@ int main(int argc, char* argv[]) {
// Write cells
std::size_t connectBytesPerData = 8;
logInfo(rank) << "Writing cells";
-  writeH5Data<unsigned long, 4>(meshInput->connectivity(), h5file, "connect", mesh, 3,
-                                H5T_NATIVE_ULONG, H5T_STD_U64LE, chunksize, localSize[0],
-                                globalSize[0], reduceInts, filterEnable, filterChunksize);
+  writeH5Data<unsigned long>(meshInput->connectivity(), h5file, "connect", mesh, 3,
+                             H5T_NATIVE_ULONG, H5T_STD_U64LE, chunksize, localSize[0],
+                             globalSize[0], reduceInts, filterEnable, filterChunksize, 4);

// Vertices
logInfo(rank) << "Writing vertices";
-  writeH5Data<double, 3>(meshInput->geometry(), h5file, "geometry", mesh, 0, H5T_IEEE_F64LE,
-                         H5T_IEEE_F64LE, chunksize, localSize[1], globalSize[1], reduceInts,
-                         filterEnable, filterChunksize);
+  writeH5Data<double>(meshInput->geometry(), h5file, "geometry", mesh, 0, H5T_IEEE_F64LE,
+                      H5T_IEEE_F64LE, chunksize, localSize[1], globalSize[1], reduceInts,
+                      filterEnable, filterChunksize, 3);

// Group information

std::size_t groupBytesPerData = 4;
logInfo(rank) << "Writing group information";
-  writeH5Data<int, NoSecondDim>(meshInput->group(), h5file, "group", mesh, 3, H5T_NATIVE_INT,
-                                H5T_STD_I32LE, chunksize, localSize[0], globalSize[0], reduceInts,
-                                filterEnable, filterChunksize);
+  writeH5Data<int>(meshInput->group(), h5file, "group", mesh, 3, H5T_NATIVE_INT, H5T_STD_I32LE,
+                   chunksize, localSize[0], globalSize[0], reduceInts, filterEnable,
+                   filterChunksize, NoSecondDim);

// Write boundary condition
logInfo(rank) << "Writing boundary condition";
auto i32limit = std::numeric_limits<unsigned char>::max();
auto i64limit = std::numeric_limits<unsigned short>::max();
-  writeH5Data<long, NoSecondDim>(meshInput->boundary(), h5file, "boundary", mesh, 3,
-                                 H5T_NATIVE_LONG, boundaryDatatype, chunksize, localSize[0],
-                                 globalSize[0], reduceInts, filterEnable, filterChunksize);
+  writeH5Data<long>(meshInput->boundary(), h5file, "boundary", mesh, 3, H5T_NATIVE_LONG,
+                    boundaryDatatype, chunksize, localSize[0], globalSize[0], reduceInts,
+                    filterEnable, filterChunksize, secondShape);
+
+  hid_t attrSpace = checkH5Err(H5Screate(H5S_SCALAR));
+  hid_t attrBoundary = checkH5Err(
+      H5Acreate(h5file, "boundary-format", H5T_NATIVE_INT, attrSpace, H5P_DEFAULT, H5P_DEFAULT));
+  checkH5Err(H5Awrite(attrBoundary, H5T_NATIVE_INT, &boundaryFormatAttr));
+  checkH5Err(H5Aclose(attrBoundary));
+  checkH5Err(H5Sclose(attrSpace));

// Writing XDMF file
if (rank == 0) {
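Downstream readers can use the new `boundary-format` attribute to pick the right decoding; per the branches above, 0 means i32x4, 1 means packed i32 (8 bits per face), and 2 means packed i64 (16 bits per face). A sketch of reading it back with the HDF5 C API (not from the commit; the file name and the fallback for older files are assumptions):

```cpp
#include <hdf5.h>
#include <iostream>

int main() {
  hid_t file = H5Fopen("mesh.puml.h5", H5F_ACC_RDONLY, H5P_DEFAULT);
  if (file < 0) {
    return 1;
  }
  int boundaryFormat = 1; // assumption: treat a missing attribute as packed i32
  if (H5Aexists(file, "boundary-format") > 0) {
    hid_t attr = H5Aopen(file, "boundary-format", H5P_DEFAULT);
    H5Aread(attr, H5T_NATIVE_INT, &boundaryFormat);
    H5Aclose(attr);
  }
  std::cout << "boundary-format: " << boundaryFormat << "\n";
  H5Fclose(file);
  return 0;
}
```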
