Skip to content

Commit

Permalink
[CHA-M5] Improved message subscription history implementation
Browse files Browse the repository at this point in the history
1. Removed use of custom 'DeferredValue', which can become bottleneck in
heavy concurrency scenarios.
2. Replaced with thread safe native 'CompletableDeferred', supports
thread safe concurrent operations.
3. Removed unnecessary 'lock' and 'listeners' properties from Messages, since
concurrency and channel serial access already taken care by CompletableDeferred
4. Updated tests using `DeferredValue` with `CompletableDeferred`
  • Loading branch information
sacOO7 committed Dec 3, 2024
1 parent 274d226 commit 1e053fb
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 157 deletions.
119 changes: 37 additions & 82 deletions chat-android/src/main/java/com/ably/chat/Messages.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.realtime.ChannelState
import io.ably.lib.realtime.ChannelStateListener
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.MessageAction
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import io.ably.lib.realtime.Channel as AblyRealtimeChannel

typealias PubSubMessageListener = AblyRealtimeChannel.MessageListener
Expand Down Expand Up @@ -193,7 +193,7 @@ internal class DefaultMessagesSubscription(
private val chatApi: ChatApi,
private val roomId: String,
private val subscription: Subscription,
internal val fromSerialProvider: () -> DeferredValue<String>,
internal var fromSerialProvider: () -> CompletableDeferred<String>,
) : MessagesSubscription {
override fun unsubscribe() {
subscription.unsubscribe()
Expand All @@ -216,8 +216,6 @@ internal class DefaultMessages(

override val featureName: String = "messages"

private var listeners: Map<Messages.Listener, DeferredValue<String>> = emptyMap()

private var channelStateListener: ChannelStateListener

private val logger = room.roomLogger.withContext(tag = "Messages")
Expand All @@ -228,8 +226,6 @@ internal class DefaultMessages(

private val realtimeChannels = room.realtimeClient.channels

private var lock = Any()

/**
* (CHA-M1)
* the channel name for the chat messages channel.
Expand All @@ -242,20 +238,37 @@ internal class DefaultMessages(

override val detachmentErrorCode: ErrorCode = ErrorCode.MessagesDetachmentFailed

private val channelSerialMap = ConcurrentHashMap<PubSubMessageListener, CompletableDeferred<String>>()

/**
* deferredChannelSerial is a thread safe reference to the channel serial.
* Provides common channel serial for all subscribers once discontinuity is detected.
*/
private var deferredChannelSerial = CompletableDeferred<String>()

init {
channelStateListener = ChannelStateListener {
if (it.current == ChannelState.attached && !it.resumed) updateChannelSerialsAfterDiscontinuity()
if (it.current == ChannelState.attached && !it.resumed) {
updateChannelSerialsAfterDiscontinuity(requireAttachSerial())
}
}
channel.on(channelStateListener)
}

// CHA-M5c, CHA-M5d - Updated channel serial after discontinuity
private fun updateChannelSerialsAfterDiscontinuity(value: String) {
if (deferredChannelSerial.isActive) {
deferredChannelSerial.complete(value)
} else {
deferredChannelSerial = CompletableDeferred(value)
}
// channel serials updated at the same time for all map entries
channelSerialMap.replaceAll { _, _ -> deferredChannelSerial }
}

override fun subscribe(listener: Messages.Listener): MessagesSubscription {
val deferredChannelSerial = DeferredValue<String>()
addListener(listener, deferredChannelSerial)
val messageListener = PubSubMessageListener {
val pubSubMessage = it ?: throw AblyException.fromErrorInfo(
ErrorInfo("Got empty pubsub channel message", HttpStatusCode.BadRequest, ErrorCode.BadRequest.code),
)
val pubSubMessage = it ?: throw clientError("Got empty pubsub channel message")

// Ignore any action that is not message.create
if (pubSubMessage.action != MessageAction.MESSAGE_CREATE) return@PubSubMessageListener
Expand All @@ -273,26 +286,24 @@ internal class DefaultMessages(
)
listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage))
}
channelSerialMap[messageListener] = deferredChannelSerial
// (CHA-M4d)
channel.subscribe(PubSubMessageNames.ChatMessage, messageListener)
// (CHA-M5) setting subscription point
associateWithCurrentChannelSerial(deferredChannelSerial)
if (channel.state == ChannelState.attached) {
channelSerialMap[messageListener] = CompletableDeferred(requireChannelSerial())
}

return DefaultMessagesSubscription(
chatApi = chatApi,
roomId = roomId,
subscription = {
removeListener(listener)
channelSerialMap.remove(messageListener)
channel.unsubscribe(PubSubMessageNames.ChatMessage, messageListener)
},
fromSerialProvider = {
listeners[listener] ?: throw AblyException.fromErrorInfo(
ErrorInfo(
"This messages subscription instance was already unsubscribed",
HttpStatusCode.BadRequest,
ErrorCode.BadRequest.code,
),
)
channelSerialMap[messageListener]
?: throw clientError("This messages subscription instance was already unsubscribed")
},
)
}
Expand All @@ -301,73 +312,19 @@ internal class DefaultMessages(

override suspend fun send(params: SendMessageParams): Message = chatApi.sendMessage(roomId, params)

/**
* Associate deferred channel serial value with the current channel's serial
*
* WARN: it not deterministic because of race condition,
* this can lead to duplicated messages in `getPreviousMessages` calls
*/
private fun associateWithCurrentChannelSerial(channelSerialProvider: DeferredValue<String>) {
if (channel.state === ChannelState.attached) {
channelSerialProvider.completeWith(requireChannelSerial())
return
}

channel.once(ChannelState.attached) {
channelSerialProvider.completeWith(requireAttachSerial())
}
}

private fun requireChannelSerial(): String {
return channel.properties.channelSerial
?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Channel has been attached, but channelSerial is not defined",
HttpStatusCode.BadRequest,
ErrorCode.BadRequest.code,
),
)
?: throw clientError("Channel has been attached, but channelSerial is not defined")
}

private fun requireAttachSerial(): String {
return channel.properties.attachSerial
?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Channel has been attached, but attachSerial is not defined",
HttpStatusCode.BadRequest,
ErrorCode.BadRequest.code,
),
)
}

private fun addListener(listener: Messages.Listener, deferredChannelSerial: DeferredValue<String>) {
synchronized(lock) {
listeners += listener to deferredChannelSerial
}
}

private fun removeListener(listener: Messages.Listener) {
synchronized(lock) {
listeners -= listener
}
}

/**
* (CHA-M5c), (CHA-M5d)
*/
private fun updateChannelSerialsAfterDiscontinuity() {
val deferredChannelSerial = DeferredValue<String>()
deferredChannelSerial.completeWith(requireAttachSerial())

synchronized(lock) {
listeners = listeners.mapValues {
if (it.value.completed) deferredChannelSerial else it.value
}
}
?: throw clientError("Channel has been attached, but attachSerial is not defined")
}

override fun release() {
channel.off(channelStateListener)
channelSerialMap.clear()
realtimeChannels.release(channel.name)
}
}
Expand All @@ -379,9 +336,7 @@ private data class PubSubMessageData(val text: String, val metadata: MessageMeta

private fun parsePubSubMessageData(data: Any): PubSubMessageData {
if (data !is JsonObject) {
throw AblyException.fromErrorInfo(
ErrorInfo("Unrecognized Pub/Sub channel's message for `Message.created` event", HttpStatusCode.InternalServerError),
)
throw serverError("Unrecognized Pub/Sub channel's message for `Message.created` event")
}
return PubSubMessageData(
text = data.requireString("text"),
Expand Down
61 changes: 4 additions & 57 deletions chat-android/src/main/java/com/ably/chat/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -147,63 +147,6 @@ fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOption

fun generateUUID() = UUID.randomUUID().toString()

/**
* A value that can be evaluated at a later time, similar to `kotlinx.coroutines.Deferred` or a JavaScript Promise.
*
* This class provides a thread-safe, simple blocking implementation. It is not designed for use in scenarios with
* heavy concurrency.
*
* @param T the type of the value that will be evaluated.
*/
internal class DeferredValue<T> {

private var value: T? = null

private val lock = Any()

private var observers: Set<(T) -> Unit> = setOf()

private var _completed = false

/**
* `true` if value has been set, `false` otherwise
*/
val completed get() = _completed

/**
* Set value and mark DeferredValue completed, should be invoked only once
*
* @throws IllegalStateException if it's already `completed`
*/
fun completeWith(completionValue: T) {
synchronized(lock) {
check(!_completed) { "DeferredValue has already been completed" }
value = completionValue
_completed = true
observers.forEach { it(completionValue) }
observers = setOf()
}
}

/**
* Wait until value is completed
*
* @return completed value
*/
suspend fun await(): T {
val result = suspendCancellableCoroutine { continuation ->
synchronized(lock) {
if (_completed) continuation.resume(value!!)
val observer: (T) -> Unit = {
continuation.resume(it)
}
observers += observer
}
}
return result
}
}

fun lifeCycleErrorInfo(
errorMessage: String,
errorCode: ErrorCode,
Expand Down Expand Up @@ -253,3 +196,7 @@ private fun createAblyException(
cause: Throwable?,
) = cause?.let { AblyException.fromErrorInfo(it, errorInfo) }
?: AblyException.fromErrorInfo(errorInfo)

fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest)

fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError)
17 changes: 11 additions & 6 deletions chat-android/src/test/java/com/ably/chat/MessagesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import java.lang.reflect.Field
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
Expand Down Expand Up @@ -94,10 +94,10 @@ class MessagesTest {
println("Pub/Sub message listener registered")
}

val deferredValue = DeferredValue<MessageEvent>()
val deferredValue = CompletableDeferred<MessageEvent>()

messages.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

verify { realtimeChannel.subscribe("chat.message", any()) }
Expand Down Expand Up @@ -199,6 +199,13 @@ class MessagesTest {
)

assertEquals("attach-serial-2", subscription1.fromSerialProvider().await())

// Check channelSerial is used at the point of subscription when state is attached
messages.channel.properties.channelSerial = "channel-serial-3"
messages.channel.state = ChannelState.attached

val subscription2 = (messages.subscribe {}) as DefaultMessagesSubscription
assertEquals("channel-serial-3", subscription2.fromSerialProvider().await())
}

@Test
Expand Down Expand Up @@ -259,9 +266,7 @@ class MessagesTest {

private val Channel.channelMulticaster: ChannelBase.MessageListener
get() {
val field: Field = (ChannelBase::class.java).getDeclaredField("eventListeners")
field.isAccessible = true
val eventListeners = field.get(this) as HashMap<*, *>
val eventListeners = getPrivateField<HashMap<*, *>>("eventListeners")
return eventListeners["chat.message"] as ChannelBase.MessageListener
}

Expand Down
13 changes: 7 additions & 6 deletions chat-android/src/test/java/com/ably/chat/PresenceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.ably.lib.types.PresenceMessage
import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Before
Expand Down Expand Up @@ -42,10 +43,10 @@ class PresenceTest {

every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit

val deferredValue = DeferredValue<PresenceEvent>()
val deferredValue = CompletableDeferred<PresenceEvent>()

presence.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

presenceListenerSlot.captured.onPresenceMessage(
Expand Down Expand Up @@ -78,10 +79,10 @@ class PresenceTest {

every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit

val deferredValue = DeferredValue<PresenceEvent>()
val deferredValue = CompletableDeferred<PresenceEvent>()

presence.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

presenceListenerSlot.captured.onPresenceMessage(
Expand Down Expand Up @@ -115,10 +116,10 @@ class PresenceTest {

every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit

val deferredValue = DeferredValue<PresenceEvent>()
val deferredValue = CompletableDeferred<PresenceEvent>()

presence.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

presenceListenerSlot.captured.onPresenceMessage(
Expand Down
5 changes: 3 additions & 2 deletions chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.ably.lib.types.MessageExtras
import io.mockk.every
import io.mockk.slot
import io.mockk.verify
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Before
Expand Down Expand Up @@ -59,10 +60,10 @@ class RoomReactionsTest {

every { realtimeChannel.subscribe("roomReaction", capture(pubSubMessageListenerSlot)) } returns Unit

val deferredValue = DeferredValue<Reaction>()
val deferredValue = CompletableDeferred<Reaction>()

roomReactions.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

verify { realtimeChannel.subscribe("roomReaction", any()) }
Expand Down
4 changes: 0 additions & 4 deletions chat-android/src/test/java/com/ably/chat/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,3 @@ suspend fun <T>Any.invokePrivateSuspendMethod(methodName: String, vararg args: A
it.invoke(this, *args, cont)
}
}

fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest)

fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError)

0 comments on commit 1e053fb

Please sign in to comment.