diff --git a/examples/etcd/README.md b/examples/etcd/README.md new file mode 100644 index 0000000..812b590 --- /dev/null +++ b/examples/etcd/README.md @@ -0,0 +1,16 @@ +# Etcd proxy + +This example showcases communication with [etcd](https://etcd.io/), inspired by [the `ocaml-grpc-envoy` example](https://github.com/blandinw/ocaml-grpc-envoy). It implements a proxy for the two methods `etcdserverpb.KV/Put` and `etcdserverpb.KV/Range` (a small subset of the full `etcd` protocol) and showcases a persistent gRPC connection between requests. + +## How to run + +1. Run `etcd` +2. Run this proxy: `dune exec examples/etcd/etcd_proxy.exe -- ETCD_HOST ETCD_PORT PORT`, e.g. `dune exec examples/etcd/etcd_proxy.exe -- 127.0.0.1 2379 8080` +3. Make requests using `GET` and `POST`. The key will be the path of the request URI. A `GET` request will call `Range` to read the value for the given key, a `POST` request will call `Put` to store the value (the request body), e.g. + +``` +curl -XPOST localhost:8080/key -d'value' +curl localhost:8080/key +``` + +4. The proxy will return both the response and the original gRPC status, both in the request body. diff --git a/examples/etcd/dune b/examples/etcd/dune new file mode 100644 index 0000000..e03c208 --- /dev/null +++ b/examples/etcd/dune @@ -0,0 +1,18 @@ +(rule + (targets etcd.ml etcd_service.ml) + (deps + (:proto etcd.proto etcd_service.proto)) + (action + (run + protoc + -I + . + "--ocaml_out=annot=[@@deriving show { with_path = false }]:." + %{proto}))) + +(executable + (name etcd_proxy) + (modules Etcd_proxy Etcd Etcd_service) + (libraries grpc grpc-lwt ocaml-protoc-plugin lwt lwt.unix cohttp cohttp-lwt cohttp-lwt-unix conduit-lwt-unix h2 h2-lwt-unix uri) + (preprocess + (pps ppx_deriving.show))) diff --git a/examples/etcd/etcd.proto b/examples/etcd/etcd.proto new file mode 100644 index 0000000..fad3aab --- /dev/null +++ b/examples/etcd/etcd.proto @@ -0,0 +1,142 @@ +syntax = "proto3"; +package etcdserverpb; + +message ResponseHeader { + // cluster_id is the ID of the cluster which sent the response. + uint64 cluster_id = 1; + // member_id is the ID of the member which sent the response. + uint64 member_id = 2; + // revision is the key-value store revision when the request was applied. + // For watch progress responses, the header.revision indicates progress. All future events + // recieved in this stream are guaranteed to have a higher revision number than the + // header.revision number. + int64 revision = 3; + // raft_term is the raft term when the request was applied. + uint64 raft_term = 4; +} + +message KeyValue { + // key is the key in bytes. An empty key is not allowed. + bytes key = 1; + // create_revision is the revision of last creation on this key. + int64 create_revision = 2; + // mod_revision is the revision of last modification on this key. + int64 mod_revision = 3; + // version is the version of the key. A deletion resets + // the version to zero and any modification of the key + // increases its version. + int64 version = 4; + // value is the value held by the key, in bytes. + bytes value = 5; + // lease is the ID of the lease that attached to key. + // When the attached lease expires, the key will be deleted. + // If lease is 0, then no lease is attached to the key. + int64 lease = 6; +} + +message RangeRequest { + enum SortOrder { + NONE = 0; // default, no sorting + ASCEND = 1; // lowest target value first + DESCEND = 2; // highest target value first + } + enum SortTarget { + KEY = 0; + VERSION = 1; + CREATE = 2; + MOD = 3; + VALUE = 4; + } + + // key is the first key for the range. If range_end is not given, the request only looks up key. + bytes key = 1; + // range_end is the upper bound on the requested range [key, range_end). + // If range_end is '\0', the range is all keys >= key. + // If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + // then the range request gets all keys prefixed with key. + // If both key and range_end are '\0', then the range request returns all keys. + bytes range_end = 2; + // limit is a limit on the number of keys returned for the request. When limit is set to 0, + // it is treated as no limit. + int64 limit = 3; + // revision is the point-in-time of the key-value store to use for the range. + // If revision is less or equal to zero, the range is over the newest key-value store. + // If the revision has been compacted, ErrCompacted is returned as a response. + int64 revision = 4; + + // sort_order is the order for returned sorted results. + SortOrder sort_order = 5; + + // sort_target is the key-value field to use for sorting. + SortTarget sort_target = 6; + + // serializable sets the range request to use serializable member-local reads. + // Range requests are linearizable by default; linearizable requests have higher + // latency and lower throughput than serializable requests but reflect the current + // consensus of the cluster. For better performance, in exchange for possible stale reads, + // a serializable range request is served locally without needing to reach consensus + // with other nodes in the cluster. + bool serializable = 7; + + // keys_only when set returns only the keys and not the values. + bool keys_only = 8; + + // count_only when set returns only the count of the keys in the range. + bool count_only = 9; + + // min_mod_revision is the lower bound for returned key mod revisions; all keys with + // lesser mod revisions will be filtered away. + int64 min_mod_revision = 10; + + // max_mod_revision is the upper bound for returned key mod revisions; all keys with + // greater mod revisions will be filtered away. + int64 max_mod_revision = 11; + + // min_create_revision is the lower bound for returned key create revisions; all keys with + // lesser create revisions will be filtered away. + int64 min_create_revision = 12; + + // max_create_revision is the upper bound for returned key create revisions; all keys with + // greater create revisions will be filtered away. + int64 max_create_revision = 13; +} + +message RangeResponse { + ResponseHeader header = 1; + // kvs is the list of key-value pairs matched by the range request. + // kvs is empty when count is requested. + repeated KeyValue kvs = 2; + // more indicates if there are more keys to return in the requested range. + bool more = 3; + // count is set to the number of keys within the range when requested. + int64 count = 4; +} + +message PutRequest { + // key is the key, in bytes, to put into the key-value store. + bytes key = 1; + // value is the value, in bytes, to associate with the key in the key-value store. + bytes value = 2; + // lease is the lease ID to associate with the key in the key-value store. A lease + // value of 0 indicates no lease. + int64 lease = 3; + + // If prev_kv is set, etcd gets the previous key-value pair before changing it. + // The previous key-value pair will be returned in the put response. + bool prev_kv = 4; + + // If ignore_value is set, etcd updates the key using its current value. + // Returns an error if the key does not exist. + bool ignore_value = 5; + + // If ignore_lease is set, etcd updates the key using its current lease. + // Returns an error if the key does not exist. + bool ignore_lease = 6; +} + +message PutResponse { + ResponseHeader header = 1; + // if prev_kv is set in the request, the previous key-value pair will be returned. + KeyValue prev_kv = 2; +} + diff --git a/examples/etcd/etcd_proxy.ml b/examples/etcd/etcd_proxy.ml new file mode 100644 index 0000000..d94d4d6 --- /dev/null +++ b/examples/etcd/etcd_proxy.ml @@ -0,0 +1,104 @@ +open Lwt.Infix +open Cohttp +open Cohttp_lwt_unix +open Etcd.Etcdserverpb + +let persistent_connection = ref None + +let create_connection ~host ~port = + Lwt_unix.getaddrinfo host (string_of_int port) [] >>= fun addresses -> + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + match addresses with + | { Unix.ai_addr; _ } :: _ -> + Lwt_unix.connect socket ai_addr >>= fun () -> + H2_lwt_unix.Client.create_connection ~error_handler:ignore socket + | _ -> assert false + +let connection ~host ~port = + match !persistent_connection with + | Some connection when H2_lwt_unix.Client.is_closed connection = false -> + print_endline "Reusing existing connection."; + Lwt.return connection + | _ -> + create_connection ~host ~port >>= fun connection -> + print_endline "Connection established."; + persistent_connection := Some connection; + Lwt.return connection + +let do_grpc ~host ~port ~service ~rpc ~request ~decode ~show = + let f response = + response >>= function + | Some response -> + Lwt.return + (match decode response with + | Ok value -> Ok (Some (show value)) + | Error _ -> Error "decoding error") + | None -> Lwt.return (Ok None) + in + let handler = Grpc_lwt.Client.Rpc.unary ~f request in + connection ~host ~port >>= fun connection -> + Grpc_lwt.Client.call ~service ~rpc ~scheme:"http" ~handler + ~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore) + () + +let post ~key ~value = + let request = + PutRequest.make ~key:(Bytes.of_string key) ~value:(Bytes.of_string value) () + |> PutRequest.to_proto |> Ocaml_protoc_plugin.Writer.contents + in + let decode response = + Ocaml_protoc_plugin.Reader.create response |> PutResponse.from_proto + in + do_grpc ~service:"etcdserverpb.KV" ~rpc:"Put" ~request ~decode + ~show:PutResponse.show + +let get ~key = + let request = + RangeRequest.make ~key:(Bytes.of_string key) () + |> RangeRequest.to_proto |> Ocaml_protoc_plugin.Writer.contents + in + let decode response = + Ocaml_protoc_plugin.Reader.create response |> RangeResponse.from_proto + in + do_grpc ~service:"etcdserverpb.KV" ~rpc:"Range" ~request ~decode + ~show:RangeResponse.show + +let server ~etcd_host ~etcd_port ~port = + let callback _conn req body = + let key = req |> Request.uri |> Uri.path in + let meth = req |> Request.meth |> Code.string_of_method in + body |> Cohttp_lwt.Body.to_string >>= fun body -> + (match meth with + | "GET" -> get ~host:etcd_host ~port:etcd_port ~key + | "POST" -> post ~host:etcd_host ~port:etcd_port ~key ~value:body + | _ -> + let message = "Only GET and POST are implemented!" in + Lwt.return (Error (Grpc.Status.v ~message Unimplemented))) + >>= function + | Ok (Ok ret, status) -> + Format.printf "Success status: %a@." Grpc.Status.pp status; + let body = + match ret with + | Some ret -> Format.asprintf "%s\n%a\n" ret Grpc.Status.pp status + | None -> Format.asprintf "%a\n" Grpc.Status.pp status + in + Server.respond_string ~status:`OK ~body () + | Ok (Error error, status) -> + Format.printf "Status: %a@." Grpc.Status.pp status; + let body = Format.asprintf "%s\n%a\n" error Grpc.Status.pp status in + Server.respond_string ~status:`Internal_server_error ~body () + | Error status -> + Format.printf "Error status: %a@." Grpc.Status.pp status; + let body = Format.asprintf "%a\n" Grpc.Status.pp status in + Server.respond_string ~status:`Internal_server_error ~body () + in + Server.create ~mode:(`TCP (`Port port)) (Server.make ~callback ()) + +let () = + let etcd_host, etcd_port, port = + try (Sys.argv.(1), int_of_string Sys.argv.(2), int_of_string Sys.argv.(3)) + with Invalid_argument _ -> + Printf.eprintf "Usage: %s ETCD_HOST ETCD_PORT PORT\n" Sys.argv.(0); + exit 1 + in + Lwt_main.run (server ~etcd_host ~etcd_port ~port) diff --git a/examples/etcd/etcd_service.proto b/examples/etcd/etcd_service.proto new file mode 100644 index 0000000..250faba --- /dev/null +++ b/examples/etcd/etcd_service.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +package etcdserverpb; + +import "etcd.proto"; + +service KV { + // Range gets the keys in the range from the key-value store. + rpc Range(RangeRequest) returns (RangeResponse) {} + + // Put puts the given key into the key-value store. + // A put request increments the revision of the key-value store + // and generates one event in the event history. + rpc Put(PutRequest) returns (PutResponse) {} +} \ No newline at end of file