From 659fa2d99e8d0591f75fd2998dead868c75db784 Mon Sep 17 00:00:00 2001 From: victoryang00 Date: Mon, 15 Jan 2024 23:30:27 +0000 Subject: [PATCH] udp recovery --- CMakeLists.txt | 13 ++++------ include/wamr.h | 1 + include/wamr_export.h | 9 +++---- include/wamr_wasi_context.h | 19 +++++--------- lib/wasm-micro-runtime | 2 +- src/wamr.cpp | 51 ++++--------------------------------- src/wamr_export.cpp | 37 +++++++++++++++++++-------- src/wamr_wasi_context.cpp | 20 ++++++++++++--- test/client.c | 2 +- 9 files changed, 66 insertions(+), 88 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c85a4b9..8c3b49d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,10 +20,8 @@ if (APPLE) set(WAMR_BUILD_THREAD_MGR 1) set(WAMR_BUILD_PLATFORM "darwin") if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(arm64|aarch64)") - set(LLVM_LLDB_LIB /opt/homebrew/Cellar/llvm@14/14.0.6/lib/liblldb.dylib) set(BLAS_LIBRARIES /opt/homebrew/opt/openblas/lib/libopenblas.0.dylib) else () - set(LLVM_LLDB_LIB /usr/local/Cellar/llvm@14/14.0.6/lib/liblldb.dylib) set(BLAS_LIBRARIES /usr/local/opt/openblas/lib/libopenblas.0.dylib) endif () find_path(BLAS_INCLUDE_DIRS cblas.h @@ -37,7 +35,6 @@ elseif (LINUX) set(WAMR_BUILD_LIB_PTHREAD_SEMAPHORE 1) set(WAMR_BUILD_LIB_WASI_THREADS 1) set(WAMR_BUILD_THREAD_MGR 1) - set(LLVM_LLDB_LIB /usr/lib/liblldb.so) set(BLAS_LIBRARIES -lopenblas) find_path(BLAS_INCLUDE_DIRS cblas.h /usr/include @@ -51,9 +48,9 @@ else () set(CMAKE_MSVC_RUNTIME_LIBRARY "") set(CMAKE_GENERATOR_PLATFORM "") set(CMAKE_CXX_FLAGS "/MD") - set(LLVM_LLDB_LIB "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cudart.lib" uvwasi_a uv ws2_32 "C:/Program Files/LLVM/lib/liblldb.lib") + set(WIN_EXTRA_LIBS uvwasi_a uv_a ws2_32) if(MVVM_BUILD_TEST) - set(BLAS_LIBRARIES "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cublas.lib") + set(BLAS_LIBRARIES "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cublas.lib" "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cudart.lib") set(BLAS_INCLUDE_DIRS "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/include/") endif() endif () @@ -134,10 +131,10 @@ add_executable(MVVM_restore src/restore.cpp ${UNCOMMON_SHARED_SOURCE}) add_executable(MVVM_checkpoint src/checkpoint.cpp ${UNCOMMON_SHARED_SOURCE}) target_link_libraries(MVVM_export fmt::fmt -lm -ldl -lpthread ${BLAS_LIBRARIES}) -target_link_libraries(MVVM_restore fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${LLVM_LLDB_LIB}) -target_link_libraries(MVVM_checkpoint fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${LLVM_LLDB_LIB}) +target_link_libraries(MVVM_restore fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${WIN_EXTRA_LIBS}) +target_link_libraries(MVVM_checkpoint fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${WIN_EXTRA_LIBS}) if (MVVM_BUILD_MPI) add_executable(MVVM_mpi_test test/mpi.cpp ${UNCOMMON_SHARED_SOURCE}) - target_link_libraries(MVVM_mpi_test fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${LLVM_LLDB_LIB}) + target_link_libraries(MVVM_mpi_test fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${WIN_EXTRA_LIBS}) endif() add_definitions(-DCXXOPTS_NO_RTTI=1) diff --git a/include/wamr.h b/include/wamr.h index 5ac009f..2083b0d 100644 --- a/include/wamr.h +++ b/include/wamr.h @@ -30,6 +30,7 @@ class WAMRInstance { std::string aot_file_path{}; std::string wasm_file_path{}; std::vector int3_addr{}; + std::vector> switch_addr{}; std::vector dir_{}; std::vector map_dir_{}; std::vector env_{}; diff --git a/include/wamr_export.h b/include/wamr_export.h index b8fa7c5..8c9d451 100644 --- a/include/wamr_export.h +++ b/include/wamr_export.h @@ -19,12 +19,10 @@ struct SocketAddrPool { }; enum fd_op { MVVM_FOPEN = 0, MVVM_FWRITE = 1, MVVM_FREAD = 2, MVVM_FSEEK = 3 }; #if !defined(_WIN32) -void insert_sock_send_to_data(uint32_t sock, uint8 *si_data, uint32 si_data_len, uint16_t si_flags, - __wasi_addr_t *dest_addr); +void insert_sock_send_to_data(uint32_t, uint8 *, uint32, uint16_t, __wasi_addr_t *); void insert_sock_open_data(uint32_t, int, int, uint32_t); -void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_len, uint16_t ri_flags, - __wasi_addr_t *src_addr); -void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, uint32* ri_data_len); +void insert_sock_recv_from_data(uint32_t, uint8 *, uint32, uint16_t, __wasi_addr_t *); +void replay_sock_recv_from_data(uint32_t, uint8 **, unsigned long *, __wasi_addr_t *); void insert_socket(int, int, int, int); void update_socket_fd_address(int, struct SocketAddrPool *); void set_tcp(); @@ -45,7 +43,6 @@ void rename_fd(int, char const *, int, char const *); void lightweight_checkpoint(WASMExecEnv *); void lightweight_uncheckpoint(WASMExecEnv *); void wamr_wait(); -int check_recvbuffer(int fd, char *buf); void sigint_handler(int sig); void register_sigtrap(); void sigtrap_handler(int sig); diff --git a/include/wamr_wasi_context.h b/include/wamr_wasi_context.h index e85fd44..fc95b7e 100644 --- a/include/wamr_wasi_context.h +++ b/include/wamr_wasi_context.h @@ -17,12 +17,6 @@ #include #include -struct WAMRArgvEnvironValues { - std::vector argv_list; - std::vector env_list; -}; -// need to serialize the opened file in the file descripter and the socket opened also. - struct WAMRAddrPool { uint8 ip4[4]; uint16 ip6[8]; @@ -51,7 +45,6 @@ struct WasiSockSendToData { struct WasiSockRecvFromData { uint32_t sock; std::vector ri_data; - uint32 ri_data_len; uint16_t ri_flags; WAMRWasiAddr src_addr; uint32 ro_data_len; @@ -64,19 +57,21 @@ struct SocketMetaData { SocketAddrPool socketAddress{}; WasiSockOpenData socketOpenData{}; int replay_start_index{}; - bool is_collection=false; + bool is_collection = false; #if !defined(_WIN32) - WasiSockSendToData socketSentToData{}; // + WasiSockSendToData socketSentToData{}; // std::vector socketRecvFromDatas; #endif }; struct WAMRWASIContext { - std::map>>> fd_map; + std::map>>> fd_map; std::map socket_fd_map; std::vector dir; std::vector map_dir; - WAMRArgvEnvironValues argv_environ; - std::vector addr_pool; + std::vector arg; + std::vector argv_list; + std::vector env_list; + std::vector addr_pool; std::vector ns_lookup_list; uint32_t exit_code; void dump_impl(WASIArguments *env); diff --git a/lib/wasm-micro-runtime b/lib/wasm-micro-runtime index e2e36f4..75327ac 160000 --- a/lib/wasm-micro-runtime +++ b/lib/wasm-micro-runtime @@ -1 +1 @@ -Subproject commit e2e36f4e97aa5c96d6b1461fecf36b027c632176 +Subproject commit 75327ac11e40a9b89d0cb57333f5979f9489c4f0 diff --git a/src/wamr.cpp b/src/wamr.cpp index 55f2e66..c644311 100644 --- a/src/wamr.cpp +++ b/src/wamr.cpp @@ -163,12 +163,6 @@ int WAMRInstance::invoke_fopen(std::string &path, uint32 option) { wasm_runtime_module_free(module_inst, buffer_for_wasm); return ((int)argv[0]); } - // auto name1 = "o_"; - // uint32 argv[0]; - // if (!(func = wasm_runtime_lookup_function(module_inst, name1, nullptr))) { - // LOGV(ERROR) << "The wasi " << name1 << " function is not found."; - // } - // wasm_runtime_call_wasm(exec_env, func, 0, argv); return -1; }; int WAMRInstance::invoke_frenumber(uint32 fd, uint32 to) { @@ -235,7 +229,7 @@ int WAMRInstance::invoke_sock_open(uint32_t poolfd, int af, int socktype, uint32 uint32 argv[4] = {poolfd, static_cast(af), static_cast(socktype), buffer_for_wasm}; auto res = wasm_runtime_call_wasm(exec_env, func, 4, argv); wasm_runtime_module_free(module_inst, buffer_for_wasm); - return res; + return argv[0]; } return -1; } @@ -466,16 +460,9 @@ void WAMRInstance::recover(std::vector> *execEnv) { ; }); - for (const auto &exec_ : *execEnv) { - size_t a = exec_->frames.back()->function_index; - size_t b = exec_->frames.front()->function_index; - fprintf(stderr, "exec_env %p, frames %lu %lu, cur_count %d\n", exec_.get(), a, b, exec_->cur_count); - } - argptr = (ThreadArgs **)malloc(sizeof(void *) * execEnv->size()); uint32 id = 0; - auto main_exec_env = execEnv->back().get(); - set_wasi_args(main_exec_env->module_inst.wasi_ctx); + set_wasi_args(execEnv->back()->module_inst.wasi_ctx); instantiate(); auto mi = module_inst; @@ -483,7 +470,7 @@ void WAMRInstance::recover(std::vector> *execEnv) { get_int3_addr(); replace_int3_with_nop(); - restore(main_exec_env, cur_env); + restore(execEnv->back().get(), cur_env); auto main_env = cur_env; auto main_saved_call_chain = main_env->restore_call_chain; fprintf(stderr, "main_env created %p %p\n\n", main_env, main_saved_call_chain); @@ -565,35 +552,8 @@ void WAMRInstance::set_wasi_args(const std::vector &dir_list, const wasm_runtime_set_wasi_ns_lookup_pool(module, ns_pool_.data(), ns_pool_.size()); } void WAMRInstance::set_wasi_args(WAMRWASIContext &context) { - // TODO: some handmade directory after recovery dir - auto get_addr_from_context = [](const WAMRWASIContext &wasiContext) { - auto addr_pool = std::vector(wasiContext.addr_pool.size()); - std::transform(wasiContext.addr_pool.begin(), wasiContext.addr_pool.end(), addr_pool.begin(), - [](const WAMRAddrPool &addrs) { - std::string addr_str; - if (addrs.is_4) { - addr_str = fmt::format("{}.{}.{}.{}/{}", addrs.ip4[0], addrs.ip4[1], addrs.ip4[2], - addrs.ip4[3], addrs.mask); - if (addrs.mask != UINT8_MAX) { - addr_str += fmt::format("/{}", addrs.mask); - } - } else { - addr_str = - - fmt::format("{:#}:{:#}:{:#}:{:#}:{:#}:{:#}:{:#}:{:#}", addrs.ip6[0], addrs.ip6[1], - addrs.ip6[2], addrs.ip6[3], addrs.ip6[4], addrs.ip6[5], addrs.ip6[6], - addrs.ip6[7], addrs.mask); - if (addrs.mask != UINT8_MAX) { - addr_str += fmt::format("/{}", addrs.mask); - } - } - return addr_str; - }); - - return addr_pool; - }; - set_wasi_args(context.dir, context.map_dir, context.argv_environ.env_list, context.argv_environ.argv_list, - get_addr_from_context(context), context.ns_lookup_list); + set_wasi_args(context.dir, context.map_dir, context.env_list, context.argv_list, + context.addr_pool, context.ns_lookup_list); } extern WAMRInstance *wamr; extern "C" { // stop name mangling so it can be linked externally @@ -625,7 +585,6 @@ WASMExecEnv *restore_env() { return exec_env; } -int check_recvbuffer(int fd, char *buf) { return 0; }; } void WAMRInstance::instantiate() { diff --git a/src/wamr_export.cpp b/src/wamr_export.cpp index b5d6d05..ac0a62d 100644 --- a/src/wamr_export.cpp +++ b/src/wamr_export.cpp @@ -73,8 +73,6 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le recvFromData.src_addr.ip.ip4[1] = src_addr->addr.ip4.addr.n1; recvFromData.src_addr.ip.ip4[2] = src_addr->addr.ip4.addr.n2; recvFromData.src_addr.ip.ip4[3] = src_addr->addr.ip4.addr.n3; - - recvFromData.src_addr.ip.is_4 = true; recvFromData.src_addr.port = src_addr->addr.ip4.port; } else { recvFromData.src_addr.ip.is_4 = false; @@ -86,8 +84,6 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le recvFromData.src_addr.ip.ip6[5] = src_addr->addr.ip6.addr.h1; recvFromData.src_addr.ip.ip6[6] = src_addr->addr.ip6.addr.h2; recvFromData.src_addr.ip.ip6[7] = src_addr->addr.ip6.addr.h3; - - recvFromData.src_addr.ip.is_4 = false; recvFromData.src_addr.port = src_addr->addr.ip6.port; } LOGV(ERROR) << "insert_sock_recv_from_data " << sock << " " << ((struct mvvm_op_data *)ri_data)->op; @@ -100,20 +96,41 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le LOGV(ERROR) << "socket_fd" << sock << " not found"; } } -void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, uint32 *ri_data_len) { +void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, unsigned long *recv_size, __wasi_addr_t *src_addr) { // check from wamr->socket_fd_map_[sock] and drain one // should be in the same order if (wamr->socket_fd_map_[sock].socketRecvFromDatas.empty()) { LOGV(ERROR) << "no recvfrom data " << sock; - *ri_data_len = 0; + *recv_size = 0; return; } // shoud we check the src_addr? + if (wamr->socket_fd_map_[sock].replay_start_index >= wamr->socket_fd_map_[sock].socketRecvFromDatas.size()) { + LOGV(ERROR) << "replay index out of bound " << sock; + *recv_size = 0; + return; + } auto recvFromData = wamr->socket_fd_map_[sock].socketRecvFromDatas[wamr->socket_fd_map_[sock].replay_start_index]; - wamr->socket_fd_map_[sock].socketRecvFromDatas.erase(wamr->socket_fd_map_[sock].socketRecvFromDatas.begin() + - wamr->socket_fd_map_[sock].replay_start_index); - std::memcpy(*ri_data, recvFromData.ri_data.data(), recvFromData.ri_data_len); - *ri_data_len = recvFromData.ri_data_len; + wamr->socket_fd_map_[sock].replay_start_index; + std::memcpy(*ri_data, recvFromData.ri_data.data(), recvFromData.ri_data.size()); + *recv_size = recvFromData.ri_data.size(); + if (src_addr->kind == IPv4) { + src_addr->addr.ip4.addr.n0 = recvFromData.src_addr.ip.ip4[0]; + src_addr->addr.ip4.addr.n1 = recvFromData.src_addr.ip.ip4[1]; + src_addr->addr.ip4.addr.n2 = recvFromData.src_addr.ip.ip4[2]; + src_addr->addr.ip4.addr.n3 = recvFromData.src_addr.ip.ip4[3]; + src_addr->addr.ip4.port = recvFromData.src_addr.port; + } else { + src_addr->addr.ip6.addr.n0 = recvFromData.src_addr.ip.ip6[0]; + src_addr->addr.ip6.addr.n1 = recvFromData.src_addr.ip.ip6[1]; + src_addr->addr.ip6.addr.n2 = recvFromData.src_addr.ip.ip6[2]; + src_addr->addr.ip6.addr.n3 = recvFromData.src_addr.ip.ip6[3]; + src_addr->addr.ip6.addr.h0 = recvFromData.src_addr.ip.ip6[4]; + src_addr->addr.ip6.addr.h1 = recvFromData.src_addr.ip.ip6[5]; + src_addr->addr.ip6.addr.h2 = recvFromData.src_addr.ip.ip6[6]; + src_addr->addr.ip6.addr.h3 = recvFromData.src_addr.ip.ip6[7]; + src_addr->addr.ip6.port = recvFromData.src_addr.port; + } } #endif // only support one type of requests at a time diff --git a/src/wamr_wasi_context.cpp b/src/wamr_wasi_context.cpp index a276a1a..5cb1c7c 100644 --- a/src/wamr_wasi_context.cpp +++ b/src/wamr_wasi_context.cpp @@ -45,6 +45,18 @@ void WAMRWASIContext::dump_impl(WASIArguments *env) { for (auto &i : wamr->map_dir_) { map_dir.emplace_back(i); } + for (auto &i : wamr->env_) { + env_list.emplace_back(i); + } + for (auto &i : wamr->arg_) { + arg.emplace_back(i); + } + for (auto &i : wamr->addr_) { + addr_pool.emplace_back(i); + } + for (auto &i : wamr->ns_pool_) { + ns_lookup_list.emplace_back(i); + } for (auto [fd, res] : wamr->fd_map_) { auto [path, op] = res; auto dumped_res = std::make_tuple(path, op); @@ -87,7 +99,7 @@ void WAMRWASIContext::dump_impl(WASIArguments *env) { } } } - // LOGV(ERROR) << "recv error"; + // LOGV(ERROR) << "recv error"; this->socket_fd_map[fd] = socketMetaData; } @@ -128,12 +140,12 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) { // << " SocketMetaData[socketAddress]: " << socketMetaData.socketAddress << " SocketMetaData[protocol]: " << socketMetaData.protocol << " SocketMetaData[type]: " << socketMetaData.type; - uint32 tmp_sock_fd = socketMetaData.socketOpenData.sockfd; - wamr->invoke_sock_open(socketMetaData.socketOpenData.poolfd, socketMetaData.socketOpenData.af, + uint32 tmp_sock_fd ; + auto res = wamr->invoke_sock_open(socketMetaData.socketOpenData.poolfd, socketMetaData.socketOpenData.af, socketMetaData.socketOpenData.socktype, &tmp_sock_fd); // should be done after restore call chain // renumber or not? - LOGV(INFO) << "tmp_sock_fd " << tmp_sock_fd << " fd" << fd; + LOGV(INFO) << "tmp_sock_fd " << tmp_sock_fd << " fd" << fd << " res " << res; if (tmp_sock_fd != fd) wamr->invoke_frenumber(tmp_sock_fd, fd); wamr->socket_fd_map_[fd] = socketMetaData; diff --git a/test/client.c b/test/client.c index 3d663ba..10b01e2 100644 --- a/test/client.c +++ b/test/client.c @@ -21,7 +21,7 @@ init_sockaddr_inet(struct sockaddr_in *addr) /* 172.17.0.1:1234 */ addr->sin_family = AF_INET; addr->sin_port = htons(12346); - addr->sin_addr.s_addr = my_inet_addr("172.17.0.2"); + addr->sin_addr.s_addr = my_inet_addr("172.17.0.3"); } static void