Skip to content

Commit

Permalink
add error info for empty swap list; rm hdf5 debug codes
Browse files Browse the repository at this point in the history
  • Loading branch information
bozhang-hpc committed Sep 12, 2024
1 parent 8f2312b commit 3921641
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 40 deletions.
40 changes: 14 additions & 26 deletions src/dspaces-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ struct dspaces_provider {
int f_debug;
int f_drain;
int f_kill;
int f_filedebug;

#ifdef HAVE_DRC
uint32_t drc_credential_id;
Expand Down Expand Up @@ -771,7 +770,6 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
const char *envdebug = getenv("DSPACES_DEBUG");
const char *envnthreads = getenv("DSPACES_NUM_HANDLERS");
const char *envdrain = getenv("DSPACES_DRAIN");
const char *envfiledebug = getenv("DSPACES_FILE_DEBUG");
const char *mod_dir_str = xstr(DSPACES_MOD_DIR);
dspaces_provider_t server;
hg_class_t *hg;
Expand Down Expand Up @@ -810,10 +808,6 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
server->f_drain = 1;
}

if(envfiledebug) {
server->f_filedebug = 1;
}

MPI_Comm_dup(comm, &server->comm);
MPI_Comm_rank(comm, &server->rank);

Expand Down Expand Up @@ -1315,25 +1309,18 @@ static void put_rpc(hg_handle_t handle)
struct obj_data_ptr_flat_list_entry *swap_od_entry;
struct obj_data *swap_od;

if(server->f_filedebug) {
if(server->dsg->ls->num_obj>= 1) {
ABT_mutex_lock(server->ls_mutex);
swap_od = which_swap_out(&server->conf.swap, &server->dsg->ls_od_list);
ABT_mutex_unlock(server->ls_mutex);
hdf5_write_od(&server->conf.swap, swap_od);
ABT_mutex_lock(server->ls_mutex);
ls_remove(server->dsg->ls, swap_od);
list_del(&swap_od->flat_list_entry.entry);
ABT_mutex_unlock(server->ls_mutex);
obj_data_free(swap_od);
}
} else {
while(need_swap_out(&server->conf.swap, size_MB)) {
/* Find the od that we want to swap out */
ABT_mutex_lock(server->ls_mutex);
swap_od = which_swap_out(&server->conf.swap, &server->dsg->ls_od_list);
ABT_mutex_unlock(server->ls_mutex);
// swap_od = swap_od_entry->od;
if(!swap_od) {
fprintf(stderr, "DATASPACES: ERROR: %s: Object data list has nothing to swap out! "
"This should not happen... The primary reason could be either the "
"server node memory capacity is too small, or the user-configured "
"server memory quota is too small.", __func__);
break;
}

/* Write od to HDF5 */
hdf5_write_od(&server->conf.swap, swap_od);
Expand All @@ -1346,8 +1333,6 @@ static void put_rpc(hg_handle_t handle)

/* Free od */
obj_data_free(swap_od);
// free(swap_od_entry);
}
}

struct obj_data *od;
Expand Down Expand Up @@ -1401,7 +1386,6 @@ static void put_rpc(hg_handle_t handle)
ABT_mutex_lock(server->ls_mutex);
ls_add_obj(server->dsg->ls, od);
/* add obj_data to the flat od_list for swap out */
// struct obj_data_ptr_flat_list_entry *od_flat_entry = ls_flat_od_list_entry_alloc(od);
list_add_tail(&od->flat_list_entry.entry, &server->dsg->ls_od_list);
ABT_mutex_unlock(server->ls_mutex);

Expand Down Expand Up @@ -2245,7 +2229,13 @@ static void get_rpc(hg_handle_t handle)
ABT_mutex_lock(server->ls_mutex);
swap_od = which_swap_out(&server->conf.swap, &server->dsg->ls_od_list);
ABT_mutex_unlock(server->ls_mutex);
// swap_od = swap_od_entry->od;
if(!swap_od) {
fprintf(stderr, "DATASPACES: ERROR: %s: Object data list has nothing to swap out! "
"This should not happen... The primary reason could be either the "
"server node memory capacity is too small, or the user-configured "
"server memory quota is too small.", __func__);
break;
}

/* Write od to HDF5 */
hdf5_write_od(&server->conf.swap, swap_od);
Expand All @@ -2258,7 +2248,6 @@ static void get_rpc(hg_handle_t handle)
ABT_mutex_unlock(server->ls_mutex);
/* Free od */
obj_data_free(swap_od);
// free(swap_od_entry);
}
DEBUG_OUT("allocated target object\n");

Expand Down Expand Up @@ -2343,7 +2332,6 @@ static void get_rpc(hg_handle_t handle)
ABT_mutex_lock(server->ls_mutex);
ls_add_obj(server->dsg->ls, od);
/* add obj_data to the flat od_list for swap out */
// od_flat_entry = ls_flat_od_list_entry_alloc(od);
od->flat_list_entry.usecnt++;
list_add_tail(&od->flat_list_entry.entry, &server->dsg->ls_od_list);
ABT_mutex_unlock(server->ls_mutex);
Expand Down
13 changes: 9 additions & 4 deletions src/file_storage/file_hdf5.c
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,10 @@ static int hdf5_read_dataset(const char* file_name, hid_t file_id, char* dataset
free(stride);
return dspaces_ERR_HDF5;
}

data = (void*) malloc(elem_num*datatype_size);

if(!data) {
data = (void*) malloc(elem_num*datatype_size);
}

status = H5Dread(dataset_id, datatype_id, memspace_id, dataspace_id, H5P_DEFAULT, data);
if(status < 0) {
Expand Down Expand Up @@ -710,8 +712,11 @@ int hdf5_write_od(struct swap_config* swap_conf, struct obj_data *od)
struct obj_data *qod, *search_od, *union_od;

/* Check if we can open the file path, if not, create one at the default path */
if(!(check_dir_exist(swap_conf->file_dir) && check_dir_write_permission(swap_conf->file_dir)))
{
if(!check_dir_exist(swap_conf->file_dir)) {
fprintf(stderr, "WARNING: Swap directory: %s does not exist. "
"Creating the directory.\n", swap_conf->file_dir);
mkdir_all_owner_permission(swap_conf->file_dir);
} else if(!check_dir_write_permission(swap_conf->file_dir)) {
fprintf(stderr, "WARNING: Failed to write files into swap directory: %s "
"Use ./dspaces_swap as the default swap directory instead.\n",
swap_conf->file_dir);
Expand Down
34 changes: 24 additions & 10 deletions src/file_storage/policy.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,27 +150,41 @@ static int value_when_policy(float threshold, uint64_t size_MB)

static struct obj_data* fifo_which_policy(struct list_head *ls_od_list) {
struct obj_data *od;
od = list_entry(ls_od_list->next, struct obj_data, flat_list_entry.entry);
return od;

if(list_empty(ls_od_list)) {
return NULL;
} else {
od = list_entry(ls_od_list->next, struct obj_data, flat_list_entry.entry);
return od;
}
}

static struct obj_data* lifo_which_policy(struct list_head *ls_od_list) {
struct obj_data *od;
od = list_entry(ls_od_list->prev, struct obj_data, flat_list_entry.entry);
return od;

if(list_empty(ls_od_list)) {
return NULL;
} else {
od = list_entry(ls_od_list->prev, struct obj_data, flat_list_entry.entry);
return od;
}
}

static struct obj_data* lru_which_policy(struct list_head *ls_od_list) {
struct obj_data *od, *min;

min = list_entry(ls_od_list->next, struct obj_data, flat_list_entry.entry);
if(list_empty(ls_od_list)) {
return NULL;
} else {
min = list_entry(ls_od_list->next, struct obj_data, flat_list_entry.entry);

list_for_each_entry(od, ls_od_list, struct obj_data, flat_list_entry.entry) {
if(od->flat_list_entry.usecnt < min->flat_list_entry.usecnt) {
min = od;
list_for_each_entry(od, ls_od_list, struct obj_data, flat_list_entry.entry) {
if(od->flat_list_entry.usecnt < min->flat_list_entry.usecnt) {
min = od;
}
}
return min;
}
return min;
}

int need_swap_out(struct swap_config* swap, uint64_t size_MB)
Expand All @@ -195,7 +209,7 @@ int need_swap_out(struct swap_config* swap, uint64_t size_MB)
}

struct obj_data* which_swap_out(struct swap_config *swap,
struct list_head* ls_od_list)
struct list_head* ls_od_list)
{
// TODO: support custom policy
if((strcmp(swap->policy, "Default") == 0) || (strcmp(swap->policy, "FIFO") == 0)) {
Expand Down

0 comments on commit 3921641

Please sign in to comment.