Skip to content

Commit

Permalink
Add support for client#publish method for transient publishes without…
Browse files Browse the repository at this point in the history
… instancing a Channel object

See https://github.com/ably/docs/issues/468
  • Loading branch information
mattheworiordan committed Feb 6, 2019
1 parent b0b8524 commit ff95159
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 125 deletions.
115 changes: 17 additions & 98 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'ably/realtime/channel/publisher'

module Ably
module Realtime
# The Channel class represents a Channel belonging to this application.
Expand Down Expand Up @@ -32,6 +34,7 @@ class Channel
include Ably::Modules::EventMachineHelpers
include Ably::Modules::AsyncWrapper
include Ably::Modules::MessageEmitter
include Ably::Realtime::Channel::Publisher
extend Ably::Modules::Enum

# ChannelState
Expand Down Expand Up @@ -59,7 +62,7 @@ class Channel
# Max number of messages to bundle in a single ProtocolMessage
MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50

# Ably client associated with this channel
# {Ably::Realtime::Client} associated with this channel
# @return [Ably::Realtime::Client]
# @api private
attr_reader :client
Expand Down Expand Up @@ -122,7 +125,7 @@ def initialize(client, name, channel_options = {})
# @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument
# @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
#
# @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published
# @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
# @example
Expand Down Expand Up @@ -158,7 +161,7 @@ def publish(name, data = nil, attributes = {}, &success_block)
end

if !connection.can_publish_messages?
error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue of messages when connection is in state #{connection.state}")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end

Expand All @@ -170,7 +173,12 @@ def publish(name, data = nil, attributes = {}, &success_block)
[{ name: name, data: data }.merge(attributes)]
end

queue_messages(messages).tap do |deferrable|
if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end

enqueue_messages_on_connection(client, messages, channel_name, options).tap do |deferrable|
deferrable.callback(&success_block) if block_given?
end
end
Expand Down Expand Up @@ -327,12 +335,6 @@ def logger
client.logger
end

# Internal queue used for messages published that cannot yet be enqueued on the connection
# @api private
def __queue__
@queue
end

# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
Expand All @@ -345,94 +347,6 @@ def setup_event_handlers
end
emit_message message.name, message
end

unsafe_on(STATE.Attached) do
process_queue
end
end

# Queue messages and process queue if channel is attached.
# If channel is not yet attached, attempt to attach it before the message queue is processed.
# @return [Ably::Util::SafeDeferrable]
def queue_messages(raw_messages)
messages = Array(raw_messages).map do |raw_msg|
create_message(raw_msg).tap do |message|
next if message.client_id.nil?
if message.client_id == '*'
raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
end
if message.client_id && !message.client_id.kind_of?(String)
raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages')
end
unless client.auth.can_assume_client_id?(message.client_id)
raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
end
end
end

__queue__.push(*messages)

process_queue

if messages.count == 1
# A message is a Deferrable so, if publishing only one message, simply return that Deferrable
messages.first
else
deferrable_for_multiple_messages(messages)
end
end

# A deferrable object that calls the success callback once all messages are delivered
# If any message fails, the errback is called immediately
# Only one callback or errback is ever called i.e. if a group of messages all fail, only once
# errback will be invoked
def deferrable_for_multiple_messages(messages)
expected_deliveries = messages.count
actual_deliveries = 0
failed = false

Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
messages.each do |message|
message.callback do
next if failed
actual_deliveries += 1
deferrable.succeed messages if actual_deliveries == expected_deliveries
end
message.errback do |error|
next if failed
failed = true
deferrable.fail error, message
end
end
end
end

def messages_in_queue?
!__queue__.empty?
end

# Move messages from Channel Queue into Outgoing Connection Queue
def process_queue
condition = -> { messages_in_queue? }
non_blocking_loop_while(condition) do
send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)
end
end

def send_messages_within_protocol_message(messages)
connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Message.to_i,
channel: name,
messages: messages
)
end

def create_message(message)
Ably::Models::Message(message.dup).tap do |msg|
msg.encode(client.encoders, options) do |encode_error, error_message|
client.logger.error error_message
end
end
end

def rest_channel
Expand All @@ -446,6 +360,11 @@ def connection
def setup_presence
@presence ||= Presence.new(self)
end

# Alias useful for methods with a name argument
def channel_name
name
end
end
end
end
Expand Down
6 changes: 1 addition & 5 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,12 @@ def fail_messages_awaiting_ack(error, options = {})
end
end

# When a channel becomes detached, suspended or failed,
# When a channel becomes suspended or failed,
# all queued messages should be failed immediately as we don't queue in
# any of those states
def fail_queued_messages(error)
error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error
fail_messages_in_queue connection.__outgoing_message_queue__, error
channel.__queue__.each do |message|
nack_message message, error
end
channel.__queue__.clear
end

def fail_messages_in_queue(queue, error)
Expand Down
74 changes: 74 additions & 0 deletions lib/ably/realtime/channel/publisher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
module Ably::Realtime
class Channel
# Publisher module adds publishing capabilities to the current object
module Publisher
private

# Prepare and queue messages on the connection queue immediately
# @return [Ably::Util::SafeDeferrable]
def enqueue_messages_on_connection(client, raw_messages, channel_name, channel_options = {})
messages = Array(raw_messages).map do |raw_msg|
create_message(client, raw_msg, channel_options).tap do |message|
next if message.client_id.nil?
if message.client_id == '*'
raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
end
if message.client_id && !message.client_id.kind_of?(String)
raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages')
end
unless client.auth.can_assume_client_id?(message.client_id)
raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
end
end
end

connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Message.to_i,
channel: channel_name,
messages: messages
)

if messages.count == 1
# A message is a Deferrable so, if publishing only one message, simply return that Deferrable
messages.first
else
deferrable_for_multiple_messages(messages)
end
end

# A deferrable object that calls the success callback once all messages are delivered
# If any message fails, the errback is called immediately
# Only one callback or errback is ever called i.e. if a group of messages all fail, only once
# errback will be invoked
def deferrable_for_multiple_messages(messages)
expected_deliveries = messages.count
actual_deliveries = 0
failed = false

Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
messages.each do |message|
message.callback do
next if failed
actual_deliveries += 1
deferrable.succeed messages if actual_deliveries == expected_deliveries
end
message.errback do |error|
next if failed
failed = true
deferrable.fail error, message
end
end
end
end

def create_message(client, message, channel_options)
Ably::Models::Message(message.dup).tap do |msg|
msg.encode(client.encoders, channel_options) do |encode_error, error_message|
client.logger.error error_message
end
end
end
end
end
end

72 changes: 72 additions & 0 deletions lib/ably/realtime/client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'uri'
require 'ably/realtime/channel/publisher'

module Ably
module Realtime
Expand All @@ -21,6 +22,9 @@ module Realtime
#
class Client
include Ably::Modules::AsyncWrapper
include Ably::Realtime::Channel::Publisher
include Ably::Modules::Conversions

extend Forwardable

DOMAIN = 'realtime.ably.io'
Expand Down Expand Up @@ -179,6 +183,74 @@ def request(method, path, params = {}, body = nil, headers = {}, &callback)
end
end

# Publish one or more messages to the specified channel.
#
# This method allows messages to be efficiently published to Ably without instancing a {Ably::Realtime::Channel} object.
# If you want to publish a high rate of messages to Ably without instancing channels or using the REST API, then this method
# is recommended. However, channel options such as encryption are not supported with this method. If you need to specify channel options
# we recommend you use the {Ably::Realtime::Channel} +publish+ method without attaching to each channel, unless you also want to subscribe
# to published messages on that channel.
#
# Note: This feature is still in beta. As such, we cannot guarantee the API will not change in future.
#
# @param channel [String] The channel name you want to publish the message(s) to
# @param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
# @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument
# @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
#
# @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
# @example
# # Publish a single message
# client.publish 'activityChannel', click', { x: 1, y: 2 }
#
# # Publish an array of message Hashes
# messages = [
# { name: 'click', { x: 1, y: 2 } },
# { name: 'click', { x: 2, y: 3 } }
# ]
# client.publish 'activityChannel', messages
#
# # Publish an array of Ably::Models::Message objects
# messages = [
# Ably::Models::Message(name: 'click', { x: 1, y: 2 })
# Ably::Models::Message(name: 'click', { x: 2, y: 3 })
# ]
# client.publish 'activityChannel', messages
#
# client.publish('activityChannel', 'click', 'body') do |message|
# puts "#{message.name} event received with #{message.data}"
# end
#
# client.publish('activityChannel', 'click', 'body').errback do |error, message|
# puts "#{message.name} was not received, error #{error.message}"
# end
#
def publish(channel_name, name, data = nil, attributes = {}, &success_block)
if !connection.can_publish_messages?
error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue of messages when connection is in state #{connection.state}")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end

messages = if name.kind_of?(Enumerable)
name
else
name = ensure_utf_8(:name, name, allow_nil: true)
ensure_supported_payload data
[{ name: name, data: data }.merge(attributes)]
end

if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end

enqueue_messages_on_connection(self, messages, channel_name).tap do |deferrable|
deferrable.callback(&success_block) if block_given?
end
end

# @!attribute [r] endpoint
# @return [URI::Generic] Default Ably Realtime endpoint used for all requests
def endpoint
Expand Down
3 changes: 3 additions & 0 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class Connection
websocket_heartbeats_disabled: false,
}.freeze

# Max number of messages to bundle in a single ProtocolMessage
MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50

# A unique public identifier for this connection, used to identify this member in presence events and messages
# @return [String]
attr_reader :id
Expand Down
22 changes: 0 additions & 22 deletions spec/acceptance/realtime/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -838,28 +838,6 @@ def disconnect_transport
end
end

context 'when the connection is not yet connected' do
it 'publishes queued messages within a single protocol message once connected' do
expect(connection.state).to eq(:initialized)
3.times { channel.publish('event', random_str) }
channel.subscribe do |message|
messages << message if message.name == 'event'
next unless messages.length == 3

# All 3 messages should be batched into a single Protocol Message by the client library
# message.id = "{protocol_message.id}:{protocol_message_index}"
# Check that all messages share the same protocol_message.id
message_id = messages.map { |msg| msg.id.split(':')[0...-1].join(':') }
expect(message_id.uniq.count).to eql(1)

# Check that messages use index 0,1,2 in the ID
message_indexes = messages.map { |msg| msg.id.split(':').last }
expect(message_indexes).to include("0", "1", "2")
stop_reactor
end
end
end

context 'with :queue_messages client option set to false (#RTL6c4)' do
let(:client_options) { default_options.merge(queue_messages: false) }

Expand Down
Loading

0 comments on commit ff95159

Please sign in to comment.