Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update HTTPoison functions #54

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
language: elixir
elixir:
- 1.6
- 1.7
- 1.8
otp_release:
- 20.3
- 21.0
- 22.3
notifications:
recipients:
- [email protected]
Expand All @@ -18,6 +18,4 @@ cache:
script:
- mix do deps.get, deps.compile, compile
- mix test
- if [[ `elixir -v` = *"1.7"* ]]; then
travis_wait 30 mix dialyzer --halt-exit-status --format term;
fi
- travis_wait 30 mix dialyzer --halt-exit-status --format term;
54 changes: 33 additions & 21 deletions lib/forcex/api/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Forcex.Api.Http do
@type forcex_response :: map | {number, any} | String.t

def raw_request(method, url, body, headers, options) do
response = method |> request!(url, body, headers, extra_options() ++ options) |> process_response
response = method |> request!(url, body, headers, extra_options() ++ options) |> process_response()
Logger.debug("#{__ENV__.module}.#{elem(__ENV__.function, 0)} response=" <> inspect(response))
response
end
Expand All @@ -25,29 +25,41 @@ defmodule Forcex.Api.Http do
Application.get_env(:forcex, :request_options, [])
end

@spec process_response(HTTPoison.Response.t) :: forcex_response
defp process_response(%HTTPoison.Response{body: body, headers: %{"Content-Encoding" => "gzip"} = headers} = resp) do
%{resp | body: :zlib.gunzip(body), headers: Map.drop(headers, ["Content-Encoding"])}
|> process_response
end
defp process_response(%HTTPoison.Response{body: body, headers: %{"Content-Encoding" => "deflate"} = headers} = resp) do
zstream = :zlib.open
:ok = :zlib.inflateInit(zstream, -15)
uncompressed_data = zstream |> :zlib.inflate(body) |> Enum.join
:zlib.inflateEnd(zstream)
:zlib.close(zstream)
%{resp | body: uncompressed_data, headers: Map.drop(headers, ["Content-Encoding"])}
|> process_response
end
defp process_response(%HTTPoison.Response{body: body, headers: %{"Content-Type" => "application/json" <> _} = headers} = resp) do
%{resp | body: Poison.decode!(body, keys: :atoms), headers: Map.drop(headers, ["Content-Type"])}
|> process_response
def process_response(%HTTPoison.Response{body: body, headers: headers} = resp) do
cond do
"gzip" = find_header(headers, "Content-Encoding") ->
%{resp | body: :zlib.gunzip(body), headers: List.keydelete(headers, "Content-Encoding", 0)}
|> process_response()

"deflate" = find_header(headers, "Content-Encoding") ->
zstream = :zlib.open
:ok = :zlib.inflateInit(zstream, -15)
uncompressed_data = zstream |> :zlib.inflate(body) |> Enum.join
:zlib.inflateEnd(zstream)
:zlib.close(zstream)
%{resp | body: uncompressed_data, headers: List.delete(headers, {"Content-Encoding", "deflate"})}
|> process_response()

"application/json" <> suffix = find_header(headers, "Content-Type") ->
%{resp | body: Poison.decode!(body, keys: :atoms), headers: List.delete(headers, {"Content-Type", "application/json" <> suffix})}
|> process_response()
true ->
resp
end
end
defp process_response(%HTTPoison.Response{body: body, status_code: 200}), do: body
defp process_response(%HTTPoison.Response{body: body, status_code: status}), do: {status, body}
def process_response(%HTTPoison.Response{body: body, status_code: 200}), do: body
def process_response(%HTTPoison.Response{body: body, status_code: status}), do: {status, body}

def process_request_headers(headers), do: headers ++ @user_agent ++ @accept ++ @accept_encoding

@spec process_headers(list({String.t, String.t})) :: map
def process_headers(headers), do: Map.new(headers)

defp find_header(headers, header_name) do
Enum.find_value(
headers,
fn {name, value} ->
name =~ ~r/#{header_name}/i && String.downcase(value)
end
)
end
end
44 changes: 29 additions & 15 deletions lib/forcex/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ defmodule Forcex.Bulk do

def process_headers(headers), do: Map.new(headers)

def process_response(%HTTPoison.Response{body: body, headers: %{"Content-Encoding" => "gzip"} = headers } = resp) do
%{resp | body: :zlib.gunzip(body), headers: Map.drop(headers, ["Content-Encoding"])}
|> process_response
end
def process_response(%HTTPoison.Response{body: body, headers: %{"Content-Type" => "application/json" <> _} = headers} = resp) do
%{resp | body: Poison.decode!(body, keys: :atoms), headers: Map.drop(headers, ["Content-Type"])}
|> process_response
def process_response(%HTTPoison.Response{body: body, headers: headers} = resp) do
cond do
"gzip" = find_header(headers, "Content-Encoding") ->
%{resp | body: :zlib.gunzip(body), headers: List.delete(headers, {"Content-Encoding", "gzip"})}
|> process_response()

"application/json" <> suffix = find_header(headers, "Content-Type") ->
%{resp | body: Poison.decode!(body, keys: :atoms), headers: List.delete(headers, {"Content-Type", "application/json" <> suffix})}
|> process_response()
true ->
resp
end
end

def process_response(%HTTPoison.Response{body: body, status_code: status}) when status < 300 and status >= 200, do: body
def process_response(%HTTPoison.Response{body: body, status_code: status}), do: {status, body}

Expand All @@ -44,34 +50,34 @@ defmodule Forcex.Bulk do
request!(method, url, body, headers, extra_options() ++ options) |> process_response
end

def get(path, headers \\ [], client) do
def client_get(path, headers \\ [], client) do
url = "https://#{client.host}/services/async/#{client.api_version}" <> path
raw_request(:get, url, "", headers ++ authorization_header(client), [])
end

def post(path, body \\ "", client) do
def client_post(path, body \\ "", client) do
url = "https://#{client.host}/services/async/#{client.api_version}" <> path
json_request(:post, url, body, authorization_header(client), [])
end

@spec create_query_job(binary, map) :: job
def create_query_job(sobject, client) do
payload = %{"operation" => "query", "object" => sobject, "concurrencyMode" => "Parallel", "contentType" => "JSON"}
post("/job", payload, client)
client_post("/job", payload, client)
end

@spec close_job(job | id, map) :: job
def close_job(job, client) when is_map(job) do
close_job(job.id, client)
end
def close_job(id, client) when is_binary(id) do
post("/job/#{id}", %{"state" => "Closed"}, client)
client_post("/job/#{id}", %{"state" => "Closed"}, client)
end

@spec fetch_job_status(job | id, map) :: job
def fetch_job_status(job, client) when is_map(job), do: fetch_job_status(job.id, client)
def fetch_job_status(id, client) when is_binary(id) do
get("/job/#{id}", client)
client_get("/job/#{id}", client)
end

@spec create_query_batch(String.t, job | id, map) :: job
Expand All @@ -90,7 +96,7 @@ defmodule Forcex.Bulk do
fetch_batch_status(id, job.id, client)
end
def fetch_batch_status(id, job_id, client) when is_binary(id) and is_binary(job_id) do
get("/job/#{job_id}/batch/#{id}", client)
client_get("/job/#{job_id}/batch/#{id}", client)
end

@spec fetch_batch_result_status(batch, map) :: list(String.t)
Expand All @@ -99,7 +105,7 @@ defmodule Forcex.Bulk do
end
@spec fetch_batch_result_status(id, id, map) :: list(String.t)
def fetch_batch_result_status(batch_id, job_id, client) when is_binary(batch_id) and is_binary(job_id) do
get("/job/#{job_id}/batch/#{batch_id}/result", client)
client_get("/job/#{job_id}/batch/#{batch_id}/result", client)
end

@spec fetch_results(id, batch, map) :: list(map)
Expand All @@ -108,7 +114,15 @@ defmodule Forcex.Bulk do
end
@spec fetch_results(id, id, id, map) :: list(map)
def fetch_results(id, batch_id, job_id, client) when is_binary(id) and is_binary(batch_id) and is_binary(job_id) do
get("/job/#{job_id}/batch/#{batch_id}/result/#{id}", client)
client_get("/job/#{job_id}/batch/#{batch_id}/result/#{id}", client)
end

defp find_header(headers, header_name) do
Enum.find_value(
headers,
fn {name, value} ->
name =~ ~r/#{header_name}/i && String.downcase(value)
end
)
end
end
8 changes: 2 additions & 6 deletions lib/forcex/bulk/batch_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ defmodule Forcex.Bulk.BatchWorker do
seen_results = Keyword.get(state, :results, [])

results = Forcex.Bulk.fetch_batch_result_status(batch, client)
case (results -- seen_results) do
list when is_list(list) ->
for result <- list do
notify_handlers({:batch_partial_result_ready, batch, result}, handlers)
end
_ -> true
for result <- (results -- seen_results) do
notify_handlers({:batch_partial_result_ready, batch, result}, handlers)
end

Keyword.put(state, :results, results)
Expand Down
Loading