qmanager: fix module reload with rv1, queues, jobs
Problem: when the fluxion modules are reloaded with running jobs,
and the match format is "rv1_nosched", and queues are enabled,
running jobs are killed with a fatal scheduler-restart exception.

During the hello handshake defined by RFC 27, which occurs at scheduler
initialization, the scheduler is informed of jobs that are holding
resources.  The hello callback in qmanager retrieves the job's queue name
from the R key "attributes.system.scheduler.queue" and uses it to locate
the queue into which the job should be inserted.

This attribute is not set in R when the match format is "rv1_nosched",
so the default queue is assumed.  Since the default queue is not
instantiated when named queues are defined, the queue lookup fails and a
fatal job exception is raised.
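
The failure mode described above can be sketched in isolation. This is a minimal model, not the qmanager code: it assumes the queue container behaves like a std::map keyed by queue name, and queue_t, queue_map_t, and locate_queue are hypothetical names introduced only for illustration.

```cpp
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a qmanager queue object.
struct queue_t {};

// Hypothetical container type: queue name -> queue.
using queue_map_t = std::map<std::string, std::shared_ptr<queue_t>>;

// Return 0 if the named queue exists, -1 otherwise.  This mirrors the
// 0 / -1 convention of the hello callback, where -1 leads to a fatal
// exception on the job.
int locate_queue (const queue_map_t &queues, const std::string &name)
{
    try {
        queues.at (name);  // std::map::at throws std::out_of_range if absent
        return 0;
    } catch (const std::out_of_range &) {
        return -1;
    }
}
```

With only named queues such as "batch" and "debug" in the map, looking up "default" returns -1, which corresponds to the case where a running job is assumed to belong to a default queue that was never instantiated.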

There has been a proposal to deprecate the R attribute
(flux-framework#1108), so rather than ensure that it is set in this case,
determine the queue instead by fetching the jobspec from the KVS.
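
The queue selection rule that this change relies on can be sketched as follows. It is a simplified model under stated assumptions: the optional jobspec attribute is represented as a possibly-null C string (as an optional string key leaves its output untouched when absent), and select_queue_name is a hypothetical helper, not a function in fluxion.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: choose the queue name for a job seen at hello time.
// queue_attr models the optional attributes.system.queue value from
// jobspec; NULL means the attribute was not set.
std::string select_queue_name (const char *queue_attr,
                               const std::string &default_queue)
{
    if (queue_attr)
        return queue_attr;
    return default_queue;
}
```

A job submitted without a queue falls back to the default name, so the later lookup only succeeds if the default queue actually exists, that is, when no named queues are configured.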
garlick committed Apr 16, 2024
1 parent a025bcb commit 6dd19be
Showing 1 changed file with 47 additions and 23 deletions.
qmanager/modules/qmanager_callbacks.cpp (70 changes: 47 additions & 23 deletions)
@@ -120,13 +120,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
     return rc;
 }

+/* The RFC 27 hello handshake occurs during scheduler initialization. Its
+ * purpose is to inform the scheduler of jobs that already have resources
+ * allocated. This callback is made once per job. The callback should return
+ * 0 on success or -1 on failure. On failure, the job manager raises a fatal
+ * exception on the job.
+ *
+ * Jobs that already have resources at hello need to be assigned to the correct
+ * qmanager queue, but the queue is not provided in the hello metadata.
+ * Therefore, jobspec is fetched from the KVS so that attributes.system.queue
+ * can be extracted from it.
+ *
+ * Note that fluxion instantiates the "default" queue when no named queues
+ * are configured. Therefore, when the queue attribute is not defined, we
+ * put the job in the default queue.
+ *
+ * Fail the job if its queue attribute (or lack thereof) no longer matches a
+ * valid queue. This can occur if queues have been reconfigured since job
+ * submission.
+ */
 int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
                                         const char *R, void *arg)

 {
     int rc = 0;
-    json_t *o = NULL;
-    json_error_t err;
     std::string R_out;
     char *qn_attr = NULL;
     std::string queue_name;
@@ -137,38 +154,44 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
     unsigned int prio;
     uint32_t uid;
     double ts;
+    json_t *jobspec = NULL;
+    flux_future_t *f = NULL;

+    /* Don't expect jobspec to be set here as it is not currently defined
+     * in RFC 27. However, add it anyway in case the hello protocol
+     * evolves to include it. If it is not set, it must be looked up.
+     */
     if (flux_msg_unpack (msg,
-                         "{s:I s:i s:i s:f}",
+                         "{s:I s:i s:i s:f s?o}",
                          "id", &id,
                          "priority", &prio,
                          "userid", &uid,
-                         "t_submit", &ts) < 0) {
+                         "t_submit", &ts,
+                         "jobspec", &jobspec) < 0) {
         flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__);
         goto out;
     }
-
-    if ( (o = json_loads (R, 0, &err)) == NULL) {
-        rc = -1;
-        errno = EPROTO;
-        flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
-                  __FUNCTION__, static_cast<intmax_t> (id),
-                  err.text, err.source, err.line, err.column);
-        goto out;
+    if (!jobspec) {
+        char key[64] = { 0 };
+        if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0
+            || !(f = flux_kvs_lookup (h, NULL, 0, key))
+            || flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) {
+            flux_log_error (h, "%s", key);
+            goto out;
+        }
     }
-    if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
-                            "attributes",
-                            "system",
-                            "scheduler",
-                            "queue", &qn_attr)) < 0) {
-        json_decref (o);
-        errno = EPROTO;
-        flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
+    if (json_unpack (jobspec,
+                     "{s?{s?{s?s}}}",
+                     "attributes",
+                     "system",
+                     "queue", &qn_attr) < 0) {
+        flux_log_error (h, "error parsing jobspec");
         goto out;
     }
-
-    queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue_name ();
-    json_decref (o);
+    if (qn_attr)
+        queue_name = qn_attr;
+    else
+        queue_name = ctx->opts.get_opt ().get_default_queue_name ();
     queue = ctx->queues.at (queue_name);
     running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
                                            id, uid, calc_priority (prio),
@@ -184,6 +207,7 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
               queue_name.c_str (), static_cast<intmax_t> (id));

 out:
+    flux_future_destroy (f);
     return rc;
 }
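
The last hunk adds cleanup at the single exit label, where the lookup future may or may not have been created. The sketch below shows that pattern in isolation under stated assumptions: handle_t, handle_create, handle_destroy, and process are hypothetical stand-ins (not flux-core calls), and the destroy function is written to tolerate a null pointer so the label can be reached before any handle exists.

```cpp
#include <cstddef>

// Hypothetical resource standing in for a lookup future.
struct handle_t {
    int unused;
};

// Counts handles that have been created but not yet destroyed.
static int live_handles = 0;

handle_t *handle_create ()
{
    live_handles++;
    return new handle_t ();
}

// Tolerate NULL so callers can destroy unconditionally at the exit label.
void handle_destroy (handle_t *h)
{
    if (h) {
        delete h;
        live_handles--;
    }
}

// Return 0 on success, -1 on failure.  Every path leaves through 'out',
// so the handle is released whether or not it was ever created.
int process (bool have_cached_value, bool lookup_fails)
{
    int rc = -1;
    handle_t *h = NULL;

    if (!have_cached_value) {
        h = handle_create ();
        if (lookup_fails)
            goto out;
    }
    rc = 0;
out:
    handle_destroy (h);
    return rc;
}
```

Keeping the handle alive until the exit label, rather than destroying it right after the lookup, matters when data obtained through the handle is still referenced later in the function.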
