diff --git a/CHANGELOG.md b/CHANGELOG.md index cdcdff6d..1ec0a6dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 12.0.1 + - Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199) + ## 12.0.0 - SSL settings that were marked deprecated in version `11.14.0` are now marked obsolete, and will prevent the plugin from starting. - These settings are: diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index e0b70e36..e7fc1b1b 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -181,22 +181,15 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) - @bulk_response_metrics.increment(response.code.to_s) - case response.code - when 200 # OK + if response.code == 200 LogStash::Json.load(response.body) - when 413 # Payload Too Large - logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') else + logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) if response.code == 413 url = ::LogStash::Util::SafeURI.new(response.final_url) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( - response.code, url, body_stream.to_s, response.body - ) + raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body_stream.to_s, response.body) end end @@ -414,13 +407,21 @@ def exists?(path, use_get=false) end def template_exists?(template_endpoint, name) - exists?("/#{template_endpoint}/#{name}") + response = @pool.get("/#{template_endpoint}/#{name}") + return true if response.code >= 200 && response.code <= 299 + return false if response.code == 404 + url = ::LogStash::Util::SafeURI.new(response.final_url) + raise BadResponseCodeError.new(response.code, url, nil, response.body) end def template_put(template_endpoint, name, template) - path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) - @pool.put(path, nil, LogStash::Json.dump(template)) + path = "#{template_endpoint}/#{name}" + response = @pool.put(path, nil, LogStash::Json.dump(template)) + if response.code < 200 || response.code > 299 + url = ::LogStash::Util::SafeURI.new(response.final_url) + raise BadResponseCodeError.new(response.code, url, template, response.body) + end end # ILM methods @@ -432,16 +433,14 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition) - begin - @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) - logger.info("Created rollover alias", name: alias_name) - # If the rollover alias already exists, ignore the error that comes back from Elasticsearch - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 400 - logger.info("Rollover alias already exists, skipping", name: alias_name) - return - end - raise e + response = @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + if response.code == 400 + logger.info("Rollover alias already exists, skipping", name: alias_name) + return + end + unless response.code >= 200 && response.code <= 299 + url = ::LogStash::Util::SafeURI.new(response.final_url) + raise BadResponseCodeError.new(response.code, url, alias_definition, response.body) end end diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index c9e49ec7..ef64a5da 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -76,14 +76,6 @@ def perform_request(url, method, path, params={}, body=nil) raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string) end - # 404s are excluded because they are valid codes in the case of - # template installation. We might need a better story around this later - # but for our current purposes this is correct - code = resp.code - if code < 200 || code > 299 && code != 404 - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) - end - resp end diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index fb3194cc..cab3bcdf 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -244,6 +244,10 @@ def start_resurrectionist # @return [Hash] deserialized license document or empty Hash upon any error def get_license(url) response = perform_request_to_url(url, :get, LICENSE_PATH) + # CODEREVIEW: is 404 OK here? Previously it seems this would not error/warn if licence check gets 404 + if response.code < 200 || response.code > 299 + logger.error("Unable to get license information", code: response.code, url: url.sanitized.to_s) + end LogStash::Json.load(response.body) rescue => e logger.error("Unable to get license information", url: url.sanitized.to_s, exception: e.class, message: e.message) @@ -253,13 +257,12 @@ def get_license(url) def health_check_request(url) logger.debug("Running health check to see if an Elasticsearch connection is working", :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path) - begin - response = perform_request_to_url(url, :head, @healthcheck_path) - return response, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) - return nil, e + response = perform_request_to_url(url, :head, @healthcheck_path) + if response.code < 200 || response.code > 299 + logger.warn("Health check failed", code: response.code, url: url.sanitized.to_s) + return nil, BadResponseCodeError.new(response.code, url, nil, response.body) end + return response, nil end def healthcheck!(register_phase = true) @@ -312,13 +315,12 @@ def healthcheck!(register_phase = true) end def get_root_path(url, params={}) - begin - resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) - return resp, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) - return nil, e + response = perform_request_to_url(url, :get, ROOT_URI_PATH, params) + if response.code < 200 || response.code > 299 + logger.warn("Elasticsearch main endpoint returns #{response.code}", body: response.body) + return nil, BadResponseCodeError.new(response.code, url, nil, response.body) end + return response, nil end def test_serverless_connection(url, root_response) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 977ef204..c789dbfc 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -7,8 +7,12 @@ module Common attr_reader :hosts - # These codes apply to documents, not at the request level - DOC_DLQ_CODES = [400, 404] + # These codes apply to documents, not at the request level. While the 413 error technically is at the request level + # it should be treated like an error at the document level. Specifically when the payload is too large it is wholesale + # not accepted by ES, in this case it is dumped to DLQ and not retried. Note that this applies to batches or a single message, + # if the batch size results in a 413 due to exceeding ES limit *all* events in the batch are rejected, regardless of whether + # the individual parts that were rejected would have been accepted. + DOC_DLQ_CODES = [400, 404, 413] DOC_SUCCESS_CODES = [200, 201] DOC_CONFLICT_CODE = 409 diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 8c6ef259..090b1064 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '12.0.0' + s.version = '12.0.1' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/unit/outputs/elasticsearch/http_client/manticore_adapter_spec.rb b/spec/unit/outputs/elasticsearch/http_client/manticore_adapter_spec.rb index 626cf730..ec3b2135 100644 --- a/spec/unit/outputs/elasticsearch/http_client/manticore_adapter_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/manticore_adapter_spec.rb @@ -62,12 +62,9 @@ uri_with_path = uri.clone uri_with_path.path = "/" - expect(::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError).to receive(:new). - with(resp.code, uri_with_path, nil, resp.body).and_call_original - - expect do - subject.perform_request(uri, :get, "/") - end.to raise_error(::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError) + result = subject.perform_request(uri, :get, "/") + expect(result).to eq(resp) + expect(result.code).to eq(500) end end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 95cffa85..190937b0 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -915,9 +915,9 @@ allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body| if body.length > max_bytes max_bytes *= 2 # ensure a successful retry - double("Response", :code => 413, :body => "") + double("Response", :code => 413, :body => "", :final_url => "http://localhost:9200/_bulk") else - double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}') + double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}', :final_url => "http://localhost:9200/_bulk") end end end @@ -1102,7 +1102,14 @@ describe "SSL end to end" do let(:do_register) { false } # skip the register in the global before block, as is called here. + let(:stub_http_client_pool!) do + [:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method| + allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method) + end + end + before(:each) do + stub_http_client_pool! stub_manticore_client! subject.register end @@ -1292,9 +1299,15 @@ end let(:options) { { 'cloud_id' => valid_cloud_id } } + let(:stub_http_client_pool!) do + [:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method| + allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method) + end + end before(:each) do stub_manticore_client! + stub_http_client_pool! end it "should set host(s)" do @@ -1325,8 +1338,15 @@ let(:options) { { 'cloud_auth' => LogStash::Util::Password.new('elastic:my-passwd-00') } } + let(:stub_http_client_pool!) do + [:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method| + allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method) + end + end + before(:each) do stub_manticore_client! + stub_http_client_pool! end it "should set host(s)" do diff --git a/spec/unit/outputs/elasticsearch_ssl_spec.rb b/spec/unit/outputs/elasticsearch_ssl_spec.rb index 5cf3ecad..e5ac7a09 100644 --- a/spec/unit/outputs/elasticsearch_ssl_spec.rb +++ b/spec/unit/outputs/elasticsearch_ssl_spec.rb @@ -15,10 +15,18 @@ allow(manticore_double).to receive(:close) response_double = double("manticore response").as_null_object - # Allow healtchecks + # Allow healthchecks and version checks + allow(response_double).to receive(:code).and_return(200) + allow(response_double).to receive(:body).and_return('{"version":{"number":"7.10.1"}}') allow(manticore_double).to receive(:head).with(any_args).and_return(response_double) allow(manticore_double).to receive(:get).with(any_args).and_return(response_double) allow(::Manticore::Client).to receive(:new).and_return(manticore_double) + + # Skip template installation etc + allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:start) + allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:install_template) + allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:discover_cluster_uuid) + allow_any_instance_of(LogStash::Outputs::ElasticSearch).to receive(:ilm_in_use?).and_return(nil) end after do