Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not retry on 413 response codes #1199

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 12.0.1
- Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)

## 12.0.0
- SSL settings that were marked deprecated in version `11.14.0` are now marked obsolete, and will prevent the plugin from starting.
- These settings are:
Expand Down
45 changes: 22 additions & 23 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,15 @@ def join_bulk_responses(bulk_responses)

def bulk_send(body_stream, batch_actions)
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}

response = @pool.post(@bulk_path, params, body_stream.string)

@bulk_response_metrics.increment(response.code.to_s)

case response.code
when 200 # OK
if response.code == 200
LogStash::Json.load(response.body)
when 413 # Payload Too Large
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
else
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) if response.code == 413
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
response.code, url, body_stream.to_s, response.body
)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body_stream.to_s, response.body)
end
end

Expand Down Expand Up @@ -414,13 +407,21 @@ def exists?(path, use_get=false)
end

def template_exists?(template_endpoint, name)
exists?("/#{template_endpoint}/#{name}")
response = @pool.get("/#{template_endpoint}/#{name}")
return true if response.code >= 200 && response.code <= 299
return false if response.code == 404
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise BadResponseCodeError.new(response.code, url, nil, response.body)
end

def template_put(template_endpoint, name, template)
path = "#{template_endpoint}/#{name}"
logger.info("Installing Elasticsearch template", name: name)
@pool.put(path, nil, LogStash::Json.dump(template))
path = "#{template_endpoint}/#{name}"
response = @pool.put(path, nil, LogStash::Json.dump(template))
if response.code < 200 || response.code > 299
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise BadResponseCodeError.new(response.code, url, template, response.body)
end
end

# ILM methods
Expand All @@ -432,16 +433,14 @@ def rollover_alias_exists?(name)

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
begin
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
logger.info("Created rollover alias", name: alias_name)
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
raise e
response = @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
if response.code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
unless response.code >= 200 && response.code <= 299
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise BadResponseCodeError.new(response.code, url, alias_definition, response.body)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,6 @@ def perform_request(url, method, path, params={}, body=nil)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
end

# 404s are excluded because they are valid codes in the case of
# template installation. We might need a better story around this later
# but for our current purposes this is correct
code = resp.code
if code < 200 || code > 299 && code != 404
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
end

resp
end

Expand Down
26 changes: 14 additions & 12 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def start_resurrectionist
# @return [Hash] deserialized license document or empty Hash upon any error
def get_license(url)
response = perform_request_to_url(url, :get, LICENSE_PATH)
# CODEREVIEW: is 404 OK here? Previously it seems this would not error/warn if licence check gets 404
if response.code < 200 || response.code > 299
logger.error("Unable to get license information", code: response.code, url: url.sanitized.to_s)
end
LogStash::Json.load(response.body)
rescue => e
logger.error("Unable to get license information", url: url.sanitized.to_s, exception: e.class, message: e.message)
Expand All @@ -253,13 +257,12 @@ def get_license(url)
def health_check_request(url)
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
begin
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
response = perform_request_to_url(url, :head, @healthcheck_path)
if response.code < 200 || response.code > 299
logger.warn("Health check failed", code: response.code, url: url.sanitized.to_s)
return nil, BadResponseCodeError.new(response.code, url, nil, response.body)
end
return response, nil
end

def healthcheck!(register_phase = true)
Expand Down Expand Up @@ -312,13 +315,12 @@ def healthcheck!(register_phase = true)
end

def get_root_path(url, params={})
begin
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
response = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
if response.code < 200 || response.code > 299
logger.warn("Elasticsearch main endpoint returns #{response.code}", body: response.body)
return nil, BadResponseCodeError.new(response.code, url, nil, response.body)
end
return response, nil
end

def test_serverless_connection(url, root_response)
Expand Down
8 changes: 6 additions & 2 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ module Common

attr_reader :hosts

# These codes apply to documents, not at the request level
DOC_DLQ_CODES = [400, 404]
# These codes apply to documents, not at the request level. While the 413 error technically is at the request level
# it should be treated like an error at the document level. Specifically when the payload is too large it is wholesale
# not accepted by ES, in this case it is dumped to DLQ and not retried. Note that this applies to batches or a single message,
# if the batch size results in a 413 due to exceeding ES limit *all* events in the batch are rejected, regardless of whether
# the individual parts that were rejected would have been accepted.
DOC_DLQ_CODES = [400, 404, 413]
DOC_SUCCESS_CODES = [200, 201]
DOC_CONFLICT_CODE = 409

Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '12.0.0'
s.version = '12.0.1'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@
uri_with_path = uri.clone
uri_with_path.path = "/"

expect(::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError).to receive(:new).
with(resp.code, uri_with_path, nil, resp.body).and_call_original

expect do
subject.perform_request(uri, :get, "/")
end.to raise_error(::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError)
result = subject.perform_request(uri, :get, "/")
expect(result).to eq(resp)
expect(result.code).to eq(500)
end
end

Expand Down
24 changes: 22 additions & 2 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,9 @@
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
if body.length > max_bytes
max_bytes *= 2 # ensure a successful retry
double("Response", :code => 413, :body => "")
double("Response", :code => 413, :body => "", :final_url => "http://localhost:9200/_bulk")
else
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}', :final_url => "http://localhost:9200/_bulk")
end
end
end
Expand Down Expand Up @@ -1102,7 +1102,14 @@
describe "SSL end to end" do
let(:do_register) { false } # skip the register in the global before block, as is called here.

let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

before(:each) do
stub_http_client_pool!
stub_manticore_client!
subject.register
end
Expand Down Expand Up @@ -1292,9 +1299,15 @@
end

let(:options) { { 'cloud_id' => valid_cloud_id } }
let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

before(:each) do
stub_manticore_client!
stub_http_client_pool!
end

it "should set host(s)" do
Expand Down Expand Up @@ -1325,8 +1338,15 @@

let(:options) { { 'cloud_auth' => LogStash::Util::Password.new('elastic:my-passwd-00') } }

let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

before(:each) do
stub_manticore_client!
stub_http_client_pool!
end

it "should set host(s)" do
Expand Down
10 changes: 9 additions & 1 deletion spec/unit/outputs/elasticsearch_ssl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@
allow(manticore_double).to receive(:close)

response_double = double("manticore response").as_null_object
# Allow healtchecks
# Allow healthchecks and version checks
allow(response_double).to receive(:code).and_return(200)
allow(response_double).to receive(:body).and_return('{"version":{"number":"7.10.1"}}')
allow(manticore_double).to receive(:head).with(any_args).and_return(response_double)
allow(manticore_double).to receive(:get).with(any_args).and_return(response_double)
allow(::Manticore::Client).to receive(:new).and_return(manticore_double)

# Skip template installation etc
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:start)
allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:install_template)
allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:discover_cluster_uuid)
allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:ilm_in_use?).and_return(nil)
end

after do
Expand Down
Loading