Skip to content

Commit

Permalink
gRPC wrapper start
Browse files Browse the repository at this point in the history
  • Loading branch information
philip-davis committed Feb 1, 2024
1 parent a4d94c7 commit 04b09a3
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 24 deletions.
4 changes: 3 additions & 1 deletion include/rpc/wrapper_grpc.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#ifndef _DSPACES_GRPC_WRAPPER_
#define _DSPACES_GRPC_WRAPPER_

#include "data_services.h"

typedef void *grpc_server_t;

grpc_server_t dspaces_grpc_server_init(const char *addr);
struct GRPCServer *dspaces_grpc_server_init(const char *addr, dspaces_service_t dsrv);

void dspaces_grpc_server_wait(grpc_server_t self);

Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ set (dspaces-pkg "share/cmake/dspaces")
set (dspaces-vers "${dspaces_VERSION_MAJOR}.${dspaces_VERSION_MINOR}")

# list of source files
set(dspaces-src util.c bbox.c ss_data.c dspaces-client.c dspaces-ops.c)
set(dspaces-src util.c bbox.c ss_data.c dspaces-client.c dspaces-ops.c data_services.c)
if(DSPACES_USE_GRPC)
list(APPEND dspaces-src rpc/wrapper_grpc.cpp
${ds_grpc_srcs}
Expand Down
6 changes: 4 additions & 2 deletions src/dspaces-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ struct dspaces_client {
hg_id_t sub_id;
hg_id_t notify_id;
hg_id_t do_ops_id;
grpc_client_t grpc;
struct dc_gspace *dcg;
char **server_address;
char **node_names;
Expand Down Expand Up @@ -695,8 +696,8 @@ int dspaces_init_mpi(MPI_Comm comm, dspaces_client_t *c)
dspaces_init_margo(client, listen_addr_str);
free(listen_addr_str);

grpc_client_t gclient = dspaces_grpc_client_init("localhost:1025");
char *reply = dspaces_grpc_client_send_msg(gclient, "dataspaces");
client->grpc = dspaces_grpc_client_init("127.0.0.1:1025");
char *reply = dspaces_grpc_client_send_msg(client->grpc, "dataspaces stuff");
fprintf(stdout, "grpc reply: %s\n", reply);
free(reply);

Expand Down Expand Up @@ -828,6 +829,7 @@ int dspaces_fini(dspaces_client_t client)
free(client->dcg);

margo_finalize(client->mid);
dspaces_grpc_client_del(client->grpc);

free(client);

Expand Down
31 changes: 20 additions & 11 deletions src/dspaces-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ struct remote {
struct dspaces_provider {
struct list_head dirs;
margo_instance_id mid;
#ifdef DSPACES_USE_GRPC
grpc_server_t grpc;
#endif
hg_id_t put_id;
hg_id_t put_local_id;
hg_id_t put_meta_id;
Expand All @@ -89,9 +92,10 @@ struct dspaces_provider {
hg_id_t notify_id;
hg_id_t do_ops_id;
struct ds_gspace *dsg;
dspaces_service_t dsrv;
char **server_address;
char **node_names;
char *listen_addr_str_hcp;
char *listen_addr_str_hpc;
int rank;
int comm_size;
int f_debug;
Expand Down Expand Up @@ -532,7 +536,7 @@ static int write_conf(dspaces_provider_t server, MPI_Comm comm)
fprintf(fd, "%s %s\n", server->node_names[i],
server->server_address[i]);
}
fprintf(fd, "%s\n", server->listen_addr_str);
fprintf(fd, "%s\n", server->listen_addr_str_hpc);
#ifdef HAVE_DRC
fprintf(fd, "%" PRIu32 "\n", server->drc_credential_id);
#endif
Expand Down Expand Up @@ -955,7 +959,7 @@ static void drain_thread(void *arg)
}
}

int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
int dspaces_server_init(const char *listen_addr_str_hpc, MPI_Comm comm,
const char *conf_file, dspaces_provider_t *sv)
{
const char *envdebug = getenv("DSPACES_DEBUG");
Expand Down Expand Up @@ -1002,9 +1006,6 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,

config_server(server, conf_file);


dspaces_grpc_server_init("localhost:1025");

margo_set_environment(NULL);
sprintf(margo_conf,
"{ \"use_progress_thread\" : true, \"rpc_thread_count\" : %d }",
Expand Down Expand Up @@ -1059,11 +1060,11 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
}
}

server->mid = margo_init_ext(listen_addr_str, MARGO_SERVER_MODE, &mii);
server->mid = margo_init_ext(listen_addr_str_hpc, MARGO_SERVER_MODE, &mii);

#else

server->mid = margo_init_ext(listen_addr_str, MARGO_SERVER_MODE, &mii);
server->mid = margo_init_ext(listen_addr_str_hpc, MARGO_SERVER_MODE, &mii);
if(server->f_debug) {
if(!server->rank) {
char *margo_json = margo_get_config(server->mid);
Expand All @@ -1080,7 +1081,7 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
fprintf(stderr, "ERROR: %s: margo_init() failed.\n", __func__);
return (dspaces_ERR_MERCURY);
}
server->listen_addr_str = strdup(listen_addr_str);
server->listen_addr_str_hpc = strdup(listen_addr_str_hpc);

ABT_mutex_create(&server->odsc_mutex);
ABT_mutex_create(&server->ls_mutex);
Expand Down Expand Up @@ -1223,7 +1224,7 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
for(i = 0; i < server->nremote; i++) {
DEBUG_OUT("initializing client connection to %s\n",
server->remotes[i].name);
dspaces_init_wan(listen_addr_str,
dspaces_init_wan(listen_addr_str_hpc,
server->remotes[i].addr_str, 0, &server->remotes[i].conn);
}

Expand Down Expand Up @@ -1255,6 +1256,12 @@ int dspaces_server_init(const char *listen_addr_str, MPI_Comm comm,
DEBUG_OUT("private IP is %s\n", server->priv_ip);
}

server->dsrv = dsrv_create(server->dsg, &ds_conf)

#ifdef DSPACES_USE_GRPC
server->grpc = dspaces_grpc_server_init("localhost:1025", dsrv);
#endif

*sv = server;

is_initialized = 1;
Expand Down Expand Up @@ -1352,7 +1359,7 @@ static int server_destroy(dspaces_provider_t server)
free(server->dsg);
free(server->server_address[0]);
free(server->server_address);
free(server->listen_addr_str);
free(server->listen_addr_str_hpc);

MPI_Barrier(server->comm);
MPI_Comm_free(&server->comm);
Expand Down Expand Up @@ -2611,6 +2618,8 @@ void dspaces_server_fini(dspaces_provider_t server)
{
DEBUG_OUT("waiting for finalize to occur\n");
margo_wait_for_finalize(server->mid);
dspaces_grpc_server_wait(server->grpc);
dspaces_grpc_server_del(server->grpc);
free(server);
}

Expand Down
12 changes: 12 additions & 0 deletions src/rpc/proto/dspaces.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}

rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

rpc SharedSpaceQuery (SharedSpaceInfoRequest) returns (SharedSpaceInfoReply) {}
}

// The request message containing the user's name.
Expand All @@ -38,3 +40,13 @@ message HelloRequest {
message HelloReply {
string message = 1;
}

message SharedSpaceInfoRequest {
}

message SharedSpaceInfoReply {
bytes header = 1;
string check = 2;
}


43 changes: 34 additions & 9 deletions src/rpc/wrapper_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <grpcpp/grpcpp.h>

#include "rpc/dspaces.grpc.pb.h"
#include "data_services.h"

using grpc::Server;
using grpc::ServerBuilder;
Expand All @@ -12,26 +13,50 @@ using grpc::ClientContext;
using dspaces::Greeter;
using dspaces::HelloReply;
using dspaces::HelloRequest;
using dspaces::SharedSpaceInfoRequest;
using dspaces::SharedSpaceInfoReply;

// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Greeter::Service {
Status SayHello(ServerContext* context, const HelloRequest* request,
public:
GreeterServiceImpl(dspaces_service_t dsrv) :
dsrv_(dsrv) {
};

private:
dspaces_service_t dsrv_;

Status SayHello(ServerContext* context, const HelloRequest* request,
HelloReply* reply) override {
std::string prefix("Hello ");
std::cout << "received " << request->name() << std::endl;
reply->set_message(prefix + request->name());
return Status::OK;
}
std::string prefix("Hello ");
std::cout << "received " << request->name() << std::endl;
reply->set_message(prefix + request->name());
return Status::OK;
}

Status SharedSpaceQuery(ServerContext *context, const SharedSpaceInfoRequest *request, SharedSpaceInfoReply *reply) override {
void *obj_hdr;
unsigned int len;
char *check_str;
get_sspace_info(dsrv_, &obj_hdr, &len, &check_str);
reply->set_header(std::string((char *)obj_hdr, len));
reply->set_check(std::string(check_str));
free(obj_hdr);
free(check_str);
return Status::OK;
}
};

typedef struct grpc_server {
typedef struct GRPCServer {
GRPCServer(dspaces_service_t dsrv):
service(dsrv) {};
std::unique_ptr<Server> server;
GreeterServiceImpl service;
} *grpc_server_t;

extern "C" struct grpc_server *dspaces_grpc_server_init(const char *addr)
extern "C" struct GRPCServer *dspaces_grpc_server_init(const char *addr, dspaces_service_t dsrv)
{
struct grpc_server *self = new struct grpc_server;
struct GRPCServer *self = new struct GRPCServer(dsrv);
std::string server_address(addr);
ServerBuilder builder;

Expand Down

0 comments on commit 04b09a3

Please sign in to comment.