Skip to content

Commit

Permalink
Merge pull request #78 from ably/improve/messages-feature-channel-serial
Browse files Browse the repository at this point in the history
[CHA-M5] Improved message subscription history implementation
  • Loading branch information
sacOO7 authored Dec 4, 2024
2 parents 52563e8 + b21dd36 commit 41e71f3
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 157 deletions.
119 changes: 37 additions & 82 deletions chat-android/src/main/java/com/ably/chat/Messages.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.realtime.ChannelState
import io.ably.lib.realtime.ChannelStateListener
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.MessageAction
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import io.ably.lib.realtime.Channel as AblyRealtimeChannel

typealias PubSubMessageListener = AblyRealtimeChannel.MessageListener
Expand Down Expand Up @@ -201,7 +201,7 @@ internal class DefaultMessagesSubscription(
private val chatApi: ChatApi,
private val roomId: String,
private val subscription: Subscription,
internal val fromSerialProvider: () -> DeferredValue<String>,
internal val fromSerialProvider: () -> CompletableDeferred<String>,
) : MessagesSubscription {
override fun unsubscribe() {
subscription.unsubscribe()
Expand All @@ -224,8 +224,6 @@ internal class DefaultMessages(

override val featureName: String = "messages"

private var listeners: Map<Messages.Listener, DeferredValue<String>> = emptyMap()

private var channelStateListener: ChannelStateListener

private val logger = room.roomLogger.withContext(tag = "Messages")
Expand All @@ -236,8 +234,6 @@ internal class DefaultMessages(

private val realtimeChannels = room.realtimeClient.channels

private var lock = Any()

/**
* (CHA-M1)
* the channel name for the chat messages channel.
Expand All @@ -250,20 +246,37 @@ internal class DefaultMessages(

override val detachmentErrorCode: ErrorCode = ErrorCode.MessagesDetachmentFailed

private val channelSerialMap = ConcurrentHashMap<PubSubMessageListener, CompletableDeferred<String>>()

/**
* deferredChannelSerial is a thread safe reference to the channel serial.
* Provides common channel serial for all subscribers once discontinuity is detected.
*/
private var deferredChannelSerial = CompletableDeferred<String>()

init {
channelStateListener = ChannelStateListener {
if (it.current == ChannelState.attached && !it.resumed) updateChannelSerialsAfterDiscontinuity()
if (it.current == ChannelState.attached && !it.resumed) {
updateChannelSerialsAfterDiscontinuity(requireAttachSerial())
}
}
channel.on(channelStateListener)
}

// CHA-M5c, CHA-M5d - Updated channel serial after discontinuity
private fun updateChannelSerialsAfterDiscontinuity(value: String) {
if (deferredChannelSerial.isActive) {
deferredChannelSerial.complete(value)
} else {
deferredChannelSerial = CompletableDeferred(value)
}
// channel serials updated at the same time for all map entries
channelSerialMap.replaceAll { _, _ -> deferredChannelSerial }
}

override fun subscribe(listener: Messages.Listener): MessagesSubscription {
val deferredChannelSerial = DeferredValue<String>()
addListener(listener, deferredChannelSerial)
val messageListener = PubSubMessageListener {
val pubSubMessage = it ?: throw AblyException.fromErrorInfo(
ErrorInfo("Got empty pubsub channel message", HttpStatusCode.BadRequest, ErrorCode.BadRequest.code),
)
val pubSubMessage = it ?: throw clientError("Got empty pubsub channel message")

// Ignore any action that is not message.create
if (pubSubMessage.action != MessageAction.MESSAGE_CREATE) return@PubSubMessageListener
Expand All @@ -281,26 +294,24 @@ internal class DefaultMessages(
)
listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage))
}
channelSerialMap[messageListener] = deferredChannelSerial
// (CHA-M4d)
channel.subscribe(PubSubMessageNames.ChatMessage, messageListener)
// (CHA-M5) setting subscription point
associateWithCurrentChannelSerial(deferredChannelSerial)
if (channel.state == ChannelState.attached) {
channelSerialMap[messageListener] = CompletableDeferred(requireChannelSerial())
}

return DefaultMessagesSubscription(
chatApi = chatApi,
roomId = roomId,
subscription = {
removeListener(listener)
channelSerialMap.remove(messageListener)
channel.unsubscribe(PubSubMessageNames.ChatMessage, messageListener)
},
fromSerialProvider = {
listeners[listener] ?: throw AblyException.fromErrorInfo(
ErrorInfo(
"This messages subscription instance was already unsubscribed",
HttpStatusCode.BadRequest,
ErrorCode.BadRequest.code,
),
)
channelSerialMap[messageListener]
?: throw clientError("This messages subscription instance was already unsubscribed")
},
)
}
Expand All @@ -316,73 +327,19 @@ internal class DefaultMessages(
SendMessageParams(text, metadata, headers),
)

/**
* Associate deferred channel serial value with the current channel's serial
*
* WARN: it not deterministic because of race condition,
* this can lead to duplicated messages in `getPreviousMessages` calls
*/
private fun associateWithCurrentChannelSerial(channelSerialProvider: DeferredValue<String>) {
if (channel.state === ChannelState.attached) {
channelSerialProvider.completeWith(requireChannelSerial())
return
}

channel.once(ChannelState.attached) {
channelSerialProvider.completeWith(requireAttachSerial())
}
}

private fun requireChannelSerial(): String {
return channel.properties.channelSerial
?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Channel has been attached, but channelSerial is not defined",
HttpStatusCode.BadRequest,
ErrorCode.BadRequest.code,
),
)
?: throw clientError("Channel has been attached, but channelSerial is not defined")
}

private fun requireAttachSerial(): String {
return channel.properties.attachSerial
?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Channel has been attached, but attachSerial is not defined",
HttpStatusCode.BadRequest,
ErrorCode.BadRequest.code,
),
)
}

private fun addListener(listener: Messages.Listener, deferredChannelSerial: DeferredValue<String>) {
synchronized(lock) {
listeners += listener to deferredChannelSerial
}
}

private fun removeListener(listener: Messages.Listener) {
synchronized(lock) {
listeners -= listener
}
}

/**
* (CHA-M5c), (CHA-M5d)
*/
private fun updateChannelSerialsAfterDiscontinuity() {
val deferredChannelSerial = DeferredValue<String>()
deferredChannelSerial.completeWith(requireAttachSerial())

synchronized(lock) {
listeners = listeners.mapValues {
if (it.value.completed) deferredChannelSerial else it.value
}
}
?: throw clientError("Channel has been attached, but attachSerial is not defined")
}

override fun release() {
channel.off(channelStateListener)
channelSerialMap.clear()
realtimeChannels.release(channel.name)
}
}
Expand All @@ -394,9 +351,7 @@ private data class PubSubMessageData(val text: String, val metadata: MessageMeta

private fun parsePubSubMessageData(data: Any): PubSubMessageData {
if (data !is JsonObject) {
throw AblyException.fromErrorInfo(
ErrorInfo("Unrecognized Pub/Sub channel's message for `Message.created` event", HttpStatusCode.InternalServerError),
)
throw serverError("Unrecognized Pub/Sub channel's message for `Message.created` event")
}
return PubSubMessageData(
text = data.requireString("text"),
Expand Down
61 changes: 4 additions & 57 deletions chat-android/src/main/java/com/ably/chat/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -147,63 +147,6 @@ fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOption

fun generateUUID() = UUID.randomUUID().toString()

/**
* A value that can be evaluated at a later time, similar to `kotlinx.coroutines.Deferred` or a JavaScript Promise.
*
* This class provides a thread-safe, simple blocking implementation. It is not designed for use in scenarios with
* heavy concurrency.
*
* @param T the type of the value that will be evaluated.
*/
internal class DeferredValue<T> {

private var value: T? = null

private val lock = Any()

private var observers: Set<(T) -> Unit> = setOf()

private var _completed = false

/**
* `true` if value has been set, `false` otherwise
*/
val completed get() = _completed

/**
* Set value and mark DeferredValue completed, should be invoked only once
*
* @throws IllegalStateException if it's already `completed`
*/
fun completeWith(completionValue: T) {
synchronized(lock) {
check(!_completed) { "DeferredValue has already been completed" }
value = completionValue
_completed = true
observers.forEach { it(completionValue) }
observers = setOf()
}
}

/**
* Wait until value is completed
*
* @return completed value
*/
suspend fun await(): T {
val result = suspendCancellableCoroutine { continuation ->
synchronized(lock) {
if (_completed) continuation.resume(value!!)
val observer: (T) -> Unit = {
continuation.resume(it)
}
observers += observer
}
}
return result
}
}

fun lifeCycleErrorInfo(
errorMessage: String,
errorCode: ErrorCode,
Expand Down Expand Up @@ -253,3 +196,7 @@ private fun createAblyException(
cause: Throwable?,
) = cause?.let { AblyException.fromErrorInfo(it, errorInfo) }
?: AblyException.fromErrorInfo(errorInfo)

fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest)

fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError)
17 changes: 11 additions & 6 deletions chat-android/src/test/java/com/ably/chat/MessagesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import java.lang.reflect.Field
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
Expand Down Expand Up @@ -92,10 +92,10 @@ class MessagesTest {
println("Pub/Sub message listener registered")
}

val deferredValue = DeferredValue<MessageEvent>()
val deferredValue = CompletableDeferred<MessageEvent>()

messages.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

verify { realtimeChannel.subscribe("chat.message", any()) }
Expand Down Expand Up @@ -197,6 +197,13 @@ class MessagesTest {
)

assertEquals("attach-serial-2", subscription1.fromSerialProvider().await())

// Check channelSerial is used at the point of subscription when state is attached
messages.channel.properties.channelSerial = "channel-serial-3"
messages.channel.state = ChannelState.attached

val subscription2 = (messages.subscribe {}) as DefaultMessagesSubscription
assertEquals("channel-serial-3", subscription2.fromSerialProvider().await())
}

@Test
Expand Down Expand Up @@ -253,9 +260,7 @@ class MessagesTest {

private val Channel.channelMulticaster: ChannelBase.MessageListener
get() {
val field: Field = (ChannelBase::class.java).getDeclaredField("eventListeners")
field.isAccessible = true
val eventListeners = field.get(this) as HashMap<*, *>
val eventListeners = getPrivateField<HashMap<*, *>>("eventListeners")
return eventListeners["chat.message"] as ChannelBase.MessageListener
}

Expand Down
13 changes: 7 additions & 6 deletions chat-android/src/test/java/com/ably/chat/PresenceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.ably.lib.types.PresenceMessage
import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Before
Expand Down Expand Up @@ -42,10 +43,10 @@ class PresenceTest {

every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit

val deferredValue = DeferredValue<PresenceEvent>()
val deferredValue = CompletableDeferred<PresenceEvent>()

presence.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

presenceListenerSlot.captured.onPresenceMessage(
Expand Down Expand Up @@ -78,10 +79,10 @@ class PresenceTest {

every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit

val deferredValue = DeferredValue<PresenceEvent>()
val deferredValue = CompletableDeferred<PresenceEvent>()

presence.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

presenceListenerSlot.captured.onPresenceMessage(
Expand Down Expand Up @@ -115,10 +116,10 @@ class PresenceTest {

every { pubSubPresence.subscribe(capture(presenceListenerSlot)) } returns Unit

val deferredValue = DeferredValue<PresenceEvent>()
val deferredValue = CompletableDeferred<PresenceEvent>()

presence.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

presenceListenerSlot.captured.onPresenceMessage(
Expand Down
5 changes: 3 additions & 2 deletions chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.ably.lib.types.MessageExtras
import io.mockk.every
import io.mockk.slot
import io.mockk.verify
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Before
Expand Down Expand Up @@ -59,10 +60,10 @@ class RoomReactionsTest {

every { realtimeChannel.subscribe("roomReaction", capture(pubSubMessageListenerSlot)) } returns Unit

val deferredValue = DeferredValue<Reaction>()
val deferredValue = CompletableDeferred<Reaction>()

roomReactions.subscribe {
deferredValue.completeWith(it)
deferredValue.complete(it)
}

verify { realtimeChannel.subscribe("roomReaction", any()) }
Expand Down
4 changes: 0 additions & 4 deletions chat-android/src/test/java/com/ably/chat/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,3 @@ suspend fun <T>Any.invokePrivateSuspendMethod(methodName: String, vararg args: A
it.invoke(this, *args, cont)
}
}

fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest)

fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError)

0 comments on commit 41e71f3

Please sign in to comment.