Skip to content

Commit

Permalink
fix conditional compilation for od swap in put()/get(); make the file…
Browse files Browse the repository at this point in the history
… backend not user-specified
  • Loading branch information
bozhang-hpc committed Sep 29, 2024
1 parent d84a86e commit 407a6b9
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ if(NetCDF_FOUND)
endif()

if(HDF5_FOUND OR NetCDF_FOUND)
set(ENABLE_FILE_STORAGE ON CACHE BOOL "Enable File Swap on DataSpaces Server")
set(ENABLE_FILE_STORAGE ON)
add_definitions(-DDSPACES_HAVE_FILE_STORAGE)
endif()

Expand Down
2 changes: 2 additions & 0 deletions include/ss_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ struct obj_data {
/* Flag to mark if we should free this data object. */
unsigned int f_free : 1;

#ifdef DSPACES_HAVE_FILE_STORAGE
/* Reference to the flat od list in local storage; Used for advanced swap policy */
struct obj_data_ptr_flat_list_entry* ls_od_entry;
#endif // DSPACES_HAVE_FILE_STORAGE
};

struct gdim_list_entry {
Expand Down
2 changes: 1 addition & 1 deletion src/dspaces-conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ int parse_conf_toml(const char *fname, struct ds_conf *conf)
if(swap) {
parse_swap_table_after_default(swap, conf);
}
#endif
#endif // DSPACES_HAVE_FILE_STORAGE

toml_free(toml_conf);

Expand Down
19 changes: 18 additions & 1 deletion src/dspaces-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,9 @@ static int server_destroy(dspaces_provider_t server)
}

free_sspace(server->dsg);
#ifdef DSPACES_HAVE_FILE_STORAGE
free_ls_od_list(&server->dsg->ls_od_list);
#endif // DSPACES_HAVE_FILE_STORAGE
ls_free(server->dsg->ls);
free(server->dsg);
free(server->server_address[0]);
Expand Down Expand Up @@ -1272,6 +1274,7 @@ static void odsc_take_ownership(dspaces_provider_t server, obj_descriptor *odsc)
}
}

#ifdef DSPACES_HAVE_FILE_STORAGE
static int od_swap_out(dspaces_provider_t server)
{
int ret;
Expand Down Expand Up @@ -1304,6 +1307,7 @@ static int od_swap_out(dspaces_provider_t server)

return dspaces_SUCCESS;
}
#endif // DSPACES_HAVE_FILE_STORAGE

static void put_rpc(hg_handle_t handle)
{
Expand Down Expand Up @@ -1339,12 +1343,14 @@ static void put_rpc(hg_handle_t handle)
// set the owner to be this server address
odsc_take_ownership(server, &in_odsc);

#ifdef DSPACES_HAVE_FILE_STORAGE
/* Check if needs to swap data out to HDF5*/
uint64_t size_MB = obj_data_size(&in_odsc) >> 20;

while(need_swap_out(&server->conf.swap, size_MB)) {
od_swap_out(server);
}
#endif // DSPACES_HAVE_FILE_STORAGE

struct obj_data *od;
od = obj_data_alloc(&in_odsc);
Expand Down Expand Up @@ -1396,8 +1402,10 @@ static void put_rpc(hg_handle_t handle)

ABT_mutex_lock(server->ls_mutex);
ls_add_obj(server->dsg->ls, od);
#ifdef DSPACES_HAVE_FILE_STORAGE
/* add obj_data to the flat od_list for swap out */
list_add_tail(&od->flat_list_entry.entry, &server->dsg->ls_od_list);
#endif // DSPACES_HAVE_FILE_STORAGE
ABT_mutex_unlock(server->ls_mutex);

DEBUG_OUT("Received obj %s\n", obj_desc_sprint(&od->obj_desc));
Expand Down Expand Up @@ -2233,10 +2241,13 @@ static void get_rpc(hg_handle_t handle)

struct obj_data *od, *from_obj;

#ifdef DSPACES_HAVE_FILE_STORAGE
while(!(od = obj_data_alloc(&in_odsc))) {
/* Failed to allocate od, the primary reason is insufficient memory. */
od_swap_out(server);
}
#endif // DSPACES_HAVE_FILE_STORAGE

DEBUG_OUT("allocated target object\n");

ABT_mutex_lock(server->ls_mutex);
Expand All @@ -2253,11 +2264,13 @@ static void get_rpc(hg_handle_t handle)
}
ABT_mutex_unlock(server->ls_mutex);

#ifdef DSPACES_HAVE_FILE_STORAGE
if(!from_obj) {
/* Did not find the from_obj from staging memory,
* search it in the swap space */
file_read_od(&server->conf.swap, od);
}
#endif // DSPACES_HAVE_FILE_STORAGE

hg_size_t size = (in_odsc.size) * bbox_volume(&(in_odsc.bb));
void *buffer = (void *)od->data;
Expand Down Expand Up @@ -2315,7 +2328,9 @@ static void get_rpc(hg_handle_t handle)
struct obj_data_ptr_flat_list_entry *od_flat_entry;
if(from_obj) {
obj_data_free(od);
} else {
}
#ifdef DSPACES_HAVE_FILE_STORAGE
else {
/* We add the read-back od to the local storage */
ABT_mutex_lock(server->ls_mutex);
ls_add_obj(server->dsg->ls, od);
Expand All @@ -2324,6 +2339,8 @@ static void get_rpc(hg_handle_t handle)
list_add_tail(&od->flat_list_entry.entry, &server->dsg->ls_od_list);
ABT_mutex_unlock(server->ls_mutex);
}
#endif // DSPACES_HAVE_FILE_STORAGE

margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
Expand Down

0 comments on commit 407a6b9

Please sign in to comment.