Skip to content

Commit

Permalink
DAOS-16930 pool: Share map bulk resources (#15763)
Browse files Browse the repository at this point in the history
Improve concurrent POOL_QUERY, POOL_CONNECT, and POOL_TGT_QUERY_MAP efficiency by giving them a chance to share the same pool map buffer and pool map buffer bulk handle.

Introduce pool space query on service leader to avoid space query flooding. The pool space cache expiration time is 2 seconds by default, one can change the expiration time via DAOS_POOL_SPACE_CACHE_INTVL, if the expiration time is set to zero, space cache will be disabled.

Signed-off-by: Li Wei <[email protected]>
Signed-off-by: Niu Yawei <[email protected]>
Co-authored-by: Xuezhao Liu <[email protected]>
Co-authored-by: Liang Zhen <[email protected]>
Co-authored-by: Dalton Bohning <[email protected]>
  • Loading branch information
4 people authored Jan 24, 2025
1 parent ad722a6 commit 0f05b2f
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 56 deletions.
10 changes: 9 additions & 1 deletion src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ struct ds_pool_svc;
/* age of an entry in svc_ops KVS before it may be evicted */
#define DEFAULT_SVC_OPS_ENTRY_AGE_SEC_MAX 300ULL

/* Pool map buffer cache */
struct ds_pool_map_bc {
struct pool_buf *pmc_buf;
crt_bulk_t pmc_bulk;
uint32_t pmc_ref;
};

/*
* Pool object
*
Expand All @@ -48,7 +55,8 @@ struct ds_pool {
uuid_t sp_uuid; /* pool UUID */
d_list_t sp_hdls;
ABT_rwlock sp_lock;
struct pool_map *sp_map;
struct pool_map *sp_map;
struct ds_pool_map_bc *sp_map_bc;
uint32_t sp_map_version; /* temporary */
uint32_t sp_ec_cell_sz;
uint64_t sp_reclaim;
Expand Down
10 changes: 10 additions & 0 deletions src/pool/srv.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand All @@ -22,6 +23,7 @@

bool ec_agg_disabled;
uint32_t pw_rf = -1; /* pool wise redundancy factor */
uint32_t ps_cache_intvl = 2; /* pool space cache expiration time, in seconds */
#define PW_RF_DEFAULT (2)
#define PW_RF_MIN (0)
#define PW_RF_MAX (4)
Expand Down Expand Up @@ -76,6 +78,14 @@ init(void)
pw_rf = PW_RF_DEFAULT;
D_INFO("pool redundancy factor %d\n", pw_rf);

d_getenv_uint32_t("DAOS_POOL_SPACE_CACHE_INTVL", &ps_cache_intvl);
if (ps_cache_intvl > 20) {
D_WARN("pool space cache expiration time %u is too large, use default value\n",
ps_cache_intvl);
ps_cache_intvl = 2;
}
D_INFO("pool space cache expiration time set to %u seconds\n", ps_cache_intvl);

ds_pool_rsvc_class_register();

bio_register_ract_ops(&nvme_reaction_ops);
Expand Down
11 changes: 7 additions & 4 deletions src/pool/srv_internal.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand All @@ -17,6 +18,7 @@
#include <gurt/telemetry_common.h>

extern uint32_t pw_rf;
extern uint32_t ps_cache_intvl;

/**
* Global pool metrics
Expand Down Expand Up @@ -236,8 +238,10 @@ int ds_pool_tgt_prop_update(struct ds_pool *pool, struct pool_iv_prop *iv_prop);
int ds_pool_tgt_connect(struct ds_pool *pool, struct pool_iv_conn *pic);
void ds_pool_tgt_query_map_handler(crt_rpc_t *rpc);
void ds_pool_tgt_discard_handler(crt_rpc_t *rpc);
void
ds_pool_tgt_warmup_handler(crt_rpc_t *rpc);
void ds_pool_tgt_warmup_handler(crt_rpc_t *rpc);
int ds_pool_lookup_map_bc(struct ds_pool *pool, crt_context_t ctx,
struct ds_pool_map_bc **map_bc_out, uint32_t *map_version_out);
void ds_pool_put_map_bc(struct ds_pool_map_bc *map_bc);

/*
* srv_util.c
Expand All @@ -246,8 +250,7 @@ bool ds_pool_map_rank_up(struct pool_map *map, d_rank_t rank);
int ds_pool_plan_svc_reconfs(int svc_rf, struct pool_map *map, d_rank_list_t *replicas,
d_rank_t self, bool filter_only, d_rank_list_t **to_add_out,
d_rank_list_t **to_remove_out);
int ds_pool_transfer_map_buf(struct pool_buf *map_buf, uint32_t map_version,
crt_rpc_t *rpc, crt_bulk_t remote_bulk,
int ds_pool_transfer_map_buf(struct ds_pool_map_bc *map_bc, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
uint32_t *required_buf_size);
extern struct bio_reaction_ops nvme_reaction_ops;

Expand Down
76 changes: 56 additions & 20 deletions src/pool/srv_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ sched_cancel_and_wait(struct pool_svc_sched *sched)
sched_wait(sched);
}

struct pool_space_cache {
struct daos_pool_space psc_space;
uint64_t psc_timestamp;
ABT_mutex psc_lock;
};

/* Pool service */
struct pool_svc {
struct ds_rsvc ps_rsvc;
Expand All @@ -204,6 +210,7 @@ struct pool_svc {
rdb_path_t ps_ops; /* metadata ops KVS */
int ps_error; /* in DB data (see pool_svc_lookup_leader) */
struct pool_svc_events ps_events;
struct pool_space_cache ps_space_cache;
uint32_t ps_global_version;
int ps_svc_rf;
bool ps_force_notify; /* MS of PS membership */
Expand Down Expand Up @@ -1235,9 +1242,16 @@ pool_svc_alloc_cb(d_iov_t *id, struct ds_rsvc **rsvc)
goto err_pool;
}

rc = ABT_mutex_create(&svc->ps_space_cache.psc_lock);
if (rc != ABT_SUCCESS) {
D_ERROR("failed to create psc_lock: %d\n", rc);
rc = dss_abterr2der(rc);
goto err_lock;
}

rc = rdb_path_init(&svc->ps_root);
if (rc != 0)
goto err_lock;
goto err_psc_lock;
rc = rdb_path_push(&svc->ps_root, &rdb_path_root_key);
if (rc != 0)
goto err_root;
Expand Down Expand Up @@ -1306,6 +1320,8 @@ pool_svc_alloc_cb(d_iov_t *id, struct ds_rsvc **rsvc)
rdb_path_fini(&svc->ps_handles);
err_root:
rdb_path_fini(&svc->ps_root);
err_psc_lock:
ABT_mutex_free(&svc->ps_space_cache.psc_lock);
err_lock:
ABT_rwlock_free(&svc->ps_lock);
err_pool:
Expand Down Expand Up @@ -3872,8 +3888,6 @@ ds_pool_connect_handler(crt_rpc_t *rpc, int handler_version)
struct pool_connect_in *in = crt_req_get(rpc);
struct pool_connect_out *out = crt_reply_get(rpc);
struct pool_svc *svc;
struct pool_buf *map_buf = NULL;
uint32_t map_version;
uint32_t connectable;
uint32_t global_ver;
uint32_t obj_layout_ver;
Expand Down Expand Up @@ -4095,12 +4109,6 @@ ds_pool_connect_handler(crt_rpc_t *rpc, int handler_version)
goto out_map_version;
}

rc = read_map_buf(&tx, &svc->ps_root, &map_buf, &map_version);
if (rc != 0) {
D_ERROR(DF_UUID": failed to read pool map: "DF_RC"\n",
DP_UUID(svc->ps_uuid), DP_RC(rc));
D_GOTO(out_map_version, rc);
}
transfer_map = true;
if (skip_update)
D_GOTO(out_map_version, rc = 0);
Expand Down Expand Up @@ -4208,13 +4216,20 @@ ds_pool_connect_handler(crt_rpc_t *rpc, int handler_version)
ABT_rwlock_unlock(svc->ps_lock);
rdb_tx_end(&tx);
if (rc == 0 && transfer_map) {
rc = ds_pool_transfer_map_buf(map_buf, map_version, rpc, bulk,
&out->pco_map_buf_size);
struct ds_pool_map_bc *map_bc;
uint32_t map_version;

rc = ds_pool_lookup_map_bc(svc->ps_pool, rpc->cr_ctx, &map_bc, &map_version);
if (rc == 0) {
rc = ds_pool_transfer_map_buf(map_bc, rpc, bulk, &out->pco_map_buf_size);
ds_pool_put_map_bc(map_bc);
/* Ensure the map version matches the map buffer. */
out->pco_op.po_map_version = map_version;
}
/** TODO: roll back tx if transfer fails? Perhaps rdb_tx_discard()? */
}
if (rc == 0)
rc = op_val.ov_rc;
D_FREE(map_buf);
D_FREE(hdl);
D_FREE(machine);
if (prop)
Expand Down Expand Up @@ -4487,8 +4502,23 @@ pool_space_query_bcast(crt_context_t ctx, struct pool_svc *svc, uuid_t pool_hdl,
struct pool_tgt_query_in *in;
struct pool_tgt_query_out *out;
crt_rpc_t *rpc;
struct pool_space_cache *cache = &svc->ps_space_cache;
uint64_t cur_time = 0;
bool unlock = false;
int rc;

if (ps_cache_intvl > 0) {
ABT_mutex_lock(cache->psc_lock);

cur_time = daos_gettime_coarse();
if (cur_time < cache->psc_timestamp + ps_cache_intvl) {
*ps = cache->psc_space;
ABT_mutex_unlock(cache->psc_lock);
return 0;
}
unlock = true;
}

D_DEBUG(DB_MD, DF_UUID": bcasting\n", DP_UUID(svc->ps_uuid));

rc = bcast_create(ctx, svc, POOL_TGT_QUERY, NULL, &rpc);
Expand Down Expand Up @@ -4516,11 +4546,18 @@ pool_space_query_bcast(crt_context_t ctx, struct pool_svc *svc, uuid_t pool_hdl,
} else {
D_ASSERT(ps != NULL);
*ps = out->tqo_space;
if (ps_cache_intvl > 0 && cur_time > cache->psc_timestamp) {
cache->psc_timestamp = cur_time;
cache->psc_space = *ps;
}
}

out_rpc:
crt_req_decref(rpc);
out:
if (unlock)
ABT_mutex_unlock(cache->psc_lock);

D_DEBUG(DB_MD, DF_UUID": bcasted: "DF_RC"\n", DP_UUID(svc->ps_uuid),
DP_RC(rc));
return rc;
Expand Down Expand Up @@ -4979,7 +5016,7 @@ ds_pool_query_handler(crt_rpc_t *rpc, int handler_version)
struct pool_query_in *in = crt_req_get(rpc);
struct pool_query_out *out = crt_reply_get(rpc);
daos_prop_t *prop = NULL;
struct pool_buf *map_buf;
struct ds_pool_map_bc *map_bc;
uint32_t map_version = 0;
struct pool_svc *svc;
struct pool_metrics *metrics;
Expand Down Expand Up @@ -5144,19 +5181,18 @@ ds_pool_query_handler(crt_rpc_t *rpc, int handler_version)
}
}

rc = read_map_buf(&tx, &svc->ps_root, &map_buf, &map_version);
if (rc != 0)
D_ERROR(DF_UUID": failed to read pool map: "DF_RC"\n",
DP_UUID(svc->ps_uuid), DP_RC(rc));

out_lock:
ABT_rwlock_unlock(svc->ps_lock);
rdb_tx_end(&tx);
if (rc != 0)
goto out_svc;

rc = ds_pool_transfer_map_buf(map_buf, map_version, rpc, bulk, &out->pqo_map_buf_size);
D_FREE(map_buf);

rc = ds_pool_lookup_map_bc(svc->ps_pool, rpc->cr_ctx, &map_bc, &map_version);
if (rc != 0)
goto out_svc;
rc = ds_pool_transfer_map_buf(map_bc, rpc, bulk, &out->pqo_map_buf_size);
ds_pool_put_map_bc(map_bc);
if (rc != 0)
goto out_svc;

Expand Down
Loading

0 comments on commit 0f05b2f

Please sign in to comment.