Skip to content

Commit

Permalink
WIP: Refactor BadResponseCodeError error handling
Browse files Browse the repository at this point in the history
Instead of trying to handle this at the manticore adapter layer, let callers
decide how to handle response codes and whether to raise or generate errors. This
will allow us to send 413 errors to the DLQ without retrying them, and to handle
404s during template API interaction, without having to rely on catching generic errors.
  • Loading branch information
donoghuc committed Jan 9, 2025
1 parent a6f09c6 commit 8427d29
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 40 deletions.
45 changes: 22 additions & 23 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,15 @@ def join_bulk_responses(bulk_responses)

# Posts one bulk request body to @bulk_path and returns the parsed response.
#
# The response code is recorded in @bulk_response_metrics before it is inspected.
# Only a 200 is treated as success; every other code raises BadResponseCodeError so
# that the caller decides whether to retry or dead-letter the batch. A 413 is
# additionally logged with the batch size and payload length before raising.
#
# @param body_stream [#string, #size, #to_s] buffered request body
#   (presumably a StringIO; confirm against the caller)
# @param batch_actions [#size] the actions in this request; only used for the 413 log line
# @return the result of LogStash::Json.load on the response body when the code is 200
# @raise [::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError]
#   for any response code other than 200
def bulk_send(body_stream, batch_actions)
  params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
  response = @pool.post(@bulk_path, params, body_stream.string)

  @bulk_response_metrics.increment(response.code.to_s)

  return LogStash::Json.load(response.body) if response.code == 200

  if response.code == 413
    logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
  end

  url = ::LogStash::Util::SafeURI.new(response.final_url)
  # NOTE(review): if body_stream is a StringIO, `to_s` yields the object's default
  # string form rather than the payload (`string` is used for the request itself).
  # Left unchanged here; confirm which one the error is meant to carry.
  raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body_stream.to_s, response.body)
end

Expand Down Expand Up @@ -414,13 +407,21 @@ def exists?(path, use_get=false)
end

# Reports whether the named template exists under the given template endpoint.
#
# Issues a single GET to "/<template_endpoint>/<name>" and interprets the response
# code directly, so a 404 is an ordinary "not found" answer rather than an error.
#
# @param template_endpoint [String] template API path segment
# @param name [String] template name
# @return [Boolean] true for any 2xx response, false for a 404
# @raise [::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError]
#   for any other response code
def template_exists?(template_endpoint, name)
  response = @pool.get("/#{template_endpoint}/#{name}")

  case response.code
  when 200..299 then true
  when 404 then false
  else
    url = ::LogStash::Util::SafeURI.new(response.final_url)
    # Fully qualified, as in bulk_send: the error class is nested under Pool.
    raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, nil, response.body)
  end
end

# Installs (PUTs) a template under "<template_endpoint>/<name>".
#
# The template is serialized once with LogStash::Json.dump; the same serialized
# body is sent and, on failure, attached to the raised error.
#
# @param template_endpoint [String] template API path segment
# @param name [String] template name
# @param template [Object] template definition, serialized with LogStash::Json.dump
# @return the response returned by @pool.put when the code is 2xx
# @raise [::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError]
#   for any non-2xx response code
def template_put(template_endpoint, name, template)
  logger.info("Installing Elasticsearch template", name: name)

  path = "#{template_endpoint}/#{name}"
  body = LogStash::Json.dump(template)
  response = @pool.put(path, nil, body)
  return response if (200..299).cover?(response.code)

  url = ::LogStash::Util::SafeURI.new(response.final_url)
  # Fully qualified, as in bulk_send: the error class is nested under Pool.
  raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body, response.body)
end

# ILM methods
Expand All @@ -432,16 +433,14 @@ def rollover_alias_exists?(name)

# Create a new rollover alias
#
# PUTs the alias definition to the CGI-escaped alias name. A 400 response is
# taken to mean the alias already exists and is skipped with an info log
# instead of being treated as a failure.
#
# @param alias_name [String] rollover alias name; CGI-escaped to form the request path
# @param alias_definition [Object] alias body, serialized with LogStash::Json.dump
# @return [nil] when the alias already exists (400)
# @raise [::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError]
#   for any response code that is neither 400 nor 2xx
def rollover_alias_put(alias_name, alias_definition)
  body = LogStash::Json.dump(alias_definition)
  response = @pool.put(CGI::escape(alias_name), nil, body)

  if response.code == 400
    logger.info("Rollover alias already exists, skipping", name: alias_name)
    return
  end

  unless (200..299).cover?(response.code)
    url = ::LogStash::Util::SafeURI.new(response.final_url)
    # Fully qualified, as in bulk_send: the error class is nested under Pool.
    raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body, response.body)
  end

  logger.info("Created rollover alias", name: alias_name)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ def perform_request(url, method, path, params={}, body=nil)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
end

# 404s are excluded because they are valid codes in the case of
# template installation. 413s are excluded to allow the bulk_send
# error handling to process "Payload Too Large" responses rather
# than triggering retries.
code = resp.code
if code < 200 || (code > 299 && ![404, 413].include?(code))
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
end

resp
end

Expand Down
11 changes: 5 additions & 6 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,12 @@ def get_license(url)
# Runs a HEAD request against @healthcheck_path on the given URL.
#
# Failures are reported through the return value rather than raised: the caller
# receives a [response, error] pair in which exactly one element is non-nil.
#
# @param url [#sanitized] the URL to check; `url.sanitized.to_s` is used for logging
# @return [Array] `[response, nil]` for a 2xx response, otherwise
#   `[nil, BadResponseCodeError]` built from the response code and body
def health_check_request(url)
  logger.debug("Running health check to see if an Elasticsearch connection is working",
               :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)

  response = perform_request_to_url(url, :head, @healthcheck_path)
  return response, nil if (200..299).cover?(response.code)

  logger.warn("Health check failed", code: response.code, url: url.sanitized.to_s)
  error = ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, nil, response.body)
  return nil, error
end

def healthcheck!(register_phase = true)
Expand Down
8 changes: 6 additions & 2 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ module Common

attr_reader :hosts

# Response codes that send an event to the DLQ instead of being retried.
#
# 400 and 404 apply to individual documents, not to the request. 413 is technically
# a request-level error, but it is treated like a document-level one: when the
# payload is too large Elasticsearch rejects it wholesale, so it is dumped to the
# DLQ and not retried. This holds for a batch as well as for a single message: if
# the batch size exceeds the Elasticsearch limit, *all* events in the batch are
# rejected, even those that would have been accepted on their own.
DOC_DLQ_CODES = [400, 404, 413].freeze
DOC_SUCCESS_CODES = [200, 201].freeze
DOC_CONFLICT_CODE = 409

Expand Down

0 comments on commit 8427d29

Please sign in to comment.