#include "verbs_wrap.hpp"
#include <iostream>
#include <string>
#include <cinttypes>
#include <cstdlib>
#include <cstring>
////////////////////////
/// Helper functions
////////////////////////
// die, TEST_NZ, TEST_Z adapted from:
// https://sites.google.com/a/bedeir.com/home/basic-rdma-client-server.tar.gz?attredirects=0&d=1
static void die(const std::string &reason) {
  std::cerr << reason << std::endl;
  exit(EXIT_FAILURE);
}
// TEST_NULL, TEST_Z, TEST_NZ, and TEST_NEG are macros that automatically check
// the value returned by a function call and exit with an informative message
// on failure.
#define TEST_NULL(x) do { if ((x) == nullptr) die(__FILE__ ":" + std::to_string(__LINE__) + " " "error: " #x " failed (returned NULL)."); } while (0)
#define TEST_Z(x) do { if ((x) == 0) die(__FILE__ ":" + std::to_string(__LINE__) + " " "error: " #x " failed (returned zero)."); } while (0)
#define TEST_NZ(x) do { if ((x) != 0) die(__FILE__ ":" + std::to_string(__LINE__) + " " "error: " #x " failed (returned non-zero)."); } while (0)
#define TEST_NEG(x) do { if ((x) < 0) die(__FILE__ ":" + std::to_string(__LINE__) + " " "error: " #x " failed (returned negative)."); } while (0)
//////////////////////////////
/// Configuration constants
//////////////////////////////
// We use the following hardcoded constants (rather than command-line
// parameters) to choose device ports, tune performance parameters, etc.
// for the RDMA setup.
/*** CreateContext ***/
// The name of the InfiniBand device (sort of like a network interface card)
// we want to choose.
//
// I found this name with 'ibv_get_device_name'. Some Sampa nodes have more
// than 1 device, but for some reason, the device that
// isn't mlx4_0 doesn't work.
static const std::string DEVICE_NAME = "mlx4_0";
// Valid port numbers go from 1 to n. We'll just use the first port.
static const uint8_t PHYS_PORT_NUM = 1;
/*** CreateCompletionQueue ***/
static const int COMPLETION_QUEUE_ENTRIES = 256;
/*** CreateQueuePair ***/
static const uint32_t MAX_SEND_WR = 16; // how many operations per queue should we be able to enqueue at a time?
static const uint32_t MAX_RECV_WR = 1; // only need 1 if we're just using RDMA ops
static const uint32_t MAX_SEND_SGE = 1; // how many SGE's do we allow per send?
static const uint32_t MAX_RECV_SGE = 1; // how many SGE's do we allow per receive?
static const uint32_t MAX_INLINE_DATA = 16; // message rate drops from 6M/s to 4M/s at 29 bytes
/*** RegisterMemory and ConnectQueuePair ***/
static const enum ibv_access_flags ACCESS_FLAGS =
    (enum ibv_access_flags)      // cast so compiler doesn't complain
    (IBV_ACCESS_LOCAL_WRITE |    // we allow all operations except memory windows
     IBV_ACCESS_REMOTE_READ |
     IBV_ACCESS_REMOTE_WRITE |
     IBV_ACCESS_REMOTE_ATOMIC);
/*** ConnectQueuePair ***/
// init -> rtr
static const enum ibv_mtu PATH_MTU = IBV_MTU_512; // use lowest to be safe
static const uint32_t RQ_PSN = 0; // must match the remote side's SQ_PSN (both sides use 0)
static const uint8_t MAX_DEST_RD_ATOMIC = 16; // how many outstanding reads/atomic ops are allowed? (remote end of qp, limited by card)
static const uint8_t MIN_RNR_TIMER = 12; // Mellanox recommendation
// rtr -> rts
static const uint8_t TIMEOUT = 14; // Mellanox recommendation
static const uint8_t RETRY_CNT = 7; // Mellanox recommendation
static const uint8_t RNR_RETRY = 7; // Mellanox recommendation
static const uint32_t SQ_PSN = RQ_PSN; // must match the remote side's RQ_PSN (both sides use 0)
static const uint16_t MAX_RD_ATOMIC = 16; // how many outstanding reads/atomic ops we may have in flight (local end of qp)
///////////////////////////////
/// Function implementations
///////////////////////////////
// CreateContext tries to create an ibv_context from the InfiniBand device
// named DEVICE_NAME.
// For this program, each process only needs 1 context to set up the necessary
// RDMA structures, so this should be called exactly once in the program.
//
struct ibv_context *CreateContext() {
  static bool ran_once = false;
  if (ran_once)
    die("CreateContext already ran once");
  else
    ran_once = true;
  // Get a list of InfiniBand devices, and the number of devices.
  // These devices are physical cards on the machine.
  // A Sampa node may have more than one IB device.
  int num_devices;
  struct ibv_device **device_list;
  TEST_NULL(device_list = ibv_get_device_list(&num_devices));
  if (num_devices == 0) {
    die("CreateContext: no InfiniBand devices found");
  }
  // std::cout << "CreateContext: found " << num_devices << " device(s)" << std::endl;
  // Choose the device whose name matches DEVICE_NAME.
  struct ibv_device *device = nullptr;
  for (int i = 0; i < num_devices; ++i) {
    const char *device_name;
    TEST_NULL(device_name = ibv_get_device_name(device_list[i]));
    if (DEVICE_NAME == device_name) {
      device = device_list[i];
    }
  }
  if (device == nullptr) {
    die("CreateContext: desired device not found");
  }
  // Create a context from the device. (Sort of like opening a file.)
  struct ibv_context *context;
  TEST_NULL(context = ibv_open_device(device));
  // The device list is no longer needed once the device has been opened.
  ibv_free_device_list(device_list);
  std::cout << "CreateContext: success" << std::endl;
  return context;
}
// We'll use 1 completion queue to check that our RDMA ops have completed.
struct ibv_cq *CreateCompletionQueue(ibv_context *context) {
  struct ibv_cq *cq;
  TEST_NULL(cq = ibv_create_cq(context, COMPLETION_QUEUE_ENTRIES, NULL, NULL, 0));
  // std::cout << "CreateCompletionQueue: success" << std::endl;
  return cq;
}
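// Illustrative sketch (not part of the original wrapper): one way to check
// that a posted, signaled operation finished is to busy-poll the completion
// queue until a work completion appears and then verify its status. The
// function name and the single-completion assumption are additions here,
// shown only as an example of how the cq returned above might be used.
static void WaitForOneCompletion(ibv_cq *cq) {
  struct ibv_wc wc;
  int num_completions = 0;
  // ibv_poll_cq returns the number of completions drained (0 if none are
  // ready yet, negative on error).
  while (num_completions == 0) {
    num_completions = ibv_poll_cq(cq, 1, &wc);
  }
  TEST_NEG(num_completions);
  if (wc.status != IBV_WC_SUCCESS) {
    die(std::string("work completion failed: ") + ibv_wc_status_str(wc.status));
  }
}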
// If we're the consumer, this protection domain should hold the RDMA memory
// and queue pairs for each producer.
// If we're the producer, this PD should hold just 1 queue pair that's
// connected to a consumer queue pair.
struct ibv_pd *CreateProtectionDomain(ibv_context *context) {
  ibv_pd *pd;
  TEST_NULL(pd = ibv_alloc_pd(context));
  // std::cout << "CreateProtectionDomain: success" << std::endl;
  return pd;
}
// GetLid returns the local identifier (LID) assigned to our port; the remote
// side needs this (along with our qp_num) to connect to us.
uint16_t GetLid(ibv_context *context) {
  struct ibv_port_attr port_attr;
  TEST_NZ(ibv_query_port(context, PHYS_PORT_NUM, &port_attr));
  return port_attr.lid;
}
struct ibv_qp *CreateQueuePair(ibv_pd *pd, ibv_cq *cq) {
  struct ibv_qp_init_attr qp_init_attr = {
    NULL,               // void *qp_context: not used
    cq,                 // struct ibv_cq *send_cq
    cq,                 // struct ibv_cq *recv_cq
    NULL,               // struct ibv_srq *srq
    {                   // struct ibv_qp_cap cap
      MAX_SEND_WR,      // uint32_t max_send_wr
      MAX_RECV_WR,      // uint32_t max_recv_wr
      MAX_SEND_SGE,     // uint32_t max_send_sge
      MAX_RECV_SGE,     // uint32_t max_recv_sge
      MAX_INLINE_DATA,  // uint32_t max_inline_data
    },
    IBV_QPT_RC,         // enum ibv_qp_type qp_type: need IBV_QPT_RC for atomic ops
    1,                  // int sq_sig_all: yes, we want all ops to generate completion queue entries
    NULL,               // struct ibv_xrc_domain *xrc_domain: not using XRC
  };
  struct ibv_qp *qp;
  TEST_NULL(qp = ibv_create_qp(pd, &qp_init_attr));
  std::cout << "CreateQueuePair: success, qp_num " << qp->qp_num << std::endl;
  return qp;
}
ibv_mr *RegisterMemory(ibv_pd *pd, void *addr, size_t length) {
  struct ibv_mr *mr;
  TEST_NULL(mr = ibv_reg_mr(pd, addr, length, ACCESS_FLAGS));
  std::cout << "RegisterMemory: success" << std::endl;
  return mr;
}
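// Illustrative sketch (not part of the original wrapper): once a region is
// registered, the local mr->lkey lets the card read our buffer, while the
// peer's rkey plus the peer's buffer address (learned out of band, like the
// LID and qp_num discussed below) let us write into the remote buffer. The
// function and its remote_addr/remote_rkey parameters are hypothetical, shown
// only to illustrate how a registered region and a connected queue pair fit
// together.
static void ExamplePostRdmaWrite(ibv_qp *qp, ibv_mr *mr, void *local_addr,
                                 uint32_t length, uint64_t remote_addr,
                                 uint32_t remote_rkey) {
  // Scatter/gather entry describing the local source buffer.
  struct ibv_sge sge;
  std::memset(&sge, 0, sizeof(sge));
  sge.addr = (uintptr_t) local_addr;
  sge.length = length;
  sge.lkey = mr->lkey;
  // Work request describing the RDMA write into the remote buffer.
  struct ibv_send_wr wr;
  std::memset(&wr, 0, sizeof(wr));
  wr.wr_id = 0;                       // opaque id echoed back in the completion
  wr.sg_list = &sge;
  wr.num_sge = 1;
  wr.opcode = IBV_WR_RDMA_WRITE;
  wr.send_flags = IBV_SEND_SIGNALED;  // redundant with sq_sig_all = 1, but explicit
  wr.wr.rdma.remote_addr = remote_addr;
  wr.wr.rdma.rkey = remote_rkey;
  struct ibv_send_wr *bad_wr = nullptr;
  TEST_NZ(ibv_post_send(qp, &wr, &bad_wr));
}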
// We want to get 2 processes (which may or may not be on different machines)
// to talk with each other over RDMA. We do this by connecting their respective
// queue pairs, which involves some bizarre incantations. (See 3.5)
//
// This function needs to be run on both sides of the connection.
//
// Each RDMA device has a local identifier (LID) that's unique across the
// network of machines. Each queue pair created on an LID has a unique qp_num.
// (See Glossary for LID definition)
// So the combination of remote_lid and remote_qp_num identifies an RDMA
// process.
//
// remote_lid and remote_qp_num should be learned through some other
// communication mechanism, such as TCP sockets, the message passing interface
// (MPI), or the RDMA communication manager (RDMA CM); see the example sketch
// at the end of this file.
//
void ConnectQueuePair(ibv_qp *local_qp, int remote_lid, int remote_qp_num) {
  // qp: reset -> init
  struct ibv_qp_attr init_qp_attr;
  std::memset(&init_qp_attr, 0, sizeof(init_qp_attr));
  init_qp_attr.qp_state = IBV_QPS_INIT;
  init_qp_attr.pkey_index = 0;           // not quite sure what this partition key means; 0 seems to be the right choice
  init_qp_attr.port_num = PHYS_PORT_NUM; // IB device port num (sampa has 1 physical port)
  init_qp_attr.qp_access_flags = ACCESS_FLAGS;
  TEST_NZ(ibv_modify_qp(local_qp, &init_qp_attr,
                        IBV_QP_STATE |   // we need to pass a flag in for each ibv_qp_attr field we set (See 3.5)
                        IBV_QP_PKEY_INDEX |
                        IBV_QP_PORT |
                        IBV_QP_ACCESS_FLAGS));
  // qp: init -> rtr
  struct ibv_qp_attr rtr_qp_attr;
  std::memset(&rtr_qp_attr, 0, sizeof(rtr_qp_attr));
  rtr_qp_attr.qp_state = IBV_QPS_RTR;
  rtr_qp_attr.path_mtu = PATH_MTU;               // lowest mtu to be safe
  rtr_qp_attr.ah_attr.dlid = remote_lid;         // destination LID
  rtr_qp_attr.ah_attr.port_num = PHYS_PORT_NUM;  // local port to send from
  rtr_qp_attr.ah_attr.is_global = 0;             // no global routing header (same subnet)
  rtr_qp_attr.ah_attr.sl = 0;                    // service level; 0 is the default
  rtr_qp_attr.ah_attr.src_path_bits = 0;         // use the port's base LID as the source
  rtr_qp_attr.dest_qp_num = remote_qp_num;
  rtr_qp_attr.rq_psn = RQ_PSN;                         // starting recv packet sequence number
  rtr_qp_attr.max_dest_rd_atomic = MAX_DEST_RD_ATOMIC; // resources for incoming RDMA requests
  rtr_qp_attr.min_rnr_timer = MIN_RNR_TIMER;           // Mellanox recommendation
  TEST_NZ(ibv_modify_qp(local_qp, &rtr_qp_attr,
                        IBV_QP_STATE |
                        IBV_QP_AV |
                        IBV_QP_PATH_MTU |
                        IBV_QP_DEST_QPN |
                        IBV_QP_RQ_PSN |
                        IBV_QP_MAX_DEST_RD_ATOMIC |
                        IBV_QP_MIN_RNR_TIMER));
  // qp: rtr -> rts
  struct ibv_qp_attr rts_qp_attr;
  std::memset(&rts_qp_attr, 0, sizeof(rts_qp_attr));
  rts_qp_attr.qp_state = IBV_QPS_RTS;
  rts_qp_attr.timeout = TIMEOUT;             // Mellanox recommendation
  rts_qp_attr.retry_cnt = RETRY_CNT;         // Mellanox recommendation
  rts_qp_attr.rnr_retry = RNR_RETRY;         // Mellanox recommendation
  rts_qp_attr.sq_psn = SQ_PSN;               // send packet sequence number, should match the remote rq_psn
  rts_qp_attr.max_rd_atomic = MAX_RD_ATOMIC; // # of outstanding RDMA reads and atomic ops allowed
  TEST_NZ(ibv_modify_qp(local_qp, &rts_qp_attr,
                        IBV_QP_STATE |
                        IBV_QP_TIMEOUT |
                        IBV_QP_RETRY_CNT |
                        IBV_QP_RNR_RETRY |
                        IBV_QP_SQ_PSN |
                        IBV_QP_MAX_QP_RD_ATOMIC));
  std::cout << "ConnectQueuePair: success" << std::endl;
}
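// Illustrative sketch (not part of the original wrapper): a typical caller
// would run the functions above in roughly the following order, then ship
// local_lid and local_qp_num (plus the registered buffer's address and
// mr->rkey, if the peer should issue RDMA ops against it) to the other
// process over some out-of-band channel (TCP sockets, MPI, or RDMA CM),
// receive the peer's values, and finish with
// ConnectQueuePair(qp, remote_lid, remote_qp_num). The LocalEndpoint struct
// and ExampleLocalSetup function are hypothetical helpers for illustration,
// not part of the original interface.
struct LocalEndpoint {
  struct ibv_qp *qp;
  struct ibv_mr *mr;
  uint16_t local_lid;     // send this to the peer
  uint32_t local_qp_num;  // send this to the peer
};
static LocalEndpoint ExampleLocalSetup(void *buffer, size_t length) {
  struct ibv_context *context = CreateContext();
  struct ibv_cq *cq = CreateCompletionQueue(context);
  struct ibv_pd *pd = CreateProtectionDomain(context);
  struct ibv_qp *qp = CreateQueuePair(pd, cq);
  struct ibv_mr *mr = RegisterMemory(pd, buffer, length);
  return LocalEndpoint{qp, mr, GetLid(context), qp->qp_num};
}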