From 3263eadd38a414920ea2d62b9169e247439d9038 Mon Sep 17 00:00:00 2001
From: Zhang690683220
Date: Tue, 21 May 2024 10:58:00 -0600
Subject: [PATCH] Clean up code for dspaces_cuda_get

---
 include/gspace.h     |   3 -
 include/ss_data.h    |  13 ----
 src/dspaces-client.c | 181 +++++++++++++++++--------------------------
 src/dspaces-server.c |   3 -
 src/ss_data.c        |  38 ---------
 5 files changed, 70 insertions(+), 168 deletions(-)

diff --git a/include/gspace.h b/include/gspace.h
index 79b00ec7..e6f7d973 100644
--- a/include/gspace.h
+++ b/include/gspace.h
@@ -60,9 +60,6 @@ struct ds_gspace {
     /* Pending object descriptors for draining. */
     struct list_head obj_desc_drain_list;
 
-    /* Pending requests for dual channel put. */
-    struct list_head dc_req_list;
-
     int rank;
     int size_sp;
     char **server_address;
diff --git a/include/ss_data.h b/include/ss_data.h
index ca8edb95..3f7cc2b0 100644
--- a/include/ss_data.h
+++ b/include/ss_data.h
@@ -87,15 +87,6 @@ struct obj_data {
     unsigned int f_free : 1;
 };
 
-struct dc_request {
-    struct list_head entry;
-    struct obj_data* od;
-    /* margo request for bulk_itransfer(); 0 - gdr; 1 - host */
-    margo_request * margo_req;
-    hg_bulk_t* bulk_handle;
-    int f_error;
-};
-
 /*
   A view in the matrix allows to extract any subset of values from a
   matrix.
@@ -520,8 +511,4 @@ struct lock_data *create_lock(struct list_head *list, char *name);
 
 char **addr_str_buf_to_list(char *buf, int num_addrs);
 
-struct dc_request *dc_req_alloc(struct obj_data *od);
-struct dc_request *dc_req_find(struct list_head *dc_req_list, obj_descriptor *odsc);
-void dc_req_free(struct dc_request *dc_req);
-
 #endif /* __SS_DATA_H_ */
diff --git a/src/dspaces-client.c b/src/dspaces-client.c
index 36b85c54..39481e47 100644
--- a/src/dspaces-client.c
+++ b/src/dspaces-client.c
@@ -30,10 +30,6 @@
 #include
 #endif /* HAVE_DRC */
 
-// #ifdef HAVE_GDRCOPY
-// #include
-// #endif
-
 #include
 
 #ifdef USE_APEX
@@ -711,17 +707,11 @@ static int dspaces_init_margo(dspaces_client_t client,
         "{ \"use_progress_thread\" : false, \"rpc_thread_count\" : 0, "
         "\"handle_cache_size\" : 64}");
     hii.request_post_init = 1024;
-    hii.auto_sm = 0;
+    hii.auto_sm = false;
+    hii.no_bulk_eager = true;
+    hii.na_init_info.request_mem_device = true;
     mii.hg_init_info = &hii;
     mii.json_config = margo_conf;
-    hii.auto_sm = false;
-    if(client->cuda_info.cuda_put_mode == 1 && client->cuda_info.cuda_get_mode != 2) {
-        hii.no_bulk_eager=0;
-        hii.na_init_info.request_mem_device = false;
-    } else {
-        hii.no_bulk_eager=1;
-        hii.na_init_info.request_mem_device = true;
-    }
     mii.hg_init_info = &hii;
     mii.json_config = margo_conf;
     ABT_init(0, NULL);
@@ -1633,11 +1623,11 @@ static int finalize_req(struct dspaces_put_req *req)
     return ret;
 }
 
-static struct dspaces_put_req *dspaces_cpu_iput(dspaces_client_t client,
-                                                const char *var_name, unsigned int ver,
-                                                int elem_size, int ndim, uint64_t *lb,
-                                                uint64_t *ub, void *data, int alloc,
-                                                int check, int free)
+struct dspaces_put_req *dspaces_iput(dspaces_client_t client,
+                                     const char *var_name, unsigned int ver,
+                                     int elem_size, int ndim, uint64_t *lb,
+                                     uint64_t *ub, void *data, int alloc,
+                                     int check, int free)
 {
     hg_addr_t server_addr;
     hg_return_t hret;
@@ -1928,12 +1918,9 @@ static int get_data(dspaces_client_t client, int num_odscs,
     return 0;
 }
 
-static int get_data_baseline(dspaces_client_t client, int num_odscs,
-                             obj_descriptor req_obj, obj_descriptor *odsc_tab,
-                             void *data, double *ctime)
+static int cuda_get_data_baseline(dspaces_client_t client, int num_odscs,
+                                  obj_descriptor req_obj, obj_descriptor *odsc_tab, void *data)
 {
-    struct timeval start, end;
-    double timer = 0; // timer in second
     bulk_in_t *in;
     in = (bulk_in_t *)malloc(sizeof(bulk_in_t) * num_odscs);
 
@@ -1949,6 +1936,9 @@ static int get_data_baseline(dspaces_client_t client, int num_odscs,
         od[i] = obj_data_alloc(&odsc_tab[i]);
         in[i].odsc.size = sizeof(obj_descriptor);
         in[i].odsc.raw_odsc = (char *)(&odsc_tab[i]);
+        /* CPU->GPU transfers don't support lz4 compression yet, so the client
+         * needs to set the DS_NO_COMPRESS flag. */
+        in[i].flags = in[i].flags | DS_NO_COMPRESS;
 
         hg_size_t rdma_size = (req_obj.size) * bbox_volume(&odsc_tab[i].bb);
 
@@ -1986,10 +1976,7 @@ static int get_data_baseline(dspaces_client_t client, int num_odscs,
         margo_bulk_free(in[i].handle);
         margo_destroy(hndl[i]);
         // copy received data into user return buffer
-        gettimeofday(&start, NULL);
         ssd_copy(return_od, od[i]);
-        gettimeofday(&end, NULL);
-        timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
         obj_data_free(od[i]);
     }
     free(hndl);
@@ -1998,16 +1985,12 @@ static int get_data_baseline(dspaces_client_t client, int num_odscs,
     free(in);
     free(return_od);
 
-    *ctime = timer;
     return 0;
 }
 
-static int get_data_gdr(dspaces_client_t client, int num_odscs,
-                        obj_descriptor req_obj, obj_descriptor *odsc_tab,
-                        void *d_data, double *ctime)
+static int cuda_get_data_gdr(dspaces_client_t client, int num_odscs,
+                             obj_descriptor req_obj, obj_descriptor *odsc_tab, void *d_data)
 {
-    struct timeval start, end;
-    double timer = 0; // timer in second
     int ret = dspaces_SUCCESS;
     bulk_in_t *in;
     in = (bulk_in_t *)malloc(sizeof(bulk_in_t) * num_odscs);
@@ -2041,6 +2024,9 @@ static int get_data_gdr(dspaces_client_t client, int num_odscs,
         od[i] = obj_data_alloc_cuda(&odsc_tab[i]);
         in[i].odsc.size = sizeof(obj_descriptor);
         in[i].odsc.raw_odsc = (char *)(&odsc_tab[i]);
+        /* CPU->GPU transfers don't support lz4 compression yet, so the client
+         * needs to set the DS_NO_COMPRESS flag. */
+        in[i].flags = in[i].flags | DS_NO_COMPRESS;
 
         hg_size_t rdma_size = (req_obj.size) * bbox_volume(&odsc_tab[i].bb);
 
@@ -2113,7 +2099,6 @@ static int get_data_gdr(dspaces_client_t client, int num_odscs,
         margo_destroy(hndl[i]);
         // copy received data into user return buffer
         if(client->cuda_info.concurrency_enabled) {
-            gettimeofday(&start, NULL);
             ret = ssd_copy_cuda_async(return_od, od[i], &stream[i%stream_size]);
             if(ret != dspaces_SUCCESS) {
                 fprintf(stderr, "ERROR: (%s): ssd_copy_cuda_async() failed, Err Code: (%s)\n",
@@ -2130,9 +2115,7 @@ static int get_data_gdr(dspaces_client_t client, int num_odscs,
                 free(in);
                 return dspaces_ERR_CUDA;
             }
-            gettimeofday(&end, NULL);
         } else {
-            gettimeofday(&start, NULL);
             ret = ssd_copy_cuda(return_od, od[i]);
             if(ret != dspaces_SUCCESS) {
                 fprintf(stderr, "ERROR: (%s): ssd_copy_cuda() failed, Err Code: (%s)\n",
@@ -2149,14 +2132,11 @@ static int get_data_gdr(dspaces_client_t client, int num_odscs,
                 free(in);
                 return dspaces_ERR_CUDA;
             }
-            gettimeofday(&end, NULL);
             obj_data_free_cuda(od[i]);
         }
-        timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
     }
 
     if(client->cuda_info.concurrency_enabled) {
-        gettimeofday(&start, NULL);
         for(int i = 0; i < stream_size; i++) {
             curet = cudaStreamSynchronize(stream[i]);
             if(curet != cudaSuccess) {
@@ -2190,8 +2170,6 @@ static int get_data_gdr(dspaces_client_t client, int num_odscs,
                 return dspaces_ERR_CUDA;
             }
         }
-        gettimeofday(&end, NULL);
-        timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
         free(stream);
 
     for(int i = 0; i < num_odscs; i++) {
@@ -2206,16 +2184,13 @@ static int get_data_gdr(dspaces_client_t client, int num_odscs,
     free(in);
     free(return_od);
 
-    *ctime = timer;
     return ret;
 }
 
-static int get_data_hybrid(dspaces_client_t client, int num_odscs,
-                           obj_descriptor req_obj, obj_descriptor *odsc_tab,
-                           void *d_data, double *ctime)
+static int cuda_get_data_hybrid(dspaces_client_t client, int num_odscs,
+                                obj_descriptor req_obj, obj_descriptor *odsc_tab,
+                                void *d_data)
 {
-    struct timeval start, end;
-    double timer = 0; // timer in second
     int ret = dspaces_SUCCESS;
     bulk_in_t *in;
     in = (bulk_in_t *)malloc(sizeof(bulk_in_t) * num_odscs);
@@ -2223,7 +2198,6 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
     struct obj_data **host_od;
     host_od = malloc(num_odscs * sizeof(struct obj_data *));
 
-    margo_request *serv_req;
     hg_handle_t *hndl;
     hndl = (hg_handle_t *)malloc(sizeof(hg_handle_t) * num_odscs);
 
@@ -2233,6 +2207,9 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
         host_od[i] = obj_data_alloc(&odsc_tab[i]);
         in[i].odsc.size = sizeof(obj_descriptor);
         in[i].odsc.raw_odsc = (char *)(&odsc_tab[i]);
+        /* CPU->GPU transfers don't support lz4 compression yet, so the client
+         * needs to set the DS_NO_COMPRESS flag. */
+        in[i].flags = in[i].flags | DS_NO_COMPRESS;
 
         hg_size_t rdma_size = (req_obj.size) * bbox_volume(&odsc_tab[i].bb);
 
@@ -2323,7 +2300,6 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
                 free(in);
                 return dspaces_ERR_CUDA;
             }
-            gettimeofday(&start, NULL);
             ret = ssd_copy_cuda_async(return_od, device_od[i], &stream[i%stream_size]);
             if(ret != dspaces_SUCCESS) {
                 fprintf(stderr, "ERROR: (%s): ssd_copy_cuda_async() failed, Err Code: (%s)\n",
@@ -2340,7 +2316,6 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
                 free(in);
                 return dspaces_ERR_CUDA;
             }
-            gettimeofday(&end, NULL);
         } else {
             curet = cudaMemcpy(device_od[i]->data, host_od[i]->data, data_size,
                                cudaMemcpyHostToDevice);
@@ -2360,7 +2335,6 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
                 return dspaces_ERR_CUDA;
             }
             obj_data_free(host_od[i]);
-            gettimeofday(&start, NULL);
             ret = ssd_copy_cuda(return_od, device_od[i]);
             if(ret != dspaces_SUCCESS) {
                 fprintf(stderr, "ERROR: (%s): ssd_copy_cuda() failed, Err Code: (%s)\n",
@@ -2377,11 +2351,8 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
                 free(in);
                 return dspaces_ERR_CUDA;
             }
-            gettimeofday(&end, NULL);
             obj_data_free_cuda(device_od[i]);
         }
-
-        timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
         margo_free_output(hndl[i], &resp);
         margo_bulk_free(in[i].handle);
         margo_destroy(hndl[i]);
@@ -2439,7 +2410,6 @@ static int get_data_hybrid(dspaces_client_t client, int num_odscs,
     free(in);
     free(return_od);
 
-    *ctime = timer;
     return ret;
 }
 
@@ -2791,15 +2761,17 @@ int dspaces_cpu_get(dspaces_client_t client, const char *var_name, unsigned int
     return (ret);
 }
 
-int dspaces_cuda_get(dspaces_client_t client, const char *var_name, unsigned int ver,
-                     int elem_size, int ndim, uint64_t *lb, uint64_t *ub, void *data,
-                     int timeout, double* ttime, double* ctime)
+static int dspaces_cuda_get(dspaces_client_t client, const char *var_name, unsigned int ver,
+                            int elem_size, int ndim, uint64_t *lb, uint64_t *ub, void *data, int timeout)
 {
-    struct timeval start, end;
-    double timer = 0; // timer in second
+    int device;
+    CUDA_ASSERT_RT_CLIENT(cudaGetDevice(&device));
+
     obj_descriptor odsc;
     obj_descriptor *odsc_tab;
     int num_odscs;
+    size_t rdma_size;
+    void* host_buffer;
     int ret = dspaces_SUCCESS;
     int curet;
 
@@ -2816,64 +2788,52 @@ int dspaces_cuda_get(dspaces_client_t client, const char *var_name, unsigned int
 
     // send request to get the obj_desc
     if(num_odscs != 0) {
-        switch (client->cuda_info.cuda_get_mode)
-        {
-        // Baseline
-        case 1:
-        {
-            size_t rdma_size = elem_size*bbox_volume(&odsc.bb);
-            void* buffer = (void*) malloc(rdma_size);
-            gettimeofday(&start, NULL);
-            get_data_baseline(client, num_odscs, odsc, odsc_tab, buffer, ctime);
-            curet = cudaMemcpy(data, buffer, rdma_size, cudaMemcpyHostToDevice);
-            if(curet != cudaSuccess) {
-                fprintf(stderr, "ERROR: (%s): cudaMemcpy() failed, Err Code: (%s)\n", __func__, cudaGetErrorString(curet));
-                ret = dspaces_ERR_CUDA;
+        // GPU Reassembly Kernel only supports data types in {double-8, float/int-4, short-2, char-1}
+        if(elem_size == 8 || elem_size == 4 || elem_size == 2 || elem_size == 1) {
+            switch (client->cuda_info.dev_list[device].cuda_get_mode)
+            {
+            // CPU Communications + GPU Reassembly
+            case 1:
+            {
+                ret = cuda_get_data_hybrid(client, num_odscs, odsc, odsc_tab, data);
+                break;
             }
-            gettimeofday(&end, NULL);
-            timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
-            free(buffer);
-            break;
-        }
-        // GDR
-        case 2:
-        {
-            gettimeofday(&start, NULL);
-            ret = get_data_gdr(client, num_odscs, odsc, odsc_tab, data, ctime);
-            gettimeofday(&end, NULL);
-            timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
-            break;
-        }
-        // Hybrid
-        case 3:
-        {
-            gettimeofday(&start, NULL);
-            ret = get_data_hybrid(client, num_odscs, odsc, odsc_tab, data, ctime);
-            gettimeofday(&end, NULL);
-            timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
-            break;
-        }
-        default:
-        {
-            size_t rdma_size = elem_size*bbox_volume(&odsc.bb);
-            void* buffer = (void*) malloc(rdma_size);
-            gettimeofday(&start, NULL);
-            get_data_baseline(client, num_odscs, odsc, odsc_tab, buffer, ctime);
-            curet = cudaMemcpy(data, buffer, rdma_size, cudaMemcpyHostToDevice);
+            // GDR Communications + GPU Reassembly
+            case 2:
+            {
+                ret = cuda_get_data_gdr(client, num_odscs, odsc, odsc_tab, data);
+                break;
+            }
+            // CPU Communications + CPU Reassembly + CUDA memcpy
+            default:
+            {
+                rdma_size = elem_size*bbox_volume(&odsc.bb);
+                host_buffer = (void*) malloc(rdma_size);
+                cuda_get_data_baseline(client, num_odscs, odsc, odsc_tab, host_buffer);
+                curet = cudaMemcpy(data, host_buffer, rdma_size, cudaMemcpyHostToDevice);
+                if(curet != cudaSuccess) {
+                    fprintf(stderr, "ERROR: (%s): cudaMemcpy() failed, Err Code: (%s)\n",
+                            __func__, cudaGetErrorString(curet));
+                    ret = dspaces_ERR_CUDA;
+                }
+                free(host_buffer);
+                break;
+            }
+            }
+        } else {
+            // CPU Communications + CPU Reassembly + CUDA memcpy
+            rdma_size = elem_size*bbox_volume(&odsc.bb);
+            host_buffer = (void*) malloc(rdma_size);
+            cuda_get_data_baseline(client, num_odscs, odsc, odsc_tab, host_buffer);
+            curet = cudaMemcpy(data, host_buffer, rdma_size, cudaMemcpyHostToDevice);
             if(curet != cudaSuccess) {
                 fprintf(stderr, "ERROR: (%s): cudaMemcpy() failed, Err Code: (%s)\n",
                         __func__, cudaGetErrorString(curet));
                 ret = dspaces_ERR_CUDA;
             }
-            gettimeofday(&end, NULL);
-            timer += (end.tv_sec - start.tv_sec) * 1e3 + (end.tv_usec - start.tv_usec) * 1e-3;
-            free(buffer);
-            break;
+            free(host_buffer);
         }
-        }
-        *ttime = timer - *ctime;
         free(odsc_tab);
     }
-
     return (ret);
 }
@@ -2882,11 +2842,10 @@ int dspaces_get(dspaces_client_t client, const char *var_name, unsigned int ver,
                 int timeout)
 {
     int ret;
-    double ttime, ctime;
     struct cudaPointerAttributes ptr_attr;
     CUDA_ASSERT_RT_CLIENT(cudaPointerGetAttributes(&ptr_attr, data));
     if(ptr_attr.type == cudaMemoryTypeDevice) {
-        ret = dspaces_cuda_get(client, var_name, ver, elem_size, ndim, lb, ub, data, timeout, &ttime, &ctime);
+        ret = dspaces_cuda_get(client, var_name, ver, elem_size, ndim, lb, ub, data, timeout);
     } else {
         ret = dspaces_cpu_get(client, var_name, ver, elem_size, ndim, lb, ub, data, timeout);
     }
diff --git a/src/dspaces-server.c b/src/dspaces-server.c
index 302190e0..1545e384 100644
--- a/src/dspaces-server.c
+++ b/src/dspaces-server.c
@@ -160,7 +160,6 @@ struct dspaces_provider {
     ABT_mutex dht_mutex;
     ABT_mutex sspace_mutex;
    ABT_mutex kill_mutex;
-    ABT_mutex dc_mutex;
 
     ABT_xstream drain_xstream;
     ABT_pool drain_pool;
@@ -724,7 +723,6 @@ static int dsg_alloc(dspaces_provider_t server, const char *conf_name,
     dsg_l->num_apps = ds_conf.num_apps;
 
     INIT_LIST_HEAD(&dsg_l->obj_desc_drain_list);
-    INIT_LIST_HEAD(&dsg_l->dc_req_list);
 
     server->dsg = dsg_l;
     return 0;
@@ -1199,7 +1197,6 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
     ABT_mutex_create(&server->dht_mutex);
     ABT_mutex_create(&server->sspace_mutex);
     ABT_mutex_create(&server->kill_mutex);
-    ABT_mutex_create(&server->dc_mutex);
 
     hg = margo_get_class(server->mid);
 
diff --git a/src/ss_data.c b/src/ss_data.c
index 24e4756f..66317f92 100644
--- a/src/ss_data.c
+++ b/src/ss_data.c
@@ -1982,42 +1982,4 @@ char **addr_str_buf_to_list(char *buf, int num_addrs)
         ret[i] = a + strlen(a) + 1;
     }
     return ret;
-}
-
-struct dc_request *dc_req_alloc(struct obj_data *od)
-{
-    struct dc_request *dc_req = (struct dc_request*) malloc(sizeof(struct dc_request));
-    if(!dc_req) {
-        fprintf(stderr, "Malloc dc_req error\n");
-        return NULL;
-    }
-    memset(dc_req, 0, sizeof(struct dc_request));
-
-    dc_req->margo_req = (margo_request *) malloc(2*sizeof(margo_request));
-    for(int i=0; i<2; i++) {
-        dc_req->margo_req[i] = MARGO_REQUEST_NULL;
-    }
-    dc_req->od = od;
-    dc_req->f_error = 0;
-    return dc_req;
-}
-struct dc_request *dc_req_find(struct list_head *dc_req_list, obj_descriptor *odsc)
-{
-    if(!dc_req_list) {
-        return NULL;
-    }
-    struct dc_request *e;
-    list_for_each_entry(e, dc_req_list, struct dc_request, entry)
-    {
-        if(obj_desc_equals_no_owner(odsc, &e->od->obj_desc))
-            return e;
-    }
-
-    return NULL;
-}
-
-void dc_req_free(struct dc_request *dc_req)
-{
-    free(dc_req->margo_req);
-    free(dc_req);
 }
\ No newline at end of file
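
Usage sketch (reviewer addition, not part of the patch): with dspaces_cuda_get() now
static, applications call dspaces_get() with a device pointer, and the client detects
it via cudaPointerGetAttributes() and dispatches to the CUDA path, choosing hybrid,
GDR, or baseline reassembly per the device's cuda_get_mode. A minimal sketch follows;
the variable name "ex_var", the extents, the dspaces.h header name, and the timeout
semantics are illustrative assumptions, not taken from this patch.

/* Hypothetical example: fetch a 128x128 block of doubles directly into
 * GPU memory. dspaces_get() sees the device pointer and takes the CUDA
 * path added/cleaned up by this patch. */
#include <cuda_runtime.h>
#include <stdint.h>
#include <dspaces.h>    /* assumed public header of the dspaces client */

int example_device_get(dspaces_client_t client)
{
    uint64_t lb[2] = {0, 0}, ub[2] = {127, 127};
    double *d_buf;

    /* Device allocation: cudaPointerGetAttributes() will report
     * cudaMemoryTypeDevice for d_buf, so dspaces_get() routes the
     * request to the (now static) dspaces_cuda_get(). */
    if(cudaMalloc((void **)&d_buf, 128 * 128 * sizeof(double)) != cudaSuccess)
        return dspaces_ERR_CUDA;

    /* elem_size of 8 bytes qualifies for the GPU reassembly kernel;
     * other element sizes fall back to CPU reassembly plus cudaMemcpy().
     * A timeout of -1 is assumed here to mean "wait indefinitely". */
    int ret = dspaces_get(client, "ex_var", 0, sizeof(double), 2, lb, ub,
                          d_buf, -1);

    cudaFree(d_buf);
    return ret;
}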