Skip to content

Commit

Permalink
udp recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Jan 15, 2024
1 parent 5f1b7b2 commit 659fa2d
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 88 deletions.
13 changes: 5 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ if (APPLE)
set(WAMR_BUILD_THREAD_MGR 1)
set(WAMR_BUILD_PLATFORM "darwin")
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(arm64|aarch64)")
set(LLVM_LLDB_LIB /opt/homebrew/Cellar/llvm@14/14.0.6/lib/liblldb.dylib)
set(BLAS_LIBRARIES /opt/homebrew/opt/openblas/lib/libopenblas.0.dylib)
else ()
set(LLVM_LLDB_LIB /usr/local/Cellar/llvm@14/14.0.6/lib/liblldb.dylib)
set(BLAS_LIBRARIES /usr/local/opt/openblas/lib/libopenblas.0.dylib)
endif ()
find_path(BLAS_INCLUDE_DIRS cblas.h
Expand All @@ -37,7 +35,6 @@ elseif (LINUX)
set(WAMR_BUILD_LIB_PTHREAD_SEMAPHORE 1)
set(WAMR_BUILD_LIB_WASI_THREADS 1)
set(WAMR_BUILD_THREAD_MGR 1)
set(LLVM_LLDB_LIB /usr/lib/liblldb.so)
set(BLAS_LIBRARIES -lopenblas)
find_path(BLAS_INCLUDE_DIRS cblas.h
/usr/include
Expand All @@ -51,9 +48,9 @@ else ()
set(CMAKE_MSVC_RUNTIME_LIBRARY "")
set(CMAKE_GENERATOR_PLATFORM "")
set(CMAKE_CXX_FLAGS "/MD")
set(LLVM_LLDB_LIB "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cudart.lib" uvwasi_a uv ws2_32 "C:/Program Files/LLVM/lib/liblldb.lib")
set(WIN_EXTRA_LIBS uvwasi_a uv_a ws2_32)
if(MVVM_BUILD_TEST)
set(BLAS_LIBRARIES "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cublas.lib")
set(BLAS_LIBRARIES "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cublas.lib" "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/lib/x64/cudart.lib")
set(BLAS_INCLUDE_DIRS "C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v12.1/include/")
endif()
endif ()
Expand Down Expand Up @@ -134,10 +131,10 @@ add_executable(MVVM_restore src/restore.cpp ${UNCOMMON_SHARED_SOURCE})
add_executable(MVVM_checkpoint src/checkpoint.cpp ${UNCOMMON_SHARED_SOURCE})

target_link_libraries(MVVM_export fmt::fmt -lm -ldl -lpthread ${BLAS_LIBRARIES})
target_link_libraries(MVVM_restore fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${LLVM_LLDB_LIB})
target_link_libraries(MVVM_checkpoint fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${LLVM_LLDB_LIB})
target_link_libraries(MVVM_restore fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${WIN_EXTRA_LIBS})
target_link_libraries(MVVM_checkpoint fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${WIN_EXTRA_LIBS})
if (MVVM_BUILD_MPI)
add_executable(MVVM_mpi_test test/mpi.cpp ${UNCOMMON_SHARED_SOURCE})
target_link_libraries(MVVM_mpi_test fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${LLVM_LLDB_LIB})
target_link_libraries(MVVM_mpi_test fmt::fmt cxxopts::cxxopts -lm -ldl -lpthread ${BLAS_LIBRARIES} MVVM_export vmlib ${WIN_EXTRA_LIBS})
endif()
add_definitions(-DCXXOPTS_NO_RTTI=1)
1 change: 1 addition & 0 deletions include/wamr.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class WAMRInstance {
std::string aot_file_path{};
std::string wasm_file_path{};
std::vector<std::size_t> int3_addr{};
std::vector<std::pair<std::size_t,std::size_t>> switch_addr{};
std::vector<const char *> dir_{};
std::vector<const char *> map_dir_{};
std::vector<const char *> env_{};
Expand Down
9 changes: 3 additions & 6 deletions include/wamr_export.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ struct SocketAddrPool {
};
enum fd_op { MVVM_FOPEN = 0, MVVM_FWRITE = 1, MVVM_FREAD = 2, MVVM_FSEEK = 3 };
#if !defined(_WIN32)
void insert_sock_send_to_data(uint32_t sock, uint8 *si_data, uint32 si_data_len, uint16_t si_flags,
__wasi_addr_t *dest_addr);
void insert_sock_send_to_data(uint32_t, uint8 *, uint32, uint16_t, __wasi_addr_t *);
void insert_sock_open_data(uint32_t, int, int, uint32_t);
void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_len, uint16_t ri_flags,
__wasi_addr_t *src_addr);
void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, uint32* ri_data_len);
void insert_sock_recv_from_data(uint32_t, uint8 *, uint32, uint16_t, __wasi_addr_t *);
void replay_sock_recv_from_data(uint32_t, uint8 **, unsigned long *, __wasi_addr_t *);
void insert_socket(int, int, int, int);
void update_socket_fd_address(int, struct SocketAddrPool *);
void set_tcp();
Expand All @@ -45,7 +43,6 @@ void rename_fd(int, char const *, int, char const *);
void lightweight_checkpoint(WASMExecEnv *);
void lightweight_uncheckpoint(WASMExecEnv *);
void wamr_wait();
int check_recvbuffer(int fd, char *buf);
void sigint_handler(int sig);
void register_sigtrap();
void sigtrap_handler(int sig);
Expand Down
19 changes: 7 additions & 12 deletions include/wamr_wasi_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@
#include <string>
#include <vector>

struct WAMRArgvEnvironValues {
std::vector<std::string> argv_list;
std::vector<std::string> env_list;
};
// need to serialize the opened file in the file descripter and the socket opened also.

struct WAMRAddrPool {
uint8 ip4[4];
uint16 ip6[8];
Expand Down Expand Up @@ -51,7 +45,6 @@ struct WasiSockSendToData {
struct WasiSockRecvFromData {
uint32_t sock;
std::vector<uint8_t> ri_data;
uint32 ri_data_len;
uint16_t ri_flags;
WAMRWasiAddr src_addr;
uint32 ro_data_len;
Expand All @@ -64,19 +57,21 @@ struct SocketMetaData {
SocketAddrPool socketAddress{};
WasiSockOpenData socketOpenData{};
int replay_start_index{};
bool is_collection=false;
bool is_collection = false;
#if !defined(_WIN32)
WasiSockSendToData socketSentToData{}; //
WasiSockSendToData socketSentToData{}; //
std::vector<WasiSockRecvFromData> socketRecvFromDatas;
#endif
};
struct WAMRWASIContext {
std::map<int, std::tuple<std::string, std::vector<std::tuple<int,int,fd_op>>>> fd_map;
std::map<int, std::tuple<std::string, std::vector<std::tuple<int, int, fd_op>>>> fd_map;
std::map<int, SocketMetaData> socket_fd_map;
std::vector<std::string> dir;
std::vector<std::string> map_dir;
WAMRArgvEnvironValues argv_environ;
std::vector<WAMRAddrPool> addr_pool;
std::vector<std::string> arg;
std::vector<std::string> argv_list;
std::vector<std::string> env_list;
std::vector<std::string> addr_pool;
std::vector<std::string> ns_lookup_list;
uint32_t exit_code;
void dump_impl(WASIArguments *env);
Expand Down
51 changes: 5 additions & 46 deletions src/wamr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,6 @@ int WAMRInstance::invoke_fopen(std::string &path, uint32 option) {
wasm_runtime_module_free(module_inst, buffer_for_wasm);
return ((int)argv[0]);
}
// auto name1 = "o_";
// uint32 argv[0];
// if (!(func = wasm_runtime_lookup_function(module_inst, name1, nullptr))) {
// LOGV(ERROR) << "The wasi " << name1 << " function is not found.";
// }
// wasm_runtime_call_wasm(exec_env, func, 0, argv);
return -1;
};
int WAMRInstance::invoke_frenumber(uint32 fd, uint32 to) {
Expand Down Expand Up @@ -235,7 +229,7 @@ int WAMRInstance::invoke_sock_open(uint32_t poolfd, int af, int socktype, uint32
uint32 argv[4] = {poolfd, static_cast<uint32>(af), static_cast<uint32>(socktype), buffer_for_wasm};
auto res = wasm_runtime_call_wasm(exec_env, func, 4, argv);
wasm_runtime_module_free(module_inst, buffer_for_wasm);
return res;
return argv[0];
}
return -1;
}
Expand Down Expand Up @@ -466,24 +460,17 @@ void WAMRInstance::recover(std::vector<std::unique_ptr<WAMRExecEnv>> *execEnv) {
;
});

for (const auto &exec_ : *execEnv) {
size_t a = exec_->frames.back()->function_index;
size_t b = exec_->frames.front()->function_index;
fprintf(stderr, "exec_env %p, frames %lu %lu, cur_count %d\n", exec_.get(), a, b, exec_->cur_count);
}

argptr = (ThreadArgs **)malloc(sizeof(void *) * execEnv->size());
uint32 id = 0;
auto main_exec_env = execEnv->back().get();
set_wasi_args(main_exec_env->module_inst.wasi_ctx);
set_wasi_args(execEnv->back()->module_inst.wasi_ctx);

instantiate();
auto mi = module_inst;

get_int3_addr();
replace_int3_with_nop();

restore(main_exec_env, cur_env);
restore(execEnv->back().get(), cur_env);
auto main_env = cur_env;
auto main_saved_call_chain = main_env->restore_call_chain;
fprintf(stderr, "main_env created %p %p\n\n", main_env, main_saved_call_chain);
Expand Down Expand Up @@ -565,35 +552,8 @@ void WAMRInstance::set_wasi_args(const std::vector<std::string> &dir_list, const
wasm_runtime_set_wasi_ns_lookup_pool(module, ns_pool_.data(), ns_pool_.size());
}
void WAMRInstance::set_wasi_args(WAMRWASIContext &context) {
// TODO: some handmade directory after recovery dir
auto get_addr_from_context = [](const WAMRWASIContext &wasiContext) {
auto addr_pool = std::vector<std::string>(wasiContext.addr_pool.size());
std::transform(wasiContext.addr_pool.begin(), wasiContext.addr_pool.end(), addr_pool.begin(),
[](const WAMRAddrPool &addrs) {
std::string addr_str;
if (addrs.is_4) {
addr_str = fmt::format("{}.{}.{}.{}/{}", addrs.ip4[0], addrs.ip4[1], addrs.ip4[2],
addrs.ip4[3], addrs.mask);
if (addrs.mask != UINT8_MAX) {
addr_str += fmt::format("/{}", addrs.mask);
}
} else {
addr_str =

fmt::format("{:#}:{:#}:{:#}:{:#}:{:#}:{:#}:{:#}:{:#}", addrs.ip6[0], addrs.ip6[1],
addrs.ip6[2], addrs.ip6[3], addrs.ip6[4], addrs.ip6[5], addrs.ip6[6],
addrs.ip6[7], addrs.mask);
if (addrs.mask != UINT8_MAX) {
addr_str += fmt::format("/{}", addrs.mask);
}
}
return addr_str;
});

return addr_pool;
};
set_wasi_args(context.dir, context.map_dir, context.argv_environ.env_list, context.argv_environ.argv_list,
get_addr_from_context(context), context.ns_lookup_list);
set_wasi_args(context.dir, context.map_dir, context.env_list, context.argv_list,
context.addr_pool, context.ns_lookup_list);
}
extern WAMRInstance *wamr;
extern "C" { // stop name mangling so it can be linked externally
Expand Down Expand Up @@ -625,7 +585,6 @@ WASMExecEnv *restore_env() {

return exec_env;
}
int check_recvbuffer(int fd, char *buf) { return 0; };
}

void WAMRInstance::instantiate() {
Expand Down
37 changes: 27 additions & 10 deletions src/wamr_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le
recvFromData.src_addr.ip.ip4[1] = src_addr->addr.ip4.addr.n1;
recvFromData.src_addr.ip.ip4[2] = src_addr->addr.ip4.addr.n2;
recvFromData.src_addr.ip.ip4[3] = src_addr->addr.ip4.addr.n3;

recvFromData.src_addr.ip.is_4 = true;
recvFromData.src_addr.port = src_addr->addr.ip4.port;
} else {
recvFromData.src_addr.ip.is_4 = false;
Expand All @@ -86,8 +84,6 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le
recvFromData.src_addr.ip.ip6[5] = src_addr->addr.ip6.addr.h1;
recvFromData.src_addr.ip.ip6[6] = src_addr->addr.ip6.addr.h2;
recvFromData.src_addr.ip.ip6[7] = src_addr->addr.ip6.addr.h3;

recvFromData.src_addr.ip.is_4 = false;
recvFromData.src_addr.port = src_addr->addr.ip6.port;
}
LOGV(ERROR) << "insert_sock_recv_from_data " << sock << " " << ((struct mvvm_op_data *)ri_data)->op;
Expand All @@ -100,20 +96,41 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le
LOGV(ERROR) << "socket_fd" << sock << " not found";
}
}
void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, uint32 *ri_data_len) {
void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, unsigned long *recv_size, __wasi_addr_t *src_addr) {
// check from wamr->socket_fd_map_[sock] and drain one
// should be in the same order
if (wamr->socket_fd_map_[sock].socketRecvFromDatas.empty()) {
LOGV(ERROR) << "no recvfrom data " << sock;
*ri_data_len = 0;
*recv_size = 0;
return;
}
// shoud we check the src_addr?
if (wamr->socket_fd_map_[sock].replay_start_index >= wamr->socket_fd_map_[sock].socketRecvFromDatas.size()) {
LOGV(ERROR) << "replay index out of bound " << sock;
*recv_size = 0;
return;
}
auto recvFromData = wamr->socket_fd_map_[sock].socketRecvFromDatas[wamr->socket_fd_map_[sock].replay_start_index];
wamr->socket_fd_map_[sock].socketRecvFromDatas.erase(wamr->socket_fd_map_[sock].socketRecvFromDatas.begin() +
wamr->socket_fd_map_[sock].replay_start_index);
std::memcpy(*ri_data, recvFromData.ri_data.data(), recvFromData.ri_data_len);
*ri_data_len = recvFromData.ri_data_len;
wamr->socket_fd_map_[sock].replay_start_index;
std::memcpy(*ri_data, recvFromData.ri_data.data(), recvFromData.ri_data.size());
*recv_size = recvFromData.ri_data.size();
if (src_addr->kind == IPv4) {
src_addr->addr.ip4.addr.n0 = recvFromData.src_addr.ip.ip4[0];
src_addr->addr.ip4.addr.n1 = recvFromData.src_addr.ip.ip4[1];
src_addr->addr.ip4.addr.n2 = recvFromData.src_addr.ip.ip4[2];
src_addr->addr.ip4.addr.n3 = recvFromData.src_addr.ip.ip4[3];
src_addr->addr.ip4.port = recvFromData.src_addr.port;
} else {
src_addr->addr.ip6.addr.n0 = recvFromData.src_addr.ip.ip6[0];
src_addr->addr.ip6.addr.n1 = recvFromData.src_addr.ip.ip6[1];
src_addr->addr.ip6.addr.n2 = recvFromData.src_addr.ip.ip6[2];
src_addr->addr.ip6.addr.n3 = recvFromData.src_addr.ip.ip6[3];
src_addr->addr.ip6.addr.h0 = recvFromData.src_addr.ip.ip6[4];
src_addr->addr.ip6.addr.h1 = recvFromData.src_addr.ip.ip6[5];
src_addr->addr.ip6.addr.h2 = recvFromData.src_addr.ip.ip6[6];
src_addr->addr.ip6.addr.h3 = recvFromData.src_addr.ip.ip6[7];
src_addr->addr.ip6.port = recvFromData.src_addr.port;
}
}
#endif
// only support one type of requests at a time
Expand Down
20 changes: 16 additions & 4 deletions src/wamr_wasi_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ void WAMRWASIContext::dump_impl(WASIArguments *env) {
for (auto &i : wamr->map_dir_) {
map_dir.emplace_back(i);
}
for (auto &i : wamr->env_) {
env_list.emplace_back(i);
}
for (auto &i : wamr->arg_) {
arg.emplace_back(i);
}
for (auto &i : wamr->addr_) {
addr_pool.emplace_back(i);
}
for (auto &i : wamr->ns_pool_) {
ns_lookup_list.emplace_back(i);
}
for (auto [fd, res] : wamr->fd_map_) {
auto [path, op] = res;
auto dumped_res = std::make_tuple(path, op);
Expand Down Expand Up @@ -87,7 +99,7 @@ void WAMRWASIContext::dump_impl(WASIArguments *env) {
}
}
}
// LOGV(ERROR) << "recv error";
// LOGV(ERROR) << "recv error";

this->socket_fd_map[fd] = socketMetaData;
}
Expand Down Expand Up @@ -128,12 +140,12 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) {
// << " SocketMetaData[socketAddress]: " << socketMetaData.socketAddress
<< " SocketMetaData[protocol]: " << socketMetaData.protocol
<< " SocketMetaData[type]: " << socketMetaData.type;
uint32 tmp_sock_fd = socketMetaData.socketOpenData.sockfd;
wamr->invoke_sock_open(socketMetaData.socketOpenData.poolfd, socketMetaData.socketOpenData.af,
uint32 tmp_sock_fd ;
auto res = wamr->invoke_sock_open(socketMetaData.socketOpenData.poolfd, socketMetaData.socketOpenData.af,
socketMetaData.socketOpenData.socktype,
&tmp_sock_fd); // should be done after restore call chain
// renumber or not?
LOGV(INFO) << "tmp_sock_fd " << tmp_sock_fd << " fd" << fd;
LOGV(INFO) << "tmp_sock_fd " << tmp_sock_fd << " fd" << fd << " res " << res;
if (tmp_sock_fd != fd)
wamr->invoke_frenumber(tmp_sock_fd, fd);
wamr->socket_fd_map_[fd] = socketMetaData;
Expand Down
2 changes: 1 addition & 1 deletion test/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ init_sockaddr_inet(struct sockaddr_in *addr)
/* 172.17.0.1:1234 */
addr->sin_family = AF_INET;
addr->sin_port = htons(12346);
addr->sin_addr.s_addr = my_inet_addr("172.17.0.2");
addr->sin_addr.s_addr = my_inet_addr("172.17.0.3");
}

static void
Expand Down

0 comments on commit 659fa2d

Please sign in to comment.