From b21dd36f0076507f44128111e9cf3430c82b58b1 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 3 Dec 2024 14:45:41 +0530 Subject: [PATCH] [CHA-M5] Improved message subscription history implementation 1. Removed use of custom 'DeferredValue', which can become bottleneck in heavy concurrency scenarios. 2. Replaced with thread safe native 'CompletableDeferred', supports thread safe concurrent operations. 3. Removed unnecessary 'lock' and 'listeners' properties from Messages, since concurrency and channel serial access already taken care by CompletableDeferred 4. Updated tests using `DeferredValue` with `CompletableDeferred` --- .../src/main/java/com/ably/chat/Messages.kt | 119 ++++++------------ .../src/main/java/com/ably/chat/Utils.kt | 61 +-------- .../test/java/com/ably/chat/MessagesTest.kt | 17 ++- .../test/java/com/ably/chat/PresenceTest.kt | 13 +- .../java/com/ably/chat/RoomReactionsTest.kt | 5 +- .../src/test/java/com/ably/chat/TestUtils.kt | 4 - 6 files changed, 62 insertions(+), 157 deletions(-) diff --git a/chat-android/src/main/java/com/ably/chat/Messages.kt b/chat-android/src/main/java/com/ably/chat/Messages.kt index c155a388..f7cda351 100644 --- a/chat-android/src/main/java/com/ably/chat/Messages.kt +++ b/chat-android/src/main/java/com/ably/chat/Messages.kt @@ -7,9 +7,9 @@ import com.google.gson.JsonObject import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener -import io.ably.lib.types.AblyException -import io.ably.lib.types.ErrorInfo import io.ably.lib.types.MessageAction +import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.CompletableDeferred import io.ably.lib.realtime.Channel as AblyRealtimeChannel typealias PubSubMessageListener = AblyRealtimeChannel.MessageListener @@ -193,7 +193,7 @@ internal class DefaultMessagesSubscription( private val chatApi: ChatApi, private val roomId: String, private val subscription: Subscription, - internal val fromSerialProvider: () -> DeferredValue, + internal val fromSerialProvider: () -> CompletableDeferred, ) : MessagesSubscription { override fun unsubscribe() { subscription.unsubscribe() @@ -216,8 +216,6 @@ internal class DefaultMessages( override val featureName: String = "messages" - private var listeners: Map> = emptyMap() - private var channelStateListener: ChannelStateListener private val logger = room.roomLogger.withContext(tag = "Messages") @@ -228,8 +226,6 @@ internal class DefaultMessages( private val realtimeChannels = room.realtimeClient.channels - private var lock = Any() - /** * (CHA-M1) * the channel name for the chat messages channel. @@ -242,20 +238,37 @@ internal class DefaultMessages( override val detachmentErrorCode: ErrorCode = ErrorCode.MessagesDetachmentFailed + private val channelSerialMap = ConcurrentHashMap>() + + /** + * deferredChannelSerial is a thread safe reference to the channel serial. + * Provides common channel serial for all subscribers once discontinuity is detected. + */ + private var deferredChannelSerial = CompletableDeferred() + init { channelStateListener = ChannelStateListener { - if (it.current == ChannelState.attached && !it.resumed) updateChannelSerialsAfterDiscontinuity() + if (it.current == ChannelState.attached && !it.resumed) { + updateChannelSerialsAfterDiscontinuity(requireAttachSerial()) + } } channel.on(channelStateListener) } + // CHA-M5c, CHA-M5d - Updated channel serial after discontinuity + private fun updateChannelSerialsAfterDiscontinuity(value: String) { + if (deferredChannelSerial.isActive) { + deferredChannelSerial.complete(value) + } else { + deferredChannelSerial = CompletableDeferred(value) + } + // channel serials updated at the same time for all map entries + channelSerialMap.replaceAll { _, _ -> deferredChannelSerial } + } + override fun subscribe(listener: Messages.Listener): MessagesSubscription { - val deferredChannelSerial = DeferredValue() - addListener(listener, deferredChannelSerial) val messageListener = PubSubMessageListener { - val pubSubMessage = it ?: throw AblyException.fromErrorInfo( - ErrorInfo("Got empty pubsub channel message", HttpStatusCode.BadRequest, ErrorCode.BadRequest.code), - ) + val pubSubMessage = it ?: throw clientError("Got empty pubsub channel message") // Ignore any action that is not message.create if (pubSubMessage.action != MessageAction.MESSAGE_CREATE) return@PubSubMessageListener @@ -273,26 +286,24 @@ internal class DefaultMessages( ) listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage)) } + channelSerialMap[messageListener] = deferredChannelSerial // (CHA-M4d) channel.subscribe(PubSubMessageNames.ChatMessage, messageListener) // (CHA-M5) setting subscription point - associateWithCurrentChannelSerial(deferredChannelSerial) + if (channel.state == ChannelState.attached) { + channelSerialMap[messageListener] = CompletableDeferred(requireChannelSerial()) + } return DefaultMessagesSubscription( chatApi = chatApi, roomId = roomId, subscription = { - removeListener(listener) + channelSerialMap.remove(messageListener) channel.unsubscribe(PubSubMessageNames.ChatMessage, messageListener) }, fromSerialProvider = { - listeners[listener] ?: throw AblyException.fromErrorInfo( - ErrorInfo( - "This messages subscription instance was already unsubscribed", - HttpStatusCode.BadRequest, - ErrorCode.BadRequest.code, - ), - ) + channelSerialMap[messageListener] + ?: throw clientError("This messages subscription instance was already unsubscribed") }, ) } @@ -301,73 +312,19 @@ internal class DefaultMessages( override suspend fun send(params: SendMessageParams): Message = chatApi.sendMessage(roomId, params) - /** - * Associate deferred channel serial value with the current channel's serial - * - * WARN: it not deterministic because of race condition, - * this can lead to duplicated messages in `getPreviousMessages` calls - */ - private fun associateWithCurrentChannelSerial(channelSerialProvider: DeferredValue) { - if (channel.state === ChannelState.attached) { - channelSerialProvider.completeWith(requireChannelSerial()) - return - } - - channel.once(ChannelState.attached) { - channelSerialProvider.completeWith(requireAttachSerial()) - } - } - private fun requireChannelSerial(): String { return channel.properties.channelSerial - ?: throw AblyException.fromErrorInfo( - ErrorInfo( - "Channel has been attached, but channelSerial is not defined", - HttpStatusCode.BadRequest, - ErrorCode.BadRequest.code, - ), - ) + ?: throw clientError("Channel has been attached, but channelSerial is not defined") } private fun requireAttachSerial(): String { return channel.properties.attachSerial - ?: throw AblyException.fromErrorInfo( - ErrorInfo( - "Channel has been attached, but attachSerial is not defined", - HttpStatusCode.BadRequest, - ErrorCode.BadRequest.code, - ), - ) - } - - private fun addListener(listener: Messages.Listener, deferredChannelSerial: DeferredValue) { - synchronized(lock) { - listeners += listener to deferredChannelSerial - } - } - - private fun removeListener(listener: Messages.Listener) { - synchronized(lock) { - listeners -= listener - } - } - - /** - * (CHA-M5c), (CHA-M5d) - */ - private fun updateChannelSerialsAfterDiscontinuity() { - val deferredChannelSerial = DeferredValue() - deferredChannelSerial.completeWith(requireAttachSerial()) - - synchronized(lock) { - listeners = listeners.mapValues { - if (it.value.completed) deferredChannelSerial else it.value - } - } + ?: throw clientError("Channel has been attached, but attachSerial is not defined") } override fun release() { channel.off(channelStateListener) + channelSerialMap.clear() realtimeChannels.release(channel.name) } } @@ -379,9 +336,7 @@ private data class PubSubMessageData(val text: String, val metadata: MessageMeta private fun parsePubSubMessageData(data: Any): PubSubMessageData { if (data !is JsonObject) { - throw AblyException.fromErrorInfo( - ErrorInfo("Unrecognized Pub/Sub channel's message for `Message.created` event", HttpStatusCode.InternalServerError), - ) + throw serverError("Unrecognized Pub/Sub channel's message for `Message.created` event") } return PubSubMessageData( text = data.requireString("text"), diff --git a/chat-android/src/main/java/com/ably/chat/Utils.kt b/chat-android/src/main/java/com/ably/chat/Utils.kt index 84d61f30..d9a02c1b 100644 --- a/chat-android/src/main/java/com/ably/chat/Utils.kt +++ b/chat-android/src/main/java/com/ably/chat/Utils.kt @@ -147,63 +147,6 @@ fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOption fun generateUUID() = UUID.randomUUID().toString() -/** - * A value that can be evaluated at a later time, similar to `kotlinx.coroutines.Deferred` or a JavaScript Promise. - * - * This class provides a thread-safe, simple blocking implementation. It is not designed for use in scenarios with - * heavy concurrency. - * - * @param T the type of the value that will be evaluated. - */ -internal class DeferredValue { - - private var value: T? = null - - private val lock = Any() - - private var observers: Set<(T) -> Unit> = setOf() - - private var _completed = false - - /** - * `true` if value has been set, `false` otherwise - */ - val completed get() = _completed - - /** - * Set value and mark DeferredValue completed, should be invoked only once - * - * @throws IllegalStateException if it's already `completed` - */ - fun completeWith(completionValue: T) { - synchronized(lock) { - check(!_completed) { "DeferredValue has already been completed" } - value = completionValue - _completed = true - observers.forEach { it(completionValue) } - observers = setOf() - } - } - - /** - * Wait until value is completed - * - * @return completed value - */ - suspend fun await(): T { - val result = suspendCancellableCoroutine { continuation -> - synchronized(lock) { - if (_completed) continuation.resume(value!!) - val observer: (T) -> Unit = { - continuation.resume(it) - } - observers += observer - } - } - return result - } -} - fun lifeCycleErrorInfo( errorMessage: String, errorCode: ErrorCode, @@ -253,3 +196,7 @@ private fun createAblyException( cause: Throwable?, ) = cause?.let { AblyException.fromErrorInfo(it, errorInfo) } ?: AblyException.fromErrorInfo(errorInfo) + +fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest) + +fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError) diff --git a/chat-android/src/test/java/com/ably/chat/MessagesTest.kt b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt index c427c5de..a7a7804a 100644 --- a/chat-android/src/test/java/com/ably/chat/MessagesTest.kt +++ b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt @@ -17,7 +17,7 @@ import io.mockk.every import io.mockk.mockk import io.mockk.slot import io.mockk.verify -import java.lang.reflect.Field +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals @@ -94,10 +94,10 @@ class MessagesTest { println("Pub/Sub message listener registered") } - val deferredValue = DeferredValue() + val deferredValue = CompletableDeferred() messages.subscribe { - deferredValue.completeWith(it) + deferredValue.complete(it) } verify { realtimeChannel.subscribe("chat.message", any()) } @@ -199,6 +199,13 @@ class MessagesTest { ) assertEquals("attach-serial-2", subscription1.fromSerialProvider().await()) + + // Check channelSerial is used at the point of subscription when state is attached + messages.channel.properties.channelSerial = "channel-serial-3" + messages.channel.state = ChannelState.attached + + val subscription2 = (messages.subscribe {}) as DefaultMessagesSubscription + assertEquals("channel-serial-3", subscription2.fromSerialProvider().await()) } @Test @@ -259,9 +266,7 @@ class MessagesTest { private val Channel.channelMulticaster: ChannelBase.MessageListener get() { - val field: Field = (ChannelBase::class.java).getDeclaredField("eventListeners") - field.isAccessible = true - val eventListeners = field.get(this) as HashMap<*, *> + val eventListeners = getPrivateField>("eventListeners") return eventListeners["chat.message"] as ChannelBase.MessageListener } diff --git a/chat-android/src/test/java/com/ably/chat/PresenceTest.kt b/chat-android/src/test/java/com/ably/chat/PresenceTest.kt index 718bbe22..70e3350c 100644 --- a/chat-android/src/test/java/com/ably/chat/PresenceTest.kt +++ b/chat-android/src/test/java/com/ably/chat/PresenceTest.kt @@ -11,6 +11,7 @@ import io.ably.lib.types.PresenceMessage import io.mockk.every import io.mockk.mockk import io.mockk.slot +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals import org.junit.Before @@ -42,10 +43,10 @@ class PresenceTest { every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit - val deferredValue = DeferredValue() + val deferredValue = CompletableDeferred() presence.subscribe { - deferredValue.completeWith(it) + deferredValue.complete(it) } presenceListenerSlot.captured.onPresenceMessage( @@ -78,10 +79,10 @@ class PresenceTest { every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit - val deferredValue = DeferredValue() + val deferredValue = CompletableDeferred() presence.subscribe { - deferredValue.completeWith(it) + deferredValue.complete(it) } presenceListenerSlot.captured.onPresenceMessage( @@ -115,10 +116,10 @@ class PresenceTest { every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit - val deferredValue = DeferredValue() + val deferredValue = CompletableDeferred() presence.subscribe { - deferredValue.completeWith(it) + deferredValue.complete(it) } presenceListenerSlot.captured.onPresenceMessage( diff --git a/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt b/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt index d8745d58..683b5a90 100644 --- a/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt +++ b/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt @@ -10,6 +10,7 @@ import io.ably.lib.types.MessageExtras import io.mockk.every import io.mockk.slot import io.mockk.verify +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals import org.junit.Before @@ -59,10 +60,10 @@ class RoomReactionsTest { every { realtimeChannel.subscribe("roomReaction", capture(pubSubMessageListenerSlot)) } returns Unit - val deferredValue = DeferredValue() + val deferredValue = CompletableDeferred() roomReactions.subscribe { - deferredValue.completeWith(it) + deferredValue.complete(it) } verify { realtimeChannel.subscribe("roomReaction", any()) } diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.kt b/chat-android/src/test/java/com/ably/chat/TestUtils.kt index 74cf26c7..4a91d730 100644 --- a/chat-android/src/test/java/com/ably/chat/TestUtils.kt +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.kt @@ -114,7 +114,3 @@ suspend fun Any.invokePrivateSuspendMethod(methodName: String, vararg args: A it.invoke(this, *args, cont) } } - -fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest) - -fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError)