Skip to content

Commit

Permalink
Merge pull request #220 from talex5/progress
Browse files Browse the repository at this point in the history
Add connection progress indicator
  • Loading branch information
talex5 authored Dec 28, 2020
2 parents 1553019 + 01a0d49 commit f73f232
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 38 deletions.
4 changes: 0 additions & 4 deletions .dockerignore

This file was deleted.

7 changes: 0 additions & 7 deletions .travis.yml

This file was deleted.

14 changes: 0 additions & 14 deletions Dockerfile

This file was deleted.

18 changes: 8 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,9 @@ let run_client service =
let connect uri =
Lwt_main.run begin
Fmt.pr "Connecting to echo service at: %a@." Uri.pp_hum uri;
let client_vat = Capnp_rpc_unix.client_only_vat () in
let sr = Capnp_rpc_unix.Vat.import_exn client_vat uri in
Sturdy_ref.with_cap_exn sr run_client
Capnp_rpc_unix.with_cap_exn sr run_client
end
open Cmdliner
Expand Down Expand Up @@ -636,12 +635,14 @@ With the server still running in another window, run the client using the `echo.

```
$ dune exec ./client.exe echo.cap
Connecting to echo service at: capnp://sha-256:[email protected]:7000/JL_hRxzrTSbLNcb0Tqp2f0N_sh5znvY2ym9KMVzLtcQ
Callback got "foo"
Callback got "foo"
Callback got "foo"
```

Note that we're using `Capnp_rpc_unix.with_cap_exn` here instead of `Sturdy_ref.with_cap_exn`.
It's almost the same, except that it displays a suitable progress indicator if the connection takes too long.

### Pipelining

Let's say the server also offers a logging service, which the client can get from the main echo service:
Expand Down Expand Up @@ -1291,15 +1292,12 @@ To build:
git clone https://github.com/mirage/capnp-rpc.git
cd capnp-rpc
opam pin add -nyk git capnp-rpc .
opam pin add -nyk git capnp-rpc-lwt .
opam pin add -nyk git capnp-rpc-net .
opam pin add -nyk git capnp-rpc-unix .
opam depext capnp-rpc-lwt alcotest
opam install --deps-only -t capnp-rpc-unix
opam pin add -ny .
opam depext -t capnp-rpc-unix capnp-rpc-mirage
opam install --deps-only -t .
make test
If you have trouble building, you can build it with Docker from a known-good state using `docker build .`.
If you have trouble building, you can use the Dockerfile shown in the CI logs (click the green tick on the main page).
### Testing
Expand Down
2 changes: 1 addition & 1 deletion test-bin/calc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let connect addr =
Lwt_main.run begin
let vat = Capnp_rpc_unix.client_only_vat () in
let sr = Vat.import_exn vat addr in
Capnp_rpc_lwt.Sturdy_ref.with_cap_exn sr @@ fun calc ->
Capnp_rpc_unix.with_cap_exn sr @@ fun calc ->
Logs.info (fun f -> f "Evaluating expression...");
let remote_add = Calc.getOperator calc `Add in
let result = Calc.evaluate calc Calc.Expr.(Call (remote_add, [Float 40.0; Float 2.0])) in
Expand Down
79 changes: 79 additions & 0 deletions unix/capnp_rpc_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Vat = Vat_network.Vat
module Network = Network
module Vat_config = Vat_config
module File_store = File_store
module Sturdy_ref = Capnp_rpc_lwt.Sturdy_ref

let error fmt =
fmt |> Fmt.kstrf @@ fun msg ->
Expand Down Expand Up @@ -72,6 +73,84 @@ let sturdy_uri =
in
Cmdliner.Arg.conv (of_string, Uri.pp_hum)

module Console = struct
(* The first item in this list is what is currently displayed on screen *)
let messages = ref []

let clear () =
match !messages with
| [] -> ()
| msg :: _ ->
let blank = Stdlib.String.make (String.length msg) ' ' in
Printf.fprintf stderr "\r%s\r%!" blank

let show () =
match !messages with
| [] -> ()
| msg :: _ ->
prerr_string msg;
flush stderr

let with_msg msg f =
clear ();
messages := msg :: !messages;
show ();
Lwt.finalize f
(fun () ->
clear ();
let rec remove_first = function
| [] -> assert false
| x :: xs when x = msg -> xs
| x :: xs -> x :: remove_first xs
in
messages := remove_first !messages;
show ();
Lwt.return_unit
)
end

let addr_of_sr sr =
match Capnp_rpc_net.Capnp_address.parse_uri (Capnp_rpc_lwt.Cast.sturdy_to_raw sr)#to_uri_with_secrets with
| Ok ((addr, _auth), _id) -> addr
| Error (`Msg m) -> failwith m

let rec connect_with_progress ?(mode=`Auto) sr =
let pp = Fmt.using addr_of_sr Capnp_rpc_net.Capnp_address.Location.pp in
match mode with
| `Auto
| `Log ->
let did_log = ref false in
Log.info (fun f -> did_log := true; f "Connecting to %a..." pp sr);
if !did_log then (
Sturdy_ref.connect sr >|= function
| Ok _ as x -> Log.info (fun f -> f "Connected to %a" pp sr); x
| Error _ as e -> e
) else (
if Unix.(isatty stderr) then
connect_with_progress ~mode:`Console sr
else
connect_with_progress ~mode:`Batch sr
)
| `Batch ->
Fmt.epr "Connecting to %a... %!" pp sr;
begin Sturdy_ref.connect sr >|= function
| Ok _ as x -> Fmt.epr "OK@."; x
| Error _ as x -> Fmt.epr "ERROR@."; x
end
| `Console ->
let x = Sturdy_ref.connect sr in
Lwt.choose [Lwt_unix.sleep 0.5; Lwt.map ignore x] >>= fun () ->
if Lwt.is_sleeping x then (
Console.with_msg (Fmt.strf "[ connecting to %a ]" pp sr)
(fun () -> x)
) else x
| `Silent -> Sturdy_ref.connect sr

let with_cap_exn ?progress sr f =
connect_with_progress ?mode:progress sr >>= function
| Error ex -> Fmt.failwith "%a" Capnp_rpc.Exception.pp ex
| Ok x -> Capnp_rpc_lwt.Capability.with_ref x f

let handle_connection ?tags ~secret_key vat client =
Lwt.catch (fun () ->
let switch = Lwt_switch.create () in
Expand Down
20 changes: 20 additions & 0 deletions unix/capnp_rpc_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ end
val sturdy_uri : Uri.t Cmdliner.Arg.conv
(** A cmdliner argument converter for a "capnp://" URI (or the path of a file containing such a URI). *)

val connect_with_progress :
?mode:[`Auto | `Log | `Batch | `Console | `Silent] ->
'a Sturdy_ref.t -> ('a Capability.t, Capnp_rpc.Exception.t) Lwt_result.t
(** [connect_with_progress sr] is like [Sturdy_ref.connect], but shows that a connection is in progress.
Note: On failure, it does {e not} display the error, which should instead be handled by the caller.
@param mode Controls how progress is displayed:
- [`Log] writes info-level log messages about starting and completing the connection.
- [`Batch] prints a message to stderr when starting, then prints OK when done.
- [`Console] displays a message while connecting if it takes too long, then erases it when done.
- [`Silent] does nothing.
- [`Auto] (the default) tries to log (as for [`Log]), but if the log message isn't used then it behaves as
[`Console] (if stderr is a tty) or as [`Batch] (if not). *)

val with_cap_exn :
?progress:[`Auto | `Log | `Batch | `Console | `Silent] ->
'a Sturdy_ref.t ->
('a Capability.t -> 'b Lwt.t) ->
'b Lwt.t
(** Like [Sturdy_ref.with_cap_exn], but using [connect_with_progress] to show progress. *)

val serve :
?switch:Lwt_switch.t ->
?tags:Logs.Tag.set ->
Expand Down
4 changes: 2 additions & 2 deletions unix/network.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ let try_set_keepalive_idle socket i =

let connect_socket = function
| `Unix path ->
Logs.info (fun f -> f "Connecting to %S..." path);
Log.info (fun f -> f "Connecting to %S..." path);
let socket = Lwt_unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt.catch
(fun () -> Lwt_unix.connect socket (Unix.ADDR_UNIX path) >|= fun () -> socket)
(fun ex -> Lwt_unix.close socket >>= fun () -> Lwt.fail ex)
| `TCP (host, port) ->
Logs.info (fun f -> f "Connecting to %s:%d..." host port);
Log.info (fun f -> f "Connecting to %s:%d..." host port);
let socket = Lwt_unix.(socket PF_INET SOCK_STREAM 0) in
Lwt.catch
(fun () ->
Expand Down

0 comments on commit f73f232

Please sign in to comment.