Skip to content

Commit

Permalink
tcp recover init
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Jan 17, 2024
1 parent b4846c7 commit b82863a
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 29 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@

## To checkpoint and migrate a WAMR nano process
```bash
LOGV=1 ./MVVM_checkpoint -t ./test/counter.aot -f poll_oneoff -c 0 -x 10 -a "10" -e OMP_NUM_THREADS=1
LOGV=1 ./MVVM_restore -t ./test/counter.aot # All the wasi env will be restored
python3 ../artifact/common_util.py # return $recv is 193
LOGV=1 ./MVVM_checkpoint -t ./test/tcp_client.aot -f 193 -c 0 -x 10 -a "10" -e OMP_NUM_THREADS=1 -i
LOGV=1 ./MVVM_restore -t ./test/tcp_client.aot # All the wasi env will be restored
```
1. -t Target: The path to the WASM interpreter or AOT executable
2. -f Function: The function to stop and checkpoint
3. -x Function Counter: The WASM function counter to stop and checkpoint
4. -c Counter: The WASM instruction counter to stop and checkpoint(Conflict with -f and -x)
5. -a Arguments: The arguments to the function
6. -e Environment: The environment variables to the function
2. -i Debug Mode: Switch on for debugging
3. -f Function: The function to stop and checkpoint
4. -x Function Counter: The WASM function counter to stop and checkpoint
5. -c Counter: The WASM instruction counter to stop and checkpoint(Conflict with -f and -x)
6. -a Arguments: The arguments to the function
7. -e Environment: The environment variables to the function

## Design Doc
1. All the pointer will be stored as offset to the linear memory.
Expand Down
1 change: 1 addition & 0 deletions include/wamr.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class WAMRInstance {
int invoke_sock_open(uint32_t domain, uint32_t socktype, uint32_t protocol, uint32_t sockfd);
int invoke_sock_listen(uint32_t sockfd, uint32_t fd);
int invoke_sock_bind(uint32_t sockfd, struct sockaddr *sock, socklen_t sock_size);
int invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t sock_size);
int invoke_recv(int sockfd, uint8 **buf, size_t len, int flags);
int invoke_recvfrom(int sockfd, uint8 **buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
~WAMRInstance();
Expand Down
56 changes: 40 additions & 16 deletions src/checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ void serialize_to_file(WASMExecEnv *instance) {
int fd = 0;
ssize_t rc;
SocketAddrPool src_addr{};

for (auto [tmp_fd, sock_data] : wamr->socket_fd_map_) {
int idx =wamr->op_data.size;
int idx = wamr->op_data.size;
src_addr = sock_data.socketAddress;
auto tmp_ip4 =
fmt::format("{}.{}.{}.{}", src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], src_addr.ip4[3]);
Expand Down Expand Up @@ -107,22 +107,46 @@ void serialize_to_file(WASMExecEnv *instance) {
sock_data.socketSentToData.dest_addr.ip.ip6[3], sock_data.socketSentToData.dest_addr.ip.ip6[4],
sock_data.socketSentToData.dest_addr.ip.ip6[5], sock_data.socketSentToData.dest_addr.ip.ip6[6],
sock_data.socketSentToData.dest_addr.ip.ip6[7]);
if (sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip4 == "0.0.0.0" ||
!sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip6 == "0:0:0:0:0:0:0:0") {
wamr->op_data.addr[idx][1].is_4 = sock_data.socketRecvFromDatas[0].src_addr.ip.is_4;
std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketRecvFromDatas[0].src_addr.ip.ip4,
sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip4));
std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketRecvFromDatas[0].src_addr.ip.ip6,
sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip6));
wamr->op_data.addr[idx][1].port = sock_data.socketRecvFromDatas[0].src_addr.port;
if ((tmp_ip4 == "0.0.0.0" || tmp_ip6 == "0:0:0:0:0:0:0:0") && !wamr->op_data.is_tcp) {
if (sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip4 == "0.0.0.0" ||
!sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip6 == "0:0:0:0:0:0:0:0") {

wamr->op_data.addr[idx][1].is_4 = sock_data.socketRecvFromDatas[0].src_addr.ip.is_4;
std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketRecvFromDatas[0].src_addr.ip.ip4,
sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip4));
std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketRecvFromDatas[0].src_addr.ip.ip6,
sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip6));
wamr->op_data.addr[idx][1].port = sock_data.socketRecvFromDatas[0].src_addr.port;

} else {
wamr->op_data.addr[idx][1].is_4 = sock_data.socketSentToData.dest_addr.ip.is_4;
std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketSentToData.dest_addr.ip.ip4,
sizeof(sock_data.socketSentToData.dest_addr.ip.ip4));
std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketSentToData.dest_addr.ip.ip6,
sizeof(sock_data.socketSentToData.dest_addr.ip.ip6));
wamr->op_data.addr[idx][1].port = sock_data.socketSentToData.dest_addr.port;
}
} else {
wamr->op_data.addr[idx][1].is_4 = sock_data.socketSentToData.dest_addr.ip.is_4;
std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketSentToData.dest_addr.ip.ip4,
sizeof(sock_data.socketSentToData.dest_addr.ip.ip4));
std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketSentToData.dest_addr.ip.ip6,
sizeof(sock_data.socketSentToData.dest_addr.ip.ip6));
wamr->op_data.addr[idx][1].port = sock_data.socketSentToData.dest_addr.port;
sockaddr *ss = (sockaddr *)malloc(sizeof(sockaddr));
wamr->invoke_sock_getsockname(fd, &ss, sizeof(*ss));
if (ss->sa_family == AF_INET) {
auto *ipv4 = (struct sockaddr_in *)ss;
uint32_t ip = ntohl(ipv4->sin_addr.s_addr);
wamr->op_data.addr[idx][1].is_4 = true;
wamr->op_data.addr[idx][1].ip4[0] = (ip >> 24) & 0xFF;
wamr->op_data.addr[idx][1].ip4[1] = (ip >> 16) & 0xFF;
wamr->op_data.addr[idx][1].ip4[2] = (ip >> 8) & 0xFF;
wamr->op_data.addr[idx][1].ip4[3] = ip & 0xFF;
wamr->op_data.addr[idx][1].port = ntohs(ipv4->sin_port);
} else if (ss->sa_family == AF_INET6) {
auto *ipv6 = (struct sockaddr_in6 *)ss;
wamr->op_data.addr[idx][1].is_4 = false;
const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr;
for (int i = 0; i < 16; i += 2) {
wamr->op_data.addr[idx][1].ip6[i / 2] = (bytes[i] << 8) + bytes[i + 1];
}
wamr->op_data.addr[idx][1].port = ntohs(ipv6->sin6_port);
}
}
LOGV(INFO) << "dest_addr: "
<< fmt::format("{}.{}.{}.{}", wamr->op_data.addr[idx][1].ip4[0],
Expand Down
46 changes: 44 additions & 2 deletions src/wamr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
#include "wasm_runtime.h"
#include <regex>
#include <semaphore>
#include <unistd.h>
#if !defined(_WIN32)
#include "thread_manager.h"
#include <unistd.h>
#endif

WAMRInstance::ThreadArgs **argptr;
Expand Down Expand Up @@ -288,11 +288,53 @@ int WAMRInstance::invoke_sock_bind(uint32_t sockfd, struct sockaddr *sock, sockl
buffer_for_wasm = wasm_runtime_module_malloc(module_inst, 100, reinterpret_cast<void **>(&buffer_));
if (buffer_for_wasm != 0) {
uint32 argv[3];
memcpy(buffer_, sock, sizeof(sock)); // use native address for accessing in runtime
memcpy(buffer_, sock, sizeof(sockaddr)); // use native address for accessing in runtime
argv[0] = sockfd; // pass the buffer_ address for WASM space
argv[1] = buffer_for_wasm; // the size of buffer_
argv[2] = sock_size; // O_RW | O_CREATE
wasm_runtime_call_wasm(exec_env, func, 3, argv);
int res = argv[0];
wasm_runtime_module_free(module_inst, buffer_for_wasm);
return res;
}
return -1;
}
int WAMRInstance::invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t sock_size) {
auto name = "getsockname";
if (!(func = wasm_runtime_lookup_function(module_inst, name, nullptr))) {
LOGV(ERROR) << "The wasi " << name << " function is not found.";
auto target_module = get_module_instance()->e;
for (int i = 0; i < target_module->function_count; i++) {
auto cur_func = &target_module->functions[i];
if (cur_func->is_import_func) {
LOGV(DEBUG) << cur_func->u.func_import->field_name;
if (!strcmp(cur_func->u.func_import->field_name, name)) {
func = ((WASMFunctionInstanceCommon *)cur_func);
break;
}
} else {
LOGV(DEBUG) << cur_func->u.func->field_name;

if (!strcmp(cur_func->u.func->field_name, name)) {
func = ((WASMFunctionInstanceCommon *)cur_func);
break;
}
}
}
}

char *buffer_ = nullptr;
uint32_t buffer_for_wasm;

buffer_for_wasm = wasm_runtime_module_malloc(module_inst, 100, reinterpret_cast<void **>(&buffer_));
if (buffer_for_wasm != 0) {
uint32 argv[3];
memcpy(buffer_, *sock, sizeof(struct sockaddr_storage)); // use native address for accessing in runtime
argv[0] = sockfd; // pass the buffer_ address for WASM space
argv[1] = buffer_for_wasm; // the size of buffer_
argv[2] = sock_size; // O_RW | O_CREATE
wasm_runtime_call_wasm(exec_env, func, 3, argv);
memcpy(*sock, buffer_, sizeof(struct sockaddr));
int res = argv[0];
wasm_runtime_module_free(module_inst, buffer_for_wasm);
return res;
Expand Down
1 change: 0 additions & 1 deletion src/wamr_wasi_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) {
struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress);
socklen_t sockaddr4_size = sizeof(sockaddr4);
wamr->invoke_sock_bind(fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));

} else {
struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress);
socklen_t sockaddr6_size = sizeof(sockaddr6);
Expand Down
8 changes: 5 additions & 3 deletions test/tcp_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
void *
run(void *arg)
{
const char *message = "Say Hi from the Serverxxx\n";
char *message = (char *)malloc(1024);
int new_socket = *(int *)arg;
int i;

for (i = 0; i < 1024; i++) {
message[i] = 'a';
}
printf("[Server] Communicate with the new connection #%u @ %p ..\n",
new_socket, (void *)(uintptr_t)pthread_self());

for (i = 0; i < 5; i++) {
for (i = 0; i < 1000; i++) {
if (send(new_socket, message, strlen(message), 0) < 0) {
perror("Send failed");
break;
Expand Down

0 comments on commit b82863a

Please sign in to comment.