diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a6b95cc..b091e6fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -74,11 +74,27 @@ if(OPENMP_FOUND AND DSPACES_USE_OPENMP) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}") add_definitions(-DOPS_USE_OPENMP) endif() + find_package(CURL) if(CURL_FOUND) add_definitions(-DDSPACES_HAVE_CURL) endif() +find_package(HDF5) +if(HDF5_FOUND) + add_definitions(-DDSPACES_HAVE_HDF5) +endif() + +find_package(NetCDF) +if(NetCDF_FOUND) + add_definitions(-DDSPACES_HAVE_NetCDF) +endif() + +if(HDF5_FOUND OR NetCDF_FOUND) + set(ENABLE_FILE_STORAGE ON) + add_definitions(-DDSPACES_HAVE_FILE_STORAGE) +endif() + include(CheckLanguage) check_language(Fortran) if(CMAKE_Fortran_COMPILER) diff --git a/cmake/FindNetCDF.cmake b/cmake/FindNetCDF.cmake new file mode 100644 index 00000000..8b7a5e3e --- /dev/null +++ b/cmake/FindNetCDF.cmake @@ -0,0 +1,133 @@ +# Copied from VTK's FindNetCDF.cmake + +#[==[ +Provides the following variables: + + * `NetCDF_FOUND`: Whether NetCDF was found or not. + * `NetCDF_INCLUDE_DIRS`: Include directories necessary to use NetCDF. + * `NetCDF_LIBRARIES`: Libraries necessary to use NetCDF. + * `NetCDF_VERSION`: The version of NetCDF found. + * `NetCDF::NetCDF`: A target to use with `target_link_libraries`. + * `NetCDF_HAS_PARALLEL`: Whether or not NetCDF was found with parallel IO support. +#]==] + +function(FindNetCDF_get_is_parallel_aware include_dir) + file(STRINGS "${include_dir}/netcdf_meta.h" _netcdf_lines + REGEX "#define[ \t]+NC_HAS_PARALLEL[ \t]") + string(REGEX REPLACE ".*NC_HAS_PARALLEL[ \t]*([0-1]+).*" "\\1" _netcdf_has_parallel "${_netcdf_lines}") + if (_netcdf_has_parallel) + set(NetCDF_HAS_PARALLEL TRUE PARENT_SCOPE) + else() + set(NetCDF_HAS_PARALLEL FALSE PARENT_SCOPE) + endif() +endfunction() + +# Try to find a CMake-built NetCDF. +find_package(netCDF CONFIG QUIET) +if (netCDF_FOUND) + # Forward the variables in a consistent way. + set(NetCDF_FOUND "${netCDF_FOUND}") + set(NetCDF_INCLUDE_DIRS "${netCDF_INCLUDE_DIR}") + set(NetCDF_LIBRARIES "${netCDF_LIBRARIES}") + set(NetCDF_VERSION "${NetCDFVersion}") + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(NetCDF + REQUIRED_VARS NetCDF_INCLUDE_DIRS NetCDF_LIBRARIES + VERSION_VAR NetCDF_VERSION) + + if (NOT TARGET NetCDF::NetCDF) + add_library(NetCDF::NetCDF INTERFACE IMPORTED) + if (TARGET "netCDF::netcdf") + # 4.7.3 + set_target_properties(NetCDF::NetCDF PROPERTIES + INTERFACE_LINK_LIBRARIES "netCDF::netcdf") + elseif (TARGET "netcdf") + set_target_properties(NetCDF::NetCDF PROPERTIES + INTERFACE_LINK_LIBRARIES "netcdf") + else () + set_target_properties(NetCDF::NetCDF PROPERTIES + INTERFACE_LINK_LIBRARIES "${netCDF_LIBRARIES}") + endif () + endif () + + FindNetCDF_get_is_parallel_aware("${NetCDF_INCLUDE_DIRS}") + # Skip the rest of the logic in this file. + return () +endif () + +find_package(PkgConfig QUIET) +if (PkgConfig_FOUND) + pkg_check_modules(_NetCDF QUIET netcdf IMPORTED_TARGET) + if (_NetCDF_FOUND) + # Forward the variables in a consistent way. + set(NetCDF_FOUND "${_NetCDF_FOUND}") + set(NetCDF_INCLUDE_DIRS "${_NetCDF_INCLUDE_DIRS}") + set(NetCDF_LIBRARIES "${_NetCDF_LIBRARIES}") + set(NetCDF_VERSION "${_NetCDF_VERSION}") + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(NetCDF + REQUIRED_VARS NetCDF_LIBRARIES + # This is not required because system-default include paths are not + # reported by `FindPkgConfig`, so this might be empty. Assume that if we + # have a library, the include directories are fine (if any) since + # PkgConfig reported that the package was found. + # NetCDF_INCLUDE_DIRS + VERSION_VAR NetCDF_VERSION) + + if (NOT TARGET NetCDF::NetCDF) + add_library(NetCDF::NetCDF INTERFACE IMPORTED) + set_target_properties(NetCDF::NetCDF PROPERTIES + INTERFACE_LINK_LIBRARIES "PkgConfig::_NetCDF") + endif () + + FindNetCDF_get_is_parallel_aware("${_NetCDF_INCLUDEDIR}") + # Skip the rest of the logic in this file. + return () + endif () +endif () + +find_path(NetCDF_INCLUDE_DIR + NAMES netcdf.h + DOC "netcdf include directories") +mark_as_advanced(NetCDF_INCLUDE_DIR) + +find_library(NetCDF_LIBRARY + NAMES netcdf + DOC "netcdf library") +mark_as_advanced(NetCDF_LIBRARY) + +if (NetCDF_INCLUDE_DIR) + file(STRINGS "${NetCDF_INCLUDE_DIR}/netcdf_meta.h" _netcdf_version_lines + REGEX "#define[ \t]+NC_VERSION_(MAJOR|MINOR|PATCH|NOTE)") + string(REGEX REPLACE ".*NC_VERSION_MAJOR *\([0-9]*\).*" "\\1" _netcdf_version_major "${_netcdf_version_lines}") + string(REGEX REPLACE ".*NC_VERSION_MINOR *\([0-9]*\).*" "\\1" _netcdf_version_minor "${_netcdf_version_lines}") + string(REGEX REPLACE ".*NC_VERSION_PATCH *\([0-9]*\).*" "\\1" _netcdf_version_patch "${_netcdf_version_lines}") + string(REGEX REPLACE ".*NC_VERSION_NOTE *\"\([^\"]*\)\".*" "\\1" _netcdf_version_note "${_netcdf_version_lines}") + set(NetCDF_VERSION "${_netcdf_version_major}.${_netcdf_version_minor}.${_netcdf_version_patch}${_netcdf_version_note}") + unset(_netcdf_version_major) + unset(_netcdf_version_minor) + unset(_netcdf_version_patch) + unset(_netcdf_version_note) + unset(_netcdf_version_lines) + + FindNetCDF_get_is_parallel_aware("${NetCDF_INCLUDE_DIR}") +endif () + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(NetCDF + REQUIRED_VARS NetCDF_LIBRARY NetCDF_INCLUDE_DIR + VERSION_VAR NetCDF_VERSION) + +if (NetCDF_FOUND) + set(NetCDF_INCLUDE_DIRS "${NetCDF_INCLUDE_DIR}") + set(NetCDF_LIBRARIES "${NetCDF_LIBRARY}") + + if (NOT TARGET NetCDF::NetCDF) + add_library(NetCDF::NetCDF UNKNOWN IMPORTED) + set_target_properties(NetCDF::NetCDF PROPERTIES + IMPORTED_LOCATION "${NetCDF_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${NetCDF_INCLUDE_DIR}") + endif () +endif () \ No newline at end of file diff --git a/include/bbox.h b/include/bbox.h index 16162c2c..7ceb1484 100644 --- a/include/bbox.h +++ b/include/bbox.h @@ -42,6 +42,11 @@ int bbox_include(const struct bbox *, const struct bbox *); int bbox_does_intersect(const struct bbox *, const struct bbox *); void bbox_intersect(const struct bbox *, const struct bbox *, struct bbox *); int bbox_equals(const struct bbox *, const struct bbox *); +int bbox_include_ondim(const struct bbox *b0, const struct bbox *b1, int dim); +int bbox_can_include(struct bbox* bbox0, struct bbox* bbox1); +int bbox_can_union(const struct bbox* bbox0, const struct bbox* bbox1); +void bbox_union_ondim(const struct bbox* bbox0, const struct bbox* bbox1, int dim, struct bbox* bbox2); +int bbox_union(const struct bbox* bbox0, const struct bbox* bbox1, struct bbox* bbox2); uint64_t bbox_volume(const struct bbox *); void bbox_to_intv(const struct bbox *, uint64_t, int, struct intv **, int *); diff --git a/include/dspaces-common.h b/include/dspaces-common.h index f1018219..56266d28 100644 --- a/include/dspaces-common.h +++ b/include/dspaces-common.h @@ -27,6 +27,8 @@ extern "C" { #define dspaces_ERR_UNKNOWN_PR -7 /* Could not find server */ #define dspaces_ERR_UNKNOWN_OBJ -8 /* Could not find the object*/ #define dspaces_ERR_END -9 /* End of range for valid error codes */ +#define dspaces_ERR_UTILS -12 +#define dspaces_ERR_HDF5 -13 #define DS_MOD_EFAULT -1 #define DS_MOD_ENODEF -2 diff --git a/include/dspaces-conf.h b/include/dspaces-conf.h index 7dcbf94f..8fbc2a96 100644 --- a/include/dspaces-conf.h +++ b/include/dspaces-conf.h @@ -5,6 +5,10 @@ #include "dspaces-remote.h" #include "list.h" +#ifdef DSPACES_HAVE_FILE_STORAGE +#include "file_storage/policy.h" +#endif + static char *hash_strings[] = {"Dynamic", "Unitary", "SFC", "Bisection"}; /* Server configuration parameters */ @@ -18,6 +22,9 @@ struct ds_conf { struct remote **remotes; int nremote; struct list_head *mods; +#ifdef DSPACES_HAVE_FILE_STORAGE + struct swap_config swap; +#endif }; int parse_conf(const char *fname, struct ds_conf *conf); diff --git a/include/file_storage/file.h b/include/file_storage/file.h new file mode 100644 index 00000000..03b7936e --- /dev/null +++ b/include/file_storage/file.h @@ -0,0 +1,10 @@ +#ifndef __DSPACES_FILE_STORAGE_H__ +#define __DSPACES_FILE_STORAGE_H__ + +#include "ss_data.h" +#include "file_storage/policy.h" + +int file_write_od(struct swap_config* swap_conf, struct obj_data *od); +int file_read_od(struct swap_config* swap_conf, struct obj_data *od); + +#endif // __DSPACES_FILE_STORAGE_H__ \ No newline at end of file diff --git a/include/file_storage/file_hdf5.h b/include/file_storage/file_hdf5.h new file mode 100644 index 00000000..2b77439a --- /dev/null +++ b/include/file_storage/file_hdf5.h @@ -0,0 +1,10 @@ +#ifndef __DSPACES_FILE_STORAGE_HDF5_H__ +#define __DSPACES_FILE_STORAGE_HDF5_H__ + +#include "ss_data.h" +#include "file_storage/policy.h" + +int hdf5_write_od(struct swap_config* swap_conf, struct obj_data *od); +int hdf5_read_od(struct swap_config* swap_conf, struct obj_data *od); + +#endif // __DSPACES_FILE_STORAGE_HDF5_H__ \ No newline at end of file diff --git a/include/file_storage/policy.h b/include/file_storage/policy.h new file mode 100644 index 00000000..b6a11b34 --- /dev/null +++ b/include/file_storage/policy.h @@ -0,0 +1,45 @@ +#ifndef __DSPACES_FILE_STORAGE_POLICY_H__ +#define __DSPACES_FILE_STORAGE_POLICY_H__ + +#include "stdint.h" + + +#include "ss_data.h" + +/* Server swap space configuration parameters */ +typedef enum mem_value_type {DS_MEM_BYTES, DS_MEM_PERCENT} ds_mem_val_t; + +typedef enum file_type { +#ifdef DSPACES_HAVE_HDF5 + DS_FILE_HDF5, +#endif +#ifdef DSPACES_HAVE_NetCDF + DS_FILE_NetCDF +#endif +} ds_file_t; + +struct swap_config +{ + char *file_dir; + ds_mem_val_t mem_quota_type; + ds_file_t file_backend; + char *policy; + float disk_quota_MB; + union { + float MB; + float percent; + } mem_quota; +}; + +void free_ls_od_list(struct list_head* ls_od_list); + +void memory_quota_parser(char* str, struct swap_config* swap); +void disk_quota_parser(char* str, struct swap_config* swap); + +int policy_str_check(const char*str); + +int need_swap_out(struct swap_config *swap, uint64_t size_MB); + +struct obj_data* which_swap_out(struct swap_config* swap, struct list_head* ls_od_list); + +#endif // __DSPACES_FILE_STORAGE_POLICY_H__ \ No newline at end of file diff --git a/include/gspace.h b/include/gspace.h index e6f7d973..67354bb7 100644 --- a/include/gspace.h +++ b/include/gspace.h @@ -60,6 +60,11 @@ struct ds_gspace { /* Pending object descriptors for draining. */ struct list_head obj_desc_drain_list; +#ifdef DSPACES_HAVE_FILE_STORAGE + /* List of object data for swap out */ + struct list_head ls_od_list; +#endif // DSPACES_HAVE_FILE_STORAGE + int rank; int size_sp; char **server_address; diff --git a/include/ss_data.h b/include/ss_data.h index ececc6a7..5071216b 100644 --- a/include/ss_data.h +++ b/include/ss_data.h @@ -66,9 +66,20 @@ struct global_dimension { struct coord sizes; }; +#ifdef DSPACES_HAVE_FILE_STORAGE +struct flat_od_list_info { + struct list_head entry; + int usecnt; +}; +#endif // DSPACES_HAVE_FILE_STORAGE + struct obj_data { struct list_head obj_entry; +#ifdef DSPACES_HAVE_FILE_STORAGE + struct flat_od_list_info flat_list_entry; +#endif // DSPACES_HAVE_FILE_STORAGE + obj_descriptor obj_desc; struct global_dimension gdim; @@ -439,6 +450,7 @@ struct obj_data *ls_lookup(ss_storage *, char *); void ls_remove(ss_storage *, struct obj_data *); void ls_try_remove_free(ss_storage *, struct obj_data *); struct obj_data *ls_find(ss_storage *, obj_descriptor *); +struct obj_data *ls_find_include(ss_storage *, obj_descriptor *); struct obj_data *ls_find_od(ss_storage *, obj_descriptor *); int ls_find_ods(ss_storage *ls, obj_descriptor *odsc, obj_descriptor ***od_tab); struct obj_data *ls_find_no_version(ss_storage *, obj_descriptor *); @@ -460,10 +472,11 @@ void obj_data_resize(obj_descriptor *obj_desc, uint64_t *new_dims); int obj_desc_equals(obj_descriptor *, obj_descriptor *); int obj_desc_equals_no_owner(const obj_descriptor *, const obj_descriptor *); -int obj_desc_equals_intersect(obj_descriptor *odsc1, obj_descriptor *odsc2); +int obj_desc_equals_intersect(const obj_descriptor *odsc1, const obj_descriptor *odsc2); int obj_desc_by_name_intersect(const obj_descriptor *odsc1, const obj_descriptor *odsc2); +int obj_desc_equals_include(const obj_descriptor *odsc1, const obj_descriptor *odsc2); // void copy_global_dimension(struct global_dimension *l, int ndim, const // uint64_t *gdim); diff --git a/include/util.h b/include/util.h index 6375dbd4..5192b206 100644 --- a/include/util.h +++ b/include/util.h @@ -2,6 +2,7 @@ #define __DS_UTIL_H_ #include +#include size_t str_len(const char *str); char *str_append_const(char *, const char *); @@ -28,4 +29,42 @@ struct name_value_pair *text_to_nv_pairs(const char *text); void free_nv_pairs(struct name_value_pair *pairs); char *alloc_sprintf(const char *fmt_str, ...); +/******************************************************* + Memory Info +**********************************************************/ + +#define MEM_PATH_LEN 256 +typedef struct { + // Values from /proc/meminfo, in KiB or converted to MiB. + uint64_t MemTotalKiB; + uint64_t MemTotalMiB; + uint64_t MemAvailableMiB; // -1 means no data available + uint64_t SwapTotalMiB; + uint64_t SwapTotalKiB; + uint64_t SwapFreeMiB; + // Calculated percentages + double MemAvailablePercent; // percent of total memory that is available + double SwapFreePercent; // percent of total swap that is free +} meminfo_t; + +typedef struct procinfo { + int pid; + int pidfd; + int uid; + int badness; + int oom_score_adj; + long long VmRSSkiB; + char name[MEM_PATH_LEN]; +} procinfo_t; + +meminfo_t parse_meminfo(); + +/******************************************************* + Directory Opreations +**********************************************************/ +int check_dir_exist(const char* dir_path); +int check_dir_write_permission(const char* dir_path); +void mkdir_all_owner_permission(const char* dir_path); +int remove_dir_rf(const char *dir_path); + #endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c8ef0a1d..33aefa36 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -35,6 +35,14 @@ set_target_properties (dspaces set(dspaces-server-src util.c bbox.c ss_data.c dspaces-server.c dspaces-conf.c dspaces-modules.c dspaces-logging.c toml.c) +if(ENABLE_FILE_STORAGE) + list(APPEND dspaces-server-src file_storage/policy.c file_storage/file.c) +endif() + +if(HDF5_FOUND) + list(APPEND dspaces-server-src file_storage/file_hdf5.c) +endif() + add_library(dspaces-server ${dspaces-server-src} ${dspaces-src}) if(HAVE_DRC) target_link_libraries (dspaces-server margo m pthread ${DRC_LIBRARIES} liblz4) @@ -58,6 +66,10 @@ if(CURL_FOUND) target_include_directories (dspaces-server BEFORE PUBLIC ${CURL_INCLUDE_DIR}) endif() +if(HDF5_FOUND) + target_link_libraries(dspaces-server HDF5::HDF5) +endif() + target_compile_definitions(dspaces-server PRIVATE DSPACES_MOD_DIR=${CMAKE_INSTALL_PREFIX}/share/modules) # for shared libs, establish the lib version diff --git a/src/bbox.c b/src/bbox.c index c72292d8..c7875fb6 100644 --- a/src/bbox.c +++ b/src/bbox.c @@ -99,7 +99,7 @@ void bbox_divide(struct bbox *b0, struct bbox *b_tab) /* Test if bounding box b0 includes b1 along dimension dim. */ -static inline int bbox_include_ondim(const struct bbox *b0, +int bbox_include_ondim(const struct bbox *b0, const struct bbox *b1, int dim) { if((b0->lb.c[dim] <= b1->lb.c[dim]) && (b0->ub.c[dim] >= b1->ub.c[dim])) @@ -122,6 +122,46 @@ int bbox_include(const struct bbox *b0, const struct bbox *b1) return 1; } +/* Test if bounding box bbox0 includes bbox1 or bbox1 includes bbox0 + * along all dimensions. Return value could be: + * 0 - no include relationship + * 1 - bbox0 includes bbox1 + * 2 - bbox1 includes bbox0 + * */ +int bbox_can_include(struct bbox* bbox0, struct bbox* bbox1) +{ + int ret; + struct bbox* big, *small; + + // Find the bigger bbox first + if(bbox_dist(bbox0, 0)<=bbox_dist(bbox1, 0)) { + big = bbox1; + small = bbox0; + ret = 2; + } else { + big = bbox0; + small = bbox1; + ret = 1; + } + + // If we cannot find a bbox whose size on every dimension is + // bigger than the other one, then there is no chance for either + // 1 bbox to include the other + for(int i=1; inum_dims; i++) { + if(bbox_dist(small, i)>bbox_dist(big, i)) { + return 0; + } + } + + for(int i=0; inum_dims; i++) { + if(!bbox_include_ondim(big, small, i)) { + return 0; + } + } + + return ret; +} + /* Test if bounding boxes b0 and b1 intersect along dimension dim. */ @@ -182,6 +222,102 @@ int bbox_equals(const struct bbox *bb0, const struct bbox *bb1) return 0; } +/* Test if bounding box bbox0 and bbox1 can union. + * Return value is the dimension where they can union along, + * -1 when they cannot union. + * */ +int bbox_can_union(const struct bbox* bbox0, const struct bbox* bbox1) +{ + int share; + /* Two bbox can union means that they share a common + * (n-1) dim coordinates, and has 1 dimension extended. */ + for(int i=0; inum_dims; i++) { // Choose 1 dimension + // check the coordinates of the other (n-1) dimension + share = 1; + for(int j=0; jnum_dims; j++) { + if(j==i) continue; + if(bbox0->lb.c[j] != bbox1->lb.c[j] || bbox0->ub.c[j] != bbox1->ub.c[j]) { + share = 0; + break; + } + } + + if(!share) continue; // choose the next dimension + + if(share && bbox_intersect_ondim(bbox0, bbox1, i)) return i; + } + return -1; +} + +/* Compute the union result of bounding box bbox0 and bbox1. + * Require to test if bbox0 & bbox1 can union before calling + * this function. Need the union dimension as the input. + * The result will be stored in bbox2. + * */ +void bbox_union_ondim(const struct bbox* bbox0, const struct bbox* bbox1, + int dim, struct bbox* bbox2) +{ + bbox2->num_dims = bbox0->num_dims; + for(int i=0; inum_dims; i++) { + if(i==dim) { + bbox2->lb.c[i] = min(bbox0->lb.c[i], bbox1->lb.c[i]); + bbox2->ub.c[i] = min(bbox0->ub.c[i], bbox1->ub.c[i]); + } else { + bbox2->lb.c[i] = bbox0->lb.c[i]; + bbox2->ub.c[i] = bbox0->ub.c[i]; + } + } +} + +/* Test if bounding box bbox0 and bbox1 can union and + * compute their union bbox. The result will be stored in bbox2. + * Return value is 1 when they can union, otherwise 0. + * */ +int bbox_union(const struct bbox* bbox0, const struct bbox* bbox1, struct bbox* bbox2) +{ + int found = 0, share; + int dim; + /* Two bbox can union means that they share a common + (n-1) dim coordinates, and has 1 dimension extended. */ + for(int i=0; inum_dims; i++) { // Choose 1 dimension + // check the coordinates of the other (n-1) dimension + share = 1; + for(int j=0; jnum_dims; j++) { + if(j==i) continue; + if(bbox0->lb.c[j] != bbox1->lb.c[j] || bbox0->ub.c[j] != bbox1->ub.c[j]) { + share = 0; + break; + } + } + + if(!share) continue; // choose the next dimension + + found = share && bbox_intersect_ondim(bbox0, bbox1, i); + + if(found) { + dim = i; + break; + } + } + + if(!found) { + return 0; + } else { // fill bbox2 + bbox2->num_dims = bbox0->num_dims; + for(int i=0; inum_dims; i++) { + if(i==dim) { + bbox2->lb.c[i] = min(bbox0->lb.c[i], bbox1->lb.c[i]); + bbox2->ub.c[i] = min(bbox0->ub.c[i], bbox1->ub.c[i]); + } else { + bbox2->lb.c[i] = bbox0->lb.c[i]; + bbox2->ub.c[i] = bbox0->ub.c[i]; + } + } + return 1; + } + +} + uint64_t bbox_volume(const struct bbox *bb) { uint64_t n = 1; diff --git a/src/dspaces-client.c b/src/dspaces-client.c index 0dded3c2..f733e3e2 100644 --- a/src/dspaces-client.c +++ b/src/dspaces-client.c @@ -1672,7 +1672,12 @@ static void fill_odsc(dspaces_client_t client, const char *var_name, odsc->version = ver; memset(odsc->owner, 0, sizeof(odsc->owner)); odsc->st = st; - odsc->size = elem_size; + if(elem_size < 0) { + odsc->type = elem_size; + odsc->size = type_to_size(elem_size); + } else { + odsc->size = elem_size; + } odsc->bb.num_dims = ndim; memset(odsc->bb.lb.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); diff --git a/src/dspaces-conf.c b/src/dspaces-conf.c index 0fc68ac5..e9a30df2 100644 --- a/src/dspaces-conf.c +++ b/src/dspaces-conf.c @@ -92,6 +92,22 @@ static int parse_line(int lineno, char *line, struct ds_conf *conf) return 0; } +#ifdef DSPACES_HAVE_FILE_STORAGE +static inline void set_default_swap(struct ds_conf *conf) +{ + conf->swap.file_dir = strdup("./dspaces_swap/"); + conf->swap.mem_quota_type = DS_MEM_PERCENT; +#ifdef DSPACES_HAVE_HDF5 + conf->swap.file_backend = DS_FILE_HDF5; +#elif DSPACES_HAVE_NetCDF + conf->swap.file_backend = DS_FILE_NetCDF; +#endif // file backend selection + conf->swap.mem_quota.percent = 1.0; + conf->swap.policy = strdup("Default"); + conf->swap.disk_quota_MB = -1.0; +} +#endif // DSPACES_HAVE_FILE_STORAGE + int parse_conf(const char *fname, struct ds_conf *conf) { FILE *fin; @@ -114,6 +130,9 @@ int parse_conf(const char *fname, struct ds_conf *conf) } fclose(fin); +#ifdef DSPACES_HAVE_FILE_STORAGE + set_default_swap(conf); +#endif // DSPACES_HAVE_FILE_STORAGE return 0; } @@ -313,10 +332,48 @@ static void parse_modules_table(toml_table_t *modules, struct ds_conf *conf) #endif } +#ifdef DSPACES_HAVE_FILE_STORAGE +static void parse_swap_table_after_default(toml_table_t *swap, struct ds_conf *conf) +{ + toml_datum_t dat; + + dat = toml_string_in(swap, "directory"); + if(dat.ok) { + free(conf->swap.file_dir); + conf->swap.file_dir = strdup(dat.u.s); + free(dat.u.s); + } + + dat = toml_string_in(swap, "memory quota"); + if(dat.ok) { + memory_quota_parser(dat.u.s, &conf->swap); + free(dat.u.s); + } + + dat = toml_string_in(swap, "policy"); + if(dat.ok) { + if(policy_str_check(dat.u.s)) { + free(conf->swap.policy); + conf->swap.policy = strdup(dat.u.s); + } else { + fprintf(stderr, "WARNING: Swap Policy: %s is not supported. " + "Use FIFO policy as default.\n", dat.u.s); + } + free(dat.u.s); + } + + dat = toml_string_in(swap, "disk quota"); + if(dat.ok) { + disk_quota_parser(dat.u.s, &conf->swap); + free(dat.u.s); + } +} +#endif // DSPACES_HAVE_FILE_STORAGE + int parse_conf_toml(const char *fname, struct ds_conf *conf) { FILE *fin; - toml_table_t *toml_conf, *server, *remotes, *storage, *modules; + toml_table_t *toml_conf, *server, *remotes, *storage, *modules, *swap; char errbuf[200]; char *ip; int port; @@ -366,6 +423,14 @@ int parse_conf_toml(const char *fname, struct ds_conf *conf) parse_modules_table(modules, conf); } +#ifdef DSPACES_HAVE_FILE_STORAGE + set_default_swap(conf); + swap = toml_table_in(toml_conf, "swap space"); + if(swap) { + parse_swap_table_after_default(swap, conf); + } +#endif // DSPACES_HAVE_FILE_STORAGE + toml_free(toml_conf); return (0); diff --git a/src/dspaces-server.c b/src/dspaces-server.c index 1703f9e7..a70b4b87 100644 --- a/src/dspaces-server.c +++ b/src/dspaces-server.c @@ -17,6 +17,8 @@ #include "ss_data.h" #include "str_hash.h" #include "toml.h" +#include "util.h" + #include #include #include @@ -32,6 +34,10 @@ #include #endif +#ifdef DSPACES_HAVE_FILE_STORAGE +#include "file_storage/file.h" +#endif + #ifdef DSPACES_HAVE_PYTHON #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION #define PY_ARRAY_UNIQUE_SYMBOL dsm @@ -396,6 +402,10 @@ static int dsg_alloc(dspaces_provider_t server, const char *conf_name, goto err_free; } +#ifdef DSPACES_HAVE_FILE_STORAGE + INIT_LIST_HEAD(&dsg_l->ls_od_list); +#endif // DSPACES_HAVE_FILE_STORAGE + // proxy storage dsg_l->ps = ls_alloc(server->conf.max_versions); if(!dsg_l->ps) { @@ -1216,6 +1226,9 @@ static int server_destroy(dspaces_provider_t server) } free_sspace(server->dsg); +#ifdef DSPACES_HAVE_FILE_STORAGE + free_ls_od_list(&server->dsg->ls_od_list); +#endif // DSPACES_HAVE_FILE_STORAGE ls_free(server->dsg->ls); free(server->dsg); free(server->server_address[0]); @@ -1263,6 +1276,41 @@ static void odsc_take_ownership(dspaces_provider_t server, obj_descriptor *odsc) } } +#ifdef DSPACES_HAVE_FILE_STORAGE +static int od_swap_out(dspaces_provider_t server) +{ + int ret; + struct obj_data *swap_od; + /* Find the od that we want to swap out */ + ABT_mutex_lock(server->ls_mutex); + swap_od = which_swap_out(&server->conf.swap, &server->dsg->ls_od_list); + if(!swap_od) { + fprintf(stderr, "DATASPACES: ERROR: %s: Object data list has nothing to swap out! " + "This should not happen... The primary reason could be either the " + "server node memory capacity is too small, or the user-configured " + "server memory quota is too small.", __func__); + return dspaces_ERR_UNKNOWN_OBJ; + } + + /* Write od to HDF5 */ + ret = file_write_od(&server->conf.swap, swap_od); + if(ret != dspaces_SUCCESS) { + ABT_mutex_unlock(server->ls_mutex); + return ret; + } + + /* Delete the od from both the local stroage list & flat list */ + ls_remove(server->dsg->ls, swap_od); + list_del(&swap_od->flat_list_entry.entry); + ABT_mutex_unlock(server->ls_mutex); + + /* Free od */ + obj_data_free(swap_od); + + return dspaces_SUCCESS; +} +#endif // DSPACES_HAVE_FILE_STORAGE + static void put_rpc(hg_handle_t handle) { hg_return_t hret; @@ -1297,6 +1345,15 @@ static void put_rpc(hg_handle_t handle) // set the owner to be this server address odsc_take_ownership(server, &in_odsc); +#ifdef DSPACES_HAVE_FILE_STORAGE + /* Check if needs to swap data out to HDF5*/ + uint64_t size_MB = obj_data_size(&in_odsc) >> 20; + + while(need_swap_out(&server->conf.swap, size_MB)) { + od_swap_out(server); + } +#endif // DSPACES_HAVE_FILE_STORAGE + struct obj_data *od; od = obj_data_alloc(&in_odsc); memcpy(&od->gdim, in.odsc.raw_gdim, sizeof(struct global_dimension)); @@ -1347,6 +1404,10 @@ static void put_rpc(hg_handle_t handle) ABT_mutex_lock(server->ls_mutex); ls_add_obj(server->dsg->ls, od); +#ifdef DSPACES_HAVE_FILE_STORAGE + /* add obj_data to the flat od_list for swap out */ + list_add_tail(&od->flat_list_entry.entry, &server->dsg->ls_od_list); +#endif // DSPACES_HAVE_FILE_STORAGE ABT_mutex_unlock(server->ls_mutex); DEBUG_OUT("Received obj %s\n", obj_desc_sprint(&od->obj_desc)); @@ -2182,18 +2243,41 @@ static void get_rpc(hg_handle_t handle) struct obj_data *od, *from_obj; +#ifdef DSPACES_HAVE_FILE_STORAGE + while(!(od = obj_data_alloc(&in_odsc))) { + /* Failed to allocate od, the primary reason is insufficient memory. */ + od_swap_out(server); + } +#else + od = obj_data_alloc(&in_odsc); +#endif // DSPACES_HAVE_FILE_STORAGE + + DEBUG_OUT("allocated target object\n"); + ABT_mutex_lock(server->ls_mutex); if(server->remotes && ls_lookup(server->dsg->ps, in_odsc.name)) { - from_obj = ls_find(server->dsg->ps, &in_odsc); + from_obj = ls_find_include(server->dsg->ps, &in_odsc); } else { - from_obj = ls_find(server->dsg->ls, &in_odsc); + from_obj = ls_find_include(server->dsg->ls, &in_odsc); + } + if(from_obj) { + DEBUG_OUT("found source data object from staging memory\n"); + ssd_copy(od, from_obj); + DEBUG_OUT("copied object data\n"); +#ifdef DSPACES_HAVE_FILE_STORAGE + from_obj->flat_list_entry.usecnt++; +#endif // DSPACES_HAVE_FILE_STORAGE } - DEBUG_OUT("found source data object\n"); - od = obj_data_alloc(&in_odsc); - DEBUG_OUT("allocated target object\n"); - ssd_copy(od, from_obj); - DEBUG_OUT("copied object data\n"); ABT_mutex_unlock(server->ls_mutex); + +#ifdef DSPACES_HAVE_FILE_STORAGE + if(!from_obj) { + /* Did not find the from_obj from staging memory, + * search it in the swap space */ + file_read_od(&server->conf.swap, od); + } +#endif // DSPACES_HAVE_FILE_STORAGE + hg_size_t size = (in_odsc.size) * bbox_volume(&(in_odsc.bb)); void *buffer = (void *)od->data; cbuffer = malloc(size); @@ -2245,7 +2329,23 @@ static void get_rpc(hg_handle_t handle) margo_bulk_free(bulk_handle); out.ret = dspaces_SUCCESS; out.len = csize; - obj_data_free(od); + /* If we read the od back from the swap space, + * then we keep it in the staging memory */ + if(from_obj) { + obj_data_free(od); + } +#ifdef DSPACES_HAVE_FILE_STORAGE + else { + /* We add the read-back od to the local storage */ + ABT_mutex_lock(server->ls_mutex); + ls_add_obj(server->dsg->ls, od); + /* add obj_data to the flat od_list for swap out */ + od->flat_list_entry.usecnt++; + list_add_tail(&od->flat_list_entry.entry, &server->dsg->ls_od_list); + ABT_mutex_unlock(server->ls_mutex); + } +#endif // DSPACES_HAVE_FILE_STORAGE + margo_respond(handle, &out); margo_free_input(handle, &in); margo_destroy(handle); diff --git a/src/file_storage/file.c b/src/file_storage/file.c new file mode 100644 index 00000000..bead0a52 --- /dev/null +++ b/src/file_storage/file.c @@ -0,0 +1,50 @@ +#include "stdio.h" +#include "file_storage/file.h" + +#ifdef DSPACES_HAVE_HDF5 +#include "file_storage/file_hdf5.h" +#endif + +#ifdef DSPACES_HAVE_NetCDF +// TODO: add NetCDF support +#endif + +int file_write_od(struct swap_config* swap_conf, struct obj_data *od) +{ + switch (swap_conf->file_backend) + { +#ifdef DSPACES_HAVE_HDF5 + case DS_FILE_HDF5: + return hdf5_write_od(swap_conf, od); +#endif + +#ifdef DSPACES_HAVE_NetCDF + case DS_FILE_NetCDF: + // TODO: add NetCDF support + break; +#endif + + default: + fprintf(stderr, "This should not happen... ftype must be supported file backends.\n"); + break; + } +} +int file_read_od(struct swap_config* swap_conf, struct obj_data *od) { + switch (swap_conf->file_backend) + { +#ifdef DSPACES_HAVE_HDF5 + case DS_FILE_HDF5: + return hdf5_read_od(swap_conf, od); +#endif + +#ifdef DSPACES_HAVE_NetCDF + case DS_FILE_NetCDF: + // TODO: add NetCDF support + break; +#endif + + default: + fprintf(stderr, "This should not happen... ftype must be supported file backends.\n"); + break; + } +} \ No newline at end of file diff --git a/src/file_storage/file_hdf5.c b/src/file_storage/file_hdf5.c new file mode 100644 index 00000000..92060fe2 --- /dev/null +++ b/src/file_storage/file_hdf5.c @@ -0,0 +1,786 @@ +#include "stdio.h" +#include "stdlib.h" +#include "stdbool.h" +#include "ctype.h" +#include "string.h" + +#include "dspaces-common.h" +#include "util.h" +#include "bbox.h" +#include "file_storage/file_hdf5.h" + +#include + +static hid_t dstype_to_h5type(int type_id) +{ + switch (type_id) + { + case -1: // DSP_FLOAT + return H5T_NATIVE_FLOAT; + case -2: // DSP_INT + return H5T_NATIVE_INT; + case -3: // DSP_LONG + return H5T_NATIVE_LONG; + case -4: // DSP_DOUBLE + return H5T_NATIVE_DOUBLE; + case -5: // DSP_BOOL + return H5T_NATIVE_HBOOL; + case -6: // DSP_CHAR + return H5T_NATIVE_CHAR; + case -7: // DSP_UINT + return H5T_NATIVE_UINT; + case -8: // DSP_ULONG + return H5T_NATIVE_ULONG; + case -9: // DSP_BYTE + return H5T_NATIVE_B8; + case -10: // DSP_UINT8 + return H5T_NATIVE_B8; + case -11: // DSP_UINT16 + return H5T_NATIVE_B16; + case -12: // DSP_UINT32 + return H5T_NATIVE_B32; + case -13: // DSP_UINT64 + return H5T_NATIVE_B64; + case -14: // DSP_INT8 + return H5T_NATIVE_B8; + case -15: // DSP_INT16 + return H5T_NATIVE_B16; + case -16: // DSP_INT32 + return H5T_NATIVE_B32; + case -17: // DSP_INT64 + return H5T_NATIVE_B64; + default: + return H5T_NATIVE_OPAQUE; + } +} + +static char *hdf5_bound_sprint(const uint64_t* bound, int num_dims) +{ + char *str; + int i; + int size = 2; + + for(i = 0; i < num_dims; i++) { + size += snprintf(NULL, 0, "%" PRIu64, bound[i]); + if(i > 0) { + size += i ? 2 : 0; // account for ", " + } + } + str = malloc(sizeof(*str) * (size + 1)); // add null terminator + strcpy(str, "{"); + for(i = 0; i < num_dims; i++) { + char *tmp = alloc_sprintf(i ? ", %" PRIu64 : "%" PRIu64, bound[i]); + str = str_append(str, tmp); + } + str = str_append_const(str, "}"); + return str; +} + +static char* hdf5_dataset_name_sprint(const struct bbox* bbox) +{ + char *str = hdf5_bound_sprint(bbox->lb.c, bbox->num_dims); + str = str_append_const(str, "-"); + str = str_append(str, hdf5_bound_sprint(bbox->ub.c, bbox->num_dims)); + return str; +} + +static void hdf5_dataset_name_parser(char* dname, struct bbox* bbox) +{ + char *p = dname; + int i = 0; + while (*p) { + if(isdigit(*p)) { + if(i < bbox->num_dims) + bbox->lb.c[i] = (uint64_t) strtoull(p, &p, 10); + else + bbox->ub.c[i-bbox->num_dims] = (uint64_t) strtoull(p, &p, 10); + i++; + } else { + p++; + } + } + +} + +/* Search the first dataset that can include the queried bbox from a HDF5 file. + * If found, the dataset name is returned. + * Otherwise, return NULL. + * */ +static char* hdf5_search_include_dataset(const char* file_name, hid_t file_id, struct bbox* qbbox) +{ + herr_t status; + H5G_info_t ginfo; + char* dname; + struct bbox sbbox; + ssize_t dname_size, errh; + + status = H5Gget_info(file_id, &ginfo); + if(status == H5I_INVALID_HID) { + fprintf(stderr,"HDF5 failed to get root group info in file: %s.\n", file_name); + return NULL; + } + + if(ginfo.nlinks == 0) return NULL; + + sbbox.num_dims = qbbox->num_dims; + memset(sbbox.lb.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); + memset(sbbox.ub.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); + + for(int i=0; inum_dims; + memset(sbbox.lb.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); + memset(sbbox.ub.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); + + for(int i=0; i 0) { + free(dataset_name); + return dspaces_SUCCESS; + } + + dims = (hsize_t*) malloc(bboxw->num_dims*sizeof(hsize_t)); + for(int i=0; inum_dims; i++) { + dims[i] = bboxw->ub.c[i] - bboxw->lb.c[i] + 1; + } + + /* Prepare HDF5 data space. */ + dataspace_id = H5Screate_simple(bboxw->num_dims, dims, NULL); + if(dataspace_id == H5I_INVALID_HID) { + fprintf(stderr,"HDF5 failed to create N-dimensional dataspace in file: %s.\n", + file_name); + free(dataset_name); + free(dims); + return dspaces_ERR_HDF5; + } + + /* Prepare HDF5 dataset. */ + dataset_id = H5Dcreate2(file_id, dataset_name, dstype_to_h5type(type_id), + dataspace_id, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if(dataset_id == H5I_INVALID_HID) { + fprintf(stderr,"HDF5 failed to create dataset: %s in file: %s.\n", + dataset_name, file_name); + free(dataset_name); + free(dims); + return dspaces_ERR_HDF5; + } + + /* Write data to HDF5 dataset. */ + status = H5Dwrite(dataset_id, dstype_to_h5type(type_id), + H5S_ALL, H5S_ALL, H5P_DEFAULT, data); + if(status < 0) { + fprintf(stderr, "HDF5 failed to write data to dataset: %s in file: %s.\n", + dataset_name, file_name); + free(dataset_name); + free(dims); + return dspaces_ERR_HDF5; + } + + status = H5Dclose(dataset_id); + if(status < 0) { + fprintf(stderr,"HDF5 failed to close the dataset: %s in file: %s.\n", + dataset_name, file_name); + free(dataset_name); + free(dims); + return dspaces_ERR_HDF5; + } + + status = H5Sclose(dataspace_id); + if(status < 0) { + fprintf(stderr,"HDF5 failed to close the dataspace of dataset: %s in file: %s.\n", + dataset_name, file_name); + free(dataset_name); + free(dims); + return dspaces_ERR_HDF5; + } + + free(dataset_name); + free(dims); + + return dspaces_SUCCESS; +} + +static int hdf5_read_full_dataset(const char* file_name, hid_t file_id, + char* dataset_name, void* data) +{ + hid_t dataset_id, dataspace_id, datatype_id; + hssize_t elem_num; + herr_t status; + size_t datatype_size; + + dataset_id = H5Dopen2(file_id, dataset_name, H5P_DEFAULT); + if(dataset_id == H5I_INVALID_HID) { + fprintf(stderr, "HDF5 failed to open dataset: %s in file: %s.\n", dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + dataspace_id = H5Dget_space(dataset_id); + if(dataspace_id == H5I_INVALID_HID) { + fprintf(stderr, "HDF5 failed to get data space of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + datatype_id = H5Dget_type(dataset_id); + if(datatype_id == H5I_INVALID_HID) { + fprintf(stderr, "HDF5 failed to get data type of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + datatype_size = H5Tget_size(datatype_id); + if(datatype_size == 0) { + fprintf(stderr, "HDF5 failed to get data type size of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + elem_num = H5Sget_simple_extent_npoints(dataspace_id); + if(elem_num < 0) { + fprintf(stderr, "HDF5 failed to get elem_num of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + data = (void*) malloc(elem_num*datatype_size); + + status = H5Dread(dataset_id, datatype_id, H5S_ALL, H5S_ALL, H5P_DEFAULT, data); + if(status < 0) { + fprintf(stderr, "HDF5 failed to read dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + status = H5Sclose(dataspace_id); + if(status < 0) { + fprintf(stderr,"HDF5 failed to close the dataspace of dataset: %s in file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + status = H5Dclose(dataset_id); + if(status < 0) { + fprintf(stderr,"HDF5 failed to close the dataset: %s in file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + return dspaces_SUCCESS; +} + +static int hdf5_read_dataset(const char* file_name, hid_t file_id, char* dataset_name, + hsize_t *offset, hsize_t *count, void* data) +{ + hid_t dataset_id, dataspace_id, datatype_id, memspace_id; + int ndims; + size_t elem_num = 1, datatype_size; + hsize_t *stride, *block; + herr_t status; + + dataset_id = H5Dopen2(file_id, dataset_name, H5P_DEFAULT); + if(dataset_id == H5I_INVALID_HID) { + fprintf(stderr, "HDF5 failed to open dataset: %s in file: %s.\n", dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + dataspace_id = H5Dget_space(dataset_id); + if(dataspace_id == H5I_INVALID_HID) { + fprintf(stderr, "HDF5 failed to get data space of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + datatype_id = H5Dget_type(dataset_id); + if(datatype_id == H5I_INVALID_HID) { + fprintf(stderr, "HDF5 failed to get data type of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + datatype_size = H5Tget_size(datatype_id); + if(datatype_size == 0) { + fprintf(stderr, "HDF5 failed to get data type size of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + ndims = H5Sget_simple_extent_ndims(dataspace_id); + if(ndims < 0) { + fprintf(stderr, "HDF5 failed to get number of dimensions of dataset: %s from file: %s.\n", + dataset_name, file_name); + return dspaces_ERR_HDF5; + } + + stride = (hsize_t*) malloc(ndims*sizeof(hsize_t)); + block = (hsize_t*) malloc(ndims*sizeof(hsize_t)); + + for(int i=0; ifile_dir)) { + fprintf(stderr, "WARNING: Swap directory: %s does not exist. " + "Creating the directory.\n", swap_conf->file_dir); + mkdir_all_owner_permission(swap_conf->file_dir); + } else if(!check_dir_write_permission(swap_conf->file_dir)) { + fprintf(stderr, "WARNING: Failed to write files into swap directory: %s " + "Use ./dspaces_swap as the default swap directory instead.\n", + swap_conf->file_dir); + free(swap_conf->file_dir); + swap_conf->file_dir = strdup("./dspaces_swap/"); + mkdir_all_owner_permission(swap_conf->file_dir); + } + + /* Concatenate file name */ + sprintf(file_name, "%s/%s.ver%u.h5", swap_conf->file_dir, od->obj_desc.name, od->obj_desc.version); + + /* Mute HDF5 error message for potential concurrent file createion & access. */ + // TODO: Unmute it later, but the HDF5 function has a bug in latest release. + status = H5Eset_auto2(H5E_DEFAULT, NULL, NULL); + if(status < 0) { + fprintf(stderr,"HDF5 failed to mute error message for " + "potential concurrent file createion & access.\n"); + return dspaces_ERR_HDF5; + } + + /* Try to create the file and write ndims as a file attribute. + * Fail if the file exists. */ + file_id = H5Fcreate(file_name, H5F_ACC_EXCL, H5P_DEFAULT, H5P_DEFAULT); + if(file_id != H5I_INVALID_HID) { + ret = hdf5_write_scalar_attr(file_name, file_id, "ndims", + H5T_NATIVE_INT, &(od->obj_desc.bb.num_dims)); + if(ret != dspaces_SUCCESS) return ret; + } else { + /* Try to catch the file lock and open the file. */ + do { + file_id = H5Fopen(file_name, H5F_ACC_RDWR, H5P_DEFAULT); + } while (file_id == H5I_INVALID_HID); + } + + /* Check if the new coming obj can be merged. + * We have an exclusive file lock in HDF5, so this + * merge check will be able to find all meragble objects. */ + + search_odsc = od->obj_desc; + union_odsc = od->obj_desc; + + mergeable = false; + qod = od; + qbbox = &(od->obj_desc.bb); + wdata = od->data; + + while((search = hdf5_search_mergeable_dataset(file_name, file_id, qbbox, rdname)) != -3) + { + if(!mergeable) mergeable = true; + + if(search == -2) { + /* qbbox includes some bbox in the file. qbbox remains the same one. */ + // write the memory to a new dataset + ret = hdf5_write_dataset(file_name, file_id, qbbox, qod->obj_desc.type, wdata); + if(ret != dspaces_SUCCESS) return ret; + + // delete the found dataset + status = H5Ldelete(file_id, rdname, H5P_DEFAULT); + if(status < 0) { + fprintf(stderr, "HDF5 failed to delete dataset: %s in file: %s.\n", + rdname, file_name); + return dspaces_ERR_HDF5; + } + free(rdname); + } else if(search == -1) { + /* qbbox is included by some bbox in the file. Do nothing. */ + free(rdname); + break; + } else { + /* qbbox and some bbox searched from the file can union */ + hdf5_dataset_name_parser(rdname, &(search_odsc.bb)); + bbox_union_ondim(qbbox, &(search_odsc.bb), search, &(union_odsc.bb)); + + // Read the searched dataset from the file, and do ssd_copy() + ret = hdf5_read_full_dataset(file_name, file_id, rdname, fdata); + if(ret != dspaces_SUCCESS) return ret; + + search_od = obj_data_alloc_no_data(&search_odsc, fdata); + union_od = obj_data_alloc(&union_odsc); + ssd_copy(union_od, search_od); + ssd_copy(union_od, qod); + + // write the union obj_data to a new dataset + wdata = union_od->data; + ret = hdf5_write_dataset(file_name, file_id, &(union_odsc.bb), + union_od->obj_desc.type, wdata); + if(ret != dspaces_SUCCESS) return ret; + + /* Free the old memory data buffer and file data buffer. + * Do not free the origin od for safety. + * Also, delete the searched dataset from the file + * and the query dataset if it is stored in the file. + * Iterate the qbbox to the union bbox. */ + status = H5Ldelete(file_id, rdname, H5P_DEFAULT); + if(status < 0) { + fprintf(stderr, "HDF5 failed to delete dataset: %s in file: %s.\n", + rdname, file_name); + return dspaces_ERR_HDF5; + } + obj_data_free(search_od); + free(search_od); + free(rdname); + + if(qod != od) { + qdname = hdf5_dataset_name_sprint(&qod->obj_desc.bb); + status = H5Ldelete(file_id, qdname, H5P_DEFAULT); + if(status < 0) { + fprintf(stderr, "HDF5 failed to delete dataset: %s in file: %s.\n", + qdname, file_name); + return dspaces_ERR_HDF5; + } + obj_data_free(qod); + free(qod); + free(qdname); + } + + qod = union_od; + qbbox = &(union_od->obj_desc.bb); + } + } + + // Final clean up + if(mergeable && qod!=od) { + obj_data_free(qod); + free(qod); + } + + /* Write a new data obj if not mergeable. */ + if(!mergeable) { + ret = hdf5_write_dataset(file_name, file_id, &(od->obj_desc.bb), od->obj_desc.type, od->data); + if(ret != dspaces_SUCCESS) return ret; + } + + /* Close file */ + status = H5Fclose(file_id); + if(status < 0) { + fprintf(stderr,"HDF5 failed to close the file: %s.\n", file_name); + return dspaces_ERR_HDF5; + } + + return dspaces_SUCCESS; +} + +/* Read a dspaces obj_data from a HDF5 file. + * The HDF5 file name is "{odsc_name}.ver%u.h5" + * The HDF5 dataset name is the bbox formatted as + * "{lb[0],lb[1], ...}-{ub[0], ub[1], ...}". + * Subset reading from a dataset is supported. + * The data read from the HDF5 file is stored in the od->data. + * */ +int hdf5_read_od(struct swap_config* swap_conf, struct obj_data *od) +{ + int ret; + hid_t file_id; + hsize_t *offset, *count; + herr_t status; + char file_name[256], *dataset_name; + struct bbox sbbox; + + /* Concatenate file name */ + sprintf(file_name, "%s/%s.ver%u.h5", swap_conf->file_dir, od->obj_desc.name, od->obj_desc.version); + + /* Mute HDF5 error message for potential concurrent file createion & access. */ + // TODO: Unmute it later, but the HDF5 function has a bug in latest release. + status = H5Eset_auto2(H5E_DEFAULT, NULL, NULL); + if(status < 0) { + fprintf(stderr,"HDF5 failed to mute error message for " + "potential concurrent file createion & access.\n"); + return dspaces_ERR_HDF5; + } + + /* Try to catch the file lock and open the file. */ + do { + file_id = H5Fopen(file_name, H5F_ACC_RDONLY, H5P_DEFAULT); + } while (file_id == H5I_INVALID_HID); + + /* The queried bbox might be included by the dataset's bbox */ + dataset_name = hdf5_search_include_dataset(file_name, file_id, &od->obj_desc.bb); + if(!dataset_name) { + fprintf(stderr,"HDF5 failed to find the requested data object from file: %s." + " This should not happen... \n", file_name); + return dspaces_ERR_HDF5; + } + + sbbox.num_dims = od->obj_desc.bb.num_dims; + memset(sbbox.lb.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); + memset(sbbox.ub.c, 0, sizeof(uint64_t) * BBOX_MAX_NDIM); + hdf5_dataset_name_parser(dataset_name, &sbbox); + + offset = (hsize_t*) malloc(od->obj_desc.bb.num_dims*sizeof(hsize_t)); + count = (hsize_t*) malloc(od->obj_desc.bb.num_dims*sizeof(hsize_t)); + for(int i=0; iobj_desc.bb.num_dims; i++) { + offset[i] = od->obj_desc.bb.lb.c[i] - sbbox.lb.c[i]; + count[i] = od->obj_desc.bb.ub.c[i] - od->obj_desc.bb.lb.c[i] + 1; + } + ret = hdf5_read_dataset(file_name, file_id, dataset_name, offset, count, od->data); + + free(count); + free(offset); + free(dataset_name); + + /* Close file */ + status = H5Fclose(file_id); + if(status < 0) { + fprintf(stderr,"HDF5 failed to close the file: %s.\n", file_name); + return dspaces_ERR_HDF5; + } + + return dspaces_SUCCESS; +} \ No newline at end of file diff --git a/src/file_storage/policy.c b/src/file_storage/policy.c new file mode 100644 index 00000000..d91ea549 --- /dev/null +++ b/src/file_storage/policy.c @@ -0,0 +1,231 @@ +#ifdef DSPACES_HAVE_FILE_STORAGE + +#include "stdlib.h" +#include "stdio.h" + +#include "util.h" +#include "list.h" +#include "ss_data.h" +#include "file_storage/policy.h" + +const char* policy_options[] = { + "Default", + "Custom", + "FIFO", + "LIFO", + "LRU" +}; + +void free_ls_od_list(struct list_head* ls_od_list) +{ + if(!ls_od_list) + return; + int cnt = 0; + struct obj_data *od, *t; + list_for_each_entry_safe(od, t, ls_od_list, struct obj_data, flat_list_entry.entry) + { + list_del(&od->flat_list_entry.entry); + cnt++; + } + +#ifdef DEBUG + fprintf(stderr, "%s(): number of object data record is %d\n", + __func__, cnt); +#endif +} + +int policy_str_check(const char* str) +{ + for(int i=0; i<5; i++) { + if(strcmp(str, policy_options[i]) == 0) return 1; + } + return 0; +} + +void memory_quota_parser(char* str, struct swap_config* swap) +{ + char *pch; + int len, itmp; + float ftmp; + uint64_t lltmp; + + meminfo_t meminfo; + + pch = strtok(str, "%"); + if(pch !=NULL) { + /* Check if the input string is a percentage number */ + swap->mem_quota_type = DS_MEM_PERCENT; + swap->mem_quota.percent = strtof(pch, NULL) / 100.0; + } else if (sscanf(str, "%f %n", &ftmp, &len) && !str[len]) { + /* Check if the input string is a float number */ + swap->mem_quota_type = DS_MEM_PERCENT; + swap->mem_quota.percent = ftmp; + } else if((pch=strtok(str, "kKmMgGtT")) != NULL) { + /* Check if the input string is a real value with quantity */ + swap->mem_quota_type = DS_MEM_BYTES; + ftmp = strtof(pch, &pch); + // Check if the last char is 'B'(Byte) or 'b'(bit) + if(*(pch+1) == 'b') + ftmp = ftmp / 8; + // Check the quantity. + if(*pch == 'k' || *pch == 'K') { + ftmp = ftmp / 2e10; + } else if (*pch == 'm' || *pch == 'M') { + ; + } else if (*pch == 'g' || *pch == 'G') { + ftmp = ftmp * 2e10; + } else if (*pch == 't' || *pch == 'T') { + ftmp = ftmp * 2e20; + } + // If the quota is higher than the full memory capacity, + // Just set the quota to 100% + meminfo = parse_meminfo(); + if(ftmp > ((float) meminfo.MemTotalMiB)) { + swap->mem_quota_type = DS_MEM_PERCENT; + swap->mem_quota.percent = 1.0; + } + swap->mem_quota.MB = ftmp; + } +} + +void disk_quota_parser(char* str, struct swap_config* swap) +{ + char *pch; + float ftmp; + + pch = strtok(str, "kKmMgGtTpP"); + if(pch == NULL) { + swap->disk_quota_MB = -1.0; + } else { + // Round to an approx int + ftmp = strtof(pch, &pch); + // Check if the last char is 'B'(Byte) or 'b'(bit) + if(*(pch+1) == 'b') + ftmp = ftmp / 8; + // Check the quantity. + if(*pch == 'k' || *pch == 'K') { + ftmp = ftmp / 2e10; + } else if (*pch == 'm' || *pch == 'M') { + ; + } else if (*pch == 'g' || *pch == 'G') { + ftmp = ftmp * 2e10; + } else if (*pch == 't' || *pch == 'T') { + ftmp = ftmp * 2e20; + } else if (*pch == 'p' || *pch == 'P') { + ftmp = ftmp * 2e30; + } + swap->disk_quota_MB = ftmp; + } +} + +static int default_when_policy(uint64_t size_MB) +{ + meminfo_t meminfo = parse_meminfo(); + + if(meminfo.MemAvailableMiB < size_MB) + return 1; + else + return 0; +} + +static int percent_when_policy(float threshold, uint64_t size_MB) +{ + meminfo_t meminfo = parse_meminfo(); + + if((meminfo.MemAvailablePercent - (size_MB / meminfo.MemTotalMiB)) < (1 - threshold)) + return 1; + else + return 0; + +} + +static int value_when_policy(float threshold, uint64_t size_MB) +{ + meminfo_t meminfo = parse_meminfo(); + + if(meminfo.MemAvailableMiB < (size_MB + meminfo.MemTotalMiB - threshold)) + return 1; + else + return 0; + +} + +static struct obj_data* fifo_which_policy(struct list_head *ls_od_list) { + struct obj_data *od; + + if(list_empty(ls_od_list)) { + return NULL; + } else { + od = list_entry(ls_od_list->next, struct obj_data, flat_list_entry.entry); + return od; + } +} + +static struct obj_data* lifo_which_policy(struct list_head *ls_od_list) { + struct obj_data *od; + + if(list_empty(ls_od_list)) { + return NULL; + } else { + od = list_entry(ls_od_list->prev, struct obj_data, flat_list_entry.entry); + return od; + } +} + +static struct obj_data* lru_which_policy(struct list_head *ls_od_list) { + struct obj_data *od, *min; + + if(list_empty(ls_od_list)) { + return NULL; + } else { + min = list_entry(ls_od_list->next, struct obj_data, flat_list_entry.entry); + + list_for_each_entry(od, ls_od_list, struct obj_data, flat_list_entry.entry) { + if(od->flat_list_entry.usecnt < min->flat_list_entry.usecnt) { + min = od; + } + } + return min; + } +} + +int need_swap_out(struct swap_config* swap, uint64_t size_MB) +{ + meminfo_t meminfo = parse_meminfo(); + + if(swap->mem_quota_type == DS_MEM_BYTES) { + // Use memory MB + return value_when_policy(swap->mem_quota.MB, size_MB); + } else if(swap->mem_quota_type == DS_MEM_PERCENT) { + // Use memory percent + if(abs(1.0 - swap->mem_quota.percent) < 1e-6) { + // Use full node memory + return default_when_policy(size_MB); + } else { + return percent_when_policy(swap->mem_quota.percent, size_MB); + } + } else { + // Use full node memory + return default_when_policy(size_MB); + } +} + +struct obj_data* which_swap_out(struct swap_config *swap, + struct list_head* ls_od_list) +{ + // TODO: support custom policy + if((strcmp(swap->policy, "Default") == 0) || (strcmp(swap->policy, "FIFO") == 0)) { + return fifo_which_policy(ls_od_list); + } else if(strcmp(swap->policy, "LIFO") == 0) { + return lifo_which_policy(ls_od_list); + } else if(strcmp(swap->policy, "LRU") == 0) { + return lru_which_policy(ls_od_list); + } else { + return fifo_which_policy(ls_od_list); + } +} + +#endif // DSPACES_HAVE_FILE_STORAGE + + + diff --git a/src/ss_data.c b/src/ss_data.c index 5a256305..1b9c8fc2 100644 --- a/src/ss_data.c +++ b/src/ss_data.c @@ -1068,6 +1068,28 @@ struct obj_data *ls_find(ss_storage *ls, obj_descriptor *odsc) return NULL; } +/* + Find an object in the local storage that includes + the object descriptor 'odsc'. +*/ +struct obj_data *ls_find_include(ss_storage *ls, obj_descriptor *odsc) +{ + struct obj_data *od; + struct list_head *list; + int index; + + index = odsc->version % ls->size_hash; + list = &ls->obj_hash[index]; + + list_for_each_entry(od, list, struct obj_data, obj_entry) + { + if(obj_desc_equals_include(&od->obj_desc, odsc)) + return od; + } + + return NULL; +} + int ls_find_all_no_version(ss_storage *ls, const char *var_name, obj_descriptor ***odscs) { @@ -1331,7 +1353,7 @@ int obj_desc_equals(obj_descriptor *odsc1, obj_descriptor *odsc2) * Test if two object descriptors have the same name and versions and * their bounding boxes intersect. * */ -int obj_desc_equals_intersect(obj_descriptor *odsc1, obj_descriptor *odsc2) +int obj_desc_equals_intersect(const obj_descriptor *odsc1, const obj_descriptor *odsc2) { if(strcmp(odsc1->name, odsc2->name) == 0 && odsc1->version == odsc2->version && @@ -1353,6 +1375,19 @@ int obj_desc_by_name_intersect(const obj_descriptor *odsc1, return 0; } +/* + * Test if two object descriptors have the same name and versions and + * odsc1's bbox includes odsc2's bbox. + * */ +int obj_desc_equals_include(const obj_descriptor *odsc1, const obj_descriptor *odsc2) +{ + if(strcmp(odsc1->name, odsc2->name) == 0 && + odsc1->version == odsc2->version && + bbox_include(&odsc1->bb, &odsc2->bb)) + return 1; + return 0; +} + /* Public API starts here. */ diff --git a/src/util.c b/src/util.c index 8f29cb86..d01e765e 100644 --- a/src/util.c +++ b/src/util.c @@ -36,11 +36,17 @@ * tjin@cac.rutgers.edu */ +#define _XOPEN_SOURCE 500 #include #include #include #include +#include +#include +#include +#include +#include "dspaces-common.h" #include "util.h" size_t str_len(const char *str) @@ -226,3 +232,158 @@ void free_nv_pairs(struct name_value_pair *pairs) free(p); } } + +/******************************************************* + Memory Info +**********************************************************/ + +/* Parse the contents of /proc/meminfo (in buf), return value of "name" + * (example: "MemTotal:") + * Returns -errno if the entry cannot be found. */ +static long long get_entry(const char* name, const char* buf) +{ + char* hit = strstr(buf, name); + if (hit == NULL) { + return -ENODATA; + } + + errno = 0; + long long val = strtoll(hit + strlen(name), NULL, 10); + if (errno != 0) { + int strtoll_errno = errno; + fprintf(stderr, "%s: strtol() failed: %s", __func__, strerror(errno)); + return -strtoll_errno; + } + return val; +} + +/* Like get_entry(), but exit if the value cannot be found */ +static long long get_entry_fatal(const char* name, const char* buf) +{ + long long val = get_entry(name, buf); + if (val < 0) { + fprintf(stderr, "%s: fatal error, could not find entry '%s' in /proc/meminfo: %s\n", __func__, + name, strerror((int)-val)); + return 0; + } + return val; +} + +/* If the kernel does not provide MemAvailable (introduced in Linux 3.14), + * approximate it using other data we can get */ +static long long available_guesstimate(const char* buf) +{ + long long Cached = get_entry_fatal("Cached:", buf); + long long MemFree = get_entry_fatal("MemFree:", buf); + long long Buffers = get_entry_fatal("Buffers:", buf); + long long Shmem = get_entry_fatal("Shmem:", buf); + + return MemFree + Cached + Buffers - Shmem; +} + +/* Parse /proc/meminfo. + * This function either returns valid data or kills the process + * with a fatal error. + */ +meminfo_t parse_meminfo() +{ + // Note that we do not need to close static FDs that we ensure to + // `fopen()` maximally once. + static FILE* fd; + static int guesstimate_warned = 0; + // On Linux 5.3, "wc -c /proc/meminfo" counts 1391 bytes. + // 8192 should be enough for the foreseeable future. + char buf[8192] = { 0 }; + meminfo_t m = { 0 }; + + if (fd == NULL) { + char buf[MEM_PATH_LEN] = { 0 }; + snprintf(buf, sizeof(buf), "%s/%s", "/proc", "meminfo"); + fd = fopen(buf, "r"); + } + if (fd == NULL) { + fprintf(stderr, "could not open /proc/meminfo: %s\n", strerror(errno)); + } + rewind(fd); + + size_t len = fread(buf, 1, sizeof(buf) - 1, fd); + if (ferror(fd)) { + fprintf(stderr, "could not read /proc/meminfo: %s\n", strerror(errno)); + } + if (len == 0) { + fprintf(stderr, "could not read /proc/meminfo: 0 bytes returned\n"); + } + + m.MemTotalKiB = (uint64_t) get_entry_fatal("MemTotal:", buf); + m.SwapTotalKiB = (uint64_t) get_entry_fatal("SwapTotal:", buf); + long long SwapFree = (uint64_t) get_entry_fatal("SwapFree:", buf); + + long long MemAvailable = get_entry("MemAvailable:", buf); + if (MemAvailable < 0) { + MemAvailable = available_guesstimate(buf); + if (guesstimate_warned == 0) { + fprintf(stderr, "Warning: Your kernel does not provide MemAvailable data (needs 3.14+)\n" + " Falling back to guesstimate\n"); + guesstimate_warned = 1; + } + } + + // Calculate percentages + m.MemAvailablePercent = (double)MemAvailable * 100 / (double)m.MemTotalKiB; + if (m.SwapTotalKiB > 0) { + m.SwapFreePercent = (double)SwapFree * 100 / (double)m.SwapTotalKiB; + } else { + m.SwapFreePercent = 0; + } + + // Convert kiB to MiB + m.MemTotalMiB = m.MemTotalKiB >> 10; + m.MemAvailableMiB = ((uint64_t)(MemAvailable)) >> 10; + m.SwapTotalMiB = m.SwapTotalKiB >> 10; + m.SwapFreeMiB = ((uint64_t) SwapFree) >> 10; + + return m; +} + +/******************************************************* + Directory Opreations +**********************************************************/ +int check_dir_exist(const char* dir_path) +{ + struct stat s; + int ret; + + if((stat(dir_path, &s) == 0) && S_ISDIR(s.st_mode)) + return 1; + else + return 0; + +} + +int check_dir_write_permission(const char* dir_path) +{ + if(access(dir_path, W_OK)) + return 1; + else + return 0; +} + +void mkdir_all_owner_permission(const char* dir_path) +{ + mkdir(dir_path, 0700); +} + +static int unlink_cb(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) +{ + int rv = remove(fpath); + + if (rv) + perror(fpath); + + return rv; +} + +int remove_dir_rf(const char *dir_path) +{ + return nftw(dir_path, unlink_cb, 256, FTW_DEPTH | FTW_PHYS); +} \ No newline at end of file