Skip to content

Commit

Permalink
tcp done
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Jan 23, 2024
1 parent d048687 commit 268eb45
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 137 deletions.
123 changes: 89 additions & 34 deletions gateway/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ int client_fd;
int fd;
int new_fd;
// assuming they are continuous
std::vector<std::tuple<int, int, int>> tcp_pair; // server, server accept fd, client
std::vector<std::jthread> recv_thread;
std::vector<std::jthread> send_thread;
std::map<std::string, std::tuple<int, int, int>> slept_tcp_pair;
struct connection_pair {
std::jthread *send;
std::jthread *recv;
int server_fd;
int new_server;
int new_client;
bool is_sleep;
};
std::map<std::string, struct connection_pair> tcp_pair;
std::vector<std::tuple<std::string, std::string, std::string>> forward_pair;
std::vector<std::jthread> backend_thread;
bool is_forward = false;
Expand Down Expand Up @@ -417,17 +422,20 @@ int main() {
forward_pair.emplace_back(server_ip, client_ip, "");
// send the fin to server
op_data->op = MVVM_SOCK_FIN;
sleep(2);
LOGV(INFO) << "send fin";

if (!op_data->is_tcp) {
send_fin(client_ip, client_port, server_ip, server_port, (char *)op_data);
} else {
// TODO remove the tcp connection
send_thread.pop_back();
recv_thread.pop_back();
auto p = tcp_pair.back();
slept_tcp_pair[fmt::format("{}:{}", server_ip, server_port)] = p;

auto to_stop = tcp_pair[fmt::format("{}:{}", server_ip, server_port)];
LOGV(ERROR) << server_ip << ":" << server_port << " " << to_stop.new_client << " "
<< to_stop.new_server;
sleep(2);
send(to_stop.new_server, (char *)op_data, sizeof(*op_data), 0);
to_stop.is_sleep = true;
delete to_stop.send;
delete to_stop.recv;
}
}
break;
Expand All @@ -442,26 +450,50 @@ int main() {
<< std::get<1>(forward_pair[forward_pair.size() - 1]);

if (op_data->is_tcp) {
auto to_start = tcp_pair[fmt::format("{}:{}", server_ip, server_port)];
LOGV(ERROR) << server_ip << ":" << server_port;

if (to_start.new_server == 0) {
LOGV(ERROR) << "to_start is empty";
exit(-1);
} else {
to_start.is_sleep = false;
}

socklen_t size = sizeof(address);
auto new_client = accept(new_fd, (struct sockaddr *)&address, &size); // if is cl
bool closed = false;
recv_thread.emplace_back([&]() {
int new_server = std::get<1>(tcp_pair[tcp_pair.size() - 1]);
while (!closed) {
auto new_server = accept(to_start.server_fd, (struct sockaddr *)&address, &size);
int new_client = to_start.new_client;

to_start.send = new std::jthread([&](std::stop_token stopToken) {
while (!stopToken.stop_requested()) {
if ((rc = recv(new_server, buffer1, sizeof(buffer1), 0)) > 0) {
LOGV(ERROR) << "send" << sizeof(buffer1);
send(new_client, buffer1, sizeof(buffer1), 0);
}
}
});
recv_thread.emplace_back([&]() {
int new_server = std::get<1>(tcp_pair[tcp_pair.size() - 1]);
while (!closed) {
if ((rc = recv(new_client, buffer, sizeof(buffer), 0)) > 0) {
LOGV(ERROR) << "recv" << buffer;
send(new_server, buffer, sizeof(buffer), 0);
to_start.recv = new std::jthread([&](std::stop_token stopToken) {
// Set the socket to non-blocking
fcntl(new_client, F_SETFL, O_NONBLOCK);

while (!stopToken.stop_requested()) {
rc = recv(new_client, buffer, sizeof(buffer), 0);
if (rc > 0) {
LOGV(ERROR) << "recv" << sizeof(buffer);
while ((rc = send(new_server, buffer, sizeof(buffer), 0) <= 0)) {
LOGV(ERROR) << "error sending" << rc << errno;
sleep(1);
}
} else if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
// Handle error
break;
}
// Add a small sleep to prevent busy-waiting
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
to_start.new_server = new_server;
tcp_pair[fmt::format("{}:{}", server_ip, server_port)] = to_start;
} else
is_forward = true;
break;
Expand Down Expand Up @@ -501,7 +533,7 @@ int main() {
}

address.sin_family = AF_INET;
address.sin_port = htons(server_port);
address.sin_port = htons(client_port);
// Convert IPv4 and IPv6 addresses from text to binary form
if (inet_pton(AF_INET, MVVM_SOCK_ADDR, &address.sin_addr) <= 0) {
LOGV(ERROR) << "Invalid address/ Address not supported";
Expand All @@ -519,13 +551,16 @@ int main() {
LOGV(ERROR) << "listen";
exit(EXIT_FAILURE);
}
// sleep(1);
int new_server =
accept(new_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen); // will be instantly consumed

accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen); // will be instantly consumed
// Create a socket connect remote
int new_client = socket(AF_INET, SOCK_STREAM, 0);
// Convert IPv4 and IPv6 addresses from text to binary form
if (inet_pton(AF_INET, server_ip.c_str(), &address.sin_addr) <= 0) {

address.sin_family = AF_INET;
address.sin_port = htons(client_port);
if (inet_pton(AF_INET, client_ip.c_str(), &address.sin_addr) <= 0) {
LOGV(ERROR) << "Invalid address/ Address not supported";
exit(EXIT_FAILURE);
}
Expand All @@ -536,22 +571,42 @@ int main() {
exit(EXIT_FAILURE);
}
LOGV(ERROR) << "new_client " << new_client;
bool closed = false;
send_thread.emplace_back([&]() {
while (!closed) {
struct connection_pair cp = {
.server_fd = server_fd, .new_server = new_server, .new_client = new_client, .is_sleep = false};
cp.send = new std::jthread([&](std::stop_token stopToken) {
while (!stopToken.stop_requested()) {
if ((rc = recv(new_server, buffer1, sizeof(buffer1), 0)) > 0) {
send(new_client, buffer1, sizeof(buffer1), 0);
LOGV(ERROR) << "send" << sizeof(buffer1);
while ((rc = send(new_client, buffer1, sizeof(buffer1), 0) <= 0)) {
LOGV(ERROR) << "error sending" << rc << errno;
sleep(1);
}
}
}
});
recv_thread.emplace_back([&]() {
while (!closed) {
if ((rc = recv(new_client, buffer, sizeof(buffer), 0)) > 0) {
send(new_server, buffer, sizeof(buffer), 0);
cp.recv = new std::jthread([&](std::stop_token stopToken) {
// Set the socket to non-blocking
fcntl(new_client, F_SETFL, O_NONBLOCK);

while (!stopToken.stop_requested()) {
rc = recv(new_client, buffer, sizeof(buffer), 0);
if (rc > 0) {
LOGV(ERROR) << "recv" << sizeof(buffer);
while ((rc = send(new_server, buffer, sizeof(buffer), 0) <= 0)) {
LOGV(ERROR) << "error sending" << rc << errno;
sleep(1);
}
} else if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
// Handle error
break;
}
// Add a small sleep to prevent busy-waiting
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
tcp_pair.emplace_back(server_fd, new_server, new_client);
// client send this for the server so reverse order
LOGV(ERROR) << client_ip << ":" << client_port;
tcp_pair[fmt::format("{}:{}", client_ip, client_port)] = cp;
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/wasm-micro-runtime
44 changes: 4 additions & 40 deletions src/restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,50 +63,13 @@ int main(int argc, char **argv) {
struct sockaddr_in addr {};
int fd = 0;

SocketAddrPool src_addr = {.ip4 = {0}, .ip6 = {0}, .is_4 = true, .port = 0}; // get current ip
struct ifaddrs *ifaddr, *ifa;

if (getifaddrs(&ifaddr) == -1) {
LOGV(ERROR) << "getifaddrs";
exit(EXIT_FAILURE);
}

for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
if (ifa->ifa_addr == nullptr)
continue;

if (ifa->ifa_addr->sa_family == AF_INET && src_addr.is_4) {
// IPv4
auto *ipv4 = (struct sockaddr_in *)ifa->ifa_addr;
uint32_t ip = ntohl(ipv4->sin_addr.s_addr);
if (is_ip_in_cidr(MVVM_SOCK_ADDR, MVVM_SOCK_MASK, ip)) {
// Extract IPv4 address
src_addr.ip4[0] = (ip >> 24) & 0xFF;
src_addr.ip4[1] = (ip >> 16) & 0xFF;
src_addr.ip4[2] = (ip >> 8) & 0xFF;
src_addr.ip4[3] = ip & 0xFF;
}

} else if (ifa->ifa_addr->sa_family == AF_INET6 && !src_addr.is_4) {
// IPv6
auto *ipv6 = (struct sockaddr_in6 *)ifa->ifa_addr;
src_addr.is_4 = false;
// Extract IPv6 address
const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr;
if (is_ipv6_in_cidr(MVVM_SOCK_ADDR6, MVVM_SOCK_MASK6, &ipv6->sin6_addr)) {
for (int i = 0; i < 16; i += 2) {
src_addr.ip6[i / 2] = (bytes[i] << 8) + bytes[i + 1];
}
}
}
}

freeifaddrs(ifaddr);
SocketAddrPool src_addr = wamr->local_addr;
LOGV(INFO) << "new ip is "
<< fmt::format("{}.{}.{}.{}", src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], src_addr.ip4[3]);
wamr->op_data.op = MVVM_SOCK_RESUME;
wamr->op_data.addr[0][0] = src_addr;
for (auto &[fd,socketMetaData] : a[0]->module_inst.wasi_ctx.socket_fd_map) {
// got from wamr
for (auto &[fd, socketMetaData] : a[0]->module_inst.wasi_ctx.socket_fd_map) {
wamr->op_data.is_tcp |= socketMetaData.type;
}

Expand Down Expand Up @@ -136,6 +99,7 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
close(fd);
LOGV(ERROR) << "sent the resume signal";
}
#endif
// do iptables here
Expand Down
2 changes: 2 additions & 0 deletions src/wamr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ WAMRInstance::WAMRInstance(const char *wasm_path, bool is_jit) : is_jit(is_jit)
LOGV(ERROR) << fmt::format("Load wasm module failed. error: {}", error_buf);
throw;
}
#if !defined(_WIN32)
struct ifaddrs *ifaddr, *ifa;
int family, s;
char host[NI_MAXHOST];
Expand Down Expand Up @@ -118,6 +119,7 @@ WAMRInstance::WAMRInstance(const char *wasm_path, bool is_jit) : is_jit(is_jit)
local_addr.is_4 = true;

freeifaddrs(ifaddr);
#endif
}

bool WAMRInstance::load_wasm_binary(const char *wasm_path, char **buffer_ptr) {
Expand Down
64 changes: 33 additions & 31 deletions src/wamr_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,42 +225,44 @@ void update_socket_fd_address(int fd, SocketAddrPool *address) {

void init_gateway(SocketAddrPool *address) {
// tell gateway to keep alive the server
struct sockaddr_in addr {};
int fd = 0;
ssize_t rc;
wamr->op_data.op = MVVM_SOCK_INIT;
wamr->op_data.addr[0][0] = wamr->local_addr;
std::memcpy(&wamr->op_data.addr[0][1], address, sizeof(SocketAddrPool));
if (wamr->op_data.op != MVVM_SOCK_RESUME) {
struct sockaddr_in addr {};
int fd = 0;
ssize_t rc;
wamr->op_data.op = MVVM_SOCK_INIT;
wamr->op_data.addr[0][0] = wamr->local_addr;
std::memcpy(&wamr->op_data.addr[0][1], address, sizeof(SocketAddrPool));

// Create a socket
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
LOGV(ERROR) << "socket error";
throw std::runtime_error("socket error");
}
// Create a socket
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
LOGV(ERROR) << "socket error";
throw std::runtime_error("socket error");
}

addr.sin_family = AF_INET;
addr.sin_port = htons(MVVM_SOCK_PORT);
addr.sin_family = AF_INET;
addr.sin_port = htons(MVVM_SOCK_PORT);

// Convert IPv4 and IPv6 addresses from text to binary form
if (inet_pton(AF_INET, MVVM_SOCK_ADDR, &addr.sin_addr) <= 0) {
LOGV(ERROR) << "AF_INET not supported";
exit(EXIT_FAILURE);
}
// Connect to the server
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
LOGV(ERROR) << "Connection Failed " << errno;
exit(EXIT_FAILURE);
}
// Convert IPv4 and IPv6 addresses from text to binary form
if (inet_pton(AF_INET, MVVM_SOCK_ADDR, &addr.sin_addr) <= 0) {
LOGV(ERROR) << "AF_INET not supported";
exit(EXIT_FAILURE);
}
// Connect to the server
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
LOGV(ERROR) << "Connection Failed " << errno;
exit(EXIT_FAILURE);
}

LOGV(INFO) << "Connected successfully";
rc = send(fd, &wamr->op_data, sizeof(struct mvvm_op_data), 0);
if (rc == -1) {
LOGV(ERROR) << "send error";
exit(EXIT_FAILURE);
}
LOGV(INFO) << "Connected successfully";
rc = send(fd, &wamr->op_data, sizeof(struct mvvm_op_data), 0);
if (rc == -1) {
LOGV(ERROR) << "send error";
exit(EXIT_FAILURE);
}

// Clean up
close(fd);
// Clean up
close(fd);
}
}

void insert_lock(char const *, int) {}
Expand Down
Loading

0 comments on commit 268eb45

Please sign in to comment.