From bd8198874b0e625a3126f3225d827a97cadbfab0 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Sat, 14 Dec 2024 16:41:55 +0100 Subject: [PATCH] Replace Fresh with own implementation --- CHANGELOG.md | 4 + lib/kubereq.ex | 4 +- lib/kubereq/connect.ex | 456 +++++++++++++++++++++++++++++----------- lib/kubereq/pod_exec.ex | 29 ++- lib/kubereq/pod_logs.ex | 31 ++- mix.exs | 1 - 6 files changed, 363 insertions(+), 162 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d9207b..d432ef0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support for selectors as maps +### Changed + +- Replace Fresh with our own implementation for web sockets [#56](https://github.com/mruoss/kubereq/pull/56) + diff --git a/lib/kubereq.ex b/lib/kubereq.ex index 8912e5a..210a095 100644 --- a/lib/kubereq.ex +++ b/lib/kubereq.ex @@ -1177,7 +1177,7 @@ defmodule Kubereq do name: name, operation: :connect, subresource: "log", - adapter: &Kubereq.PodLogs.run(&1) + adapter: &Kubereq.PodLogs.connect_and_stream(&1) ) |> Kubereq.PodLogs.args_to_opts() @@ -1288,7 +1288,7 @@ defmodule Kubereq do name: name, operation: :connect, subresource: "exec", - adapter: &Kubereq.PodExec.run(&1) + adapter: &Kubereq.PodExec.connect_and_stream(&1) ) |> Kubereq.PodExec.args_to_opts() diff --git a/lib/kubereq/connect.ex b/lib/kubereq/connect.ex index c02d26a..414eb58 100644 --- a/lib/kubereq/connect.ex +++ b/lib/kubereq/connect.ex @@ -1,114 +1,312 @@ defmodule Kubereq.Connect do @moduledoc false + + use GenServer + + require Logger + require Mint.HTTP + + @callback handle_frame(frame :: Mint.WebSocket.frame(), state :: term()) :: + {:noreply, new_state} + | {:noreply, new_state, timeout()} + | {:stop, reason, new_state} + when new_state: term(), reason: term() + + @callback code_change(old_vsn, state :: term(), extra :: term()) :: + {:ok, new_state :: term()} | {:error, reason :: term()} + when old_vsn: term() | {:down, term()} + + @callback init(init_arg :: term()) :: + {:ok, state :: any()} | {:stop, reason :: any()} + + @callback handle_info(msg :: :timeout | term(), state :: term()) :: + {:noreply, new_state} + | {:noreply, new_state, + timeout() | :hibernate | {:continue, continue_arg :: term()}} + | {:stop, reason :: term(), new_state} + when new_state: term() + + @callback handle_call(request :: term(), GenServer.from(), state :: term()) :: + {:reply, reply, new_state} + | {:reply, reply, new_state, + timeout() | :hibernate | {:continue, continue_arg :: term()}} + | {:noreply, new_state} + | {:noreply, new_state, + timeout() | :hibernate | {:continue, continue_arg :: term()}} + | {:stop, reason, reply, new_state} + | {:stop, reason, new_state} + when reply: term(), new_state: term(), reason: term() + + @callback handle_cast(request :: term(), state :: term()) :: + {:noreply, new_state} + | {:noreply, new_state, + timeout() | :hibernate | {:continue, continue_arg :: term()}} + | {:stop, reason :: term(), new_state} + when new_state: term() + + @callback handle_continue(continue_arg, state :: term()) :: + {:noreply, new_state} + | {:noreply, new_state, timeout() | :hibernate | {:continue, continue_arg}} + | {:stop, reason :: term(), new_state} + when new_state: term(), continue_arg: term() + + @callback terminate(reason, state :: term()) :: term() + when reason: :normal | :shutdown | {:shutdown, term()} | term() + + @callback format_status(status :: :gen_server.format_status()) :: + new_status :: :gen_server.format_status() + + @optional_callbacks code_change: 3, + terminate: 2, + handle_info: 2, + handle_cast: 2, + handle_call: 3, + format_status: 1, + handle_continue: 2, + init: 1 + + defstruct [:mint, :websocket, :ref, :handler_module, :handler_state] + defmacro __using__(opts) do - quote location: :keep do - @behaviour Fresh + quote do + @behaviour Kubereq.Connect - @doc false - def child_spec(args) do - %{ + require Logger + + def child_spec(init_arg) do + Keyword.validate!(init_arg, [:req, :state, :genserver_opts]) + {req, init_arg} = Keyword.fetch!(init_arg, :req) + {handler_state, init_arg} = Keyword.get(init_arg, :state, %{}) + {req, genserver_opts} = Keyword.fetch!(init_arg, :genserver_opts) + + default = %{ id: __MODULE__, - start: {__MODULE__, :start_link, [args]}, - restart: :transient + start: {Kubereq.Connect, :start_link, [__MODULE__, req, handler_state, genserver_opts]} } - |> Supervisor.child_spec(unquote(Macro.escape(opts))) - end - @doc false - def start_link(args) do - Kubereq.Connect.start_link(__MODULE__, args) + Supervisor.child_spec(default, unquote(Macro.escape(opts))) end + defoverridable child_spec: 1 + + @impl Kubereq.Connect + def init(state), do: {:ok, state} + + @impl Kubereq.Connect + def terminate(reason, state), do: :ok + @doc false - def start(args) do - Kubereq.Connect.start(__MODULE__, args) + @impl Kubereq.Connect + def handle_info(msg, state) do + proc = + case Process.info(self(), :registered_name) do + {_, []} -> self() + {_, name} -> name + end + + :logger.error( + %{ + label: {GenServer, :no_handle_info}, + report: %{ + module: __MODULE__, + message: msg, + name: proc + } + }, + %{ + domain: [:otp, :elixir], + error_logger: %{tag: :error_msg}, + report_cb: &GenServer.format_report/1 + } + ) + + {:noreply, state} end - @doc false - def handle_connect(_status, _headers, state), do: {:ok, state} + # Allow overriding handle_event + defoverridable Kubereq.Connect + end + end - @doc false - def handle_control(_message, state), do: {:ok, state} + def start_link(handler_module, req, handler_state, opts \\ []) do + {:ok, resp} = + req + |> Req.request( + kind: "Pod", + operation: :connect, + adapter: fn req -> + {:ok, pid} = + GenServer.start_link(__MODULE__, {req, handler_module, handler_state}, opts) - @doc false - def handle_in(frame, state), do: Kubereq.Connect.handle_in(__MODULE__, frame, state) + {req, Req.Response.new(status: 101, body: pid)} + end + ) - @doc false - def handle_info(_message, state), do: {:ok, state} + {:ok, resp.body} + end - @doc false - def handle_error({error, _reason}, state) - when error in [:encoding_failed, :casting_failed], - do: {:ignore, state} + @impl GenServer + def init({req, handler_module, handler_state}) do + with {:ok, mint, websocket, ref} <- connect(req), + {:ok, mint} <- Mint.HTTP.set_mode(mint, :active), + {:ok, handler_state} <- handler_module.init(handler_state) do + state = + struct(__MODULE__, + mint: mint, + websocket: websocket, + ref: ref, + handler_module: handler_module, + handler_state: handler_state + ) + + {:ok, state} + else + {:error, error} -> + {:stop, error} - def handle_error(_error, _state), do: :reconnect + {:stop, reason} -> + {:stop, reason} - @doc false - def handle_disconnect(_code, _reason, _state), do: :close + {:error, _mint, error} -> + {:stop, error} + end + end - @doc false - def handle_terminate(_reason, _state), do: :ok - - defoverridable child_spec: 1, - start_link: 1, - start: 1, - handle_connect: 3, - handle_control: 2, - handle_in: 2, - handle_info: 2, - handle_disconnect: 3, - handle_terminate: 2 + @impl GenServer + # Dialyzer doesn't like our pattern matching against opaque types in the error handler + @dialyzer {:no_opaque, handle_info: 2} + def handle_info(message, state) when Mint.HTTP.is_connection_message(state.mint, message) do + ref = state.ref + + with {:ok, mint, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(state.mint, message), + {:ok, websocket, frames} <- Mint.WebSocket.decode(state.websocket, data) do + state = %{state | mint: mint, websocket: websocket} + handle_frame(frames, {:noreply, state}) + else + {:error, %Mint.WebSocket{} = websocket, error} -> + {:stop, error, %{state | websocket: websocket}} + + {:error, mint, error} -> + {:stop, error, %{state | mint: mint}} + + {:error, mint, error, _} -> + {:stop, error, %{state | mint: mint}} end end - defdelegate close(dest, code, reason), to: Fresh - defdelegate open?(dest), to: Fresh + def handle_info(msg, state) do + state.handler_module.handle_info(msg, state.handler_state) + |> process_result(state) + end + + defp handle_frame([], result), do: result + + defp handle_frame([frame | frames], result) do + state = elem(result, 1) + + case state.handler_module.handle_frame(frame, state.handler_state) + |> process_result(state) do + {:stop, _reason, _new_user_state} = result -> + result - @spec close(dest :: :gen_statem.server_ref()) :: :ok - def close(dest), do: Fresh.send(dest, {:close, 1000, ""}) + result when elem(frame, 0) == :close -> + {:stop, :normal, elem(result, 1)} - @doc false - def start_link(module, args) do - do_start(module, args, &Fresh.start_link/4) + result -> + handle_frame(frames, result) + end end - @doc false - def start(module, args) do - do_start(module, args, &Fresh.start/4) + @impl GenServer + def handle_call(:open?, _from, state) do + {:reply, Mint.HTTP.open?(state.mint)} end - defp do_start(module, args, start_callback) do - req = Keyword.fetch!(args, :req) - state = Keyword.fetch!(args, :state) - opts = Keyword.fetch!(args, :opts) + def handle_call(request, from, state) do + state.handler_module.handle_call(request, from, state.handler_state) + |> process_result(state) + end - {:ok, resp} = - req - |> Req.merge( - kind: "Pod", - operation: :connect, - adapter: &run(&1, module, state, start_callback) - ) - |> Req.request(opts) + @impl GenServer + # Dialyzer doesn't like our pattern matching against opaque types in the error handler + @dialyzer {:no_opaque, handle_cast: 2} + def handle_cast({:send_frame, frame}, state) do + with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame), + {:ok, mint} <- Mint.WebSocket.stream_request_body(state.mint, state.ref, data) do + {:noreply, %{state | websocket: websocket, mint: mint}} + else + {:error, %Mint.WebSocket{} = websocket, error} -> + Logger.error(error) + {:noreply, %{state | websocket: websocket}} - {:ok, resp.body} + {:error, mint, error} -> + Logger.error(error) + {:noreply, %{state | mint: mint}} + end + end + + def handle_cast(request, state) do + state.handler_module.handle_cast(request, state.handler_state) + |> process_result(state) end - defp run(req, module, state, start) do - {_, _, scheme} = ws_scheme(req.url.scheme) + @impl GenServer + def handle_continue(continue_arg, state) do + state.handler_module.handle_continue(continue_arg, state.handler_state) + |> process_result(state) + end - uri = %{req.url | scheme: scheme} + @impl GenServer + def terminate(reason, state) do + state.handler_module.terminate(reason, state.handler_state) + end - headers = format_headres(req.headers) - opts = Keyword.merge(req.options.connect_options, headers: headers) + @impl GenServer + def code_change(old_vsn, state, extra) do + with {:ok, new_handler_state} <- + state.handler_module.code_change(old_vsn, state.handler_state, extra) do + {:ok, %{state | handler_state: new_handler_state}} + end + end - {:ok, pid} = start.(uri, module, state, opts) - {req, Req.Response.new(status: 101, body: pid)} + @impl GenServer + def format_status( + %{state: %{handler_module: handler_module, handler_state: handler_state} = state} = status + ) do + handler_module.format_status(%{status | state: handler_state}) + |> Map.update!(:state, &%{state | handler_state: &1}) end - def run(req, map_frame_fun) do + def format_status(format), do: format + + def send_frame(server, frame) do + GenServer.cast(server, {:send_frame, frame}) + end + + def close(server, code, reason) do + send_frame(server, {:close, code, reason}) + end + + def open?(server) do + GenServer.call(server, :open?) + end + + def connect_and_stream(req, map_frame_fun) do + with {:ok, mint, websocket, ref} <- connect(req) do + stream = create_stream(mint, ref, websocket, map_frame_fun) + {req, Req.Response.new(status: 101, body: stream)} + end + end + + defp connect(req) do uri = req.url - {http_scheme, ws_scheme, _} = ws_scheme(uri.scheme) + {http_scheme, ws_scheme} = ws_scheme(uri.scheme) + + conn_opts = + req.options.connect_options + |> Keyword.put(:mode, :passive) + |> Keyword.put_new(:protocols, [:http1]) + path = uri.path || "/" path = @@ -117,26 +315,19 @@ defmodule Kubereq.Connect do query -> path <> "?" <> query end - conn_opts = - req.options.connect_options - |> Keyword.put(:mode, :passive) - |> Keyword.put_new(:protocols, [:http1]) + headers = format_headers(req.headers) - headers = format_headres(req.headers) - - with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port, conn_opts), - {:ok, conn} <- Mint.HTTP.set_mode(conn, :passive), - {:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, headers), - {:ok, conn, upgrade_response} <- receive_upgrade_response(conn, ref), - {:ok, conn, websocket} <- + with {:ok, mint} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port, conn_opts), + {:ok, mint, ref} <- Mint.WebSocket.upgrade(ws_scheme, mint, path, headers), + {:ok, mint, upgrade_response} <- receive_upgrade_response(mint, ref), + {:ok, mint, websocket} <- Mint.WebSocket.new( - conn, + mint, ref, upgrade_response.status, upgrade_response.headers ) do - stream = create_stream(conn, ref, websocket, map_frame_fun) - {req, Req.Response.new(status: 101, body: stream)} + {:ok, mint, websocket, ref} else {:error, error} -> {req, error} @@ -146,60 +337,73 @@ defmodule Kubereq.Connect do end end - defp format_headres(headers) do - for {name, values} <- headers, value <- values, do: {name, value} + defp process_result(result, state) do + case result do + {no_or_reply, new_handler_state, timeout_continue} when no_or_reply in [:reply, :noreply] -> + {no_or_reply, %{state | handler_state: new_handler_state}, timeout_continue} + + {no_or_reply, new_handler_state} when no_or_reply in [:reply, :noreply] -> + {no_or_reply, %{state | handler_state: new_handler_state}} + + {:stop, reason, new_handler_state} -> + {:stop, reason, %{state | handler_state: new_handler_state}} + + {:stop, reason, reply, new_handler_state} -> + {:stop, reason, reply, %{state | handler_state: new_handler_state}} + end end - defp create_stream(conn, ref, websocket, map_frame_fun) do + defp receive_upgrade_response(mint, ref, response \\ %{}) do + case Mint.HTTP.recv(mint, 0, 10_000) do + {:ok, mint, parts} -> + response = + parts + |> Map.new(fn + {type, ^ref} -> {type, true} + {type, ^ref, value} -> {type, value} + end) + |> Map.merge(response) + + if response[:done], + do: {:ok, mint, response}, + else: receive_upgrade_response(mint, ref, response) + + {:error, mint, error, _} -> + {:error, mint, error} + end + end + + defp create_stream(mint, ref, websocket, map_frame_fun) do Stream.resource( - fn -> {[], conn, ref, websocket} end, + fn -> {[], mint, ref, websocket} end, fn - {[{:close, _, _} | _], conn, ref, websocket} -> - {:halt, {conn, ref, websocket}} + {[{:close, _, _} | _], mint, ref, websocket} -> + {:halt, {mint, ref, websocket}} - {[frame | rest], conn, ref, websocket} -> - {[map_frame_fun.(frame)], {rest, conn, ref, websocket}} + {[frame | rest], mint, ref, websocket} -> + {[map_frame_fun.(frame)], {rest, mint, ref, websocket}} - {[], conn, ref, websocket} -> - with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.recv(conn, 0, :infinity), + {[], mint, ref, websocket} -> + with {:ok, mint, [{:data, ^ref, data}]} <- Mint.WebSocket.recv(mint, 0, :infinity), {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do - {[], {frames, conn, ref, websocket}} + {[], {frames, mint, ref, websocket}} else - {:error, _conn, _error} -> + {:error, _mint, _error} -> {:halt, :ok} - {:ok, conn, _other} -> - {[], {[], conn, ref, websocket}} + {:ok, mint, _other} -> + {[], {[], mint, ref, websocket}} end end, fn _ -> :ok end ) end - @spec ws_scheme(binary()) :: {:http, :ws, binary()} | {:https, :wss, binary()} - defp ws_scheme("http"), do: {:http, :ws, "ws"} - defp ws_scheme("https"), do: {:https, :wss, "wss"} - - defp receive_upgrade_response(conn, ref) do - Enum.reduce_while(Stream.cycle([:ok]), {conn, %{}}, fn _, {conn, response} -> - case Mint.HTTP.recv(conn, 0, 10_000) do - {:ok, conn, parts} -> - response = - parts - |> Map.new(fn - {type, ^ref} -> {type, true} - {type, ^ref, value} -> {type, value} - end) - |> Map.merge(response) - - # credo:disable-for-lines:3 - if response[:done], - do: {:halt, {:ok, conn, response}}, - else: {:cont, {conn, response}} - - {:error, conn, error, _} -> - {:halt, {:error, conn, error}} - end - end) + defp format_headers(headers) do + for {name, values} <- headers, value <- values, do: {name, value} end + + @spec ws_scheme(binary()) :: {:http, :ws} | {:https, :wss} + defp ws_scheme("http"), do: {:http, :ws} + defp ws_scheme("https"), do: {:https, :wss} end diff --git a/lib/kubereq/pod_exec.ex b/lib/kubereq/pod_exec.ex index 66c697b..f39589a 100644 --- a/lib/kubereq/pod_exec.ex +++ b/lib/kubereq/pod_exec.ex @@ -81,47 +81,46 @@ defmodule Kubereq.PodExec do |> Keyword.put(:subresource, "exec") |> args_to_opts() - Kubereq.Connect.start_link(__MODULE__, req: req, state: %{into: into}, opts: opts) + req = Req.merge(req, opts) + + Kubereq.Connect.start_link(__MODULE__, req, %{into: into}) end - defdelegate open?(dest), to: Fresh - defdelegate close(dest, code, reason), to: Fresh + defdelegate open?(dest), to: Kubereq.Connect + defdelegate close(dest, code, reason), to: Kubereq.Connect @doc """ Send the given `data` to the container. """ @spec send_stdin(dest :: :gen_statem.server_ref(), data :: binary()) :: :ok def send_stdin(dest, data) do - Fresh.send(dest, {:text, <<0, data::binary>>}) + Kubereq.Connect.send_frame(dest, {:text, <<0, data::binary>>}) end @doc """ Close the connection and terminate the process. """ @spec close(dest :: :gen_statem.server_ref()) :: :ok - def close(dest), do: Fresh.send(dest, {:close, 1000, ""}) + def close(dest), do: Kubereq.Connect.send_frame(dest, {:close, 1000, ""}) - def handle_connect(_status, _headers, state) do + @impl Kubereq.Connect + def init(state) do send_frame(state.into, :connected) {:ok, state} end - def handle_disconnect(code, reason, state) do - send_frame(state.into, {:close, code, reason}) - :close - end - - def handle_in(frame, state) do + @impl Kubereq.Connect + def handle_frame(frame, state) do data = map_frame(frame) send_frame(state.into, data) - {:ok, state} + {:noreply, state} end defp send_frame({dest, ref}, frame), do: send(dest, {ref, frame}) defp send_frame(dest, frame), do: send(dest, frame) - def run(req) do - Kubereq.Connect.run(req, &map_frame/1) + def connect_and_stream(req) do + Kubereq.Connect.connect_and_stream(req, &map_frame/1) end defp map_frame(frame) do diff --git a/lib/kubereq/pod_logs.ex b/lib/kubereq/pod_logs.ex index e37e276..0d0b0b5 100644 --- a/lib/kubereq/pod_logs.ex +++ b/lib/kubereq/pod_logs.ex @@ -69,43 +69,38 @@ defmodule Kubereq.PodLogs do |> Keyword.put(:subresource, "log") |> args_to_opts() - Kubereq.Connect.start_link(__MODULE__, req: req, state: %{into: into}, opts: opts) + req = Req.merge(req, opts) + + Kubereq.Connect.start_link(__MODULE__, req, %{into: into}) end - defdelegate open?(dest), to: Fresh - defdelegate close(dest, code, reason), to: Fresh + defdelegate open?(dest), to: Kubereq.Connect + defdelegate close(dest, code, reason), to: Kubereq.Connect @doc """ Close the connection and terminate the process. """ @spec close(dest :: :gen_statem.server_ref()) :: :ok - def close(dest), do: Fresh.send(dest, {:close, 1000, ""}) - - def handle_disconnect(code, reason, state) do - send_frame(state.into, {:close, code, reason}) - :close - end + def close(dest), do: Kubereq.Connect.send_frame(dest, {:close, 1000, ""}) - def handle_in(frame, state) do + @impl Kubereq.Connect + def handle_frame(frame, state) do data = map_frame(frame) send_frame(state.into, data) - {:ok, state} + {:noreply, state} end defp send_frame({dest, ref}, frame), do: send(dest, {ref, frame}) defp send_frame(dest, frame), do: send(dest, frame) - def run(req) do - Kubereq.Connect.run(req, &map_frame/1) + def connect_and_stream(req) do + Kubereq.Connect.connect_and_stream(req, &map_frame/1) end defp map_frame(frame) do case frame do - {:binary, <>} -> - {:stdout, msg} - - other -> - other + {:binary, msg} -> {:stdout, msg} + other -> other end end diff --git a/mix.exs b/mix.exs index d27b6fb..6e8d4db 100644 --- a/mix.exs +++ b/mix.exs @@ -40,7 +40,6 @@ defmodule Kubereq.MixProject do {:yaml_elixir, "~> 2.0"}, {:mint, "~> 1.0"}, {:mint_web_socket, "~> 1.0"}, - {:fresh, "~> 0.4.4"}, {:req, "~> 0.5.0"}, # Test deps