diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..d81646e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +all +rdmalib.a +src/*.o +.vscode/* \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100755 index 0000000..6adb087 --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +CXX = g++ +CXXFLAGS = -std=c++11 -g3 + +SRC = $(wildcard src/*.cpp) +OBJ = $(patsubst %.cpp,%.o,$(SRC)) + +CLEAN-O = rm -f src/*.o + +all: main.cpp rdmalib.a + $(CXX) $(CXXFLAGS) -o $@ $^ -libverbs -lmlx4 -pthread + $(CLEAN-O) + @echo "┌──────────────────────────────┐" + @echo "| Target Make Success |" + @echo "└──────────────────────────────┘" + +rdmalib.a: $(OBJ) + ar rc $@ $^ + +clean: + rm -f all rdmalib.a + $(CLEAN-O) + @echo "┌──────────────────────────────┐" + @echo "| Target Clean Success |" + @echo "└──────────────────────────────┘" + +aaa: + @echo $(SRC) + @echo $(OBJ) \ No newline at end of file diff --git a/README.md b/README.md new file mode 100755 index 0000000..44adafa --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# A Simple RDMA based communication framework diff --git a/log b/log new file mode 100755 index 0000000..68cd5de Binary files /dev/null and b/log differ diff --git a/main.cpp b/main.cpp new file mode 100755 index 0000000..add9525 --- /dev/null +++ b/main.cpp @@ -0,0 +1,50 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_MAIN +************************************************ */ + +#include "src/rdma_session.h" + +int main(int argc, char* argv[]) +{ + + if (argc == 1) + { + log_error("Missing Start Up Config"); + log_error("Usage:"); + log_error("xxx s\tto start the server"); + log_error("xxx c\tto start the client"); + } else + { + RDMA_Pre pre_tcp; + + // Connect + if (strcmp(argv[1], "s") == 0) + { + log_ok("Server Start"); + } else if (strcmp(argv[1], "c") == 0) + { + log_ok("Client Start"); + if (argc == 2) pre_tcp.config.server_name = LOCALHOST; + else pre_tcp.config.server_name = 
argv[2]; + } + pre_tcp.print_config(); + pre_tcp.tcp_sock_connect(); + + RDMA_Session session(&pre_tcp); + + if (strcmp(argv[1], "s") == 0) + { + + } else if (strcmp(argv[1], "c") == 0) + { + session.endpoint_->send_message(RDMA_MESSAGE_READ_REQUEST); + } + + //session.endpoint_->send_message(RDMA_MESSAGE_ACK); + //session.endpoint_->send_message(RDMA_MESSAGE_CLOSE); + } + + return 0; +} \ No newline at end of file diff --git a/src/rdma_buffer.cpp b/src/rdma_buffer.cpp new file mode 100755 index 0000000..63e076d --- /dev/null +++ b/src/rdma_buffer.cpp @@ -0,0 +1,31 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_BUFFER_CPP +************************************************ */ + +#include "rdma_buffer.h" + +RDMA_Buffer::RDMA_Buffer(ibv_pd* pd, int size) + : size_(size), status_(IDLE) +{ + buffer_ = malloc(size_); + mr_ = ibv_reg_mr(pd, buffer_, size_, + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ); + if (!mr_) + { + log_error("Failed to register memory region"); + } + + log_info("RDMA_Buffer Created"); +} + +RDMA_Buffer::~RDMA_Buffer() +{ + if (ibv_dereg_mr(mr_)) + { + log_error("ibv_dereg_mr failed"); + } + + log_info("RDMA_Buffer Deleted"); +} \ No newline at end of file diff --git a/src/rdma_buffer.h b/src/rdma_buffer.h new file mode 100755 index 0000000..4296e25 --- /dev/null +++ b/src/rdma_buffer.h @@ -0,0 +1,38 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_BUFFER_H +************************************************ */ + +#ifndef RDMA_BUFFER_H +#define RDMA_BUFFER_H + +#include "rdma_util.h" + +enum Buffer_status +{ + NONE, + IDLE, + LOCK +}; + +class RDMA_Buffer +{ +public: + friend class RDMA_Pre; + friend class RDMA_Endpoint; + friend class RDMA_Message; + friend class RDMA_Session; + + RDMA_Buffer(ibv_pd* pd, int size); + ~RDMA_Buffer(); + +private: + + void* buffer_ = NULL; + uint64_t size_; + ibv_mr* mr_; + Buffer_status status_; 
+}; + +#endif // !RDMA_BUFFER_H \ No newline at end of file diff --git a/src/rdma_endpoint.cpp b/src/rdma_endpoint.cpp new file mode 100755 index 0000000..94fd01e --- /dev/null +++ b/src/rdma_endpoint.cpp @@ -0,0 +1,215 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_ENDPOINT_CPP +************************************************ */ + +#include "rdma_endpoint.h" + +RDMA_Endpoint::RDMA_Endpoint(RDMA_Session* session, int ib_port) + : session_(session), ib_port_(ib_port), connected_(false) +{ + // create the Queue Pair + struct ibv_qp_init_attr qp_init_attr; + memset(&qp_init_attr, 0, sizeof(qp_init_attr)); + + qp_init_attr.qp_type = IBV_QPT_RC; + // qp_init_attr.sq_sig_all = 1; + qp_init_attr.send_cq = session_->cq_; + qp_init_attr.recv_cq = session_->cq_; + qp_init_attr.cap.max_send_wr = RDMA_Session::CQ_SIZE; + qp_init_attr.cap.max_recv_wr = RDMA_Session::CQ_SIZE; + qp_init_attr.cap.max_send_sge = 1; + qp_init_attr.cap.max_recv_sge = 1; + + qp_ = ibv_create_qp(session_->pd_, &qp_init_attr); + if (!qp_) + { + log_error("failed to create QP"); + // return 1; + } + + log_info(make_string("QP was created, QP number=0x%x\n", qp_->qp_num)); + + modify_qp_to_init(); + + // Local address + struct ibv_port_attr attr; + if (ibv_query_port(session_->context_, ib_port_, &attr)) + { + log_error(make_string("ibv_query_port on port %u failed", ib_port_)); + } + self_.lid = attr.lid; + self_.qpn = qp_->qp_num; + self_.psn = static_cast(New64()) & 0xffffff; + + // int SIZE = 32; + // // Create Message Buffer ...... + // incoming_mbuffer_ = new RDMA_Buffer(this, SIZE); + // outgoing_mbuffer_ = new RDMA_Buffer(this, SIZE); + // // ACK Buffer ...... 
+ // incoming_abuffer_ = new RDMA_Buffer(this, SIZE); + // outgoing_abuffer_ = new RDMA_Buffer(this, SIZE); + message_ = new RDMA_Message(session_->pd_, qp_); + + // post recv + for (int i=0;i<100;i++) + { + recv(); + } + + log_info("RDMA_Endpoint Created"); +} + +RDMA_Endpoint::~RDMA_Endpoint() +{ + delete message_; + + if (ibv_destroy_qp(qp_)) + { + log_error("Failed to destroy QP"); + } + + log_info("RDMA_Endpoint Deleted"); +} + +void RDMA_Endpoint::connect() +{ + if (connected_) + { + log_info("Channel Already Connected."); + } else + { + modify_qp_to_rtr(); + modify_qp_to_rts(); + + connected_ = true; + } +} + +void RDMA_Endpoint::recv() +{ + struct ibv_recv_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = (uint64_t) this; // Which RDMA_Endpoint get this message + struct ibv_recv_wr* bad_wr; + if (ibv_post_recv(qp_, &wr, &bad_wr)) + { + log_error("Failed to post recv"); + } +} + +void RDMA_Endpoint::send_message(Message_type msgt) +{ + message_->send(msgt); +} + +void RDMA_Endpoint::read_data(RDMA_Buffer* buffer, Remote_info msg) +{ + struct ibv_sge list; + list.addr = (uint64_t) buffer->buffer_; + list.length = msg.buffer_size_; + list.lkey = buffer->mr_->lkey; + + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = (uint64_t) buffer; + wr.sg_list = &list; + wr.num_sge = 1; + wr.opcode = IBV_WR_RDMA_READ; + wr.send_flags = IBV_SEND_SIGNALED; + wr.wr.rdma.remote_addr = (uint64_t) msg.remote_addr_; + wr.wr.rdma.rkey = msg.rkey_; + + struct ibv_send_wr *bad_wr; + if (ibv_post_send(qp_, &wr, &bad_wr)) + { + log_error("Failed to post send"); + } +} + +int RDMA_Endpoint::modify_qp_to_init() +{ + struct ibv_qp_attr attr; + int flags; + int rc; + + // do the following QP transition: RESET -> INIT + memset(&attr, 0, sizeof(attr)); + attr.qp_state = IBV_QPS_INIT; + attr.port_num = ib_port_; + attr.pkey_index = 0; + attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; + + flags = IBV_QP_STATE | 
IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; + + rc = ibv_modify_qp(qp_, &attr, flags); + if (rc) { + fprintf(stderr, "failed to modify QP state to INIT\n"); + return rc; + } + + return 0; +} + +int RDMA_Endpoint::modify_qp_to_rtr() +{ + struct ibv_qp_attr attr; + int flags; + int rc; + + // do the following QP transition: INIT -> RTR + memset(&attr, 0, sizeof(attr)); + + attr.qp_state = IBV_QPS_RTR; + attr.path_mtu = IBV_MTU_4096; + attr.dest_qp_num = remote_.qpn; + attr.rq_psn = remote_.psn; + attr.max_dest_rd_atomic = 1; + attr.min_rnr_timer = 0x12; + attr.ah_attr.is_global = 0; + attr.ah_attr.dlid = remote_.lid; + attr.ah_attr.sl = 0; + attr.ah_attr.src_path_bits = 0; + attr.ah_attr.port_num = ib_port_; + + flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER; + + rc = ibv_modify_qp(qp_, &attr, flags); + if (rc) { + fprintf(stderr, "failed to modify QP state to RTR\n"); + return rc; + } + + return 0; +} + +int RDMA_Endpoint::modify_qp_to_rts() +{ + struct ibv_qp_attr attr; + int flags; + int rc; + + // do the following QP transition: RTR -> RTS + memset(&attr, 0, sizeof(attr)); + + attr.qp_state = IBV_QPS_RTS; + attr.timeout = 0x14; + attr.retry_cnt = 7; + attr.rnr_retry = 7; // infinite + attr.sq_psn = self_.psn; + attr.max_rd_atomic = 1; + + flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC; + + rc = ibv_modify_qp(qp_, &attr, flags); + if (rc) { + fprintf(stderr, "failed to modify QP state to RTS\n"); + return rc; + } + + return 0; +} + diff --git a/src/rdma_endpoint.h b/src/rdma_endpoint.h new file mode 100755 index 0000000..0628176 --- /dev/null +++ b/src/rdma_endpoint.h @@ -0,0 +1,51 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_ENDPOINT_H +************************************************ */ + +#ifndef RDMA_ENDPOINT_H +#define RDMA_ENDPOINT_H + 
+#include + +#include "rdma_session.h" +#include "rdma_message.h" + +class RDMA_Buffer; +class RDMA_Session; + +class RDMA_Endpoint +{ +public: + friend class RDMA_Pre; + friend class RDMA_Buffer; + friend class RDMA_Message; + friend class RDMA_Session; + + RDMA_Endpoint(RDMA_Session* session, int ib_port); + ~RDMA_Endpoint(); + + void connect(); + void recv(); + void send_message(Message_type msgt); + void read_data(RDMA_Buffer* buffer, Remote_info msg); + +private: + int modify_qp_to_init(); + int modify_qp_to_rtr(); + int modify_qp_to_rts(); + + bool connected_; + int ib_port_; + + RDMA_Session* session_; + // Queue Pair + ibv_qp* qp_; + RDMA_Address self_, remote_; + // Message Buffer + RDMA_Message* message_; + //std::map buffer_table_; +}; + +#endif // !RDMA_ENDPOINT_H \ No newline at end of file diff --git a/src/rdma_message.cpp b/src/rdma_message.cpp new file mode 100755 index 0000000..ee1808d --- /dev/null +++ b/src/rdma_message.cpp @@ -0,0 +1,108 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_MESSAGE_CPP +************************************************ */ + +#include "rdma_message.h" + +std::string get_message(Message_type msgt) +{ + switch(msgt) + { + case RDMA_MESSAGE_ACK: + return "RDMA_MESSAGE_ACK"; + break; + case RDMA_MESSAGE_BUFFER_UNLOCK: + return "RDMA_MESSAGE_BUFFER_UNLOCK"; + break; + case RDMA_MESSAGE_READ_REQUEST: + return "RDMA_MESSAGE_READ_REQUEST"; + break; + case RDMA_MESSAGE_CLOSE: + return "RDMA_MESSAGE_CLOSE"; + break; + default: + return "UNKNOWN MESSAGE"; + } +} + +RDMA_Message::RDMA_Message(ibv_pd* pd, ibv_qp* qp) + : qp_(qp), pd_(pd) +{ + // Create Message Buffer ...... 
+ incoming_ = new RDMA_Buffer(pd_, kMessageTotalBytes); + outgoing_ = new RDMA_Buffer(pd_, kMessageTotalBytes); + + log_info("RDMA_Message Created"); +} + +RDMA_Message::~RDMA_Message() +{ + delete incoming_; + delete outgoing_; + + log_info("RDMA_Message Deleted"); +} + +void RDMA_Message::write(uint32_t imm_data, size_t size) +{ + struct ibv_sge list; + list.addr = (uint64_t) outgoing_->buffer_; // Message + list.length = size; // Message size + list.lkey = outgoing_->mr_->lkey; + + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = (uint64_t) this; // Which RDMA_Message send this message + wr.sg_list = &list; + wr.num_sge = 1; + wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr.send_flags = IBV_SEND_SIGNALED; + wr.imm_data = imm_data; + wr.wr.rdma.remote_addr = (uint64_t) remote_mr_.remote_addr; + wr.wr.rdma.rkey = remote_mr_.rkey; + + struct ibv_send_wr *bad_wr; + if (ibv_post_send(qp_, &wr, &bad_wr)) + { + log_error("Failed to send message: ibv_post_send error"); + } else + { + log_info(make_string("Message post: %s", get_message((Message_type)imm_data).data())); + } + +} + +void RDMA_Message::send(Message_type msgt, Remote_info* msg) +{ + switch(msgt) + { + case RDMA_MESSAGE_ACK: + case RDMA_MESSAGE_BUFFER_UNLOCK: + case RDMA_MESSAGE_CLOSE: + { + write(msgt, 0); + break; + } + case RDMA_MESSAGE_READ_REQUEST: + { + outgoing_->status_ = LOCK; + char* target = (char*)outgoing_->buffer_; + + char a[] = "helloworld"; + RDMA_Buffer* test_new = new RDMA_Buffer(pd_, sizeof(a)); + memcpy(test_new->buffer_, &a, sizeof(a)); + memcpy(&target[kBufferSizeStartIndex], &(test_new->size_), sizeof(test_new->size_)); + memcpy(&target[kRemoteAddrStartIndex], &(test_new->buffer_), sizeof(test_new->buffer_)); + memcpy(&target[kRkeyStartIndex], &(test_new->mr_->rkey), sizeof(test_new->mr_->rkey)); + + write(msgt, kMessageTotalBytes); + break; + } + default: + { + + } + } +} \ No newline at end of file diff --git a/src/rdma_message.h b/src/rdma_message.h new file mode 
// Control-message channel of one queue pair: owns an incoming and an outgoing
// registered buffer and posts RDMA writes (with immediate data) that carry a
// Message_type and an optional payload to the peer.
class RDMA_Message
{
public:
    friend class RDMA_Pre;
    friend class RDMA_Session;
    friend class RDMA_Endpoint;

    // Creates the incoming/outgoing message buffers in protection domain `pd`;
    // `qp` is the queue pair later used to post the writes.
    RDMA_Message(ibv_pd* pd, ibv_qp* qp);
    // Deletes the incoming and outgoing buffers.
    ~RDMA_Message();

    // Sends message `msgt`; for RDMA_MESSAGE_READ_REQUEST a buffer
    // description is placed in the outgoing buffer first.
    void send(Message_type msgt, Remote_info* msg = NULL);
    // Posts an RDMA write-with-immediate of the first `size` bytes of the
    // outgoing buffer to the peer's region (remote_mr_), with `imm_data` as
    // the immediate value.
    void write(uint32_t imm_data, size_t size);

private:
    // NOTE(review): no definition of ParseMessage is visible in this change;
    // confirm it is implemented before calling it.
    void ParseMessage(Message_type& rm, void* buffer);

    ibv_pd* pd_;   // protection domain used for buffers created by this object
    ibv_qp* qp_;   // queue pair the writes are posted on

    RDMA_Buffer* incoming_;   // owned; its address/rkey are exchanged with the peer during connection setup
    RDMA_Buffer* outgoing_;   // owned; source of every message write
    Remote_MR remote_mr_;     // peer's incoming region, filled in during connection setup
};
Buffer address + uint32_t mrkey; // Remote key + uint32_t qpn; // QP number + uint32_t lid; // LID of the IB port + uint32_t psn; +} __attribute__ ((packed)); + +RDMA_Pre::RDMA_Pre() +{ + log_info("RDMA_Pre Created"); +} + +RDMA_Pre::~RDMA_Pre() +{ + log_info("RDMA_Pre Deleted"); +} + +void RDMA_Pre::print_config() +{ + log_func(); + fprintf(stdout, " ------------------------------------------------\n"); + if (config.dev_name) fprintf(stdout, " Device name : \"%s\"\n", config.dev_name); + else fprintf(stdout, " Device name : No Default Device\n"); + fprintf(stdout, " IB port : %u\n", config.ib_port); + if (config.server_name) + fprintf(stdout, " IP : %s\n", config.server_name); + fprintf(stdout, " TCP port : %u\n", config.tcp_port); + fprintf(stdout, " ------------------------------------------------\n\n"); +} + +void RDMA_Pre::tcp_sock_connect() +{ + if (config.server_name) + { + remote_sock_ = sock_client_connect(config.server_name, config.tcp_port); + if (remote_sock_ < 0) { + log_error(make_string("failed to establish TCP connection to server %s, port %d", config.server_name, config.tcp_port)); + return; + } + } else + { + log_ok(make_string("waiting on port %d for TCP connection\n", config.tcp_port)); + remote_sock_ = sock_daemon_connect(config.tcp_port); + if (remote_sock_ < 0) { + log_error(make_string("failed to establish TCP connection with client on port %d", config.tcp_port)); + return; + } + } + log_ok("TCP connection was established"); +} + +int RDMA_Pre::tcp_endpoint_connect(RDMA_Endpoint* endpoint) +{ + struct cm_con_data_t local_con_data, tmp_con_data; + int rc; + + // exchange using TCP sockets info required to connect QPs + local_con_data.maddr = htonll((uintptr_t)endpoint->message_->incoming_->buffer_); + local_con_data.mrkey = htonl(endpoint->message_->incoming_->mr_->rkey); + local_con_data.qpn = htonl(endpoint->self_.qpn); + local_con_data.lid = htonl(endpoint->self_.lid); + local_con_data.psn = htonl(endpoint->self_.psn); + + 
log_info(make_string("Local QP number = 0x%x", endpoint->self_.qpn)); + log_info(make_string("Local LID = 0x%x", endpoint->self_.lid)); + log_info(make_string("Local PSN = 0x%x", endpoint->self_.psn)); + + if (sock_sync_data(remote_sock_, !config.server_name, sizeof(struct cm_con_data_t), &local_con_data, &tmp_con_data) < 0) + { + log_error("failed to exchange connection data between sides"); + return 1; + } + + endpoint->message_->remote_mr_.remote_addr = ntohll(tmp_con_data.maddr); + endpoint->message_->remote_mr_.rkey = ntohl(tmp_con_data.mrkey); + endpoint->remote_.qpn = ntohl(tmp_con_data.qpn); + endpoint->remote_.lid = ntohl(tmp_con_data.lid); + endpoint->remote_.psn = ntohl(tmp_con_data.psn); + + /* save the remote side attributes, we will need it for the post SR */ + + //fprintf(stdout, "Remote address = 0x%"PRIx64"\n", remote_con_data.addr); + //fprintf(stdout, "Remote rkey = 0x%x\n", remote_con_data.rkey); + log_info(make_string("Remote QP number = 0x%x", endpoint->remote_.qpn)); + log_info(make_string("Remote LID = 0x%x", endpoint->remote_.lid)); + log_info(make_string("Remote PSN = 0x%x", endpoint->remote_.psn)); +} + +/***************************************** +* Function: sock_daemon_connect +*****************************************/ +int RDMA_Pre::sock_daemon_connect(int port) +{ + struct addrinfo *res, *t; + struct addrinfo hints = { + .ai_flags = AI_PASSIVE, + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM + }; + char *service; + int n; + int sockfd = -1, connfd; + + if (asprintf(&service, "%d", port) < 0) { + log_error("asprintf failed"); + return -1; + } + + n = getaddrinfo(NULL, service, &hints, &res); + if (n < 0) { + log_error(make_string("%s for port %d", gai_strerror(n), port)); + free(service); + return -1; + } + + for (t = res; t; t = t->ai_next) { + sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol); + if (sockfd >= 0) { + n = 1; + + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n); + + if (!bind(sockfd, 
t->ai_addr, t->ai_addrlen)) + break; + close(sockfd); + sockfd = -1; + } + } + + freeaddrinfo(res); + free(service); + + if (sockfd < 0) { + log_error(make_string("couldn't listen to port %d", port)); + return -1; + } + + listen(sockfd, 1); + connfd = accept(sockfd, NULL, 0); + close(sockfd); + if (connfd < 0) { + log_error("accept() failed"); + return -1; + } + + return connfd; +} + +/***************************************** +* Function: sock_client_connect +*****************************************/ +int RDMA_Pre::sock_client_connect(const char *server_name, int port) +{ + struct addrinfo *res, *t; + struct addrinfo hints = { + .ai_flags = AI_PASSIVE, + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM + }; + + char *service; + int n; + int sockfd = -1; + + if (asprintf(&service, "%d", port) < 0) { + log_error("asprintf failed"); + return -1; + } + + n = getaddrinfo(server_name, service, &hints, &res); + if (n < 0) { + log_error(make_string("%s for %s:%d", gai_strerror(n), server_name, port)); + free(service); + return -1; + } + + for (t = res; t; t = t->ai_next) { + sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol); + if (sockfd >= 0) { + if (!connect(sockfd, t->ai_addr, t->ai_addrlen)) + break; + close(sockfd); + sockfd = -1; + } + } + freeaddrinfo(res); + free(service); + + if (sockfd < 0) { + log_error(make_string("couldn't connect to %s:%d", server_name, port)); + return -1; + } + + return sockfd; +} + +/***************************************** +* Function: sock_recv +*****************************************/ +int RDMA_Pre::sock_recv(int sock_fd, size_t size, void *buf) +{ + int rc; + +retry_after_signal: + rc = recv(sock_fd, buf, size, MSG_WAITALL); + if (rc != size) { + fprintf(stderr, "recv failed: %s, rc=%d\n", strerror(errno), rc); + + if ((errno == EINTR) && (rc != 0)) + goto retry_after_signal; /* Interrupted system call */ + if (rc) + return rc; + else + return -1; + } + + return 0; +} + +/***************************************** 
+* Function: sock_send +*****************************************/ +int RDMA_Pre::sock_send(int sock_fd, size_t size, const void *buf) +{ + int rc; + + +retry_after_signal: + rc = send(sock_fd, buf, size, 0); + + if (rc != size) { + fprintf(stderr, "send failed: %s, rc=%d\n", strerror(errno), rc); + + if ((errno == EINTR) && (rc != 0)) + goto retry_after_signal; /* Interrupted system call */ + if (rc) + return rc; + else + return -1; + } + + return 0; +} + +/***************************************** +* Function: sock_sync_data +*****************************************/ +int RDMA_Pre::sock_sync_data(int sock_fd, int is_daemon, size_t size, const void *out_buf, void *in_buf) +{ + int rc; + + + if (is_daemon) { + rc = sock_send(sock_fd, size, out_buf); + if (rc) + return rc; + + rc = sock_recv(sock_fd, size, in_buf); + if (rc) + return rc; + } else { + rc = sock_recv(sock_fd, size, in_buf); + if (rc) + return rc; + + rc = sock_send(sock_fd, size, out_buf); + if (rc) + return rc; + } + + return 0; +} + +/***************************************** +* Function: sock_sync_ready +*****************************************/ +int RDMA_Pre::sock_sync_ready(int sock_fd, int is_daemon) +{ + char cm_buf = 'a'; + return sock_sync_data(sock_fd, is_daemon, sizeof(cm_buf), &cm_buf, &cm_buf); +} \ No newline at end of file diff --git a/src/rdma_pre.h b/src/rdma_pre.h new file mode 100755 index 0000000..7116da9 --- /dev/null +++ b/src/rdma_pre.h @@ -0,0 +1,47 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_PRE_H +************************************************ */ + +#ifndef RDMA_PRE_H +#define RDMA_PRE_H + +#include "rdma_util.h" +#include "rdma_endpoint.h" + +static char LOCALHOST[] = "localhost"; + +class RDMA_Endpoint; + +class RDMA_Pre +{ +public: + RDMA_Pre(); + ~RDMA_Pre(); + + void print_config(); + void tcp_sock_connect(); + int tcp_endpoint_connect(RDMA_Endpoint* endpoint); + + config_t config = { + NULL, // dev_name + NULL, // 
server_name + 23333, // tcp_port + 1 // ib_port + }; + +private: + // tcp_socket + int sock_daemon_connect(int port); + int sock_client_connect(const char *server_name, int port); + int sock_recv(int sock_fd, size_t size, void *buf); + int sock_send(int sock_fd, size_t size, const void *buf); + int sock_sync_data(int sock_fd, int is_daemon, size_t size, const void *out_buf, void *in_buf); + int sock_sync_ready(int sock_fd, int is_daemon); + + int remote_sock_; + +}; + +#endif // !RDMA_PRE_H \ No newline at end of file diff --git a/src/rdma_session.cpp b/src/rdma_session.cpp new file mode 100755 index 0000000..572ac02 --- /dev/null +++ b/src/rdma_session.cpp @@ -0,0 +1,261 @@ +/* *********************************************** +MYID : Chen Fan +LANG : G++ +PROG : RDMA_SESSION_CPP +************************************************ */ + +#include "rdma_session.h" +#include +#include + +#define MSG_SIZE 20 + +RDMA_Session::RDMA_Session(RDMA_Pre* pre) + : config(pre->config) +{ + // init all of the resources, so cleanup will be easy + if (open_ib_device()) + { + return; + //goto cleanup; + } + + /* allocate Protection Domain */ + pd_ = ibv_alloc_pd(context_); + if (!pd_) + { + log_error("Failed to allocate protection domain"); + //return 1; + } + + /* Create completion endpoint */ + event_channel_ = ibv_create_comp_channel(context_); + if (!event_channel_) + { + log_error("Failed to create completion endpoint"); + //return 1; + } + + cq_ = ibv_create_cq(context_, CQ_SIZE, NULL, event_channel_, 0); + if (!cq_) + { + log_error(make_string("failed to create CQ with %u entries", CQ_SIZE)); + //return 1; + } + + if (ibv_req_notify_cq(cq_, 0)) + { + log_error("Countn't request CQ notification"); + //goto cleanup; + } + + // ------- new thread + //thread_ = new std::thread(std::bind(&RDMA_Adapter::Adapter_processCQ, this)); + thread_.reset(new std::thread(std::bind(&RDMA_Session::session_processCQ, this))); + + endpoint_ = new RDMA_Endpoint(this, config.ib_port); + + 
pre->tcp_endpoint_connect(endpoint_); + + endpoint_->connect(); + + log_info("RDMA_Session Created"); +} + +RDMA_Session::~RDMA_Session() +{ + stop_process(); + + delete endpoint_; + + thread_.reset(); + + if (ibv_destroy_cq(cq_)) + { + log_error("Failed to destroy CQ"); + } + + if (ibv_destroy_comp_channel(event_channel_)) + { + log_error("Failed to destroy completion channel"); + } + + if (ibv_dealloc_pd(pd_)) + { + log_error("Failed to deallocate PD"); + } + + log_info("RDMA_Session Deleted"); +} + +int RDMA_Session::open_ib_device() +{ + int i, num_devices; + struct ibv_device *ib_dev = NULL; + struct ibv_device **dev_list; + + log_ok("Starting Resources Initialization"); + log_ok("Searching for IB devices in host"); + + // get device names in the system + dev_list = ibv_get_device_list(&num_devices); + if (!dev_list) + { + log_error("failed to get IB devices list"); + return 1; + } + + // if there isn't any IB device in host + if (!num_devices) + { + log_error(make_string("found %d device(s)", num_devices)); + return 1; + } + log_info(make_string("found %d device(s)", num_devices)); + + if (!config.dev_name) + { + ib_dev = *dev_list; + if (!ib_dev) + { + log_error("No IB devices found"); + return 1; + } + } else + { + // search for the specific device we want to work with + for (i = 0; i < num_devices; i ++) + { + if (!strcmp(ibv_get_device_name(dev_list[i]), config.dev_name)) + { + ib_dev = dev_list[i]; + break; + } + } + // if the device wasn't found in host + if (!ib_dev) + { + log_error(make_string("IB device %s wasn't found", config.dev_name)); + return 1; + } + } + + // get device handle + context_ = ibv_open_device(ib_dev); + if (!context_) + { + log_error(make_string("failed to open device %s", config.dev_name)); + return 1; + } + + return 0; +} + +void RDMA_Session::stop_process() +{ + thread_->join(); +} + +void RDMA_Session::session_processCQ() +{ + bool doit = true; + while (doit) + { + ibv_cq* cq; + void* cq_context; + + if 
(ibv_get_cq_event(event_channel_, &cq, &cq_context)) + { + log_error("Failed to get cq_event"); + } + + if (cq != cq_) + { + log_error(make_string("CQ event for unknown CQ %p", cq)); + } + + ibv_ack_cq_events(cq, 1); + + if (ibv_req_notify_cq(cq_, 0)) + { + log_error("Countn't request CQ notification"); + } + + int ne = ibv_poll_cq(cq_, CQ_SIZE, static_cast(wc_)); + // VAL(ne); + if (ne > 0) + for (int i=0;i(wc_[i].wr_id); + // Consumed a ibv_post_recv, so add one + rc->recv(); + + Message_type msgt = (Message_type)wc_[i].imm_data; + log_info(make_string("Message Recv: %s", get_message(msgt).data())); + switch(msgt) + { + case RDMA_MESSAGE_ACK: + rc->message_->outgoing_->status_ = IDLE; + break; + case RDMA_MESSAGE_BUFFER_UNLOCK: + // + break; + case RDMA_MESSAGE_READ_REQUEST: + { + char* temp = (char*)rc->message_->incoming_->buffer_; + Remote_info msg; + memcpy(&(msg.buffer_size_), &temp[kBufferSizeStartIndex], 8); + memcpy(&(msg.remote_addr_), &temp[kRemoteAddrStartIndex], 8); + memcpy(&(msg.rkey_), &temp[kRkeyStartIndex], 4); + rc->send_message(RDMA_MESSAGE_ACK); + + RDMA_Buffer* test_new = new RDMA_Buffer(pd_, msg.buffer_size_); + rc->read_data(test_new, msg); + + // RDMA_READ + break; + } + case RDMA_MESSAGE_CLOSE: + doit = false; + break; + default: + ; + } + break; + } + case IBV_WC_RDMA_WRITE: // Send RDMA Message or Data + { + // Which RDMA_Message send this message/data + RDMA_Message* rm = reinterpret_cast(wc_[i].wr_id); + log_info(make_string("Message Send Success")); + + break; + } + case IBV_WC_RDMA_READ: + { + RDMA_Buffer* rb = reinterpret_cast(wc_[i].wr_id); + char* temp = (char*)rb->buffer_; + log_info(make_string("RDMA Read: %s", temp)); + + + + break; + } + default: + { + log_error("Unsupported opcode"); + } + } + } + } +} \ No newline at end of file diff --git a/src/rdma_session.h b/src/rdma_session.h new file mode 100755 index 0000000..9e4377f --- /dev/null +++ b/src/rdma_session.h @@ -0,0 +1,53 @@ +/* 
/**
 * printf-style formatting into a std::string.
 *
 * The required length is measured with a first vsnprintf() pass and the text
 * is produced by a second pass, so only standard library calls are used and
 * no va_list is left open when an exception is thrown.
 *
 * @param fmt printf format string, followed by its arguments
 * @return the formatted text
 * @throws std::runtime_error if formatting fails
 */
std::string make_string(const char *fmt, ...)
{
    va_list ap;

    // Pass 1: measure.
    va_start(ap, fmt);
    const int needed = vsnprintf(NULL, 0, fmt, ap);
    va_end(ap);
    if (needed < 0)
        throw std::runtime_error("memory allocation failed\n");

    // One extra character for the terminator vsnprintf always writes.
    std::string str(static_cast<size_t>(needed) + 1, '\0');

    // Pass 2: format into the string's own storage.
    va_start(ap, fmt);
    const int written = vsnprintf(&str[0], str.size(), fmt, ap);
    va_end(ap);
    if (written < 0)
        throw std::runtime_error("memory allocation failed\n");

    str.resize(static_cast<size_t>(needed));
    return str;
}
// 64-bit counterparts of htonl()/ntohl(): convert between host byte order and
// network (big-endian) byte order. On a little-endian host both directions
// are a byte swap; on a big-endian host they are the identity. Any other
// value of __BYTE_ORDER stops the build.
// NOTE(review): bswap_64 and the __BYTE_ORDER macros are presumably supplied
// by <byteswap.h>/<endian.h>; the include lines are not legible here, so
// confirm.
#if __BYTE_ORDER == __LITTLE_ENDIAN
static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
#elif __BYTE_ORDER == __BIG_ENDIAN
static inline uint64_t htonll(uint64_t x) { return x; }
static inline uint64_t ntohll(uint64_t x) { return x; }
#else
#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
#endif
/* structure of test parameters */
// Start-up configuration shared by RDMA_Pre (TCP bootstrap) and RDMA_Session
// (device selection).
struct config_t
{
    char *dev_name;    /* IB device name; NULL selects the first device in the list */
    char *server_name; /* daemon host name; NULL means this side waits for a TCP connection as the daemon */
    uint32_t tcp_port; /* daemon TCP port */
    int ib_port;       /* local IB port to work with */
};