Skip to content

Commit

Permalink
Merge pull request #3553 from garlick/pmi_process_mapping
Browse files Browse the repository at this point in the history
fix PMI_process_mapping when Flux is launched with multiple brokers per node
  • Loading branch information
mergify[bot] authored Mar 15, 2021
2 parents c70d8a8 + 910e6d5 commit 428762d
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 199 deletions.
5 changes: 0 additions & 5 deletions doc/man7/flux-broker-attributes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ tbon.maxlevel

tbon.endpoint
The endpoint for the tree based overlay network to communicate over.
Format specifier "%h" can be used to specify the IP address of the
host and is useful when configuring an IP endpoint. Format specifier
"%B" can be used to specify the value of the attribute broker.rundir.
It is useful when configuring an IPC endpoint. Defaults to
"tcp://%h:\*".


SOCKET ATTRIBUTES
Expand Down
36 changes: 27 additions & 9 deletions src/broker/boot_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "src/common/libutil/log.h"
#include "src/common/libutil/kary.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libpmi/clique.h"

#include "attr.h"
#include "overlay.h"
Expand Down Expand Up @@ -250,6 +251,8 @@ int boot_config_attr (attr_t *attrs, json_t *hosts)
char *s = NULL;
size_t index;
json_t *value;
char buf[1024];
char *val;
int rv = -1;

if (!hosts || json_array_size (hosts) == 0)
Expand Down Expand Up @@ -280,6 +283,30 @@ int boot_config_attr (attr_t *attrs, json_t *hosts)
goto error;
}

/* Generate broker.mapping.
* For now, set it to NULL if there are multiple brokers per node.
*/
hostlist_uniq (hl);
if (hostlist_count (hl) < json_array_size (hosts))
val = NULL;
else {
struct pmi_map_block mapblock = {
.nodeid = 0,
.nodes = json_array_size (hosts),
.procs = 1
};
if (pmi_process_mapping_encode (&mapblock, 1, buf, sizeof (buf)) < 0) {
log_msg ("encoding broker.mapping");
errno = EOVERFLOW;
goto error;
}
val = buf;
}
if (attr_add (attrs, "broker.mapping", val, FLUX_ATTRFLAG_IMMUTABLE) < 0) {
log_err ("setattr broker.mapping");
goto error;
}

rv = 0;
error:
hostlist_destroy (hl);
Expand Down Expand Up @@ -419,15 +446,6 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs, int tbon_k)
uint32_t size;
json_t *hosts = NULL;

/* Throw an error if 'tbon.endpoint' attribute is already set.
* flux-start sets this, and it's not compatible with the
* config boot method as it would be overwritten below.
*/
if (attr_get (attrs, "tbon.endpoint", NULL, NULL) == 0) {
log_msg ("attr tbon.endpoint may not be set with [bootstrap] config");
return -1;
}

/* Ingest the [bootstrap] stanza.
*/
if (boot_config_parse (flux_get_conf (h), &conf, &hosts) < 0)
Expand Down
183 changes: 89 additions & 94 deletions src/broker/boot_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,97 +21,14 @@
#include "src/common/libutil/kary.h"
#include "src/common/libpmi/pmi.h"
#include "src/common/libpmi/pmi_strerror.h"
#include "src/common/libpmi/clique.h"

#include "attr.h"
#include "overlay.h"
#include "boot_pmi.h"
#include "pmiutil.h"


/* Generally accepted max, although some go higher (IE is 2083) */
#define ENDPOINT_MAX 2048

/* Given a string with possible format specifiers, return string that is
* fully expanded.
*
* Possible format specifiers:
* - %h - local IP address by heuristic (see src/libutil/ipaddr.h)
* - %B - value of attribute broker.rundir
*
* Caller is responsible for freeing memory of returned value.
*/
static char * format_endpoint (attr_t *attrs, const char *endpoint)
{
char ipaddr[HOST_NAME_MAX + 1];
char *ptr, *buf, *rv = NULL;
bool percent_flag = false;
unsigned int len = 0;
const char *rundir;
char error[200];

if (!(buf = calloc (1, ENDPOINT_MAX + 1))) {
errno = ENOMEM;
return NULL;
}

ptr = (char *)endpoint;
while (*ptr) {
if (percent_flag) {
if (*ptr == 'h') {
if (ipaddr_getprimary (ipaddr, sizeof (ipaddr),
error, sizeof (error)) < 0) {
log_msg ("%s", error);
goto done;
}
if ((len + strlen (ipaddr)) > ENDPOINT_MAX) {
log_msg ("ipaddr overflow max endpoint length");
goto done;
}
strcat (buf, ipaddr);
len += strlen (ipaddr);
}
else if (*ptr == 'B') {
if (attr_get (attrs, "broker.rundir", &rundir, NULL) < 0) {
log_msg ("broker.rundir attribute is not set");
goto done;
}
if ((len + strlen (rundir)) > ENDPOINT_MAX) {
log_msg ("broker.rundir overflow max endpoint length");
goto done;
}
strcat (buf, rundir);
len += strlen (rundir);
}
else if (*ptr == '%')
buf[len++] = '%';
else {
buf[len++] = '%';
buf[len++] = *ptr;
}
percent_flag = false;
}
else {
if (*ptr == '%')
percent_flag = true;
else
buf[len++] = *ptr;
}

if (len >= ENDPOINT_MAX) {
log_msg ("overflow max endpoint length");
goto done;
}

ptr++;
}

rv = buf;
done:
if (!rv)
free (buf);
return (rv);
}

/* If the broker is launched via flux-shell, then the shell may opt
* to set a "flux.instance-level" parameter in the PMI kvs to tell
* the booting instance at what "level" it will be running, i.e. the
Expand Down Expand Up @@ -145,6 +62,86 @@ static int set_instance_level_attr (struct pmi_handle *pmi,
return 0;
}

/* Set broker.mapping attribute from enclosing instance PMI_process_mapping.
*/
static int set_broker_mapping_attr (struct pmi_handle *pmi,
const char *kvsname,
attr_t *attrs)
{
char buf[1024];
char *val = NULL;

if (broker_pmi_kvs_get (pmi,
kvsname,
"PMI_process_mapping",
buf,
sizeof (buf)) == PMI_SUCCESS)
val = buf;
if (attr_add (attrs, "broker.mapping", val, FLUX_ATTRFLAG_IMMUTABLE) < 0)
return -1;
return 0;
}

/* Check if IPC can be used to communicate.
* Currently this only goes so far as to check if the process mapping of
* brokers has all brokers on the same node. We could check if all peers
* are on the same node, but given how the TBON maps to rank assignments,
* it is fairly unlikely.
*/
static bool use_ipc (attr_t *attrs)
{
bool result = false;
struct pmi_map_block *blocks = NULL;
int nblocks;
const char *val;

if (attr_get (attrs, "broker.mapping", &val, NULL) < 0 || !val)
goto done;
if (pmi_process_mapping_parse (val, &blocks, &nblocks) < 0)
goto done;
if (nblocks == 1 && blocks[0].nodes == 1) // one node
result = true;
done:
free (blocks);
return result;
}

/* Build URI for broker TBON to bind to.
* If IPC, use '<rundir>/tbon-<rank>' which should be unique if there are
* multiple brokers and/or multiple instances per node.
* If using TCP, choose the address to be the one associated with the default
* route (see src/common/libutil/ipaddr.h), and a randomly chosen port.
*/
static int format_bind_uri (char *buf, int bufsz, attr_t *attrs, int rank)
{
if (use_ipc (attrs)) {
const char *rundir;

if (attr_get (attrs, "rundir", &rundir, NULL) < 0) {
log_err ("rundir attribute is not set");
return -1;
}
if (snprintf (buf, bufsz, "ipc://%s/tbon-%d", rundir, rank) >= bufsz)
goto overflow;
}
else {
char ipaddr[HOST_NAME_MAX + 1];
char error[200];

if (ipaddr_getprimary (ipaddr, sizeof (ipaddr),
error, sizeof (error)) < 0) {
log_err ("%s", error);
return -1;
}
if (snprintf (buf, bufsz, "tcp://%s:*", ipaddr) >= bufsz)
goto overflow;
}
return 0;
overflow:
log_msg ("buffer overflow while building bind URI");
return -1;
}

int boot_pmi (struct overlay *overlay, attr_t *attrs, int tbon_k)
{
int rank;
Expand Down Expand Up @@ -175,6 +172,10 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs, int tbon_k)
log_err ("set_instance_level_attr");
goto error;
}
if (set_broker_mapping_attr (pmi, pmi_params.kvsname, attrs) < 0) {
log_err ("error setting broker.mapping attribute");
goto error;
}
if (overlay_init (overlay, pmi_params.size, pmi_params.rank, tbon_k) < 0)
goto error;

Expand All @@ -192,25 +193,19 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs, int tbon_k)
pmi_params.size,
pmi_params.rank,
0) != KARY_NONE) {
const char *fmt;
char *tmp;
char buf[1024];

if (attr_get (attrs, "tbon.endpoint", &fmt, NULL) < 0)
fmt = "tcp://%h:*";
if (!(tmp = format_endpoint (attrs, fmt)))
if (format_bind_uri (buf, sizeof (buf), attrs, pmi_params.rank) < 0)
goto error;
if (overlay_bind (overlay, tmp) < 0) {
log_err ("overlay_bind %s failed", tmp);
free (tmp);
if (overlay_bind (overlay, buf) < 0) {
log_err ("error binding to %s", buf);
goto error;
}
free (tmp);
uri = overlay_get_bind_uri (overlay);
}
else {
uri = NULL;
}
(void)attr_delete (attrs, "tbon.endpoint", true);
if (attr_add (attrs, "tbon.endpoint", uri, FLUX_ATTRFLAG_IMMUTABLE) < 0) {
log_err ("setattr tbon.endpoint");
goto error;
Expand Down
20 changes: 19 additions & 1 deletion src/cmd/flux-start.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/setenvf.h"
#include "src/common/libpmi/simple_server.h"
#include "src/common/libpmi/clique.h"
#include "src/common/libpmi/dgetline.h"

#define DEFAULT_KILLER_TIMEOUT 20.0
Expand Down Expand Up @@ -94,6 +95,8 @@ static struct optparse_option opts[] = {
.usage = "Trace pmi simple server protocol exchange", },
{ .name = "scratchdir", .key = 'D', .has_arg = 1, .arginfo = "DIR",
.usage = "Use DIR as scratch directory", },
{ .name = "noclique", .key = 'c', .has_arg = 0, .arginfo = NULL,
.usage = "Don't set PMI_process_mapping in PMI KVS", },

/* Option group 1, these options will be listed after those above */
{ .group = 1,
Expand Down Expand Up @@ -209,6 +212,8 @@ int main (int argc, char *argv[])
case BOOTSTRAP_PMI:
if (optparse_hasopt (ctx.opts, "scratchdir"))
log_msg_exit ("--scratchdir only works with --bootstrap=selfpmi");
if (optparse_hasopt (ctx.opts, "noclique"))
log_msg_exit ("--noclique only works with --bootstrap=selfpmi");
status = exec_broker (command, len, broker_path);
break;
case BOOTSTRAP_SELFPMI:
Expand Down Expand Up @@ -481,7 +486,6 @@ struct client *client_create (const char *broker_path, const char *scratch_dir,
argz_add (&argz, &argz_len, broker_path);
char *dir_arg = xasprintf ("--setattr=rundir=%s", scratch_dir);
argz_add (&argz, &argz_len, dir_arg);
argz_add (&argz, &argz_len, "--setattr=tbon.endpoint=ipc://%B/req");
free (dir_arg);
add_args_list (&argz, &argz_len, ctx.opts, "broker-opts");
if (rank == 0 && cmd_argz)
Expand Down Expand Up @@ -546,8 +550,22 @@ void pmi_server_initialize (int flags)
.debug_trace = pmi_debug_trace,
};
int appnum = 0;

if (!(ctx.pmi.kvs = zhash_new()))
oom ();

if (!optparse_hasopt (ctx.opts, "noclique")) {
struct pmi_map_block mapblock = {
.nodeid = 0,
.nodes = 1,
.procs = ctx.size,
};
char buf[256];
if (pmi_process_mapping_encode (&mapblock, 1, buf, sizeof (buf)) < 0)
log_msg_exit ("error encoding PMI_process_mapping");
zhash_update (ctx.pmi.kvs, "PMI_process_mapping", xstrdup (buf));
}

ctx.pmi.srv = pmi_simple_server_create (ops, appnum, ctx.size,
ctx.size, "-", flags, NULL);
if (!ctx.pmi.srv)
Expand Down
3 changes: 0 additions & 3 deletions src/shell/lua.d/openmpi.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,5 @@
-- SPDX-License-Identifier: LGPL-3.0
-------------------------------------------------------------

local f = require 'flux'.new ()
local rundir = f:getattr ('broker.rundir')
shell.setenv ("OMPI_MCA_orte_tmpdir_base", rundir)
shell.setenv ("OMPI_MCA_pmix", "flux")
shell.setenv ("OMPI_MCA_schizo", "flux")
7 changes: 0 additions & 7 deletions src/shell/lua.d/spectrum.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,12 @@ local function strip_env_by_prefix (env, prefix)
end
end

local f = require 'flux'.new()
local rundir = f:getattr ('broker.rundir')

local env = shell.getenv()

-- Clear all existing PMIX_ and OMPI_ values before setting our own
strip_env_by_prefix (env, "PMIX_")
strip_env_by_prefix (env, "OMPI_")

-- Avoid shared memory segment name collisions
-- when flux instance runs >1 broker per node.
shell.setenv ('OMPI_MCA_orte_tmpdir_base', rundir)

-- Assumes the installation paths of Spectrum MPI on LLNL's Sierra
shell.setenv ('OMPI_MCA_osc', "pt2pt")
shell.setenv ('OMPI_MCA_pml', "yalla")
Expand Down
Loading

0 comments on commit 428762d

Please sign in to comment.