From ac15ccf2b146696f73997db2c6219a7bc509d9f7 Mon Sep 17 00:00:00 2001 From: sonots Date: Sat, 2 Sep 2017 06:14:26 +0900 Subject: [PATCH 1/2] Reduce YAML load Before: * Load on each resource * Load/Dump on each resource if necessary After: * Load on each resource_uri_prefix * Load/Dump on each resource if necessary --- exe/triglav-agent-hdfs | 2 +- lib/triglav/agent/hdfs.rb | 1 + lib/triglav/agent/hdfs/monitor.rb | 30 ++++++------ lib/triglav/agent/hdfs/processor.rb | 75 +++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 lib/triglav/agent/hdfs/processor.rb diff --git a/exe/triglav-agent-hdfs b/exe/triglav-agent-hdfs index c1185ab..36d8d21 100755 --- a/exe/triglav-agent-hdfs +++ b/exe/triglav-agent-hdfs @@ -6,7 +6,7 @@ Triglav::Agent::Configuration.configure do |config| # config.cli_class = Triglav::Agent::Hdfs::CLI # config.setting_class = Triglav::Agent::Hdfs::Setting # config.worker_module = Triglav::Agent::Hdfs::Worker - # config.processor_class = Triglav::Agent::Hdfs::Processor + config.processor_class = Triglav::Agent::Hdfs::Processor config.monitor_class = Triglav::Agent::Hdfs::Monitor config.connection_class = Triglav::Agent::Hdfs::Connection end diff --git a/lib/triglav/agent/hdfs.rb b/lib/triglav/agent/hdfs.rb index ff235ea..d502fe8 100644 --- a/lib/triglav/agent/hdfs.rb +++ b/lib/triglav/agent/hdfs.rb @@ -9,3 +9,4 @@ module Hdfs require 'triglav/agent/hdfs/connection' require 'triglav/agent/hdfs/version' require 'triglav/agent/hdfs/monitor' +require 'triglav/agent/hdfs/processor' diff --git a/lib/triglav/agent/hdfs/monitor.rb b/lib/triglav/agent/hdfs/monitor.rb index 6df5b76..424cd91 100644 --- a/lib/triglav/agent/hdfs/monitor.rb +++ b/lib/triglav/agent/hdfs/monitor.rb @@ -16,29 +16,30 @@ class Monitor < Base::Monitor # unit: 'daily', 'hourly', or 'singular' # timezone: '+09:00' # span_in_days: 32 - def initialize(connection, resource_uri_prefix, resource) + # @param [Hash] last_modification_times for a resource + def initialize(connection, resource_uri_prefix, resource, last_modification_times) @connection = connection @resource_uri_prefix = resource_uri_prefix @resource = resource @status = Triglav::Agent::Status.new(resource_uri_prefix, resource.uri) - @last_modification_times = get_last_modification_times + @last_modification_times = get_last_modification_times(last_modification_times) end def process unless resource_valid? $logger.warn { "Broken resource: #{resource.to_s}" } - return nil + return [nil, nil] end - $logger.debug { "Start process #{resource.uri}" } + started = Time.now + $logger.debug { "Start Monitor#process #{resource.uri}" } events, new_last_modification_times = get_events - $logger.debug { "Finish process #{resource.uri}" } + elapsed = Time.now - started + $logger.debug { "Finish Monitor#process #{resource.uri} elapsed:#{elapsed.to_f}" } - return nil if events.nil? || events.empty? - yield(events) if block_given? # send_message - update_status_file(new_last_modification_times) - true + return [nil, nil] if events.nil? or events.empty? + [events, new_last_modification_times] end private @@ -46,6 +47,7 @@ def process def get_events new_last_modification_times = get_new_last_modification_times latest_files = select_latest_files(new_last_modification_times) + new_last_modification_times[:max] = new_last_modification_times.values.max events = build_events(latest_files) [events, new_last_modification_times] rescue => e @@ -53,13 +55,9 @@ def get_events nil end - def update_status_file(last_modification_times) - last_modification_times[:max] = last_modification_times.values.max - @status.set(last_modification_times) - end - - def get_last_modification_times - last_modification_times = @status.get || {} + def get_last_modification_times(last_modification_times) + last_modification_times ||= {} + # ToDo: want to remove accessing Status in Monitor class max_last_modification_time = last_modification_times[:max] || @status.getsetnx([:max], $setting.debug? ? 0 : get_current_time) removes = last_modification_times.keys - paths.keys appends = paths.keys - last_modification_times.keys diff --git a/lib/triglav/agent/hdfs/processor.rb b/lib/triglav/agent/hdfs/processor.rb new file mode 100644 index 0000000..76f1548 --- /dev/null +++ b/lib/triglav/agent/hdfs/processor.rb @@ -0,0 +1,75 @@ +require 'triglav/agent/base/processor' + +module Triglav::Agent + module Hdfs + class Processor < Base::Processor + def process + before_process + success_count = 0 + consecutive_error_count = 0 + Parallel.each(resources, parallel_opts) do |resource| + raise Parallel::Break if stopped? + events = nil + new_resource_statuses = nil + begin + @connection_pool.with do |connection| + resource_statuses = get_resource_statuses(resource) + monitor = monitor_class.new( + connection, resource_uri_prefix, resource, resource_statuses + ) + events, new_resource_statuses = monitor.process + end + if events + $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" } + @api_client_pool.with {|api_client| api_client.send_messages(events) } + end + @mutex.synchronize do + set_resource_statuses(new_resource_statuses, resource) if new_resource_statuses + success_count += 1 + consecutive_error_count = 0 + end + rescue => e + log_error(e) + $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events + @mutex.synchronize do + raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count + end + end + end + success_count + ensure + after_process + end + + private + + def before_process + super + started = Time.now + @resource_uri_prefix_statuses = Triglav::Agent::Status.new(resource_uri_prefix).get + elapsed = Time.now - started + $logger.info { "Read status #{resource_uri_prefix} #{elapsed.to_f}sec" } + @started = Time.now + $logger.info { "Start Processor#process #{resource_uri_prefix}" } + end + + def after_process + super + elapsed = Time.now - @started + $logger.info { "Finish Processor#process #{resource_uri_prefix} elapsed:#{elapsed.to_f}" } + end + + def get_resource_statuses(resource) + resource_statuses = @resource_uri_prefix_statuses[resource.uri.to_sym] + end + + def set_resource_statuses(resource_statuses, resource) + started = Time.now + resource_status = Triglav::Agent::Status.new(resource_uri_prefix, resource.uri) + resource_status.set(resource_statuses) + elapsed = Time.now - started + $logger.info { "Store status resource:#{resource.uri} #{elapsed.to_f}sec" } + end + end + end +end From 3b6789f2c88caea25d4027783dbba9672d3d1298 Mon Sep 17 00:00:00 2001 From: sonots Date: Mon, 4 Sep 2017 11:53:06 +0900 Subject: [PATCH 2/2] wip --- lib/triglav/agent/hdfs/monitor.rb | 8 +-- lib/triglav/agent/hdfs/processor.rb | 108 +++++++++++++++++----------- 2 files changed, 70 insertions(+), 46 deletions(-) diff --git a/lib/triglav/agent/hdfs/monitor.rb b/lib/triglav/agent/hdfs/monitor.rb index 424cd91..f98b8f4 100644 --- a/lib/triglav/agent/hdfs/monitor.rb +++ b/lib/triglav/agent/hdfs/monitor.rb @@ -57,8 +57,8 @@ def get_events def get_last_modification_times(last_modification_times) last_modification_times ||= {} - # ToDo: want to remove accessing Status in Monitor class - max_last_modification_time = last_modification_times[:max] || @status.getsetnx([:max], $setting.debug? ? 0 : get_current_time) + raise ":max is not set #{resource.uri}" unless last_modification_times[:max] + max_last_modification_time = last_modification_times[:max] removes = last_modification_times.keys - paths.keys appends = paths.keys - last_modification_times.keys removes.each {|path| last_modification_times.delete(path) } @@ -66,10 +66,6 @@ def get_last_modification_times(last_modification_times) last_modification_times end - def get_current_time - (Time.now.to_f * 1000).to_i # msec - end - def resource_valid? self.class.resource_valid?(resource) end diff --git a/lib/triglav/agent/hdfs/processor.rb b/lib/triglav/agent/hdfs/processor.rb index 76f1548..2c93400 100644 --- a/lib/triglav/agent/hdfs/processor.rb +++ b/lib/triglav/agent/hdfs/processor.rb @@ -5,52 +5,88 @@ module Hdfs class Processor < Base::Processor def process before_process - success_count = 0 - consecutive_error_count = 0 Parallel.each(resources, parallel_opts) do |resource| raise Parallel::Break if stopped? - events = nil - new_resource_statuses = nil begin - @connection_pool.with do |connection| - resource_statuses = get_resource_statuses(resource) - monitor = monitor_class.new( - connection, resource_uri_prefix, resource, resource_statuses - ) - events, new_resource_statuses = monitor.process - end - if events - $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" } - @api_client_pool.with {|api_client| api_client.send_messages(events) } - end - @mutex.synchronize do - set_resource_statuses(new_resource_statuses, resource) if new_resource_statuses - success_count += 1 - consecutive_error_count = 0 - end + main_process(resource) rescue => e - log_error(e) - $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events - @mutex.synchronize do - raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count - end + error_process(resource) end end - success_count + return_process ensure - after_process + ensure_process end private def before_process + @started = Time.now + $logger.info { "Start Processor#process #{resource_uri_prefix}" } + super + + @success_count = 0 + @consecutive_error_count = 0 + + @status = Triglav::Agent::Status.new(resource_uri_prefix) started = Time.now - @resource_uri_prefix_statuses = Triglav::Agent::Status.new(resource_uri_prefix).get + @resource_uri_prefix_statuses = @status.get elapsed = Time.now - started $logger.info { "Read status #{resource_uri_prefix} #{elapsed.to_f}sec" } - @started = Time.now - $logger.info { "Start Processor#process #{resource_uri_prefix}" } + + resources.each do |resource| + resource_statuses = @resource_uri_prefix_statuses[resource.uri.to_sym] + resource_statuses[:max] ||= @status.getsetnx( + [resource.uri.to_sym, :max], + $setting.debug? 0 : get_current_time + ) + end + end + + def main_process(resource) + + events = nil + new_resource_statuses = nil + begin + @connection_pool.with do |connection| + resource_statuses = @resource_uri_prefix_statuses[resource.uri.to_sym] + monitor = monitor_class.new( + connection, resource_uri_prefix, resource, resource_statuses + ) + events, new_resource_statuses = monitor.process + end + if events + $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" } + @api_client_pool.with {|api_client| api_client.send_messages(events) } + end + @mutex.synchronize do + if new_resource_statuses + @resource_uri_prefix_statuses[resource.uri.to_sym] = new_resource_statuses + started = Time.now + @status.set(@resource_uri_prefix_statuses) + elapsed = Time.now - started + $logger.info { "Store status resource:#{resource.uri} #{elapsed.to_f}sec" } + end + end + rescue => e + log_error(e) + $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events + raise e + end + end + + def return_process + @success_count + end + + def error_process(resource) + @mutex.synchronize do + consecutive_error_count += 1 + end + if consecutive_error_count > self.class.max_consecuitive_error_count + raise TooManyError + end end def after_process @@ -59,16 +95,8 @@ def after_process $logger.info { "Finish Processor#process #{resource_uri_prefix} elapsed:#{elapsed.to_f}" } end - def get_resource_statuses(resource) - resource_statuses = @resource_uri_prefix_statuses[resource.uri.to_sym] - end - - def set_resource_statuses(resource_statuses, resource) - started = Time.now - resource_status = Triglav::Agent::Status.new(resource_uri_prefix, resource.uri) - resource_status.set(resource_statuses) - elapsed = Time.now - started - $logger.info { "Store status resource:#{resource.uri} #{elapsed.to_f}sec" } + def get_current_time + (Time.now.to_f * 1000).to_i # msec end end end