Skip to content

Commit

Permalink
Merge pull request dialohq#4 from dialohq/etcd-example
Browse files Browse the repository at this point in the history
Etcd client example
  • Loading branch information
wokalski authored Mar 27, 2022
2 parents 9af52b5 + 2995c40 commit 79fa937
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 0 deletions.
16 changes: 16 additions & 0 deletions examples/etcd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Etcd proxy

This example showcases communication with [etcd](https://etcd.io/), inspired by [the `ocaml-grpc-envoy` example](https://github.com/blandinw/ocaml-grpc-envoy). It implements a proxy for the two methods `etcdserverpb.KV/Put` and `etcdserverpb.KV/Range` (a small subset of the full `etcd` protocol) and showcases a persistent gRPC connection between requests.

## How to run

1. Run `etcd`
2. Run this proxy: `dune exec examples/etcd/etcd_proxy.exe -- ETCD_HOST ETCD_PORT PORT`, e.g. `dune exec examples/etcd/etcd_proxy.exe -- 127.0.0.1 2379 8080`
3. Make requests using `GET` and `POST`. The key will be the path of the request URI. A `GET` request will call `Range` to read the value for the given key, a `POST` request will call `Put` to store the value (the request body), e.g.

```
curl -XPOST localhost:8080/key -d'value'
curl localhost:8080/key
```

4. The proxy will return both the response and the original gRPC status, both in the request body.
18 changes: 18 additions & 0 deletions examples/etcd/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(rule
(targets etcd.ml etcd_service.ml)
(deps
(:proto etcd.proto etcd_service.proto))
(action
(run
protoc
-I
.
"--ocaml_out=annot=[@@deriving show { with_path = false }]:."
%{proto})))

(executable
(name etcd_proxy)
(modules Etcd_proxy Etcd Etcd_service)
(libraries grpc grpc-lwt ocaml-protoc-plugin lwt lwt.unix cohttp cohttp-lwt cohttp-lwt-unix conduit-lwt-unix h2 h2-lwt-unix uri)
(preprocess
(pps ppx_deriving.show)))
142 changes: 142 additions & 0 deletions examples/etcd/etcd.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
syntax = "proto3";
package etcdserverpb;

message ResponseHeader {
// cluster_id is the ID of the cluster which sent the response.
uint64 cluster_id = 1;
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// recieved in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
}

message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;
// create_revision is the revision of last creation on this key.
int64 create_revision = 2;
// mod_revision is the revision of last modification on this key.
int64 mod_revision = 3;
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
int64 version = 4;
// value is the value held by the key, in bytes.
bytes value = 5;
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
int64 lease = 6;
}

message RangeRequest {
enum SortOrder {
NONE = 0; // default, no sorting
ASCEND = 1; // lowest target value first
DESCEND = 2; // highest target value first
}
enum SortTarget {
KEY = 0;
VERSION = 1;
CREATE = 2;
MOD = 3;
VALUE = 4;
}

// key is the first key for the range. If range_end is not given, the request only looks up key.
bytes key = 1;
// range_end is the upper bound on the requested range [key, range_end).
// If range_end is '\0', the range is all keys >= key.
// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
// then the range request gets all keys prefixed with key.
// If both key and range_end are '\0', then the range request returns all keys.
bytes range_end = 2;
// limit is a limit on the number of keys returned for the request. When limit is set to 0,
// it is treated as no limit.
int64 limit = 3;
// revision is the point-in-time of the key-value store to use for the range.
// If revision is less or equal to zero, the range is over the newest key-value store.
// If the revision has been compacted, ErrCompacted is returned as a response.
int64 revision = 4;

// sort_order is the order for returned sorted results.
SortOrder sort_order = 5;

// sort_target is the key-value field to use for sorting.
SortTarget sort_target = 6;

// serializable sets the range request to use serializable member-local reads.
// Range requests are linearizable by default; linearizable requests have higher
// latency and lower throughput than serializable requests but reflect the current
// consensus of the cluster. For better performance, in exchange for possible stale reads,
// a serializable range request is served locally without needing to reach consensus
// with other nodes in the cluster.
bool serializable = 7;

// keys_only when set returns only the keys and not the values.
bool keys_only = 8;

// count_only when set returns only the count of the keys in the range.
bool count_only = 9;

// min_mod_revision is the lower bound for returned key mod revisions; all keys with
// lesser mod revisions will be filtered away.
int64 min_mod_revision = 10;

// max_mod_revision is the upper bound for returned key mod revisions; all keys with
// greater mod revisions will be filtered away.
int64 max_mod_revision = 11;

// min_create_revision is the lower bound for returned key create revisions; all keys with
// lesser create revisions will be filtered away.
int64 min_create_revision = 12;

// max_create_revision is the upper bound for returned key create revisions; all keys with
// greater create revisions will be filtered away.
int64 max_create_revision = 13;
}

message RangeResponse {
ResponseHeader header = 1;
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
repeated KeyValue kvs = 2;
// more indicates if there are more keys to return in the requested range.
bool more = 3;
// count is set to the number of keys within the range when requested.
int64 count = 4;
}

message PutRequest {
// key is the key, in bytes, to put into the key-value store.
bytes key = 1;
// value is the value, in bytes, to associate with the key in the key-value store.
bytes value = 2;
// lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease.
int64 lease = 3;

// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;

// If ignore_value is set, etcd updates the key using its current value.
// Returns an error if the key does not exist.
bool ignore_value = 5;

// If ignore_lease is set, etcd updates the key using its current lease.
// Returns an error if the key does not exist.
bool ignore_lease = 6;
}

message PutResponse {
ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
KeyValue prev_kv = 2;
}

104 changes: 104 additions & 0 deletions examples/etcd/etcd_proxy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
open Lwt.Infix
open Cohttp
open Cohttp_lwt_unix
open Etcd.Etcdserverpb

let persistent_connection = ref None

let create_connection ~host ~port =
Lwt_unix.getaddrinfo host (string_of_int port) [] >>= fun addresses ->
let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
match addresses with
| { Unix.ai_addr; _ } :: _ ->
Lwt_unix.connect socket ai_addr >>= fun () ->
H2_lwt_unix.Client.create_connection ~error_handler:ignore socket
| _ -> assert false

let connection ~host ~port =
match !persistent_connection with
| Some connection when H2_lwt_unix.Client.is_closed connection = false ->
print_endline "Reusing existing connection.";
Lwt.return connection
| _ ->
create_connection ~host ~port >>= fun connection ->
print_endline "Connection established.";
persistent_connection := Some connection;
Lwt.return connection

let do_grpc ~host ~port ~service ~rpc ~request ~decode ~show =
let f response =
response >>= function
| Some response ->
Lwt.return
(match decode response with
| Ok value -> Ok (Some (show value))
| Error _ -> Error "decoding error")
| None -> Lwt.return (Ok None)
in
let handler = Grpc_lwt.Client.Rpc.unary ~f request in
connection ~host ~port >>= fun connection ->
Grpc_lwt.Client.call ~service ~rpc ~scheme:"http" ~handler
~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
()

let post ~key ~value =
let request =
PutRequest.make ~key:(Bytes.of_string key) ~value:(Bytes.of_string value) ()
|> PutRequest.to_proto |> Ocaml_protoc_plugin.Writer.contents
in
let decode response =
Ocaml_protoc_plugin.Reader.create response |> PutResponse.from_proto
in
do_grpc ~service:"etcdserverpb.KV" ~rpc:"Put" ~request ~decode
~show:PutResponse.show

let get ~key =
let request =
RangeRequest.make ~key:(Bytes.of_string key) ()
|> RangeRequest.to_proto |> Ocaml_protoc_plugin.Writer.contents
in
let decode response =
Ocaml_protoc_plugin.Reader.create response |> RangeResponse.from_proto
in
do_grpc ~service:"etcdserverpb.KV" ~rpc:"Range" ~request ~decode
~show:RangeResponse.show

let server ~etcd_host ~etcd_port ~port =
let callback _conn req body =
let key = req |> Request.uri |> Uri.path in
let meth = req |> Request.meth |> Code.string_of_method in
body |> Cohttp_lwt.Body.to_string >>= fun body ->
(match meth with
| "GET" -> get ~host:etcd_host ~port:etcd_port ~key
| "POST" -> post ~host:etcd_host ~port:etcd_port ~key ~value:body
| _ ->
let message = "Only GET and POST are implemented!" in
Lwt.return (Error (Grpc.Status.v ~message Unimplemented)))
>>= function
| Ok (Ok ret, status) ->
Format.printf "Success status: %a@." Grpc.Status.pp status;
let body =
match ret with
| Some ret -> Format.asprintf "%s\n%a\n" ret Grpc.Status.pp status
| None -> Format.asprintf "%a\n" Grpc.Status.pp status
in
Server.respond_string ~status:`OK ~body ()
| Ok (Error error, status) ->
Format.printf "Status: %a@." Grpc.Status.pp status;
let body = Format.asprintf "%s\n%a\n" error Grpc.Status.pp status in
Server.respond_string ~status:`Internal_server_error ~body ()
| Error status ->
Format.printf "Error status: %a@." Grpc.Status.pp status;
let body = Format.asprintf "%a\n" Grpc.Status.pp status in
Server.respond_string ~status:`Internal_server_error ~body ()
in
Server.create ~mode:(`TCP (`Port port)) (Server.make ~callback ())

let () =
let etcd_host, etcd_port, port =
try (Sys.argv.(1), int_of_string Sys.argv.(2), int_of_string Sys.argv.(3))
with Invalid_argument _ ->
Printf.eprintf "Usage: %s ETCD_HOST ETCD_PORT PORT\n" Sys.argv.(0);
exit 1
in
Lwt_main.run (server ~etcd_host ~etcd_port ~port)
14 changes: 14 additions & 0 deletions examples/etcd/etcd_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package etcdserverpb;

import "etcd.proto";

service KV {
// Range gets the keys in the range from the key-value store.
rpc Range(RangeRequest) returns (RangeResponse) {}

// Put puts the given key into the key-value store.
// A put request increments the revision of the key-value store
// and generates one event in the event history.
rpc Put(PutRequest) returns (PutResponse) {}
}

0 comments on commit 79fa937

Please sign in to comment.