Skip to content

Commit

Permalink
DAOS-16563 client: mark pool/cont handle as g2l after fork
Browse files Browse the repository at this point in the history
This patch marks all pool and container handles as if they
were created with g2l in the child processes after fork.
It prevents misinteractions if one of the child processes
closes the handle.
The marking is done by iterating through all the pool and
container handles which was not supported by the hhash code.

This patch also:
- adds support for fork to pydaos.
- introduces daos_reinit() to be called after fork.
- fixes IL to set the atfork callback when no extra eq are used.
- remove support for creating an event queue for each pydaos
  put/get operation. This makes the global event queue the only
  option. This should probably be moved to a per-thread eq in
  the future.

Run-GHA: true

Signed-off-by: Johann Lombardi <[email protected]>
  • Loading branch information
johannlombardi committed Sep 12, 2024
1 parent f9b3f46 commit 688a981
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 71 deletions.
29 changes: 29 additions & 0 deletions src/client/api/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,32 @@ daos_fini(void)
D_MUTEX_UNLOCK(&module_lock);
return rc;
}

/**
* Re-initialize the DAOS library in the child process after fork.
*/
int
daos_reinit(void)
{
int rc;

rc = daos_eq_lib_reset_after_fork();
if (rc)
return rc;

daos_dti_reset();

/**
* Mark all pool and container handles owned by the parent process as if they were created
* in the child processes with g2l to avoid confusing the DAOS engines.
*/
rc = dc_pool_mark_all_slave();
if (rc)
return rc;

rc = dc_cont_mark_all_slave();
if (rc)
return rc;

return 0;
}
28 changes: 17 additions & 11 deletions src/client/dfuse/il/int_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -818,16 +818,20 @@ child_hdlr(void)
{
int rc;

rc = daos_eq_lib_reset_after_fork();
rc = daos_reinit();
if (rc)
DL_WARN(rc, "daos_eq_lib_init() failed in child process");
daos_dti_reset();
DL_WARN(rc, "daos_reinit() failed in child process");

/** Reset event queue */
ioil_eqh = ioil_iog.iog_main_eqh = DAOS_HDL_INVAL;
rc = daos_eq_create(&ioil_eqh);
if (rc)
DFUSE_LOG_WARNING("daos_eq_create() failed: "DF_RC, DP_RC(rc));
else
ioil_iog.iog_main_eqh = ioil_eqh;

if (ioil_iog.iog_eq_count_max) {
rc = daos_eq_create(&ioil_eqh);
if (rc)
DFUSE_LOG_WARNING("daos_eq_create() failed: " DF_RC, DP_RC(rc));
else
ioil_iog.iog_main_eqh = ioil_eqh;
}
}

/* Returns true on success */
Expand Down Expand Up @@ -876,10 +880,12 @@ check_ioctl_on_open(int fd, struct fd_entry *entry, int flags)
D_GOTO(err, rc = daos_der2errno(rc));
}
ioil_iog.iog_main_eqh = ioil_eqh;

rc = pthread_atfork(NULL, NULL, &child_hdlr);
D_ASSERT(rc == 0);
}

rc = pthread_atfork(NULL, NULL, &child_hdlr);
if (rc)
DFUSE_LOG_WARNING("Failed to install atfork handler: " DF_RC, DP_RC(rc));
rc = 0;
}

d_list_for_each_entry(pool, &ioil_iog.iog_pools_head, iop_pools) {
Expand Down
5 changes: 2 additions & 3 deletions src/client/dfuse/pil4dfs/int_dfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -939,10 +939,9 @@ child_hdlr(void)
if (atomic_load_relaxed(&d_daos_inited) == false)
return;

rc = daos_eq_lib_reset_after_fork();
rc = daos_reinit();
if (rc)
DL_WARN(rc, "daos_eq_lib_init() failed in child process");
daos_dti_reset();
DL_WARN(rc, "daos_reinit() failed in child process");
td_eqh = main_eqh = DAOS_HDL_INVAL;
context_reset = true;
}
Expand Down
94 changes: 37 additions & 57 deletions src/client/pydaos/pydaos_shim.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <gurt/common.h>
#include <daos_kv.h>
#include <daos_uns.h>
#include <daos/event.h>

#define PY_SHIM_MAGIC_NUMBER 0x7A8A
#define MAX_OID_HI ((1UL << 32) - 1)
Expand Down Expand Up @@ -63,30 +64,48 @@ struct open_handle {
} \
} while (0)

static daos_handle_t glob_eq;
static bool use_glob_eq;
/** Global event queue */
static daos_handle_t eq;

/**
* Implementations of baseline shim functions
*/

static void
child_handler(void)
{
int rc;

rc = daos_reinit();
if (rc)
D_WARN("daos_reinit() failed in child process %d", rc);

eq = DAOS_HDL_INVAL;
rc = daos_eq_create(&eq);
if (rc)
DL_ERROR(rc, "Failed to re-create global eq");
}

static PyObject *
__shim_handle__daos_init(PyObject *self, PyObject *args)
{
int rc;

rc = daos_init();
if ((rc == 0) && (use_glob_eq == 0)) {
d_getenv_bool("PYDAOS_GLOB_EQ", &use_glob_eq);
if (use_glob_eq) {
int ret;

ret = daos_eq_create(&glob_eq);
if (ret) {
DL_ERROR(ret, "Failed to create global eq");
use_glob_eq = false;
}
}
if (rc)
return PyLong_FromLong(rc);

rc = daos_eq_create(&eq);
if (rc) {
DL_ERROR(rc, "Failed to create global eq");
daos_fini();
return PyLong_FromLong(rc);
}

rc = pthread_atfork(NULL, NULL, &child_handler);
if (rc) {
DL_ERROR(rc, "Failed to set atfork handler");
return PyLong_FromLong(rc);
}

return PyLong_FromLong(rc);
Expand All @@ -97,12 +116,9 @@ __shim_handle__daos_fini(PyObject *self, PyObject *args)
{
int rc;

if (use_glob_eq) {
rc = daos_eq_destroy(glob_eq, DAOS_EQ_DESTROY_FORCE);
if (rc)
D_ERROR("Failed to destroy global eq, "DF_RC"\n", DP_RC(rc));
use_glob_eq = false;
}
rc = daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
if (rc)
D_ERROR("Failed to destroy global eq, " DF_RC "\n", DP_RC(rc));

rc = daos_fini();

Expand Down Expand Up @@ -914,8 +930,7 @@ __shim_handle__kv_get(PyObject *self, PyObject *args)
PyObject *daos_dict;
daos_handle_t oh;
PyObject *key;
Py_ssize_t pos = 0;
daos_handle_t eq;
Py_ssize_t pos = 0;
struct kv_op *kv_array = NULL;
struct kv_op *op;
daos_event_t *evp;
Expand All @@ -928,14 +943,6 @@ __shim_handle__kv_get(PyObject *self, PyObject *args)
RETURN_NULL_IF_FAILED_TO_PARSE(args, "LO!l", &oh.cookie, &PyDict_Type,
&daos_dict, &v_size);

if (!use_glob_eq) {
rc = daos_eq_create(&eq);
if (rc)
return PyLong_FromLong(rc);
} else {
eq = glob_eq;
}

D_ALLOC_ARRAY(kv_array, MAX_INFLIGHT);
if (kv_array == NULL) {
rc = -DER_NOMEM;
Expand Down Expand Up @@ -1079,19 +1086,10 @@ __shim_handle__kv_get(PyObject *self, PyObject *args)
out:
D_FREE(kv_array);

/** destroy event queue */
if (!use_glob_eq) {
ret = daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
if (rc == DER_SUCCESS && ret < 0)
rc = ret;
}

/* Populate return list */
return PyLong_FromLong(rc);

err:
if (!use_glob_eq)
daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
D_FREE(kv_array);

return NULL;
Expand All @@ -1104,8 +1102,7 @@ __shim_handle__kv_put(PyObject *self, PyObject *args)
daos_handle_t oh;
PyObject *key;
PyObject *value;
Py_ssize_t pos = 0;
daos_handle_t eq;
Py_ssize_t pos = 0;
daos_event_t ev_array[MAX_INFLIGHT];
daos_event_t *evp;
int i = 0;
Expand All @@ -1116,14 +1113,6 @@ __shim_handle__kv_put(PyObject *self, PyObject *args)
RETURN_NULL_IF_FAILED_TO_PARSE(args, "LO!", &oh.cookie,
&PyDict_Type, &daos_dict);

if (!use_glob_eq) {
rc = daos_eq_create(&eq);
if (rc)
return PyLong_FromLong(rc);
} else {
eq = glob_eq;
}

while (PyDict_Next(daos_dict, &pos, &key, &value)) {
char *buf;
daos_size_t size;
Expand Down Expand Up @@ -1203,17 +1192,8 @@ __shim_handle__kv_put(PyObject *self, PyObject *args)
if (rc == DER_SUCCESS && ret < 0)
rc = ret;

/** destroy event queue */
if (!use_glob_eq) {
ret = daos_eq_destroy(eq, 0);
if (rc == DER_SUCCESS && ret < 0)
rc = ret;
}

return PyLong_FromLong(rc);
err:
if (!use_glob_eq)
daos_eq_destroy(eq, 0);
return NULL;
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/misc.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@ daos_hhash_link_delete(struct d_hlink *hlink)
return d_hhash_link_delete(daos_ht.dht_hhash, hlink);
}

int
daos_hhash_traverse(int type, daos_hhash_traverse_cb_t cb, void *arg)
{
return d_hhash_traverse(daos_ht.dht_hhash, type, cb, arg);
}

/**
* a helper to get the needed crt_init_opt.
*
Expand Down
17 changes: 17 additions & 0 deletions src/container/cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -3524,3 +3524,20 @@ dc_cont_hdl2props(daos_handle_t coh)

return result;
}

static int
cont_mark_slave(struct d_hlink *link, void *arg)
{
struct dc_cont *cont;

cont = container_of(link, struct dc_cont, dc_hlink);
cont->dc_slave = 1;

return 0;
}

int
dc_cont_mark_all_slave(void)
{
return daos_hhash_traverse(DAOS_HTYPE_CO, cont_mark_slave, NULL);
}
36 changes: 36 additions & 0 deletions src/gurt/hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,42 @@ d_hhash_key_type(uint64_t key)
return d_hhash_key_isptr(key) ? D_HTYPE_PTR : key & D_HTYPE_MASK;
}

struct traverse_args {
int type;
d_hhash_traverse_cb_t cb;
void *arg;
};

static int
d_hhash_cb(d_list_t *link, void *args)
{
struct traverse_args *targs = args;
struct d_hlink *hlink = link2hlink(link);
uint64_t key;

if (hlink == NULL)
return 0;

d_hhash_link_key(hlink, &key);

if (targs->type != d_hhash_key_type(key))
return 0;

return targs->cb(hlink, targs->arg);
}

int
d_hhash_traverse(struct d_hhash *hhash, int type, d_hhash_traverse_cb_t cb, void *arg)
{
struct traverse_args args;

args.type = type;
args.cb = cb;
args.arg = arg;

return d_hash_table_traverse(&hhash->ch_htable, d_hhash_cb, &args);
}

/******************************************************************************
* UUID Hash Table Wrapper
* Key: UUID
Expand Down
11 changes: 11 additions & 0 deletions src/include/daos.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ daos_init(void);
int
daos_fini(void);

/**
* Reinitialize DAOS library after a fork call.
* For applications that initialize DAOS and then call fork without exec, some
* internal data structures must be reinitialized in the child process.
* It is recommended to call this function from a fork handler registered via
* pthread_atfork(). If any event queues were created prior to the fork call,
* those must be re-created in the child process.
*/
int
daos_reinit(void);

#if defined(__cplusplus)
}
#endif
Expand Down
4 changes: 4 additions & 0 deletions src/include/daos/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,10 @@ bool daos_hhash_link_delete(struct d_hlink *hlink);
#define daos_hhash_link_empty(hlink) d_hhash_link_empty(hlink)
#define daos_hhash_link_key(hlink, key) d_hhash_link_key(hlink, key)

typedef int (*daos_hhash_traverse_cb_t)(struct d_hlink *link, void *arg);
int
daos_hhash_traverse(int type, daos_hhash_traverse_cb_t cb, void *arg);

/* daos_recx_t overlap detector */
#define DAOS_RECX_OVERLAP(recx_1, recx_2) \
(((recx_1).rx_idx < (recx_2).rx_idx + (recx_2).rx_nr) && \
Expand Down
2 changes: 2 additions & 0 deletions src/include/daos/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,6 @@ dc_cont_open_flags_valid(uint64_t flags)
return true;
}

int
dc_cont_mark_all_slave(void);
#endif /* __DD_CONT_H__ */
3 changes: 3 additions & 0 deletions src/include/daos/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,7 @@ int dc_pool_create_map_refresh_task(daos_handle_t pool_hdl, uint32_t map_version
tse_sched_t *sched, tse_task_t **task);
void dc_pool_abandon_map_refresh_task(tse_task_t *task);

int
dc_pool_mark_all_slave(void);

#endif /* __DD_POOL_H__ */
3 changes: 3 additions & 0 deletions src/include/gurt/hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,9 @@ int d_hhash_key_type(uint64_t key);
bool d_hhash_key_isptr(uint64_t key);
int d_hhash_set_ptrtype(struct d_hhash *hhash);
bool d_hhash_is_ptrtype(struct d_hhash *hhash);
typedef int (*d_hhash_traverse_cb_t)(struct d_hlink *link, void *arg);
int
d_hhash_traverse(struct d_hhash *hhash, int type, d_hhash_traverse_cb_t cb, void *arg);

/******************************************************************************
* UUID Hash Table Wrapper
Expand Down
Loading

0 comments on commit 688a981

Please sign in to comment.