Skip to content

Commit

Permalink
WIP: Refactor BadResponseCodeError error handling
Browse files Browse the repository at this point in the history
Instead of trying to handle this at the manticore adapter layer, let callers
decide how to handle exit codes and raise or generate errors. This will allow us
to DLQ 413 errors and not retry and hanlde 404 for template API interaction
without having to rely on catching generic errors.
  • Loading branch information
donoghuc committed Jan 10, 2025
1 parent a6f09c6 commit 82872d0
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 55 deletions.
45 changes: 22 additions & 23 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,15 @@ def join_bulk_responses(bulk_responses)

def bulk_send(body_stream, batch_actions)
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}

response = @pool.post(@bulk_path, params, body_stream.string)

@bulk_response_metrics.increment(response.code.to_s)

case response.code
when 200 # OK
if response.code == 200
LogStash::Json.load(response.body)
when 413 # Payload Too Large
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
else
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) if response.code == 413
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
response.code, url, body_stream.to_s, response.body
)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body_stream.to_s, response.body)
end
end

Expand Down Expand Up @@ -414,13 +407,21 @@ def exists?(path, use_get=false)
end

def template_exists?(template_endpoint, name)
exists?("/#{template_endpoint}/#{name}")
response = @pool.get("/#{template_endpoint}/#{name}")
return true if response.code >= 200 && response.code <= 299
return false if response.code == 404
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise BadResponseCodeError.new(response.code, url, nil, response.body)
end

def template_put(template_endpoint, name, template)
path = "#{template_endpoint}/#{name}"
logger.info("Installing Elasticsearch template", name: name)
@pool.put(path, nil, LogStash::Json.dump(template))
path = "#{template_endpoint}/#{name}"
response = @pool.put(path, nil, LogStash::Json.dump(template))
if response.code < 200 || response.code > 299
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise BadResponseCodeError.new(response.code, url, template, response.body)
end
end

# ILM methods
Expand All @@ -432,16 +433,14 @@ def rollover_alias_exists?(name)

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
begin
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
logger.info("Created rollover alias", name: alias_name)
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
raise e
response = @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
if response.code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
unless response.code >= 200 && response.code <= 299
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise BadResponseCodeError.new(response.code, url, alias_definition, response.body)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ def perform_request(url, method, path, params={}, body=nil)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
end

# 404s are excluded because they are valid codes in the case of
# template installation. 413s are excluded to allow the bulk_send
# error handling to process "Payload Too Large" responses rather
# than triggering retries.
code = resp.code
if code < 200 || (code > 299 && ![404, 413].include?(code))
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
end

resp
end

Expand Down
26 changes: 14 additions & 12 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def start_resurrectionist
# @return [Hash] deserialized license document or empty Hash upon any error
def get_license(url)
response = perform_request_to_url(url, :get, LICENSE_PATH)
# CODEREVIEW: is 404 OK here? Previously it seems this would not error/warn if licence check gets 404
if response.code < 200 || response.code > 299
logger.error("Unable to get license information", code: response.code, url: url.sanitized.to_s)
end
LogStash::Json.load(response.body)
rescue => e
logger.error("Unable to get license information", url: url.sanitized.to_s, exception: e.class, message: e.message)
Expand All @@ -253,13 +257,12 @@ def get_license(url)
def health_check_request(url)
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
begin
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
response = perform_request_to_url(url, :head, @healthcheck_path)
if response.code < 200 || response.code > 299
logger.warn("Health check failed", code: response.code, url: url.sanitized.to_s)
return nil, BadResponseCodeError.new(response.code, url, nil, response.body)
end
return response, nil
end

def healthcheck!(register_phase = true)
Expand Down Expand Up @@ -312,13 +315,12 @@ def healthcheck!(register_phase = true)
end

def get_root_path(url, params={})
begin
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
response = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
if response.code < 200 || response.code > 299
logger.warn("Elasticsearch main endpoint returns #{response.code}", body: response.body)
return nil, BadResponseCodeError.new(response.code, url, nil, response.body)
end
return response, nil
end

def test_serverless_connection(url, root_response)
Expand Down
8 changes: 6 additions & 2 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ module Common

attr_reader :hosts

# These codes apply to documents, not at the request level
DOC_DLQ_CODES = [400, 404]
# These codes apply to documents, not at the request level. While the 413 error technically is at the request level
# it should be treated like an error at the document level. Specifically when the payload is too large it is wholesale
# not accepted by ES, in this case it is dumped to DLQ and not retried. Note that this applies to batches or a single message,
# if the batch size results in a 413 due to exceeding ES limit *all* events in the batch are rejected, regardless of whether
# the individual parts that were rejected would have been accepted.
DOC_DLQ_CODES = [400, 404, 413]
DOC_SUCCESS_CODES = [200, 201]
DOC_CONFLICT_CODE = 409

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@
uri_with_path = uri.clone
uri_with_path.path = "/"

expect(::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError).to receive(:new).
with(resp.code, uri_with_path, nil, resp.body).and_call_original

expect do
subject.perform_request(uri, :get, "/")
end.to raise_error(::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError)
result = subject.perform_request(uri, :get, "/")
expect(result).to eq(resp)
expect(result.code).to eq(500)
end
end

Expand Down
24 changes: 22 additions & 2 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,9 @@
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
if body.length > max_bytes
max_bytes *= 2 # ensure a successful retry
double("Response", :code => 413, :body => "")
double("Response", :code => 413, :body => "", :final_url => "http://localhost:9200/_bulk")
else
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}', :final_url => "http://localhost:9200/_bulk")
end
end
end
Expand Down Expand Up @@ -1102,7 +1102,14 @@
describe "SSL end to end" do
let(:do_register) { false } # skip the register in the global before block, as is called here.

let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

before(:each) do
stub_http_client_pool!
stub_manticore_client!
subject.register
end
Expand Down Expand Up @@ -1292,9 +1299,15 @@
end

let(:options) { { 'cloud_id' => valid_cloud_id } }
let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

before(:each) do
stub_manticore_client!
stub_http_client_pool!
end

it "should set host(s)" do
Expand Down Expand Up @@ -1325,8 +1338,15 @@

let(:options) { { 'cloud_auth' => LogStash::Util::Password.new('elastic:my-passwd-00') } }

let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

before(:each) do
stub_manticore_client!
stub_http_client_pool!
end

it "should set host(s)" do
Expand Down
10 changes: 9 additions & 1 deletion spec/unit/outputs/elasticsearch_ssl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@
allow(manticore_double).to receive(:close)

response_double = double("manticore response").as_null_object
# Allow healtchecks
# Allow healthchecks and version checks
allow(response_double).to receive(:code).and_return(200)
allow(response_double).to receive(:body).and_return('{"version":{"number":"7.10.1"}}')
allow(manticore_double).to receive(:head).with(any_args).and_return(response_double)
allow(manticore_double).to receive(:get).with(any_args).and_return(response_double)
allow(::Manticore::Client).to receive(:new).and_return(manticore_double)

# Skip template installation etc
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:start)
allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:install_template)
allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:discover_cluster_uuid)
allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:ilm_in_use?).and_return(nil)
end

after do
Expand Down

0 comments on commit 82872d0

Please sign in to comment.