Skip to content

Commit

Permalink
Merge pull request #12 from estantevirtual/dev
Browse files Browse the repository at this point in the history
Permite passar topic no producer e no consumer
  • Loading branch information
RibaDev authored Nov 30, 2017
2 parents fb48731 + 938411a commit 13e8007
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 24 deletions.
15 changes: 8 additions & 7 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
PATH
remote: .
specs:
event-pub-sub (3.0.0)
event-pub-sub (3.1.0)
activesupport
bunny (= 2.3.0)

GEM
remote: https://rubygems.org/
specs:
activesupport (5.0.2)
activesupport (5.1.4)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (~> 0.7)
minitest (~> 5.1)
tzinfo (~> 1.1)
amq-protocol (2.1.0)
amq-protocol (2.2.0)
ast (2.2.0)
astrolabe (1.3.1)
parser (~> 2.2)
Expand Down Expand Up @@ -44,15 +44,16 @@ GEM
guard-compat (~> 1.1)
rspec (>= 2.99.0, < 4.0)
hitimes (1.2.2)
i18n (0.8.1)
i18n (0.9.1)
concurrent-ruby (~> 1.0)
iniparse (1.4.2)
listen (2.8.5)
celluloid (>= 0.15.2)
rb-fsevent (>= 0.9.3)
rb-inotify (>= 0.9)
lumberjack (1.0.9)
method_source (0.8.2)
minitest (5.10.1)
minitest (5.10.3)
nenv (0.2.0)
notiffany (0.0.3)
nenv (~> 0.1)
Expand Down Expand Up @@ -100,7 +101,7 @@ GEM
timers (4.0.1)
hitimes
tins (1.6.0)
tzinfo (1.2.2)
tzinfo (1.2.4)
thread_safe (~> 0.1)

PLATFORMS
Expand All @@ -117,4 +118,4 @@ DEPENDENCIES
rubocop (= 0.35.1)

BUNDLED WITH
1.13.7
1.15.4
24 changes: 12 additions & 12 deletions lib/event_pub_sub/broker_handler.rb
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
module EventPubSub
class BrokerHandler

def initialize(config, logger)
raise ArgumentError, "missing broker ip" unless config[:ip]
raise ArgumentError, "missing broker port" unless config[:port]
raise ArgumentError, "missing broker username" unless config[:username]
raise ArgumentError, "missing broker password" unless config[:password]
def initialize(config, logger, topic='topic_events')
raise ArgumentError, 'missing broker ip' unless config[:ip]
raise ArgumentError, 'missing broker port' unless config[:port]
raise ArgumentError, 'missing broker username' unless config[:username]
raise ArgumentError, 'missing broker password' unless config[:password]
@config = config
@logger = logger
@topic_name = topic
end

def start_connection
@connection = build_connection
@connection.start
rescue => e
@logger.error "#{e.message} - #{e.class}\n #{e.backtrace.join("\n")}"
rescue => e
@logger.error "#{e.message} - #{e.class}\n #{e.backtrace.join("\n")}"
end

def close_connection
@connection.close
end

def publish(message, routing_key)
topic.publish( message, persistent: true, routing_key: routing_key )
topic.publish(message, persistent: true, routing_key: routing_key)
end

def setup_queue(queue_name)
@queue = channel.queue(queue_name, durable: true, auto_delete: false)
@queue.bind(topic, routing_key: '#')
end

def subscribe(consumer_name, params={ack: true, block: false}, &block)
def subscribe(consumer_name, params = { ack: true, block: false }, &block)
params[:consumer_tag] = consumer_name
@queue.subscribe(params) do |delivery_info, properties, payload|
begin
Expand All @@ -44,6 +44,7 @@ def subscribe(consumer_name, params={ack: true, block: false}, &block)
end

private

def build_connection
Bunny.new(
host: @config[:ip],
Expand All @@ -54,10 +55,9 @@ def build_connection
end

def topic
@topic ||= channel.topic('topic_events', durable: true)
@topic ||= channel.topic(@topic_name, durable: true)
end


def channel
@channel ||= @connection.create_channel(nil, workers_total)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/event_pub_sub/message_consumer.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module EventPubSub
class MessageConsumer
def initialize(config, logger)
def initialize(config, logger, topic='topic_events')
raise ArgumentError, "missing module base_routing_key " unless config[:base_routing_key]
@queue_name = config[:base_routing_key]
@logger = logger
@broker_handler = BrokerHandler.new(config[:broker], @logger)
@broker_handler = BrokerHandler.new(config[:broker], @logger, topic)
@logger.info '[MessageConsumer] - Starting Connection'
@broker_handler.start_connection
@broker_handler.setup_queue(@queue_name)
Expand Down
4 changes: 2 additions & 2 deletions lib/event_pub_sub/message_producer.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module EventPubSub
class MessageProducer
def initialize(config, logger)
def initialize(config, logger, topic='topic_events')
raise ArgumentError, "missing module base_routing_key " unless config[:base_routing_key]
@base_routing_key = config[:base_routing_key]
@logger = logger

@broker_handler = BrokerHandler.new(config[:broker], @logger)
@broker_handler = BrokerHandler.new(config[:broker], @logger, topic)
@logger.info '[MessageProducer] - Starting Connection'
@broker_handler.start_connection
end
Expand Down
2 changes: 1 addition & 1 deletion lib/event_pub_sub/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module EventPubSub
VERSION = '3.0.0'.freeze
VERSION = '3.1.0'.freeze
end

0 comments on commit 13e8007

Please sign in to comment.