diff --git a/code/adminoper.rb b/code/adminoper.rb index 7b194be..46795cb 100644 --- a/code/adminoper.rb +++ b/code/adminoper.rb @@ -1,6 +1,5 @@ module RQ class AdminOper - attr_accessor :admin_status attr_accessor :oper_status @@ -10,26 +9,26 @@ def initialize(pathname) @filename = File.basename(pathname) raise ArgumentError, "#{@dirname} doesn't exist" unless File.directory? @dirname - @down_name = @dirname + "/" + @filename + ".down" - @pause_name = @dirname + "/" + @filename + ".pause" + @down_name = @dirname + '/' + @filename + '.down' + @pause_name = @dirname + '/' + @filename + '.pause' - @admin_status = "UNKNOWN" - @oper_status = "UNKNOWN" - @daemon_status = "UP" + @admin_status = 'UNKNOWN' + @oper_status = 'UNKNOWN' + @daemon_status = 'UP' end def update! if File.exists?(@down_name) - @admin_status = @oper_status = "DOWN" + @admin_status = @oper_status = 'DOWN' elsif File.exists?(@pause_name) - @admin_status = @oper_status = "PAUSE" + @admin_status = @oper_status = 'PAUSE' else - @admin_status = @oper_status = "UP" + @admin_status = @oper_status = 'UP' end update_status end - # What the administrator cannot set, only daemons should set this + # What the administrator cannot set, only daemons should set this def set_daemon_status(stat) @daemon_status = stat @@ -37,13 +36,12 @@ def set_daemon_status(stat) end def update_status - if @daemon_status == "UP" + if @daemon_status == 'UP' @oper_status = @admin_status else @oper_status = @daemon_status end end private :update_status - end end diff --git a/code/cleaner_script.rb b/code/cleaner_script.rb index d85290b..2ddab52 100755 --- a/code/cleaner_script.rb +++ b/code/cleaner_script.rb @@ -1,5 +1,5 @@ #!/usr/bin/env ruby -$:.unshift(File.join(File.dirname(__FILE__), "..")) +$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..')) require 'vendor/environment' require 'fileutils' @@ -23,8 +23,8 @@ def handle_fail(mesg = 'soft fail') count = ENV['RQ_COUNT'].to_i if count > 15 - write_status('run', "RQ_COUNT > 15 - failing") - write_status('fail', "RQ_COUNT > 15 - failing") + write_status('run', 'RQ_COUNT > 15 - failing') + write_status('fail', 'RQ_COUNT > 15 - failing') exit(0) end @@ -43,13 +43,13 @@ def fail_hard(mesg) ################################################################################ def listq(basedir) - queues = Dir.glob(basedir + "/queue/??*") + queues = Dir.glob(basedir + '/queue/??*') queues end def rm_logs_older_than(qname, regex, hours) Dir.glob(qname + "/#{regex}").each do |f| - if (Time.now-File.mtime(f))/3600 > hours + if (Time.now - File.mtime(f)) / 3600 > hours begin puts "status: removing #{f}" STDOUT.flush @@ -65,8 +65,8 @@ def rm_logs_older_than(qname, regex, hours) def mv_logs(qname) if File.exists?("#{qname}/queue.log") - a=Time.now - b = sprintf("%s%.2d%.2d.%.2d:%.2d" ,a.year, a.month, a.day, a.hour, a.min) + a = Time.now + b = sprintf('%s%.2d%.2d.%.2d:%.2d' , a.year, a.month, a.day, a.hour, a.min) puts "status: moving #{qname}/queue.log" STDOUT.flush FileUtils.mv("#{qname}/queue.log", "#{qname}/queue.log.#{b}") @@ -74,26 +74,26 @@ def mv_logs(qname) end def remove_old(qname, days) - clean_queues = ["/done", "/relayed", "/prep", "/queue"] + clean_queues = ['/done', '/relayed', '/prep', '/queue'] clean_queues.each do |cq| if File.exists?(qname + cq) - + # go by directories and remove any day dir > days + 1 # then go into the hour dirs and remove by time # easier to whack a whole higher level dir then stat everything below it - - Dir.glob(qname + cq + "/????????").each do |x| - if Date.today - Date.strptime(File.basename(x), "%Y%m%d") >= days + 1 - puts "status: removing " + x + + Dir.glob(qname + cq + '/????????').each do |x| + if Date.today - Date.strptime(File.basename(x), '%Y%m%d') >= days + 1 + puts 'status: removing ' + x STDOUT.flush FileUtils.rm_rf(x) - elsif Date.today - Date.strptime(File.basename(x), "%Y%m%d") == days - Dir.glob(qname + cq + "/????????/??").each do |y| + elsif Date.today - Date.strptime(File.basename(x), '%Y%m%d') == days + Dir.glob(qname + cq + '/????????/??').each do |y| if y =~ /(\d{8})\/(\d{2})$/ - timstr = $1 + "."+ $2 + ":00:00" - j= DateTime.now - DateTime.strptime(timstr, "%Y%m%d.%H:%M:%S") + timstr = Regexp.last_match[1] + '.' + Regexp.last_match[2] + ':00:00' + j = DateTime.now - DateTime.strptime(timstr, '%Y%m%d.%H:%M:%S') if j.to_i >= days - puts "status: removing " + y + puts 'status: removing ' + y STDOUT.flush FileUtils.rm_rf(y) end @@ -103,14 +103,13 @@ def remove_old(qname, days) end end end - end - + def trim_relay(qpath, num) puts "Trimming Relay to #{num} entries" STDOUT.flush - all_msgs = RQ::HashDir.entries(qpath + "/relayed") + all_msgs = RQ::HashDir.entries(qpath + '/relayed') msgs = all_msgs[num..-1] @@ -123,11 +122,11 @@ def trim_relay(qpath, num) msgs.each do |ent| - path = RQ::HashDir.path_for(qpath + "/relayed", ent) + path = RQ::HashDir.path_for(qpath + '/relayed', ent) # TODO: put progress - #puts "status: removing " + path - #STDOUT.flush + # puts "status: removing " + path + # STDOUT.flush FileUtils.rm_rf(path) end @@ -135,20 +134,19 @@ def trim_relay(qpath, num) STDOUT.flush end - ################################################################## # My MAIN ################################################################## -basedir = "/rq/current" +basedir = '/rq/current' -if not ENV.has_key?("RQ_PARAM1") - fail_hard("need to specify a PARAM1") +if not ENV.key?('RQ_PARAM1') + fail_hard('need to specify a PARAM1') end -if ENV['RQ_PARAM1'] == "ALLQUEUES" +if ENV['RQ_PARAM1'] == 'ALLQUEUES' queues = listq(basedir) else - queues = [basedir + "/queue/" + ENV['RQ_PARAM1']] + queues = [basedir + '/queue/' + ENV['RQ_PARAM1']] if not File.exists?queues[0] fail_hard("the specified queue #{queues} does not exist") end @@ -161,11 +159,11 @@ def trim_relay(qpath, num) STDOUT.flush end queues.each do |q| - rm_logs_older_than(q, "/queue.log.?*", log_days*24) + rm_logs_older_than(q, '/queue.log.?*', log_days * 24) mv_logs(q) remove_old(q, log_days) end -trim_relay(basedir + "/queue/relay", 60000) +trim_relay(basedir + '/queue/relay', 60_000) -write_status('done', "successfully ran this script") +write_status('done', 'successfully ran this script') diff --git a/code/hashdir.rb b/code/hashdir.rb index 528804b..fe7aca3 100644 --- a/code/hashdir.rb +++ b/code/hashdir.rb @@ -1,15 +1,14 @@ module RQ class HashDir - # For now, the system is not configurable in terms of pattern match or depth def self.make(path) FileUtils.mkdir_p(path) - return true + true end def self.exist(path, msg_id) - parts = self.msg_id_parts(msg_id) + parts = msg_id_parts(msg_id) # parts = [ "YYYYmmDD", "HH", "MM" ] # If we got bad data, return false @@ -21,7 +20,7 @@ def self.exist(path, msg_id) # Do a DFT traverse in reverse order so most # recent is first def self.entries(path, limit = nil) - self.entries_int(path, 0, [], limit) + entries_int(path, 0, [], limit) end def self.entries_int(path, level, accum, limit = nil) @@ -29,21 +28,21 @@ def self.entries_int(path, level, accum, limit = nil) # YYYYMMDD ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]") ents1.sort.reverse.each do |e| - self.entries_int(e, 1, accum, limit) + entries_int(e, 1, accum, limit) break if limit && accum.length == limit end elsif level == 1 # HH ents1 = Dir.glob("#{path}/[0-9][0-9]") ents1.sort.reverse.each do |e| - self.entries_int(e, 2, accum, limit) + entries_int(e, 2, accum, limit) break if limit && accum.length == limit end elsif level == 2 # MM ents1 = Dir.glob("#{path}/[0-9][0-9]") ents1.sort.reverse.each do |e| - self.entries_int(e, 3, accum, limit) + entries_int(e, 3, accum, limit) break if limit && accum.length == limit end elsif level == 3 @@ -65,7 +64,7 @@ def self.entries_old(path, limit = nil) end def self.num_entries(path) - self.num_entries_int(path, 0) + num_entries_int(path, 0) end def self.num_entries_int(path, level) @@ -74,19 +73,19 @@ def self.num_entries_int(path, level) # YYYYMMDD ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]") ents1.sort.reverse.each do |e| - sum += self.num_entries_int(e, 1) + sum += num_entries_int(e, 1) end elsif level == 1 # HH ents1 = Dir.glob("#{path}/[0-9][0-9]") ents1.sort.reverse.each do |e| - sum += self.num_entries_int(e, 2) + sum += num_entries_int(e, 2) end elsif level == 2 # MM ents1 = Dir.glob("#{path}/[0-9][0-9]") ents1.sort.reverse.each do |e| - sum += self.num_entries_int(e, 3) + sum += num_entries_int(e, 3) end elsif level == 3 # MESG-ID @@ -102,27 +101,25 @@ def self.num_entries_old(path) ents1.length end - def self.inject(prev_msg_path, new_base_path, msg_id) - parts = self.msg_id_parts(msg_id) + parts = msg_id_parts(msg_id) FileUtils.mkdir_p("#{new_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}") newname = "#{new_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}" File.rename(prev_msg_path, newname) end def self.path_for(que_base_path, msg_id) - parts = self.msg_id_parts(msg_id) + parts = msg_id_parts(msg_id) "#{que_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}" end def self.msg_id_parts(msg_id) # Ex. msg_id 20100625.0127.35.122.7509656 if msg_id =~ /(\d\d\d\d\d\d\d\d)\.(\d\d)(\d\d)/ - [$1, $2, $3] + [Regexp.last_match[1], Regexp.last_match[2], Regexp.last_match[3]] else nil end end - end end diff --git a/code/htmlutils.rb b/code/htmlutils.rb index d8c446c..948391d 100644 --- a/code/htmlutils.rb +++ b/code/htmlutils.rb @@ -5,16 +5,15 @@ module RQ class HtmlUtils - ESCAPE_HTML = { - "&" => "&", - "<" => "<", - ">" => ">", + '&' => '&', + '<' => '<', + '>' => '>', } ESCAPE_HTML_PATTERN = Regexp.union(*ESCAPE_HTML.keys) def self.escape_html(text) - text.gsub(ESCAPE_HTML_PATTERN){|c| ESCAPE_HTML[c] } + text.gsub(ESCAPE_HTML_PATTERN) { |c| ESCAPE_HTML[c] } end def self.linkify_text(text) @@ -25,15 +24,13 @@ def self.ansi_to_html(text) terminal = AnsiUtils.new terminal.process_text(text) end - end class AnsiUtils - # Normal and then Bright ANSI_COLORS = [ - ["0,0,0", "187, 0, 0", "0, 187, 0", "187, 187, 0", "0, 0, 187", "187, 0, 187", "0, 187, 187", "255,255,255" ], - ["85,85,85", "255, 85, 85", "0, 255, 0", "255, 255, 85", "85, 85, 255", "255, 85, 255", "85, 255, 255", "255,255,255" ], + ['0,0,0', '187, 0, 0', '0, 187, 0', '187, 187, 0', '0, 0, 187', '187, 0, 187', '0, 187, 187', '255,255,255'], + ['85,85,85', '255, 85, 85', '0, 255, 0', '255, 255, 85', '85, 85, 255', '255, 85, 255', '85, 255, 255', '255,255,255'], ] attr_accessor :fore @@ -58,7 +55,7 @@ def process_text(text) def process_chunk(text) # Do proper handling of sequences (aka - injest vi split(';') into state machine - match,codes,txt = *text.match(/([\d;]+)m(.*)/m) + match, codes, txt = *text.match(/([\d;]+)m(.*)/m) if not match return txt @@ -87,9 +84,8 @@ def process_chunk(text) style = [] style << "color:#{@fore}" if @fore style << "background-color:#{@back}" if @back - ["", txt, ""] + ["", txt, ''] end end - end end diff --git a/code/jsonconfigfile.rb b/code/jsonconfigfile.rb index 0570096..408bc7c 100644 --- a/code/jsonconfigfile.rb +++ b/code/jsonconfigfile.rb @@ -3,11 +3,10 @@ module RQ class JSONConfigFile - # A class to keep an eye on a config file and determine # if it was reloaded in order to cause any observers to adjust - #stat = File.stat(@queue_path + "/prep/" + name) + # stat = File.stat(@queue_path + "/prep/" + name) attr_accessor :path attr_accessor :conf @@ -44,6 +43,5 @@ def check_for_change rescue ERROR_IGNORED end - end end diff --git a/code/main.rb b/code/main.rb index 3d417dd..85a03b9 100644 --- a/code/main.rb +++ b/code/main.rb @@ -10,7 +10,6 @@ module RQ class Main < Sinatra::Base - disable :protection enable :sessions set :session_secret, 'super secret' # we are forking, so we must set @@ -30,8 +29,8 @@ def get_queueclient(name) # No RQ should be connecting to another box's relay # However, users need the web ui to interact, so we think # this is safe and good for debugging/visiblity - if File.exists?("./config/rq_router_rules.rb") - if not ['relay', 'cleaner'].include?(name) + if File.exists?('./config/rq_router_rules.rb') + if not %w(relay cleaner).include?(name) name = 'rq_router' end end @@ -42,17 +41,17 @@ def msgs_labels %w[prep que run err done relayed] end - def queue_row(name, options={}) + def queue_row(name, options = {}) html = options[:odd] ? "" : - "" + '' html += "#{name}" begin qc = get_queueclient(name) raise unless qc.running? admin_stat, oper_stat = qc.status html += <<-END -
#{msgs_labels.zip(qc.num_messages.values_at(*msgs_labels)).map{|ab| "#{ab[0]}:#{ab[1].to_s.ljust(4)} "}.join}
+
#{msgs_labels.zip(qc.num_messages.values_at(*msgs_labels)).map { |ab| "#{ab[0]}:#{ab[1].to_s.ljust(4)} " }.join}
#{qc.ping} #{qc.read_pid} #{qc.uptime} @@ -62,16 +61,16 @@ def queue_row(name, options={}) END rescue - html += "-" - html += "-" - html += "-" - html += "-" - html += "DOWN #{$!}" + html += '-' + html += '-' + html += '-' + html += '-' + html += "DOWN #{$ERROR_INFO}" end html += "
" html += "" - html += "
" - html += "" + html += '' + html += '' end def flash(type, msg) @@ -122,12 +121,12 @@ def flash_now(type, msg) begin js_data = JSON.parse(File.read(params['queue']['json_path'])) rescue - p $! - p "BAD config.json - could not parse" + p $ERROR_INFO + p 'BAD config.json - could not parse' throw :halt, [404, "404 - Couldn't parse json file (#{params['queue']['json_path']})."] end result = RQ::QueueMgrClient.create_queue_link(params['queue']['json_path']) - #TODO - do the right thing with the result code + # TODO - do the right thing with the result code flash :notice, "We got #{params.inspect} from form, and #{result} from QueueMgr" redirect "/q/#{js_data['name']}" end @@ -136,12 +135,12 @@ def flash_now(type, msg) # This creates and starts a queue result = RQ::QueueMgrClient.delete_queue(params['queue_name']) flash :notice, "We got #{params.inspect} from form, and #{result} from QueueMgr" - redirect "/" + redirect '/' end get '/q.txt' do content_type 'text/plain', :charset => 'utf-8' - erb :queue_list, :layout => false, :locals => {:queues => RQ::QueueMgrClient.queues} + erb :queue_list, :layout => false, :locals => { :queues => RQ::QueueMgrClient.queues } end get '/q.json' do @@ -149,23 +148,23 @@ def flash_now(type, msg) end get '/q/:name' do - if params[:name].index(".txt") + if params[:name].index('.txt') content_type 'text/plain', :charset => 'utf-8' - return erb :queue_txt, :layout => false, :locals => { :qc => RQ::QueueClient.new(params[:name].split(".txt").first) } - elsif params[:name].index(".json") + return erb :queue_txt, :layout => false, :locals => { :qc => RQ::QueueClient.new(params[:name].split('.txt').first) } + elsif params[:name].index('.json') if '.json' == params[:name][-5..-1] - return erb :queue_json, :layout => false, :locals => { :qc => RQ::QueueClient.new(params[:name].split(".json").first) } + return erb :queue_json, :layout => false, :locals => { :qc => RQ::QueueClient.new(params[:name].split('.json').first) } end end if not RQ::QueueMgrClient.running? - throw :halt, [503, "503 - QueueMgr not running"] + throw :halt, [503, '503 - QueueMgr not running'] end begin qc = RQ::QueueClient.new(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end ok, config = qc.get_config @@ -174,20 +173,20 @@ def flash_now(type, msg) get '/q/:name/done.json' do if not RQ::QueueMgrClient.running? - throw :halt, [503, "503 - QueueMgr not running"] + throw :halt, [503, '503 - QueueMgr not running'] end begin qc = RQ::QueueClient.new(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end limit = 10 if params['limit'] limit = params['limit'].to_i end - result = qc.messages({'state' => 'done', 'limit' => limit}) + result = qc.messages( 'state' => 'done', 'limit' => limit ) "#{result.to_json}" end @@ -195,7 +194,7 @@ def flash_now(type, msg) begin qc = RQ::QueueClient.new(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end overrides = RQ::Overrides.new(params['name']) @@ -211,22 +210,22 @@ def flash_now(type, msg) end # Normalize some values - if prms.has_key? 'post_run_webhook' and prms['post_run_webhook'].is_a? String + if prms.key? 'post_run_webhook' and prms['post_run_webhook'].is_a? String # clean webhook input of any spaces # Ruby split..... so good! prms['post_run_webhook'] = prms['post_run_webhook'].split ' ' end - if prms.has_key? 'count' + if prms.key? 'count' prms['count'] = prms['count'].to_i end - if prms.has_key? 'max_count' + if prms.key? 'max_count' prms['max_count'] = prms['max_count'].to_i end begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end the_method = prms.fetch("_method", 'commit') @@ -237,11 +236,11 @@ def flash_now(type, msg) elsif the_method == 'commit' result = qc.create_message(prms) else - throw :halt, [400, "400 - Invalid method param"] + throw :halt, [400, '400 - Invalid method param'] end - if result == [ "fail", "oper_status: DOWN"] - throw :halt, [503, "503 - Service Unavailable - Operationally Down"] + if result == ['fail', 'oper_status: DOWN'] + throw :halt, [503, '503 - Service Unavailable - Operationally Down'] end if api_call == 'json' @@ -279,10 +278,10 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end - ok, config = qc.get_config() + ok, config = qc.get_config config.to_json end @@ -299,13 +298,13 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end - ok, msg = qc.get_message({ 'msg_id' => msg_id }) + ok, msg = qc.get_message( 'msg_id' => msg_id ) if ok != 'ok' - throw :halt, [404, "404 - Message ID not found"] + throw :halt, [404, '404 - Message ID not found'] end if fmt == :html @@ -315,7 +314,7 @@ def flash_now(type, msg) erb :message, :locals => { :q_name => qc.name, :msg_id => msg_id, :msg => msg } end else - #content_type 'application/json' + # content_type 'application/json' msg.to_json end end @@ -327,27 +326,26 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end - ok, state = qc.get_message_state({ 'msg_id' => msg_id }) + ok, state = qc.get_message_state( 'msg_id' => msg_id ) if ok != 'ok' - throw :halt, [404, "404 - Message ID not found"] + throw :halt, [404, '404 - Message ID not found'] end - [ state ].to_json + [state].to_json end - post '/q/:name/:msg_id/clone' do begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end - res = qc.clone_message({ 'msg_id' => params[:msg_id] }) + res = qc.clone_message( 'msg_id' => params[:msg_id] ) if not res throw :halt, [500, "500 - Couldn't clone message. Internal error."] @@ -356,7 +354,7 @@ def flash_now(type, msg) throw :halt, [500, "500 - Couldn't clone message. #{res.inspect}."] end - flash :notice, "Message cloned successfully" + flash :notice, 'Message cloned successfully' redirect "/q/#{params[:name]}" end @@ -364,10 +362,10 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end - res = qc.run_message({ 'msg_id' => params[:msg_id] }) + res = qc.run_message( 'msg_id' => params[:msg_id] ) if not res throw :halt, [500, "500 - Couldn't run message. Internal error."] @@ -376,7 +374,7 @@ def flash_now(type, msg) throw :halt, [500, "500 - Couldn't run message. #{res.inspect}."] end - flash :notice, "Message in run successfully" + flash :notice, 'Message in run successfully' redirect "/q/#{params[:name]}/#{params[:msg_id]}" end @@ -385,54 +383,53 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end # Sample of what params look like # {"name"=>"test", "filedata"=>{:type=>"image/jpeg", :head=>"Content-Disposition: form-data; name=\"data\"; filename=\"studio3.jpg\"\r\nContent-Type: image/jpeg\r\n", :tempfile=>#, :name=>"filedata", :filename=>"studio3.jpg"}, "msg_id"=>"20091215.1829.21.853", "x_format"=>"json"} # - #p params - #p params['filedata'] - #p params['filedata'][:tempfile].path - #p params['filedata'][:tempfile].class - #p params['filedata'][:tempfile].methods.sort + # p params + # p params['filedata'] + # p params['filedata'][:tempfile].path + # p params['filedata'][:tempfile].class + # p params['filedata'][:tempfile].methods.sort if not params['filedata'] - throw :halt, [404, "404 - Missing required param filedata"] + throw :halt, [404, '404 - Missing required param filedata'] end if params['filedata'].class != Hash - throw :halt, [404, "404 - Wrong input type for filedata param"] + throw :halt, [404, '404 - Wrong input type for filedata param'] end if not params['filedata'][:tempfile] - throw :halt, [404, "404 - Missing pathname to upload temp file in filedata param"] + throw :halt, [404, '404 - Missing pathname to upload temp file in filedata param'] end if not params['filedata'][:filename] - throw :halt, [404, "404 - Missing filename of uploaded file in filedata param"] + throw :halt, [404, '404 - Missing filename of uploaded file in filedata param'] end - api_call = params.fetch('x_format', 'html') msg = { 'msg_id' => params['msg_id'], - 'pathname' => params['filedata'][:tempfile].path, - 'name' => params['filedata'][:filename] + 'pathname' => params['filedata'][:tempfile].path, + 'name' => params['filedata'][:filename] } - result = qc.attach_message( msg ) + result = qc.attach_message(msg) # Success - clean up temp file - if result[0] == "ok" + if result[0] == 'ok' File.unlink(params['filedata'][:tempfile].path) rescue nil end if api_call == 'json' result.to_json else - if result[0] == "ok" - flash :notice, "Attached message successfully" + if result[0] == 'ok' + flash :notice, 'Attached message successfully' redirect "/q/#{params[:name]}/#{params[:msg_id]}" else "Commit #{params[:name]}/#{params[:msg_id]} got #{result}" @@ -444,27 +441,27 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end api_call = params.fetch('x_format', 'html') - result = ['fail', 'unknown'] + result = %w(fail unknown) if params[:_method] == 'delete' - result = qc.delete_attach_message( {'msg_id' => params[:msg_id], - 'attachment_name' => params[:attachment_name]} ) + result = qc.delete_attach_message( 'msg_id' => params[:msg_id], + 'attachment_name' => params[:attachment_name ]) if api_call == 'json' result.to_json else - if result[0] == "ok" - flash :notice, "Attachment deleted successfully" + if result[0] == 'ok' + flash :notice, 'Attachment deleted successfully' redirect "/q/#{params[:name]}/#{params[:msg_id]}" else "Delete of attach #{params[:attachment_name]} on #{params[:name]}/#{params[:msg_id]} got #{result}" end end else - throw :halt, [400, "400 - Invalid method param"] + throw :halt, [400, '400 - Invalid method param'] end end @@ -472,17 +469,17 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end msg_id = params['msg_id'] - ok, msg = qc.get_message({ 'msg_id' => msg_id }) + ok, msg = qc.get_message( 'msg_id' => msg_id ) if ok != 'ok' - throw :halt, [404, "404 - Message ID not found"] + throw :halt, [404, '404 - Message ID not found'] end - if ['done', 'relayed'].include? msg['state'] + if %w(done relayed).include? msg['state'] path = RQ::HashDir.path_for("./queue/#{params['name']}/#{msg['state']}", params['msg_id']) path += "/job/#{params['log_name']}" else @@ -502,18 +499,18 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end msg_id = params['msg_id'] - ok, msg = qc.get_message({ 'msg_id' => msg_id }) + ok, msg = qc.get_message( 'msg_id' => msg_id ) if ok != 'ok' - throw :halt, [404, "404 - Message ID not found"] + throw :halt, [404, '404 - Message ID not found'] end # TODO: use path from get_message instead of below - if ['done', 'relayed'].include? msg['state'] + if %w(done relayed).include? msg['state'] path = RQ::HashDir.path_for("./queue/#{params['name']}/#{msg['state']}", params['msg_id']) path += "/attach/#{params['attach_name']}" else @@ -533,18 +530,18 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end msg_id = params['msg_id'] - ok, msg = qc.get_message({ 'msg_id' => msg_id }) + ok, msg = qc.get_message( 'msg_id' => msg_id ) if ok != 'ok' - throw :halt, [404, "404 - Message ID not found"] + throw :halt, [404, '404 - Message ID not found'] end # TODO: use path from get_message instead of below - if ['done', 'relayed'].include? msg['state'] + if %w(done relayed).include? msg['state'] path = RQ::HashDir.path_for("./queue/#{params['name']}/#{msg['state']}", params['msg_id']) path += "/attach/#{params['attach_name']}" else @@ -572,14 +569,14 @@ def flash_now(type, msg) begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end msg_id = params['msg_id'] - ok, msg = qc.get_message({ 'msg_id' => msg_id }) + ok, msg = qc.get_message( 'msg_id' => msg_id ) if ok != 'ok' - throw :halt, [404, "404 - Message ID not found"] + throw :halt, [404, '404 - Message ID not found'] end erb :tailview, :layout => false, @@ -589,23 +586,22 @@ def flash_now(type, msg) } end - post '/q/:name/:msg_id' do begin qc = get_queueclient(params[:name]) rescue RQ::RqQueueNotFound - throw :halt, [404, "404 - Queue not found"] + throw :halt, [404, '404 - Queue not found'] end api_call = params.fetch('x_format', 'html') if params[:_method] == 'delete' - result = qc.delete_message( {'msg_id' => params[:msg_id]} ) + result = qc.delete_message( 'msg_id' => params[:msg_id ]) if api_call == 'json' result.to_json else - if result[0] == "ok" - flash :notice, "Message deleted successfully" + if result[0] == 'ok' + flash :notice, 'Message deleted successfully' redirect "/q/#{params[:name]}" else flash :error, "Delete got #{result.inspect}" @@ -613,21 +609,20 @@ def flash_now(type, msg) end end elsif params[:_method] == 'commit' - result = qc.commit_message( {'msg_id' => params[:msg_id]} ) + result = qc.commit_message( 'msg_id' => params[:msg_id ]) if api_call == 'json' result.to_json else - if result[0] == "ok" - flash :notice, "Message committed successfully" + if result[0] == 'ok' + flash :notice, 'Message committed successfully' else flash :error, "Commit got #{result.inspect}" end redirect "/q/#{params[:name]}/#{params[:msg_id]}" end else - throw :halt, [400, "400 - Invalid method param"] + throw :halt, [400, '400 - Invalid method param'] end end - end end diff --git a/code/message.rb b/code/message.rb index 24e8fc6..fb59323 100644 --- a/code/message.rb +++ b/code/message.rb @@ -3,20 +3,18 @@ module RQ class Message < Struct.new(:msg_id, :status, :dest, :src, :param1, :param2, :param3, :param4) - - def initialize(options={}) + def initialize(options = {}) end def init_with_opts(options) - #"dest"=>"http://localhost:3333/queue/", "src"=>"dru", "param1"=>"test", "param2"=>"", "param3"=>"", "param4"=>"", "status"=>"ready" - @status = options["status"] - @dest = options["dest"] - @src = options["src"] - @param1 = options["param1"] - @param2 = options["param2"] - @param3 = options["param3"] - @param4 = options["param4"] + # "dest"=>"http://localhost:3333/queue/", "src"=>"dru", "param1"=>"test", "param2"=>"", "param3"=>"", "param4"=>"", "status"=>"ready" + @status = options['status'] + @dest = options['dest'] + @src = options['src'] + @param1 = options['param1'] + @param2 = options['param2'] + @param3 = options['param3'] + @param4 = options['param4'] end - end end diff --git a/code/overrides.rb b/code/overrides.rb index 135f002..201d7fc 100644 --- a/code/overrides.rb +++ b/code/overrides.rb @@ -1,6 +1,5 @@ module RQ class Overrides - attr_accessor :data def initialize(name, read_file = true) @@ -14,13 +13,13 @@ def get_json(path) begin @data = JSON.parse(File.read(path)) rescue - throw :halt, [500, "500 - Bad overrides file"] + throw :halt, [500, '500 - Bad overrides file'] end end end def show_field(name) - if @data['default'] == "hidden" && @data[name] == nil + if @data['default'] == 'hidden' && @data[name] == nil false else true diff --git a/code/queue.rb b/code/queue.rb index 63bbdbb..4382e4a 100644 --- a/code/queue.rb +++ b/code/queue.rb @@ -12,7 +12,6 @@ require 'pathname' module RQ - class Worker < Struct.new( :qc, :name, @@ -36,13 +35,12 @@ class QueueConfig < Struct.new( end class Queue - def initialize(options, parent_pipe) @start_time = Time.now # Read config @name = options['name'] @queue_path = "queue/#{@name}" - @rq_config_path = "./config/" + @rq_config_path = './config/' @parent_pipe = parent_pipe init_socket @@ -60,25 +58,25 @@ def initialize(options, parent_pipe) @signal_hup_rd, @signal_hup_wr = IO.pipe - Signal.trap("TERM") do - log("received TERM signal") + Signal.trap('TERM') do + log('received TERM signal') shutdown! end - Signal.trap("HUP") do + Signal.trap('HUP') do # Ye Ole DJB self_pipe trick again @signal_hup_wr.syswrite('.') end unless load_rq_config sleep 5 - log("Invalid main rq config for #{@name}. Exiting." ) + log("Invalid main rq config for #{@name}. Exiting.") exit! 1 end unless load_config sleep 5 - log("Invalid config for #{@name}. Exiting." ) + log("Invalid config for #{@name}. Exiting.") exit! 1 end @@ -98,7 +96,7 @@ def self.delete(name) FileUtils.mv(queue_path, new_queue_path) end - def self.create(options,config_path=nil) + def self.create(options, config_path = nil) # Create a directories and config queue_path = "queue/#{options['name']}" FileUtils.mkdir_p(queue_path) @@ -115,7 +113,7 @@ def self.create(options,config_path=nil) File.symlink(old_path, queue_path + '/config.json') else # Write config to dir - File.open(queue_path + '/config.json', "w") do |f| + File.open(queue_path + '/config.json', 'w') do |f| f.write(options.to_json) end end @@ -123,7 +121,7 @@ def self.create(options,config_path=nil) end def self.log(path, mesg) - File.open(path + '/queue.log', "a") do |f| + File.open(path + '/queue.log', 'a') do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end rescue Exception @@ -145,11 +143,11 @@ def self.start_process(options) $0 = "[rq-que] [#{options['name']}]" begin parent_wr.close - #child only code block + # child only code block RQ::Queue.log(queue_path, 'post fork') # Unix house keeping - self.close_all_fds([child_rd.fileno]) + close_all_fds([child_rd.fileno]) # TODO: probly some other signal, session, proc grp, etc. crap RQ::Queue.log(queue_path, 'post close_all') @@ -159,14 +157,14 @@ def self.start_process(options) RQ::Queue.log(queue_path, 'post new') q.run_loop rescue Exception - self.log(queue_path, "Exception!") - self.log(queue_path, $!) - self.log(queue_path, $!.backtrace) + log(queue_path, 'Exception!') + log(queue_path, $ERROR_INFO) + log(queue_path, $ERROR_INFO.backtrace) raise end end - #parent only code block + # parent only code block child_rd.close if child_pid == nil @@ -192,7 +190,7 @@ def self.start_process(options) worker # If anything went wrong at all log it and return nil. rescue Exception - self.log("startup", "Failed to start worker #{options.inspect}: #{$!}") + log('startup', "Failed to start worker #{options.inspect}: #{$ERROR_INFO}") nil end @@ -213,11 +211,11 @@ def self.validate_options(options) if options.include?('name') if (1..128).include?(options['name'].size) if options['name'].class != String - resp = "json config has invalid name (not String)" + resp = 'json config has invalid name (not String)' err = true end else - resp = "json config has invalid name (size)" + resp = 'json config has invalid name (size)' err = true end else @@ -228,8 +226,8 @@ def self.validate_options(options) if not err if options.include?('num_workers') - if not ( (1..128).include?(options['num_workers'].to_i) ) - resp = "json config has invalid num_workers field (out of range 1..128)" + if not ( (1..128).include?(options['num_workers'].to_i)) + resp = 'json config has invalid num_workers field (out of range 1..128)' err = true end else @@ -242,11 +240,11 @@ def self.validate_options(options) if options.include?('script') if (1..1024).include?(options['script'].size) if options['script'].class != String - resp = "json config has invalid script (not String)" + resp = 'json config has invalid script (not String)' err = true end else - resp = "json config has invalid script (size)" + resp = 'json config has invalid script (size)' err = true end else @@ -261,7 +259,7 @@ def self.validate_options(options) def run_queue_script!(msg) msg_id = msg['msg_id'] - basename = @queue_path + "/run/" + msg_id + basename = @queue_path + '/run/' + msg_id job_path = File.expand_path(basename + '/job/') Dir.mkdir(job_path) unless File.exists?(job_path) @@ -270,11 +268,11 @@ def run_queue_script!(msg) # This meant that a script would see a new directory on a code deploy if that # script lived under a symlinked path script_path = Pathname.new(@config.script).realpath.to_s rescue @config.script - if (!File.executable?(script_path) rescue false) + if !File.executable?(script_path) log("ERROR - QUEUE SCRIPT - not there or runnable #{script_path}") if @status.oper_status != 'SCRIPTERROR' @status.set_daemon_status('SCRIPTERROR') - log("SCRIPTERROR - DAEMON STATUS is set to SCRIPTERROR") + log('SCRIPTERROR - DAEMON STATUS is set to SCRIPTERROR') log("OPER STATUS is now: #{@status.oper_status}") end return @@ -282,52 +280,52 @@ def run_queue_script!(msg) if @status.oper_status == 'SCRIPTERROR' @status.set_daemon_status('UP') - log("SCRIPTERROR FIXED - DAEMON STATUS is set to UP") + log('SCRIPTERROR FIXED - DAEMON STATUS is set to UP') log("OPER STATUS is now: #{@status.oper_status}") end - #log("0 child process prep step for runnable #{script_path}") + # log("0 child process prep step for runnable #{script_path}") # 0 = stdin, 1 = stdout, 2 = stderr, 4 = pipe # parent_rd, child_wr = IO.pipe child_rd, parent_wr = IO.pipe log("1 child process prep step for runnable #{script_path}") - #log("1 child process prep step for runnable #{job_path}") + # log("1 child process prep step for runnable #{job_path}") child_pid = fork do # Setup env $0 = "[rq-msg] [#{@name}] [#{msg_id}]" begin - #child only code block + # child only code block Dir.chdir(job_path) # Chdir to child path # TODO: log level - #RQ::Queue.log(job_path, "child process prep step for runnable #{script_path}") + # RQ::Queue.log(job_path, "child process prep step for runnable #{script_path}") - #RQ::Queue.log(job_path, "post fork - parent rd pipe fd: #{parent_rd.fileno}") - #RQ::Queue.log(job_path, "post fork - child wr pipe fd: #{child_wr.fileno}") + # RQ::Queue.log(job_path, "post fork - parent rd pipe fd: #{parent_rd.fileno}") + # RQ::Queue.log(job_path, "post fork - child wr pipe fd: #{child_wr.fileno}") - #RQ::Queue.log(job_path, "post fork - child rd pipe fd: #{child_rd.fileno}") - #RQ::Queue.log(job_path, "post fork - parent wr pipe fd: #{parent_wr.fileno}") + # RQ::Queue.log(job_path, "post fork - child rd pipe fd: #{child_rd.fileno}") + # RQ::Queue.log(job_path, "post fork - parent wr pipe fd: #{parent_wr.fileno}") # WE MUST DO THIS BECAUSE WE MAY GET PIPE FDs IN THE 3-4 RANGE # THIS GIVES US HIGHER # FDs SO WE CAN SAFELY CLOSE child_wr_fd = child_wr.fcntl(Fcntl::F_DUPFD) child_rd_fd = child_rd.fcntl(Fcntl::F_DUPFD) - #RQ::Queue.log(job_path, "post fork - child_wr_fd pipe fd: #{child_wr_fd}") - #RQ::Queue.log(job_path, "post fork - child_rd_fd pipe fd: #{child_rd_fd}") + # RQ::Queue.log(job_path, "post fork - child_wr_fd pipe fd: #{child_wr_fd}") + # RQ::Queue.log(job_path, "post fork - child_rd_fd pipe fd: #{child_rd_fd}") parent_rd.close parent_wr.close # Unix house keeping - #self.close_all_fds([child_wr.fileno]) + # self.close_all_fds([child_wr.fileno]) - #... the pipe fd will get closed on exec + # ... the pipe fd will get closed on exec # child_wr IO.for_fd(3).close rescue nil @@ -341,7 +339,7 @@ def run_queue_script!(msg) RQ::Queue.log(job_path, "Error duping fd for 4 - got #{fd}") unless fd == 4 IO.for_fd(child_rd_fd).close rescue nil - f = File.open(job_path + "/stdio.log", "a") + f = File.open(job_path + '/stdio.log', 'a') pfx = "#{Process.pid} - #{Time.now} -" f.write("\n#{pfx} RQ START - #{script_path}\n") f.flush @@ -362,38 +360,35 @@ def run_queue_script!(msg) next unless io io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) end - #RQ::Queue.log(job_path, 'post FD_CLOEXEC') unless fd == 2 - - #RQ::Queue.log(job_path, "running #{script_path}") - - ENV["RQ_SCRIPT"] = @config.script - ENV["RQ_REALSCRIPT"] = script_path - ENV["RQ_HOST"] = "http://#{@host}:#{@port}/" - ENV["RQ_DEST"] = gen_full_dest(msg)['dest'] - ENV["RQ_DEST_QUEUE"] = gen_full_dest(msg)['queue'] - ENV["RQ_MSG_ID"] = msg_id - ENV["RQ_FULL_MSG_ID"] = gen_full_msg_id(msg) - ENV["RQ_MSG_DIR"] = job_path - ENV["RQ_PIPE"] = "3" # DEPRECATED - ENV["RQ_WRITE"] = "3" # USE THESE INSTEAD - ENV["RQ_READ"] = "4" - ENV["RQ_COUNT"] = msg['count'].to_s - ENV["RQ_PARAM1"] = msg['param1'] - ENV["RQ_PARAM2"] = msg['param2'] - ENV["RQ_PARAM3"] = msg['param3'] - ENV["RQ_PARAM4"] = msg['param4'] - ENV["RQ_ORIG_MSG_ID"] = msg['orig_msg_id'] - ENV["RQ_FORCE_REMOTE"] = "1" if msg['force_remote'] + + ENV['RQ_SCRIPT'] = @config.script + ENV['RQ_REALSCRIPT'] = script_path + ENV['RQ_HOST'] = "http://#{@host}:#{@port}/" + ENV['RQ_DEST'] = gen_full_dest(msg)['dest'] + ENV['RQ_DEST_QUEUE'] = gen_full_dest(msg)['queue'] + ENV['RQ_MSG_ID'] = msg_id + ENV['RQ_FULL_MSG_ID'] = gen_full_msg_id(msg) + ENV['RQ_MSG_DIR'] = job_path + ENV['RQ_PIPE'] = '3' # DEPRECATED + ENV['RQ_WRITE'] = '3' # USE THESE INSTEAD + ENV['RQ_READ'] = '4' + ENV['RQ_COUNT'] = msg['count'].to_s + ENV['RQ_PARAM1'] = msg['param1'] + ENV['RQ_PARAM2'] = msg['param2'] + ENV['RQ_PARAM3'] = msg['param3'] + ENV['RQ_PARAM4'] = msg['param4'] + ENV['RQ_ORIG_MSG_ID'] = msg['orig_msg_id'] + ENV['RQ_FORCE_REMOTE'] = '1' if msg['force_remote'] # Set env vars specified in queue config file if @config.env_vars - @config.env_vars.each do |varname,value| + @config.env_vars.each do |varname, value| ENV[varname] = value unless varname.match(/^RQ_/) # Don't let the config override RQ-specific env vars though end end # unset RUBYOPT so it doesn't reinitialize the client ruby's GEM_HOME, etc. - ENV.delete("RUBYOPT") + ENV.delete('RUBYOPT') # TODO # RQ::Queue.log(job_path, "set ENV now executing #{msg.inspect}") @@ -402,33 +397,33 @@ def run_queue_script!(msg) Process.setpriority(Process::PRIO_PROCESS, 0, 19) # TODO - #RQ::Queue.log(job_path, "set ENV, now executing #{script_path}") + # RQ::Queue.log(job_path, "set ENV, now executing #{script_path}") # bash -lc will execute the command but first re-initializing like a new login (reading .bashrc, etc.) - exec_prefix = @config.exec_prefix || "bash -lc " + exec_prefix = @config.exec_prefix || 'bash -lc ' if exec_prefix.empty? - #RQ::Queue.log(job_path, "exec path: #{script_path}") - exec(script_path, "") if RUBY_VERSION < '2.0' - exec(script_path, "", :close_others => false) + # RQ::Queue.log(job_path, "exec path: #{script_path}") + exec(script_path, '') if RUBY_VERSION < '2.0' + exec(script_path, '', :close_others => false) else - #RQ::Queue.log(job_path, "exec path: #{exec_prefix + script_path}") + # RQ::Queue.log(job_path, "exec path: #{exec_prefix + script_path}") exec(exec_prefix + script_path) if RUBY_VERSION < '2.0' exec(exec_prefix + script_path, :close_others => false) end rescue - RQ::Queue.log(job_path, $!) - RQ::Queue.log(job_path, $!.backtrace) + RQ::Queue.log(job_path, $ERROR_INFO) + RQ::Queue.log(job_path, $ERROR_INFO.backtrace) raise end end - #parent only code block + # parent only code block child_wr.close child_rd.close if child_pid == nil parent_rd.close - log("ERROR failed to run child script: queue_path, $!") + log("ERROR failed to run child script: #{script_path}") return nil end @@ -438,11 +433,10 @@ def run_queue_script!(msg) write_msg_process_id(msg_id, child_pid) end - def init_socket # Show pid File.unlink(@queue_path + '/queue.pid') rescue nil - File.open(@queue_path + '/queue.pid', "w") do |f| + File.open(@queue_path + '/queue.pid', 'w') do |f| f.write("#{Process.pid}\n") end @@ -478,7 +472,7 @@ def sublimate_config(conf) new_config.exec_prefix = conf['exec_prefix'] new_config.env_vars = conf['env_vars'] new_config.coalesce = !!(%w{true yes 1}.include? conf['coalesce']) - new_config.coalesce_params = Hash[ (1..4).map {|x| [x, !!(conf["coalesce_param#{x}"].to_i == 1)]} ] + new_config.coalesce_params = Hash[ (1..4).map { |x| [x, !!(conf["coalesce_param#{x}"].to_i == 1)] }] new_config end @@ -488,19 +482,19 @@ def alloc_id(msg) times = 0 begin z = Time.now.getutc - name = z.strftime("_%Y%m%d.%H%M.%S.") + sprintf("%03d", (z.tv_usec / 1000)) - Dir.mkdir(@queue_path + "/prep/" + name) - stat = File.stat(@queue_path + "/prep/" + name) - new_name = z.strftime("%Y%m%d.%H%M.%S.") + sprintf("%03d.%d", (z.tv_usec / 1000), stat.ino) - File.rename(@queue_path + "/prep/" + name, @queue_path + "/prep/" + new_name) + name = z.strftime('_%Y%m%d.%H%M.%S.') + sprintf('%03d', (z.tv_usec / 1000)) + Dir.mkdir(@queue_path + '/prep/' + name) + stat = File.stat(@queue_path + '/prep/' + name) + new_name = z.strftime('%Y%m%d.%H%M.%S.') + sprintf('%03d.%d', (z.tv_usec / 1000), stat.ino) + File.rename(@queue_path + '/prep/' + name, @queue_path + '/prep/' + new_name) @prep << new_name - msg["msg_id"] = new_name + msg['msg_id'] = new_name return msg rescue Exception times += 1 - log("FATAL - couldn't ALLOC ID times: #{times} #{$!}") + log("FATAL - couldn't ALLOC ID times: #{times} #{$ERROR_INFO}") if times > 10 - log("FAILED TO ALLOC ID") + log('FAILED TO ALLOC ID') return nil end sleep 0.001 # A tiny pause to prevent consuming all CPU @@ -514,7 +508,7 @@ def alloc_id(msg) # It is called right after alloc_id def check_msg(msg, input) # Required parameter - return false unless input.has_key?('dest') + return false unless input.key?('dest') msg['dest'] = input['dest'] # If orig_msg_id is set already, then use it @@ -526,20 +520,20 @@ def check_msg(msg, input) # Copy only these keys from input message keys = %w(src param1 param2 param3 param4 post_run_webhook due force_remote) keys.each do |key| - next unless input.has_key?(key) + next unless input.key?(key) msg[key] = input[key] end - return true + true end def store_msg(msg, que = 'prep') # Write message to disk begin - if not msg.has_key?('due') + if not msg.key?('due') msg['due'] = Time.now.to_i end - clean = msg.reject { |k,v| k == 'child_read_pipe' || k == 'child_pid' || k == 'child_write_pipe' } + clean = msg.reject { |k, v| k == 'child_read_pipe' || k == 'child_pid' || k == 'child_write_pipe' } data = clean.to_json # Need a sysopen style system here TODO basename = @queue_path + "/#{que}/" + msg['msg_id'] @@ -550,7 +544,7 @@ def store_msg(msg, que = 'prep') return false end - return true + true end def que(msg, from_state = 'prep') @@ -559,13 +553,13 @@ def que(msg, from_state = 'prep') # Read in full message msg, basename = get_message(msg, from_state) return false unless File.exists? basename - newname = @queue_path + "/que/" + msg_id + newname = @queue_path + '/que/' + msg_id File.rename(basename, newname) msg['state'] = 'que' msg['status'] = 'que' rescue log("FATAL - couldn't commit message #{msg_id}") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") return false end @@ -575,7 +569,7 @@ def que(msg, from_state = 'prep') run_scheduler! - return true + true end def is_duplicate?(msg1, msg2) @@ -595,14 +589,14 @@ def handle_dups_done(msg, new_state) h = @temp_que_dups.delete(i) new_status = "duplicate #{gen_full_msg_id(msg)}" write_msg_status(i, new_status, 'que') - h['status'] = new_state + " - " + new_status + h['status'] = new_state + ' - ' + new_status h['state'] = new_state store_msg(h, 'que') # TODO: refactor this basename = "#{@queue_path}/que/#{i}" RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", i) end - msg['dups'] = msg['dups'].map { |i| gen_full_msg_id({'msg_id' => i}) } + msg['dups'] = msg['dups'].map { |i| gen_full_msg_id( 'msg_id' => i) } end end @@ -629,13 +623,13 @@ def handle_dups(msg) # Collect all the dups into the msg and remove from the @que # also show parent in each dup msg['dups'] = [] - duplicates.each { |i| + duplicates.each do |i| msg['dups'] << i['msg_id'] @temp_que_dups[i['msg_id']] = i r = @que.delete(i) # ordering here is important log("#{r['msg_id']} - removed from @que as dup") i['dup'] = gen_full_msg_id(msg) - } + end end # This is similar to check_msg, but it works with a message that is already @@ -653,11 +647,11 @@ def copy_and_clean_msg(input, new_dest = nil) # Copy only these keys from input message keys = %w(src param1 param2 param3 param4 post_run_webhook due) keys.each do |key| - next unless input.has_key?(key) + next unless input.key?(key) msg[key] = input[key] end - return msg + msg end def run_job(msg, from_state = 'que') @@ -668,7 +662,7 @@ def run_job(msg, from_state = 'que') File.rename(basename, newname) rescue log("FATAL - couldn't run message #{msg_id}") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") # Remove the job from the queue. This may leave things in que state that # will be attempted again after a restart, but avoids the job jamming @@ -686,7 +680,7 @@ def run_job(msg, from_state = 'que') run_queue_script!(msg) end - def lookup_msg(msg, state = 'prep', options={:consistency => true}) + def lookup_msg(msg, state = 'prep', options = { :consistency => true }) msg_id = msg['msg_id'] basename = nil if state == 'prep' @@ -741,7 +735,7 @@ def lookup_msg(msg, state = 'prep', options={:consistency => true}) return false end end - return state + state end def delete_msg!(msg) @@ -751,12 +745,12 @@ def delete_msg!(msg) basename = @queue_path + "/#{state}/" + msg['msg_id'] if state == 'prep' - #FileUtils.remove_entry_secure(basename) + # FileUtils.remove_entry_secure(basename) FileUtils.rm_rf(basename) @prep.delete(msg['msg_id']) end if state == 'que' - #FileUtils.remove_entry_secure(basename) + # FileUtils.remove_entry_secure(basename) FileUtils.rm_rf(basename) @que.delete_if { |o| o['msg_id'] == msg['msg_id'] } end @@ -771,12 +765,11 @@ def clone_msg(msg) state = lookup_msg(msg, '*') return resp unless state - return resp unless ['err', 'relayed', 'done'].include? state - + return resp unless %w(err relayed done).include? state old_msg, old_basename = get_message(msg, state) - new_msg = { } + new_msg = {} if alloc_id(new_msg) and check_msg(new_msg, old_msg) # check_msg copies only required fields, but still copies count # so we delete that as well @@ -785,10 +778,10 @@ def clone_msg(msg) # Now check for, and copy attachments # Assumes that original message guaranteed attachment integrity - new_basename = @queue_path + "/prep/" + new_msg['msg_id'] + new_basename = @queue_path + '/prep/' + new_msg['msg_id'] - if File.directory?(old_basename + "/attach/") - ents = Dir.entries(old_basename + "/attach/").reject { |i| i.start_with?('.') } + if File.directory?(old_basename + '/attach/') + ents = Dir.entries(old_basename + '/attach/').reject { |i| i.start_with?('.') } if not ents.empty? # simple check for attachment dir old_attach_path = old_basename + '/attach/' @@ -813,9 +806,9 @@ def clone_msg(msg) end def get_message(params, state, - options={ :read_message => true, - :check_attachments => true}) - if ['done', 'relayed'].include? state + options = { :read_message => true, + :check_attachments => true }) + if %w(done relayed).include? state basename = RQ::HashDir.path_for("#{@queue_path}/#{state}", params['msg_id']) else basename = @queue_path + "/#{state}/" + params['msg_id'] @@ -824,27 +817,27 @@ def get_message(params, state, msg = nil begin if options[:read_message] - data = File.read(basename + "/msg") + data = File.read(basename + '/msg') msg = JSON.parse(data) else msg = {} end msg['status'] = state msg['state'] = state - if File.exists?(basename + "/status") - xtra_data = File.read(basename + "/status") + if File.exists?(basename + '/status') + xtra_data = File.read(basename + '/status') xtra_status = JSON.parse(xtra_data) msg['status'] += " - #{xtra_status['job_status']}" end # Now check for attachments - if options[:read_message] && options[:check_attachments] && File.directory?(basename + "/attach/") + if options[:read_message] && options[:check_attachments] && File.directory?(basename + '/attach/') cwd = Dir.pwd - ents = Dir.entries(basename + "/attach/").reject { |i| i.start_with?('.') } + ents = Dir.entries(basename + '/attach/').reject { |i| i.start_with?('.') } if not ents.empty? - msg['_attachments'] = { } + msg['_attachments'] = {} ents.each do |ent| - msg['_attachments'][ent] = { } + msg['_attachments'][ent] = {} path = "#{basename}/attach/#{ent}" md5, size = file_md5(path) msg['_attachments'][ent]['md5'] = md5 @@ -857,15 +850,15 @@ def get_message(params, state, rescue msg = nil log("Bad message in queue: #{basename}") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") end - return [msg, basename] + [msg, basename] end def gen_full_msg_id(msg) full_name = "http://#{@host}:#{@port}/q/#{@name}/#{msg['msg_id']}" - return full_name + full_name end def gen_full_dest(msg) @@ -879,7 +872,7 @@ def gen_full_dest(msg) res['dest'] = msg['dest'] q_name = msg['dest'][/\/q\/([^\/]+)/, 1] res['queue'] = q_name; - #msg_id = msg['dest'][/\/q\/[^\/]+\/([^\/]+)/, 1] + # msg_id = msg['dest'][/\/q\/[^\/]+\/([^\/]+)/, 1] end res @@ -890,18 +883,17 @@ def attach_msg(msg) # validate attachment result = [false, 'Unknown error'] begin - basename = @queue_path + "/prep/" + msg_id - return [false, "No message on disk"] unless File.exists? basename + basename = @queue_path + '/prep/' + msg_id + return [false, 'No message on disk'] unless File.exists? basename - #TODO: deal with symlinks + # TODO: deal with symlinks # simple early check, ok, now check for pathname - return [false, "Invalid pathname, must be normalized #{msg['pathname']} (ie. must start with /"] unless msg['pathname'].start_with?("/") + return [false, "Invalid pathname, must be normalized #{msg['pathname']} (ie. must start with /"] unless msg['pathname'].start_with?('/') return [false, "No such file #{msg['pathname']} to attach to message"] unless File.exists?(msg['pathname']) return [false, "Attachment currently cannot be a directory #{msg['pathname']}"] if File.directory?(msg['pathname']) return [false, "Attachment currently cannot be read: #{msg['pathname']}"] unless File.readable?(msg['pathname']) return [false, "Attachment currently not of supported type: #{msg['pathname']}"] unless File.file?(msg['pathname']) - # simple check for attachment dir attach_path = basename + '/attach/' Dir.mkdir(attach_path) unless File.exists?(attach_path) @@ -911,12 +903,12 @@ def attach_msg(msg) name = msg['name'] || File.basename(msg['pathname']) # Validate - that it does not have any '/' chars or a '.' prefix - if name.start_with?(".") + if name.start_with?('.') return [false, "Attachment name as a dot-file not allowed: #{name}"] end # Unsafe char removal name_test = name.tr('~?[]%|$&<>', '*') - if name_test.index("*") + if name_test.index('*') return [false, "Attachment name has invalid character. not allowed: #{name}"] end # TODO: support directory moves @@ -950,11 +942,11 @@ def attach_msg(msg) result = [true, "#{md5}-Attached successfully"] rescue log("FATAL - couldn't add attachment to message #{msg_id}") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") return false end - return result + result end def del_attach_msg(msg) @@ -963,25 +955,25 @@ def del_attach_msg(msg) # validate attachment result = [false, 'Unknown error'] begin - basename = @queue_path + "/prep/" + msg_id - return [false, "No message on disk"] unless File.exists? basename + basename = @queue_path + '/prep/' + msg_id + return [false, 'No message on disk'] unless File.exists? basename # simple check for attachment dir attach_path = basename + '/attach/' - return [false, "No attach directory for msg"] unless File.exists?(attach_path) + return [false, 'No attach directory for msg'] unless File.exists?(attach_path) new_path = attach_path + attach_name - return [false, "No attachment with that named for msg"] unless File.exists?(new_path) + return [false, 'No attachment with that named for msg'] unless File.exists?(new_path) File.unlink(new_path) - result = ["ok", "Attachment deleted successfully"] + result = ['ok', 'Attachment deleted successfully'] rescue log("FATAL - couldn't delete attachment #{attach_name} from message #{msg_id}") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") end - return result + result end def file_md5(path) @@ -990,7 +982,7 @@ def file_md5(path) size = nil File.open(path, 'r') do |file| size = file.stat.size - hasher.update(file.read(32768)) until file.eof + hasher.update(file.read(32_768)) until file.eof end result = hasher.hexdigest @@ -999,7 +991,7 @@ def file_md5(path) def fixup_msg(msg, que) needs_fixing = false - if not msg.has_key?('due') + if not msg.key?('due') needs_fixing = true end @@ -1009,7 +1001,6 @@ def fixup_msg(msg, que) end def load_messages - # prep just has message ids basename = @queue_path + '/prep/' @prep = Dir.entries(basename).reject { |i| i.start_with?('.') } @@ -1035,7 +1026,7 @@ def load_messages messages.each do |mname| begin - data = File.read(basename + mname + "/msg") + data = File.read(basename + mname + '/msg') msg = JSON.parse(data) fixup_msg(msg, 'que') rescue @@ -1044,7 +1035,6 @@ def load_messages end @que << msg end - end def handle_status_read(msg) @@ -1057,35 +1047,35 @@ def handle_status_read(msg) # the ruby IO model removes power from those who know # with wrappers written by those who do not know # update... using sysread - data = "" + data = '' loop do begin - #child_io.sysread(4096) - #data += child_io.readpartial(4096) + # child_io.sysread(4096) + # data += child_io.readpartial(4096) data += child_io.sysread(4096) break rescue Errno::EAGAIN, Errno::EINTR - #log("Error: #{$!}") + # log("Error: #{$!}") sleep 0.001 # A tiny pause to prevent consuming all CPU retry rescue EOFError - #log("EOFError - #{$!}") + # log("EOFError - #{$!}") break end end - #if data + # if data # log("Done Reading status from child len: #{data.length}") - #else + # else # log("Done Reading status from child (nil)") - #end + # end return false if data.empty? child_msgs = data.split("\n") child_msgs.each do |child_msg| - parts = child_msg.split(" ", 2) + parts = child_msg.split(' ', 2) # Always write message status write_msg_status(msg['msg_id'], parts[1]) @@ -1100,7 +1090,7 @@ def handle_status_read(msg) # and then kill them down the road. log("#{child_pid}: child msg came in: #{child_msg}") - if (parts[0] != "run") + if parts[0] != 'run' if parts[0] == 'done' @completed << [msg, :done, Time.now.to_i] end @@ -1118,7 +1108,7 @@ def handle_status_read(msg) end if parts[0] == 'resend' @completed << [msg, :resend, parts[1].to_i] - due,reason = parts[1].split('-',2) + due, reason = parts[1].split('-', 2) msg['due'] = Time.now.to_i + due.to_i msg['count'] = msg.fetch('count', 0) + 1 store_msg(msg, 'run') @@ -1133,7 +1123,7 @@ def handle_status_read(msg) # We need to take an action instead of expecting an exit # that will arrive soon. if parts[0] == 'dup' - due,future,new_dest = parts[1].split('-',3) + due, future, new_dest = parts[1].split('-', 3) new_due = Time.now.to_i + due.to_i if new_dest.start_with?('http') @@ -1154,16 +1144,16 @@ def handle_status_read(msg) msg_copy = copy_and_clean_msg(msg, new_dest) msg_copy['due'] = new_due - basename = @queue_path + "/run/" + msg['msg_id'] + basename = @queue_path + '/run/' + msg['msg_id'] # Now see if there are any attachments attachments = [] - if File.directory?(basename + "/attach/") - ents = Dir.entries(basename + "/attach/").reject { |i| i.start_with?('.') } + if File.directory?(basename + '/attach/') + ents = Dir.entries(basename + '/attach/').reject { |i| i.start_with?('.') } if not ents.empty? # Cool, lets normalize the paths - full_path = File.expand_path(basename + "/attach/") + full_path = File.expand_path(basename + '/attach/') attachments = ents.map { |e| "#{full_path}/#{e}" } end @@ -1190,7 +1180,7 @@ def handle_status_read(msg) que_msg_id = result[1][/\/q\/[^\/]+\/([^\/]+)/, 1] attachments.each do |path| - r2 = qc.attach_message({'msg_id' => que_msg_id, 'pathname' => path}) + r2 = qc.attach_message( 'msg_id' => que_msg_id, 'pathname' => path ) if r2[0] != 'ok' log("#{@name}:#{Process.pid} couldn't DUP message - #{r2[1]}") msg['child_write_pipe'].syswrite("fail dup failed - attach fail #{r2[1]}\n") @@ -1198,7 +1188,7 @@ def handle_status_read(msg) end end - r3 = qc.commit_message({'msg_id' => que_msg_id}) + r3 = qc.commit_message( 'msg_id' => que_msg_id ) if r3[0] != 'ok' log("#{@name}:#{Process.pid} couldn't DUP message - #{r3[1]}") msg['child_write_pipe'].syswrite("fail dup failed - commit fail #{r3[1]}\n") @@ -1213,23 +1203,23 @@ def handle_status_read(msg) end end - return true + true end def write_msg_status(msg_id, mesg, state = 'run') # Write message to disk begin data = { 'job_status' => mesg }.to_json - basename = @queue_path + "/#{state}/" + msg_id + "/status" + basename = @queue_path + "/#{state}/" + msg_id + '/status' File.open(basename + '.tmp', 'w') { |f| f.write(data) } File.rename(basename + '.tmp', basename) rescue log("FATAL - couldn't write status message") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") return false end - return true + true end def write_msg_process_id(msg_id, pid) @@ -1240,11 +1230,11 @@ def write_msg_process_id(msg_id, pid) File.rename(basename + '.tmp', basename) rescue log("FATAL - couldn't write message pid file") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") return false end - return true + true end def remove_msg_process_id(msg_id, state = 'run') @@ -1253,13 +1243,13 @@ def remove_msg_process_id(msg_id, state = 'run') end def log(mesg) - File.open(@queue_path + '/queue.log', "a") do |f| + File.open(@queue_path + '/queue.log', 'a') do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end def shutdown! - log("Received shutdown") + log('Received shutdown') Process.exit! 0 end @@ -1272,22 +1262,22 @@ def run_scheduler! return unless @status.oper_status == 'UP' # Are we arleady running max workers - active_count = @run.inject(0) do |acc, o| - if o.has_key?('child_pid') + active_count = @run.reduce(0) do |acc, o| + if o.key?('child_pid') acc = acc + 1 end acc end if active_count >= @config.num_workers - #log("Already running #{active_count} config is max: #{@config['num_workers']}") + # log("Already running #{active_count} config is max: #{@config['num_workers']}") return end # If we got started, and there are jobs in run que, but # without any workers if @run.length != active_count - job = @run.find { |o| not o.has_key?('child_pid') } + job = @run.find { |o| not o.key?('child_pid') } run_queue_script!(job) return end @@ -1297,7 +1287,7 @@ def run_scheduler! end # Ok, locate the next job - ready_msg = @que.min {|a,b| a['due'].to_f <=> b['due'].to_f } + ready_msg = @que.min { |a, b| a['due'].to_f <=> b['due'].to_f } delta = ready_msg['due'].to_f - Time.now.to_f @@ -1333,11 +1323,11 @@ def run_loop io_list << @sock io_list << @parent_pipe io_list << @signal_hup_rd - #log('sleeping') if @wait_time == 60 + # log('sleeping') if @wait_time == 60 begin ready = IO.select(io_list, nil, nil, @wait_time) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR - log("error on SELECT #{$!}") + log("error on SELECT #{$ERROR_INFO}") sleep 0.001 # A tiny pause to prevent consuming all CPU retry end @@ -1360,7 +1350,7 @@ def run_loop if defined?(Fcntl::F_GETFL) flag &= client_socket.fcntl(Fcntl::F_GETFL) end - #log("Non Block Flag -> #{flag} == #{File::NONBLOCK}") + # log("Non Block Flag -> #{flag} == #{File::NONBLOCK}") client_socket.fcntl(Fcntl::F_SETFL, flag) handle_request(client_socket) next @@ -1381,14 +1371,14 @@ def run_loop end @signal_hup_rd.fcntl(Fcntl::F_SETFL, flag) dat = do_read(@signal_hup_rd, 1) - log("Strange Result from HUP signal pipe.") if dat.size != 1 + log('Strange Result from HUP signal pipe.') if dat.size != 1 next end msg = @run.find { |o| o['child_read_pipe'].fileno == io.fileno } if msg - #log("QUEUE #{@name} of PID #{Process.pid} noticed child pipe readable... #{msg['child_pid']}") - #log("QUEUE #{@name} of PID #{Process.pid} #{msg['child_read_pipe'].object_id} <=> #{io.object_id}") + # log("QUEUE #{@name} of PID #{Process.pid} noticed child pipe readable... #{msg['child_pid']}") + # log("QUEUE #{@name} of PID #{Process.pid} #{msg['child_read_pipe'].object_id} <=> #{io.object_id}") # TODO: make this stateful for incomplete reads next if handle_status_read(msg) @@ -1431,15 +1421,15 @@ def run_loop new_state = 'err' end - #if completion[1] == :pause + # if completion[1] == :pause # new_state = 'err' - #end + # end if completion[1] == :resend && res[1] == 0 if msg['count'] >= msg['max_count'] new_state = 'err' log("RESEND hit max: #{msg['count']} / #{msg['max_count']} - #{msg_id}") - write_msg_status(msg_id, "HIT MAX RESEND COUNT - MOVING TO ERR" ) + write_msg_status(msg_id, 'HIT MAX RESEND COUNT - MOVING TO ERR') else new_state = 'que' end @@ -1447,8 +1437,8 @@ def run_loop if new_state == nil # log a message - write_msg_status(msg_id, "PROCESS EXITED IMPROPERLY - MOVING TO ERR- Expected #{completion[1]} - and - status #{res.inspect}" ) - write_msg_status(msg_id, "PROCESS EXITED IMPROPERLY" ) + write_msg_status(msg_id, "PROCESS EXITED IMPROPERLY - MOVING TO ERR- Expected #{completion[1]} - and - status #{res.inspect}") + write_msg_status(msg_id, 'PROCESS EXITED IMPROPERLY') new_state = 'err' end @@ -1458,7 +1448,7 @@ def run_loop basename = "#{@queue_path}/run/#{msg_id}" raise unless File.exists? basename remove_msg_process_id(msg_id) - if ['done', 'relayed'].include? new_state + if %w(done relayed).include? new_state handle_dups_done(msg, new_state) store_msg(msg, 'run') # store message since it made it to done and we want the 'dups' field to live RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", msg_id) @@ -1471,11 +1461,11 @@ def run_loop end rescue log("FATAL - couldn't move from 'run' to '#{new_state}' #{msg_id}") - log(" [ #{$!} ]") + log(" [ #{$ERROR_INFO} ]") next end - if ['err', 'done', 'relayed'].include? new_state + if %w(err done relayed).include? new_state # Send a webhook if there is a web hook if msg.include? 'post_run_webhook' msg['post_run_webhook'].each do |wh| @@ -1554,18 +1544,18 @@ def webhook_message(url, msg, new_state) end end - def do_read(client, numr = 32768) + def do_read(client, numr = 32_768) begin dat = client.sysread(numr) rescue Errno::EINTR # Ruby threading can cause an alarm/timer interrupt on a syscall sleep 0.001 # A tiny pause to prevent consuming all CPU retry rescue EOFError - #TODO: add debug mode - #puts "Got an EOF from socket read" + # TODO: add debug mode + # puts "Got an EOF from socket read" return nil - rescue Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF - puts "Got an #{$!} from socket read" + rescue Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL, Errno::EBADF + puts "Got an #{$ERROR_INFO} from socket read" exit! 0 end dat @@ -1575,13 +1565,13 @@ def read_packet(sock) protocol = do_read(sock, 4) if protocol != 'rq1 ' - log("REQ - Invalid protocol - bad ver") + log('REQ - Invalid protocol - bad ver') return nil end size_str = do_read(sock, 9) - if size_str[-1..-1] != " " + if size_str[-1..-1] != ' ' log("REQ - Invalid protocol - bad size #{size_str}") return nil end @@ -1602,13 +1592,12 @@ def read_packet(sock) def send_packet(sock, resp) log_msg = resp.length > 80 ? "#{resp[0...80]}..." : resp log("RESP [ #{resp.length} #{log_msg} ]") - sock_msg = sprintf("rq1 %08d %s", resp.length, resp) + sock_msg = sprintf('rq1 %08d %s', resp.length, resp) UnixRack::Socket.write_buff(sock, sock_msg) sock.close end def handle_request(sock) - packet = read_packet(sock) return if packet == nil @@ -1616,33 +1605,33 @@ def handle_request(sock) log("REQ [ #{packet} ]") if packet.start_with?('ping ') - resp = [ "pong" ].to_json + resp = ['pong'].to_json send_packet(sock, resp) return end if packet.start_with?('uptime ') - resp = [(Time.now - @start_time).to_i, ].to_json + resp = [(Time.now - @start_time).to_i,].to_json send_packet(sock, resp) return end if packet.start_with?('config ') # Sadly there's no struct-to-hash method until Ruby 2.0 - resp = [ 'ok', Hash[@config.each_pair.to_a]].to_json + resp = ['ok', Hash[@config.each_pair.to_a]].to_json send_packet(sock, resp) return end if packet.start_with?('status') @status.update! - resp = [ @status.admin_status, @status.oper_status ].to_json + resp = [@status.admin_status, @status.oper_status].to_json send_packet(sock, resp) return end if packet.start_with?('shutdown') - resp = [ 'ok' ].to_json + resp = ['ok'].to_json send_packet(sock, resp) shutdown! return @@ -1651,7 +1640,7 @@ def handle_request(sock) # IF queue is admin_status DOWN, no need to respond to any of the # following messages (Note: there are other states, this is a hard DOWN) if @status.admin_status == 'DOWN' - resp = [ "fail", "oper_status: DOWN"].to_json + resp = ['fail', 'oper_status: DOWN'].to_json send_packet(sock, resp) return end @@ -1660,14 +1649,14 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - msg = { } + msg = {} if alloc_id(msg) and check_msg(msg, options) store_msg(msg) que(msg) msg_id = gen_full_msg_id(msg) - resp = [ "ok", msg_id ].to_json + resp = ['ok', msg_id].to_json else - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json end send_packet(sock, resp) return @@ -1677,32 +1666,32 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - msg = { } + msg = {} if not @que.empty? msg_id = gen_full_msg_id(@que[0]) - resp = [ "ok", msg_id ].to_json + resp = ['ok', msg_id].to_json elsif alloc_id(msg) and check_msg(msg, options) store_msg(msg) que(msg) msg_id = gen_full_msg_id(msg) - resp = [ "ok", msg_id ].to_json + resp = ['ok', msg_id].to_json else - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json end send_packet(sock, resp) return end if packet.start_with?('num_messages') - status = { } + status = {} status['prep'] = @prep.length status['que'] = @que.length status['run'] = @run.length status['pause'] = [] - status['done'] = RQ::HashDir.num_entries(@queue_path + "/done") - status['relayed'] = RQ::HashDir.num_entries(@queue_path + "/relayed/") - status['err'] = Dir.entries(@queue_path + "/err/").reject { |i| i.start_with?('.') }.length + status['done'] = RQ::HashDir.num_entries(@queue_path + '/done') + status['relayed'] = RQ::HashDir.num_entries(@queue_path + '/relayed/') + status['err'] = Dir.entries(@queue_path + '/err/').reject { |i| i.start_with?('.') }.length resp = status.to_json send_packet(sock, resp) @@ -1712,8 +1701,8 @@ def handle_request(sock) if packet.start_with?('messages') json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('state') - resp = [ "fail", "lacking 'state' field"].to_json + if not options.key?('state') + resp = ['fail', "lacking 'state' field"].to_json send_packet(sock, resp) return end @@ -1724,13 +1713,13 @@ def handle_request(sock) elsif options['state'] == 'run' status = @run.map { |m| [m['msg_id'], m['status']] } elsif options['state'] == 'done' - status = RQ::HashDir.entries(@queue_path + "/done", options['limit']) + status = RQ::HashDir.entries(@queue_path + '/done', options['limit']) elsif options['state'] == 'relayed' - status = RQ::HashDir.entries(@queue_path + "/relayed/", options['limit']) + status = RQ::HashDir.entries(@queue_path + '/relayed/', options['limit']) elsif options['state'] == 'err' - status = Dir.entries(@queue_path + "/err/").reject { |i| i.start_with?('.') } + status = Dir.entries(@queue_path + '/err/').reject { |i| i.start_with?('.') } else - status = [ "fail", "invalid 'state' field (#{options['state']})"] + status = ['fail', "invalid 'state' field (#{options['state']})"] end resp = status.to_json @@ -1742,13 +1731,13 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - msg = { } + msg = {} if alloc_id(msg) and check_msg(msg, options) store_msg(msg) msg_id = gen_full_msg_id(msg) - resp = [ "ok", msg_id ].to_json + resp = ['ok', msg_id].to_json else - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json end send_packet(sock, resp) return @@ -1758,8 +1747,8 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end @@ -1767,12 +1756,12 @@ def handle_request(sock) if lookup_msg(options) success, attach_message = attach_msg(options) if success - resp = [ "ok", attach_message ].to_json + resp = ['ok', attach_message].to_json else - resp = [ "fail", attach_message ].to_json + resp = ['fail', attach_message].to_json end else - resp = [ "fail", "cannot find message"].to_json + resp = ['fail', 'cannot find message'].to_json end send_packet(sock, resp) return @@ -1783,8 +1772,8 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end @@ -1792,13 +1781,13 @@ def handle_request(sock) state = lookup_msg(options, '*') if state if state != 'prep' - resp = [ "fail", "msg not in prep" ].to_json + resp = ['fail', 'msg not in prep'].to_json else success, del_attach_result = del_attach_msg(options) - resp = [ success, del_attach_result ].to_json + resp = [success, del_attach_result].to_json end else - resp = [ "fail", "msg not found" ].to_json + resp = ['fail', 'msg not found'].to_json end send_packet(sock, resp) return @@ -1808,20 +1797,20 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json if lookup_msg(options) if que(options) - resp = [ "ok", "msg commited" ].to_json + resp = ['ok', 'msg commited'].to_json end else - resp = [ "fail", "cannot find message"].to_json + resp = ['fail', 'cannot find message'].to_json end send_packet(sock, resp) @@ -1832,24 +1821,24 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json state = lookup_msg(options, '*') if state msg, msg_path = get_message(options, state) if msg - resp = [ "ok", msg ].to_json + resp = ['ok', msg].to_json else - resp = [ "fail", "msg couldn't be read" ].to_json + resp = ['fail', "msg couldn't be read"].to_json end else - resp = [ "fail", "msg not found" ].to_json + resp = ['fail', 'msg not found'].to_json end send_packet(sock, resp) @@ -1860,20 +1849,20 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json # turn off consistency for a little more speed - state = lookup_msg(options, '*', {:consistency => false}) + state = lookup_msg(options, '*', :consistency => false) if state - resp = [ "ok", state ].to_json + resp = ['ok', state].to_json else - resp = [ "fail", "msg not found" ].to_json + resp = ['fail', 'msg not found'].to_json end send_packet(sock, resp) @@ -1884,27 +1873,27 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json # turn off consistency for a little more speed - state = lookup_msg(options, '*', {:consistency => false}) + state = lookup_msg(options, '*', :consistency => false) if state msg, msg_path = get_message(options, state, - {:read_message => false}) + :read_message => false) if msg - resp = [ "ok", msg ].to_json + resp = ['ok', msg].to_json else - resp = [ "fail", "msg couldn't be read" ].to_json + resp = ['fail', "msg couldn't be read"].to_json end else - resp = [ "fail", "msg not found" ].to_json + resp = ['fail', 'msg not found'].to_json end send_packet(sock, resp) @@ -1915,19 +1904,19 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json if lookup_msg(options, '*') delete_msg!(options) - resp = [ "ok", "msg deleted" ].to_json + resp = ['ok', 'msg deleted'].to_json else - resp = [ "fail", "msg not found" ].to_json + resp = ['fail', 'msg not found'].to_json end send_packet(sock, resp) @@ -1938,24 +1927,24 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json state = lookup_msg(options, '*') if state == 'que' # Jump to the front of the queue - ready_msg = @que.min {|a,b| a['due'].to_f <=> b['due'].to_f } + ready_msg = @que.min { |a, b| a['due'].to_f <=> b['due'].to_f } m = @que.find { |e| e['msg_id'] == options['msg_id'] } if (not m.nil?) and (not ready_msg.nil?) m['due'] = ready_msg['due'] - 1.0 - resp = [ "ok", "msg sent to front of run queue" ].to_json + resp = ['ok', 'msg sent to front of run queue'].to_json else - resp = [ "fail", "cannot send message to run state" ].to_json + resp = ['fail', 'cannot send message to run state'].to_json end end @@ -1967,28 +1956,28 @@ def handle_request(sock) json = packet.split(' ', 2)[1] options = JSON.parse(json) - if not options.has_key?('msg_id') - resp = [ "fail", "lacking 'msg_id' field"].to_json + if not options.key?('msg_id') + resp = ['fail', "lacking 'msg_id' field"].to_json send_packet(sock, resp) return end - resp = [ "fail", "unknown reason"].to_json + resp = ['fail', 'unknown reason'].to_json state = lookup_msg(options, '*') if state - if ['err', 'relayed', 'done'].include? state + if %w(err relayed done).include? state msg_id = clone_msg(options) if msg_id - resp = [ "ok", msg_id ].to_json + resp = ['ok', msg_id].to_json else - resp = [ "fail", "msg couldn't be cloned" ].to_json + resp = ['fail', "msg couldn't be cloned"].to_json end else - resp = [ "fail", "cannot clone message in #{state} state" ].to_json + resp = ['fail', "cannot clone message in #{state} state"].to_json end else - resp = [ "fail", "msg not found" ].to_json + resp = ['fail', 'msg not found'].to_json end send_packet(sock, resp) @@ -1996,8 +1985,7 @@ def handle_request(sock) end send_packet(sock, '[ "ERROR" ]') - log("RESP [ ERROR ] - Unhandled message") + log('RESP [ ERROR ] - Unhandled message') end - end end diff --git a/code/queueclient.rb b/code/queueclient.rb index 6e6ce0a..4347b25 100644 --- a/code/queueclient.rb +++ b/code/queueclient.rb @@ -6,14 +6,13 @@ module RQ class QueueClient - attr_accessor :name attr_accessor :pid - def initialize(name, path=".") + def initialize(name, path = '.') @name = name - path = File.join(File.dirname(__FILE__), "..") + path = File.join(File.dirname(__FILE__), '..') @queue_path = File.join(path, 'queue', @name) @queue_sock_path = File.join(@queue_path, 'queue.sock') @@ -21,14 +20,14 @@ def initialize(name, path=".") raise RQ::RqQueueNotFound unless File.directory?(@queue_path) end - def running?(pid=read_pid) + def running?(pid = read_pid) Process.kill(0, pid) rescue end def stop! pid = read_pid - Process.kill("TERM", pid) if running?(pid) + Process.kill('TERM', pid) if running?(pid) rescue end @@ -41,28 +40,28 @@ def read_pid File.read(@queue_path + '/queue.pid').to_i end - def do_read(client, numr = 32768) + def do_read(client, numr = 32_768) begin dat = client.sysread(numr) rescue Errno::EINTR # Ruby threading can cause an alarm/timer interrupt on a syscall sleep 0.001 # A tiny pause to prevent consuming all CPU retry rescue EOFError - #TODO: add debug mode - #puts "Got an EOF from socket read" + # TODO: add debug mode + # puts "Got an EOF from socket read" return nil - rescue Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF - raise "Got an #{$!} from socket read" + rescue Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL, Errno::EBADF + raise "Got an #{$ERROR_INFO} from socket read" end dat end # msg is single word, data is assumbed to be content as json - def send_recv(msg, data="") + def send_recv(msg, data = '') client = UNIXSocket.open(@queue_sock_path) contents = "#{msg} #{data}" - sock_msg = sprintf("rq1 %08d %s", contents.length, contents) + sock_msg = sprintf('rq1 %08d %s', contents.length, contents) UnixRack::Socket.write_buff(client, sock_msg) @@ -74,16 +73,16 @@ def send_recv(msg, data="") size_str = do_read(client, 9) - if size_str[-1..-1] != " " - raise "Invalid Protocol" + if size_str[-1..-1] != ' ' + raise 'Invalid Protocol' end size = size_str.to_i - result = UnixRack::Socket.read_sock_num_bytes(client, size, lambda {|s| puts s}) + result = UnixRack::Socket.read_sock_num_bytes(client, size, lambda { |s| puts s }) if result[0] == false - return ["fail", result[1]] + return ['fail', result[1]] end client.close @@ -166,6 +165,5 @@ def get_message_state(params) def get_message_status(params) send_recv('get_message_status', params.to_json) end - end end diff --git a/code/queuemgr.rb b/code/queuemgr.rb index c114a25..f0daa07 100644 --- a/code/queuemgr.rb +++ b/code/queuemgr.rb @@ -8,14 +8,13 @@ require 'version' def log(mesg) - File.open('log/queuemgr.log', "a") do |f| + File.open('log/queuemgr.log', 'a') do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end module RQ class QueueMgr - attr_accessor :queues attr_accessor :scheduler attr_accessor :web_server @@ -27,19 +26,19 @@ def initialize @scheduler = nil @web_server = nil @start_time = Time.now - @status = "RUNNING" + @status = 'RUNNING' end def load_config - ENV["RQ_VER"] = VERSION_NUMBER - ENV["RQ_SEMVER"] = SEMANTIC_VERSION_NUMBER + ENV['RQ_VER'] = VERSION_NUMBER + ENV['RQ_SEMVER'] = SEMANTIC_VERSION_NUMBER begin data = File.read('config/config.json') @config = JSON.parse(data) - ENV["RQ_ENV"] = @config['env'] + ENV['RQ_ENV'] = @config['env'] rescue - log("Bad config file. Exiting") + log('Bad config file. Exiting') exit! 1 end @@ -60,7 +59,7 @@ def load_config def init # Show pid File.unlink('config/queuemgr.pid') rescue nil - File.open('config/queuemgr.pid', "w") do |f| + File.open('config/queuemgr.pid', 'w') do |f| f.write("#{Process.pid}\n") end @@ -90,9 +89,9 @@ def handle_request(sock) case cmd when 'ping' - sock.send("pong", 0) + sock.send('pong', 0) sock.close - log("RESP [ pong ]"); + log('RESP [ pong ]'); when 'environment' sock.send(ENV['RQ_ENV'], 0) @@ -100,7 +99,7 @@ def handle_request(sock) log("RESP [ environment - #{ENV['RQ_ENV']} ]") when 'version' - data = [ ENV['RQ_VER'], ENV['RQ_SEMVER'] ].to_json + data = [ENV['RQ_VER'], ENV['RQ_SEMVER']].to_json sock.send(data, 0) sock.close log("RESP [ version - #{data} ]") @@ -112,7 +111,7 @@ def handle_request(sock) sock.close when 'uptime' - data = [(Time.now - @start_time).to_i, ].to_json #['local','brserv_push'].to_json + data = [(Time.now - @start_time).to_i,].to_json # ['local','brserv_push'].to_json log("RESP [ uptime - #{data} ]") sock.send(data, 0) sock.close @@ -121,8 +120,8 @@ def handle_request(sock) log("RESP [ restart_queue - #{arg} ]") worker = @queues.find { |i| i.name == arg } status = 'fail' - if worker.status == "RUNNING" - Process.kill("TERM", worker.pid) rescue nil + if worker.status == 'RUNNING' + Process.kill('TERM', worker.pid) rescue nil status = 'ok' else # TODO @@ -135,7 +134,7 @@ def handle_request(sock) status = 'ok' end end - resp = [status, arg].to_json #['ok','brserv_push'].to_json + resp = [status, arg].to_json # ['ok','brserv_push'].to_json sock.send(resp, 0) sock.close @@ -168,7 +167,7 @@ def handle_request(sock) begin options = JSON.parse(File.read(arg)) rescue - reason = "could not read queue config [ #{arg} ]: #{$!}" + reason = "could not read queue config [ #{arg} ]: #{$ERROR_INFO}" err = true end @@ -203,7 +202,7 @@ def handle_request(sock) end end - resp = [ (err ? 'fail' : 'success'), reason ].to_json + resp = [(err ? 'fail' : 'success'), reason].to_json log("RESP [ #{resp} ]") sock.send(resp, 0) @@ -214,20 +213,20 @@ def handle_request(sock) status = 'fail' msg = 'no such queue' if worker - worker.status = "DELETE" - Process.kill("TERM", worker.pid) rescue nil + worker.status = 'DELETE' + Process.kill('TERM', worker.pid) rescue nil status = 'ok' msg = 'started deleting queue' end - resp = [ status, msg ].to_json #['ok','brserv_push'].to_json + resp = [status, msg].to_json # ['ok','brserv_push'].to_json sock.send(resp, 0) sock.close log("RESP [ #{resp} ]") else - sock.send("ERROR", 0) + sock.send('ERROR', 0) sock.close - log("RESP [ ERROR ] - Unhandled message") + log('RESP [ ERROR ] - Unhandled message') end end @@ -237,13 +236,13 @@ def reload # Notify running queues to reload configs @queues.each do |worker| - if dirs.has_key? worker.name + if dirs.key? worker.name log("RELOAD [ #{worker.name} - #{worker.pid} ] - SENDING HUP") - Process.kill("HUP", worker.pid) if worker.pid rescue nil + Process.kill('HUP', worker.pid) if worker.pid rescue nil else log("RELOAD [ #{worker.name} - #{worker.pid} ] - SENDING TERM") - worker.status = "SHUTDOWN" - Process.kill("TERM", worker.pid) if worker.pid rescue nil + worker.status = 'SHUTDOWN' + Process.kill('TERM', worker.pid) if worker.pid rescue nil end end @@ -258,12 +257,12 @@ def shutdown @queues.delete_if { |q| !q.pid } @queues.each do |q| - q.status = "SHUTDOWN" + q.status = 'SHUTDOWN' end @queues.each do |q| begin - Process.kill("TERM", q.pid) if q.pid + Process.kill('TERM', q.pid) if q.pid rescue StandardError => e puts "#{q.pid} #{e.inspect}" end @@ -275,18 +274,18 @@ def final_shutdown! # Process.kill("TERM", @scheduler.pid) if @scheduler.pid # Once all the queues are down, take the web server down - Process.kill("TERM", @web_server) if @web_server + Process.kill('TERM', @web_server) if @web_server # The actual shutdown happens when all procs are reaped File.unlink('config/queuemgr.pid') rescue nil $sock.close File.unlink('config/queuemgr.sock') rescue nil - log("FINAL SHUTDOWN - EXITING") + log('FINAL SHUTDOWN - EXITING') Process.exit! 0 end def start_queue(name) - worker = RQ::Queue.start_process({'name' => name}) + worker = RQ::Queue.start_process( 'name' => name ) if worker @queues << worker log("STARTED [ #{worker.name} - #{worker.pid} ]") @@ -326,17 +325,17 @@ def run! init load_config - Signal.trap("TERM") do - log("received TERM signal") + Signal.trap('TERM') do + log('received TERM signal') shutdown end - Signal.trap("CHLD") do - log("received CHLD signal") + Signal.trap('CHLD') do + log('received CHLD signal') end - Signal.trap("HUP") do - log("received HUP signal") + Signal.trap('HUP') do + log('received HUP signal') reload end @@ -355,16 +354,16 @@ def run! # Ye old event loop while true - #log(queues.select { |i| i.status != "ERROR" }.map { |i| [i.name, i.child_write_pipe] }.inspect) - io_list = queues.select { |i| i.status != "ERROR" }.map { |i| i.child_write_pipe } + # log(queues.select { |i| i.status != "ERROR" }.map { |i| [i.name, i.child_write_pipe] }.inspect) + io_list = queues.select { |i| i.status != 'ERROR' }.map { |i| i.child_write_pipe } io_list << $sock - #log(io_list.inspect) + # log(io_list.inspect) log('sleeping') begin ready, _, _ = IO.select(io_list, nil, nil, 60) rescue SystemCallError, StandardError # SystemCallError is the parent for all Errno::EFOO exceptions sleep 0.001 # A tiny pause to prevent consuming all CPU - log("error on SELECT #{$!}") + log("error on SELECT #{$ERROR_INFO}") closed_sockets = io_list.delete_if { |i| i.closed? } log("removing closed sockets #{closed_sockets.inspect} from io_list") retry @@ -384,7 +383,7 @@ def run! if defined?(Fcntl::F_GETFL) flag &= client_socket.fcntl(Fcntl::F_GETFL) end - #log("Non Block Flag -> #{flag} == #{File::NONBLOCK}") + # log("Non Block Flag -> #{flag} == #{File::NONBLOCK}") client_socket.fcntl(Fcntl::F_SETFL, flag) handle_request(client_socket) else @@ -404,7 +403,7 @@ def run! if res log("QUEUE PROC #{worker.name} of PID #{worker.pid} exited with status #{res[1]} - #{worker.status}") worker.child_write_pipe.close - if worker.status == "RUNNING" + if worker.status == 'RUNNING' worker.num_restarts += 1 # TODO # would really like a timer on the event loop so I can sleep a sec, but @@ -412,7 +411,7 @@ def run! # # If queue.rb code fails/exits if worker.num_restarts >= 11 - worker.status = "ERROR" + worker.status = 'ERROR' worker.pid = nil worker.child_write_pipe = nil log("FAILED [ #{worker.name} - too many restarts. Not restarting ]") @@ -422,11 +421,11 @@ def run! worker.pid = results[0] worker.child_write_pipe = results[1] end - elsif worker.status == "DELETE" + elsif worker.status == 'DELETE' RQ::Queue.delete(worker.name) queues.delete(worker) log("DELETED [ #{worker.name} ]") - elsif worker.status == "SHUTDOWN" + elsif worker.status == 'SHUTDOWN' queues.delete(worker) if queues.empty? final_shutdown! @@ -446,6 +445,5 @@ def run! end end - end end diff --git a/code/queuemgrclient.rb b/code/queuemgrclient.rb index aaaf035..026d071 100644 --- a/code/queuemgrclient.rb +++ b/code/queuemgrclient.rb @@ -3,9 +3,8 @@ module RQ class QueueMgrClient - def self.running? - pid = self.read_pid + pid = read_pid return false unless pid @@ -15,20 +14,20 @@ def self.running? return false end - return true + true end def self.stop! if self.running? - pid = self.read_pid + pid = read_pid begin - Process.kill("TERM", pid) + Process.kill('TERM', pid) return true rescue return false end end - return false + false end def self.read_pid @@ -37,7 +36,7 @@ def self.read_pid def self.ping client = UNIXSocket.open('config/queuemgr.sock') - client.send("ping", 0) + client.send('ping', 0) result = client.recvfrom(1024) client.close result ? result[0] : nil @@ -45,7 +44,7 @@ def self.ping def self.environment client = UNIXSocket.open('config/queuemgr.sock') - client.send("environment", 0) + client.send('environment', 0) result = client.recvfrom(1024) client.close result ? result[0] : nil @@ -53,7 +52,7 @@ def self.environment def self.version client = UNIXSocket.open('config/queuemgr.sock') - client.send("version", 0) + client.send('version', 0) result = client.recvfrom(1024) client.close result ? JSON.parse(result[0]) : nil @@ -61,7 +60,7 @@ def self.version def self.queues client = UNIXSocket.open('config/queuemgr.sock') - client.send("queues", 0) + client.send('queues', 0) result = client.recvfrom(1024) client.close result ? JSON.parse(result[0]) : nil @@ -69,7 +68,7 @@ def self.queues def self.uptime client = UNIXSocket.open('config/queuemgr.sock') - client.send("uptime", 0) + client.send('uptime', 0) result = client.recvfrom(1024) client.close result ? JSON.parse(result[0]) : nil @@ -107,6 +106,5 @@ def self.delete_queue(queue_name) client.close result ? JSON.parse(result[0]) : nil end - end end diff --git a/code/relay_script.rb b/code/relay_script.rb index 9bc0bb0..7233663 100755 --- a/code/relay_script.rb +++ b/code/relay_script.rb @@ -1,5 +1,5 @@ #!/usr/bin/env ruby -$:.unshift(File.join(File.dirname(__FILE__), "..")) +$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..')) require 'vendor/environment' require 'json' @@ -12,7 +12,7 @@ require 'code/queueclient' def log(mesg) - puts "\033[0;36m#{$$} - #{Time.now}\033[0m - #{mesg}" + puts "\033[0;36m#{$PROCESS_ID} - #{Time.now}\033[0m - #{mesg}" $stdout.flush end @@ -37,20 +37,20 @@ def soft_fail(mesg = 'soft fail') exit(0) end -def get_id() +def get_id return nil unless File.exist?('relay_id') - File.open('relay_id', "r") do + File.open('relay_id', 'r') do |f| x = f.read log("Using prior relay_id: #{x}") return x end - return nil + nil end def set_id(msg_id) - File.open('relay_id.tmp', "w") do + File.open('relay_id.tmp', 'w') do |f| f.write(msg_id) end @@ -58,10 +58,10 @@ def set_id(msg_id) log("Now using relay_id: #{msg_id}") - return true + true end -def erase_id() +def erase_id File.unlink('relay_id') rescue nil end @@ -69,7 +69,7 @@ def file_md5(path) hasher = Digest::MD5.new File.open(path, 'r') do |file| - hasher.update(file.read(32768)) until file.eof + hasher.update(file.read(32_768)) until file.eof end result = hasher.hexdigest @@ -78,8 +78,8 @@ def file_md5(path) # Get count, if too high, big fail count = ENV['RQ_COUNT'].to_i if count > 15 - write_status('run', "RQ_COUNT > 15 - failing") - write_status('fail', "RQ_COUNT > 15 FAIL") + write_status('run', 'RQ_COUNT > 15 - failing') + write_status('fail', 'RQ_COUNT > 15 FAIL') end # Get destination queue @@ -93,17 +93,17 @@ def file_md5(path) if (ENV['RQ_DEST'] == 'http://127.0.0.1:3333/q/test') && (ENV['RQ_PARAM2'] == 'the_mighty_rq_force') force = true - log("TEST MODE force = true") + log('TEST MODE force = true') if ENV['RQ_PARAM3'] == 'fail' && ENV['RQ_COUNT'] == '0' fake_fail = true - log("TEST MODE fake_fail = true") + log('TEST MODE fake_fail = true') end end # If this was a force_remote if ENV['RQ_FORCE_REMOTE'] == '1' - log("FORCE REMOTE") + log('FORCE REMOTE') remote_delivery = true end @@ -111,33 +111,33 @@ def file_md5(path) # Get the URL remote_q_uri = dest[/(.*?\/q\/[^\/]+)/, 1] - #write_status('err', "Cannot do remote queue relay yet.") - #exit(0) + # write_status('err', "Cannot do remote queue relay yet.") + # exit(0) ## ## REMOTE QUEUE DELIVERY ## ## Check if destq is relay, which is invalid - #destq = ENV['RQ_DEST_QUEUE'] - #if destq == 'relay' + # destq = ENV['RQ_DEST_QUEUE'] + # if destq == 'relay' # write_status('err', "Message dest queue is relay. Cannot have that.") # exit(0) - #end + # end ## If valid queue, attempt to relay message - #require 'code/queueclient' - #qc = RQ::QueueClient.new(destq, "../../../../..") + # require 'code/queueclient' + # qc = RQ::QueueClient.new(destq, "../../../../..") - #log("Attempting connect with #{destq}") + # log("Attempting connect with #{destq}") - #if not qc.exists? + # if not qc.exists? # soft_fail("#{destq} does not exist") - #end + # end # 2 phase commit section - new_msg_id = get_id() # Do we have an ID already + new_msg_id = get_id # Do we have an ID already # Yes, skip prep... if nil == new_msg_id # No, do prep, and store id @@ -146,7 +146,7 @@ def file_md5(path) # Get data about message curr_msg = nil begin - data = File.read("../msg") # TODO: eliminate this cheat + data = File.read('../msg') # TODO: eliminate this cheat curr_msg = JSON.parse(data) rescue # TODO: Log to private log here @@ -161,7 +161,7 @@ def file_md5(path) keys = %w(dest src count max_count param1 param2 param3 param4 post_run_webhook orig_msg_id) keys.each do |key| - next unless curr_msg.has_key?(key) + next unless curr_msg.key?(key) mesg[key] = curr_msg[key] end @@ -169,11 +169,11 @@ def file_md5(path) log("attempting remote #{remote_q_uri}") # Connect to that site for that queue and submit the message - uri = remote_q_uri + "/new_message" + uri = remote_q_uri + '/new_message' begin - res = Net::HTTP.post_form(URI.parse(uri), {:x_format => 'json', :mesg => mesg.to_json }) + res = Net::HTTP.post_form(URI.parse(uri), :x_format => 'json', :mesg => mesg.to_json) rescue Exception - log("Net::HTTP exception: " + $!.to_s) + log('Net::HTTP exception: ' + $ERROR_INFO.to_s) # THIS IS SO BAD, BUT HEY SUCH IS LIFE UNTIL 1.9 # WHY? # BEST DESCRIPTION IS HERE http://jerith.livejournal.com/40063.html @@ -189,7 +189,7 @@ def file_md5(path) soft_fail("Couldn't queue message: #{json_result.inspect}") end if fake_fail # We are exiting anyways - set_id(new_msg_id + "00110011") + set_id(new_msg_id + '00110011') soft_fail("FAKE FAIL - Couldn't queue message: #{json_result.inspect}") end else @@ -199,12 +199,12 @@ def file_md5(path) end # Pull the Short MSG ID out of the Full Msg Id - #q_name = full_mesg_id[/\/q\/([^\/]+)/, 1] + # q_name = full_mesg_id[/\/q\/([^\/]+)/, 1] new_short_msg_id = new_msg_id[/\/q\/[^\/]+\/([^\/]+)/, 1] # Idempotently attach any attachments if File.exists?('../attach') - log("attempting sending attach") + log('attempting sending attach') entries = Dir.entries('../attach').reject { |e| e.start_with?('.') } fnames = entries.select { |e| File.file?("../attach/#{e}") } @@ -216,27 +216,27 @@ def file_md5(path) md5 = file_md5("../attach/#{fname}") pipe_res = `curl -0 -s -F x_format=json -F filedata=@../attach/#{fname} -F pathname=#{fname} -F msg_id=#{new_short_msg_id} #{new_msg_id}/attach/new` - #p $? - #p pipe_res + # p $? + # p pipe_res # Get the URL - #res = Net::HTTP.post_form(URI.parse(remote_q_uri + "/#{msg_id}/attach/new"), form) + # res = Net::HTTP.post_form(URI.parse(remote_q_uri + "/#{msg_id}/attach/new"), form) - if $?.exitstatus != 0 - soft_fail("Couldn't run curl to attach to message: #{$?.exitstatus.inspect}") + if $CHILD_STATUS.exitstatus != 0 + soft_fail("Couldn't run curl to attach to message: #{$CHILD_STATUS.exitstatus.inspect}") end begin result = JSON.parse(pipe_res) rescue Exception - log("Could not parse JSON") + log('Could not parse JSON') log(pipe_res) - write_status('err', "BAD JSON") + write_status('err', 'BAD JSON') exit(1) end if result[0] != 'ok' if result[0] == 'fail' and result[1] == 'cannot find message' - erase_id() + erase_id soft_fail("Remote message [#{new_msg_id}] disappeared: #{pipe_res}. Getting new id.") end soft_fail("Couldn't attach to test message properly : #{pipe_res}") @@ -259,7 +259,7 @@ def file_md5(path) begin res = Net::HTTP.post_form(URI.parse(uri), form) rescue Exception - log("Net::HTTP exception: " + $!.to_s) + log('Net::HTTP exception: ' + $ERROR_INFO.to_s) # THIS IS SO BAD, BUT HEY SUCH IS LIFE UNTIL 1.9 # WHY? # BEST DESCRIPTION IS HERE http://jerith.livejournal.com/40063.html @@ -274,12 +274,12 @@ def file_md5(path) if json_result[0] != 'ok' if json_result[0] == 'fail' and json_result[1] == 'cannot find message' - erase_id() + erase_id soft_fail("Remote message [#{new_msg_id}] disappeared: #{json_result.inspect}. Getting new id.") end soft_fail("Couldn't commit message: #{json_result.inspect}") else - erase_id() + erase_id write_status('relayed', new_msg_id) end @@ -293,18 +293,18 @@ def file_md5(path) # Check if destq is relay, which is invalid destq = ENV['RQ_DEST_QUEUE'] if destq == 'relay' - write_status('err', "Message dest queue is relay. Cannot have that.") + write_status('err', 'Message dest queue is relay. Cannot have that.') exit(0) end # If valid queue, attempt to relay message log("Attempting connect with local queue #{destq}") -qc = RQ::QueueClient.new(destq, "../../../../..") rescue soft_fail("#{destq} does not exist") +qc = RQ::QueueClient.new(destq, '../../../../..') rescue soft_fail("#{destq} does not exist") # 2 phase commit section -new_msg_id = get_id() # Do we have an ID already +new_msg_id = get_id # Do we have an ID already # Yes, skip prep... if nil == new_msg_id # No, do prep, and store id @@ -313,14 +313,13 @@ def file_md5(path) # Get data about message curr_msg = nil begin - data = File.read("../msg") # TODO: eliminate this cheat + data = File.read('../msg') # TODO: eliminate this cheat curr_msg = JSON.parse(data) rescue # TODO: Log to private log here soft_fail("Couldn't read message data from file") end - # Increment count curr_msg['count'] = curr_msg.fetch('count', 0) + 1 @@ -329,13 +328,13 @@ def file_md5(path) keys = %w(dest src count max_count param1 param2 param3 param4 post_run_webhook orig_msg_id) keys.each do |key| - next unless curr_msg.has_key?(key) + next unless curr_msg.key?(key) mesg[key] = curr_msg[key] end result = qc.prep_message(mesg) - if result && (result[0] == "ok") + if result && (result[0] == 'ok') new_msg_id = result[1] set_id(new_msg_id) else @@ -344,10 +343,10 @@ def file_md5(path) end # Pull the Short MSG ID out of the Full Msg Id -#q_name = full_mesg_id[/\/q\/([^\/]+)/, 1] +# q_name = full_mesg_id[/\/q\/([^\/]+)/, 1] new_short_msg_id = new_msg_id[/\/q\/[^\/]+\/([^\/]+)/, 1] -log("attempting local send attach") +log('attempting local send attach') # Idempotently attach any attachments if File.exists?('../attach') @@ -358,12 +357,12 @@ def file_md5(path) |fname| log("attempting local send attach #{fname}") - mesg = {'msg_id' => new_short_msg_id, - 'pathname' => File.expand_path("../attach/#{fname}") + mesg = { 'msg_id' => new_short_msg_id, + 'pathname' => File.expand_path("../attach/#{fname}") } result = qc.attach_message(mesg) - if result == nil + if result.nil? soft_fail("Couldn't attach file: #{mesg}") end if result && (result[0] == false) @@ -374,14 +373,12 @@ def file_md5(path) end # Commit ID -mesg = {'msg_id' => new_short_msg_id, } +mesg = { 'msg_id' => new_short_msg_id, } result = qc.commit_message(mesg) -if result && (result[0] == "ok") - erase_id() +if result && (result[0] == 'ok') + erase_id write_status('relayed', new_msg_id) else soft_fail("Couldn't commit message: #{result[0]} - #{result[1]}") end - - diff --git a/code/router.rb b/code/router.rb index a7747ff..bf35116 100644 --- a/code/router.rb +++ b/code/router.rb @@ -2,7 +2,7 @@ class MiniRouter def call(env) - path = env["PATH_INFO"].to_s.squeeze("/") + path = env['PATH_INFO'].to_s.squeeze('/') ## Notice the chaining here, if a path below has a dependency, ## that dependency must be handled prior, otherwhise an infinite @@ -10,10 +10,10 @@ def call(env) # Gotta deal with static stuff first if path.index('/css') or path.index('/javascripts') or path.index('/favicon.ico') - return Rack::ConditionalGet.new(Rack::Static.new(nil, :urls => ["/css", "/javascripts", "/favicon.ico"], :root => 'code/public')).call(env) + return Rack::ConditionalGet.new(Rack::Static.new(nil, :urls => ['/css', '/javascripts', '/favicon.ico'], :root => 'code/public')).call(env) end if path.index('/scripts') - return Rack::ConditionalGet.new(Rack::Static.new(nil, :urls => ["/scripts"], :root => 'code')).call(env) + return Rack::ConditionalGet.new(Rack::Static.new(nil, :urls => ['/scripts'], :root => 'code')).call(env) end # Everything else goes into main diff --git a/code/rq.rb b/code/rq.rb index 550ec3b..ff632cc 100644 --- a/code/rq.rb +++ b/code/rq.rb @@ -5,14 +5,14 @@ def check_usage(arg_list) if not arg_list.length > 0 or arg_list.include?('-h') or arg_list.include?('--help') - puts "Valid commands are:" - puts " " + Commands.new.public_methods.grep(/^cmd_/).each{|c| c.to_s.gsub!(/^cmd_/, '') }.sort.join("\n ") + puts 'Valid commands are:' + puts ' ' + Commands.new.public_methods.grep(/^cmd_/).each { |c| c.to_s.gsub!(/^cmd_/, '') }.sort.join("\n ") exit 1 end end def process_args(arg_list) - input = { } + input = {} input[:cmd] = arg_list.shift input[:xtra] = [] @@ -29,8 +29,8 @@ def process_args(arg_list) input[parts[0][2..-1]] = parts[1] i += 1 next - elsif (i+1 < arg_list.length) && (arg_list[i+1].index('--') == nil) - input[arg_list[i][2..-1]] = arg_list[i+1] + elsif (i + 1 < arg_list.length) && (arg_list[i + 1].index('--') == nil) + input[arg_list[i][2..-1]] = arg_list[i + 1] i += 2 next end @@ -60,7 +60,7 @@ def cmd_sendmesg(args) raise RQ::RqMissingArgument if not q_name if q_name.start_with?('http:') - raise RQ::RqCannotRelay if !args.has_key?('relay-ok') # throw :halt, [404, 'Sorry - cannot relay message'] + raise RQ::RqCannotRelay if !args.key?('relay-ok') # throw :halt, [404, 'Sorry - cannot relay message'] q_name = 'relay' end @@ -70,12 +70,12 @@ def cmd_sendmesg(args) mesg = {} keys = %w(dest src count max_count param1 param2 param3 param4 due force_remote) keys.each do |key| - next unless args.has_key?(key) + next unless args.key?(key) mesg[key] = args[key] end result = qc.create_message(mesg) print "#{result[0]} #{result[1]}\n" - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_prepmesg(args) @@ -83,7 +83,7 @@ def cmd_prepmesg(args) raise RQ::RqMissingArgument if not q_name if q_name.start_with?('http:') - raise RQ::RqCannotRelay if !args.has_key?('relay-ok') # throw :halt, [404, 'Sorry - cannot relay message'] + raise RQ::RqCannotRelay if !args.key?('relay-ok') # throw :halt, [404, 'Sorry - cannot relay message'] q_name = 'relay' end @@ -93,12 +93,12 @@ def cmd_prepmesg(args) mesg = {} keys = %w(dest src count max_count param1 param2 param3 param4 due force_remote) keys.each do |key| - next unless args.has_key?(key) + next unless args.key?(key) mesg[key] = args[key] end result = qc.prep_message(mesg) print "#{result[0]} #{result[1]}\n" - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def check_attachment(msg) @@ -107,7 +107,7 @@ def check_attachment(msg) return [false, "Attachment currently cannot be a directory #{msg['pathname']}"] if File.directory?(msg['pathname']) return [false, "Attachment currently cannot be read: #{msg['pathname']}"] unless File.readable?(msg['pathname']) return [false, "Attachment currently not of supported type: #{msg['pathname']}"] unless File.file?(msg['pathname']) - return [true, ''] + [true, ''] end def cmd_attachmesg(args) @@ -120,10 +120,10 @@ def cmd_attachmesg(args) qc = get_queue_client(q_name) # Construct message for queue mgr - msg = {'msg_id' => msg_id} + msg = { 'msg_id' => msg_id } keys = %w(pathname name local_fs_only) keys.each do |key| - next unless args.has_key?(key) + next unless args.key?(key) msg[key] = args[key] end @@ -132,7 +132,7 @@ def cmd_attachmesg(args) raise RQ::RqError(results[0]) if not results[0] # throw :halt, [404, "404 - #{results[0]}"] result = qc.attach_message(msg) print "#{result[0]} #{result[1]} for Message: #{full_mesg_id} attachment\n" - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_commitmesg(args) @@ -145,11 +145,11 @@ def cmd_commitmesg(args) qc = get_queue_client(q_name) # Construct message for queue mgr - mesg = {'msg_id' => msg_id } + mesg = { 'msg_id' => msg_id } result = qc.commit_message(mesg) print "#{result[0]} #{result[1]}\n" - result[0] == "ok" ? 0 : 1 - #p "#{result} for Message: #{mesg['msg-id']} committed" + result[0] == 'ok' ? 0 : 1 + # p "#{result} for Message: #{mesg['msg-id']} committed" end def cmd_statusmesg(args) @@ -162,14 +162,14 @@ def cmd_statusmesg(args) qc = get_queue_client(q_name) # Construct message for queue mgr - mesg = {'msg_id' => msg_id } + mesg = { 'msg_id' => msg_id } result = qc.get_message_status(mesg) if result[0] == 'ok' print "#{result[0]} #{result[1]['status']}\n" else print "#{result[0]} #{result[1]}\n" end - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_state(args) @@ -182,14 +182,14 @@ def cmd_state(args) qc = get_queue_client(q_name) # Construct message for queue mgr - mesg = {'msg_id' => msg_id } + mesg = { 'msg_id' => msg_id } result = qc.get_message_state(mesg) if result[0] == 'ok' print "#{result[0]} #{result[1]}\n" else print "#{result[0]} #{result[1]}\n" end - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_statuscountmesg(args) @@ -202,14 +202,14 @@ def cmd_statuscountmesg(args) qc = get_queue_client(q_name) # Construct message for queue mgr - mesg = {'msg_id' => msg_id } + mesg = { 'msg_id' => msg_id } result = qc.get_message(mesg) if result[0] == 'ok' print "#{result[0]} #{result[1].fetch('count', '0')}\n" else print "#{result[0]} #{result[1]}\n" end - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_single_que(args) @@ -222,12 +222,12 @@ def cmd_single_que(args) mesg = {} keys = %w(dest src count max_count param1 param2 param3 param4 due force_remote) keys.each do |key| - next unless args.has_key?(key) + next unless args.key?(key) mesg[key] = args[key] end result = qc.single_que(mesg) print "#{result[0]} #{result[1]}\n" - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_attachstatusmesg(args) @@ -240,12 +240,12 @@ def cmd_attachstatusmesg(args) qc = get_queue_client(q_name) # Construct message for queue mgr - mesg = {'msg_id' => msg_id } + mesg = { 'msg_id' => msg_id } result = qc.get_message(mesg) if result[0] == 'ok' ents = [] - if result[1].has_key?('_attachments') - result[1]['_attachments'].each do |k,v| + if result[1].key?('_attachments') + result[1]['_attachments'].each do |k, v| ents << [k, v['md5'], v['size'], v['path']] end end @@ -256,7 +256,7 @@ def cmd_attachstatusmesg(args) else print "#{result[0]} #{result[1]}\n" end - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_clone(args) @@ -268,10 +268,10 @@ def cmd_clone(args) qc = get_queue_client(q_name) - mesg = {'msg_id' => msg_id } + mesg = { 'msg_id' => msg_id } result = qc.clone_message(mesg) print "#{result[0]} #{result[1]}\n" - result[0] == "ok" ? 0 : 1 + result[0] == 'ok' ? 0 : 1 end def cmd_verify_rules(args) @@ -280,13 +280,13 @@ def cmd_verify_rules(args) rp = RQ::RuleProcessor.process_pathname(args['path'], args['verbose']) if rp == nil - puts "fail bad router rules file" + puts 'fail bad router rules file' return 1 else if args['verbose'] - rp.rules.each {|o| p o} + rp.rules.each { |o| p o } end - puts "ok" + puts 'ok' return 0 end end diff --git a/code/rq_router_script.rb b/code/rq_router_script.rb index 1e38b3e..cd5e72c 100755 --- a/code/rq_router_script.rb +++ b/code/rq_router_script.rb @@ -18,16 +18,15 @@ module Alarm so_ext = 'so.6' end dlload "libc.#{so_ext}" - extern "unsigned int alarm(unsigned int)" + extern 'unsigned int alarm(unsigned int)' end - $rq_msg_dir = Dir.pwd Dir.chdir("#{File.dirname(__FILE__)}") require 'rule_processor' -$LOAD_PATH.unshift(File.expand_path("../vendor/gems/json_pure-1.1.6/lib")) +$LOAD_PATH.unshift(File.expand_path('../vendor/gems/json_pure-1.1.6/lib')) require 'json' # Setup a global binding so the GC doesn't close the file @@ -42,7 +41,7 @@ def write_status(state, mesg) $RQ_IO.syswrite(msg) end -def read_status() +def read_status data = $RQ_RESULT_IO.sysread(512) data.split(' ', 2) end @@ -51,8 +50,8 @@ def handle_fail(mesg = 'soft fail') count = ENV['RQ_COUNT'].to_i if count > 15 - write_status('run', "RQ_COUNT > 15 - failing") - write_status('fail', "RQ_COUNT > 15 - failing") + write_status('run', 'RQ_COUNT > 15 - failing') + write_status('fail', 'RQ_COUNT > 15 - failing') exit(0) end @@ -62,16 +61,16 @@ def handle_fail(mesg = 'soft fail') end def main - path = "../config/rq_router_rules.rb" + path = '../config/rq_router_rules.rb' rp = RQ::RuleProcessor.process_pathname(path) if rp == nil - File.open("../config/rq_router.down", "w") { |f| f.write("bad rules file") } + File.open('../config/rq_router.down', 'w') { |f| f.write('bad rules file') } write_status('err', "Bad rule file at: #{path}") exit 1 end - trap("ALRM") { puts "#{$$}: program took too long (60 secs). Goodbye"; $stdout.flush; exit! 2 } + trap('ALRM') { puts "#{$PROCESS_ID}: program took too long (60 secs). Goodbye"; $stdout.flush; exit! 2 } Alarm.alarm(60) # Read current message @@ -81,7 +80,7 @@ def main rule = rp.first_match(curr_msg) if rule.data[:log] - File.open("../log/rq_router.log", "a") do + File.open('../log/rq_router.log', 'a') do |f| f.write("#{Process.pid} - #{Time.now} - #{rule.data[:desc]} - #{rule.data[:action].to_s} - #{curr_msg['msg_id']} - #{curr_msg['src']}\n") end @@ -93,20 +92,20 @@ def main end if rule.data[:action] == :done - write_status('done', "router processed a done") + write_status('done', 'router processed a done') exit 0 end if rule.data[:action] == :err - write_status('err', "router processed an err") + write_status('err', 'router processed an err') exit 1 end if rule.data[:action] == :balance - host = rule.select_hosts()[0] + host = rule.select_hosts[0] new_dest = rp.txform_host(curr_msg['dest'], host) write_status('dup', "0-X-#{new_dest}") - status, new_id = read_status() + status, new_id = read_status if status == 'ok' write_status('done', "DUP to #{new_id}") exit 0 @@ -117,25 +116,25 @@ def main end if rule.data[:action] == :relay - hosts = rule.select_hosts() + hosts = rule.select_hosts - hosts.each { + hosts.each do |h| new_dest = rp.txform_host(curr_msg['dest'], h) write_status('dup', "0-X-#{new_dest}") - status, new_id = read_status() + status, new_id = read_status if status == 'ok' write_status('run', "DUP relay to #{new_id}") else write_status('err', "DUP relay failed - #{new_id}") exit 1 end - } + end write_status('done', "done relay of #{hosts.length} messages") exit 0 end end -main() +main diff --git a/code/rule_processor.rb b/code/rule_processor.rb index 76f0fb4..f90e8fe 100755 --- a/code/rule_processor.rb +++ b/code/rule_processor.rb @@ -1,21 +1,19 @@ module RQ - class DSLArgumentError < StandardError; end class DSLRuleError < StandardError; end class Rule - attr_accessor :data @@fields = [:desc, :action, :src, :dest, :route, :delay, :log, :num] - @@rand_proc = lambda {|x| rand(x) } + @@rand_proc = lambda { |x| rand(x) } def self.rand_func=(f) @@rand_proc = f end - def initialize() + def initialize @data = {} end @@ -38,7 +36,7 @@ def src(rgx) end def dest(dst) - raise DSLArgumentError, "Dest not a regexp" if dst.class != Regexp + raise DSLArgumentError, 'Dest not a regexp' if dst.class != Regexp @data[:dest] = dst end @@ -48,7 +46,7 @@ def route(*rt) end def log(tf) - raise DSLArgumentError, "delay must be an boolean: #{tf}" unless (tf.class == TrueClass || tf.class == FalseClass) + raise DSLArgumentError, "delay must be an boolean: #{tf}" unless tf.class == TrueClass || tf.class == FalseClass @data[:log] = tf end @@ -59,7 +57,7 @@ def delay(dly) def end_rule # Validate rule - raise ArgumentError if act.class != Symbol - #$rules << self + # $rules << self if [:blackhole, :err].include? @data[:action] @data[:log] = true end @@ -68,7 +66,7 @@ def end_rule @data[:delay] ||= 0 if @data[:desc] != 'default' - raise DSLRuleError, "rule must have a src or dest pattern match" unless (@data[:src] || @data[:dest]) + raise DSLRuleError, 'rule must have a src or dest pattern match' unless @data[:src] || @data[:dest] end @data[:route] ||= [] @@ -114,11 +112,11 @@ def select_hosts end else # pick a random element - [ rts[@@rand_proc.call(rts.length)] ] + [rts[@@rand_proc.call(rts.length)]] end end - def process(str, num, verbose=false) + def process(str, num, verbose = false) begin instance_eval(str) rescue DSLArgumentError => ex @@ -136,7 +134,6 @@ def process(str, num, verbose=false) end class RuleProcessor - attr_accessor :rules def initialize(rls) @@ -153,7 +150,7 @@ def first_match(o) def txform_host(old, new) # if new has full address, we just use that - if new.start_with?("http") + if new.start_with?('http') return new end @@ -162,7 +159,7 @@ def txform_host(old, new) new += ':3333' end - if old.start_with?("http") # if a standard full msg_id + if old.start_with?('http') # if a standard full msg_id # just swap out the host parts = old.split('/q/', 2) "http://#{new}/q/#{parts[1]}" @@ -172,7 +169,7 @@ def txform_host(old, new) end end - def self.process_pathname(path, verbose=false) + def self.process_pathname(path, verbose = false) rules = [] begin lines = [] @@ -185,7 +182,7 @@ def self.process_pathname(path, verbose=false) lines.each_with_index do |line, i| i = i + 1 # i is offset by 0, so we bump it up for human readable line #s - next if line[0..1] == "#" + next if line[0..1] == '#' if in_rule if line[0..1] == "\n" @@ -195,10 +192,10 @@ def self.process_pathname(path, verbose=false) end rule.process(line, i, verbose) end - if line[0..4] == "rule " + if line[0..4] == 'rule ' rule.end_rule if in_rule in_rule = true - rule = Rule.new() + rule = Rule.new rule.data[:num] = rules.length + 1 rules << rule rule.process(line, i, verbose) @@ -217,12 +214,12 @@ def self.process_pathname(path, verbose=false) return nil end - any_defaults,rules = rules.partition {|o| o.data[:desc] == 'default'} + any_defaults, rules = rules.partition { |o| o.data[:desc] == 'default' } default_rule = Rule.new default_rule.rule('default') default_rule.action(:err) - default_rule.end_rule() + default_rule.end_rule any_defaults.unshift(default_rule) @@ -230,7 +227,5 @@ def self.process_pathname(path, verbose=false) RuleProcessor.new(rules) end - end - end diff --git a/code/scheduler.rb b/code/scheduler.rb index 0d0601b..b79395e 100644 --- a/code/scheduler.rb +++ b/code/scheduler.rb @@ -7,13 +7,12 @@ module RQ class Scheduler - def initialize(options, parent_pipe) @start_time = Time.now # Read config - @name = "scheduler" - @sched_path = "scheduler/" - @rq_config_path = "./config/" + @name = 'scheduler' + @sched_path = 'scheduler/' + @rq_config_path = './config/' @parent_pipe = parent_pipe init_socket @@ -21,12 +20,12 @@ def initialize(options, parent_pipe) end def self.log(path, mesg) - File.open(path + '/sched.log', "a") do |f| + File.open(path + '/sched.log', 'a') do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end - def self.start_process(options={}) + def self.start_process(options = {}) # nice pipes writeup # http://www.cim.mcgill.ca/~franco/OpSys-304-427/lecture-notes/node28.html child_rd, parent_wr = IO.pipe @@ -37,11 +36,11 @@ def self.start_process(options={}) Signal.trap('CHLD', 'DEFAULT') Signal.trap('HUP', 'DEFAULT') - sched_path = "scheduler/" - $0 = "[rq-scheduler]" + sched_path = 'scheduler/' + $0 = '[rq-scheduler]' begin parent_wr.close - #child only code block + # child only code block RQ::Scheduler.log(sched_path, 'post fork') q = RQ::Scheduler.new(options, child_rd) @@ -52,14 +51,14 @@ def self.start_process(options={}) sleep 60 end rescue Exception - self.log(sched_path, "Exception!") - self.log(sched_path, $!) - self.log(sched_path, $!.backtrace) + log(sched_path, 'Exception!') + log(sched_path, $ERROR_INFO) + log(sched_path, $ERROR_INFO.backtrace) raise end end - #parent only code block + # parent only code block child_rd.close if child_pid == nil @@ -91,7 +90,7 @@ def self.close_all_fds(exclude_fds) def init_socket # Show pid File.unlink(@sched_path + '/sched.pid') rescue nil - File.open(@sched_path + '/sched.pid', "w") do |f| + File.open(@sched_path + '/sched.pid', 'w') do |f| f.write("#{Process.pid}\n") end @@ -101,24 +100,23 @@ def init_socket end def log(mesg) - File.open(@sched_path + '/sched.log', "a") do |f| + File.open(@sched_path + '/sched.log', 'a') do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end def shutdown! - log("Received shutdown") + log('Received shutdown') Process.exit! 0 end def run_loop Signal.trap('TERM') do - log("received TERM signal") + log('received TERM signal') shutdown! end sleep while true end - end end diff --git a/code/web_server.rb b/code/web_server.rb index 7b121d1..a369290 100755 --- a/code/web_server.rb +++ b/code/web_server.rb @@ -6,7 +6,6 @@ module RQ class WebServer - def initialize(config) @basic_auth = config['basic_auth'] @port = config['port'] @@ -27,12 +26,12 @@ def run! router = protected_router end - Rack::Handler::UnixRack.run(router, { - :Port => @port, - :Host => @addr, - :Hostname => @host, - :allowed_ips => @allowed_ips, - }) + Rack::Handler::UnixRack.run(router, + :Port => @port, + :Host => @addr, + :Hostname => @host, + :allowed_ips => @allowed_ips + ) end end end diff --git a/code/webhook_script.rb b/code/webhook_script.rb index d86646d..929512a 100755 --- a/code/webhook_script.rb +++ b/code/webhook_script.rb @@ -18,8 +18,8 @@ def handle_fail(mesg = 'soft fail') count = ENV['RQ_COUNT'].to_i if count > 15 - write_status('run', "RQ_COUNT > 15 - failing") - write_status('fail', "RQ_COUNT > 15 - failing") + write_status('run', 'RQ_COUNT > 15 - failing') + write_status('fail', 'RQ_COUNT > 15 - failing') exit(0) end @@ -51,4 +51,4 @@ def send_post end end -send_post() +send_post diff --git a/version.rb b/version.rb index 7bf8417..5e4402d 100644 --- a/version.rb +++ b/version.rb @@ -1,4 +1,4 @@ -#any comments go here -VERSION_NUMBER = "20140203.0" -SEMANTIC_VERSION_NUMBER = "v1.13.5" +# any comments go here +VERSION_NUMBER = '20140203.0' +SEMANTIC_VERSION_NUMBER = 'v1.13.5' # http://semver.org/