Skip to content

Commit

Permalink
Merge pull request #127 from cookpad/errm/outbox_message_bus
Browse files Browse the repository at this point in the history
Add a message bus for transactional outbox deliveries
  • Loading branch information
errm authored Aug 8, 2024
2 parents 2c66bb8 + abd1ba1 commit fb5c94e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/streamy/event_handler.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "active_support/core_ext/hash/indifferent_access"
require "ostruct"

module Streamy
class EventHandler
Expand Down
22 changes: 22 additions & 0 deletions lib/streamy/message_buses/outbox_message_bus.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
require "streamy/kafka_configuration"
require "waterdrop"
require "active_support/core_ext/hash/indifferent_access"
require "active_support/json"

module Streamy
module MessageBuses
class OutboxMessageBus < MessageBus
def initialize(config)
@model = config[:model]
end

def deliver(key:, topic:, payload:, priority:)
@model.create(key: key, topic: topic, payload: payload)
end

def deliver_many(messages)
@model.create(messages.map { |message| message.except(:priority) })
end
end
end
end
1 change: 1 addition & 0 deletions streamy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.add_dependency "avro_turf", "~> 1.3.0"
spec.add_dependency "waterdrop", ">= 2.4.10", "< 3.0.0"
spec.add_dependency "webmock", "~> 3.3"
spec.add_dependency "ostruct"
end
1 change: 1 addition & 0 deletions test/avro_deserializer_test.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "test_helper"
require "avro_turf/test/fake_confluent_schema_registry_server"
require "webmock/minitest"
require "ostruct"

module Streamy
class AvroDeserializerTest < Minitest::Test
Expand Down
101 changes: 101 additions & 0 deletions test/message_buses/outbox_message_bus_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
require "test_helper"
require "waterdrop"
require "streamy/message_buses/outbox_message_bus"

module Streamy
class OutboxMessageBusTest < Minitest::Test
attr_reader :bus

def setup
@model = mock("outbox_model")
@bus = MessageBuses::OutboxMessageBus.new(model: @model)
end

def example_delivery(priority)
bus.deliver(
payload: payload.to_s,
key: "prk-sg-001",
topic: "charcuterie",
priority: priority
)
end

def payload
{
type: "sausage",
body: { meat: "pork", herbs: "sage" },
event_time: "2018"
}
end

def expected_event(key: "prk-sg-001")
{
payload: {
type: "sausage",
body: {
meat: "pork",
herbs: "sage"
},
event_time: "2018"
}.to_s,
key: key,
topic: "charcuterie"
}
end

def test_standard_priority_deliver
@model.expects(:create).with(expected_event)
example_delivery(:standard)
end

def test_low_priority_deliver
@model.expects(:create).with(expected_event)
example_delivery(:low)
end

def test_essential_priority_deliver
@model.expects(:create).with(expected_event)
example_delivery(:essential)
end

def test_all_priority_delivery
@model.expects(:create).with(expected_event)
example_delivery(:essential)

@model.expects(:create).with(expected_event)
example_delivery(:low)

@model.expects(:create).with(expected_event)
example_delivery(:standard)
end

def test_batch_delivery
@model.expects(:create).with([
expected_event(key: "prk-sg-001"),
expected_event(key: "prk-sg-002"),
expected_event(key: "prk-sg-003")
])

bus.deliver_many([
{
payload: payload.to_s,
key: "prk-sg-001",
topic: "charcuterie",
priority: :standard
},
{
payload: payload.to_s,
key: "prk-sg-002",
topic: "charcuterie",
priority: :standard
},
{
payload: payload.to_s,
key: "prk-sg-003",
topic: "charcuterie",
priority: :standard
}
])
end
end
end

0 comments on commit fb5c94e

Please sign in to comment.