Skip to content

Commit

Permalink
Memorypool add success
Browse files Browse the repository at this point in the history
  • Loading branch information
jcf94 committed Mar 31, 2019
1 parent b6f6054 commit b7eda2b
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ CC = g++
CFLAGS += -std=c++14 -Iutils/ThreadPool/src -Iutils/MemoryPool/src

LD = g++
LDFLAGS += -std=c++14
LDFLAGS += -std=c++14 -Iutils/ThreadPool/src -Iutils/MemoryPool/src

#NAME = $(wildcard *.cpp)
NAME = benchmark.cpp
Expand Down
35 changes: 21 additions & 14 deletions benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ PROG : benchmark
#include "src/rdma_endpoint.h"
#include "src/tcp_sock_pre.h"
#include "src/rdma_message.h"
#include "src/rdma_buffer.h"

using namespace std;
#include "src/rdma_channel.h"

#define KB (1024)
#define MB (1024 * KB)
#define GB (1024 * MB)
using namespace std;

int main(int argc, char* argv[])
{
Expand Down Expand Up @@ -51,23 +50,29 @@ int main(int argc, char* argv[])
RDMA_Endpoint* endpoint = session.ptp_connect(&pre_tcp);

// Prepare data
int total_data = 512 * MB;
int block_data = 1024 * KB;
int total_data = 256 * MB;
int block_data = 4 * KB;

if (strcmp(argv[1], "s") == 0)
{

} else if (strcmp(argv[1], "c") == 0)
{
char* test_data = (char*)malloc(total_data);
//char* test_data = (char*)malloc(total_data);
RDMA_Buffer* test_data = endpoint->bufferalloc(total_data);
printf("%p\n", test_data->buffer());

// Warm Up
endpoint->set_sync_barrier(total_data);
for (int i=0;i<total_data;i+=block_data)
{
endpoint->send_data((void*)(test_data+i), block_data);
}
endpoint->wait_for_sync();
// endpoint->set_sync_barrier(total_data);
// for (int i=0;i<total_data;i+=block_data)
// {
// RDMA_Buffer* temp = new RDMA_Buffer(test_data, i, block_data);
// //printf("%p\n", temp->buffer());
// endpoint->send_data(temp);
// //endpoint->send_rawdata((void*)(test_data+i), block_data);
// }
// endpoint->wait_for_sync();
// log_ok(make_string("%d", endpoint->channel()->get_table_size()));

log_ok("Test Start");

Expand All @@ -76,7 +81,9 @@ int main(int argc, char* argv[])
endpoint->set_sync_barrier(total_data);
for (int i=0;i<total_data;i+=block_data)
{
endpoint->send_data((void*)(test_data+i), block_data);
RDMA_Buffer* temp = new RDMA_Buffer(test_data, i, block_data);
endpoint->send_data(temp);
//endpoint->send_rawdata((void*)(test_data+i), block_data);
}
endpoint->wait_for_sync();

Expand Down
4 changes: 2 additions & 2 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ int main(int argc, char* argv[])
log_ok("Ready to Transport\n");

char a[] = "Hello World from Client !!!!!";
endpoint->send_data((void*)a, sizeof(a));
endpoint->send_rawdata((void*)a, sizeof(a));

char b[] = "Test Test Test Test Test Test Test Test Test";

while (1)
{
int com;
std::cin >> com;
if (com == 0) endpoint->send_data((void*)b, sizeof(b));
if (com == 0) endpoint->send_rawdata((void*)b, sizeof(b));
else
{
endpoint->close();
Expand Down
13 changes: 11 additions & 2 deletions src/rdma_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ RDMA_Buffer::RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr
{
buffer_ = addr;
buffer_owned_ = false;
buffer_registerd_ = true;
mr_ = ibv_reg_mr(pd, buffer_, size_,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ);
if (!mr_)
Expand All @@ -29,7 +30,7 @@ RDMA_Buffer::RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr
{
//log_info("ddddddddddddddddddddd");
//buffer_ = malloc(size);
memblock_ = (RDMA_MemBlock*)channel_->endpoint()->session()->mempool()->blockalloc(size);
memblock_ = (RDMA_MemBlock*)channel_->endpoint()->mempool()->blockalloc(size);
buffer_ = memblock_->dataaddr();
mr_ = memblock_->mr();
buffer_owned_ = true;
Expand All @@ -39,14 +40,22 @@ RDMA_Buffer::RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr
log_info("RDMA_Buffer Created");
}

RDMA_Buffer::RDMA_Buffer(RDMA_Buffer* source, int offset, int size)
: buffer_owned_(false), channel_(source->channel_),
size_(size), mr_(source->mr_), memblock_(source->memblock_)
{
buffer_ = (void*)(source->buffer_+offset)
log_info("RDMA_Buffer Copyed");
}

RDMA_Buffer::~RDMA_Buffer()
{
if (buffer_owned_)
{
//free(buffer_);
memblock_->free();
//channel_->endpoint()->session()->mempool()->travel();
} else
} else if (buffer_registerd_)
{
if (ibv_dereg_mr(mr_))
{
Expand Down
2 changes: 2 additions & 0 deletions src/rdma_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class RDMA_Buffer
{
public:
RDMA_Buffer(RDMA_Channel* channel, ibv_pd* pd, int size, void* addr = NULL);
RDMA_Buffer(RDMA_Buffer* source, int offset, int size);
~RDMA_Buffer();

// ----- Private To Public -----
Expand All @@ -31,6 +32,7 @@ class RDMA_Buffer

private:
bool buffer_owned_;
bool buffer_registerd_ = false;
RDMA_Channel* channel_ = NULL;

void* buffer_ = NULL;
Expand Down
1 change: 1 addition & 0 deletions src/rdma_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void RDMA_Channel::request_read(RDMA_Buffer* buffer)
{
task_with_lock([this, buffer]
{
//log_ok(make_string("%p", buffer->buffer()));
insert_to_table((uint64_t)buffer->buffer(), (uint64_t)buffer);

RDMA_Message::fill_message_content((char*)outgoing_->buffer(), buffer->buffer(), buffer->size(), buffer->mr());
Expand Down
22 changes: 18 additions & 4 deletions src/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ PROG : RDMA_ENDPOINT_CPP
#include "rdma_buffer.h"
#include "rdma_channel.h"

RDMA_Endpoint::RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_Session* session)
: pd_(pd), ib_port_(ib_port), connected_(false), session_(session)
RDMA_Endpoint::RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_MemoryPool* mempool)
: pd_(pd), ib_port_(ib_port), connected_(false), mempool_(mempool)
{
// create the Queue Pair
struct ibv_qp_init_attr qp_init_attr;
Expand Down Expand Up @@ -134,10 +134,24 @@ void RDMA_Endpoint::close()

// ----------------------------------------------

void RDMA_Endpoint::send_data(void* addr, int size)
RDMA_Buffer* RDMA_Endpoint::bufferalloc(int size)
{
RDMA_Buffer* new_buffer = new RDMA_Buffer(channel_, pd_, size);
return new_buffer;
}

// ----------------------------------------------

void RDMA_Endpoint::send_data(RDMA_Buffer* buffer)
{
channel_->request_read(buffer);
}

void RDMA_Endpoint::send_rawdata(void* addr, int size)
{
RDMA_Buffer* new_buffer = new RDMA_Buffer(channel_, pd_, size, addr);
channel_->request_read(new_buffer);
//channel_->request_read(new_buffer);
send_data(new_buffer);
}

// ----------------------------------------------
Expand Down
15 changes: 9 additions & 6 deletions src/rdma_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,29 @@ struct RDMA_Endpoint_Info

class RDMA_Channel;
class RDMA_Buffer;
class RDMA_Session;
class RDMA_MemoryPool;

class RDMA_Endpoint
{
public:
RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_Session* session);
RDMA_Endpoint(ibv_pd* pd, ibv_cq* cq, ibv_context* context, int ib_port, int cq_size, RDMA_MemoryPool* mempool);
~RDMA_Endpoint();

struct cm_con_data_t get_local_con_data();
void connect(struct cm_con_data_t remote_con_data);
void close();

void send_data(void* addr, int size);
RDMA_Buffer* bufferalloc(int size);

void send_data(RDMA_Buffer* buffer);
void send_rawdata(void* addr, int size);

void set_sync_barrier(int size);
void wait_for_sync();

// ----- Private To Public -----
inline RDMA_Channel* channel() const {return channel_;}
inline RDMA_Session* session() const {return session_;}
inline RDMA_MemoryPool* mempool() const {return mempool_;}

bool connected_;

Expand All @@ -62,8 +65,8 @@ class RDMA_Endpoint
RDMA_Endpoint_Info self_, remote_;
// Message channel
RDMA_Channel* channel_ = NULL;
// Session
RDMA_Session* session_ = NULL;
// MemoryPool
RDMA_MemoryPool* mempool_ = NULL;
};

#endif // !RDMA_ENDPOINT_H
2 changes: 1 addition & 1 deletion src/rdma_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void RDMA_Session::stop_process()

RDMA_Endpoint* RDMA_Session::new_endpoint(RDMA_Pre* pre)
{
RDMA_Endpoint* new_endpoint = new RDMA_Endpoint(pd_, cq_, context_, pre->config.ib_port, CQ_SIZE, this);
RDMA_Endpoint* new_endpoint = new RDMA_Endpoint(pd_, cq_, context_, pre->config.ib_port, CQ_SIZE, mempool_);
endpoint_list_.push_back(new_endpoint);
new_endpoint->connect(pre->exchange_qp_data(new_endpoint->get_local_con_data()));

Expand Down
2 changes: 1 addition & 1 deletion utils/MemoryPool
Submodule MemoryPool updated 1 files
+3 −2 src/blocklist.h

0 comments on commit b7eda2b

Please sign in to comment.