From b7eda2bd2a1c6f229620dc7cc404355629daec1b Mon Sep 17 00:00:00 2001 From: jcf94 Date: Sun, 31 Mar 2019 13:01:09 +0800 Subject: [PATCH] Memorypool add success --- Makefile | 2 +- benchmark.cpp | 35 +++++++++++++++++++++-------------- main.cpp | 4 ++-- src/rdma_buffer.cpp | 13 +++++++++++-- src/rdma_buffer.h | 2 ++ src/rdma_channel.cpp | 1 + src/rdma_endpoint.cpp | 22 ++++++++++++++++++---- src/rdma_endpoint.h | 15 +++++++++------ src/rdma_session.cpp | 2 +- utils/MemoryPool | 2 +- 10 files changed, 67 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index 2a6b597..21dd092 100755 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ CC = g++ CFLAGS += -std=c++14 -Iutils/ThreadPool/src -Iutils/MemoryPool/src LD = g++ -LDFLAGS += -std=c++14 +LDFLAGS += -std=c++14 -Iutils/ThreadPool/src -Iutils/MemoryPool/src #NAME = $(wildcard *.cpp) NAME = benchmark.cpp diff --git a/benchmark.cpp b/benchmark.cpp index 56de041..4d2ae2f 100644 --- a/benchmark.cpp +++ b/benchmark.cpp @@ -17,12 +17,11 @@ PROG : benchmark #include "src/rdma_endpoint.h" #include "src/tcp_sock_pre.h" #include "src/rdma_message.h" +#include "src/rdma_buffer.h" -using namespace std; +#include "src/rdma_channel.h" -#define KB (1024) -#define MB (1024 * KB) -#define GB (1024 * MB) +using namespace std; int main(int argc, char* argv[]) { @@ -51,23 +50,29 @@ int main(int argc, char* argv[]) RDMA_Endpoint* endpoint = session.ptp_connect(&pre_tcp); // Prepare data - int total_data = 512 * MB; - int block_data = 1024 * KB; + int total_data = 256 * MB; + int block_data = 4 * KB; if (strcmp(argv[1], "s") == 0) { } else if (strcmp(argv[1], "c") == 0) { - char* test_data = (char*)malloc(total_data); + //char* test_data = (char*)malloc(total_data); + RDMA_Buffer* test_data = endpoint->bufferalloc(total_data); + printf("%p\n", test_data->buffer()); // Warm Up - endpoint->set_sync_barrier(total_data); - for (int i=0;isend_data((void*)(test_data+i), block_data); - } - endpoint->wait_for_sync(); + // endpoint->set_sync_barrier(total_data); + // for (int i=0;ibuffer()); + // endpoint->send_data(temp); + // //endpoint->send_rawdata((void*)(test_data+i), block_data); + // } + // endpoint->wait_for_sync(); + // log_ok(make_string("%d", endpoint->channel()->get_table_size())); log_ok("Test Start"); @@ -76,7 +81,9 @@ int main(int argc, char* argv[]) endpoint->set_sync_barrier(total_data); for (int i=0;isend_data((void*)(test_data+i), block_data); + RDMA_Buffer* temp = new RDMA_Buffer(test_data, i, block_data); + endpoint->send_data(temp); + //endpoint->send_rawdata((void*)(test_data+i), block_data); } endpoint->wait_for_sync(); diff --git a/main.cpp b/main.cpp index 28f112c..2413240 100755 --- a/main.cpp +++ b/main.cpp @@ -53,7 +53,7 @@ int main(int argc, char* argv[]) log_ok("Ready to Transport\n"); char a[] = "Hello World from Client !!!!!"; - endpoint->send_data((void*)a, sizeof(a)); + endpoint->send_rawdata((void*)a, sizeof(a)); char b[] = "Test Test Test Test Test Test Test Test Test"; @@ -61,7 +61,7 @@ int main(int argc, char* argv[]) { int com; std::cin >> com; - if (com == 0) endpoint->send_data((void*)b, sizeof(b)); + if (com == 0) endpoint->send_rawdata((void*)b, sizeof(b)); else { endpoint->close(); diff --git a/src/rdma_buffer.cpp b/src/rdma_buffer.cpp index 165118e..50511aa 100755 --- a/src/rdma_buffer.cpp +++ b/src/rdma_buffer.cpp @@ -19,6 +19,7 @@ RDMA_Buffer::RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr { buffer_ = addr; buffer_owned_ = false; + buffer_registerd_ = true; mr_ = ibv_reg_mr(pd, buffer_, size_, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ); if (!mr_) @@ -29,7 +30,7 @@ RDMA_Buffer::RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr { //log_info("ddddddddddddddddddddd"); //buffer_ = malloc(size); - memblock_ = (RDMA_MemBlock*)channel_->endpoint()->session()->mempool()->blockalloc(size); + memblock_ = (RDMA_MemBlock*)channel_->endpoint()->mempool()->blockalloc(size); buffer_ = memblock_->dataaddr(); mr_ = memblock_->mr(); buffer_owned_ = true; @@ -39,6 +40,14 @@ RDMA_Buffer::RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr log_info("RDMA_Buffer Created"); } +RDMA_Buffer::RDMA_Buffer(RDMA_Buffer* source, int offset, int size) + : buffer_owned_(false), channel_(source->channel_), + size_(size), mr_(source->mr_), memblock_(source->memblock_) +{ + buffer_ = (void*)(source->buffer_+offset) + log_info("RDMA_Buffer Copyed"); +} + RDMA_Buffer::~RDMA_Buffer() { if (buffer_owned_) @@ -46,7 +55,7 @@ RDMA_Buffer::~RDMA_Buffer() //free(buffer_); memblock_->free(); //channel_->endpoint()->session()->mempool()->travel(); - } else + } else if (buffer_registerd_) { if (ibv_dereg_mr(mr_)) { diff --git a/src/rdma_buffer.h b/src/rdma_buffer.h index ff84b9d..de1db84 100755 --- a/src/rdma_buffer.h +++ b/src/rdma_buffer.h @@ -21,6 +21,7 @@ class RDMA_Buffer { public: RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr = NULL); + RDMA_Buffer(RDMA_Buffer* source, int offset, int size); ~RDMA_Buffer(); // ----- Private To Public ----- @@ -31,6 +32,7 @@ class RDMA_Buffer private: bool buffer_owned_; + bool buffer_registerd_ = false; RDMA_Channel* channel_ = NULL; void* buffer_ = NULL; diff --git a/src/rdma_channel.cpp b/src/rdma_channel.cpp index 63ef752..bb0db45 100644 --- a/src/rdma_channel.cpp +++ b/src/rdma_channel.cpp @@ -81,6 +81,7 @@ void RDMA_Channel::request_read(RDMA_Buffer* buffer) { task_with_lock([this, buffer] { + //log_ok(make_string("%p", buffer->buffer())); insert_to_table((uint64_t)buffer->buffer(), (uint64_t)buffer); RDMA_Message::fill_message_content((char*)outgoing_->buffer(), buffer->buffer(), buffer->size(), buffer->mr()); diff --git a/src/rdma_endpoint.cpp b/src/rdma_endpoint.cpp index a48f207..e6566d0 100755 --- a/src/rdma_endpoint.cpp +++ b/src/rdma_endpoint.cpp @@ -11,8 +11,8 @@ PROG : RDMA_ENDPOINT_CPP #include "rdma_buffer.h" #include "rdma_channel.h" -RDMA_Endpoint::RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_Session* session) - : pd_(pd), ib_port_(ib_port), connected_(false), session_(session) +RDMA_Endpoint::RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_MemoryPool* mempool) + : pd_(pd), ib_port_(ib_port), connected_(false), mempool_(mempool) { // create the Queue Pair struct ibv_qp_init_attr qp_init_attr; @@ -134,10 +134,24 @@ void RDMA_Endpoint::close() // ---------------------------------------------- -void RDMA_Endpoint::send_data(void* addr, int size) +RDMA_Buffer* RDMA_Endpoint::bufferalloc(int size) +{ + RDMA_Buffer* new_buffer = new RDMA_Buffer(channel_, pd_, size); + return new_buffer; +} + +// ---------------------------------------------- + +void RDMA_Endpoint::send_data(RDMA_Buffer* buffer) +{ + channel_->request_read(buffer); +} + +void RDMA_Endpoint::send_rawdata(void* addr, int size) { RDMA_Buffer* new_buffer = new RDMA_Buffer(channel_, pd_, size, addr); - channel_->request_read(new_buffer); + //channel_->request_read(new_buffer); + send_data(new_buffer); } // ---------------------------------------------- diff --git a/src/rdma_endpoint.h b/src/rdma_endpoint.h index fbb076b..4b50bdb 100755 --- a/src/rdma_endpoint.h +++ b/src/rdma_endpoint.h @@ -23,26 +23,29 @@ struct RDMA_Endpoint_Info class RDMA_Channel; class RDMA_Buffer; -class RDMA_Session; +class RDMA_MemoryPool; class RDMA_Endpoint { public: - RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_Session* session); + RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_MemoryPool* mempool); ~RDMA_Endpoint(); struct cm_con_data_t get_local_con_data(); void connect(struct cm_con_data_t remote_con_data); void close(); - void send_data(void* addr, int size); + RDMA_Buffer* bufferalloc(int size); + + void send_data(RDMA_Buffer* buffer); + void send_rawdata(void* addr, int size); void set_sync_barrier(int size); void wait_for_sync(); // ----- Private To Public ----- inline RDMA_Channel* channel() const {return channel_;} - inline RDMA_Session* session() const {return session_;} + inline RDMA_MemoryPool* mempool() const {return mempool_;} bool connected_; @@ -62,8 +65,8 @@ class RDMA_Endpoint RDMA_Endpoint_Info self_, remote_; // Message channel RDMA_Channel* channel_ = NULL; - // Session - RDMA_Session* session_ = NULL; + // MemoryPool + RDMA_MemoryPool* mempool_ = NULL; }; #endif // !RDMA_ENDPOINT_H \ No newline at end of file diff --git a/src/rdma_session.cpp b/src/rdma_session.cpp index dffc6d2..0db81d2 100755 --- a/src/rdma_session.cpp +++ b/src/rdma_session.cpp @@ -111,7 +111,7 @@ void RDMA_Session::stop_process() RDMA_Endpoint* RDMA_Session::new_endpoint(RDMA_Pre* pre) { - RDMA_Endpoint* new_endpoint = new RDMA_Endpoint(pd_, cq_, context_, pre->config.ib_port, CQ_SIZE, this); + RDMA_Endpoint* new_endpoint = new RDMA_Endpoint(pd_, cq_, context_, pre->config.ib_port, CQ_SIZE, mempool_); endpoint_list_.push_back(new_endpoint); new_endpoint->connect(pre->exchange_qp_data(new_endpoint->get_local_con_data())); diff --git a/utils/MemoryPool b/utils/MemoryPool index 5567525..41b8cd6 160000 --- a/utils/MemoryPool +++ b/utils/MemoryPool @@ -1 +1 @@ -Subproject commit 55675255c86a81d251bbb468c56050c6da239d7e +Subproject commit 41b8cd65627d5fe11eeba19388e5e23c9e236b44