Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite plugin using http client mixin #28

Merged
merged 20 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

# Copyright 2021 Dynatrace LLC
# Copyright 2023 Dynatrace LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
7 changes: 7 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Logstash Output Dynatrace
Copyright 2023 Dynatrace LLC

The Initial Developer of some parts of this plugin, which are copied from,
derived from, or inspired by Logstash Output HTTP, is
Elastic (https://www.elastic.co) and Logstash contributors.
Copyright 2020 Elastic and contributors. All rights reserved.
3 changes: 1 addition & 2 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env rake
# frozen_string_literal: true

# Copyright 2021 Dynatrace LLC
# Copyright 2023 Dynatrace LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'logstash/devutils/rake'
require 'rubocop/rake_task'

RuboCop::RakeTask.new
308 changes: 243 additions & 65 deletions lib/logstash/outputs/dynatrace.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

# Copyright 2021 Dynatrace LLC
# Copyright 2023 Dynatrace LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,24 +14,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'logstash/namespace'
require 'logstash/outputs/base'
require 'logstash/namespace'
require 'logstash/json'
require 'openssl'
require 'uri'
require 'logstash/plugin_mixins/http_client'

MAX_RETRIES = 5
PLUGIN_VERSION = '0.4.0'

# These constants came from the http plugin config but we don't want them configurable
# If encountered as response codes this plugin will retry these requests
RETRYABLE_CODES = [429, 500, 502, 503, 504].freeze
RETRY_FAILED = true

module LogStash
module Outputs
class RetryableError < StandardError;
end

# An output which sends logs to the Dynatrace log ingest v2 endpoint formatted as JSON
class Dynatrace < LogStash::Outputs::Base
config_name 'dynatrace'
include LogStash::PluginMixins::HttpClient

concurrency :shared

RETRYABLE_MANTICORE_EXCEPTIONS = [
::Manticore::Timeout,
::Manticore::SocketException,
::Manticore::ClientProtocolException,
::Manticore::ResolutionFailure,
::Manticore::SocketTimeout
].freeze

RETRYABLE_UNKNOWN_EXCEPTION_STRINGS = [
/Connection reset by peer/i,
/Read Timed out/i
].freeze

concurrency :single
class PluginInternalQueueLeftoverError < StandardError; end

# This output will execute up to 'pool_max' requests in parallel for performance.
# Consider this when tuning this plugin for performance.
#
# Additionally, note that when parallel execution is used strict ordering of events is not
# guaranteed!

config_name 'dynatrace'

# The full URL of the Dynatrace log ingestion endpoint:
# - on SaaS: https://{your-environment-id}.live.dynatrace.com/api/v2/logs/ingest
Expand All @@ -44,80 +68,234 @@ class Dynatrace < LogStash::Outputs::Base
# Disable SSL validation by setting :verify_mode OpenSSL::SSL::VERIFY_NONE
config :ssl_verify_none, validate: :boolean, default: false

default :codec, 'json'
# Include headers in debug logs when HTTP errors occur. Headers include sensitive data such as API tokens.
config :debug_include_headers, validate: :boolean, default: false

attr_accessor :uri, :plugin_version
# Include body in debug logs when HTTP errors occur. Body may be large and include sensitive data.
config :debug_include_body, validate: :boolean, default: false
dyladan marked this conversation as resolved.
Show resolved Hide resolved

def register
@logger.debug("Registering plugin")
require 'net/https'
require 'uri'
@uri = URI.parse(@ingest_endpoint_url.uri.to_s)
@client = Net::HTTP.new(@uri.host, @uri.port)

if uri.scheme == 'https'
@client.use_ssl = true
@client.verify_mode = OpenSSL::SSL::VERIFY_NONE if @ssl_verify_none
# ssl_verification_mode config is from mixin but ssl_verify_none is our documented config
@ssl_verification_mode = 'none' if @ssl_verify_none

@ingest_endpoint_url = @ingest_endpoint_url.to_s

# TODO: I don't really understand how this mechanism works. Does it work?
# TODO try to remove this and see what happens
# We count outstanding requests with this queue
# This queue tracks the requests to create backpressure
# When this queue is empty no new requests may be sent,
# tokens must be added back by the client on success
@request_tokens = SizedQueue.new(@pool_max)
@pool_max.times { |_t| @request_tokens << true }
@requests = []
dyladan marked this conversation as resolved.
Show resolved Hide resolved

# Run named Timer as daemon thread
@timer = java.util.Timer.new("HTTP Output #{params['id']}", true)
end

def multi_receive(events)
return if events.empty?

send_events(events)
end

class RetryTimerTask < java.util.TimerTask
def initialize(pending, event, attempt)
@pending = pending
@event = event
@attempt = attempt
super()
end

def run
@pending << [@event, @attempt]
end
@logger.info('Client', client: @client.inspect)
end

def headers
def make_headers
{
'User-Agent' => "logstash-output-dynatrace/#{PLUGIN_VERSION}",
'Content-Type' => 'application/json; charset=utf-8',
'Authorization' => "Api-Token #{@api_key.value}"
}
end

# Takes an array of events
def multi_receive(events)
@logger.debug("Received #{events.length} events")
return if events.length.zero?

retries = 0
begin
request = Net::HTTP::Post.new(uri, headers)
request.body = "#{LogStash::Json.dump(events.map(&:to_hash)).chomp}\n"
response = @client.request(request)

case response
when Net::HTTPSuccess
@logger.debug("successfully sent #{events.length} events#{" with #{retries} retries" if retries > 0}")
when Net::HTTPServerError
@logger.error("Encountered an HTTP server error", :message => response.message, :code => response.code, :body => response.body) if retries == 0
when Net::HTTPNotFound
@logger.error("Encountered a 404 Not Found error. Please check that log ingest is enabled and your API token has the `logs.ingest` (Ingest Logs) scope.", :message => response.message, :code => response.code)
when Net::HTTPClientError
@logger.error("Encountered an HTTP client error", :message => response.message, :code => response.code, :body => response.body)
else
@logger.error("Encountered an unexpected response code", :message => response.message, :code => response.code)
def log_retryable_response(response)
retry_msg = RETRY_FAILED ? 'will retry' : "won't retry"
if response.code == 429
@logger.debug? && @logger.debug("Encountered a 429 response, #{retry_msg}. This is not serious, just flow control via HTTP")
else
@logger.warn("Encountered a retryable HTTP request in HTTP output, #{retry_msg}", code: response.code,
body: response.body)
end
end

def log_error_response(response, ingest_endpoint_url, event)
log_failure(
"Encountered non-2xx HTTP code #{response.code}",
response_code: response.code,
ingest_endpoint_url: ingest_endpoint_url,
event: event
)
end

def send_events(events)
successes = java.util.concurrent.atomic.AtomicInteger.new(0)
failures = java.util.concurrent.atomic.AtomicInteger.new(0)

pending = Queue.new
pending << [events, 0]

while popped = pending.pop
break if popped == :done

event, attempt = popped

if attempt > 2 && pipeline_shutdown_requested?
raise PluginInternalQueueLeftoverError, 'Received pipeline shutdown request but http output has unfinished events. ' \
'If persistent queue is enabled, events will be retried.'
end

raise RetryableError.new "code #{response.code}" if retryable(response)

rescue Net::OpenTimeout, Net::HTTPBadResponse, OpenSSL::SSL::SSLError, RetryableError => e
# Net::OpenTimeout indicates a connection could not be established within the timeout period
# Net::HTTPBadResponse indicates a protocol error
# OpenSSL::SSL::SSLErrorWaitReadable indicates an error establishing the ssl connection
if retries < MAX_RETRIES
sleep_seconds = 2 ** retries
@logger.warn("Failed to contact dynatrace: #{e.message}. Trying again after #{sleep_seconds} seconds.")
sleep sleep_seconds
retries += 1
retry
else
@logger.error("Failed to export logs to Dynatrace.")
return
action, event, attempt = send_event(event, attempt)
begin
action = :failure if action == :retry && !RETRY_FAILED

case action
when :success
successes.incrementAndGet
when :retry
next_attempt = attempt + 1
sleep_for = sleep_for_attempt(next_attempt)
@logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
timer_task = RetryTimerTask.new(pending, event, next_attempt)
@timer.schedule(timer_task, sleep_for * 1000)
when :failure
failures.incrementAndGet
else
# this should never happen. It means send_event returned a symbol we didn't recognize
raise "Unknown action #{action}"
end

pending << :done if %i[success failure].include?(action) && (successes.get + failures.get == 1)
rescue StandardError => e
# This should never happen unless there's a flat out bug in the code
@logger.error('Error sending HTTP Request',
class: e.class.name,
message: e.message,
backtrace: e.backtrace)
failures.incrementAndGet
raise e
end
rescue StandardError => e
@logger.error("Unknown error raised", :error => e.inspect)
raise e
end
rescue StandardError => e
@logger.error('Error in http output loop',
class: e.class.name,
message: e.message,
backtrace: e.backtrace)
raise e
end
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved

def pipeline_shutdown_requested?
return super if defined?(super) # since LS 8.1.0

nil
end

def sleep_for_attempt(attempt)
sleep_for = attempt**2
sleep_for = sleep_for <= 60 ? sleep_for : 60
(sleep_for / 2) + (rand(0..sleep_for) / 2)
end

def send_event(event, attempt)
body = event_body(event)
headers = make_headers

# TODO: keep? If we want this make sure to require zlib
# # Compress the body and add appropriate header
# if @http_compression == true
# headers["Content-Encoding"] = "gzip"
# body = gzip(body)
# end
dyladan marked this conversation as resolved.
Show resolved Hide resolved

# Create an async request
response = client.post(ingest_endpoint_url, body: body, headers: headers)

if response_success?(response)
[:success, event, attempt]
elsif retryable_response?(response)
log_retryable_response(response)
[:retry, event, attempt]
else
log_error_response(response, ingest_endpoint_url, event)
[:failure, event, attempt]
end
rescue StandardError => e
will_retry = retryable_exception?(e)
log_params = {
ingest_endpoint_url: ingest_endpoint_url,
message: e.message,
class: e.class,
will_retry: will_retry
}
if @logger.debug?
# backtraces are big
log_params[:backtrace] = e.backtrace
if @debug_include_headers
# headers can have sensitive data
log_params[:headers] = headers
end
if @debug_include_body
# body can be big and may have sensitive data
log_params[:body] = body
end
end
log_failure('Could not fetch URL', log_params)

if will_retry
[:retry, event, attempt]
else
[:failure, event, attempt]
end
end

def close
@timer.cancel
client.close
end

private

def response_success?(response)
response.code >= 200 && response.code <= 299
end

def retryable_response?(response)
RETRYABLE_CODES.include?(response.code)
end

def retryable_exception?(exception)
retryable_manticore_exception?(exception) || retryable_unknown_exception?(exception)
end

def retryable_manticore_exception?(exception)
RETRYABLE_MANTICORE_EXCEPTIONS.any? { |me| exception.is_a?(me) }
end

def retryable_unknown_exception?(exception)
exception.is_a?(::Manticore::UnknownException) &&
RETRYABLE_UNKNOWN_EXCEPTION_STRINGS.any? { |snippet| exception.message =~ snippet }
end

# This is split into a separate method mostly to help testing
def log_failure(message, opts)
@logger.error(message, opts)
end

def retryable(response)
return response.is_a? Net::HTTPServerError
# Format the HTTP body
def event_body(event)
"#{LogStash::Json.dump(event.map(&:to_hash)).chomp}\n"
end
end
end
Expand Down
Loading