Skip to content

Commit

Permalink
Add stream_response option to hackney adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Smart committed Oct 29, 2021
1 parent c1e0f2d commit a9e7766
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
44 changes: 40 additions & 4 deletions lib/tesla/adapter/hackney.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ if Code.ensure_loaded?(:hackney) do
end

defp format_body(data) when is_list(data), do: IO.iodata_to_binary(data)
defp format_body(data) when is_binary(data) or is_reference(data), do: data

defp format_body(data)
when is_binary(data) or is_reference(data) or is_function(data),
do: data

defp request(env, opts) do
request(
Expand All @@ -68,14 +71,22 @@ if Code.ensure_loaded?(:hackney) do
end

defp request(method, url, headers, body, opts) do
handle(:hackney.request(method, url, headers, body || '', opts))
response = :hackney.request(method, url, headers, body || '', opts)

case Keyword.get(opts, :stream_response, false) do
true -> handle(response, :stream)
false -> handle(response)
end
end

defp request_stream(method, url, headers, body, opts) do
with {:ok, ref} <- :hackney.request(method, url, headers, :stream, opts) do
case send_stream(ref, body) do
:ok -> handle(:hackney.start_response(ref))
error -> handle(error)
:ok ->
handle(:hackney.start_response(ref))

error ->
handle(error)
end
else
e -> handle(e)
Expand Down Expand Up @@ -106,6 +117,31 @@ if Code.ensure_loaded?(:hackney) do

defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body}

defp handle({:ok, status, headers, ref}, :stream) when is_reference(ref) do
state = :hackney_manager.get_state(ref)

body =
Stream.resource(
fn -> state end,
fn
{:done, state} ->
{:halt, state}

{:ok, data, state} ->
{[data], state}

{:error, reason} ->
raise inspect(reason)

state ->
{[], :hackney_response.stream_body(state)}
end,
&:hackney_response.close/1
)

{:ok, status, headers, body}
end

defp handle_async_response({ref, %{headers: headers, status: status}})
when not (is_nil(headers) or is_nil(status)) do
{:ok, status, headers, ref}
Expand Down
12 changes: 12 additions & 0 deletions test/tesla/adapter/hackney_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,16 @@ defmodule Tesla.Adapter.HackneyTest do

assert {:error, :fake_error} = call(request)
end

test "get with `stream_response: true` option" do
request = %Env{
method: :get,
url: "#{@http}/ip"
}

assert {:ok, %Env{} = response} = call(request, stream_response: true)

assert response.status == 200
assert(is_function(response.body))
end
end

0 comments on commit a9e7766

Please sign in to comment.