Skip to content

Commit

Permalink
force dht_update(local) to finish before dht_update(local->server)
Browse files Browse the repository at this point in the history
  • Loading branch information
bozhang-hpc committed Dec 19, 2023
1 parent 1f76467 commit 19d27da
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 111 deletions.
6 changes: 6 additions & 0 deletions include/ss_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ struct subods_list_entry {
obj_descriptor qodsc;
};

struct margo_request_list_entry {
struct list_head entry;
hg_handle_t hndl;
margo_request req;
};

typedef struct {
int num_obj;
int size_hash;
Expand Down
273 changes: 162 additions & 111 deletions src/dspaces-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct dspaces_provider {
hg_id_t get_id;
hg_id_t get_local_id;
hg_id_t obj_update_id;
hg_id_t obj_update_blocking_id;
hg_id_t odsc_internal_id;
hg_id_t ss_id;
hg_id_t drain_id;
Expand Down Expand Up @@ -104,6 +105,7 @@ DECLARE_MARGO_RPC_HANDLER(get_rpc)
DECLARE_MARGO_RPC_HANDLER(query_rpc)
DECLARE_MARGO_RPC_HANDLER(query_meta_rpc)
DECLARE_MARGO_RPC_HANDLER(obj_update_rpc)
DECLARE_MARGO_RPC_HANDLER(obj_update_blocking_rpc)
DECLARE_MARGO_RPC_HANDLER(odsc_internal_rpc)
DECLARE_MARGO_RPC_HANDLER(ss_rpc)
DECLARE_MARGO_RPC_HANDLER(kill_rpc)
Expand All @@ -119,6 +121,7 @@ static void get_rpc(hg_handle_t h);
static void query_rpc(hg_handle_t h);
static void query_meta_rpc(hg_handle_t h);
static void obj_update_rpc(hg_handle_t h);
static void obj_update_blocking_rpc(hg_handle_t h);
static void odsc_internal_rpc(hg_handle_t h);
static void ss_rpc(hg_handle_t h);
static void kill_rpc(hg_handle_t h);
Expand Down Expand Up @@ -710,6 +713,76 @@ static int obj_update_dht(dspaces_provider_t server, struct obj_data *od,
return dspaces_SUCCESS;
}

static int obj_update_dht_req(dspaces_provider_t server, struct obj_data *od,
obj_update_t type, struct list_head *req_list)
{
obj_descriptor *odsc = &od->obj_desc;
ABT_mutex_lock(server->sspace_mutex);
struct sspace *ssd = lookup_sspace(server, odsc->name, &od->gdim);
ABT_mutex_unlock(server->sspace_mutex);
struct dht_entry *dht_tab[ssd->dht->num_entries];

int num_de, i;

/* Compute object distribution to nodes in the space. */
num_de = ssd_hash(ssd, &odsc->bb, dht_tab);
if(num_de == 0) {
fprintf(stderr, "'%s()': this should not happen, num_de == 0 ?!\n",
__func__);
}

/* Update object descriptors on the corresponding nodes. */
for(i = 0; i < num_de; i++) {
if(dht_tab[i]->rank == server->dsg->rank) {
DEBUG_OUT("Add in local_dht %d\n", server->dsg->rank);
ABT_mutex_lock(server->dht_mutex);
switch(type) {
case DS_OBJ_NEW:
dht_add_entry(ssd->ent_self, odsc);
break;
case DS_OBJ_OWNER:
dht_update_owner(ssd->ent_self, odsc, 1);
break;
default:
fprintf(stderr, "ERROR: (%s): unknown object update type.\n",
__func__);
}
ABT_mutex_unlock(server->dht_mutex);
DEBUG_OUT("I am self, added in local dht %d\n", server->dsg->rank);
continue;
}

// now send rpc to the server for dht_update
hg_return_t hret;
odsc_gdim_t in;
struct margo_request_list_entry *req_ent =
(struct margo_request_list_entry*) malloc(sizeof(struct margo_request_list_entry));

DEBUG_OUT("Server %d sending object %s to dht server %d \n",
server->dsg->rank, obj_desc_sprint(odsc), dht_tab[i]->rank);

in.odsc_gdim.size = sizeof(*odsc);
in.odsc_gdim.gdim_size = sizeof(struct global_dimension);
in.odsc_gdim.raw_odsc = (char *)(odsc);
in.odsc_gdim.raw_gdim = (char *)(&od->gdim);
in.param = type;

hg_addr_t svr_addr;
margo_addr_lookup(server->mid, server->server_address[dht_tab[i]->rank],
&svr_addr);
margo_create(server->mid, svr_addr, server->obj_update_blocking_id, &req_ent->hndl);
margo_iforward(req_ent->hndl, &in, &req_ent->req);
DEBUG_OUT("sent obj server %d to update dht %s in \n", dht_tab[i]->rank,
obj_desc_sprint(odsc));

margo_addr_free(server->mid, svr_addr);

list_add(&req_ent->entry, req_list);
}

return dspaces_SUCCESS;
}

static int get_client_data(obj_descriptor odsc, dspaces_provider_t server)
{
bulk_in_t in;
Expand Down Expand Up @@ -956,6 +1029,10 @@ int dspaces_server_init(char *listen_addr_str, MPI_Comm comm,
&server->obj_update_id, &flag);
DS_HG_REGISTER(hg, server->obj_update_id, odsc_gdim_t, void,
obj_update_rpc);
margo_registered_name(server->mid, "obj_update_blocking_rpc",
&server->obj_update_blocking_id, &flag);
DS_HG_REGISTER(hg, server->obj_update_blocking_id, odsc_gdim_t, bulk_out_t,
obj_update_blocking_rpc);
margo_registered_name(server->mid, "odsc_internal_rpc",
&server->odsc_internal_id, &flag);
DS_HG_REGISTER(hg, server->odsc_internal_id, odsc_gdim_t, odsc_list_t,
Expand Down Expand Up @@ -1018,6 +1095,11 @@ int dspaces_server_init(char *listen_addr_str, MPI_Comm comm,
NULL);
margo_registered_disable_response(server->mid, server->obj_update_id,
HG_TRUE);
server->obj_update_blocking_id = MARGO_REGISTER(
server->mid, "obj_update_blocking_rpc", odsc_gdim_t, bulk_out_t,
obj_update_blocking_rpc);
margo_register_data(server->mid, server->obj_update_blocking_id,
(void *)server, NULL);
server->odsc_internal_id =
MARGO_REGISTER(server->mid, "odsc_internal_rpc", odsc_gdim_t,
odsc_list_t, odsc_internal_rpc);
Expand Down Expand Up @@ -2107,6 +2189,73 @@ static void obj_update_rpc(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(obj_update_rpc)

/*
Rpc routine to update (add or insert) an object descriptor in the
dht table.
*/
static void obj_update_blocking_rpc(hg_handle_t handle)
{
hg_return_t hret;
odsc_gdim_t in;
bulk_out_t out;
obj_update_t type;
int err;

margo_instance_id mid = margo_hg_handle_get_instance(handle);

const struct hg_info *info = margo_get_info(handle);
dspaces_provider_t server =
(dspaces_provider_t)margo_registered_data(mid, info->id);

DEBUG_OUT("Received rpc to update obj_dht\n");

hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,
"DATASPACES: ERROR handling %s: margo_get_input() failed with "
"%d.\n",
__func__, hret);
margo_destroy(handle);
return;
}

obj_descriptor in_odsc;
memcpy(&in_odsc, in.odsc_gdim.raw_odsc, sizeof(in_odsc));
struct global_dimension gdim;
memcpy(&gdim, in.odsc_gdim.raw_gdim, sizeof(struct global_dimension));
type = in.param;

DEBUG_OUT("received update_rpc %s\n", obj_desc_sprint(&in_odsc));
ABT_mutex_lock(server->sspace_mutex);
struct sspace *ssd = lookup_sspace(server, in_odsc.name, &gdim);
ABT_mutex_unlock(server->sspace_mutex);
struct dht_entry *de = ssd->ent_self;

ABT_mutex_lock(server->dht_mutex);
switch(type) {
case DS_OBJ_NEW:
err = dht_add_entry(de, &in_odsc);
break;
case DS_OBJ_OWNER:
err = dht_update_owner(de, &in_odsc, 1);
break;
default:
fprintf(stderr, "ERROR: (%s): unknown object update type.\n", __func__);
}
ABT_mutex_unlock(server->dht_mutex);
DEBUG_OUT("Updated dht %s in server %d \n", obj_desc_sprint(&in_odsc),
server->dsg->rank);
if(err < 0)
fprintf(stderr, "ERROR (%s): obj_update_rpc Failed with %d\n", __func__,
err);

out.ret = err;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(obj_update_blocking_rpc)

static void ss_rpc(hg_handle_t handle)
{
ss_information out;
Expand Down Expand Up @@ -2333,116 +2482,6 @@ int dspaces_server_get_objdata(dspaces_provider_t server,
return (0);
}

// static void put_drain_rpc(hg_handle_t handle)
// {
// hg_return_t hret;
// bulk_gdim_t in;
// hg_bulk_t bulk_handle;
// hg_addr_t client_addr;
// hg_handle_t notify_drainh;
// margo_request req;
// bulk_in_t notice;

// margo_instance_id mid = margo_hg_handle_get_instance(handle);

// const struct hg_info *info = margo_get_info(handle);
// dspaces_provider_t server =
// (dspaces_provider_t)margo_registered_data(mid, info->id);

// if(server->f_kill == 0) {
// fprintf(stderr, "WARNING: put_drain rpc received when server is finalizing. "
// "This will likely cause problems...\n");
// }

// hret = margo_get_input(handle, &in);
// if(hret != HG_SUCCESS) {
// fprintf(stderr,
// "DATASPACES: ERROR handling %s: margo_get_input() failed with "
// "%d.\n",
// __func__, hret);
// margo_destroy(handle);
// return;
// }

// obj_descriptor in_odsc;
// memcpy(&in_odsc, in.odsc.raw_odsc, sizeof(in_odsc));

// // get client addr here before update the owner
// margo_addr_lookup(server->mid, in_odsc.owner, &client_addr);

// // set the owner to be this server address
// hg_addr_t owner_addr;
// size_t owner_addr_size = 128;

// margo_addr_self(server->mid, &owner_addr);
// margo_addr_to_string(server->mid, in_odsc.owner, &owner_addr_size,
// owner_addr);
// margo_addr_free(server->mid, owner_addr);

// struct obj_data *od;
// od = obj_data_alloc(&in_odsc);
// memcpy(&od->gdim, in.odsc.raw_gdim, sizeof(struct global_dimension));

// if(!od)
// fprintf(stderr, "ERROR: (%s): object allocation failed!\n", __func__);

// // do write lock

// hg_size_t size = (in_odsc.size) * bbox_volume(&(in_odsc.bb));

// hret = margo_bulk_create(mid, 1, (void **)&(od->data), &size,
// HG_BULK_WRITE_ONLY, &bulk_handle);

// if(hret != HG_SUCCESS) {
// fprintf(stderr, "ERROR: (%s): margo_bulk_create failed!\n", __func__);
// out.ret = dspaces_ERR_MERCURY;
// margo_respond(handle, &out);
// margo_free_input(handle, &in);
// margo_destroy(handle);
// return;
// }

// hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.handle, 0,
// bulk_handle, 0, size);
// if(hret != HG_SUCCESS) {
// fprintf(stderr, "ERROR: (%s): margo_bulk_transfer failed!\n", __func__);
// out.ret = dspaces_ERR_MERCURY;
// margo_respond(handle, &out);
// margo_free_input(handle, &in);
// margo_bulk_free(bulk_handle);
// margo_destroy(handle);
// return;
// }

// ABT_mutex_lock(server->ls_mutex);
// ls_add_obj(server->dsg->ls, od);
// ABT_mutex_unlock(server->ls_mutex);

// DEBUG_OUT("Received obj %s\n", obj_desc_sprint(&od->obj_desc));

// // now update the dht
// //out.ret = dspaces_SUCCESS;
// margo_bulk_free(bulk_handle);
// //margo_respond(handle, &out);
// margo_free_input(handle, &in);
// margo_destroy(handle);

// obj_update_dht(server, od, DS_OBJ_OWNER);
// DEBUG_OUT("Finished obj_put_update_owner from put_rpc\n");

// notice.odsc.size = sizeof(obj_descriptor);
// notice.odsc.raw_odsc = (char*)(&in_odsc);

// margo_create(server->mid, client_addr, server->notify_drain_id, &notify_drainh);
// margo_iforward(notify_drainh, &notice, &req);
// margo_addr_free(server->mid, client_addr);
// margo_destroy(notify_drainh);

// margo_free_input(handle, &in);
// margo_destroy(handle);
// }
// DEFINE_MARGO_RPC_HANDLER(put_drain_rpc)

static void putlocal_subdrain_rpc(hg_handle_t handle)
{
hg_return_t hret;
Expand Down Expand Up @@ -2495,8 +2534,12 @@ static void putlocal_subdrain_rpc(hg_handle_t handle)
DEBUG_OUT("Received obj %s in putlocal_subdrain_rpc\n",
obj_desc_sprint(&client_od->obj_desc));

/* List of struct dht_update_req */
struct list_head dht_req_list;
INIT_LIST_HEAD(&dht_req_list);

// now update the dht
obj_update_dht(server, client_od, DS_OBJ_NEW);
obj_update_dht_req(server, client_od, DS_OBJ_NEW, &dht_req_list);
DEBUG_OUT("Finished obj_put_local_update in putlocal_subdrain_rpc\n");

// respond to the client after update dht for put_local
Expand Down Expand Up @@ -2555,6 +2598,14 @@ static void putlocal_subdrain_rpc(hg_handle_t handle)
margo_free_input(handle, &in);
margo_bulk_free(bulk_handle);

struct margo_request_list_entry *dht_ent;
list_for_each_entry(dht_ent, &dht_req_list, struct margo_request_list_entry, entry) {
margo_wait(dht_ent->req);
margo_destroy(dht_ent->hndl);
list_del(&dht_ent->entry);
free(dht_ent);
}

obj_update_dht(server, server_od, DS_OBJ_OWNER);
DEBUG_OUT("Finished obj_put_update_owner from putlocal_subdrain_rpc\n");

Expand Down

0 comments on commit 19d27da

Please sign in to comment.